# Recurrent neural networks for NLP

In [1]:
import pandas as pd
import numpy as np

## Dataset retrieval
The file `cards_nlp.csv` must be available in the given path. Run the "MTG NLP - 0" notebook first if it is not present.

Load all cards, and also load the Gensim embedding for MTG.

In [2]:
import os

# Detect whether the notebook runs on Google Colab. Only a failed import means
# "not on Colab"; any other error should surface instead of being swallowed.
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    import gdown
    output_file = './mtgjson_dataset/cards_nlp.csv'
    file_url = 'https://drive.google.com/file/d/1j2e1Va8Tt6bccRUdXEahsJMZD2J1o7ds/view?usp=drive_link'
    # Make sure the target folder exists before downloading into it
    # (it may be missing on a fresh runtime).
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    gdown.download(url=file_url, output=output_file, fuzzy=True)

# Load the card table and show how many rows (cards/documents) it holds.
data = pd.read_csv('./mtgjson_dataset/cards_nlp.csv')
print('There are',data.shape[0],'unique cards/documents')
data.head(5)

There are 28941 unique cards/documents


Unnamed: 0.1,Unnamed: 0,artist,colorIdentity,colors,convertedManaCost,edhrecRank,edhrecSaltiness,flavorText,keywords,legalities,...,power,rarity,rulings,setCode,subtypes,supertypes,text,toughness,type,types
0,0,Lius Lasahido,U,U,7.0,9173.0,0.44,He answers questions as readily as he asks the...,"['Flying', 'Hexproof']","{'commander': 'Legal', 'duel': 'Legal', 'legac...",...,5,mythic,"[{'date': '2016-01-22', 'text': 'A spell that ...",POGW,Sphinx,,"This spell can't be countered.\nFlying, hexpro...",5,Creature — Sphinx,Creature
1,1,Jesper Myrfors,R,R,3.0,3003.0,0.3,"To become king of the Goblins, one must assass...",['None'],"{'commander': 'Legal', 'duel': 'Legal', 'legac...",...,2,rare,"[{'date': '2005-08-01', 'text': 'Goblin King n...",3ED,Goblin,,Other Goblins get +1/+1 and have mountainwalk.,2,Creature — Goblin,Creature
2,2,Drew Baker,G,G,1.0,10754.0,0.0,,['Morbid'],"{'commander': 'Legal', 'duel': 'Legal', 'legac...",...,0,common,"[{'date': '2011-09-22', 'text': 'You can choos...",ISD,,,"Search your library for a basic land card, rev...",0,Sorcery,Sorcery
3,3,Jason Chan,RG,RG,5.0,21203.0,0.0,"""May the earth rise up to meet you.""",['Cycling'],"{'commander': 'Legal', 'duel': 'Legal', 'legac...",...,3,common,"[{'date': '2008-10-01', 'text': 'Cycling is an...",ARB,Minotaur,,"When this_card enters the battlefield, it deal...",4,Creature — Minotaur,Creature
4,4,Scott M. Fischer,W,W,3.0,1921.0,1.72,The law is meant to ensure that people kill ea...,['None'],"{'brawl': 'Legal', 'commander': 'Legal', 'duel...",...,0,uncommon,"[{'date': '2019-07-12', 'text': 'If you cast a...",M20,,,Each player can't cast more than one spell eac...,0,Enchantment,Enchantment


In [3]:
from gensim.models import Word2Vec

# On Colab, download the pretrained skip-gram model from Google Drive first.
if(IN_COLAB):
    import gdown
    output_file = './mtgjson_dataset/mtg_skipgram.pkl'
    file_url = 'https://drive.google.com/file/d/1k5Vv-ikNcf8GMB_DoSLnZjEar1K2E3Cn/view?usp=drive_link'
    gdown.download(url = file_url, output = output_file, fuzzy=True)

# Load the Gensim Word2Vec model from the local path.
# NOTE(review): the file is presumably pickle-based (.pkl) — only load it from a trusted source.
mtg_skipgram = Word2Vec.load('./mtgjson_dataset/mtg_skipgram.pkl')

## Preprocessing
Let's start by finding the maximum length for the input sequence. The longest text in the raw dataset belongs to a special token that is not legal for play. After dropping the types _Dungeon_ and _Card_, the longest playable text on a card (as of June 2024) is 113 words, from Master of the Hunt (the official text of that card explains the _Band_ keyword, which is famously complicated).

