<a href="https://colab.research.google.com/github/youdanzh/CIS700-projects/blob/main/transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import random
import torch as tr
import matplotlib.pyplot as pt
from tqdm.notebook import trange
import numpy as np

In [None]:
import os 

# Build the training set `examples`: a list of (text, label) pairs collected
# from every regular file in data/.
#
# As read by this loop, a file has a header followed by labelled sections:
#   * if line 3 starts with "T", its text (from column 2 on) is the clause
#     that gets appended to every example of that file;
#   * a line starting with "+" or "-" ends the header and sets the label
#     (1 for "+", 0 for "-") for the lines that follow;
#   * every other line after the header becomes one example.
examples = []
error = ".ipynb_checkpoints"
for file in os.listdir("data/"):
    filename = os.fsdecode(file)
    print(filename)
    path = os.path.join("data/", filename)
    # Skip the Jupyter checkpoint folder and anything else that cannot be
    # opened as a plain file (open() on a directory raises).
    if filename == error or not os.path.isfile(path):
        continue
    header = True
    with open(path, "r") as f:
        clause = ""
        for counter, line in enumerate(f, start=1):
            if counter == 3 and line.startswith("T"):
                clause = line[2:]
            if line.startswith(("+", "-")):
                header = False
                label = 0 if line[0] == "-" else 1
            elif not header:
                clause_steps = line[2:] + " & " + clause
                # Strip the trailing newline so training examples are built
                # exactly like the validation examples.
                examples.append((clause_steps.strip(), label))

# Preview the first few pairs. Iterating the list directly avoids the earlier
# enumerate() mix-up that printed the index and the whole tuple.
for example, label in examples[:10]:
    print(example, label)

In [None]:
"""
examples = []
header = True

with open("data/00001", "r") as f:
  counter = 0
  clause = ""
  for line in f:
    counter += 1
    if line[0] == "T" and counter == 3:
      clause = line[2:]
    if line[0] in "+-":
      header = False
      label = 0 if line[0] == "-" else 1
    elif not header:
      clause_steps = line[2:] + " & " + clause
      #print(clause_steps)
      examples.append((clause_steps.strip(),label))
      
#for e,(example,label) in enumerate(examples[:10]):
 # print(label, example)
for i in range(10):
  print(examples[i])
len(examples)
"""

In [None]:
# Read the file "00011" into `validation` as (text, label) pairs, using the
# same line format as the training loader: an optional "T" clause on line 3,
# "+"/"-" lines that switch the current label, and example lines after them.
validation = []
header = True

with open("00011", "r") as f:
    clause = ""
    for line_no, line in enumerate(f, start=1):
        lead = line[0]
        if line_no == 3 and lead == "T":
            clause = line[2:]
        if lead in "+-":
            # Label marker: leaves the header and sets the label for what follows
            header = False
            label = 1 if lead == "+" else 0
            continue
        if header:
            continue
        clause_steps = line[2:] + " & " + clause
        validation.append((clause_steps.strip(), label))
      
      


In [None]:
# Characters that may continue a token: a-z, A-Z and "_".
# range() excludes its stop value, so both upper bounds need "+ 1"; without it
# "z" and "Z" were treated as separators and split identifiers in two.
chrs = "".join(
    [chr(c) for c in range(ord("a"), ord("z") + 1)]
    + [chr(c) for c in range(ord("A"), ord("Z") + 1)]
    + ["_"])
print(chrs)

def parse(line):
    """Split *line* into a list of string tokens.

    Characters found in ``chrs`` extend the current token. Any other
    character closes the current token and becomes the first character of
    the next one, so punctuation stays attached to the identifier that
    follows it. Leading whitespace is stripped from every token, which means
    the result can contain empty strings (for example for an empty line or
    for a run of separators).

    Args:
        line: the text to tokenize.

    Returns:
        The tokens in order of appearance; always at least one element.
    """
    tokens = []
    token = ""
    for c in line:
        if c in chrs:
            token += c
        else:
            tokens.append(token.lstrip())
            token = c
    # Flush whatever is left after the last character
    tokens.append(token.lstrip())
    return tokens

# Sanity check: tokenize the first training example and show the result.
first_text, _first_label = examples[0]
print(parse(first_text))

