In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the /kaggle/input directory tree and print the full path of every file in it.
# os.walk yields (directory, sub-directories, files); the sub-directory list is unused.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
from math import *
from IPython.display import clear_output
import random

In [4]:
try :
  with open('/kaggle/working/input.txt', 'r', encoding='utf-8') as f:
      text = f.read()
except:
  !wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
  with open('input.txt', 'r', encoding='utf-8') as f:
      text = f.read()
print(len(text))

# Character-level vocabulary: every distinct character of the corpus, in sorted
# order, is assigned an integer id equal to its position in `chars`.
chars = sorted(set(text))
vocab_size = len(chars)

# Lookup tables in both directions: character -> id and id -> character.
stoi = {ch: i for i, ch in enumerate(chars)}
itos = dict(enumerate(chars))


def encode(s):
    """Encoder: take a string, return the list of integer ids of its characters."""
    return [stoi[c] for c in s]


def decode(l):
    """Decoder: take a list of integer ids, return the string they spell."""
    return ''.join(itos[i] for i in l)

# Train and validation splits (the validation slice comes FIRST in the corpus)
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.1*len(data)) # split point: the first 10% is val, the remaining 90% is train
train_data = data[n:]
val_data = data[:n]

# The block below is a bare string literal, so the code inside it does not run.
# It holds an alternative split: first 90% train, last 10% val.
"""
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]
"""

# data loading
def get_batch(split, block_size, batch_size):
    """Sample a random batch of input/target windows from one data split.

    Args:
        split: 'train' selects `train_data`; any other value selects `val_data`.
        block_size: length of each sampled window.
        batch_size: number of windows to sample.

    Returns:
        (x, y): two stacked tensors with one row per sampled window, where
        each row of `y` is the corresponding row of `x` shifted one position
        to the right in the source data. Both are moved to `device`.

    Note:
        `train_data`, `val_data` and `device` are read from module scope;
        `device` must be defined before this function is called.
    """
    source = train_data if split == 'train' else val_data
    # Random start offsets, chosen so that the shifted target window still fits.
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs, targets = [], []
    for start in starts:
        inputs.append(source[start:start + block_size])
        targets.append(source[start + 1:start + block_size + 1])
    return torch.stack(inputs).to(device), torch.stack(targets).to(device)
     

--2024-06-16 16:39:14--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: 'input.txt'


2024-06-16 16:39:14 (17.9 MB/s) - 'input.txt' saved [1115394/1115394]

1115394


In [5]:
class CausalMemory(nn.Module):
    """Layer whose weights are adapted by a local averaging rule, without autograd.

    Every call to ``forward`` processes one time step:

    * the input is mapped through ``w`` and ``erf`` to produce the output;
    * when learning, ``w`` is moved toward the running average of the
      input/output outer products;
    * the last ``time_pred`` outputs are combined with ``wp`` into ``pred``;
    * when learning, ``wp`` is moved toward the running average of
      ``current output * previous window``.

    Only running sums, their sample counts and the last ``time_pred`` outputs
    are stored, so memory use does not grow with the number of calls.
    """

    def __init__(self, inp_size, out_size, time_pred=3):
        """
        Args:
            inp_size: width of the input row vector.
            out_size: width of the output row vector.
            time_pred: number of past outputs kept for the prediction step.
        """
        super().__init__()
        self.time_pred = time_pred
        self.out_size = out_size

        # Plain tensors (not nn.Parameter): they are updated by hand in forward().
        self.w = torch.randn(inp_size, out_size)
        # Running sum and sample count of the input/output outer products.
        # The count starts at 1 so the average also includes one all-zero
        # sample, exactly like averaging a history seeded with a zero matrix.
        self.causal_sum = torch.zeros(inp_size, out_size)
        self.causal_count = 1

        self.wp = torch.randn(out_size, time_pred)
        self.memory = []  # the most recent outputs, each of shape (out_size, 1)
        self.causal_pred_sum = torch.zeros(out_size, time_pred)
        self.causal_pred_count = 1
        # Window seen on the previous call. The placeholder has time_pred + 1
        # columns so it can never be mistaken for a full window.
        self.memoryt1 = torch.zeros(1, time_pred+1)

    def loss_fn(self, a, b):
        """Return ``a - b``: the step that moves ``b`` toward ``a``."""
        return a - b

    def forward(self, sdr, learning=True, lr=3e-2):
        """Process one time step.

        Args:
            sdr: input row vector, shape (1, inp_size).
            learning: when True, update ``w`` and ``wp`` in place.
            lr: fraction of the distance to the running average applied per step.

        Returns:
            (out, pred): ``out`` has shape (1, out_size). ``pred`` has shape
            (out_size, 1), or is None until the window stored by the previous
            call holds ``time_pred`` outputs.
        """
        ########## FORWARD
        out = torch.special.erf(sdr @ self.w)  # (1, out_size)

        if learning:
            self.causal_sum = self.causal_sum + sdr.T @ out  # (inp_size, out_size)
            self.causal_count += 1
            causal = self.causal_sum / self.causal_count
            self.w = self.w + self.loss_fn(causal, self.w) * lr

        # Keep only the outputs that can still enter a window.
        self.memory.append(out.T)  # (out_size, 1)
        self.memory = self.memory[-self.time_pred:]

        ########## PREDICTION
        memory = torch.cat(self.memory, dim=1)  # (out_size, <= time_pred)

        if self.memoryt1.shape[1] == self.time_pred:
            pred = torch.sum(memory * self.wp, dim=1)  # (out_size)
            pred = torch.special.erf(pred).unsqueeze(1)  # (out_size, 1)

            if learning:
                # Pair the current output with the window seen one call earlier.
                self.causal_pred_sum = self.causal_pred_sum + out.T * self.memoryt1
                self.causal_pred_count += 1
                causal = self.causal_pred_sum / self.causal_pred_count  # (out_size, time_pred)
                self.wp = self.wp + self.loss_fn(causal, self.wp) * lr
        else:
            pred = None

        self.memoryt1 = memory

        return out, pred