In [4]:
# Count the whitespace-separated words of every card text once, then report
# the largest count and the row label where it occurs.
words_per_card = data.text.str.split("\\s+").str.len()
print("Longest sequence", words_per_card.max())
print("Index of longest sequence", words_per_card.idxmax())

Longest sequence 264
Index of longest sequence 27694


In [5]:
# 'Card' and 'Dungeon' types are dropped as they are not playable.
playable_word_counts = data.loc[(data.type != 'Card') & (data.type != 'Dungeon')].text.str.split("\\s+").str.len()
max_sequence_length = playable_word_counts.max()

print("Longest sequence of a playable card is",max_sequence_length)
# Look the card up through the row label just computed rather than a
# hard-coded one, so the cell stays correct if the dataset changes.
longest_playable_idx = playable_word_counts.idxmax()
data.loc[longest_playable_idx,'name']

Longest sequence of a playable card is 113


'Master of the Hunt'

### Vectorizer
The TextVectorization layer from Keras can handle the tokenization and also can be used in the model pipeline after an Input layer (thus accepting tensors and the model being able to accept string inputs). It can be initialized with a fixed vocabulary. In this case, it will use the previous Gensim vocabulary.

In [6]:
from keras.layers import TextVectorization
import tensorflow.strings as tf_strings

# Custom standardizer that does roughly the same as the default 'lower_and_strip_punctuation' but with a slightly different regex.
def standarizer(input):
    """Replace the listed punctuation, tab and newline characters with spaces, then lowercase.

    Characters outside the pattern (for instance + - / { } ') are kept as they are.
    """
    return tf_strings.lower(
        tf_strings.regex_replace(input, r'[!"#$%&()\*,\._:;<=>?@\[\\\]^`~\t\n]', ' ')
    )

# Build the vectorizer: every output sequence has max_sequence_length positions
# and the text is normalized with the custom standarizer defined above.
vectorizer = TextVectorization(output_sequence_length=int(max_sequence_length),
                               standardize=standarizer,
                               # Initialize the Vectorizer state from the Gensim embedding vocabulary
                               vocabulary=mtg_skipgram.wv.index_to_key)

def sequence_to_text(sequence,vectorizer):
    """Translate token indexes back into text using the vectorizer vocabulary.

    Args:
        sequence: a single token index, or an iterable of token indexes.
        vectorizer: object exposing ``get_vocabulary()``, which returns the
            list of tokens ordered by index.

    Returns:
        The token itself for a single index; for an iterable, the tokens
        joined by single spaces, with empty-string tokens skipped.
    """
    # Fetch the vocabulary once rather than once per token.
    vocabulary = vectorizer.get_vocabulary()
    try:
        indexes = iter(sequence)
    except TypeError:
        # Not iterable: treat it as a single index.
        return vocabulary[sequence]
    words = (vocabulary[index] for index in indexes)
    return " ".join(word for word in words if word != '').strip()

# Peek at the first five vocabulary entries.
vectorizer.get_vocabulary()[:5]

['', '[UNK]', 'this', 'card', 'a']

Note how the layer already set up two special indexes: 0 for the empty token (used to pad sequences) and 1 for _unknown_ (`[UNK]`), used when a word is not found in the vocabulary. This, of course, expands the vocabulary by 2.

In [7]:
# Dimensionality of the pretrained vectors and size of the vectorizer vocabulary.
embedding_dim = mtg_skipgram.wv.vector_size
total_tokens = len(vectorizer.get_vocabulary())
print("Vocabulary size:", total_tokens)

Vocabulary size: 2195


### Embedding mapping
As the model will use a pretrained Embedding, it is required to map the newly created tokens to their actual vectors in that Embedding.  Since the tokenizer has been initialized with the same vocabulary as the Gensim embedding, the indexes will be the same, only shifted by two to make room for the empty and unknown tokens.

In [8]:
from keras.layers import Embedding

# Two all-zero rows of embedding_dim size come first (for the two extra
# vectorizer tokens), followed by the raw Gensim vectors in their own order.
special_rows = np.zeros((2,embedding_dim))
embedding_matrix = np.concatenate((special_rows, mtg_skipgram.wv.vectors), axis=0)

