In [68]:
from youtube_transcript_api import YouTubeTranscriptApi
import requests
from tqdm import tqdm
import sentencepiece as spm
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [2]:
CHANNEL_ID = 'UCo_IB5145EVNcf8hw1Kku7w'
API_KEY = "AIzaSyBr6ZttXZH1ENh9aHnmJ8tA4fbl1nGhoH4"

In [3]:
url = f"https://www.googleapis.com/youtube/v3/channels?part=contentDetails&id={CHANNEL_ID}&key={API_KEY}"
response = requests.get(url).json()

response

{'kind': 'youtube#channelListResponse',
 'etag': 'FedyxNMXA1d8IbkP_k4E0_TxNaQ',
 'pageInfo': {'totalResults': 1, 'resultsPerPage': 5},
 'items': [{'kind': 'youtube#channel',
   'etag': 'vGYCzE5ifW3VZ-T9g_5gxfsWQ6E',
   'id': 'UCo_IB5145EVNcf8hw1Kku7w',
   'contentDetails': {'relatedPlaylists': {'likes': '',
     'uploads': 'UUo_IB5145EVNcf8hw1Kku7w'}}}]}

In [4]:
uploads_playlist_id = response['items'][0]['contentDetails']['relatedPlaylists']['uploads']

# Get the video IDs from the uploads playlist
video_url = f"https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&playlistId={uploads_playlist_id}&maxResults=50&key={API_KEY}"
video_response = requests.get(video_url).json()
video_response

