In [None]:
import pandas as pd
import numpy as np
import time
import re
from scipy.optimize import curve_fit
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import glob

**Implementação do algoritmo**

- definição das etapas
- definição das estruturas de dados
- escrita do algorimto
- normalização do algoritmo


In [None]:
from threading import Thread
from time import sleep, time

delta = dict()

def waiting(t, f):
    sleep(t)
    f['value'] = True

def handler(limit):
    t0 = time()

    f = {"value": False}
    t = Thread(target=waiting, args=(10,f))
    t.start()

    while(time() - t0 <= limit and not f['value']):
        continue

    print("Finish with "+str(time() - t0)+" s")

In [None]:
class MT(): #Turing Machine

    def __init__(self):
        pass

    def auto_run(self):
        
        E = input().split(" ") #1° line: list of states
        alphabet = input().split(" ") #2° line: alphabet
        tape_alphabet = input().split(" ") #3° line: the alphabet of the data tape
        self.l_limiter = input() #4° line: data tape left limiter
        self.white = input() #5° line: the white character
        n = int(input()) #n transactions

        #----- set states ------
        alphabet.append("*") #make sure the alphabet have the empty character
        
        #initialize the data tape
        tape_alphabet = [self.l_limiter] + tape_alphabet #add left limiter
        tape_alphabet.append(self.white) #make sure the data tape have white character after left limiter

        iteractions = []
        for i in range(n):
            iteractions.append(input())

        self.mt_graph = self._build_graph(E, alphabet, tape_alphabet, iteractions)

        self.init_state = str(input())

        self.final_states = str(input()).split(" ")

        words = input().split(" ")

        for word in words:
            print(self.extended_delta(list(word)))

    def _build_graph(self, E, alphabet, tape_alphabet, iteractions):
        """Initialize the graph which describes the rule of transactions.

        @E: (List) list of states
        @alphabet: (List) the set of characters of alphabet (Σ)
        @tape_alphabet: (List) the set of characters of alphabet (Γ)
        Σ is subset of Γ
        """
        mt_graph = dict()

        for e in E: #initialize the graph
            for c in alphabet: 
                mt_graph[(e, c)] = []
                mt_graph[(e, c)] = []
                mt_graph[(e, c)] = []

            for c in tape_alphabet: 
                mt_graph[(e, c)] = []
                mt_graph[(e, c)] = []
                mt_graph[(e, c)] = []

        for it in iteractions: #to each n iteration, receive <a, b, c, d, e>:
            """
            a: 0 = start state, b: 1 = character on tape, c: 2 = destination state, 
            d: 3 = character to be writed on tape, e: 4 = direction (E = left, D = right, I = feft before right)
            """
            s = it.split(" ")
            mt_graph[(s[0], s[1])].append((s[2], s[3], s[4])) #(a, b)_<p> -> [(c, d, e)_1, (c, d, e)_2, ..., (c, d, e)_n]

        return mt_graph

    def direction(self, d, i):
        
        if d == "D":
            return i + 1
        
        if d == "E":
            return i - 1

        return i

    def extended_delta(self, tape):

        if not self.l_limiter in tape: #Make sure the tape is with correct shape
            tape = [self.l_limiter] + tape
        tape.extend( list(self.white*10)) #10 is margin to avoid index out range.

        threads = [(self.init_state, 1, tape)] # (current state, heah position, tape data)
        
        while threads:
            c_ste, i, tape_t = threads.pop() #tape_t: tape at thread t. Each thread have a tape
            
            transactions = self.mt_graph[(c_ste, tape_t[i])]  #Transactions we have to state c_ste receiving word_i
            
            if c_ste in self.final_states and len(transactions) == 0: #if current state is a final state and new transactions to this thread doesn't exists
                return "S" #is accepted

            self.delta(i, transactions, tape_t, threads) #threads is changed by reference
            
        return "N"

    def delta(self, i, transactions, tape_t, threads):
        """Process transactions. Return (bool).
        Return True only if the current thread stop on final state and have not new transactions to do
        @i: (int) head position
        @transactions: (List) transactions to current thread
        @tape_t: (List) data tape to current thread
        @threads: (List) remain threads to be processed
        """
        for c, d, e in transactions: #c: new state, d: character to be writed on tape_t[i], e: direction
            i_ = self.direction(e,i) #change the head position based on direction definition
            tape_t_ = tape_t.copy() #make sure not change the current tape_t to avoid conflits
            tape_t_[i] = d #write the character e at i position of new tape_t
            threads.append((c, i_, tape_t_)) #insert on threads to be processed

    def setup(self, definition):
        """Method to test the algorithm
        @Definition: (Dataframe)
        """
        E = definition[0].split(" ") #1° line: list of states
        alphabet = definition[1].split(" ") #2° line: alphabet
        tape_alphabet = definition[2].split(" ") #3° line: the alphabet of the data tape
        self.l_limiter = definition[3] #4° line: data tape left limiter
        self.white = definition[4] #5° line: the white character
        n = int(definition[5]) #n transactions

        #----- set states ------
        alphabet.append("*") #make sure the alphabet have the empty character
        
        #initialize the data tape
        tape_alphabet = [self.l_limiter] + tape_alphabet #add left limiter
        tape_alphabet.append(self.white) #make sure the data tape have white character after left limiter

        iteractions = []
        for i in range(6, n+6, 1):
            iteractions.append(definition[i])

        self.mt_graph = self._build_graph(E, alphabet, tape_alphabet, iteractions)

        self.init_state = str(definition[n+6])

        self.final_states = str(definition[n+7]).split(" ")