# Frozen Keras embedding layer whose weights are set to the matrix above.
embedding_layer = Embedding(input_dim=total_tokens, output_dim=embedding_dim, trainable=False)
embedding_layer.build((1,))
embedding_layer.set_weights([embedding_matrix])
print("Embedding input dimension is",embedding_layer.input_dim)
print("Embedding output dimension is",embedding_layer.output_dim)

Embedding input dimension is 2195
Embedding output dimension is 300


### Train/test split
Setting up the learning problem as many-to-one; that is, the network will try to predict the next word given a sequence.

In [9]:
from sklearn.model_selection import train_test_split
# Only the card text is needed, so a single array is split. This also avoids
# binding throwaway results to `_`, which IPython uses for the last output.
X_train, X_test = train_test_split(data.text,
                                   test_size=0.2,
                                   random_state=42)

def prepare_training_target_sequence(x):
    """Split zero-padded token sequences into inputs and last-word targets.

    For every row, the last non-zero token becomes the target label and is
    replaced by 0 in the returned input row.

    Args:
        x: 2-D array of token indexes (one sequence per row), 0 meaning padding.

    Returns:
        Tuple ``(inputs, targets)``: a copy of ``x`` with each row's last
        non-zero token set to 0, and a 1-D array holding those tokens. Rows
        containing only zeros yield a target of 0.
    """
    # Work on a copy so the caller's array is left untouched.
    inputs = np.array(x, copy=True)
    rows = np.arange(inputs.shape[0])
    # The running count of non-zero entries stops growing after the last
    # non-zero element, so the first position of its maximum is that element.
    last_token_pos = (inputs != 0).cumsum(axis=1).argmax(axis=1)
    targets = inputs[rows, last_token_pos]
    inputs[rows, last_token_pos] = 0
    return inputs, targets

# Vectorize the training texts and split them into inputs (x) and last-word targets (y).
x,y = prepare_training_target_sequence(vectorizer(X_train).numpy())
x

array([[ 43,  28,   2, ...,   0,   0,   0],
       [515,   6, 646, ...,   0,   0,   0],
       [ 45,   6,  66, ...,   0,   0,   0],
       ...,
       [ 32,  25,   8, ...,   0,   0,   0],
       [  1,  55,  81, ...,   0,   0,   0],
       [ 45,   6,  66, ...,   0,   0,   0]], dtype=int64)

In [10]:
# Sanity check: decode one training input and its target word back to text.
print(sequence_to_text(x[250],vectorizer))
print(sequence_to_text(y[250],vectorizer))

when this card dies each player loses 3
life


## Model

In [11]:
from tensorflow import keras
from tensorflow import data
from keras import layers
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Dense, LSTM, Embedding, Dropout
from keras.losses import SparseCategoricalCrossentropy

### Long-short term memory units (LSTM)

