In [1]:
import os
import json
import argparse
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dropout, TimeDistributed, Dense, Activation, Embedding, GRU

In [2]:
# Directory holding the training text and the saved char_to_idx.json mapping.
DATA_DIR = './data/'
# Directory where weight checkpoints are written and later read back.
MODEL_DIR = './model/'
# Name of the training text file, looked up inside DATA_DIR.
filename = 'input.txt'

In [None]:
# Number of sequences per training batch (rows of X produced by read_batches).
BATCH_SIZE = 16
# Number of characters per training sequence (columns of X produced by read_batches).
SEQ_LENGTH = 64

In [None]:
# This generator creates training batches whose sequences are SEQ_LENGTH (64) characters long.
# X holds the character indices of each sequence in the batch.
# Y is a tensor holding the one-hot encoding of the next character for every position in X.
def read_batches(T, vocab_size, batch_size=None, seq_length=None):
    """Yield (X, Y) training batches for next-character prediction.

    The text is split into ``batch_size`` contiguous stripes of equal length.
    Row ``b`` of every batch reads from stripe ``b``, and consecutive batches
    advance ``seq_length`` characters along each stripe, so row ``b`` of one
    batch continues exactly where row ``b`` of the previous batch stopped.

    Args:
        T: 1-D integer array of character indices.
        vocab_size: Number of distinct indices; size of the one-hot axis of Y.
        batch_size: Sequences per batch. Defaults to the module-level BATCH_SIZE.
        seq_length: Characters per sequence. Defaults to the module-level SEQ_LENGTH.

    Yields:
        X: float64 array of shape (batch_size, seq_length) with character indices.
        Y: float64 array of shape (batch_size, seq_length, vocab_size) with the
           one-hot encoding of the character following each position of X.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE
    if seq_length is None:
        seq_length = SEQ_LENGTH

    T = np.asarray(T)
    length = T.shape[0]
    batch_chars = length // batch_size  # characters available to each batch row

    # Index grids reused by every batch: rows select the stripe, cols the
    # offset inside the current window.
    rows = np.arange(batch_size)[:, None]   # shape (batch_size, 1)
    cols = np.arange(seq_length)[None, :]   # shape (1, seq_length)
    base = batch_chars * rows + cols        # text position of every cell at start == 0

    # Stop early enough that the "next character" (position + 1) always exists.
    for start in range(0, batch_chars - seq_length, seq_length):
        positions = base + start
        X = T[positions].astype(np.float64)
        Y = np.zeros((batch_size, seq_length, vocab_size))
        # One fancy-indexed assignment sets the one-hot target of every cell.
        Y[rows, cols, T[positions + 1]] = 1
        yield X, Y

![image-3.png](attachment:image-3.png)

![image.png](attachment:image.png)

# Multi stack LSTM Model

In [None]:
def save_weights(epoch, model):
    """Write the model's weights to MODEL_DIR/weights.<epoch>.h5.

    Args:
        epoch: Value substituted into the checkpoint file name.
        model: Object exposing ``save_weights(path)``.
    """
    # exist_ok=True creates the directory when it is missing without a separate
    # exists() check, so nothing can slip in between the check and the creation.
    os.makedirs(MODEL_DIR, exist_ok=True)
    model.save_weights(os.path.join(MODEL_DIR, 'weights.{}.h5'.format(epoch)))

def load_weights(epoch, model):
    """Restore weights into ``model`` from MODEL_DIR/weights.<epoch>.h5."""
    checkpoint_name = 'weights.{}.h5'.format(epoch)
    checkpoint_path = os.path.join(MODEL_DIR, checkpoint_name)
    model.load_weights(checkpoint_path)

###########################################################################################################################
## Each hidden layer has multiple LSTM units (256). The same input goes to all the LSTM units, and each unit learns different
## aspects of the input characters. The output of each LSTM unit is passed on to the next timestep.

## return_sequences=True --- By default this parameter is False for LSTM, so only the output of the last timestep is returned.
##                           Setting it to True makes the layer return an output for every timestep.
##                           This is usually needed for many-to-many architectures.

## TimeDistributed Dense layer --- Applies the same Dense layer to the output of every timestep.
##                                 It is similar to an MLP with 86 neurons, i.e. vocab_size

## stateful=True --- Default value is False. If True, the final state of 'Batch 1, first row' is used as the initial state of 'Batch 2, first row'.
##                   Essentially the RNN will learn the data from 0 to 8103 with continuity.

## LSTM Documentation: https://keras.io/api/layers/recurrent_layers/lstm/
##                     https://keras.io/models/sequential/
###########################################################################################################################

def build_model(batch_size, seq_len, vocab_size):
    """Assemble the stacked-LSTM training network.

    Layer order: an Embedding (512 dimensions) with a fixed batch input shape
    of (batch_size, seq_len), three stateful LSTM layers of 256 units that each
    return the full sequence and are each followed by Dropout(0.2), then a
    per-timestep Dense layer of ``vocab_size`` units and a softmax activation.

    Returns:
        The uncompiled Sequential model.
    """
    layers = [Embedding(vocab_size, 512, batch_input_shape=(batch_size, seq_len))]

    for _ in range(3):
        layers.append(LSTM(256, return_sequences=True, stateful=True))
        layers.append(Dropout(0.2))

    layers.append(TimeDistributed(Dense(vocab_size)))
    layers.append(Activation('softmax'))
    return Sequential(layers)

#if __name__ == '__main__':
    #model = build_model(16, 64, 50)
    #model.summary()

In [None]:
#### text holds the data present in the input.txt file; by default we train for 100 epochs and save the model after every 10 epochs
def train(text, epochs=100, save_freq=10):
    """Train the stacked-LSTM character model and report per-epoch metrics.

    Side effects: writes the character-to-index mapping to
    DATA_DIR/char_to_idx.json, prints progress for every batch, and saves a
    weight checkpoint through save_weights() every ``save_freq`` epochs.

    Args:
        text: Training corpus as a single string.
        epochs: Number of passes over the corpus.
        save_freq: A checkpoint is saved whenever the 1-based epoch number is
            a multiple of this value.

    Returns:
        DataFrame with columns 'Epoch', 'Losses' and 'Accuracy' holding the
        mean batch loss and mean batch accuracy of every epoch.
    """
    # Map every distinct character to an integer index; sorting keeps the
    # assignment stable between runs on the same text.
    char_to_idx = {ch: i for (i, ch) in enumerate(sorted(set(text)))}
    print("Number of unique characters: " + str(len(char_to_idx)))

    # Persist the mapping so the sequence generators can decode predictions later.
    with open(os.path.join(DATA_DIR, 'char_to_idx.json'), 'w') as f:
        json.dump(char_to_idx, f)

    vocab_size = len(char_to_idx)

    ######################################
    ######### Model architecture #########
    ######################################

    model = build_model(BATCH_SIZE, SEQ_LENGTH, vocab_size)
    model.summary()
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    ###################################
    ###### Train data generation ######
    ###################################

    # Convert the complete text into numerical indices.
    T = np.asarray([char_to_idx[c] for c in text], dtype=np.int64)

    print("Length of text:" + str(T.size))

    epc, losses, accs = [], [], []
    for epoch in range(epochs):
        print('\nEpoch {}/{}'.format(epoch + 1, epochs))

        # The LSTM layers are stateful and every epoch restarts at the beginning
        # of the text, so the state left over from the end of the previous pass
        # must not be carried into the first batch of this one.
        model.reset_states()

        # Train on every (X, Y) batch produced for this pass over the text.
        for i, (X, Y) in enumerate(read_batches(T, vocab_size)):
            ## Details about train_on_batch here: https://keras.io/models/sequential/
            loss, acc = model.train_on_batch(X, Y)
            print('Batch {}: loss = {}, acc = {}'.format(i + 1, loss, acc))
            epc.append(epoch + 1)
            losses.append(loss)
            accs.append(acc)

        # Save a checkpoint every save_freq epochs.
        if (epoch + 1) % save_freq == 0:
            save_weights(epoch + 1, model)
            print('Saved checkpoint to', 'weights.{}.h5'.format(epoch + 1))

    # One row per batch; averaged per epoch below.
    result = pd.DataFrame({'Epoch': epc, 'Losses': losses, 'Accuracy': accs})

    grouped_multiple = result.groupby(['Epoch'], as_index=False).agg({'Losses': 'mean',
                                                                      'Accuracy': 'mean'})

    return grouped_multiple


In [None]:
if __name__ == '__main__':
    epochs = 100
    freq = 10

    # Read the corpus inside a context manager so the file handle is closed
    # as soon as the text is in memory.
    with open(os.path.join(DATA_DIR, filename)) as corpus_file:
        corpus_text = corpus_file.read()

    ### Calling the train function on the data read from the input.txt file
    Train_epoch_loss_acc = train(corpus_text, epochs, freq)

In [None]:
# Display the per-epoch mean loss and accuracy returned by train().
Train_epoch_loss_acc

In [None]:
# Plot the mean training loss against the actual epoch number; plotting the
# column alone would put the 0-based row index on the axis labelled 'Epoch'.
plt.plot(Train_epoch_loss_acc['Epoch'], Train_epoch_loss_acc['Losses'])
plt.title('LSTM Model train')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train_loss'], loc='upper right')

plt.show()

In [None]:
# Plot the mean training accuracy against the actual epoch number; plotting the
# column alone would put the 0-based row index on the axis labelled 'Epoch'.
plt.plot(Train_epoch_loss_acc['Epoch'], Train_epoch_loss_acc['Accuracy'])
plt.title('LSTM Model train')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
# A rising accuracy curve ends in the upper right, so keep the legend out of its way.
plt.legend(['train_accuracy'], loc='lower right')

plt.show()

### Code for building an LSTM model to generate a sample sequence of characters

In [None]:
def build_model_seq_gen(unique_chars):
    """Assemble the single-step LSTM network used for sampling characters.

    The input shape is fixed to one sequence of one character. Three stateful
    LSTM layers of 256 units follow the Embedding (512 dimensions), each
    followed by Dropout(0.2); only the last LSTM drops the sequence axis. A
    Dense layer of ``unique_chars`` units with softmax produces the output.

    Returns:
        The uncompiled Sequential model.
    """
    final_depth = 2
    stack = [Embedding(unique_chars, 512, batch_input_shape=(1, 1))]

    for depth in range(3):
        keep_sequence = depth != final_depth
        stack.append(LSTM(256, return_sequences=keep_sequence, stateful=True))
        stack.append(Dropout(0.2))

    stack.append(Dense(unique_chars))
    stack.append(Activation('softmax'))
    return Sequential(stack)

In [None]:
def sample_seq_generator(epoch_num, character_index, seq_length):
    """Generate text by sampling characters from a saved LSTM checkpoint.

    Args:
        epoch_num: Epoch whose checkpoint MODEL_DIR/weights.<epoch_num>.h5 is loaded.
        character_index: Vocabulary index of the seed character.
        seq_length: Number of characters to sample after the seed.

    Returns:
        String made of the seed character followed by ``seq_length`` sampled
        characters.

    Raises:
        ValueError: If ``character_index`` is not an index of the saved vocabulary.
    """
    with open(os.path.join(DATA_DIR, 'char_to_idx.json')) as f:
        char_to_index = json.load(f)
    index_to_char = {i: ch for ch, i in char_to_index.items()}
    vocab_size = len(index_to_char)

    # Fail before building the model: an unknown seed index could otherwise
    # only surface as a KeyError when the result is decoded at the very end.
    if character_index not in index_to_char:
        raise ValueError(
            'character_index must be between 0 and {}, got {}'.format(vocab_size - 1, character_index))

    ##########################################################
    ######### Sequence generator model architecture  #########
    ##########################################################

    model = build_model_seq_gen(vocab_size)
    model.load_weights(os.path.join(MODEL_DIR, 'weights.{}.h5'.format(epoch_num)))

    sequence_index = [character_index]

    for _ in range(seq_length):
        # Feed the most recent character; the stateful layers carry the context.
        batch = np.zeros((1, 1))
        batch[0, 0] = sequence_index[-1]

        # verbose=0 keeps predict() from printing a progress bar per character.
        predicted_probs = model.predict(batch, verbose=0).reshape(-1)
        # Draw the next character index from the predicted distribution.
        sample = np.random.choice(range(vocab_size), size=1, p=predicted_probs)

        sequence_index.append(sample[0])

    seq = ''.join(index_to_char[c] for c in sequence_index)
    return seq

In [None]:
# Read the vocabulary size from the saved mapping so the prompt states the real
# range of valid seed indices (0 to vocabulary size - 1).
with open(os.path.join(DATA_DIR, 'char_to_idx.json')) as f:
    max_character_index = len(json.load(f)) - 1

epoch_number = int(input("Enter epoch number between 1 to 100 only multiples of 10: "))
character_index = int(input("Enter any number between 0 to {} to sequence generation: ".format(max_character_index)))
Sequence_length = int(input("Number of characters to generate: "))

Generated_music_sequence = sample_seq_generator(epoch_number, character_index, Sequence_length)

print("\n")

print(Generated_music_sequence)

### -------------------------------------------------------------------------------------------------------------------------------------------------------

# Additional work

## Multi stack GRU Model architecture

In [None]:
def save_weights(epoch, model):
    """Write the model's weights to MODEL_DIR/gru_weights.<epoch>.h5.

    NOTE: this reuses the name of the LSTM checkpoint helper defined earlier in
    the notebook, so once this cell has run, save_weights() writes
    'gru_weights' files.

    Args:
        epoch: Value substituted into the checkpoint file name.
        model: Object exposing ``save_weights(path)``.
    """
    # exist_ok=True creates the directory when it is missing without a separate
    # exists() check, so nothing can slip in between the check and the creation.
    os.makedirs(MODEL_DIR, exist_ok=True)
    model.save_weights(os.path.join(MODEL_DIR, 'gru_weights.{}.h5'.format(epoch)))

def load_weights(epoch, model):
    """Restore weights into ``model`` from MODEL_DIR/gru_weights.<epoch>.h5."""
    gru_checkpoint = os.path.join(MODEL_DIR, 'gru_weights.{}.h5'.format(epoch))
    model.load_weights(gru_checkpoint)

def build_model_gru(batch_size, seq_len, vocab_size):
    """Assemble the stacked-GRU training network.

    Layer order: an Embedding (512 dimensions) with a fixed batch input shape
    of (batch_size, seq_len), three stateful GRU layers of 256 units that each
    return the full sequence and are each followed by Dropout(0.2), then a
    per-timestep Dense layer of ``vocab_size`` units and a softmax activation.

    Returns:
        The uncompiled Sequential model.
    """
    recurrent_stack = []
    for _ in range(3):
        recurrent_stack += [GRU(256, return_sequences=True, stateful=True), Dropout(0.2)]

    embedding = Embedding(vocab_size, 512, batch_input_shape=(batch_size, seq_len))
    head = [TimeDistributed(Dense(vocab_size)), Activation('softmax')]

    network = Sequential()
    for layer in [embedding] + recurrent_stack + head:
        network.add(layer)
    return network

## GRU model training function

In [None]:
#### text holds the data present in the input.txt file; by default we train for 100 epochs and save the model after every 10 epochs
def train_gru(text, epochs=100, save_freq=10):
    """Train the stacked-GRU character model and report per-epoch metrics.

    Side effects: writes the character-to-index mapping to
    DATA_DIR/char_to_idx.json, prints progress for every batch, and saves a
    weight checkpoint through save_weights() every ``save_freq`` epochs.

    Args:
        text: Training corpus as a single string.
        epochs: Number of passes over the corpus.
        save_freq: A checkpoint is saved whenever the 1-based epoch number is
            a multiple of this value.

    Returns:
        DataFrame with columns 'Epoch', 'Losses' and 'Accuracy' holding the
        mean batch loss and mean batch accuracy of every epoch.
    """
    # Map every distinct character to an integer index; sorting keeps the
    # assignment stable between runs on the same text.
    char_to_idx = {ch: i for (i, ch) in enumerate(sorted(set(text)))}
    print("Number of unique characters: " + str(len(char_to_idx)))

    # Persist the mapping so the sequence generators can decode predictions later.
    with open(os.path.join(DATA_DIR, 'char_to_idx.json'), 'w') as f:
        json.dump(char_to_idx, f)

    vocab_size = len(char_to_idx)

    ######################################
    ######### Model architecture #########
    ######################################

    model = build_model_gru(BATCH_SIZE, SEQ_LENGTH, vocab_size)
    model.summary()
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    ###################################
    ###### Train data generation ######
    ###################################

    # Convert the complete text into numerical indices.
    T = np.asarray([char_to_idx[c] for c in text], dtype=np.int64)

    print("Length of text:" + str(T.size))

    epc, losses, accs = [], [], []
    for epoch in range(epochs):
        print('\nEpoch {}/{}'.format(epoch + 1, epochs))

        # The GRU layers are stateful and every epoch restarts at the beginning
        # of the text, so the state left over from the end of the previous pass
        # must not be carried into the first batch of this one.
        model.reset_states()

        # Train on every (X, Y) batch produced for this pass over the text.
        for i, (X, Y) in enumerate(read_batches(T, vocab_size)):
            ## Details about train_on_batch here: https://keras.io/models/sequential/
            loss, acc = model.train_on_batch(X, Y)
            print('Batch {}: loss = {}, acc = {}'.format(i + 1, loss, acc))
            epc.append(epoch + 1)
            losses.append(loss)
            accs.append(acc)

        # Save a checkpoint every save_freq epochs.
        if (epoch + 1) % save_freq == 0:
            save_weights(epoch + 1, model)
            print('Saved checkpoint to', 'gru_weights.{}.h5'.format(epoch + 1))

    # One row per batch; averaged per epoch below.
    result = pd.DataFrame({'Epoch': epc, 'Losses': losses, 'Accuracy': accs})

    grouped_multiple = result.groupby(['Epoch'], as_index=False).agg({'Losses': 'mean',
                                                                      'Accuracy': 'mean'})

    return grouped_multiple


### Code for building a GRU model to generate sample sequence of characters

In [3]:
def build_model_seq_gen_gru(unique_chars):
    """Assemble the single-step GRU network used for sampling characters.

    The input shape is fixed to one sequence of one character. Three stateful
    GRU layers of 256 units follow the Embedding (512 dimensions), each
    followed by Dropout(0.2); only the last GRU drops the sequence axis. A
    Dense layer of ``unique_chars`` units with softmax produces the output.

    Returns:
        The uncompiled Sequential model.
    """
    generator = Sequential()
    generator.add(Embedding(unique_chars, 512, batch_input_shape=(1, 1)))

    # The first two recurrent layers pass full sequences on; the third does not.
    for emit_sequence in (True, True, False):
        generator.add(GRU(256, return_sequences=emit_sequence, stateful=True))
        generator.add(Dropout(0.2))

    generator.add(Dense(unique_chars))
    generator.add(Activation('softmax'))
    return generator

In [None]:
def sample_seq_generator_gru(epoch_num, character_index, seq_length):
    """Generate text by sampling characters from a saved GRU checkpoint.

    Args:
        epoch_num: Epoch whose checkpoint MODEL_DIR/gru_weights.<epoch_num>.h5 is loaded.
        character_index: Vocabulary index of the seed character.
        seq_length: Number of characters to sample after the seed.

    Returns:
        String made of the seed character followed by ``seq_length`` sampled
        characters.

    Raises:
        ValueError: If ``character_index`` is not an index of the saved vocabulary.
    """
    with open(os.path.join(DATA_DIR, 'char_to_idx.json')) as f:
        char_to_index = json.load(f)
    index_to_char = {i: ch for ch, i in char_to_index.items()}
    vocab_size = len(index_to_char)

    # Fail before building the model: an unknown seed index could otherwise
    # only surface as a KeyError when the result is decoded at the very end.
    if character_index not in index_to_char:
        raise ValueError(
            'character_index must be between 0 and {}, got {}'.format(vocab_size - 1, character_index))

    ##########################################################
    ######### Sequence generator model architecture  #########
    ##########################################################

    model = build_model_seq_gen_gru(vocab_size)
    model.load_weights(os.path.join(MODEL_DIR, 'gru_weights.{}.h5'.format(epoch_num)))

    sequence_index = [character_index]

    for _ in range(seq_length):
        # Feed the most recent character; the stateful layers carry the context.
        batch = np.zeros((1, 1))
        batch[0, 0] = sequence_index[-1]

        # verbose=0 keeps predict() from printing a progress bar per character.
        predicted_probs = model.predict(batch, verbose=0).reshape(-1)
        # Draw the next character index from the predicted distribution.
        sample = np.random.choice(range(vocab_size), size=1, p=predicted_probs)

        sequence_index.append(sample[0])

    seq = ''.join(index_to_char[c] for c in sequence_index)
    return seq

In [None]:
if __name__ == '__main__':
    epochs = 100
    freq = 10

    # Read the corpus inside a context manager so the file handle is closed
    # as soon as the text is in memory.
    with open(os.path.join(DATA_DIR, filename)) as corpus_file:
        corpus_text = corpus_file.read()

    ### Calling the train function on the data read from the input.txt file
    Train_epoch_loss_acc_gru = train_gru(corpus_text, epochs, freq)

In [None]:
# Plot the mean training loss against the actual epoch number; plotting the
# column alone would put the 0-based row index on the axis labelled 'Epoch'.
plt.plot(Train_epoch_loss_acc_gru['Epoch'], Train_epoch_loss_acc_gru['Losses'])
plt.title('GRU Model train')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train_loss'], loc='upper right')

plt.show()

In [None]:
# Read the vocabulary size from the saved mapping so the prompt states the real
# range of valid seed indices (0 to vocabulary size - 1).
with open(os.path.join(DATA_DIR, 'char_to_idx.json')) as f:
    max_character_index = len(json.load(f)) - 1

epoch_number = int(input("Enter epoch number between 1 to 100 only multiples of 10: "))
character_index = int(input("Enter any number between 0 to {} to sequence generation: ".format(max_character_index)))
Sequence_length = int(input("Number of characters to generate: "))

Generated_music_sequence_gru = sample_seq_generator_gru(epoch_number, character_index, Sequence_length)

print("\n")

print(Generated_music_sequence_gru)

In [4]:
# plot_model() expects a model instance, so build the GRU generator model first
# instead of passing the builder function itself. plot_model is already imported
# in the notebook's first cell.
# Rendering the diagram needs the Graphviz executables to be installed.
with open(os.path.join(DATA_DIR, 'char_to_idx.json')) as f:
    gru_plot_vocab_size = len(json.load(f))

plot_model(build_model_seq_gen_gru(gru_plot_vocab_size), to_file='model.png')

InvocationException: GraphViz's executables not found