In [1]:
import os
import random
import torch
import torch.nn as nn
from torch.nn import functional as F

# extract text and create dataset

In [2]:
def is_comment(line):
    # Define a function to check if a line is a comment
    line = line.strip()
    if line.startswith('#') or line.startswith("'''") or line.startswith('"""'):
        return True
    return False

def extract_non_comments(source_directory, target_directory):
    # Process all .py files in the specified directory and subdirectories
    for root, dirs, files in os.walk(source_directory):
        for file in files:
            if file.endswith('.py'):
                file_path = os.path.join(root, file)
                target_file_path = os.path.join(target_directory, file.replace('.py', '.txt'))
                with open(file_path, 'r') as source_file, open(target_file_path, 'w') as target_file:
                    non_comments = []
                    comment_block = False
                    
                    for line in source_file:
                        # Check for the start or end of a comment block
                        if "''" in line or '"""' in line:
                            comment_block = not comment_block
                            continue
                        # If it's not a comment or part of a comment block, save it
                        if not is_comment(line) and not comment_block:
                            non_comments.append(line)
                        # Write non-comment lines to a target .txt file
                    target_file.writelines(non_comments)

# # Define the path to the local repository (change this to the actual path of your local repo)
# # source_directory = '/path/to/your/local/pytorch/repo'
# source_directory = '.'
# # target_directory = '/path/to/your/output/directory'
# target_directory = './dataset/'


# # Create the target directory if it doesn't exist
# os.makedirs(target_directory, exist_ok=True)

# # Call the function to start extracting non-comment lines
# extract_non_comments(source_directory, target_directory)


In [3]:

def combine_files(directory, output_file, sample=False, num_files_to_sample=100, seed=111, start_token="<START>", end_token="<END>"):
    """
    Combine content from a specified number of text files in a directory into one file, 
    with start and end tokens between contents from each file.

    :param directory: Path to the directory containing text files.
    :param output_file: Name of the output file to create.
    :param num_files_to_sample: Number of files to sample and combine.
    :param start_token: The start token to be added before each file's content.
    :param end_token: The end token to be added after each file's content.
    """
    
    
    # List all text files in the directory
    all_files = [f for f in os.listdir(directory) if f.endswith('.txt')]
    files = all_files

    if sample:
        # Sample the specified number of files
        random.seed(seed)
        files = random.sample(all_files, min(num_files_to_sample, len(all_files)))

    # Start combining the sampled files
    with open(output_file, 'w') as outfile:
        for filename in files:
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r') as infile:
                outfile.write(start_token + '\n')
                content = infile.read()
                content_with_tabs = content.replace('    ', '\t')
                outfile.write(content_with_tabs + '\n')
                outfile.write(end_token + '\n\n')

    print(f"Combined file created as '{output_file}' with contents from {len(files)} files.")

# Example usage
combine_files('dataset/', 'sample_scripts.txt')


Combined file created as 'sample_scripts.txt' with contents from 1153 files.


