<a href="https://colab.research.google.com/github/limpa105/RegexPlus/blob/neural_stuff/neural/neural_speaker.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install hypothesis

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting hypothesis
  Downloading hypothesis-6.50.1-py3-none-any.whl (388 kB)
[K     |████████████████████████████████| 388 kB 6.0 MB/s 
[?25hCollecting exceptiongroup>=1.0.0rc8
  Downloading exceptiongroup-1.0.0rc8-py3-none-any.whl (11 kB)
Installing collected packages: exceptiongroup, hypothesis
Successfully installed exceptiongroup-1.0.0rc8 hypothesis-6.50.1


In [2]:
import string 
import random 
from numpy.random import choice
from scipy.stats import skewnorm

from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random
import hypothesis

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

# Target device for the models and tensors created below: CUDA when it is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
def generate_training_data(num_examples: int, max_size:int, regex_things:list) -> list:
  """Build random regexes, each represented as a list of tokens.

  Args:
    num_examples: how many token lists to produce.
    max_size: inclusive upper bound on the number of tokens per list;
      every list holds at least one token.
    regex_things: pool of regex tokens, sampled uniformly with replacement.

  Returns:
    A list of `num_examples` lists whose items come from `regex_things`.
  """
  dataset = []
  for _ in range(num_examples):
    # The length is drawn first, then one token per position.
    token_count = random.randint(1, max_size)
    tokens = []
    for _ in range(token_count):
      pick = random.randint(0, len(regex_things) - 1)
      tokens.append(regex_things[pick])
    dataset.append(tokens)
  return dataset
  
def generate_training_data_pairs(training_data: list) -> list:
  """Pair every regex token list with one string that matches it.

  The tokens are concatenated and anchored with '^' and '$', hypothesis
  draws a single example for that pattern, and leading/trailing whitespace
  is stripped from the drawn string.

  Args:
    training_data: list of regex token lists.

  Returns:
    A list of (token_list, example_string) tuples, in input order.
  """
  paired = []
  for tokens in training_data:
    pattern = '^' + ''.join(tokens) + '$'
    sample = hypothesis.strategies.from_regex(pattern).example()
    paired.append((tokens, sample.strip()))
  return paired

In [5]:
# Define the regex "language" used for training.
# First iteration: a very restricted language -- character classes only,
# no optionals and no literal constants.
regex_things : list = ['[0-9]','[a-z]','[A-Z]','[a-zA-Z]', '[a-zA-Z0-9]']
# Output alphabet: the first 95 entries of string.printable.
ascii_char : list = list(string.printable)[:95]
# Regex metacharacters; currently only referenced by the commented-out lines below.
special_char = ['.', '\\', '+', '*', ')', '(', '?' ]
# Disabled extension: add literal characters and escaped metacharacters as tokens.
#regex_things = regex_things + [ i for i in ascii_char if i not in special_char]
#regex_things = regex_things + [ '\\' + i for i in special_char]
# Generating the examples takes some time to run.
# TODO: Write the generated data to a file and read it back from that file.
# 1000 regexes of 1 to 10 tokens each, every one paired with a matching string.
training_data = generate_training_data(1000, 10, regex_things)
pairs = generate_training_data_pairs(training_data)

In [6]:
# Reserved vocabulary indices: decoding starts from SOS and every encoded sequence ends with EOS.
SOS_token = 0
EOS_token = 1

# Constructing the language (vocabulary) class following
# https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html

class Lang:
    """Vocabulary that maps words to integer indices and back.

    Indices 0 and 1 are reserved for the "SOS" and "EOS" markers, so the
    first word that is added receives index 2.
    """

    def __init__(self, name):
        """Create an empty vocabulary labelled `name`."""
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS"}
        self.n_words = 2  # SOS and EOS are already counted

    def addList(self, list_words):
        """Register every word of `list_words`, in order."""
        for entry in list_words:
            self.addWord(entry)

    def addWord(self, word):
        """Register `word`.

        A word seen for the first time gets the next free index and a count
        of 1; a known word only has its count incremented.
        """
        if word in self.word2index:
            self.word2count[word] += 1
            return
        new_index = self.n_words
        self.word2index[word] = new_index
        self.word2count[word] = 1
        self.index2word[new_index] = word
        self.n_words = new_index + 1

In [7]:
# Default `max_length` of the attention decoder, train() and evaluate(): the number of
# encoder positions the attention can address. The generated regexes have at most
# 10 tokens plus the appended EOS, which fits.
MAX_LENGTH = 12
def readLangs(lang1: list, lang2:list, reverse=False):
  """Build the input ('regex') and output ('english') vocabularies.

  Args:
    lang1: words registered in the input vocabulary.
    lang2: words registered in the output vocabulary.
    reverse: accepted but not used by this function.

  Returns:
    (input_lang, output_lang) as a tuple of Lang objects. The name and
    size of each vocabulary are printed as a side effect.
  """
  built = []
  for label, words in (('regex', lang1), ('english', lang2)):
    vocab = Lang(label)
    vocab.addList(words)
    built.append(vocab)
  for vocab in built:
    print(vocab.name, vocab.n_words)
  input_lang, output_lang = built
  return input_lang, output_lang


In [8]:
# Build both vocabularies: regex tokens on the input side, printable characters on the output side.
input_lang, output_lang = readLangs(regex_things, ascii_char)

regex 7
english 97


In [9]:
# model code also taken from 
# https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html
class EncoderRNN(nn.Module):
    """GRU encoder that consumes one token index per forward call."""

    def __init__(self, input_size, hidden_size):
        """
        Args:
            input_size: number of rows in the embedding table.
            hidden_size: width of both the embeddings and the GRU state.
        """
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        """Embed `input`, reshape it to (1, 1, -1) and run it through the GRU.

        Returns:
            (output, hidden) exactly as produced by the GRU.
        """
        step_input = self.embedding(input).view(1, 1, -1)
        gru_output, next_hidden = self.gru(step_input, hidden)
        return gru_output, next_hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape (1, 1, hidden_size) on `device`."""
        return torch.zeros(1, 1, self.hidden_size, device=device)


class AttnDecoderRNN(nn.Module):
    """GRU decoder that attends over a fixed number of encoder positions."""

    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        """
        Args:
            hidden_size: width of the embeddings and of the GRU state.
            output_size: number of output classes (rows of the embedding
                table and width of the final projection).
            dropout_p: dropout probability applied to the embedded input.
            max_length: number of attention weights produced per step.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(output_size, hidden_size)
        # Scores one weight per encoder position from [embedded ; hidden].
        self.attn = nn.Linear(hidden_size * 2, max_length)
        # Projects [embedded ; attended context] back down to hidden_size.
        self.attn_combine = nn.Linear(hidden_size * 2, hidden_size)
        self.dropout = nn.Dropout(dropout_p)
        self.gru = nn.GRU(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden, encoder_outputs):
        """Run one decoding step.

        Args:
            input: index of the previous output token.
            hidden: current GRU hidden state.
            encoder_outputs: encoder states to attend over; train() and
                evaluate() pass a (max_length, hidden_size) tensor.

        Returns:
            (log_probs, hidden, attn_weights): log-softmax scores over the
            output classes, the updated hidden state, and the attention
            weights used for this step.
        """
        token_vec = self.dropout(self.embedding(input).view(1, 1, -1))

        attn_scores = self.attn(torch.cat((token_vec[0], hidden[0]), 1))
        attn_weights = F.softmax(attn_scores, dim=1)
        context = torch.bmm(attn_weights.unsqueeze(0),
                            encoder_outputs.unsqueeze(0))

        merged = torch.cat((token_vec[0], context[0]), 1)
        gru_input = F.relu(self.attn_combine(merged).unsqueeze(0))
        gru_output, hidden = self.gru(gru_input, hidden)

        log_probs = F.log_softmax(self.out(gru_output[0]), dim=1)
        return log_probs, hidden, attn_weights

    def initHidden(self):
        """Return an all-zero hidden state of shape (1, 1, hidden_size) on `device`."""
        return torch.zeros(1, 1, self.hidden_size, device=device)

In [10]:
def indexesFromList(lang, char_list):
    """Translate each word of `char_list` into its index in `lang.word2index`.

    Raises:
        KeyError: if a word is missing from the vocabulary.
    """
    indexes = []
    for word in char_list:
        indexes.append(lang.word2index[word])
    return indexes


def tensorFromList(lang, char_list):
    """Encode `char_list` as a column tensor of vocabulary indices ending in EOS.

    Returns:
        A long tensor of shape (len(char_list) + 1, 1) on `device`.
    """
    token_ids = indexesFromList(lang, char_list) + [EOS_token]
    flat = torch.tensor(token_ids, dtype=torch.long, device=device)
    return flat.view(-1, 1)


def tensorsFromPair(pair):
    """Convert a (regex_tokens, example_string) pair into (input, target) tensors.

    The first item is encoded with `input_lang`; the second is split into
    single characters and encoded with `output_lang`.
    """
    regex_tokens = pair[0]
    example_chars = list(pair[1])
    return (tensorFromList(input_lang, regex_tokens),
            tensorFromList(output_lang, example_chars))


In [11]:
# Probability that a training step feeds the ground-truth token back into the decoder
# (teacher forcing) instead of the decoder's own prediction.
teacher_forcing_ratio = 0.5

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    """Run one optimisation step on a single (input, target) pair.

    The input is encoded token by token, then the decoder is unrolled over
    the target. With probability `teacher_forcing_ratio` the ground-truth
    token is fed back at every step; otherwise the decoder's own top
    prediction is fed back and decoding stops early once it predicts EOS.

    Args:
        input_tensor: (input_length, 1) tensor of input token indices.
        target_tensor: (target_length, 1) tensor of target token indices.
        encoder, decoder: the two networks being trained.
        encoder_optimizer, decoder_optimizer: their optimizers.
        criterion: loss applied to each decoder output and target token.
        max_length: number of encoder positions kept for the attention.

    Returns:
        The summed loss divided by the full target length, as a float.
    """
    encoder_hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    # Positions beyond the input length stay zero.
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(
            input_tensor[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output[0, 0]

    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden

    use_teacher_forcing = random.random() < teacher_forcing_ratio

    loss = 0
    for di in range(target_length):
        decoder_output, decoder_hidden, _attention = decoder(
            decoder_input, decoder_hidden, encoder_outputs)
        loss += criterion(decoder_output, target_tensor[di])

        if use_teacher_forcing:
            # Teacher forcing: the target token becomes the next input.
            decoder_input = target_tensor[di]
            continue

        # Free running: feed back the prediction, detached from the graph.
        _top_value, top_index = decoder_output.topk(1)
        decoder_input = top_index.squeeze().detach()
        if decoder_input.item() == EOS_token:
            break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

In [12]:
import time
import math


def asMinutes(s):
    """Format a duration in seconds as '<minutes>m <seconds>s' (both shown as integers)."""
    whole_minutes = math.floor(s / 60)
    leftover_seconds = s - whole_minutes * 60
    return '%dm %ds' % (whole_minutes, leftover_seconds)


def timeSince(since, percent):
    """Report elapsed time and a linear estimate of the time remaining.

    Args:
        since: start timestamp as returned by time.time().
        percent: fraction of the work completed so far (must be non-zero).

    Returns:
        A string of the form '<elapsed> (- <remaining>)', with both parts
        formatted by asMinutes().
    """
    elapsed = time.time() - since
    estimated_total = elapsed / (percent)
    remaining = estimated_total - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

In [13]:
def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.01):
    """Train the encoder/decoder for `n_iters` single-pair SGD steps.

    Training pairs are sampled with replacement from the global `pairs`.
    The average loss is printed every `print_every` steps and recorded every
    `plot_every` steps; the recorded averages are passed to showPlot() at
    the end.

    Args:
        encoder, decoder: the networks to train.
        n_iters: number of training steps.
        print_every: interval, in steps, between progress printouts.
        plot_every: interval, in steps, between recorded plot points.
        learning_rate: learning rate of both SGD optimizers.
    """
    start = time.time()

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(random.choice(pairs))
                      for _ in range(n_iters)]
    criterion = nn.NLLLoss()

    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    for step, training_pair in enumerate(training_pairs, start=1):
        loss = train(training_pair[0], training_pair[1], encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if step % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            progress = step / n_iters
            print('%s (%d %d%%) %.4f' % (timeSince(start, progress),
                                         step, progress * 100, print_loss_avg))

        if step % plot_every == 0:
            plot_losses.append(plot_loss_total / plot_every)
            plot_loss_total = 0

    showPlot(plot_losses)

In [14]:
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np


def showPlot(points):
    """Plot a sequence of values (e.g. averaged losses) as a line chart.

    Exactly one figure is created per call. Major ticks on the y-axis are
    placed at multiples of 0.2.

    Args:
        points: sequence of y-values, plotted against their index.
    """
    # plt.subplots() already creates the figure; a separate plt.figure() call
    # would only leave an extra, empty figure open on every invocation.
    _fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    ax.plot(points)

In [19]:
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    """Greedily decode an output sequence for a list of regex tokens.

    Both modules are switched to eval mode for the duration of the call so
    that dropout is disabled and decoding is deterministic; the train/eval
    mode they had before the call is restored afterwards.

    Args:
        encoder: encoder module (provides initHidden() and hidden_size).
        decoder: attention decoder module.
        sentence: list of regex tokens known to `input_lang`.
        max_length: maximum number of decoding steps, and the number of
            encoder positions made available to the attention.

    Returns:
        (decoded_words, attentions): the emitted output words, ending with
        '<EOS>' if the decoder predicted EOS within `max_length` steps, and
        the attention weights of the steps that ran, one row per step.
    """
    encoder_was_training = encoder.training
    decoder_was_training = decoder.training
    encoder.eval()
    decoder.eval()
    try:
        with torch.no_grad():
            input_tensor = tensorFromList(input_lang, sentence)
            input_length = input_tensor.size()[0]
            encoder_hidden = encoder.initHidden()

            # Positions beyond the input length stay zero.
            encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

            for ei in range(input_length):
                encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                         encoder_hidden)
                encoder_outputs[ei] += encoder_output[0, 0]

            decoder_input = torch.tensor([[SOS_token]], device=device)  # SOS

            decoder_hidden = encoder_hidden

            decoded_words = []
            decoder_attentions = torch.zeros(max_length, max_length)

            for di in range(max_length):
                decoder_output, decoder_hidden, decoder_attention = decoder(
                    decoder_input, decoder_hidden, encoder_outputs)
                decoder_attentions[di] = decoder_attention
                topv, topi = decoder_output.topk(1)
                if topi.item() == EOS_token:
                    decoded_words.append('<EOS>')
                    break
                else:
                    decoded_words.append(output_lang.index2word[topi.item()])

                # Greedy decoding: the top prediction is the next input.
                decoder_input = topi.squeeze().detach()

            return decoded_words, decoder_attentions[:di + 1]
    finally:
        # Hand the modules back in the mode the caller left them in.
        encoder.train(encoder_was_training)
        decoder.train(decoder_was_training)

def evaluateRandomly(encoder, decoder, n=10):
    """Decode `n` randomly chosen entries of `pairs` and print the results.

    For each entry the input tokens ('>'), the reference string ('=') and
    the space-joined decoder output ('<') are printed, followed by a blank
    line.
    """
    for _ in range(n):
        sample = random.choice(pairs)
        regex_tokens = sample[0]
        reference = sample[1]
        print('>', regex_tokens)
        print('=', reference)
        predicted_words, _attentions = evaluate(encoder, decoder, regex_tokens)
        print('<', ' '.join(predicted_words))
        print('')

In [16]:
# Width shared by the embeddings and the GRU states of both networks.
hidden_size = 256
encoder1 = EncoderRNN(input_lang.n_words, hidden_size).to(device)
attn_decoder1 = AttnDecoderRNN(hidden_size, output_lang.n_words, dropout_p=0.1).to(device)

# Train for 30000 single-pair steps, printing the running average loss every 1000 steps.
trainIters(encoder1, attn_decoder1, 30000, print_every=1000)

0m 13s (- 6m 26s) (1000 3%) 2.9813
0m 25s (- 5m 52s) (2000 6%) 3.2510
0m 37s (- 5m 40s) (3000 10%) 3.1910
0m 49s (- 5m 23s) (4000 13%) 3.1228
1m 1s (- 5m 9s) (5000 16%) 3.0718
1m 13s (- 4m 54s) (6000 20%) 3.0227
1m 25s (- 4m 41s) (7000 23%) 2.9434
1m 37s (- 4m 28s) (8000 26%) 2.9148
1m 49s (- 4m 16s) (9000 30%) 2.8696
2m 2s (- 4m 4s) (10000 33%) 2.7986
2m 14s (- 3m 52s) (11000 36%) 2.6889
2m 27s (- 3m 40s) (12000 40%) 2.6434
2m 39s (- 3m 28s) (13000 43%) 2.5758
2m 51s (- 3m 16s) (14000 46%) 2.4474
3m 4s (- 3m 4s) (15000 50%) 2.3789
3m 20s (- 2m 55s) (16000 53%) 2.2746
3m 34s (- 2m 44s) (17000 56%) 2.2192
3m 47s (- 2m 31s) (18000 60%) 2.1034
4m 0s (- 2m 19s) (19000 63%) 1.9971
4m 12s (- 2m 6s) (20000 66%) 1.8478
4m 25s (- 1m 53s) (21000 70%) 1.7082
4m 38s (- 1m 41s) (22000 73%) 1.6896
4m 50s (- 1m 28s) (23000 76%) 1.5565
5m 3s (- 1m 15s) (24000 80%) 1.4591
5m 15s (- 1m 3s) (25000 83%) 1.3903
5m 28s (- 0m 50s) (26000 86%) 1.2677
5m 40s (- 0m 37s) (27000 90%) 1.2353
5m 53s (- 0m 25s) (280

In [24]:
# Spot check: decode one example for a three-token regex; the result is the emitted words
# and the attention weights of each decoding step.
evaluate(encoder1, attn_decoder1, ['[0-9]', '[a-z]', '[A-Z]'])

(['1', 'b', 'B', '<EOS>'],
 tensor([[8.8615e-12, 3.1665e-11, 1.0000e+00, 1.2885e-09, 3.0413e-06, 9.5006e-16,
          2.7599e-15, 1.5137e-17, 3.5965e-11, 1.5679e-12, 8.3069e-12, 1.1674e-11],
         [2.5416e-16, 8.2426e-16, 2.0843e-14, 3.4032e-15, 1.0000e+00, 2.1677e-13,
          1.5051e-21, 9.7665e-27, 7.8146e-14, 5.2968e-17, 5.1381e-16, 1.1139e-15],
         [3.8029e-11, 4.4879e-11, 1.0111e-09, 3.7400e-10, 1.0000e+00, 2.8441e-10,
          5.2547e-13, 9.6653e-17, 6.0517e-10, 1.9397e-11, 3.5769e-11, 5.9174e-11],
         [2.6312e-06, 1.6109e-06, 3.1796e-07, 4.7060e-06, 3.6137e-04, 9.9952e-01,
          3.5089e-09, 3.3599e-08, 6.1721e-07, 1.0055e-04, 2.2303e-06, 3.0490e-06]]))