In [6]:
class SupervisedCausalMemory(nn.Module):
    """Layer trained by a local averaging rule toward an explicit target.

    Every call to ``forward`` processes one time step:

    * the input is mapped through ``w`` and ``erf`` to produce the output;
    * when learning with a target, ``w`` is moved toward the running average
      of the input/target outer products;
    * the last ``time_pred`` targets (or outputs, when no target is given) are
      combined with ``wp`` into ``pred``;
    * when learning with a target, and after a warm-up number of calls, ``wp``
      is moved toward the running average of
      ``current target * previous window``.

    Only running sums, their sample counts and the last ``time_pred`` entries
    are stored, so memory use does not grow with the number of calls.
    """

    def __init__(self, inp_size, out_size, time_pred):
        """
        Args:
            inp_size: width of the input row vector.
            out_size: width of the output/target row vector.
            time_pred: number of past entries kept for the prediction step.
        """
        super().__init__()
        self.time_pred = time_pred
        self.out_size = out_size

        # Plain tensors (not nn.Parameter): they are updated by hand in forward().
        self.w = torch.randn(inp_size, out_size)
        # Running sum and sample count of the input/target outer products.
        # The count starts at 1 so the average also includes one all-zero
        # sample, exactly like averaging a history seeded with a zero matrix.
        self.causal_sum = torch.zeros(inp_size, out_size)
        self.causal_count = 1

        self.wp = torch.randn(out_size, time_pred)
        self.memory = []  # the most recent entries, each of shape (out_size, 1)
        self.causal_pred_sum = torch.zeros(out_size, time_pred)
        self.causal_pred_count = 1
        # Despite the name this counts UP: number of forward() calls so far.
        self.pred_countdown = 0
        # Last full window seen. The placeholder has a shape no real window
        # can have, which lets forward() detect that none was stored yet.
        self.memoryt1 = torch.zeros(1, time_pred+1)

    def loss_fn(self, a, b):
        """Return ``a - b``: the step that moves ``b`` toward ``a``."""
        return a - b

    def forward(self, sdr, target=None, learning=True, lr=3e-2, pred_countdown=20):
        """Process one time step.

        Args:
            sdr: input row vector, shape (1, inp_size).
            target: desired output, shape (1, out_size), or None. Without a
                target no weight is updated (there is nothing to learn from)
                and the layer's own output is stored in the window instead.
            learning: when True and a target is given, update ``w`` and ``wp``.
            lr: fraction of the distance to the running average applied per step.
            pred_countdown: ``wp`` is only updated once the number of previous
                calls exceeds this value.

        Returns:
            (out, pred): ``out`` has shape (1, out_size). ``pred`` has shape
            (1, out_size), or is None until ``time_pred`` entries are buffered.
        """
        # Supervised updates need a target. Previously the default call
        # (target=None, learning=True) failed on `sdr.T @ target`.
        supervised = learning and target is not None

        ########## FORWARD
        out = torch.special.erf(sdr @ self.w)  # (1, out_size)

        if supervised:
            self.causal_sum = self.causal_sum + sdr.T @ target  # (inp_size, out_size)
            self.causal_count += 1
            causal = self.causal_sum / self.causal_count
            self.w = self.w + self.loss_fn(causal, self.w) * lr

        # Keep only the entries that can still enter a window.
        self.memory.append(target.T if target is not None else out.T)  # (out_size, 1)
        self.memory = self.memory[-self.time_pred:]

        ########## PREDICTION
        memory = torch.cat(self.memory, dim=1)  # (out_size, <= time_pred)

        if memory.shape[1] == self.time_pred:
            pred = torch.sum(memory * self.wp, dim=1)  # (out_size)
            pred = torch.special.erf(pred).unsqueeze(1)  # (out_size, 1)

            # The wp update pairs the current target with the PREVIOUS full
            # window. Until one has been stored, memoryt1 is still the
            # placeholder, whose shape would make the update fail.
            have_previous_window = self.memoryt1.shape == memory.shape
            if supervised and have_previous_window and self.pred_countdown > pred_countdown:
                self.causal_pred_sum = self.causal_pred_sum + target.T * self.memoryt1
                self.causal_pred_count += 1
                causal = self.causal_pred_sum / self.causal_pred_count  # (out_size, time_pred)
                self.wp = self.wp + self.loss_fn(causal, self.wp) * lr

            self.memoryt1 = memory
        else:
            pred = None

        self.pred_countdown += 1
        return out, (pred.T if pred is not None else None)

