In [109]:
# importing necessary packages

import random
import numpy as np

NUMBER_OF_POSITIONS = 4       # number of states assumed as an upper limit, |Q|

# declaring start state
START_POSITION = np.zeros(NUMBER_OF_POSITIONS) 
START_POSITION[0] = 1         # start position is [1 0 0 0]


PERCENT_OF_TESTS = 0.1        # percent of data to be used for testing
TenzToAdd = 2.0
EPS = 0.01                    # error margin to determine successful training
NU = 0.7                      # learning rate for tensor
NU_ADDER = 0.3                # learning rate for adder

In [103]:
# Dictionary of alphabets set according to the alphabet size, it is known before constructing a DFA
DICTIONARY = ['a', 'b'] 
NUMBER_OF_CHARS = len(DICTIONARY)              # number of alphabets, |A|

# getting the dataset of the language
f = open('dataset/tl1_dataset.txt', 'r')  # get the dataset for a language
dataset = []                              # array to hold the (w,ans) 
for line in f:
    arr = line.split(' ')
    ans = 1.0
    if arr[1][0] == '0':
        ans = 0.0
    dataset.append([arr[0], ans])
    
print(dataset)

[['ba', 1.0], ['bb', 0.0], ['a', 1.0], ['bab', 1.0], ['aa', 0.0], ['baa', 0.0], ['bba', 0.0], ['aa', 0.0], ['aab', 0.0], ['a', 1.0], ['bab', 1.0], ['bbb', 0.0], ['bb', 0.0], ['aa', 0.0], ['b', 1.0], ['b', 1.0], ['ab', 1.0], ['bba', 0.0], ['ba', 1.0], ['ab', 1.0], ['a', 1.0], ['b', 1.0], ['ab', 1.0], ['b', 1.0], ['aaa', 0.0], ['a', 1.0], ['ba', 1.0], ['ab', 1.0], ['b', 1.0], ['b', 1.0], ['b', 1.0], ['ab', 1.0], ['aa', 0.0], ['b', 1.0], ['ab', 1.0], ['a', 1.0], ['a', 1.0], ['baa', 0.0], ['ba', 1.0], ['aa', 0.0], ['bb', 0.0], ['a', 1.0], ['ba', 1.0], ['aba', 1.0], ['abb', 0.0], ['a', 1.0], ['bba', 0.0], ['b', 1.0], ['a', 1.0], ['baa', 0.0], ['baba', 1.0], ['ababababab', 1.0], ['ababababab', 1.0], ['ababa', 1.0], ['abab', 1.0], ['babababab', 1.0], ['abababab', 1.0], ['babab', 1.0], ['babababa', 1.0], ['ababababa', 1.0], ['bababab', 1.0], ['abab', 1.0], ['ababa', 1.0], ['ababababa', 1.0], ['bababababa', 1.0], ['abababa', 1.0], ['bababab', 1.0], ['abab', 1.0], ['ababa', 1.0], ['abab', 1.0], 

### definition of all the function used in the RNN architecture

In [104]:
# MSE loss function (ytrue - yloss)^2
def cost_function(exp, res):                   
    return (res - exp) ** 2

# derivative of loss function (ytrue - yloss)
def cost_function_derivative(exp, res):       
    return res - exp


# Match function: takes the nn tensor, current state and the next character and performs the transition
# ch is the next character and pos is the current position
# Q x S > Q i.e next state = tensor x next character x current state
def match(nn, ch, pos):                         
    new_pos = np.zeros(NUMBER_OF_POSITIONS)         # create a new array for the next state
    for k in range(NUMBER_OF_POSITIONS):            # for each value in the new state
        for i in range (NUMBER_OF_CHARS):           # go through next character array
            for j in range (NUMBER_OF_POSITIONS):   # go through current staste array
                new_pos[k] += nn.tensor[k][i][j] * ch[i] * pos[j] # new state position is tensor x next character x current state
    return new_pos                                  # return the new state

# derivative of Match function wrt current state: takes the tensor, a gradient and next input character
# derivative (2D array)= tensor x next character
def match_derivative(nn, dz, ch):
    derivative = np.zeros([NUMBER_OF_POSITIONS, NUMBER_OF_POSITIONS]) # dim: |Q| x |Q|
    for i in range(NUMBER_OF_POSITIONS):                   # for each column in derivative i.e values in current state [p1 p2 p3 ..]
        for k in range(NUMBER_OF_POSITIONS):               # for each row in derivative i.e vlues in new state [h1 h2 h3 ..]
            for j in range (NUMBER_OF_CHARS):              # loop through current character  
                derivative[k][i] += nn.tensor[k][j][i] * ch[j] # derivative (2D array)= tensor x next character
    return np.dot(dz, derivative)                          # dot product of previous gradient with new one

# derivative of Match function wrt tensor: take gradient, next input character and current state 
# derivative(3D array) = next character x current state 
def match_derivative_tensor(dz, ch, pos):
    sample_matrix = np.zeros([NUMBER_OF_CHARS, NUMBER_OF_POSITIONS]) # create a two 2D array of next character and current state
    for i in range(NUMBER_OF_CHARS):
        for j in range(NUMBER_OF_POSITIONS):
            sample_matrix[i][j] = ch[i] * pos[j]                     # fill the sample matrix vy multiplying next character and current state
    derivative = np.zeros([NUMBER_OF_POSITIONS, NUMBER_OF_CHARS, NUMBER_OF_POSITIONS]) # new derivative as a 3D array
    for k in range(NUMBER_OF_POSITIONS): # for each row in first dimension
        derivative[k] = dz[k] * sample_matrix      # new derivative is old * sample matrix
    return derivative                              # return derivative


# normalize function for each value in h_k return h/sum(h)
def normalize(h):                    
    return h / np.sum(h)

# derivative of normalize: takes in current derivative, and input(h) to the output layer
def normalize_derivative(dz, h):
    derivative = np.zeros([NUMBER_OF_POSITIONS, NUMBER_OF_POSITIONS])
    sum = np.sum(h)
    for i in range(NUMBER_OF_POSITIONS):      # for each value in the output layer vector 
        for j in range(NUMBER_OF_POSITIONS):  # for each value in the output layer vector
            if(i == j):                       # if position of value wrt to which derivative is being taken is equal, 
                derivative[i][j] = (sum - h[i]) / (sum ** 2) # derivative is (sum - value)/sum^2
            else:
                derivative[i][j] = -h[i] / (sum ** 2)        # else -value/sum^2
    return np.dot(dz, derivative)             # dot product of previous derivative with new one


# applying adder function to output layer, takes the adder and the final state
# final output neuron = adder x final state
def lastsum(nn, x):
    return np.dot(nn.adder, x) # final output neuron = adder x final state

# derivative of adder wrt its argument k, takes adder and current derivative
def lastsum_derivative(nn, dz):
    return np.dot(dz, nn.adder)      # dot product of adder values(k) and current derivative

# derivative of adder wrt to k_i's, takes adder, current deriviative and a output layer state
def lastsum_derivative_adder(nn, dz, inp):
    derivative = np.multiply(inp, nn.adder) # derivative is output layer state x adder 
    return np.dot(dz, derivative)

# function to change a character to vector encodings
def char_to_vector(ch):
    index = DICTIONARY.index(ch)    # get the index of a character  from dictionary
    vec = np.zeros(NUMBER_OF_CHARS) # get a 1D array of len(alphabet) x 1
    vec[index] = 1.0                # change the index of character character to 1
    return vec                      # return the vector

# cut function
def cut(x):
    if (x > 1.0):
        return 1.0
    if (x < 0.0):
        return 0.0
    return x



### RNN architecture 

In [105]:
# neural network object with two attributes and three methods
# two attributes are tensor and adder which are alterable parameters of the network, tensor and adder
# three methods are check, train, train_online, get_automaton, and run
class NeuralNetwork:
    
    # dim: |Q| x |A| x |Q|, Q is number of states and |A| is alphabet size
    tensor = np.zeros([NUMBER_OF_POSITIONS, NUMBER_OF_CHARS, NUMBER_OF_POSITIONS])  
    adder = np.zeros(NUMBER_OF_POSITIONS)

    def __init__(self): # init method that creates tensor and adder attribute and initializes it
        
        self.tensor = np.zeros([NUMBER_OF_POSITIONS, NUMBER_OF_CHARS, NUMBER_OF_POSITIONS]) # dim: to x ch x fr
        for fr in range(NUMBER_OF_POSITIONS):  # for each index in fr state
            for ch in range(NUMBER_OF_CHARS):  # for each index in a character
                z = np.random.rand(NUMBER_OF_POSITIONS) # get a random value intialized array of lenght |Q|
                z = normalize(z)                        # normalize it 
                for to in range(NUMBER_OF_POSITIONS):   # for each index in to state
                    self.tensor[to][ch][fr] = z[to]     # initialize the given index of tensor
                    
        self.adder = np.zeros(NUMBER_OF_POSITIONS)      # dim: |Q| x 1
        for to in range(NUMBER_OF_POSITIONS):           # for each index in the adder array
            # initialize each index with a value corresponding to its index, value = (index + 1)/|Q|
            self.adder[to] = 1.0 * (to + 1) / (NUMBER_OF_POSITIONS) 


    def check(self, word): # method to check whether a word belongs to the language or not
        curr_pos = START_POSITION                       # start from start position 
        for k in range(len(word)):                      # for each character in the word
            curr_word = char_to_vector(word[k])         # convert it to vector form 
            curr_pos = match(self, curr_word, curr_pos) # make a transition 
            curr_pos = normalize(curr_pos)              # normalize the next state
        output = lastsum(self, curr_pos)                # then apply an adder function to get a final output
        return output

    def train_online(self, dataset): # method to online train the network i.e weight are adjusted after every word is read
        average_error = 1.0          # initializing and average error
        epoch_number = 0             # epoch number
        n = len(dataset)             # number of words in the datset
        tests_size = int(PERCENT_OF_TESTS * n) # number of words allocated for testing the dataset
        while (average_error > EPS):           # while the error is greater than specified epsilon  
            random.shuffle(dataset)            # shuffle the datset for better training
            cases_left = len(dataset)          # varaible to track the number of words lefts to train on
            epoch_number += 1                  # increment the epch by 1
            print ( 'Epoch #' + str(epoch_number))  # print epoch number
            while(cases_left > tests_size):         # for a epoch train on the whole dataset 
                self.train(dataset[cases_left - 1][0], dataset[cases_left - 1][1]) # training is done from the last word
                cases_left -= 1
            average_error = 0.0                    
            for i in range(cases_left):         # for each word in the dataset 
                average_error += cost_function(dataset[i][1], self.check(dataset[i][0])) # predict and calculate the error
            average_error /= cases_left         # normalize the average error
            print ("Average error: " + str(average_error)) # print the average error

    def train(self, word, exp):
        cut_v = np.vectorize(cut)
        word_length = len(word)
        positions = np.zeros([word_length + 1, NUMBER_OF_POSITIONS])
        before_normalize = np.zeros([word_length, NUMBER_OF_POSITIONS])
        d_tensor = np.zeros([NUMBER_OF_POSITIONS, NUMBER_OF_CHARS, NUMBER_OF_POSITIONS])
        d_adder = np.zeros(NUMBER_OF_POSITIONS)
        positions[0] = START_POSITION

        for k in range(word_length):
            curr_word = char_to_vector(word[k])
            before_normalize[k] = match(self, curr_word, positions[k])
            positions[k + 1] = normalize(before_normalize[k])
        answer = lastsum(self, positions[-1])
        error = cost_function(exp, answer)


        gradient = cost_function_derivative(exp, answer)
        d_adder += lastsum_derivative_adder(self, gradient, positions[-1])
        gradient = lastsum_derivative(self, gradient)
        first_gradient = sum(abs(gradient)) * TenzToAdd
        for k in range(word_length - 1, -1, -1):
            curr_grad = sum(abs(gradient))
            if (curr_grad < 0.001):
                koef = 1.0
            else:
                koef = first_gradient / sum(abs(gradient))
            gradient *= koef
            curr_word = char_to_vector(word[k])
            gradient = normalize_derivative(gradient, before_normalize[k])
            d_tensor += match_derivative_tensor(gradient, curr_word, positions[k])
            gradient = match_derivative(self, gradient, curr_word)

        d_tensor /= word_length
        self.tensor = cut_v(self.tensor - NU * d_tensor)
        self.adder = cut_v(self.adder - NU * NU_ADDER * d_adder)
        return error

    def get_automaton(self): # method to get the DFA, traverse through 3D array tensor
        
        for i in range(NUMBER_OF_POSITIONS): # for each index of a state, last index of 3D array, from state
            for j in range(NUMBER_OF_CHARS): # for each index of a character, middle index of 3D array, character
                max_ind = 0                  # set max_ind as 0 initially
                for k in range(1, NUMBER_OF_POSITIONS): # for each index of a state, first index of 3D array, to state
                    if (nn.tensor[k][j][i] > nn.tensor[max_ind][j][i]): # find the index with the maximum value 
                        max_ind = k                                     # set it to k
                print(str(i) + "--" + str(DICTIONARY[j]) + '-->' + str(max_ind)) # then from(i) state upon consuming(j)
                                                     # has maximum probability of going to to(k) state
        
        for k in range(NUMBER_OF_POSITIONS):  # for each index of a state in adder vector
            if (nn.adder[k] > 0.5):           # if the value of the a particular index is greater than 0.5
                print(str(k) + " is terminal") # then it has more than 50% chance to be terminal
    
    
    def run(self, word): # method to run a word on the trained dfa and check whether the word gets accepted or rejected
        output = nn.check(word)        # call check method
        if (output > float(0.6)):      # if the probability is greater than 60 %
            ans = 'Accepted'           # then accept the word
        else: 
            ans = 'Rejected'           # or reject the word
        return output, ans             # return both output and ans
                              

In [106]:
# training the NN and getting the DFA

nn = NeuralNetwork();
nn.train_online(dataset)
nn.get_automaton()

Epoch #1
Average error: 8.4795933554787e-08
0--a-->1
0--b-->2
1--a-->3
1--b-->2
2--a-->1
2--b-->3
3--a-->3
3--b-->3
1 is terminal
2 is terminal


In [91]:
output, ans = nn.run("abaaba")
print(ans)

Rejected


In [93]:
# print(nn.tensor)