# Experiment IV : Pre-trained models: ChatGPT

**Author:** Felipe Cortes Jaramillo

**Description:** Experiments to fine-tune and use a pre-trained ChatGPT model

**References:** https://huggingface.co/gpt2

In [1]:
# Let's load the data
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install transformers sentencepiece sacremoses sacrebleu rouge-score jiwer bert-score pyter3



In [3]:
# --- 1. We import the libraries we need ---
import numpy as np
import tensorflow as tf
import pandas as pd
import time
import re
import ast
import os

from transformers import GPT2Tokenizer, TFGPT2LMHeadModel
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, EarlyStopping

from sacrebleu import corpus_bleu
from rouge_score import rouge_scorer
from jiwer import wer
from bert_score import score as bert_score
import pyter

def predict_and_compare(index, testX, testY, model, tokenizer, max_output_length=5):
    """ Predicts translation for a given index in the test set and compares with the ground truth. """
    input_seq = testX[index:index+1]

    # Determine the total max_length (input length + desired output length)
    total_max_length = len(input_seq[0]) + max_output_length
    prediction = model.generate(input_seq, max_length=total_max_length, no_repeat_ngram_size=2)

    # Decode the prediction and input
    input_text = tokenizer.decode(input_seq[0], skip_special_tokens=True)
    predicted_text = tokenizer.decode(prediction[0], skip_special_tokens=True)

    # For ground truth
    ground_truth_text = tokenizer.decode(testY[index], skip_special_tokens=True)

    # Return results
    print(f'Prediction index for element finished: {index}')
    return input_text, predicted_text, ground_truth_text

def clean_predictions(predictions):
    """ Cleans predictions for comparison purposes. """
    inputs, preds, truths = zip(*predictions)

    # Join the tokenized words into sentences
    preds = [" ".join(pred) if isinstance(pred, list) else pred for pred in preds]
    truths = [" ".join(truth) if isinstance(truth, list) else truth for truth in truths]

    # Clean preds
    cleaned_preds = []
    for current_pred in preds:
      tokens = current_pred.strip('[]').split(',')

      # Cleaning each token by removing special characters and extra quotes
      cleaned_tokens = [re.sub(r'[^\w\s]', '', token.strip()) for token in tokens]
      split_tokens = [word for token in cleaned_tokens for word in token.split()]
      cleaned_preds.append(split_tokens)

    return zip(inputs, cleaned_preds, truths)

class TimedCSVLogger(CSVLogger):
    def __init__(self, filename, separator=',', append=False):
        super().__init__(filename, separator, append)
        self.start_time = time.time()

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = time.time()

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        end_time = time.time()
        logs['epoch_duration'] = end_time - self.epoch_start_time
        logs['total_time'] = end_time - self.start_time
        super().on_epoch_end(epoch, logs)