In [7]:
# Toy check: feed a SupervisedCausalMemory the repeating sequence 0, 1, ..., 5.
# The index list is named `pattern` rather than `data` so that this cell does
# not overwrite the encoded-corpus tensor `data` created when the text is loaded.
pattern = [0, 1, 2, 3, 4, 5]
l = SupervisedCausalMemory(6, 6, time_pred=4)
for i in range(500):
    # Input is -1 everywhere except +1 at the current symbol's index.
    x = torch.full((1, 6), -1.0)
    x[:, pattern[i % len(pattern)]] = 1.0
    # The input is also passed as the target; `out` holds the (out, pred) pair.
    out = l(x, x, lr=0.1)

In [8]:
# Probe the toy model `l` with symbol 2 (+1 at index 2, -1 elsewhere).
# No target is given and learning is off, so the weights are left untouched.
x = torch.tensor([[-1., -1., 1., -1., -1., -1.]], dtype=torch.float32)
out, pred = l(x, target=None, learning=False)


In [9]:
# Show the output and the prediction with negative entries clamped to zero.
print(F.relu(out), F.relu(pred), sep=" \n ")

tensor([[0., 0., 0., 0., 0., 0.]]) 
 tensor([[0., 0., 0., 0., 0., 0.]])


In [10]:
class CausalBlock(nn.Module):
    """Chain of CausalMemory layers followed by a SupervisedCausalMemory head."""

    def __init__(self, dim, out_size, time_pred, lr=0.05):
        """
        Args:
            dim: sequence of layer widths; each consecutive pair (dim[i],
                dim[i+1]) becomes one CausalMemory layer, and dim[-1] is the
                input width of the head. A single-entry sequence therefore
                builds no CausalMemory layer at all.
            out_size: output width of the head.
            time_pred: window length passed to every layer and to the head.
            lr: learning rate handed to every layer on each forward call.
        """
        super().__init__()
        self.lr = lr

        stacked = [
            CausalMemory(n_in, n_out, time_pred=time_pred)
            for n_in, n_out in zip(dim[:-1], dim[1:])
        ]
        self.layers = nn.ModuleList(stacked)
        self.head = SupervisedCausalMemory(dim[-1], out_size, time_pred)

    def forward(self, x, target=None, learning=True):
        """Run one time step through every layer, then through the head.

        The per-layer predictions are discarded; only each layer's output is
        passed on. Returns the head's ``(out, pred)`` pair.
        """
        hidden = x
        for layer in self.layers:
            hidden, _ = layer(hidden, learning=learning, lr=self.lr)

        return self.head(hidden, target=target, learning=learning, lr=self.lr, pred_countdown=30)

In [15]:
def train(len_pred=11, epochs=75):
    """Repeatedly train a CausalBlock on the first ``len_pred`` training characters.

    Each epoch clears the cell output, prints the training snippet, then feeds
    the snippet one character at a time as a one-hot vector that serves as both
    input and target, printing the character with the highest predicted score
    whenever the model returns a prediction.

    Args:
        len_pred: length of the training snippet; also used as the model's
            ``time_pred`` window.
        epochs: number of passes over the snippet.

    Note:
        The model is built with the size list ``[vocab_size]``, which creates
        no CausalMemory layers and only the supervised head. Pass a longer
        list to CausalBlock to add layers.
    """
    model = CausalBlock([vocab_size], vocab_size, len_pred)
    snippet = train_data[:len_pred]

    for _ in range(epochs):
        clear_output(wait=True)
        print(decode(snippet.tolist()))
        for letter in snippet:
            x = torch.zeros(1, vocab_size, dtype=torch.float)
            x[0, letter] = 1.0

            _, pred = model(x, x)
            # The model returns None until it has buffered a full window.
            # Test for that explicitly instead of hiding every error behind
            # a bare `except`.
            if pred is not None:
                best = torch.argmax(pred)
                print(itos[best.item()], end="")


In [16]:
# Run the demo. Each epoch clears the cell output first, so what remains is
# the training snippet followed by the characters printed in the last epoch.
train()

et, Marcius
t, Marciuse