## 📦 1. Importing Required Libraries

We begin by importing essential libraries for data processing and building the LSTM model:

In [1]:
import pandas as pd
import numpy as np
from collections import Counter
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dropout, Dense
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam
from collections import Counter


## 📄 Step 2: Load Dataset in Chunks

To handle large datasets efficiently, we load the FreeCodeCamp chat data in chunks of 100,000 rows:

In [15]:
data_path = "../data/freecodecamp_casual_chatroom.csv"
texts = []
known_bots = ['camperbot']

chunks = pd.read_csv(data_path, chunksize=100_000)

for i, chunk in enumerate(chunks):
    chunk = chunk[chunk["text"].notna()]
    chunk = chunk[~chunk['fromUser.username'].str.lower().isin(known_bots)]   
    texts += chunk["text"].astype(str).tolist()

    if i == 5:
        break

print("Total messages loaded after filtering bots:", len(texts))

  for i, chunk in enumerate(chunks):
  for i, chunk in enumerate(chunks):
  for i, chunk in enumerate(chunks):
  for i, chunk in enumerate(chunks):
  for i, chunk in enumerate(chunks):


Total messages loaded after filtering bots: 561880


  for i, chunk in enumerate(chunks):


- We use `chunksize=100_000` to read the dataset incrementally.
- Only rows with non-null `text` values are retained.
- The loop stops after reading 6 chunks (approximately 600,000 rows).

## 🧹 Step 3: Text Preprocessing and Character Filtering

We create a clean corpus by lowercasing all text, filtering infrequent characters, and mapping characters to integer indices:

In [26]:
corpus = " ".join(texts).lower()

char_freq = Counter(corpus)
min_freq = 100
valid_chars = sorted([c for c, f in char_freq.items() if f >= min_freq])

char_to_idx = {c: i for i, c in enumerate(valid_chars)}
idx_to_char = {i: c for c, i in char_to_idx.items()}

clean_corpus = ''.join(c for c in corpus if c in valid_chars)

## 🔢 Step 4: Sequence Generation

We split the cleaned corpus into overlapping sequences of fixed length and prepare input-output pairs for the model:

In [27]:
maxlen = 100
step = 3

sequences = []
next_chars = []

for i in range(0, len(clean_corpus) - maxlen, step):
    sequences.append(clean_corpus[i:i+maxlen])
    next_chars.append(clean_corpus[i + maxlen])

X = [[char_to_idx[c] for c in seq] for seq in sequences]
y = [char_to_idx[c] for c in next_chars]

## 🧠 Step 5: Model Architecture and Training

We build a character-level LSTM model using Keras and train it on the prepared input-output sequences:

In [None]:
model = Sequential([
    Embedding(input_dim=len(char_to_idx), output_dim=64, input_length=maxlen),
    LSTM(128),
    Dropout(0.2),
    Dense(len(char_to_idx), activation='softmax')
])

model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=['accuracy']
)
model.fit(X, y, batch_size=64, epochs=5, validation_split=0.1)


In [None]:
#model = load_model("/Users/work/Desktop/DataScience/Projects/textgen-lstm-gpt2/models/lstm_model.keras")


### 🔍 Model Details:
- `Embedding`: Converts character indices to dense vectors of size 64.
- `LSTM`: 128 units to capture sequential dependencies.
- `Dropout`: Prevents overfitting by randomly dropping 20% of connections during training.
- `Dense`: Final layer with softmax activation to predict the next character.
- `loss`: `sparse_categorical_crossentropy` is used for integer targets.
- `optimizer`: Adam optimizer with learning rate 0.001.
- `validation_split=0.1`: 10% of the data is used for validation.

Training runs for 5 epochs with a batch size of 64.

## ✍️ Step 6: Text Generation with the Trained Model

We define functions to generate text character-by-character using the trained LSTM model. The generation process is autoregressive: each predicted character is appended to the input for the next prediction.

In [29]:
def sample(preds, temperature=1.0):
    preds = np.log(preds + 1e-8) / temperature
    preds = np.exp(preds) / np.sum(np.exp(preds))
    return np.random.choice(len(preds), p=preds)

