# Homemade chat GPT (from IMDb_dataset)

In [1]:
# Load the modules

import json
import numpy as np
from nltk import word_tokenize, download
import tensorflow as tf

from keras.models import Sequential
from keras.layers import Embedding, LSTM, Dense, Dropout
from keras.callbacks import EarlyStopping, ModelCheckpoint

download('punkt')


2024-04-15 21:19:47.691987: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-15 21:19:47.695412: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-15 21:19:47.741482: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[nltk_data] Downloading package punkt to /home/valentin/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [2]:
# Define utility functions


def get_token_counts(data: list) -> dict:
    '''
    Create vocabulary from a bunch of (tokenized) texts. 

    Args:
        - data (list): list of tokenized texts
    
    Returns:
        - token count (dict)
    '''
    dict_token = {}    
    
    for utterance in data:
        for token in utterance:
            dict_token[token] = 1 if token not in dict_token else dict_token[token] + 1

    return dict(sorted(dict_token.items(), key=lambda x: x[1], reverse = True))


def encode_sequence(tokens: list[str], vocab: dict) -> list[int]:
    '''
    Encode a list of tokens into a list of integers using a vocabulary.
    '''
    
    return [vocab[token] if token in vocab else 0 for token in tokens]

## Preparing data

In [3]:
# Define a function to clean the tokens

def clean_tokens(buf: list[str]) -> list[str]:
    '''
    Clean the input list of tokens by removing punctuation marks, special characters,
    periods at the end of words, and splitting tokens at '/'.

    Args:
        - buf (list): list of tokens
    
    Returns:
        - list of cleaned tokens
    '''
    # Define a set of characters to ignore
    ignore = ['br', '']

    # One letter tokens accepted
    one_letter_tokens = ['a', 'i', '.', ',', '!', '?', ';', ':', '(', ')']
    
    # Filter out tokens containing only punctuation marks, special characters,
    # and remove periods at the end of words
    cleaned_tokens = []
    for token in buf:
        if token in ignore:
            continue
        elif len(token) == 1:
            if token in one_letter_tokens:
                cleaned_tokens.append(token)
        elif '/' in token:
            # Split token at '/'
            separated_words = token.split('/')
            buf.extend([word for word in separated_words if word != ''])
        elif '.' in token:
            # Split token at '.'
            separated_words = token.split('.')
            buf.extend([word for word in separated_words if word != ''])
        elif '\'' in token:
            # Split token at "\'"
            separated_words = token.split('\'')
            buf.extend([word for word in separated_words if word != ''])
        else:
            if token.endswith('.') or token.endswith('-') or token.endswith("_") or token.endswith("`"):
                token = token[:-1]  # Remove the period at the end
            if token.startswith(".") or token.startswith('-') or token.startswith("_") or token.startswith("`"):
                token = token[1:]
            if token != '':
                cleaned_tokens.append(token)
    
    # Convert tokens to lowercase
    cleaned_tokens = [token.lower() for token in cleaned_tokens]

    return cleaned_tokens


In [4]:
# Load the dataset
with open('./IMDb_dataset.json', 'rt') as f:
    imdb_data = json.load(f)

# Create a list with all sentences, separated by positive and negative
print("Starting tokenization for positive texts")
pos_texts = [clean_tokens(word_tokenize(x[1])) for x in imdb_data if x[0] == 'pos']


print("Starting tokenization for negative texts")
neg_texts = [clean_tokens(word_tokenize(x[1])) for x in imdb_data if x[0] == 'neg']


all_texts = pos_texts + neg_texts

Starting tokenization for positive texts
Starting tokenization for negative texts


In [5]:
# Create the vocabulary (we do the same for positive and negative texts to avoid having different vocabularies for each class)

# We count the occurrences of each token in the texts
dict_occurences = get_token_counts(all_texts)


MINOCC = 50 # PARAM : The number of occurence of a word to be considered as in the vocabulary

int_to_word = []
vocab = {}

int_id = 1
vocab['<unk>'] = 0
int_to_word.append('<unk>')
for token in dict_occurences:
    if dict_occurences[token] >= MINOCC:
        vocab[token] = int_id
        int_to_word.append(token)
        int_id += 1

print("Vocabulary size: ", len(vocab))

Vocabulary size:  7149


In [6]:
encoded_pos_texts = [encode_sequence(x, vocab) for x in pos_texts]
encoded_neg_texts = [encode_sequence(x, vocab) for x in neg_texts]
encoded_all_texts = encoded_pos_texts + encoded_neg_texts

print("All texts have been tokenized and encoded")

All texts have been tokenized and encoded


In [7]:

input_length = 5     # Define length of history


