In [1]:
import pandas as pd
import string
import numpy as np
import json

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Sequential
import tensorflow.keras.utils as ku

tf.random.set_seed(42)
from numpy.random import seed
seed(42)

print("Loading datasets...")
# Load all the datasets 
df1 = pd.read_csv('USvideos.csv')
df2 = pd.read_csv('CAvideos.csv')
df3 = pd.read_csv('GBvideos.csv')

# Load the datasets containing the category names
data1 = json.load(open('US_category_id.json'))
data2 = json.load(open('CA_category_id.json'))
data3 = json.load(open('GB_category_id.json'))
print("Datasets loaded successfully!")

Loading datasets...
Datasets loaded successfully!


In [2]:
def category_extractor(data):
    i_d = [data['items'][i]['id'] for i in range(len(data['items']))]
    title = [data['items'][i]['snippet']["title"] for i in range(len(data['items']))]
    i_d = list(map(int, i_d))
    category = zip(i_d, title)
    category = dict(category)
    return category

print("Processing video categories...")
# Create a new category column by mapping the category names to their id
df1['category_title'] = df1['category_id'].map(category_extractor(data1))
df2['category_title'] = df2['category_id'].map(category_extractor(data2))
df3['category_title'] = df3['category_id'].map(category_extractor(data3))

# Join the dataframes
df = pd.concat([df1, df2, df3], ignore_index=True)

# Drop rows based on duplicate videos
df = df.drop_duplicates('video_id')

# Collect only titles of entertainment videos
entertainment = df[df['category_title'] == 'Entertainment']['title']
entertainment = entertainment.tolist()

# Remove punctuations and convert text to lowercase
def clean_text(text):
    # Keep only letters, numbers, and spaces
    text = ''.join(e.lower() for e in text if (e.isalnum() or e.isspace()))
    # Remove extra spaces
    text = ' '.join(text.split())
    return text

# Clean all entertainment titles
corpus = [clean_text(x) for x in entertainment]
# Remove very short titles (less than 3 words)
corpus = [x for x in corpus if len(x.split()) >= 3]

print(f"Processed {len(corpus)} video titles")

Processing video categories...
Processed 9509 video titles


In [3]:
print("Creating sequences...")
tokenizer = Tokenizer(num_words=10000)  # Limit vocabulary size
def get_sequence_of_tokens(corpus):
    # Get tokens
    tokenizer.fit_on_texts(corpus)
    total_words = min(len(tokenizer.word_index) + 1, 10000)
    
    # Convert to sequence of tokens
    input_sequences = []
    for line in corpus:
        token_list = tokenizer.texts_to_sequences([line])[0]
        for i in range(1, len(token_list)):
            n_gram_sequence = token_list[:i+1]
            input_sequences.append(n_gram_sequence)
    
    return input_sequences, total_words

inp_sequences, total_words = get_sequence_of_tokens(corpus)
print(f"Created {len(inp_sequences)} sequences with vocabulary size {total_words}")

Creating sequences...
Created 69159 sequences with vocabulary size 10000


In [5]:
def generate_padded_sequences(input_sequences):
    max_sequence_len = max([len(x) for x in input_sequences])
    input_sequences = np.array(pad_sequences(input_sequences, maxlen=max_sequence_len, padding='pre'))
    predictors, label = input_sequences[:,:-1], input_sequences[:, -1]
    label = ku.to_categorical(label, num_classes=total_words)
    return predictors, label, max_sequence_len

print("Generating padded sequences...")
predictors, label, max_sequence_len = generate_padded_sequences(inp_sequences)
print(f"Max sequence length: {max_sequence_len}")

Generating padded sequences...


MemoryError: Unable to allocate 5.15 GiB for an array with shape (69159, 10000) and data type float64

In [None]:
def create_model(max_sequence_len, total_words):
    input_len = max_sequence_len - 1
    model = Sequential()
    
    # Embedding layer
    model.add(Embedding(total_words, 100, input_length=input_len))
    
    # Bidirectional LSTM layers
    model.add(Bidirectional(LSTM(150, return_sequences=True)))
    model.add(Dropout(0.2))
    
    model.add(Bidirectional(LSTM(100)))
    model.add(Dropout(0.2))
    
    # Dense layers
    model.add(Dense(100, activation='relu'))
    model.add(Dropout(0.2))
    
    # Output layer
    model.add(Dense(total_words, activation='softmax'))
    
    # Compile model
    model.compile(loss='categorical_crossentropy', 
                 optimizer='adam',
                 metrics=['accuracy'])
    
    return model

print("Creating and training the model...")
model = create_model(max_sequence_len, total_words)

# Add early stopping to prevent overfitting
early_stopping = EarlyStopping(monitor='val_loss', 
                              patience=5,
                              restore_best_weights=True)

history = model.fit(predictors, 
                   label, 
                   epochs=50,
                   batch_size=128,
                   validation_split=0.1,
                   callbacks=[early_stopping],
                   verbose=1)

print("Training completed!")

In [None]:
def generate_text(seed_text, next_words, model, max_sequence_len, temperature=0.7):
    for _ in range(next_words):
        token_list = tokenizer.texts_to_sequences([seed_text])[0]
        token_list = pad_sequences([token_list], maxlen=max_sequence_len-1, padding='pre')
        
        # Get predicted probabilities
        predicted_probs = model.predict(token_list, verbose=0)[0]
        
        # Apply temperature scaling
        predicted_probs = np.log(predicted_probs) / temperature
        predicted_probs = np.exp(predicted_probs) / np.sum(np.exp(predicted_probs))
        
        # Sample from the scaled distribution
        predicted = np.random.choice(len(predicted_probs), p=predicted_probs)
        
        output_word = ""
        for word, index in tokenizer.word_index.items():
            if index == predicted:
                output_word = word
                break
                
        seed_text += " " + output_word
    
    # Capitalize first letter of each word for title format
    return ' '.join(word.capitalize() for word in seed_text.split())

# Generate titles with different temperatures
seed_words = ["spiderman", "funny", "amazing", "best", "how to"]
temperatures = [0.5, 0.7, 1.0]

print("Generated YouTube Titles:")
print("-" * 50)
for seed in seed_words:
    print(f"Seed: '{seed}'")
    for temp in temperatures:
        generated_title = generate_text(seed, 6, model, max_sequence_len, temperature=temp)
        print(f"Temperature {temp:.1f}: {generated_title}")
    print("-" * 50)