if __name__ == '__main__':

    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            # Set TensorFlow to use only one GPU
            tf.config.experimental.set_visible_devices(gpus[0], 'GPU')

            # Enable memory growth
            tf.config.experimental.set_memory_growth(gpus[0], True)

            print("Using GPU:", gpus[0])
        except RuntimeError as e:
            # Memory growth must be set at program startup
            print("RuntimeError:", e)
    else:
        raise SystemError("GPU device not found")

    # --- 2. We define the global variable ---

    BATCH_SIZE = 12
    EPOCHS = 1
    VALIDATION_SPLIT = 0.2
    identifier = 'chatgpt_mt'

    # --- 3. We open the data and apply tokenization, with data generator ---

    df = pd.read_csv('./drive/MyDrive/data/dl/preprocessed_data.csv')
    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')

    # Set the EOS token as the padding token
    tokenizer.pad_token = tokenizer.eos_token

    # We extract the test set first
    train_df, test_df = train_test_split(df, test_size=VALIDATION_SPLIT)
    testX = tokenizer(test_df['en_tokens'].tolist(), return_tensors='tf', padding=True, truncation=True, max_length=512)['input_ids']
    testY = tokenizer(test_df['fr_tokens'].tolist(), return_tensors='tf', padding=True, truncation=True, max_length=512)['input_ids']

    src_texts = train_df['en_tokens'].tolist()
    tgt_texts = train_df['fr_tokens'].tolist()

    model_inputs = tokenizer(src_texts, return_tensors='tf', padding=True, truncation=True, max_length=512)

    # Concatenate source and target texts with a special token
    train_texts = ["{} <|endoftext|> {}".format(src, tgt) for src, tgt in zip(src_texts, tgt_texts)]

    # Tokenize
    model_inputs = tokenizer(train_texts, return_tensors='tf', padding=True, truncation=True, max_length=512)


    with tokenizer.as_target_tokenizer():
        labels = tokenizer(tgt_texts, return_tensors='tf', padding=True, truncation=True, max_length=512)

    model_inputs["labels"] = labels["input_ids"]

    def data_generator(model_inputs, batch_size):
      total_size = len(model_inputs['input_ids'])
      for i in range(0, total_size, batch_size):
          batch_input_ids = model_inputs['input_ids'][i:i + batch_size]
          batch_attention_mask = model_inputs['attention_mask'][i:i + batch_size]
          yield ({"input_ids": batch_input_ids, "attention_mask": batch_attention_mask}, batch_input_ids)


    # Split data into training and validation
    train_size = int((1 - VALIDATION_SPLIT) * len(model_inputs['input_ids']))
    train_dataset = (model_inputs[:train_size], labels[:train_size])
    validation_dataset = (model_inputs[train_size:], labels[train_size:])

    # Convert dataset and charg into model
    train_data = tf.data.Dataset.from_generator(
    lambda: data_generator(model_inputs, BATCH_SIZE),
    output_types=({'input_ids': tf.int32, 'attention_mask': tf.int32}, tf.int32),
    output_shapes=({'input_ids': tf.TensorShape([None, None]), 'attention_mask': tf.TensorShape([None, None])}, tf.TensorShape([None, None]))
    ).prefetch(tf.data.experimental.AUTOTUNE)

    validation_data = tf.data.Dataset.from_generator(
    lambda: data_generator(model_inputs, BATCH_SIZE),
    output_types=({'input_ids': tf.int32, 'attention_mask': tf.int32}, tf.int32),
    output_shapes=({'input_ids': tf.TensorShape([None, None]), 'attention_mask': tf.TensorShape([None, None])}, tf.TensorShape([None, None]))
    ).prefetch(tf.data.experimental.AUTOTUNE)

    # --- 4.1 Define Callbacks ---
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, mode='min', verbose=1)
    csv_logger = TimedCSVLogger(f'./drive/MyDrive/data/dl/results/training_log/training_log_{identifier}.csv', append=True)

    # --- 4.2 We define and compile the model ---
    model = TFGPT2LMHeadModel.from_pretrained('gpt2')
    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer=optimizer, loss=loss)

    model_weights_path = f'./drive/MyDrive/data/dl/results/weights/weights_{identifier}.best'

    if os.path.exists(model_weights_path):
      model = tf.keras.models.load_model(model_weights_path)
      print("Model weights loaded successfully!")

    # --- 5. We train the model ---
    model.fit(train_data, validation_data=validation_data, epochs=EPOCHS, callbacks=[early_stopping, csv_logger])
    model.save_weights(model_weights_path)

    # ---6. Measure the performance ---
    all_predictions_raw = []
    for j in range(100):
        input_text, predicted_text_raw, ground_truth_text = predict_and_compare(j, testX, testY, model, tokenizer)
        all_predictions_raw.append((input_text, predicted_text_raw, ground_truth_text))
    all_predictions = clean_predictions(all_predictions_raw)

    with open(f'./drive/MyDrive/data/dl/results/predictions/model_predictions_{identifier}.txt', 'w', encoding='utf-8') as file:
      for input_text, predicted_text, ground_truth in all_predictions:
          # Format the input_text list
          input_text = ast.literal_eval(input_text)
          ground_truth = ast.literal_eval(ground_truth)

          formatted_input_text = "Input (English): " + " ".join(f"'{word}'" for word in input_text)
          formatted_pred_text = "Predicted (French): " + " ".join(f"'{word}'" for word in predicted_text)
          formatted_truth_text = "Ground Truth (French): " + " ".join(f"'{word}'" for word in ground_truth)

          file.write(formatted_input_text + "\n")
          file.write(formatted_pred_text + "\n")
          file.write(formatted_truth_text + "\n")
          file.write("----------\n")


Using GPU: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
All PyTorch model weights were used when initializing TFGPT2Model.

All the weights of TFGPT2Model were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFGPT2Model for predictions without further training.




TypeError: The current model class (TFGPT2Model) is not compatible with `.generate()`, as it doesn't have a language model head. Please use one of the following classes instead: {'TFGPT2LMHeadModel'}

In [None]:
# End of notebook!