In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Embedding, Dropout, Dense, LayerNormalization, MultiHeadAttention, GlobalAveragePooling1D, Bidirectional, LSTM, GRU

import pandas as pd
import numpy as np

from tqdm import tqdm

from sklearn.model_selection import train_test_split

In [None]:
EMBED_DIM = 300 #50, 100, 200, or 300 (see glove-6b data set)
MIN_WORD_OCCURENCE = None #use all words

NUM_HEADS = 8  # Number of attention heads
FF_DIM = 32  # Hidden layer size in feed forward network inside transformer
LAYER_UNITS = 64
DENSE_DROPOUT = 0.2

LR_TRANS = 0.0001
LR_LSTM = 0.0005
LR_GRU = 0.001

BATCH_SIZE = 64
MAX_EPOCHS = 200

CALLBACK = [keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=10,
                                                    restore_best_weights=True,
                                                    verbose=1)]

In [None]:
# detect and init the TPU
tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()

# instantiate a distribution strategy
tpu_strategy = tf.distribute.experimental.TPUStrategy(tpu)

## Preprocessing

In [None]:
import string
import regex as re
import numpy as np
import pandas as pd

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tokenizers import BertWordPieceTokenizer


def preprocess(text):
    # Remove integers
    text = re.sub(r'\d+', '', text)

    # remove newlines as \r and \n
    text = re.sub(r'\r', '', text)

    # Convert to lowercase
    text = text.lower()

    # Remove punctuation marks
    translator = str.maketrans('', '', string.punctuation)
    text = text.translate(translator)

    return text


def encode_text_and_labels(df, max_num_words, pre_or_post='post', subword=False):
    # create a tokenizer
    if subword:
        t = BertWordPieceTokenizer(
            clean_text=True,
            handle_chinese_chars=False,
            strip_accents=False,
            lowercase=True
        )

        t.train_from_iterator(df['text'])
        vocab_size = t.get_vocab_size()
        # integer encode the documents
        encoded_list = t.encode_batch(df['text'])
        encoded_docs = [x.ids for x in encoded_list]
        # pad documents to be as long as the longest sequence in the dataset
        max_length = max([len(x) for x in encoded_docs])
    else:
        t = Tokenizer(num_words=max_num_words, oov_token='<unk>')
        t.fit_on_texts(df['text'])
        vocab_size = len(t.word_index) + 1
        # integer encode the documents
        encoded_docs = t.texts_to_sequences(df['text'])
        # pad documents to be as long as the longest sequence in the dataset
        max_length = df['text'].apply(lambda x: len(x.split(' '))).max()

    padded_docs = pad_sequences(encoded_docs, maxlen=max_length, padding=pre_or_post)

    # integer encode
    label_encoder = LabelEncoder()
    integer_encoded = label_encoder.fit_transform(df['artist'])
    # binary encode
    onehot_encoded = to_categorical(integer_encoded)
    return padded_docs, onehot_encoded, vocab_size, max_length, t


def load_and_preprocess_data(path, max_num_words=None, pre_or_post='post', subword=False):
    """
    Load the data and preprocess it
    :param path: path to the data
    :return: preprocessed data in the form of a pandas dataframe. The first item returned is the data,
    the second is the labels, the third is the vocabulary size, and the fourth is the maximum length of a sequence
    """
    df = pd.read_csv(path)

    df = df.groupby('artist').filter(lambda x: len(x) > 100)

    df['text'] = df['text'].apply(preprocess)

    # Identify the rows that contain duplicated text in the 'song' column
    no_covers = ~df['song'].duplicated()

    # Filter the DataFrame to include only the rows with unique text
    df = df[no_covers]

    # prepare text data for a recurrent network
    return encode_text_and_labels(df, max_num_words, pre_or_post, subword)

In [None]:
path = "/kaggle/input/spotify-million-song-dataset/spotify_millsongdata.csv"
padded_docs, artists_onehot_encoded, vocab_size, max_length, token = load_and_preprocess_data(path)

In [None]:
embedding_vector = {}
f = open(f'/kaggle/input/glove-6b/glove.6B.{EMBED_DIM}d.txt') 
for line in tqdm(f):
    vector = line.split(' ')
    word = vector[0]
    coef = np.asarray(vector[1:],dtype = 'float32')
    embedding_vector[word]=coef