### Results

In [None]:
def d_from_file(file):
    """Definition from file"""

    f = open(file,"r")
    data = []
    for l in f.readlines():
        data.append(l[0:-1])        
    f.close()
    return data

def get_definitions(path):
    definitions = []
    for file in glob.glob(path+"/*.mt"):
        definitions.append(d_from_file(file))

    return definitions

#### Generate words

In [None]:
## Generate too many words
words = []
for k in np.arange(0, 34, 1):
    ones = str(np.ones(int(k/2)).astype(int))[1:-1].replace(" ","")
    zeros = str(np.zeros(int(k/2)).astype(int))[1:-1].replace(" ","")
    words.append(zeros+ones)
words_df = pd.DataFrame({"words":words})

In [None]:
words_df.to_csv("words_01.csv", index=False)

In [None]:
words_df = pd.read_csv("words_01.csv")
words_df.loc[0, "words"] = "*"

#### Test

In [None]:
turing_machine = MT()
turing_machine.setup(d_from_file("./data/def1.tm"))

for w in "* ab ba abb aab aabb".split(" "):
    print(w, end=" : ")
    print(turing_machine.extended_delta(list(w)))

## Curve adjustment

In [None]:
#choose the model
def func(x, a, b, c):
    return a*np.exp(b*x-c)

#fit
for i in tests.i.unique():
    test_i = tests[tests.i == i]
    popt, pcov = curve_fit(func, test_i.len_w, test_i.delta_T)

    #parameters found to curve fitted
    definitions.loc[i,'a'] = popt[0]
    definitions.loc[i,'b'] = popt[1]
    definitions.loc[i,'c'] = popt[2]

    #plot the result
    x = np.arange(0, test_i.len_w.max()+3, 1)

    fig, ax = plt.subplots(figsize=(15, 9))
    
    ax.axvline(x=0, color='red')
    ax.axhline(y=0, color='yellow')

    ax.plot(x, func(x, *popt), 'r-', label='fit: a = %f, b = %f, c = %f'%tuple(popt))
    ax.plot(test_i.len_w, test_i.delta_T, 'b-', label='Data')

    ax.set(xlabel='|w|->', ylabel='deltaT(|w|) (s)->',
            title="Nº "+str(i)+", Time cost, "+str(test_i.qtd_s.iloc[0])+" states")
    ax.grid()
    ax.legend()
    plt.show()

The relative error is as follow: 

$ RE = \dfrac{|y-y_{pred}|}{y} $

In [None]:
#lets calculate the error
tests['y_exp'] = func(tests.len_w, *popt)
tests['error'] = np.abs(test_i.delta_T - tests['y_exp'])/test_i.delta_T
tests['mse'] = mean_squared_error(test_i.delta_T, tests['y_exp'] )

### Adjustment with linear

In [None]:
tests['logT'] = np.log(tests.delta_T)

def linear(x, a, b):
    return a*x+b

popt, pcov = curve_fit(func, test_i.len_w, test_i.delta_T)

In [None]:
x = np.arange(0, 63, 1)

fig, ax = plt.subplots(figsize=(15, 9))
    
ax.axvline(x=0, color='red')
ax.axhline(y=0, color='yellow')

ax.plot(x, np.log(func(x, *popt)), 'r-', label='fit: a = %f, b = %f, c = %f'%tuple(popt))
ax.plot(test_i.len_w, np.log(test_i.delta_T), 'b-', label='Data')

ax.set(xlabel='|w|->', ylabel='deltaT(|w|) ->',
        title="Time cost, "+str(test_i.qtd_s.iloc[0])+" states")
ax.grid()
ax.legend()
plt.show()