<a href="https://colab.research.google.com/github/msh2481/CodeStyler/blob/main/Feedforward.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [67]:
# WARNING: wipes every non-hidden entry in the current working directory.
# Presumably meant for a throwaway Colab runtime (see the badge above) — do not run locally.
!rm -rf ./*
# Clone the repository, move its contents into the working directory, and drop the empty clone folder.
!git clone https://github.com/msh2481/CodeStyler.git && mv CodeStyler/* . && rm -rf CodeStyler
# List the working directory as a sanity check that the data files arrived.
!ls

Cloning into 'CodeStyler'...
remote: Enumerating objects: 7927, done.[K
remote: Counting objects: 100% (7927/7927), done.[K
remote: Compressing objects: 100% (6527/6527), done.[K
remote: Total 7927 (delta 1401), reused 7917 (delta 1399), pack-reused 0[K
Receiving objects: 100% (7927/7927), 9.10 MiB | 15.38 MiB/s, done.
Resolving deltas: 100% (1401/1401), done.
Baseline.ipynb	filenames.txt  files  README.md


In [193]:
from random import shuffle, choices, choice
from collections import deque, defaultdict, Counter
import numpy as np
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm

In [203]:
# Number of preceding characters used as context; each sample is SUFFIX_SIZE + 1 characters long
# (the context followed by the character to predict).
SUFFIX_SIZE = 10
# Number of shuffled samples taken for the training DataLoader.
TRAIN_SIZE = 100000
# Number of samples (taken right after the training slice) used for the test DataLoader.
TEST_SIZE = 10000
# Characters seen fewer than this many times are pooled into the '█' placeholder.
MIN_OCCURENCES = 10
# Batch size shared by the training and test DataLoaders.
BATCH_SIZE = 512

In [204]:
def fmt(number):
    """Render ``number`` as fixed-point text with five digits after the decimal point."""
    return format(number, '.5f')

In [205]:
# Build the sample corpus and the character alphabet.
#
# rawTexts      -- every (SUFFIX_SIZE + 1)-character window of the accepted files
# alphabetCount -- Counter of characters; rare ones are pooled under '█'
# alphabet      -- list of characters, '█' first, then first-seen order
# ALPHABET_SIZE -- len(alphabet)
rawTexts = []
charCounts = Counter()
with open('filenames.txt') as fileList:
    for filename in fileList:
        # The limit is only checked between files, so the corpus may overshoot it
        # by up to one file's worth of windows.
        if len(rawTexts) > TRAIN_SIZE + TEST_SIZE:
            break
        # Context manager so the handle is closed promptly instead of leaking.
        with open(filename.strip()) as sourceFile:
            text = sourceFile.read()
        # Skip files that mention debugging in either casing.
        if 'debug' in text or 'DEBUG' in text:
            continue
        charCounts.update(text)
        # A text no longer than SUFFIX_SIZE yields an empty range and adds nothing.
        rawTexts.extend(
            text[pos : pos + SUFFIX_SIZE + 1]
            for pos in range(len(text) - SUFFIX_SIZE)
        )

# '█' is inserted first so it always occupies index 0 of the alphabet.
alphabetCount = Counter()
alphabetCount['█'] = 0
for char, count in charCounts.items():
    if count >= MIN_OCCURENCES:
        alphabetCount[char] += count
    else:
        alphabetCount['█'] += count
alphabet = list(alphabetCount)
ALPHABET_SIZE = len(alphabet)
print(f'alphabet of length {len(alphabet)}: {alphabetCount}')

shuffle(rawTexts)
print(f'{len(rawTexts)} texts in total')
print(rawTexts[:3])

alphabet of length 92: Counter({' ': 24491, 'e': 6100, 't': 5087, 'a': 4894, 'n': 4065, 'r': 3685, 'i': 3666, '\n': 3554, 'l': 3458, 's': 3267, 'o': 3203, '0': 2347, 'u': 2076, 'p': 1667, 'c': 1609, 'E': 1535, '_': 1501, '.': 1454, 'S': 1390, '(': 1209, ')': 1209, 'd': 1204, 'T': 1196, ':': 1178, 'f': 1140, 'v': 1127, 'N': 1118, 'I': 1111, 'A': 1098, 'm': 1058, '1': 1050, 'g': 1040, 'C': 1013, 'b': 927, 'y': 860, '>': 840, 'R': 770, 'h': 753, '-': 748, '"': 711, '/': 688, '=': 674, '{': 661, '}': 661, '2': 622, 'O': 620, 'L': 618, 'U': 554, 'B': 549, '!': 548, ',': 545, 'P': 497, 'k': 487, 'D': 468, '<': 461, '3': 451, 'M': 449, '*': 399, 'x': 346, 'F': 341, 'w': 269, 'K': 235, 'V': 226, 'j': 215, '?': 205, '+': 204, '9': 201, 'W': 191, '4': 174, 'G': 169, '5': 155, 'H': 150, '6': 145, 'z': 114, 'q': 110, '8': 106, '7': 98, 'Q': 95, 'Y': 93, 'X': 90, 'J': 69, '`': 68, ';': 59, '@': 45, '&': 44, '$': 31, '|': 28, "'": 21, '[': 16, ']': 16, '█': 15, 'Z': 10})
114355 texts in total
['   v

In [206]:
# Placeholder under which rare characters are pooled when the alphabet is built.
UNKNOWN_CHAR = '█'
charToIndexMap = {c: i for i, c in enumerate(alphabet)}

def charToIndex(c):
    """Return the alphabet index of character ``c`` as a 0-dim ``torch.long`` tensor.

    Characters missing from the alphabet map to the index of the '█' placeholder.
    The previous fallback, ``ALPHABET_SIZE - 1``, pointed at whichever ordinary
    character happened to be last in the alphabet, because the placeholder is
    inserted first and therefore is not the last entry.
    """
    return torch.as_tensor(charToIndexMap.get(c, charToIndexMap[UNKNOWN_CHAR]), dtype=torch.long)

def stringToTensor(cur):
    """One-hot encode the first SUFFIX_SIZE characters of ``cur`` into a flat float tensor.

    The result has ``ALPHABET_SIZE * SUFFIX_SIZE`` entries; position ``j`` of the
    string owns the slice ``[j * ALPHABET_SIZE, (j + 1) * ALPHABET_SIZE)``, in which
    exactly one entry is set to 1. Raises IndexError if ``cur`` is shorter than
    SUFFIX_SIZE.
    """
    flat = torch.zeros(ALPHABET_SIZE * SUFFIX_SIZE)
    # Row/column view over the same storage: writes through `grid` land in `flat`.
    grid = flat.view(SUFFIX_SIZE, ALPHABET_SIZE)
    for position in range(SUFFIX_SIZE):
        grid[position, charToIndex(cur[position])] = 1
    return flat

class StringDataset(Dataset):
    """Map-style dataset over strings whose last character is the prediction target."""

    def __init__(self, strings):
        super().__init__()
        self.strings = strings

    def __len__(self):
        return len(self.strings)

    def __getitem__(self, i):
        """Return (one-hot encoding of all but the last character, index of the last character)."""
        sample = self.strings[i]
        context, target = sample[:-1], sample[-1]
        return stringToTensor(context), charToIndex(target)

# rawTexts is already shuffled, so consecutive slices give disjoint random splits.
trainStrings = rawTexts[:TRAIN_SIZE]
testStrings = rawTexts[TRAIN_SIZE : TRAIN_SIZE + TEST_SIZE]

# Despite the names, trainSet / testSet are DataLoaders that yield batches.
trainSet = DataLoader(
    StringDataset(trainStrings),
    batch_size=BATCH_SIZE,
    shuffle=True,
)
testSet = DataLoader(
    StringDataset(testStrings),
    batch_size=BATCH_SIZE,
    shuffle=False,
)

In [207]:
# Sanity check: number of batches per loader, then one raw training batch.
batchCounts = (len(trainSet), len(testSet))
print(*batchCounts)
separator = '---'
print(separator)
firstBatch = next(iter(trainSet))
print(firstBatch)
print(separator)

196 20
---
[tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 1.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]]), tensor([ 2, 34, 71,  2, 58, 57, 37, 36, 76, 44, 24, 66, 66, 64, 44,  2,  2, 36,
        14, 29, 35,  2, 49, 43, 30, 61,  2,  2,  2, 58, 24, 55,  2,  2,  2, 28,
        56,  2, 51, 44,  3, 35, 28, 51,  2, 51, 44, 45, 48, 46, 30, 43, 36,  2,
        57,  2, 24, 62, 46, 57, 34,  2, 50, 26, 41, 66, 28, 55, 24, 36,  2, 84,
        12, 57, 22, 26, 66, 28,  8,  2, 43, 10,  1, 59, 24, 57,  2,  2,  2, 43,
        37, 26, 35, 30,  2, 60, 57,  2, 35,  2, 44, 53,  2,  2, 54, 75, 24, 58,
        51,  2,  2, 60,  4,  2, 30, 16,  1, 44, 13, 13, 29, 23, 28, 28, 11,  2,
        37, 66, 26, 55, 24,  8, 61,  2,  2,  4, 21, 39, 55, 56, 35, 26,  2, 26,
        16, 59, 37,  3, 44, 49,  2, 53,  2, 65, 26, 50, 28, 26, 64, 26, 51,  2,
        30,  

In [208]:
class Predictor(nn.Module):
    """Feed-forward next-character model over one-hot encoded suffixes.

    Input:  float tensor of shape (batch, SUFFIX_SIZE * ALPHABET_SIZE).
    Output: log-probabilities of shape (batch, ALPHABET_SIZE).
    """

    def __init__(self):
        super().__init__()
        sizes = [SUFFIX_SIZE * ALPHABET_SIZE, 64, 32, 64, ALPHABET_SIZE]
        lastLayer = len(sizes) - 2
        layerBuffer = []
        for i in range(len(sizes) - 1):
            # No bias on the linear layer: the BatchNorm1d right after it has its own shift.
            layerBuffer.append(nn.Linear(sizes[i], sizes[i + 1], bias=False))
            layerBuffer.append(nn.BatchNorm1d(sizes[i + 1]))
            if i < lastLayer:
                layerBuffer.append(nn.LeakyReLU())
            else:
                # Final layer emits log-probabilities, which is what NLLLoss expects.
                layerBuffer.append(nn.LogSoftmax(dim=-1))
            # Open question from the original author: add last layer as relu + tanh?
        self.layers = nn.Sequential(*layerBuffer)

    def forward(self, inputTensor):
        """Return log-probabilities over the alphabet for every row of ``inputTensor``."""
        expectedWidth = SUFFIX_SIZE * ALPHABET_SIZE
        # Report the offending shape in the assertion instead of printing the whole tensor.
        assert inputTensor.shape[1:] == (expectedWidth, ), \
            f'expected input of shape (batch, {expectedWidth}), got {tuple(inputTensor.shape)}'
        outputTensor = self.layers(inputTensor)
        assert outputTensor.shape[1:] == (ALPHABET_SIZE, )
        return outputTensor

    def logProbabilityOfNext(self, inputTensor, number):
        """Return the log-probability of character ``number`` for each row, shape (batch,).

        The class axis is the last one; indexing the first axis (as before) picked a
        batch row instead of a character.
        """
        prediction = self(inputTensor)
        return prediction[:, charToIndex(number)]

    def guessNext(self, inputTensor):
        """Return the most likely next character for a single-row ``inputTensor``.

        Argmax is taken over the class axis; ``.item()`` raises for batches with more
        than one row rather than returning a character derived from a flattened index.
        """
        idx = self(inputTensor).argmax(dim=-1).item()
        return alphabet[idx]

In [209]:
# Negative log-likelihood over log-probabilities (the model ends in LogSoftmax).
lossFunction = nn.NLLLoss()

def evaluateOnBatch(predictor, batchInputs, batchAnswers):
    """Run ``predictor`` on one batch and return ``(accuracy, loss)`` as 0-dim tensors.

    ``accuracy`` is the fraction of rows whose argmax equals the answer; ``loss`` is
    ``lossFunction`` applied to the model output and the answers.
    """
    logProbs = predictor(batchInputs)
    assert logProbs.shape[1:] == (ALPHABET_SIZE, )
    batchLoss = lossFunction(logProbs, batchAnswers)
    predicted = logProbs.argmax(dim=-1)
    hitRate = predicted.eq(batchAnswers).float().mean()
    return hitRate, batchLoss
        
def train(predictor, epochs, startEpoch, optimizer=None):
    """Train ``predictor`` on ``trainSet`` and report metrics on ``testSet`` after every epoch.

    Args:
        predictor: the model to optimise (updated in place).
        epochs: number of passes over ``trainSet`` to run.
        startEpoch: offset added to the epoch number in the printed report.
        optimizer: optional optimizer to reuse. When omitted a fresh Adam is created,
            which discards any moment estimates from earlier calls; pass one in to
            keep optimizer state across repeated ``train`` calls.

    Prints one line per epoch: ``#epoch trainAccuracy trainLoss testAccuracy testLoss``.
    All four metrics are per-sample averages (batches are weighted by their size, so a
    smaller final batch does not skew the result). The model is left in eval mode.
    """
    if optimizer is None:
        optimizer = torch.optim.Adam(predictor.parameters())
    for epoch in range(epochs):
        predictor.train()
        trainAccuracy = 0.0
        trainLogLoss = 0.0
        trainSize = 0
        for batchInputs, batchAnswers in tqdm(trainSet):
            optimizer.zero_grad()
            accuracy, loss = evaluateOnBatch(predictor, batchInputs, batchAnswers)
            loss.backward()
            optimizer.step()
            batchSize = len(batchAnswers)
            # .item() keeps the running sums as plain floats instead of tensors.
            trainAccuracy += accuracy.item() * batchSize
            trainLogLoss += loss.item() * batchSize
            trainSize += batchSize
        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        trainAccuracy /= max(trainSize, 1)
        trainLogLoss /= max(trainSize, 1)

        with torch.no_grad():
            predictor.eval()
            testAccuracy = 0.0
            testLogLoss = 0.0
            testSize = 0
            for batchInputs, batchAnswers in tqdm(testSet):
                accuracy, logLoss = evaluateOnBatch(predictor, batchInputs, batchAnswers)
                batchSize = len(batchAnswers)
                testAccuracy += accuracy.item() * batchSize
                # Accumulate this batch's own loss; the earlier code added the stale
                # `loss` left over from the last training batch.
                testLogLoss += logLoss.item() * batchSize
                testSize += batchSize
            testAccuracy /= max(testSize, 1)
            testLogLoss /= max(testSize, 1)
            print(f'#{startEpoch + epoch}: {fmt(trainAccuracy)} {fmt(trainLogLoss)} {fmt(testAccuracy)} {fmt(testLogLoss)}')
        

In [210]:
def samplePrediction(predictor, length):
    """Extend a random corpus sample by ``length`` characters drawn from the model.

    Starts from a random entry of ``rawTexts`` and repeatedly feeds the last
    SUFFIX_SIZE characters to ``predictor``, sampling the next character from the
    predicted distribution. Returns the seed string plus the generated characters.

    The model is switched to eval mode for the duration of the call (BatchNorm1d
    cannot normalise a single-row batch in training mode) and its previous mode is
    restored afterwards. Gradients are not tracked.
    """
    wasTraining = predictor.training
    predictor.eval()
    try:
        s = choice(rawTexts)
        with torch.no_grad():
            for _ in range(length):
                suffix = s[-SUFFIX_SIZE:]
                assert len(suffix) == SUFFIX_SIZE
                logProbs = predictor(stringToTensor(suffix).view(1, -1))
                # Plain Python floats are what random.choices expects as weights.
                weights = logProbs.exp().squeeze(0).tolist()
                s += choices(alphabet, weights)[0]
    finally:
        predictor.train(wasTraining)
    return s

In [212]:
predictor = Predictor()

# One epoch per call so a generated sample can be printed after every epoch.
for epochIndex in range(300):
    train(predictor, 1, epochIndex)
    generated = samplePrediction(predictor, 300)
    print(epochIndex, generated)

  0%|          | 0/196 [00:00<?, ?it/s]

KeyboardInterrupt: ignored