def create_x_y(encoded_texts, nb_sample_max=None, step=1):
    X = []
    Y = []
    point_indice = vocab['.']

    if nb_sample_max is None:
        for text in encoded_texts:
            for i in range(0, len(text) - input_length, step):
                if text[i+input_length] != 0 and text[i+input_length] != len(vocab):
                    X.append(text[i:i+input_length])
                    Y.append(text[i+input_length])
            X.append(text[-input_length:])
            Y.append(point_indice)

    else:
        for text in encoded_texts:
            if nb_sample_max <= 0:
                break
            for i in range(0, len(text) - input_length, step):
                if text[i+input_length] != 0 and text[i+input_length] != len(vocab):
                    nb_sample_max -= 1
                    X.append(text[i:i+input_length])
                    Y.append(text[i+input_length])
            X.append(text[-input_length:])
            Y.append(point_indice)
    
    return X, Y

X_good, Y_good = create_x_y(encoded_pos_texts, nb_sample_max=500000, step=4)
X_bad, Y_bad = create_x_y(encoded_neg_texts, nb_sample_max=500000, step=4)
X_total = X_good + X_bad
Y_total = Y_good + Y_bad


print('Number of sequences for training for good :', len(X_good))
print('Number of sequences for training for bad :', len(X_bad))
print('Number of sequences for training total :', len(X_total))

Number of sequences for training for good : 508559
Number of sequences for training for bad : 508713
Number of sequences for training total : 1017272


## Creation of the RNN model

In [8]:
# Define the model

vocab_size = len(vocab)
embedding_size = 100 # dimension of the input embeddings  # TODO : play with this (300)
lstm_size = 100 # dimension of the RNN state

model = Sequential()
model.add(Embedding(vocab_size, embedding_size, input_shape = (input_length,)))
model.add(LSTM(lstm_size))
model.add(Dropout(0.1))
model.add(Dense(vocab_size, activation = 'softmax'))

model.compile(loss = 'sparse_categorical_crossentropy', optimizer = 'adam')

# print(model.summary())



  super().__init__(**kwargs)


In [9]:
# Train the model


epochs = 1  # PARAM : The number of time we pass over the data
batch_size = 128
val_split = 0.2

stop = EarlyStopping(monitor = 'val_loss', min_delta = 0, patience = 5, verbose = 1, mode = 'auto')
save = ModelCheckpoint('save/saved_model.keras', monitor = 'val_loss', verbose = 0, save_best_only = True)

X_bad = np.array(X_bad)
Y_bad = np.array(Y_bad)

X_good = np.array(X_good)
Y_good = np.array(Y_good)

X_total = np.array(X_total)
Y_total = np.array(Y_total)

# Train the model
model.fit(X_bad, Y_bad, batch_size = batch_size, epochs = epochs, verbose = 1, validation_split = val_split, callbacks = [stop, save])

model.save('bad_model_1_epoch.keras')

[1m3180/3180[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m72s[0m 22ms/step - loss: 6.2477 - val_loss: 5.5685


In [10]:
# Functions to predict next token

def predict(model, h: list[int], mode = 'best', true_i = None):
    '''
    Return a predicted token given the history and the model. Said more simply, predict p[.|h]
    with the model and take the best guess or a random guess (depending on mode).
    
    Returns predicted token with the corresponding probability, optionnally returning the activation 
    prob of the true token if true_i is provided
    '''
    h_array = np.array(h).reshape(1, -1)
    
    probs = model.predict(h_array, verbose=0)[0]
    
    if mode == 'best':
        pred_token_id = np.argmax(probs)
        pred_prob = probs[pred_token_id]

    else:
        pred_token_id = np.random.choice(len(probs), p=probs)
        pred_prob = probs[pred_token_id]
    
    true_prob = probs[true_i] if true_i is not None else 0
    
    return pred_token_id, pred_prob, true_prob

## Natural language generation

In [11]:
# Function to generate text

MAX_SENTENCE_SIZE = 100  # The maximum number of token generated

def generate(prompt, model, mode):
    '''
    Generate text starting from the prompt.
    '''
    if mode == 'mix':
        mode_mix = True
        mode = 'best'
    else:
        mode_mix = False

    prompt_list = prompt.split()
    encoded_text = []
    text = [clean_tokens(word_tokenize(word)) for word in prompt_list[-5:]]

    for word in text:
        if word[0] not in vocab:
            encoded_text.append(0)
        else:
            encoded_text.append(vocab[word[0]])


    iteration = 0
    while iteration < MAX_SENTENCE_SIZE:
        prediction = predict(model, encoded_text[-5:], mode)
        encoded_text.append(prediction[0])
        if prediction[0] == vocab['.']:
            break
        iteration += 1
        if mode_mix:
            mode = 'random' if mode == 'best' else 'best'
    
    # Decode the text
    for number in encoded_text:
        print(int_to_word[number], end=' ')



In [16]:
# It's here we can finally play

model = tf.keras.models.load_model('bad_model_1_epoch.keras')

# Different modes : 'best', 'random', 'mix'
mode = 'best'

# The prompt you want to start with
prompt = "Hello"

generate(prompt, model, mode)


hello of the movie . 