f.close()
# print('Number of words found ',len(embedding_vector))

embedding_matrix = np.zeros((vocab_size, EMBED_DIM))
for word,i in tqdm(token.word_index.items()):
    embedding_vectors = embedding_vector.get(word)
    if embedding_vectors is not None:
        embedding_matrix[i] = embedding_vector[word]

In [None]:
# Here use entire training set instead of validation set
X_train, X_test, y_train, y_test = train_test_split(
    padded_docs, artists_onehot_encoded, 
    stratify=artists_onehot_encoded, 
    test_size=0.2, random_state=42)

# get validation set, which is 8% of entire data set
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, stratify=y_train,
    test_size=0.1, random_state=42) 

In [None]:
# from https://keras.io/examples/nlp/text_classification_with_transformer/
# original source https://web.stanford.edu/~jurafsky/slp3/ed3book.pdf
class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.att = MultiHeadAttention(num_heads=NUM_HEADS, key_dim=EMBED_DIM)
        self.ffn = keras.Sequential(
            [Dense(FF_DIM, activation="relu"), Dense(EMBED_DIM)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(DENSE_DROPOUT)
        self.dropout2 = Dropout(DENSE_DROPOUT)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

In [None]:
def transformer_model():
    with tpu_strategy.scope():
        inputs = Input(shape=(max_length,))
        embedding_layer = Embedding(vocab_size, EMBED_DIM, input_length=max_length, 
                                 weights = [embedding_matrix], trainable = False)  

        x = embedding_layer(inputs)
        transformer_block = TransformerBlock()

        x = transformer_block(x)
        x = GlobalAveragePooling1D()(x)
        x = Dropout(DENSE_DROPOUT)(x)

        outputs = Dense(artists_onehot_encoded.shape[1], activation="softmax")(x)

        transformer = keras.Model(inputs=inputs, outputs=outputs)

        optimizer = keras.optimizers.Adam(learning_rate=LR_TRANS)
        transformer.compile(
            optimizer=optimizer, loss="categorical_crossentropy", 
            metrics=["accuracy"], steps_per_execution=32,
        )

        return transformer

In [None]:
def lstm_model():
    # instantiating the model in the strategy scope creates the model on the TPU
    with tpu_strategy.scope():
    
        # set the input, embedding matrix uses the glove datasets
        inputs = Input(shape=(max_length,))
        embedding_layer = Embedding(vocab_size, EMBED_DIM, input_length=max_length, 
                            weights = [embedding_matrix], trainable = False)    
        x = embedding_layer(inputs)

        # three LSTM layers with dropout
        x = Bidirectional(LSTM(LAYER_UNITS, return_sequences=True))(x)
        x = Dropout(DENSE_DROPOUT)(x)    
        x = Bidirectional(LSTM(LAYER_UNITS, return_sequences=True))(x)
        x = Dropout(DENSE_DROPOUT)(x)    
        x = Bidirectional(LSTM(LAYER_UNITS))(x)
        x = Dropout(DENSE_DROPOUT)(x)

        outputs = Dense(artists_onehot_encoded.shape[1], activation="softmax")(x)

        lstm = keras.Model(inputs=inputs, outputs=outputs)

        optimizer = keras.optimizers.Adam(learning_rate=LR_LSTM)
        lstm.compile(
            optimizer=optimizer, 
            loss="categorical_crossentropy", 
            metrics=["accuracy"],
            steps_per_execution=32,
        )
        return lstm

In [None]:
def gru_model():
    # instantiating the model in the strategy scope creates the model on the TPU
    with tpu_strategy.scope():
        # set the input, embedding matrix uses the glove datasets
        inputs = Input(shape=(max_length,))
        embedding_layer = Embedding(vocab_size, EMBED_DIM, input_length=max_length, 
                            weights = [embedding_matrix], trainable = False)    
        x = embedding_layer(inputs)

        # three GRU layers with dropout
        x = Bidirectional(GRU(LAYER_UNITS, return_sequences=True))(x)
        x = Dropout(DENSE_DROPOUT)(x)    
        x = Bidirectional(GRU(LAYER_UNITS, return_sequences=True))(x)
        x = Dropout(DENSE_DROPOUT)(x)    
        x = Bidirectional(GRU(LAYER_UNITS))(x)
        x = Dropout(DENSE_DROPOUT)(x)

        outputs = Dense(artists_onehot_encoded.shape[1], activation="softmax")(x)

        gru = keras.Model(inputs=inputs, outputs=outputs)

        optimizer = keras.optimizers.Adam(learning_rate=LR_GRU)

        gru.compile(
            optimizer=optimizer, 
            loss="categorical_crossentropy", 
            metrics=["accuracy"],
            steps_per_execution=32,
        )

        return gru

In [None]:
lstm = lstm_model()

lstm.summary()

In [None]:
lstm_history = lstm.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs = MAX_EPOCHS,
    batch_size = BATCH_SIZE,
    callbacks = CALLBACK,
    use_multiprocessing = True
)

In [None]:
gru = gru_model()
gru.summary()

In [None]:
gru_history = gru.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs = MAX_EPOCHS,
    batch_size = BATCH_SIZE,
    callbacks = CALLBACK,
    use_multiprocessing = True
)