In [None]:
# Collect the vocabulary: every distinct token that occurs in the training or
# the validation examples.
all_tokens = set()
for example, label in examples:
    all_tokens.update(parse(example))

for example_2, label_2 in validation:
    all_tokens.update(parse(example_2))

# Sort before assigning ids. Iteration order of a set of strings depends on
# the per-process hash seed, so list(set) gave every kernel session a
# different token -> index mapping and made trained weights non-reproducible.
all_tokens = sorted(all_tokens)
# token -> row index into the embedding matrix
lookup = {token: t for t, token in enumerate(all_tokens)}

print(lookup)
for token in all_tokens:
    print(token)
print(len(all_tokens))


In [None]:
# Length, in tokens, of the longest training example; used as the fixed
# sequence length of the model input.
token_counts = [len(parse(text)) for text, _ in examples]
max_len = max(token_counts)
print("max_len:", max_len)
# One-hot token embeddings: row i is the vector for all_tokens[i].
vocab_size = len(all_tokens)
embeddings = tr.eye(vocab_size)

In [None]:
"""
Transformer utilities
"""
def embed(seq, max_len, embeddings, offset=0):
    """Turn a token sequence into a zero-padded matrix of embedding rows.

    Args:
        seq: sequence of tokens; each one must be a key of the module-level
            ``lookup`` table.
        max_len: number of rows of the returned matrix.
        embeddings: 2-D tensor indexed by ``lookup[token]``.
        offset: row at which the first token is written.

    Returns:
        A ``(max_len, embeddings.shape[1])`` tensor. Rows that receive no
        token stay zero; tokens that do not fit after ``offset`` are dropped.
    """
    out = tr.zeros(max_len, embeddings.shape[1])
    n_fit = min(len(seq), max_len - offset)
    if n_fit <= 0:
        # Nothing to write (empty sequence or no room left after the offset)
        return out
    rows = [embeddings[lookup[tok]] for tok in seq[:n_fit]]
    out[offset:offset + n_fit] = tr.stack(rows)
    return out

def Attention(Q, K, V, masked=False):
    """Scaled dot-product attention over 2-D (rows = positions) tensors.

    Args:
        Q: query matrix; its column count sets the scaling factor.
        K: key matrix.
        V: value matrix.
        masked: when true, a query at row i gets zero weight on every key at
            a column index greater than i.

    Returns:
        ``softmax(Q @ K.T / sqrt(Q.shape[1])) @ V``, with the softmax taken
        over the key dimension.
    """
    scale = Q.shape[1] ** .5
    scores = (Q @ K.t()) / scale
    if masked:
        pos = tr.arange(Q.shape[0])
        # True where the key index lies after the query index
        future = pos.unsqueeze(1) < pos
        scores[future] = -tr.inf
    weights = tr.softmax(scores, dim=1)
    return weights @ V

class MultiHeadAttention(tr.nn.Module):
    """Multi-head attention followed by a skip connection and layer norm.

    Args:
        num_heads: number of attention heads.
        d_model: width of the input and output rows.
        masked: passed through to ``Attention`` for every head.
        projections: string naming which learned projections to create.
            Each of "Q", "K", "V" present gets one bias-free
            ``Linear(d_model, d_model // num_heads)`` per head; "O" adds a
            bias-free output ``Linear``. A projection that is not named is
            replaced by a fixed function (see the notes in ``__init__``).
    """

    def __init__(self, num_heads, d_model, masked=False, projections=""):
        super().__init__()
        dh = d_model // num_heads
        self.masked = masked
        self.num_heads = num_heads

        def build_heads(letter):
            # Learned per-head projections when requested ...
            if letter in projections:
                return tr.nn.ModuleList(
                    [tr.nn.Linear(d_model, dh, bias=False) for _ in range(num_heads)])
            # ... otherwise every head just takes the first dh columns.
            # NOTE(review): all heads share the same slice here - confirm
            # that this is intended rather than a per-head slice.
            return [lambda x: x[:, :dh]] * num_heads

        self.WQ = build_heads("Q")
        self.WK = build_heads("K")
        self.WV = build_heads("V")

        if "O" in projections:
            self.WO = tr.nn.Linear(dh * num_heads, d_model, bias=False)
        else:
            # No output projection: concatenated heads are used as they are
            self.WO = lambda x: x
        self.ln = tr.nn.LayerNorm(d_model)
        self.projections = projections

    def forward(self, Q, K, V):
        """Run all heads, merge them, add ``Q`` and normalize the result."""
        heads = []
        for w_q, w_k, w_v in zip(self.WQ, self.WK, self.WV):
            heads.append(Attention(w_q(Q), w_k(K), w_v(V), self.masked))
        out = self.WO(tr.cat(heads, dim=1))
        out += Q  # skip connection
        return self.ln(out)  # layer normalization

