# Lab 6 - Recurrent Neural Networks

## Problem 1:


There is no difference in principle between the character-level RNN above and a word-level RNN. Build an RNN that generates sentences from words instead of characters (you may strip all punctuation to make this easier). It may help to use only the words that appear more than once, more than twice, or more than three times, dropping any phrases that include words below that threshold.

Construct a Shakespearian text generator using a word encoding rather than a letter by letter encoding. Compare the results of using LSTM vs GRU nodes. 
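
The suggestion to drop phrases containing rare words can be sketched as follows. This is a minimal illustration on a toy sentence, not the lab's reference solution; `build_windows` and `min_count` are hypothetical names introduced here. It discards any window that contains a rare word, which is an alternative to mapping rare words to a placeholder token.

```python
from collections import Counter

def build_windows(words, seq_length, min_count):
    """Return (context, target) pairs made only of words seen at least min_count times."""
    counts = Counter(words)
    keep = {w for w, c in counts.items() if c >= min_count}
    # sorted() makes the word -> index mapping reproducible across runs
    word_to_index = {w: i for i, w in enumerate(sorted(keep))}
    pairs = []
    for i in range(len(words) - seq_length):
        window = words[i:i + seq_length + 1]   # context plus the target word
        if all(w in keep for w in window):     # skip any phrase with a rare word
            ids = [word_to_index[w] for w in window]
            pairs.append((ids[:-1], ids[-1]))
    return pairs, word_to_index

# Toy corpus for illustration only
toy = "to be or not to be that is the question to be or not".split()
pairs, vocab_index = build_windows(toy, seq_length=2, min_count=2)
print(len(pairs), vocab_index)
```

Dropping windows shrinks the dataset but keeps every training target a real word, so the model never learns to predict a placeholder.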

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import re
from collections import Counter
from torch.utils.data import DataLoader, TensorDataset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print(f"Using device: {device}")

# Load the data
with open("Shakespeare.txt", "r", encoding="utf-8") as f:
    text = f.read().lower()

# Clean the text
text = re.sub(r"[^\w\s]", "", text)

words = text.split()

min_word_freq = 1000  # Keep words that occur at least 1000 times
word_counts = Counter(words)
vocab = {word for word, count in word_counts.items() if count >= min_word_freq}

# Sort the vocabulary so the word-to-index mapping is reproducible across runs
word_to_index = {word: i+1 for i, word in enumerate(sorted(vocab))}
word_to_index["<UNK>"] = 0  # Index 0 stands for all less frequent words
index_to_word = {i: word for word, i in word_to_index.items()}

data = [word_to_index.get(word, 0) for word in words] 

# Set sequence length to 5
seq_length = 5  

# Construct training dataset
X, Y = [], []
for i in range(len(data) - seq_length):
    X.append(data[i:i+seq_length])
    Y.append(data[i+seq_length])

X = torch.tensor(X, dtype=torch.long)
Y = torch.tensor(Y, dtype=torch.long)

# print(f"Size of Dataset: {X.shape[0]}")
# print(f"Size of Vocabulary Set: {len(word_to_index)}")

# Create a DataLoader to serve the data in shuffled mini-batches
batch_size = 64
dataset = TensorDataset(X, Y)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


In [8]:
class WordRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, rnn_type="LSTM"):
        super(WordRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn_type = rnn_type
        if rnn_type == "LSTM":
            self.rnn = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        else:
            self.rnn = nn.GRU(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        x = self.embedding(x)
        output, hidden = self.rnn(x, hidden)
        output = self.fc(output[:, -1, :]) 
        return output, hidden

# Model hyperparameters
vocab_size = len(word_to_index)
embed_size = 128
hidden_size = 128  
num_layers = 1 

# Initialize LSTM and GRU
lstm_model = WordRNN(vocab_size, embed_size, hidden_size, num_layers, "LSTM").to(device)
gru_model = WordRNN(vocab_size, embed_size, hidden_size, num_layers, "GRU").to(device)
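
Once a model like `WordRNN` is trained, text can be generated by repeatedly sampling the next word index from its output scores and feeding the result back in as context. The sketch below shows that loop with NumPy only. `next_logits` is a hypothetical callable standing in for the model; for `WordRNN` it would presumably wrap a forward pass on a `(1, seq_length)` tensor and return the first row of the scores. `toy_logits` is a made-up scorer used only to make the example runnable.

```python
import numpy as np

def sample_next(logits, temperature=1.0, rng=None):
    """Sample an index from unnormalized scores using temperature scaling."""
    if rng is None:
        rng = np.random.default_rng()
    scaled = np.asarray(logits, dtype=float) / temperature
    scaled -= scaled.max()               # subtract the max for numerical stability
    probs = np.exp(scaled)
    probs /= probs.sum()
    return int(rng.choice(len(probs), p=probs))

def generate(next_logits, seed_ids, n_words, seq_length, temperature=1.0, rng=None):
    """Extend seed_ids by n_words indices, feeding back the last seq_length each step."""
    ids = list(seed_ids)                 # copy so the caller's list is not mutated
    for _ in range(n_words):
        context = ids[-seq_length:]
        ids.append(sample_next(next_logits(context), temperature, rng))
    return ids

# Stand-in scorer for demonstration only: strongly prefers (last index + 1) mod 4
def toy_logits(context, vocab_size=4):
    scores = np.zeros(vocab_size)
    scores[(context[-1] + 1) % vocab_size] = 50.0
    return scores

out = generate(toy_logits, seed_ids=[0, 1], n_words=6, seq_length=2,
               temperature=0.5, rng=np.random.default_rng(0))
print(out)
```

Lower temperatures make the output more repetitive and higher ones more varied. Mapping the resulting indices through `index_to_word` would turn them back into text.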


In [11]:
def train(model, train_loader, num_epochs=10, lr=0.001):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for batch_X, batch_Y in train_loader:
            batch_X, batch_Y = batch_X.to(device), batch_Y.to(device)

            optimizer.zero_grad()
            output, _ = model(batch_X) 
            loss = criterion(output, batch_Y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

print("Training LSTM Model...")
train(lstm_model, train_loader)

print("\nTraining GRU Model...")
train(gru_model, train_loader)


Training LSTM Model...
Epoch 1/10, Loss: 2.5407
Epoch 2/10, Loss: 2.5344
Epoch 3/10, Loss: 2.5289
Epoch 4/10, Loss: 2.5237
Epoch 5/10, Loss: 2.5197
Epoch 6/10, Loss: 2.5155
Epoch 7/10, Loss: 2.5119
Epoch 8/10, Loss: 2.5085
Epoch 9/10, Loss: 2.5054
Epoch 10/10, Loss: 2.5025

Training GRU Model...
Epoch 1/10, Loss: 2.5616
Epoch 2/10, Loss: 2.5578
Epoch 3/10, Loss: 2.5538
Epoch 4/10, Loss: 2.5503
Epoch 5/10, Loss: 2.5476
Epoch 6/10, Loss: 2.5452
Epoch 7/10, Loss: 2.5432
Epoch 8/10, Loss: 2.5410
Epoch 9/10, Loss: 2.5392
Epoch 10/10, Loss: 2.5377
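
One simple way to compare the two runs is to convert the final average cross-entropy losses into perplexities. The sketch below uses only the final-epoch values from the log above. These are training losses, not held-out losses, so the comparison is indicative at best.

```python
import math

# Final-epoch average cross-entropy losses reported in the training log above
final_loss = {"LSTM": 2.5025, "GRU": 2.5377}

# Perplexity is exp(mean cross-entropy in nats); lower is better
perplexity = {name: math.exp(loss) for name, loss in final_loss.items()}

for name, ppl in perplexity.items():
    print(f"{name}: loss={final_loss[name]:.4f}, perplexity={ppl:.2f}")
```

The gap between the two models is small here, and a fairer comparison would evaluate both on a validation split that neither model saw during training.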


## Problem 2: (Geron, 15.10)

<img width = 800 src = http://www.bach-chorales.com/Images/ChoraleImages/Image_BWV_0133_6.jpg>

Download the [Bach chorales dataset](https://homl.info/bach) and unzip it. It contains 382 chorales composed by Johann Sebastian Bach. Each chorale is 100 to 640 time steps long, and each time step contains 4 integers, where each integer corresponds to a note’s index on a piano (except for the value 0, which means that no note is played).

Train a model — recurrent, convolutional, or both — that can predict the next time step (four notes), given a sequence of time steps from a chorale. Then use this model to generate Bach-like music, one note at a time: you can do this by giving the model the start of a chorale and asking it to predict the next time step, then appending these time steps to the input sequence and asking the model for the next note, and so on. Also make sure to check out [Google’s Coconet model](https://magenta.tensorflow.org/coconet), which was used for a nice Google doodle about Bach.

#### Further information

At each time step, each chorale has 4 notes. Looking at the CSV for one of the files, we see that the columns are __note0__, __note1__, __note2__ and __note3__, with each row corresponding to a time step. Each of the numbers corresponds to a piano key.
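
To make the layout concrete, the sketch below parses a few rows in the column format just described. The rows are made up for illustration and read from an in-memory string; a real chorale would be read from one of the dataset's CSV files instead.

```python
import io
import pandas as pd

# Made-up rows in the layout described above (not taken from the dataset)
csv_text = """note0,note1,note2,note3
74,70,65,58
74,70,65,58
75,70,58,55
0,0,0,0
"""

chorale = pd.read_csv(io.StringIO(csv_text))

# One row per time step, one column per voice; a 0 means no note is played
steps = chorale[["note0", "note1", "note2", "note3"]].to_numpy()
print(steps.shape)
```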

Python can construct audio and Jupyter can play it in a Jupyter widget:

In [27]:
import pandas as pd

# Load piano note frequency data
piano_path = "https://raw.githubusercontent.com/tipthederiver/Math-7243-2020/master/Labs/Lab%206/Piano%20Notes.csv"
notes = pd.read_csv(piano_path, encoding='unicode_escape')
print(notes.head())

        Note  Frequency (Hz)  Wavelength (cm)  Key Position
0         C0           16.35          2109.89             1
1   C#0/Db0            17.32          1991.47             2
2         D0           18.35          1879.69             3
3   D#0/Eb0            19.45          1774.20             4
4         E0           20.60          1674.62             5


The audio is then constructed as a simple sum of sine functions. I've uploaded an index mapping note positions to piano keys, taken from Wikipedia (https://en.wikipedia.org/wiki/Piano_key_frequencies).

We can then get the frequency from the key position with:

In [30]:
notes[notes["Key Position"]==1]

Unnamed: 0,Note,Frequency (Hz),Wavelength (cm),Key Position
0,C0,16.35,2109.89,1
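
The table lookup can also be approximated in closed form. The sketch below assumes the table follows twelve-tone equal temperament with key position 1 at C0 = 16.35 Hz. That assumption matches the five rows printed above to within rounding but has not been checked against the rest of the file. `key_to_frequency` is a hypothetical helper, not part of the lab code.

```python
def key_to_frequency(key_position, base_hz=16.35):
    """Equal-temperament frequency for a 1-based key position, with position 1 = C0."""
    # Each step up multiplies the frequency by the twelfth root of 2
    return base_hz * 2 ** ((key_position - 1) / 12)

for k in (1, 2, 5, 13):
    print(k, round(key_to_frequency(k), 2))
```

In formula form this is $f(k) = 16.35 \cdot 2^{(k-1)/12}$, so moving up twelve positions (one octave) doubles the frequency.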


The function below will take a sequence of notes and turn it into audio.

In [31]:
def gen_audio(song, notes, framerate=22050, L=.25):
    # framerate: samples per second; L: length of each note in seconds
    N = len(song)          # Number of notes
    W = int(framerate*L)   # Window size (samples per note)
    t = np.linspace(0,L,W)
    data = np.zeros(W*N)

    for i in range(N): 
        # Frequency of the i-th note, looked up by row in the notes table
        F = notes["Frequency (Hz)"].iloc[song[i]+1]
        data[W*i:W*(i+1)] = np.sin(2*np.pi*F*t)
        
    return data

I've included two note snippets below. These are __note0__ and __note1__ from chorale 305:

In [32]:
song_0 = [65,65,65,65,72,72,70,70,69,69,67,67,65,65,65,65,72,72,72,72,74,74,74,74,74,74,74]
song_1 = [60,60,60,60,60,60,60,60,60,60,60,60,62,62,64,64,65,65,65,65,65,65,65,65,65,65,65]

You can generate the tunes individually:

In [33]:
from IPython.display import Audio

data_0 = gen_audio(song_0, notes)
Audio(data_0, rate=22050)

Or, to hear them together, simply add the sine-wave representations:

In [34]:
data_0 = gen_audio(song_0, notes)
data_1 = gen_audio(song_1, notes)
Audio(data_0 + data_1,rate=22050)
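
Summing voices directly lets the amplitude grow with the number of voices, and the value 0 (no note played) needs to be rendered as silence. The sketch below averages any number of voices and skips rests. `mix_voices` and `midi_to_hz` are hypothetical helpers, and the note-to-frequency mapping assumes the MIDI convention (note 69 = A4 = 440 Hz), which is an assumption rather than something the lab states. A different mapping can be passed in through `freq_of`.

```python
import numpy as np

def midi_to_hz(note):
    """Assumed convention: MIDI note 69 is A4 = 440 Hz."""
    return 440.0 * 2 ** ((note - 69) / 12)

def mix_voices(voices, freq_of=midi_to_hz, framerate=22050, note_len=0.25):
    """Average several equal-length note sequences into one waveform; 0 is a rest."""
    n_steps = len(voices[0])
    window = int(framerate * note_len)         # samples per time step
    t = np.arange(window) / framerate          # time axis within one step
    mix = np.zeros(window * n_steps)
    for voice in voices:
        for i, note in enumerate(voice):
            if note == 0:                      # rest: leave this step silent
                continue
            mix[window * i:window * (i + 1)] += np.sin(2 * np.pi * freq_of(note) * t)
    return mix / len(voices)                   # keep the amplitude within [-1, 1]

# Two short made-up voices; the third step is a rest in both
wave = mix_voices([[65, 65, 0, 72], [60, 60, 0, 67]])
print(wave.shape)
```

The resulting array can be passed to `Audio(wave, rate=22050)` in the same way as the single-voice examples.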

#### Rough Outline: 

This entire project can be completed without ever listening to the audio files, by treating the chorales purely as sequences. Your approach should roughly follow the text generator above, except that each time step now holds 4 notes rather than a single letter. A rough outline is as follows:

* Load a single chorale using pandas' `read_csv` function.
* Construct the training data as we did for the text generator: `X_train` will be sequences of $K$ timesteps, each timestep containing the 4 notes. It is your choice whether to leave them as integers or one-hot encode the notes.
* The labels `y_train` will be sequences of $K$ timesteps shifted by 1.
* Construct a simple RNN model with 64 LSTM nodes. Your input and output shape will be $K\times 4$ if you do not one-hot encode and $K\times 4\times 108$ if you do one-hot encode.
* After you get your network running on one chorale, expand your dataset by adding sequences from other chorales.
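
The data-construction steps in the outline can be sketched as follows. This is a minimal illustration on random stand-in data, not real chorale values; `make_sequences` and `one_hot` are hypothetical helper names. The 108 classes come from the outline above.

```python
import numpy as np

def make_sequences(chorale, K):
    """Split a (T, 4) chorale into inputs of K steps and targets shifted by one step."""
    chorale = np.asarray(chorale)
    n = len(chorale) - K
    X = np.stack([chorale[i:i + K] for i in range(n)])
    y = np.stack([chorale[i + 1:i + K + 1] for i in range(n)])
    return X, y

def one_hot(notes, num_classes):
    """One-hot encode an integer array, adding a trailing axis of size num_classes."""
    return np.eye(num_classes, dtype=np.float32)[notes]

# Toy stand-in for one chorale: 12 time steps, 4 voices (values are made up)
rng = np.random.default_rng(0)
toy_chorale = rng.integers(low=36, high=82, size=(12, 4))

X, y = make_sequences(toy_chorale, K=5)
X_hot = one_hot(X, num_classes=108)
print(X.shape, y.shape, X_hot.shape)
```

Each row of `y` is the matching row of `X` moved forward by one time step, so a model trained on these pairs predicts the next four notes at every position in the window.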

In [35]:
import numpy as np

def create_training_data(song, timesteps=10):
    X, y = [], []
    for i in range(len(song) - timesteps):
        X.append(song[i:i + timesteps]) 
        y.append(song[i + timesteps])   
    return np.array(X), np.array(y)

# Construct training data
timesteps = 10  
X_0, y_0 = create_training_data(song_0, timesteps)
X_1, y_1 = create_training_data(song_1, timesteps)

# Merge the training data from both songs
X_train = np.concatenate([X_0, X_1], axis=0)
y_train = np.concatenate([y_0, y_1], axis=0)

In [36]:
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))

In [37]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

# Construct the model
model = Sequential()
model.add(LSTM(64, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=False))
model.add(Dropout(0.2))  
model.add(Dense(1, activation='linear'))  

# Compile the model
model.compile(optimizer='adam', loss='mse')

model.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_2 (LSTM)               (None, 64)                16896     
                                                                 
 dropout_2 (Dropout)         (None, 64)                0         
                                                                 
 dense_2 (Dense)             (None, 1)                 65        
                                                                 
Total params: 16961 (66.25 KB)
Trainable params: 16961 (66.25 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [38]:
# Train the model
model.fit(X_train, y_train, epochs=50, batch_size=32)

Epoch 1/50
...
Epoch 50/50


<keras.src.callbacks.History at 0x7fe1fc423970>

In [39]:
def generate_song(model, start_sequence, timesteps=10, num_notes=50):
    song = list(start_sequence)  # Copy so the caller's sequence is not modified
    for _ in range(num_notes):
        X_input = np.array(song[-timesteps:]).reshape((1, timesteps, 1))
        next_note = model.predict(X_input, verbose=0)  # Array of shape (1, 1)
        song.append(int(next_note[0, 0]))  # Truncate the prediction to an integer note
    return song

# Generate new sequence
generated_song = generate_song(model, song_0[:timesteps], timesteps=10, num_notes=50)

print(generated_song)

[65, 65, 65, 65, 72, 72, 70, 70, 69, 69, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9]
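
The generated sequence collapses to a constant 9 after the ten seed notes. One plausible contributor, offered as an assumption rather than a confirmed diagnosis, is that the network regresses on raw key numbers (roughly 60 to 74) from only a few dozen training windows, so its linear output may not reach that range within 50 epochs. A common remedy is to scale the notes to [0, 1] before training and map predictions back afterwards. `NoteScaler` below is a hypothetical helper sketching that idea.

```python
import numpy as np

class NoteScaler:
    """Min-max scale note values to [0, 1] and map predictions back to integers."""

    def fit(self, notes):
        notes = np.asarray(notes, dtype=float)
        self.lo, self.hi = notes.min(), notes.max()   # assumes hi > lo
        return self

    def transform(self, notes):
        return (np.asarray(notes, dtype=float) - self.lo) / (self.hi - self.lo)

    def inverse(self, scaled):
        raw = np.asarray(scaled, dtype=float) * (self.hi - self.lo) + self.lo
        # Round to the nearest key and clip to the range seen during fitting
        return np.clip(np.rint(raw), self.lo, self.hi).astype(int)

song = [65, 65, 72, 72, 70, 70, 69, 69, 67, 67, 74]
scaler = NoteScaler().fit(song)
scaled = scaler.transform(song)      # values in [0, 1], suitable as inputs and targets
restored = scaler.inverse(scaled)    # back to integer key numbers
print(restored.tolist())
```

Treating each note as a class and training with a softmax output is another option, and it avoids rounding a continuous prediction back to an integer key.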