def generate_text(model, seed, length=300, temperature=1.0):
    result = seed
    input_seq = seed[-maxlen:]

    for _ in range(length):
        input_indices = [char_to_idx.get(c, 0) for c in input_seq]
        input_array = np.zeros((1, maxlen), dtype=np.int32)
        input_array[0, -len(input_indices):] = input_indices

        preds = model.predict(input_array, verbose=0)[0]
        next_idx = sample(preds, temperature)
        next_char = idx_to_char[next_idx]

        result += next_char
        input_seq = result[-maxlen:]

    return result

## 🚀 Step 7: Generate Sample Text

We now generate a sample output using the trained LSTM model and a custom seed string:

In [30]:
seed = "what are you working on"
print(generate_text(model, seed, length=300, temperature=0.8))

what are you working on. comments to get else
 @okonancosek  @dtver @kimkwike a lot and script     @terakilisich ad with a more people la easies mention is to when the fcc of. howe reator applicy the books with the codepen it's good going to could see supposed to conseend weeks and me as you like strong the internet to le


In [None]:
import os
import time
from datetime import datetime

# Create directory for saving models
os.makedirs('models', exist_ok=True)

def create_sequences_and_targets(clean_corpus, char_to_idx, maxlen, step):
    """Create sequences and targets for a given maxlen and step."""
    sequences = []
    next_chars = []
    
    for i in range(0, len(clean_corpus) - maxlen, step):
        sequences.append(clean_corpus[i:i+maxlen])
        next_chars.append(clean_corpus[i + maxlen])
    
    X = [[char_to_idx[c] for c in seq] for seq in sequences]
    y = [char_to_idx[c] for c in next_chars]
    
    return X, y

def create_and_train_model(X, y, char_to_idx, maxlen, epochs=5, model_name="model"):
    """Create, train and save an LSTM model."""
    print(f"\n{'='*50}")
    print(f"Training {model_name}")
    print(f"Sequences: {len(X)}, Max length: {maxlen}, Epochs: {epochs}")
    print(f"{'='*50}")
    
    model = Sequential([
        Embedding(input_dim=len(char_to_idx), output_dim=64, input_length=maxlen),
        LSTM(128),
        Dropout(0.2),
        Dense(len(char_to_idx), activation='softmax')
    ])
    
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=Adam(learning_rate=0.001),
        metrics=['accuracy']
    )
    
    start_time = time.time()
    history = model.fit(X, y, batch_size=64, epochs=epochs, validation_split=0.1, verbose=1)
    training_time = time.time() - start_time
    
    # Save the model
    model_path = f'models/{model_name}.keras'
    model.save(model_path)
    print(f"Model saved to: {model_path}")
    print(f"Training time: {training_time:.2f} seconds")
    
    return model, history, training_time


In [None]:
# Experiment 1: Different maxlen values
maxlen_values = [50, 100, 150]
step = 3
epochs = 5

maxlen_models = {}
maxlen_histories = {}
maxlen_times = {}

for maxlen_val in maxlen_values:
    print(f"\n🔬 Testing maxlen = {maxlen_val}")
    
    # Create sequences for this maxlen
    X_exp, y_exp = create_sequences_and_targets(clean_corpus, char_to_idx, maxlen_val, step)
    
    # Train model
    model_name = f"lstm_maxlen_{maxlen_val}"
    model, history, training_time = create_and_train_model(
        X_exp, y_exp, char_to_idx, maxlen_val, epochs, model_name
    )
    
    # Store results
    maxlen_models[maxlen_val] = model
    maxlen_histories[maxlen_val] = history
    maxlen_times[maxlen_val] = training_time

print("\n✅ Maxlen experiments completed!")


In [None]:
# Experiment 2: Different epoch counts
epoch_values = [2, 10, 20]
maxlen_fixed = 100  # Use standard maxlen for epoch experiments

epoch_models = {}
epoch_histories = {}
epoch_times = {}

# Create sequences once for epoch experiments
X_epoch, y_epoch = create_sequences_and_targets(clean_corpus, char_to_idx, maxlen_fixed, step)