# requires d_model - d_embedding >= max_len
def one_hot_positional_encoder(max_len):
    """Return a function that prepends one-hot position codes to its input.

    The returned function expects a 2-D tensor with ``max_len`` rows and
    returns it with a ``max_len`` x ``max_len`` identity matrix concatenated
    on the left, so row i gains a one-hot code for position i.
    """
    def encode_position(inputs):
        position_codes = tr.eye(max_len)
        return tr.cat([position_codes, inputs], dim=1)
    return encode_position



In [None]:
class Net(tr.nn.Module):
    """Two-class classifier over a raw example string.

    The string is tokenized and embedded with the module-level ``parse``,
    ``embed``, ``max_len`` and ``embeddings``, one-hot position codes are
    prepended, and the result goes through ``num_blocks`` self-attention
    blocks with ``num_heads`` heads each. A linear readout is averaged over
    all positions.
    """

    def __init__(self, num_blocks, num_heads):
        super().__init__()
        # Model width = position code width + token embedding width
        d_model = max_len + embeddings.shape[1]
        self.encoder = one_hot_positional_encoder(max_len)
        attention_layers = [
            MultiHeadAttention(num_heads, d_model, projections="QKVO")
            for _ in range(num_blocks)
        ]
        self.blocks = tr.nn.ModuleList(attention_layers)
        self.readout = tr.nn.Linear(d_model, 2)

    def forward(self, example):
        """Return a ``(1, 2)`` tensor of class logits for one example string."""
        tokens = parse(example)
        hidden = self.encoder(embed(tokens, max_len, embeddings))
        for block in self.blocks:
            # Self-attention: the same tensor serves as query, key and value
            hidden = block(hidden, hidden, hidden)
        per_position = self.readout(hidden)
        return per_position.mean(dim=0).unsqueeze(0)

# Smoke test: run an untrained 3-block, 4-head model on the first example.
net = Net(num_blocks=3, num_heads=4)
y = net(examples[0][0])
print(y)

In [None]:
# Train a 2-block, 4-head Net with one randomly drawn example per step and
# evaluate on the whole validation set every `verb_step` steps.
#
# The model stays on the CPU. Net.forward receives a raw string and builds
# its input tensors on the CPU, so a model moved to the GPU would hit a
# device mismatch; in addition the earlier `.cuda()` calls were applied to
# the str example and the int label and raised AttributeError whenever a GPU
# was available.
net = Net(2, 4)
xc = tr.nn.CrossEntropyLoss()

opt = tr.optim.Adam(net.parameters(), lr=0.0001)

num_iters = 200
verb_step = 20
train_loss = []  # one entry per training step
valid_accu = []  # one entry per evaluation
valid_loss = []  # one entry per evaluation
for i in trange(num_iters):

    example, label = random.choice(examples)
    logits = net(example)
    loss = xc(logits, tr.tensor([label]))
    train_loss.append(loss.item())

    opt.zero_grad()
    loss.backward()
    opt.step()

    if i % verb_step == 0 or i == num_iters - 1:
        correct = []  # 1 for a right prediction, 0 for a wrong one
        vloss = []
        with tr.no_grad():
            for example_2, label_2 in validation:
                logits = net(example_2)
                v_loss = xc(logits, tr.tensor([label_2]))
                pred = logits.argmax().item()
                correct.append(int(pred == label_2))
                vloss.append(v_loss.item())
        val_acc = np.mean(correct)
        val_loss = np.mean(vloss)
        valid_accu.append(val_acc)
        valid_loss.append(val_loss)
        print(f'loss:{loss.item()} \t\tval_loss: {val_loss}\t\t val_acc: {val_acc}  ')

