# **Milestone 3**

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import gc
import os
from google.colab import drive

# Dataset location and the number of CSV rows read per chunk
file_path = '/content/Spotify Million Song Dataset_exported.csv'  # Replace with the actual file path
chunksize = 10000  # Adjust chunk size as needed

# Mount Google Drive at /content/drive
drive.mount('/content/drive')

# Download the 'punkt' and 'stopwords' NLTK resources, then build the
# English stop-word set (a set gives O(1) membership checks)
for nltk_resource in ('punkt', 'stopwords'):
    nltk.download(nltk_resource)
del nltk_resource  # do not leave the loop variable behind in the notebook namespace
stop_words = set(stopwords.words('english'))

def clean_text(text):
    """Tokenize *text* and return its lowercase, purely alphabetic, non-stop-word tokens.

    Args:
        text: The string to tokenize with ``word_tokenize``.

    Returns:
        A list of lowercased tokens, in their original order, keeping only
        tokens for which ``str.isalpha()`` is true and whose lowercase form
        is not in ``stop_words``.
    """
    kept_tokens = []
    for token in word_tokenize(text):
        # Drop punctuation, numbers and mixed tokens such as "don't".
        if not token.isalpha():
            continue
        lowered = token.lower()
        if lowered in stop_words:
            continue
        kept_tokens.append(lowered)
    return kept_tokens

# Keras tokenizer configured with num_words=10000; out-of-vocabulary words
# are represented by the '<UNK>' token
tokenizer = Tokenizer(
    num_words=10000,
    oov_token='<UNK>',
)

# Clean one chunk of the dataset and derive the text columns used for training
def process_chunk(chunk):
    """Clean a DataFrame chunk in place and add the derived lyric columns.

    Rows containing any missing value and duplicate rows are removed from
    *chunk* itself (the caller's object is mutated). Three columns are added:

    * ``cleaned_lyrics``: the token list produced by ``clean_text`` from the
      ``text`` column (cast to ``str`` first).
    * ``cleaned_lyrics_str``: those tokens joined with single spaces.
    * ``text_with_artist``: the ``artist`` column, a space, then
      ``cleaned_lyrics_str``.

    Args:
        chunk: A DataFrame with at least ``text`` and ``artist`` columns.

    Returns:
        The same DataFrame object, after mutation.
    """
    chunk.dropna(inplace=True)
    chunk.drop_duplicates(inplace=True)

    token_lists = chunk['text'].astype(str).apply(clean_text)
    chunk['cleaned_lyrics'] = token_lists
    chunk['cleaned_lyrics_str'] = token_lists.apply(' '.join)
    chunk['text_with_artist'] = chunk['artist'] + ' ' + chunk['cleaned_lyrics_str']
    return chunk

# Build the tokenizer vocabulary by streaming the CSV chunk by chunk, so the
# whole dataset is never held in memory at once
for chunk in pd.read_csv(file_path, chunksize=chunksize, encoding='utf-8'):
    tokenizer.fit_on_texts(process_chunk(chunk)['text_with_artist'])
    del chunk
    gc.collect()

# Persist the fitted tokenizer as JSON on Google Drive
tokenizer_json = tokenizer.to_json()
with open('/content/drive/MyDrive/Checkpoints/tokenizer.json', 'w') as f:
    f.write(tokenizer_json)

# Estimate the maximum sequence length from the first `chunksize` rows only
# (a sample, not the full dataset)
sample_chunk = process_chunk(pd.read_csv(file_path, nrows=chunksize, encoding='utf-8'))
sample_sequences = tokenizer.texts_to_sequences(sample_chunk['text_with_artist'])
max_sequence_len = max(map(len, sample_sequences))
del sample_chunk, sample_sequences
gc.collect()

def _padded_batch(prefixes, max_sequence_len):
    """Left-pad *prefixes* to ``max_sequence_len`` and split off each row's last token.

    Returns:
        ``(inputs, targets)`` where ``inputs`` is every column but the last
        of the padded array and ``targets`` is the last column.
    """
    padded = pad_sequences(prefixes, maxlen=max_sequence_len, padding='pre')
    return padded[:, :-1], padded[:, -1]