{'kind': 'youtube#playlistItemListResponse',
 'etag': '1tAlgrTCaMHFiOs_aPzfy-H6CdI',
 'nextPageToken': 'EAAaelBUOkNESWlFRVZGUTBReU1UZzJOMFpGUVRkRVJEVW9BVWl0MnZuRDdQbUlBMUFCV2pZaVEyaG9WbFpYT1daVFZVa3hUVlJSTVZKV1drOVpNbGswWVVoamVGTXlkREZPTTJOVFEzZHFaVzVaY1RSQ2FFUkpiRGd4Y3lJ',
 'items': [{'kind': 'youtube#playlistItem',
   'etag': 'V5up-fSynC68R2l8Q6-azVutDIM',
   'id': 'VVVvX0lCNTE0NUVWTmNmOGh3MUtrdTd3LnF1Z1QzbnRxUmt3',
   'snippet': {'publishedAt': '2024-10-05T17:05:00Z',
    'channelId': 'UCo_IB5145EVNcf8hw1Kku7w',
    'title': "Game Theory: Was FNAF's Final Mystery REALLY That Simple?",
    'description': '*SUBSCRIBE* If You AGREE With Hyperdroid\nDon\'t miss a Game Theory! ► https://www.youtube.com/@GameTheory/?sub_confirmation=1\n\nA few weeks ago fellow FNAF theorist @HyperDroid made a video claiming to have SOLVED the infamous Foxy Grid from the Security Logbook. Since then the FNAF community has come together in general agreement that Hyperdroid NAILED this theory and found the T

In [5]:
transcripts = []

# Extracting video IDs
for item in tqdm(video_response['items']):
    transcripts.append(YouTubeTranscriptApi.get_transcript(item['snippet']['resourceId']['videoId']))


100%|██████████| 50/50 [00:39<00:00,  1.26it/s]


In [6]:
text = []

for i in range(len(transcripts)):
    for j in range(len(transcripts[i])):
        text.append(transcripts[i][j]['text'])

text

['after 9 years and countless theories the',
 "crying child's name has been found",
 "that's right on screen right now is the",
 "real name of FNAF 4's crying child and",
 "by the end of this episode you're going",
 'to know exactly which one it',
 'is hello Internet welcome to Game Theory',
 "the show that's just desperately trying",
 'to keep up with the FNAF content train',
 "it feels like there's been a lot right",
 'we had all that stuff from the 10th',
 "anniversary we've had our first full",
 'entry into the Choose Your Own Adventure',
 "style book series there's even a FNAF",
 'level in the new Funko Fusion game I',
 'swear if there is law in this thing I am',
 'going to scream but while I was busy',
 'just trying to keep on top of all of',
 'that someone else decided that it would',
 'be fun to add one final slice to my',
 'already oversized mountain of lore Pizza',
 'fellow theorist and friend of the',
 'channel hyperd Droid now normally',
 'another theorist posting a theory 

In [7]:
with open('input.txt', 'w') as file:
    for line in text:
        file.write(line)

## Tokenizer

In [8]:
# Train SentencePiece model
spm.SentencePieceTrainer.train(
    input='input.txt',
    model_prefix='m',
    vocab_size=4000
)

In [9]:
# Load the trained SentencePiece model
sp = spm.SentencePieceProcessor(model_file='m.model')

with open('input.txt', 'r') as file:
    data = file.read()

text_tensor = torch.tensor(sp.encode(data)).long()

## Training data

In [10]:
n = int(0.9 * len(text_tensor))

train_data = text_tensor[:n]
test_data = text_tensor[n:]

In [11]:
block_size = 8

x = train_data[:block_size]
y = train_data[1:block_size + 1]

for t in range(block_size):
    context = x[:t+1]
    target = y[t]
    print(context, target)

tensor([229]) tensor(1218)
tensor([ 229, 1218]) tensor(213)
tensor([ 229, 1218,  213]) tensor(12)
tensor([ 229, 1218,  213,   12]) tensor(1332)
tensor([ 229, 1218,  213,   12, 1332]) tensor(792)
tensor([ 229, 1218,  213,   12, 1332,  792]) tensor(6)
tensor([ 229, 1218,  213,   12, 1332,  792,    6]) tensor(1348)
tensor([ 229, 1218,  213,   12, 1332,  792,    6, 1348]) tensor(6)


In [12]:
seed = 42

torch.manual_seed(seed)

def get_batch(type, block_size=8, batch_size=4):
    data = train_data if type == 'train' else test_data
    n = torch.randint(0, len(data)-block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in n])
    y = torch.stack([data[i+1:i+block_size + 1] for i in n])
    return x, y

In [13]:
x, y = get_batch('train')
print(x, y)

tensor([[ 326, 2066,    9,  178,  851,  570, 1225,  502],
        [1360,  533,  182,  141,    8,   17,    6,   87],
        [ 158,   49, 3298,  125, 1221, 2306,    7, 1302],
        [ 220,   40,  374,   71,  214, 1223,   63,   10]]) tensor([[2066,    9,  178,  851,  570, 1225,  502, 2265],
        [ 533,  182,  141,    8,   17,    6,   87,  129],
        [  49, 3298,  125, 1221, 2306,    7, 1302,   12],
        [  40,  374,   71,  214, 1223,   63,   10,  688]])


## Transformer

In [39]:
def scaled_dot_product_attention(Q, K, V, mask=None):
    # Calculate the dot product between Q and the transpose of K
    scores = torch.matmul(Q, K.transpose(-2, -1)) # (batch_size, num_heads, seq_len, seq_len)
    
    # Scale the scores by the square root of the dimensionality of Q
    d_k = torch.tensor(Q.size(-1), dtype=torch.float32)
    scaled_scores = scores / torch.sqrt(d_k)

    # Apply the mask if provided
    if mask is not None:
        scaled_scores = scaled_scores.masked_fill(mask == 0, -1e9)

    # Calculate the attention weights using softmax
    attention_weights = F.softmax(scaled_scores, dim=-1) # (batch_size, num_heads, seq_len, seq_len)

    # Multiply the attention weights with the value matrix V
    output = torch.matmul(attention_weights, V) # (batch_size, num_heads, seq_len, d_v)

    return output, attention_weights

In [40]:
Q = torch.tensor([[0.9399, 0.9714, 0.4979, 0.6917, 0.8139],
                  [0.8095, 0.6322, 0.8522, 0.3391, 0.4925],
                  [0.5739, 0.0086, 0.7944, 0.6744, 0.9431]])

K = torch.tensor([[0.0797, 0.2238, 0.6081, 0.8575, 0.4591],
                  [0.7038, 0.2540, 0.9272, 0.7835, 0.6932],
                  [0.0715, 0.7438, 0.4766, 0.8993, 0.8632]])

V = torch.tensor([[0.8015, 0.9885, 0.6720, 0.7487, 0.9365],
                  [0.7837, 0.7665, 0.5004, 0.8486, 0.6787],
                  [0.0739, 0.5611, 0.8035, 0.3088, 0.5421]])

mask = torch.tensor([[1, 1, 0]])

print(scaled_dot_product_attention(Q,K,V, mask))

(tensor([[0.7908, 0.8551, 0.5689, 0.8087, 0.7816],
        [0.7909, 0.8558, 0.5694, 0.8084, 0.7824],
        [0.7911, 0.8583, 0.5713, 0.8073, 0.7853]]), tensor([[0.3992, 0.6008, 0.0000],
        [0.4022, 0.5978, 0.0000],
        [0.4134, 0.5866, 0.0000]]))


In [45]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, mask=None):
        super().__init__()
        self.num_heads = num_heads
        self.mask = mask
        self.d_q = d_model // num_heads
        self.d_k = d_model // num_heads
        self.d_v = d_model // num_heads
        
        # Linear layers for Q, K, V
        self.linear_Q = nn.Linear(d_model, d_model)
        self.linear_K = nn.Linear(d_model, d_model)
        self.linear_V = nn.Linear(d_model, d_model)
        
        # Final linear layer
        self.linear_O = nn.Linear(d_model, d_model)

    def forward(self, x):
        # Linear transformations
        q = self.linear_Q(x)  # (batch_size, seq_len, d_model)
        k = self.linear_K(x)  # (batch_size, seq_len, d_model)
        v = self.linear_V(x)  # (batch_size, seq_len, d_model)

        q = q.view(q.size(0), q.size(1), self.num_heads, self.d_q).transpose(1, 2)  # (batch_size, num_heads, seq_len, d_q)
        k = k.view(k.size(0), k.size(1), self.num_heads, self.d_k).transpose(1, 2)  # (batch_size, num_heads, seq_len, d_k)
        v = v.view(v.size(0), v.size(1), self.num_heads, self.d_v).transpose(1, 2)  # (batch_size, num_heads, seq_len, d_v)

        # Apply scaled dot-product attention for all heads in parallel
        attn_out, _ = scaled_dot_product_attention(q, k, v, self.mask)  # (batch_size, num_heads, seq_len, d_v)

        # Concatenate the outputs along the last dimension
        output = attn_out.transpose(1, 2).contiguous().view(attn_out.size(0), attn_out.size(2), -1)  # (batch_size, seq_len, d_model)

        # Output Linear layer
        output = self.linear_O(output)

        return output

In [62]:
class Encoder(nn.Module):
    def __init__(self, d_model, d_ff, num_heads):
        super().__init__()
        self.multihead_attention = MultiHeadAttention(d_model, num_heads)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
    
    def forward(self, x):
        attention = self.multihead_attention(x)
        normalized = self.layer_norm1(x+attention)

        feed = self.feed_forward(normalized)
        normalized_feed = self.layer_norm2(feed+normalized)

        return normalized_feed

In [65]:
class Decoder(nn.Module):
    def __init__(self, d_model, d_ff, num_heads, mask=None):
        super().__init__()
        self.mask = mask
        self.multihead_attention1 = MultiHeadAttention(d_model, num_heads)
        self.multihead_attention2 = MultiHeadAttention(d_model, num_heads)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.layer_norm3 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

    def forward(self, x):
        attention = self.multihead_attention1(x)
        normalized = self.layer_norm1(x+attention)

        attention2 = self.multihead_attention2(normalized)
        second_normalized = self.layer_norm2(attention2+normalized)

        feed = self.feed_forward(second_normalized)
        normalized_feed = self.layer_norm3(feed+second_normalized)

        return normalized_feed

In [74]:
class PositionEncoder(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()

        self.pe = torch.zeros(max_len, 1, d_model)
        
        position = torch.arange(0,max_len,dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        self.pe[:, 0::2] = torch.sin(position * div_term)
        self.pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', self.pe)

    def forward(self, x):
        return x + self.pe[:x.size(0)]

In [78]:
max_len = 100
d_model = 8

pe = torch.zeros(max_len, d_model)
        
print(pe.shape)


torch.Size([100, 8])


In [82]:
position = torch.arange(0,max_len,dtype=torch.float).unsqueeze(1)
print(position.shape)

torch.Size([100, 1])


In [83]:
div_term = torch.arange(0, d_model, 2)
print(div_term.shape)

torch.Size([4])


In [None]:

pe[:, 0::2] = torch.sin(position / div_term)
pe[:, 1::2] = torch.cos(position / div_term)

In [75]:
# Test the MultiHeadAttention with random input
d_model = 8
num_heads = 8
batch_size = 2
seq_len = 5

# Create random input tensor
x = torch.rand(batch_size, seq_len, d_model)  # (batch_size, seq_len, d_model)

# Initialize the multi-head attention layer
pos_enc = PositionEncoder(d_model)

# Forward pass
output = pos_enc(x)

# Print the output
print("Output shape:", output.shape)  # Expected shape: (batch_size, seq_len, d_model)
print("Output tensor:", output)

RuntimeError: The expanded size of the tensor (8) must match the existing size (4) at non-singleton dimension 2.  Target sizes: [5000, 1, 8].  Tensor sizes: [5000, 4]

In [70]:
class Transformer(nn.Module):
    def __init__(self, d_model, d_ff, num_heads):
        super().__init__()

        self.encoder = Encoder(d_model, d_ff, num_heads)
        self.decoder = Decoder(d_model, d_ff, num_heads)
        self.position = PositionEncoder()
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

    def forward(self, x, y):        
        x = self.position(x)
        y = self.position(y)

        attention = self.encoder(x)

        output = self.decoder(attention, y)

        output_probs = F.softmax(self.feed_forward(output))

        return output_probs