In [4]:
# read it in to inspect it
with open('sample_scripts.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [5]:
print("length of dataset in characters: ", len(text))

length of dataset in characters:  8518735


In [6]:
print(text[:1000])

<START>

from __future__ import annotations

import dataclasses
from typing import Optional

from torch.onnx._internal.diagnostics.infra.sarif import (
	_artifact_location,
	_property_bag,
)


@dataclasses.dataclass
class VersionControlDetails(object):

<END>

<START>
from typing import Union

import torch


class _InsertPoint:
	def __init__(
		self,
		insert_point_graph: torch._C.Graph,
		insert_point: Union[torch._C.Node, torch._C.Block],
	):
		self.insert_point = insert_point
		self.g = insert_point_graph
		self.guard = None

	def __enter__(self):
		self.prev_insert_point = self.g.insertPoint()
		self.g.setInsertPoint(self.insert_point)

	def __exit__(self, *args):
		self.g.setInsertPoint(self.prev_insert_point)


def insert_point_guard(self, insert_point: Union[torch._C.Node, torch._C.Block]):
	return _InsertPoint(self, insert_point)

<END>

<START>
import contextlib
import logging
import math
from typing import Any, Callable, cast, Dict, Generator, Iterator, no_typ


In [52]:
# here are all the unique characters that occur in this text
chars = sorted(list(set(text)))
vocab_size = len(chars)
''.join(chars)

'\t\n !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~–≤⊑⊔⊳─│└├✓'

In [53]:
print(vocab_size)

107


# encoding and decoding for chars

In [8]:
# create a mapping from characters to integers
ch_to_idx = { ch:i for i,ch in enumerate(chars) }
idx_to_ch = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [ch_to_idx[ch] for ch in s] # encoder: take a string, output a list of mapping idx
decode = lambda l: ''.join([idx_to_ch[idx] for idx in l]) # decoder: take a list of index, output a string

print(encode("import torch"))
print(decode(encode("import torch")))

[75, 79, 82, 81, 84, 86, 2, 86, 81, 84, 69, 74]
import torch


In [9]:
# encode the entire text dataset and store it into a torch.Tensor
data = torch.tensor(encode(text), dtype=torch.long)
print(data.shape, data.dtype)
print(data[:100])

torch.Size([8518735]) torch.int64
tensor([30, 53, 54, 35, 52, 54, 32,  1,  1, 72, 84, 81, 79,  2, 65, 65, 72, 87,
        86, 87, 84, 71, 65, 65,  2, 75, 79, 82, 81, 84, 86,  2, 67, 80, 80, 81,
        86, 67, 86, 75, 81, 80, 85,  1,  1, 75, 79, 82, 81, 84, 86,  2, 70, 67,
        86, 67, 69, 78, 67, 85, 85, 71, 85,  1, 72, 84, 81, 79,  2, 86, 91, 82,
        75, 80, 73,  2, 75, 79, 82, 81, 84, 86,  2, 49, 82, 86, 75, 81, 80, 67,
        78,  1,  1, 72, 84, 81, 79,  2, 86, 81])


# train dev split

In [10]:
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

In [11]:
context_length = 8
x = train_data[:context_length]
y = train_data[1:context_length+1]
for t in range(context_length):
    context = x[:t+1]
    target = y[t]
    print(f"when input is {context} the target: {target}")

when input is tensor([30]) the target: 53
when input is tensor([30, 53]) the target: 54
when input is tensor([30, 53, 54]) the target: 35
when input is tensor([30, 53, 54, 35]) the target: 52
when input is tensor([30, 53, 54, 35, 52]) the target: 54
when input is tensor([30, 53, 54, 35, 52, 54]) the target: 32
when input is tensor([30, 53, 54, 35, 52, 54, 32]) the target: 1
when input is tensor([30, 53, 54, 35, 52, 54, 32,  1]) the target: 1


In [14]:
torch.manual_seed(111)
batch_size = 4
context_length = 8

def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == 'train' else val_data
    start_idxs = torch.randint(len(data) - context_length, (batch_size,))
    context_idxs = torch.stack([data[start_idx : start_idx+context_length] for start_idx in start_idxs])
    target_idxs = torch.stack([data[start_idx+1 : start_idx+context_length+1] for start_idx in start_idxs])
    return context_idxs, target_idxs

context_idxs, target_idxs = get_batch('train')
print('inputs:')
print(context_idxs.shape)
print(context_idxs)
print('targets:')
print(target_idxs.shape)
print(target_idxs)

print('----')

for b in range(batch_size): # batch dimension
    for step in range(context_length): # time dimension
        context = context_idxs[b, :step+1]
        target = target_idxs[b,step]
        print(f"when input is {context.tolist()} the target: {target}")

inputs:
torch.Size([4, 8])
tensor([[84, 11,  1,  0,  0, 84, 71, 86],
        [67, 86, 75, 69, 79, 71, 86, 74],
        [67, 86, 71, 80, 16, 80, 67, 86],
        [67, 79, 71, 85,  2, 10, 89, 75]])
targets:
torch.Size([4, 8])
tensor([[11,  1,  0,  0, 84, 71, 86, 87],
        [86, 75, 69, 79, 71, 86, 74, 81],
        [86, 71, 80, 16, 80, 67, 86, 75],
        [79, 71, 85,  2, 10, 89, 75, 86]])
----
when input is [84] the target: 11
when input is [84, 11] the target: 1
when input is [84, 11, 1] the target: 0
when input is [84, 11, 1, 0] the target: 0
when input is [84, 11, 1, 0, 0] the target: 84
when input is [84, 11, 1, 0, 0, 84] the target: 71
when input is [84, 11, 1, 0, 0, 84, 71] the target: 86
when input is [84, 11, 1, 0, 0, 84, 71, 86] the target: 87
when input is [67] the target: 86
when input is [67, 86] the target: 75
when input is [67, 86, 75] the target: 69
when input is [67, 86, 75, 69] the target: 79
when input is [67, 86, 75, 69, 79] the target: 71
when input is [67, 86, 75,

In [48]:
torch.manual_seed(111)

class BigramLanguageModel(nn.Module):

    def __init__(self, vocab_size):
        super().__init__()
        # works as a look up table for the probability of the next char for each current char
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)

    def forward(self, context_idxs, target_idxs=None):

        # context_idxs, target_idxs are both (b,step) tensor of integers
        logits = self.token_embedding_table(context_idxs) # the shape of the logits: (b, step, embedding_dim)
        
        if target_idxs is None:
            loss = None
        else:
            b, step, embedding_dim = logits.shape
            logits = logits.view(b * step, embedding_dim)
            target_idxs = target_idxs.view(b * step)
            loss = F.cross_entropy(logits, target_idxs)

        return logits, loss
    
    def generate(self, context_idxs, max_new_tokens):
        for _ in range(max_new_tokens):
            # forward
            logits, loss = self(context_idxs)
            
            # focus only on the last time step
            logits = logits[:, -1, :] # (b, embedding_dim) tensor for the last step
            probs = F.softmax(logits, dim=-1) # predicted_label (b, embedding_dim)
            
            # sample from the distribution

            # torch.multinomial: Returns a tensor where each row contains num_samples indices 
            # sampled from the multinomial probability distribution located in the corresponding row of tensor input.
            pred_idxs = torch.multinomial(probs, num_samples=1) # (b, 1)
            # append sampled index to the running sequence
            context_idxs = torch.cat((context_idxs, pred_idxs), dim=1) # (b, step+1)
        return context_idxs

model = BigramLanguageModel(vocab_size)
logits, loss = model(context_idxs, target_idxs)
print(logits.shape)
print(loss)

# decode 5 batches of data, the in
[decode(model.generate(context_idxs=torch.zeros((5, 1), dtype=torch.long), max_new_tokens=100)[i].tolist()) for i in range(5)]

torch.Size([32, 107])
tensor(5.4190, grad_fn=<NllLossBackward0>)


['\tr7[lYl%k└J!└="5{NE(* .\tD&C_@I9PJ⊔i4G2─eG│`XTIBa]─?N"`.P)}5:uvNo%Q(Nife✓?*q0r|\tZu,s.✓@xv,6#=n⊳geS⊔)Q9',
 '\tV–*–s^✓I:hEA\n,6yGj$4"4!e4│[I#wQ<pN├>OH{├`XT7y}a\\MHtHjLpzM⊔<rF✓.bYA\nCe└l└JIBrCVTO7✓ I^<)xT`\t2y[]E≤.*s',
 '\t3B:L^oFKjy_ISG1m%Adi\tG\\6edNLo]2X≤jIVhRt;CvHI\n├Q(Q└BA>--ID├Q9Pb9#FXw~Qp=@1HezrGSFwV–GS\\)v─CwSInM5HN.4',
 '\tZMU├IIRLMwY⊳H\n>tn%T3.n\n#gN├cp+≤hd2X^nqg:Ux"a;/⊔R└vL\nYQ<>![nF|;4z)}\'w`/\'0UK\tAB-y✓r&=jUnbe`Xs5H\n>f^[└g',
 '\t\nuqLSd|8V\t@x–*\'xe&d Shv"w9aK`Wg6K<d!+4⊔~├0Q#x5{XPA7PY≤7U\nQ≤?Q-*raeUkM)~XV\n=Ugp≤4G,*│-uOZ\n*6N≤,C;"(O9']

In [50]:
decode([0])

'\t'