In [1]:
import numpy as np
from IPython.display import display, HTML
!pip install matplotlib
import matplotlib.pyplot as plt
# import sklearn 
from sklearn.manifold import TSNE
import pandas as pd
import re
import ast
import json
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import logging
import torch.optim as optim
from torch.utils.data import DataLoader
import codecs


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


  from .autonotebook import tqdm as notebook_tqdm


## Method 0: Plain Tokenization on source code<br>
Similar to just treating source code as natural language 

In [2]:
import tokenize as tk
import io
import re

def tokenize_python_file(file_path):
    with open('py150_files/'+file_path, 'rb') as file:
    # with open('blah.py', 'rb') as file:
        tokens = tk.tokenize(file.readline)
        token_list = []
        ignore_comment = False
        for token in tokens:
            if ignore_comment:
                if token[0] == tk.NEWLINE:
                    ignore_comment = False
                continue
            if token[0] == tk.COMMENT:
                continue
            elif token[0] == tk.STRING and '"""' in token[1]:
                # just ignore the entire multiline comment, which comes in one token from the tokenizer 
                continue
            elif token[0] == tk.NEWLINE:
                continue
            elif token[0] == tk.OP:
                if token[1] in ['(', ')', '[', ']', '{', '}']:
                    continue
            elif token[0] == tk.NL:
                continue
            elif token[1] == 'utf-8' or token[1] == '' or token[1].isspace():
                continue
            token_list.append(token[1])
            if token[1] == '#':
                ignore_comment = True
    return token_list

def tokenize_files(file_paths, n):
    tokenized_files = []
    with open(file_paths, 'r') as f:
        for _ in range(n):
            file_path = f.readline().strip()
            tokens = tokenize_python_file(file_path)
            tokenized_files.append(tokens)
    return tokenized_files


In [3]:
def generate_dictionary(text, max_vocab_size=30000):
    word_to_index = {}
    index_to_word = {}
    corpus = []
    count = 0
    vocab_size = 0
    
    # Count occurrences of each token
    token_counts = {}
    for program in text:
        for word in program:
            # print(word)
            word = word.lower()
            corpus.append(word)
            token_counts[word] = token_counts.get(word, 0) + 1
    
    # Sort token counts and select the most frequent tokens
    sorted_tokens = sorted(token_counts.items(), key=lambda x: x[1], reverse=True)
    most_frequent_tokens = sorted_tokens[:max_vocab_size]
    frequent_tokens_set = set(token for token, count in most_frequent_tokens)
    
    # Create word-to-index and index-to-word mappings
    word_to_index['unk'] = 0  # Assign index 0 to the 'unk' token
    index_to_word[0] = 'unk'
    count = 1
    for token, _ in most_frequent_tokens:
        word_to_index[token] = count
        index_to_word[count] = token
        count += 1
    
    # Replace less frequent tokens with 'unk'
    corpus = ['unk' if token not in frequent_tokens_set else token for token in corpus]
    print(corpus)
    vocab_size = len(word_to_index)
    length_of_corpus = len(corpus)
    
    return word_to_index, index_to_word, corpus, vocab_size, length_of_corpus


In [4]:
def get_one_hot_vectors(target_word,context_words,vocab_size,word_to_index):
    trgt_word_vector = np.zeros(vocab_size)
    index_of_word_dictionary = word_to_index.get(target_word) 
    trgt_word_vector[index_of_word_dictionary] = 1
    ctxt_word_vector = np.zeros(vocab_size)
    for word in context_words:
        index_of_word_dictionary = word_to_index.get(word) 
        ctxt_word_vector[index_of_word_dictionary] = 1
    return trgt_word_vector,ctxt_word_vector

In [5]:
def generate_training_data(corpus,window_size,vocab_size,word_to_index,length_of_corpus,sample=None):
	
    training_data =  []
    training_sample_words =  []
    for i,word in enumerate(corpus):
        index_target_word = i
        target_word = word
        context_words = []
        
        #when target word is the first word
        if i == 0:  

            # trgt_word_index:(0), ctxt_word_index:(1,2)
            context_words = [corpus[x] for x in range(i + 1 , window_size + 1)] 


        #when target word is the last word
        elif i == len(corpus)-1:

            # trgt_word_index:(9), ctxt_word_index:(8,7), length_of_corpus = 10
            context_words = [corpus[x] for x in range(length_of_corpus - 2 ,length_of_corpus -2 - window_size  , -1 )]

        #When target word is the middle word
        else:

            #Before the middle target word
            before_target_word_index = index_target_word - 1
            for x in range(before_target_word_index, before_target_word_index - window_size , -1):
                if x >=0:
                    context_words.extend([corpus[x]])

            #After the middle target word
            after_target_word_index = index_target_word + 1
            for x in range(after_target_word_index, after_target_word_index + window_size):
                if x < len(corpus):
                    context_words.extend([corpus[x]])


        trgt_word_vector,ctxt_word_vector = get_one_hot_vectors(target_word,context_words,vocab_size,word_to_index)
        training_data.append([trgt_word_vector,ctxt_word_vector])   
        
        if sample is not None:
            training_sample_words.append([target_word,context_words])   
        
    return training_data,training_sample_words