# Training loss per step
pt.plot(train_loss)
pt.xlabel("Iteration")
pt.ylabel("Loss")



In [None]:
# Validation accuracy, one point per evaluation recorded in valid_accu.
pt.plot(valid_accu)
pt.xlabel("Num measurement")
pt.ylabel("accuracy")

In [None]:
class Net(tr.nn.Module):
    """Two-class classifier: attention stack wrapped in a shared 1x1 conv.

    Same pipeline as the plain attention classifier (tokenize, embed, prepend
    one-hot positions, ``num_blocks`` self-attention blocks of ``num_heads``
    heads, linear readout averaged over positions), but one kernel-size-1
    ``Conv1d`` is applied both before and after the attention blocks. It is
    the same layer both times, so its weights are shared.
    """

    def __init__(self, num_blocks, num_heads):
        super().__init__()
        # Model width = position code width + token embedding width
        d_model = max_len + embeddings.shape[1]
        # NOTE(review): the conv uses max_len as its channel count, so it
        # mixes positions; this relies on Conv1d accepting the unbatched
        # 2-D tensor produced in forward - confirm for the torch version used.
        self.conv = tr.nn.Conv1d(max_len, max_len, kernel_size=1)
        self.encoder = one_hot_positional_encoder(max_len)
        attention_layers = [
            MultiHeadAttention(num_heads, d_model, projections="QKVO")
            for _ in range(num_blocks)
        ]
        self.blocks = tr.nn.ModuleList(attention_layers)
        self.readout = tr.nn.Linear(d_model, 2)

    def forward(self, example):
        """Return a ``(1, 2)`` tensor of class logits for one example string."""
        tokens = parse(example)
        hidden = self.encoder(embed(tokens, max_len, embeddings))
        hidden = self.conv(hidden)
        for block in self.blocks:
            # Self-attention: the same tensor serves as query, key and value
            hidden = block(hidden, hidden, hidden)
        hidden = self.conv(hidden)
        per_position = self.readout(hidden)
        return per_position.mean(dim=0).unsqueeze(0)

#Peter's Implementation

In [None]:
# Train a 4-block, 8-head Net with one randomly drawn example per step and
# evaluate on the whole validation set every `verb_step` steps.
#
# The model stays on the CPU. Net.forward receives a raw string and builds
# its input tensors on the CPU, so a model moved to the GPU would hit a
# device mismatch; in addition the earlier `.cuda()` calls were applied to
# the str example and the int label and raised AttributeError whenever a GPU
# was available.
net = Net(4, 8)
xc = tr.nn.CrossEntropyLoss()

opt = tr.optim.Adam(net.parameters(), lr=0.001)

num_iters = 200
verb_step = 20
train_loss = []  # one entry per training step
valid_accu = []  # one entry per evaluation
valid_loss = []  # one entry per evaluation
for i in trange(num_iters):

    example, label = random.choice(examples)
    logits = net(example)
    loss = xc(logits, tr.tensor([label]))
    train_loss.append(loss.item())

    opt.zero_grad()
    loss.backward()
    opt.step()

    if i % verb_step == 0 or i == num_iters - 1:
        correct = []  # 1 for a right prediction, 0 for a wrong one
        vloss = []
        with tr.no_grad():
            for example_2, label_2 in validation:
                logits = net(example_2)
                v_loss = xc(logits, tr.tensor([label_2]))
                pred = logits.argmax().item()
                correct.append(int(pred == label_2))
                vloss.append(v_loss.item())
        val_acc = np.mean(correct)
        val_loss = np.mean(vloss)
        valid_accu.append(val_acc)
        valid_loss.append(val_loss)
        print(f'loss:{loss.item()} \t\tval_loss: {val_loss}\t\t val_acc: {val_acc}  ')

# Training loss per step
pt.plot(train_loss)
pt.xlabel("Iteration")
pt.ylabel("Loss")


In [None]:
# Validation accuracy, one point per evaluation recorded in valid_accu.
pt.plot(valid_accu)
pt.xlabel("Num measurement")
pt.ylabel("accuracy")