for epoch_val in epoch_values:
    print(f"\n🕐 Testing epochs = {epoch_val}")
    
    # Train model
    model_name = f"lstm_epochs_{epoch_val}"
    model, history, training_time = create_and_train_model(
        X_epoch, y_epoch, char_to_idx, maxlen_fixed, epoch_val, model_name
    )
    
    # Store results
    epoch_models[epoch_val] = model
    epoch_histories[epoch_val] = history
    epoch_times[epoch_val] = training_time

print("\n✅ Epoch experiments completed!")


In [None]:
# Enhanced text generation function that works with different maxlen values
def generate_text_adaptive(model, seed, maxlen_model, char_to_idx, idx_to_char, length=300, temperature=1.0):
    """Generate text with adaptive maxlen based on the model's training parameters."""
    result = seed
    input_seq = seed[-maxlen_model:]
    
    for _ in range(length):
        # Handle case where seed is shorter than maxlen
        input_indices = [char_to_idx.get(c, 0) for c in input_seq]
        input_array = np.zeros((1, maxlen_model), dtype=np.int32)
        
        # Pad or truncate input to match model's expected input length
        if len(input_indices) <= maxlen_model:
            input_array[0, -len(input_indices):] = input_indices
        else:
            input_array[0, :] = input_indices[-maxlen_model:]
        
        preds = model.predict(input_array, verbose=0)[0]
        next_idx = sample(preds, temperature)
        next_char = idx_to_char[next_idx]
        
        result += next_char
        input_seq = result[-maxlen_model:]
    
    return result


In [None]:
# Test seeds for generation
test_seeds = [
    "what are you working on",
    "i need help with",
    "can someone explain"
]

def analyze_text_quality(text, seed):
    """Simple heuristic analysis of generated text quality."""
    # Remove seed from analysis
    generated_part = text[len(seed):]
    
    # Basic metrics
    word_count = len(generated_part.split())
    char_count = len(generated_part)
    
    # Repetition analysis (check for repeated patterns)
    words = generated_part.split()
    unique_words = len(set(words))
    repetition_ratio = unique_words / max(word_count, 1)
    
    # Check for coherent word boundaries (spaces between letters)
    space_count = generated_part.count(' ')
    space_ratio = space_count / max(char_count, 1)
    
    return {
        'word_count': word_count,
        'char_count': char_count,
        'repetition_ratio': repetition_ratio,
        'space_ratio': space_ratio,
        'avg_word_length': char_count / max(word_count, 1)
    }

# Analyze maxlen experiments
print("🔍 MAXLEN EXPERIMENTS ANALYSIS")
print("="*60)

for seed in test_seeds:
    print(f"\n📝 Seed: '{seed}'")
    print("-" * 40)
    
    for maxlen_val in maxlen_values:
        model = maxlen_models[maxlen_val]
        generated_text = generate_text_adaptive(
            model, seed, maxlen_val, char_to_idx, idx_to_char, length=150, temperature=0.8
        )
        
        analysis = analyze_text_quality(generated_text, seed)
        
        print(f"\n🔬 MAXLEN {maxlen_val}:")
        print(f"Generated: {generated_text}")
        print(f"Quality metrics: Words={analysis['word_count']}, "
              f"Repetition={analysis['repetition_ratio']:.2f}, "
              f"Avg word len={analysis['avg_word_length']:.1f}")
        print(f"Training time: {maxlen_times[maxlen_val]:.2f}s")


In [None]:
# Analyze epoch experiments
print("\n\n🕐 EPOCH EXPERIMENTS ANALYSIS")
print("="*60)

for seed in test_seeds:
    print(f"\n📝 Seed: '{seed}'")
    print("-" * 40)
    
    for epoch_val in epoch_values:
        model = epoch_models[epoch_val]
        generated_text = generate_text_adaptive(
            model, seed, maxlen_fixed, char_to_idx, idx_to_char, length=150, temperature=0.8
        )
        
        analysis = analyze_text_quality(generated_text, seed)
        
        print(f"\n🕐 EPOCHS {epoch_val}:")
        print(f"Generated: {generated_text}")
        print(f"Quality metrics: Words={analysis['word_count']}, "
              f"Repetition={analysis['repetition_ratio']:.2f}, "
              f"Avg word len={analysis['avg_word_length']:.1f}")
        print(f"Training time: {epoch_times[epoch_val]:.2f}s")