In [6]:
text = [['from', 'bootstrap', 'import', 'Bootstrap', 'from', 'fund', 'import']]
word_to_index,index_to_word,corpus,vocab_size,length_of_corpus = generate_dictionary(text)

['from', 'bootstrap', 'import', 'bootstrap', 'from', 'fund', 'import']


In [7]:
window_size = 4
print(corpus)
training_data,training_sample_words = generate_training_data(corpus,4,vocab_size,word_to_index,length_of_corpus,'yes')

['from', 'bootstrap', 'import', 'bootstrap', 'from', 'fund', 'import']


In [8]:
for i in range(len(training_data)):
    print('*' * 50)
    
    print('Target word:%s . Target vector: %s ' %(training_sample_words[i][0],training_data[i][0]))
    print('Context word:%s . Context  vector: %s ' %(training_sample_words[i][1],training_data[i][1]))

**************************************************
Target word:from . Target vector: [0. 1. 0. 0. 0.] 
Context word:['bootstrap', 'import', 'bootstrap', 'from'] . Context  vector: [0. 1. 1. 1. 0.] 
**************************************************
Target word:bootstrap . Target vector: [0. 0. 1. 0. 0.] 
Context word:['from', 'import', 'bootstrap', 'from', 'fund'] . Context  vector: [0. 1. 1. 1. 1.] 
**************************************************
Target word:import . Target vector: [0. 0. 0. 1. 0.] 
Context word:['bootstrap', 'from', 'bootstrap', 'from', 'fund', 'import'] . Context  vector: [0. 1. 1. 1. 1.] 
**************************************************
Target word:bootstrap . Target vector: [0. 0. 1. 0. 0.] 
Context word:['import', 'bootstrap', 'from', 'from', 'fund', 'import'] . Context  vector: [0. 1. 1. 1. 1.] 
**************************************************
Target word:from . Target vector: [0. 1. 0. 0. 0.] 
Context word:['bootstrap', 'import', 'bootstrap', 'from', 'fu

In [9]:
# Input vector, returns nearest word(s)
def cosine_similarity(word,weight,word_to_index,vocab_size,index_to_word):
    
    #Get the index of the word from the dictionary
    index = word_to_index[word]
    
    #Get the correspondin weights for the word
    word_vector_1 = weight[index]
    
    
    word_similarity = {}

    for i in range(vocab_size):
        
        word_vector_2 = weight[i]
        
        theta_sum = np.dot(word_vector_1, word_vector_2)
        theta_den = np.linalg.norm(word_vector_1) * np.linalg.norm(word_vector_2)
        theta = theta_sum / theta_den
        
        word = index_to_word[i]
        word_similarity[word] = theta
    
    return word_similarity #words_sorted

In [10]:
def print_similar_words(top_n_words,weight,msg,words_subset):
    
    columns=[]
    
    for i in range(0,len(words_subset)):
        columns.append('similar:' +str(i+1) )
        
    df = pd.DataFrame(columns=columns,index=words_subset)
    df.head()
    
    row = 0
    for word in words_subset:
        
        #Get the similarity matrix for the word: word
        similarity_matrix = cosine_similarity(word,weight,word_to_index,vocab_size,index_to_word)
        col = 0
        
        #Sort the top_n_words
        words_sorted = dict(sorted(similarity_matrix.items(), key=lambda x: x[1], reverse=True)[1:top_n_words+1])
        
        #Create a dataframe to display the similarity matrix
        for similar_word,similarity_value in words_sorted.items():
            df.iloc[row][col] = (similar_word,round(similarity_value,2))
            col += 1
        row += 1
    styles = [dict(selector='caption', 
    props=[('text-align', 'center'),('font-size', '20px'),('color', 'red')])] 
    df = df.style.set_properties(**
                       {'color': 'green','border-color': 'blue','font-size':'14px'}
                      ).set_table_styles(styles).set_caption(msg)
    return df

In [11]:
def print_similar_words(top_n_words,weight,msg,words_subset):
    
    columns=[]
    
    for i in range(0,len(words_subset)):
        columns.append('similar:' +str(i+1) )
        
    df = pd.DataFrame(columns=columns,index=words_subset)
    df.head()
    
    row = 0
    for word in words_subset:
        
        #Get the similarity matrix for the word: word
        similarity_matrix = cosine_similarity(word,weight,word_to_index,vocab_size,index_to_word)
        col = 0
        
        #Sort the top_n_words
        words_sorted = dict(sorted(similarity_matrix.items(), key=lambda x: x[1], reverse=True)[1:top_n_words+1])
        
        #Create a dataframe to display the similarity matrix
        for similar_word,similarity_value in words_sorted.items():
            df.iloc[row][col] = (similar_word,round(similarity_value,2))
            col += 1
        row += 1
    styles = [dict(selector='caption', 
    props=[('text-align', 'center'),('font-size', '20px'),('color', 'red')])] 
    df = df.style.set_properties(**
                       {'color': 'green','border-color': 'blue','font-size':'14px'}
                      ).set_table_styles(styles).set_caption(msg)
    return df

In [12]:
class SkipGram(nn.Module):
    def __init__(self, n_vocab, n_embed):
        super().__init__()
        self.embed = nn.Embedding(n_vocab, n_embed)
        self.output = nn.Linear(n_embed, n_vocab)
        self.log_softmax = nn.LogSoftmax(dim=1)
    
    def forward(self, x):
        x = self.embed(x)
        scores = self.output(x)
        log_ps = self.log_softmax(scores)
        
        return log_ps

In [13]:
# Convert inputs and targets to PyTorch tensors
inputs = torch.tensor([data[0] for data in training_data], dtype=torch.long)
targets = torch.tensor([data[1] for data in training_data], dtype=torch.long)


# Create a DataLoader for mini-batch training
dataset = torch.utils.data.TensorDataset(inputs, targets)
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)

  inputs = torch.tensor([data[0] for data in training_data], dtype=torch.long)
  inputs = torch.tensor([data[0] for data in training_data], dtype=torch.long)
  targets = torch.tensor([data[1] for data in training_data], dtype=torch.long)