In [None]:
transformer = transformer_model()
transformer.summary()

In [None]:
transformer.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=MAX_EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=CALLBACK,
    use_multiprocessing=True,
)

In [18]:
# Evaluate the model on the test data using `evaluate`
print("Evaluate on test data")
lstm_results = lstm.evaluate(X_test, y_test)
print("test loss, test acc:", lstm_results)

Evaluate on test data
test loss, test acc: [4.91952657699585, 0.07445956766605377]


In [19]:
# Evaluate the model on the test data using `evaluate`
print("Evaluate on test data")
gru_results = gru.evaluate(X_test, y_test)
print("test loss, test acc:", gru_results)

Evaluate on test data
test loss, test acc: [4.8601226806640625, 0.08038430660963058]


In [20]:
# Evaluate the model on the test data using `evaluate`
print("Evaluate on test data")
transformer_results = transformer.evaluate(X_test, y_test)
print("test loss, test acc:", transformer_results)

Evaluate on test data
test loss, test acc: [4.71074914932251, 0.12425940483808517]


In [21]:
lstm_pred = np.argmax(lstm.predict(X_test), axis=1)
gru_pred = np.argmax(gru.predict(X_test), axis=1)
transformer_pred = np.argmax(transformer.predict(X_test), axis=1)

In [22]:
y_test = np.argmax(y_test, axis=1)

In [23]:
y_test

array([168, 211, 112, ..., 147, 180,  18])

In [None]:
# load pre-trained transformer results
ptt_pred = pd.read_csv("/kaggle/input/pre-trained-transformer/submission.csv")['prediction']

In [90]:
def mcnemar(prediction1, prediction2, y_test):# McNemar test to compare the results of the two models
    CC = 0
    CF = 0
    FC = 0
    FF = 0

    for i in range(y_test.shape[0]):
#         print(prediction1[i], prediction2[i], y_test[i])
        if ((prediction1[i] == prediction2[i]) and (prediction1[i] == y_test[i])):
            CC += 1
        elif ((prediction1[i] != prediction2[i]) and (prediction1[i] == y_test[i])):
            CF += 1
        elif ((prediction1[i] != prediction2[i]) and (prediction2[i] == y_test[i])):
            FC += 1
        else:
            FF +=1
            
    print(f"CC: {CC}, CF: {CF}, FC: {FC}, FF: {FF}")

    mcNemar = ((CF - FC)*(CF - FC))/(CF + FC)
    print(mcNemar)

In [93]:
mcnemar(lstm_pred, gru_pred, y_test) #corresponding p-value: 0,01837

187 254 310 5494
5.560283687943262


In [94]:
mcnemar(lstm_pred, transformer_pred, y_test) #corresponding p-value: 3.125e-23

181 260 541 5263
98.57802746566792


In [95]:
mcnemar(transformer_pred, gru_pred, y_test) #corresponding p-value: 6.925e-16

221 501 276 5247
65.15444015444015