In [None]:
# Performance summary and analysis
print("\n\n📈 PERFORMANCE SUMMARY")
print("="*60)

print("\n🔍 MAXLEN EXPERIMENT SUMMARY:")
for maxlen_val in maxlen_values:
    history = maxlen_histories[maxlen_val]
    final_loss = history.history['loss'][-1]
    final_val_loss = history.history['val_loss'][-1]
    final_accuracy = history.history['accuracy'][-1]
    training_time = maxlen_times[maxlen_val]
    
    print(f"Maxlen {maxlen_val:3d}: Loss={final_loss:.4f}, Val_Loss={final_val_loss:.4f}, "
          f"Accuracy={final_accuracy:.4f}, Time={training_time:.1f}s")

print("\n🕐 EPOCH EXPERIMENT SUMMARY:")
for epoch_val in epoch_values:
    history = epoch_histories[epoch_val]
    final_loss = history.history['loss'][-1]
    final_val_loss = history.history['val_loss'][-1]
    final_accuracy = history.history['accuracy'][-1]
    training_time = epoch_times[epoch_val]
    
    # Check for potential overfitting (val_loss > loss by significant margin)
    overfitting_indicator = "⚠️ Overfitting" if final_val_loss > final_loss * 1.2 else "✅ Good fit"
    
    print(f"Epochs {epoch_val:2d}: Loss={final_loss:.4f}, Val_Loss={final_val_loss:.4f}, "
          f"Accuracy={final_accuracy:.4f}, Time={training_time:.1f}s - {overfitting_indicator}")


In [None]:
# Save experimental results to a summary file
import json

experiment_summary = {
    "experiment_date": datetime.now().isoformat(),
    "dataset_info": {
        "total_messages": len(texts),
        "corpus_length": len(clean_corpus),
        "vocabulary_size": len(char_to_idx),
        "min_char_frequency": 100
    },
    "maxlen_experiments": {},
    "epoch_experiments": {},
    "model_files": []
}

# Add maxlen experiment results
for maxlen_val in maxlen_values:
    history = maxlen_histories[maxlen_val]
    experiment_summary["maxlen_experiments"][str(maxlen_val)] = {
        "final_loss": float(history.history['loss'][-1]),
        "final_val_loss": float(history.history['val_loss'][-1]),
        "final_accuracy": float(history.history['accuracy'][-1]),
        "training_time_seconds": float(maxlen_times[maxlen_val]),
        "model_file": f"models/lstm_maxlen_{maxlen_val}.keras"
    }
    experiment_summary["model_files"].append(f"models/lstm_maxlen_{maxlen_val}.keras")

# Add epoch experiment results
for epoch_val in epoch_values:
    history = epoch_histories[epoch_val]
    experiment_summary["epoch_experiments"][str(epoch_val)] = {
        "final_loss": float(history.history['loss'][-1]),
        "final_val_loss": float(history.history['val_loss'][-1]),
        "final_accuracy": float(history.history['accuracy'][-1]),
        "training_time_seconds": float(epoch_times[epoch_val]),
        "model_file": f"models/lstm_epochs_{epoch_val}.keras"
    }
    experiment_summary["model_files"].append(f"models/lstm_epochs_{epoch_val}.keras")

# Save summary
with open('models/experiment_summary.json', 'w') as f:
    json.dump(experiment_summary, f, indent=2)

print("📊 Experiment summary saved to: models/experiment_summary.json")
print("🎯 All trained models saved in the 'models/' directory")
print("\n✅ Task 1.5 (Experimentation and Analysis) completed successfully!")

# Display saved models
import os
model_files = [f for f in os.listdir('models') if f.endswith('.keras')]
print(f"\n📁 Saved models ({len(model_files)} total):")
for model_file in sorted(model_files):
    print(f"   - {model_file}")