In [14]:
# Define the training parameters
n_vocab = len(word_to_index)
n_embed = 100  # Assuming the embedding dimension is 300
lr = 0.01  # Learning rate
epochs = 100
model = SkipGram(n_vocab, n_embed)
criterion = nn.NLLLoss()

optimizer = optim.SGD(model.parameters(), lr=lr)

# Training loop
for epoch in range(epochs):
    # Initialize epoch loss
    epoch_loss = 0
    
    # Set the model in training mode
    model.train()
    
    # Iterate over mini-batches
    for batch_inputs, batch_targets in loader:
        # Clear gradients
        optimizer.zero_grad()
        
        # Forward pass
        log_ps = model(batch_inputs)
        
        # Calculate loss
        loss = criterion(log_ps, batch_targets)
        
        # Backpropagation
        loss.backward()
        optimizer.step()
        
        # Accumulate epoch loss
        epoch_loss += loss.item()
    
    # Print epoch loss
    print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}')

Epoch 1/100, Loss: 1.7298
Epoch 2/100, Loss: 1.7167
Epoch 3/100, Loss: 1.7047
Epoch 4/100, Loss: 1.6936
Epoch 5/100, Loss: 1.6834
Epoch 6/100, Loss: 1.6740
Epoch 7/100, Loss: 1.6652
Epoch 8/100, Loss: 1.6571
Epoch 9/100, Loss: 1.6495
Epoch 10/100, Loss: 1.6425
Epoch 11/100, Loss: 1.6360
Epoch 12/100, Loss: 1.6299
Epoch 13/100, Loss: 1.6242
Epoch 14/100, Loss: 1.6190
Epoch 15/100, Loss: 1.6141
Epoch 16/100, Loss: 1.6096
Epoch 17/100, Loss: 1.6054
Epoch 18/100, Loss: 1.6016
Epoch 19/100, Loss: 1.5980
Epoch 20/100, Loss: 1.5947
Epoch 21/100, Loss: 1.5916
Epoch 22/100, Loss: 1.5888
Epoch 23/100, Loss: 1.5863
Epoch 24/100, Loss: 1.5839
Epoch 25/100, Loss: 1.5817
Epoch 26/100, Loss: 1.5798
Epoch 27/100, Loss: 1.5779
Epoch 28/100, Loss: 1.5763
Epoch 29/100, Loss: 1.5748
Epoch 30/100, Loss: 1.5734
Epoch 31/100, Loss: 1.5721
Epoch 32/100, Loss: 1.5710
Epoch 33/100, Loss: 1.5700
Epoch 34/100, Loss: 1.5690
Epoch 35/100, Loss: 1.5681
Epoch 36/100, Loss: 1.5673
Epoch 37/100, Loss: 1.5666
Epoch 38/1