# Function to generate padded sequences and targets in chunks
def data_generator(file_path, chunksize, max_sequence_len, batch_size, tokenizer):
    """Yield ``(inputs, targets)`` next-word training batches streamed from a CSV.

    The CSV is read ``chunksize`` rows at a time. Each chunk is cleaned with
    ``process_chunk`` and its ``text_with_artist`` column is converted to
    integer sequences. Every prefix of length >= 2 of every sequence becomes
    one training row: the prefix is left-padded to ``max_sequence_len``, its
    last token is the target and the remaining ``max_sequence_len - 1``
    columns are the input.

    Prefixes are padded one batch at a time instead of padding every prefix
    of the chunk into a single array first. The batches produced (content
    and order, including a final partial batch per chunk) are the same, but
    only one batch worth of padded rows exists at any moment.

    Args:
        file_path: Path of the CSV file to stream.
        chunksize: Number of CSV rows to read per chunk.
        max_sequence_len: Length every prefix is padded to.
        batch_size: Maximum number of rows per yielded batch; must be >= 1.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences``.

    Yields:
        Tuples ``(inputs, targets)`` with ``inputs`` of shape
        ``(rows, max_sequence_len - 1)`` and ``targets`` of shape ``(rows,)``,
        where ``rows <= batch_size``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    for chunk in pd.read_csv(file_path, chunksize=chunksize, encoding='utf-8'):
        chunk = process_chunk(chunk)
        sequences = tokenizer.texts_to_sequences(chunk['text_with_artist'])
        del chunk  # the DataFrame is no longer needed once it is tokenized

        pending = []
        for seq in sequences:
            # Prefixes seq[:2], seq[:3], ..., seq[:len(seq)]
            for end in range(2, len(seq) + 1):
                pending.append(seq[:end])
                if len(pending) == batch_size:
                    yield _padded_batch(pending, max_sequence_len)
                    pending = []
        # Flush the partial batch so batches never span two chunks.
        if pending:
            yield _padded_batch(pending, max_sequence_len)

        del sequences, pending
        gc.collect()

# Create a TensorFlow dataset from the generator
def create_tf_dataset(file_path, chunksize, max_sequence_len, batch_size, tokenizer):
    """Wrap ``data_generator`` in a ``tf.data.Dataset`` of ``(inputs, targets)`` batches.

    The element structure is declared with ``output_signature`` rather than
    the deprecated ``output_types`` / ``output_shapes`` arguments.

    Args:
        file_path: Path of the CSV file, forwarded to ``data_generator``.
        chunksize: Rows per CSV chunk, forwarded to ``data_generator``.
        max_sequence_len: Padded prefix length; inputs have
            ``max_sequence_len - 1`` columns.
        batch_size: Rows per batch, forwarded to ``data_generator``.
        tokenizer: Fitted tokenizer, forwarded to ``data_generator``.

    Returns:
        A ``tf.data.Dataset`` whose elements are ``(inputs, targets)`` int32
        tensors of shapes ``(None, max_sequence_len - 1)`` and ``(None,)``.
    """
    def generator():
        # from_generator needs a zero-argument callable that returns a fresh
        # iterator each time the dataset is iterated.
        return data_generator(file_path, chunksize, max_sequence_len, batch_size, tokenizer)

    return tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(None, max_sequence_len - 1), dtype=tf.int32),
            tf.TensorSpec(shape=(None,), dtype=tf.int32),
        ),
    )

# Load the pre-trained model from Google Drive
model_path = '/content/drive/MyDrive/checkpoints/Yousef_trained_model.h5'
model = load_model(model_path)

# Freeze every layer except the last one, so only the last layer is fine-tuned
for layer in model.layers[:-1]:  # Keep the last layer trainable
    layer.trainable = False

# Compile the model (after changing the `trainable` flags) with a lower learning rate
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Setup callbacks for early stopping, best-model saving and LR reduction.
# model.fit() below is given no validation data, so `val_loss` is never
# logged: callbacks monitoring it would never trigger and the best-model
# checkpoint would never be written. Monitor the training `loss` instead.
checkpoint_dir = '/content/drive/MyDrive/Checkpoints'
early_stopping = EarlyStopping(monitor='loss', patience=5)
model_checkpoint = ModelCheckpoint(os.path.join(checkpoint_dir, 'fine_tuned_model.h5'), save_best_only=True, monitor='loss')
reduce_lr = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=3, min_lr=1e-6, verbose=1)

# Create TensorFlow datasets
batch_size = 128
train_dataset = create_tf_dataset(file_path, chunksize, max_sequence_len, batch_size, tokenizer)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

# The generator-backed dataset has no known length, so the number of batches
# per epoch is set explicitly
steps_per_epoch = 500  # Set this to a reasonable number

# Train the model
history = model.fit(
    train_dataset,
    steps_per_epoch=steps_per_epoch,
    epochs=5,  # Additional epochs for fine-tuning
    callbacks=[early_stopping, model_checkpoint, reduce_lr]
)

# Save the final model after training (separate from the best-loss checkpoint above)
final_model_path = os.path.join(checkpoint_dir, 'final_trained_model.h5')
model.save(final_model_path)
print(f"Model saved to {final_model_path}")


Mounted at /content/drive


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


Epoch 1/5



Epoch 2/5



Epoch 3/5



Epoch 4/5



Epoch 5/5





  saving_api.save_model(


Model saved to /content/drive/MyDrive/Checkpoints/final_trained_model.h5


In [None]:
def _most_likely_word(probabilities, index_word):
    """Return the most probable word that has a non-blank ``index_word`` entry.

    Indices are tried from highest to lowest probability (ties resolved in
    favour of the lowest index, matching ``np.argmax``). Returns ``''`` when
    no index maps to a non-blank word.
    """
    for index in np.argsort(-probabilities, kind='stable'):
        word = index_word.get(int(index), '').strip()
        if word:
            return word
    return ''


# Function to generate text based on a seed text
def generate_text(seed_text, next_words, model, tokenizer, max_sequence_len):
    """Greedily extend *seed_text* with up to *next_words* predicted words.

    At each step the current text is tokenized, left-padded (or truncated)
    to ``max_sequence_len - 1`` tokens and passed to ``model.predict``. The
    most probable index that maps to a non-blank word in
    ``tokenizer.index_word`` is appended.

    Previously, when the arg-max index had no usable word (for example the
    padding index 0), the loop retried with an unchanged text and therefore
    never terminated. Now the next most probable usable word is taken, and
    generation stops early if no index yields a word.

    Args:
        seed_text: Starting text.
        next_words: Maximum number of words to append; values <= 0 return
            ``seed_text`` unchanged.
        model: Object exposing ``predict(inputs, verbose=0)``.
        tokenizer: Object exposing ``texts_to_sequences`` and ``index_word``.
        max_sequence_len: Sequence length used in training; the model input
            has ``max_sequence_len - 1`` tokens.

    Returns:
        ``seed_text`` followed by the generated words, separated by spaces.
    """
    current_text = seed_text
    for _ in range(next_words):
        # Convert the current text to a sequence of token ids
        token_list = tokenizer.texts_to_sequences([current_text])[0]
        # Pad the sequence to the model's input length
        token_list_padded = pad_sequences([token_list], maxlen=max_sequence_len-1, padding='pre')
        # Predict the next-word scores as a flat vector
        predictions = np.asarray(model.predict(token_list_padded, verbose=0)).reshape(-1)

        output_word = _most_likely_word(predictions, tokenizer.index_word)
        if not output_word:
            # No index maps to a usable word; retrying could never succeed.
            break
        current_text += ' ' + output_word

    return current_text