In [12]:
# Next-word model: frozen pretrained embedding -> two stacked LSTMs -> dense
# layer -> softmax over the whole vocabulary.
# NOTE(review): the embedding layer is built without zero masking, so the LSTMs
# also process the padded positions — consider masking; TODO confirm.
model = Sequential()
model.add(keras.Input(shape=(int(max_sequence_length),)))
model.add(embedding_layer)
model.add(LSTM(64, return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(64))
model.add(Dense(32, activation='relu'))
model.add(Dense(total_tokens, activation='softmax'))
# The targets are integer word indexes (not one-hot vectors), hence the sparse
# variant of categorical cross-entropy on top of the softmax output.
# Metrics are passed as a list, the form accepted across Keras versions.
model.compile(loss=SparseCategoricalCrossentropy(), optimizer='adam', metrics=['accuracy'])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (None, 113, 300)          658500    
                                                                 
 lstm (LSTM)                 (None, 113, 64)           93440     
                                                                 
 dropout (Dropout)           (None, 113, 64)           0         
                                                                 
 lstm_1 (LSTM)               (None, 64)                33024     
                                                                 
 dense (Dense)               (None, 32)                2080      
                                                                 
 dense_1 (Dense)             (None, 2195)              72435     
                                                                 
Total params: 859,479
Trainable params: 200,979
Non-trai

In [13]:
import tensorflow
# Report how many GPU devices TensorFlow lists.
print("Num GPUs Available: ", len(tensorflow.config.list_physical_devices('GPU')))

Num GPUs Available:  1


### Perplexity metric
Any deep (supervised) learning model can be generalized as an estimation of the conditional probability distribution of a given experiment: $$ p_{model} = \hat{P}(y|x_1,x_2...x_n)$$ which is obtained by ways of minimizing the Kullback–Leibler (KL) divergence $$ D_{KL}(p_{data}||p_{model}) = \mathbb{E}_{x \sim p_{data}} \left[\log \frac{p_{data}}{p_{model}}\right] $$
which measures, in short, how different two distributions are. It is equivalent to minimizing the cross-entropy or maximizing the likelihood function. The density $p_{model}$ will vary according to the model, the most common ones being Gaussian distributions (regression), Bernoulli distributions (logistic regressions) and Multinoulli distributions (multinomial).

Unlike Naive Bayes, which assumes that the features are conditionally independent given the class, allowing $ p_{model} = \hat{P}(y|x_1,x_2...x_n) \propto P(y)\prod_{i=1}^{n} P(x_i|y)$, recurrent neural networks assume that the order of the samples matters and that they are not independent. An RNN models the probability distribution of a certain term appearing immediately after the previous samples $$ p_{model} = \hat{P}(x_{i+1}|x_1,x_2...x_i)$$

Then, the probability $P(\vec w)$ of a certain sentence made of samples $ \vec w = [w_1,w_2,w_3,...,w_n] $ becomes: $$ P(\vec w) = P(w_1)P(w_2|w_1)P(w_3|w_1,w_2)...P(w_n|w_1,w_2,...w_{n-1})$$
As stated, each term is the output of an RNN (with softmax output) when fed the corresponding sequence. **A good metric for such a model is how likely it is to produce the sentences in the validation set**. The perplexity is such a metric, given by the reciprocal of the length-normalized probability of the sentence. Taking logs for easier computation, the perplexity of a sentence of $t$ tokens is $$PPL(\vec w) = \exp\left[-\frac{1}{t}\sum_{i=1}^{t} \log P(w_i|w_{<i})\right]$$

In [14]:
# Vectorize the held-out texts and split them into inputs and last-word targets.
x_test,y_test = prepare_training_target_sequence(vectorizer(X_test).numpy())

In [15]:
from tensorflow.keras.utils import pad_sequences

class PplCallback(keras.callbacks.Callback):
    """Keras callback that prints the mean per-sequence perplexity after each epoch.

    Every validation sequence is expanded into all of its proper prefixes
    (starting with the empty one). The probability the model assigns to the
    token that follows each prefix is used to compute the sequence perplexity.
    """

    def __init__(self, val_data, max_sequence_length, padding='post'):
        """Pre-compute the padded prefixes and their target tokens.

        Args:
            val_data: iterable of 1-D token-index sequences; leading and
                trailing zeros are stripped from each one.
            max_sequence_length: length every prefix is padded to.
            padding: 'post' (default) or 'pre', forwarded to ``pad_sequences``.
                The default matches the zero-at-the-end layout of the training
                inputs in this notebook.

        Raises:
            ValueError: if ``val_data`` holds no non-empty sequence.
        """
        super().__init__()
        self.val_data = val_data
        self.target = []
        self.padded = []
        # (start, end) row ranges in self.padded / self.target, one per sequence.
        self.info = []
        count = 0
        for seq in val_data:
            seq = np.trim_zeros(seq)
            len_seq = len(seq)
            if len_seq == 0:
                continue
            # Prefix i is the context used to predict token i.
            subseq = [seq[:i] for i in range(len_seq)]
            self.target.extend(seq[i] for i in range(len_seq))
            self.padded.append(pad_sequences(subseq, maxlen=max_sequence_length, padding=padding))
            self.info.append((count,count+len_seq))
            count += len_seq
        if not self.padded:
            raise ValueError('val_data contains no non-empty sequences.')
        self.padded = np.vstack(self.padded)

    def on_epoch_end(self, epoch, logs=None):
        """Predict every prefix and print the mean perplexity over the sequences."""
        scores = []
        predictions = self.model.predict(self.padded,verbose=0)
        for start,end in self.info:
            # Probability given to the true next token of each prefix.
            probs = predictions[np.arange(start, end), self.target[start:end]]
            # Clip so that a probability of exactly 0 cannot produce log(0).
            log_probs = np.log(np.clip(probs, 1E-10, None))
            scores.append(np.exp(-np.sum(log_probs)/(end-start)))
        print(f'\n mean perplexity: {np.mean(scores)} \n')


### Training
The perplexity metric above proved too expensive, so I am training with a simple accuracy score.

In [26]:
# Train on the (input, last-word) pairs; no validation data or callbacks are passed.
hist = model.fit(x, y, epochs=100, batch_size=32)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

### Save model

In [27]:
# Write the model weights to disk.
# NOTE(review): presumably the ./mtgjson_dataset/models/ folder must already exist — confirm.
model.save_weights('./mtgjson_dataset/models/mtg_lstm.weights.h5')

In [28]:
# Read the stored weights back into the existing model object.
model.load_weights('./mtgjson_dataset/models/mtg_lstm.weights.h5')

### Simple sequence generation
With this method, simply concatenate a single output to the input sequence and get another prediction, until satisfied. This method is easy, but it is deterministic and tends to loop if a certain set of words has a considerably higher probability than the others.

To avoid that, Beam Search can be used (see below).

In [29]:
def generate_sequence(input,size,model,vectorizer):
    """Greedily extend a text prompt one word at a time.

    Args:
        input: prompt text.
        size: number of words to append.
        model: object whose ``predict`` returns, for the single input row,
            one score per vocabulary index.
        vectorizer: callable turning text into a token-index sequence and
            exposing ``get_vocabulary()``.

    Returns:
        The prompt followed by ``size`` predicted words, separated by spaces.
    """
    # Build the index -> word lookup once instead of on every step.
    vocabulary = vectorizer.get_vocabulary()
    seq = input
    for _ in range(size):
        preds = model.predict(tensorflow.reshape(vectorizer(seq),(1,-1)), verbose=False)
        # Greedy choice: highest-scoring vocabulary index of the single row.
        next_index = int(np.argmax(preds, axis=1)[0])
        seq += " " + vocabulary[next_index]
    return seq

"Target land gets" is a simple enough start to try things. The result is not bad.

In [31]:
# Greedy generation: append 7 words to the prompt.
generate_sequence("target land gets", 7 , model,vectorizer)

'target land gets +3/+3 trample lifelink haste trample indestructible haste'

Here is an instance of the problem mentioned above. Keywords like _trample_, _flying_ and _lifelink_ are entirely too common, and this kind of generation tends to get stuck on them.

In [41]:
# Greedy generation: append 7 words to a different prompt.
generate_sequence("whenever a creature", 7 , model,vectorizer)

'whenever a creature token trample reach trample trample trample lifelink'

In [57]:
# Greedy generation: append only 2 words to the prompt.
generate_sequence("if an effect", 2 , model,vectorizer)

'if an effect haste trample'

## Beam search

In [23]:
from scipy.special import softmax

def select_candidates(pred,num_beams,vocab_size,history_probs,history_tokens,temp=1,mode='det'):
    """Pick the next ``num_beams`` beams from the per-beam next-token probabilities.

    Args:
        pred: sequence with one probability vector (length ``vocab_size``) per beam.
        num_beams: number of candidates to keep.
        vocab_size: length of each probability vector.
        history_probs: accumulated log-probability of every current beam.
        history_tokens: token rows of the current beams.
        temp: temperature dividing the scores in ``'sto'`` mode.
        mode: ``'det'`` keeps the best-scoring candidates; ``'sto'`` samples
            candidates from the softmax of the scores.

    Returns:
        Tuple of the selected candidates' scores and their token rows (the
        parent beam's tokens with the chosen token appended), cast to int.

    Raises:
        ValueError: if ``mode`` is neither ``'det'`` nor ``'sto'``.
    """
    # One flat score per (beam, token) pair: log-probability of the token
    # plus the accumulated score of the beam it extends.
    scores = np.array([
        score
        for beam_idx, probs in enumerate(pred)
        for score in np.log(probs+1E-10)+history_probs[beam_idx]
    ])

    if mode not in ('det', 'sto'):
        raise ValueError(f'Wrong selection mode. {mode} was given. det and sto are supported.')

    if mode == 'det':
        chosen = np.argsort(scores)[::-1][:num_beams]
    else:
        candidate_ids = np.arange(scores.shape[0])
        chosen = np.random.choice(candidate_ids, num_beams, p=softmax(scores/temp))

    # A flat position encodes both the parent beam and the chosen token.
    parent_rows = np.array(history_tokens)[chosen//vocab_size]
    appended_tokens = np.array([chosen%vocab_size]).T
    new_history_tokens = np.concatenate((parent_rows, appended_tokens), axis=1)

    return scores[chosen.astype(int)], new_history_tokens.astype(int)


def beam_search(model,num_beams,num_words,input, encode, temp = 1, mode = 'det'):
    """Generate ``num_words`` tokens after a prompt using beam search.

    Args:
        model: object whose ``predict`` returns next-token probabilities for a
            batch holding one encoded sequence.
        num_beams: number of beams kept at every step.
        num_words: number of tokens to generate.
        input: prompt passed to ``encode``.
        encode: callable turning the prompt into a batch with a single row of
            token indexes.
        temp: temperature forwarded to ``select_candidates``.
        mode: selection mode forwarded to ``select_candidates``
            (``'det'`` or ``'sto'``).

    Returns:
        The token rows of the final beams: the encoded prompt row followed by
        the generated tokens.
    """
    # First step: a single prediction from the encoded prompt seeds all beams.
    encoded = encode(input)
    y_hat = np.squeeze(model.predict(encoded,verbose=False))
    vocab_size = y_hat.shape[0]
    # Every beam starts from the same prompt row with an accumulated score of 0.
    history_probs = [0]*num_beams
    history_tokens = [encoded[0]]*num_beams
    history_probs, history_tokens = select_candidates([y_hat],
                                        num_beams,
                                        vocab_size,
                                        history_probs,
                                        history_tokens,
                                        temp=temp,
                                        mode=mode)

    for i in range(num_words-1):
      preds = []
      for hist in history_tokens:
        # Drop the oldest i+1 tokens so the model input keeps the length of
        # the encoded prompt while the beam row grows by one token per step.
        # NOTE(review): new tokens are appended at the END of the row and the
        # window drops tokens from the FRONT, which suits front-padded inputs.
        # With an `encode` that pads at the end, the generated tokens land
        # after the padding and the first prompt tokens are discarded —
        # confirm this is intended.
        input_update = np.array([hist[i+1:]]).copy()
        y_hat = np.squeeze(model.predict(input_update,verbose=False))

        preds.append(y_hat)

      # Keep the best (or sampled) num_beams continuations among all beams.
      history_probs, history_tokens = select_candidates(preds,
                                                        num_beams,
                                                        vocab_size,
                                                        history_probs,
                                                        history_tokens,
                                                        temp=temp,
                                                        mode=mode)

    return history_tokens

### Results
Non-deterministic beam search will output a different result each time. Increasing the temperature argument avoids the loop problem, but the resulting sequence becomes increasingly nonsensical.

In [52]:
def encode(input):
    """Vectorize a text and reshape the result into a single-row batch."""
    token_ids = vectorizer(input)
    return tensorflow.reshape(token_ids, (1, -1))
                       
# Stochastic beam search: 5 beams, 6 generated tokens, temperature 1.
# Only the first returned beam is decoded and shown.
out = beam_search(model,num_beams=5,num_words=6,input="whenever a creature",mode='sto',encode=encode, temp=1)
sequence_to_text(out[0],vectorizer)

'whenever a creature token card auras flying flying flying'

In [51]:
# Same prompt and settings, with the temperature raised to 10.
out = beam_search(model,num_beams=5,num_words=6,input="whenever a creature",mode='sto',encode=encode, temp=10)
sequence_to_text(out[0],vectorizer)

"whenever a creature {4}{w}{u} crewed centaur hydra's {r}{r} −3"

In [53]:
# Same prompt, 5 generated tokens, temperature 25.
out = beam_search(model,num_beams=5,num_words=5,input="whenever a creature",mode='sto',encode=encode, temp=25)
sequence_to_text(out[0],vectorizer)

'whenever a creature {3}{b} modifications nearest itself {x}'

In [54]:
# Same prompt, 5 generated tokens, temperature 50.
out = beam_search(model,num_beams=5,num_words=5,input="whenever a creature",mode='sto',encode=encode, temp=50)
sequence_to_text(out[0],vectorizer)

'whenever a creature reduced do kaldra chroma zombie'

In [59]:
# Different prompt, 5 generated tokens, temperature 50.
out = beam_search(model,num_beams=5,num_words=5,input="if an effect",mode='sto',encode=encode, temp=50)
sequence_to_text(out[0],vectorizer)

'if an effect won −10 saproling: bloodthirst beginning'