<a href="https://colab.research.google.com/github/zxc-ghous/Anime-synopsis-and-genre-generator/blob/main/synopsis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab setup: install the Hugging Face packages that the import cell below relies on.
# NOTE(review): versions are not pinned, so re-running later may pull a different release.
!pip install transformers
!pip install datasets

In [None]:
import transformers
import datasets
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import mixed_precision
import re

In [None]:
# Google Colab TPU support.
# Resolves the TPU cluster, connects to it and builds a TPUStrategy.
# NOTE(review): `tpu_strategy` is only created here; a model has to be built
# inside `tpu_strategy.scope()` for the strategy to have any effect.
print("Tensorflow version " + tf.__version__)

try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
  print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
except ValueError as err:
  # RuntimeError (an ordinary Exception) instead of a bare BaseException, and
  # chain the resolver's error so the root cause stays in the traceback.
  raise RuntimeError(
      'ERROR: Not connected to a TPU runtime; in Colab choose '
      'Runtime > Change runtime type > TPU and re-run this cell.'
  ) from err

tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)
tpu_strategy = tf.distribute.TPUStrategy(tpu)

In [None]:
# Google Colab GPU support: abort early unless TensorFlow reports the first GPU.
device_name = tf.test.gpu_device_name()
gpu_found = device_name == '/device:GPU:0'
if not gpu_found:
  raise SystemError('GPU device not found')

print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [None]:
# Marker appended to every training text to signal where a synopsis ends.
EOS_TOKEN = "<|endoftext|>"

# Load the pretrained GPT-2 tokenizer and reuse its end-of-text token for padding.
tokenizer = transformers.GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

# use_cache (bool, optional, defaults to True): when True, past_key_values
# key/value states are returned and can be used to speed up decoding.
# Keep it False for training and switch it to True for generation.
config_overrides = dict(
    use_cache=False,
    pad_token_id=tokenizer.pad_token_id,
    eos_token_id=tokenizer.eos_token_id,
)
model = transformers.TFGPT2LMHeadModel.from_pretrained("gpt2", **config_overrides)

Downloading:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/456k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/665 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/498M [00:00<?, ?B/s]

All model checkpoint layers were used when initializing TFGPT2LMHeadModel.

All the layers of TFGPT2LMHeadModel were initialized from the model checkpoint at gpt2.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFGPT2LMHeadModel for predictions without further training.


In [None]:
# Read the anime table from Google Drive (the path assumes Drive is mounted at /content/drive).
data = pd.read_csv('/content/drive/MyDrive/animes.csv')

# Keep the first 1000 synopses, drop the missing ones and renumber the rows from 0.
synopsis = (
    data['synopsis']
    .head(1000)
    .dropna()
    .reset_index(drop=True)
)
synopsis.shape

(983,)

In [None]:
# Credit lines that should not end up in the training text.
_BAD_SUBSTRING_PATTERN = re.compile(r'source: \w+|written by mal rewrite')


def delete_bad_substrings(text: str) -> str:
  """Remove every credit fragment from a (lower-cased) synopsis.

  Two kinds of fragment are stripped: ``source: <word>`` and
  ``written by mal rewrite``. All occurrences of both are removed, not only
  the ones identical to the first match, so a text carrying several different
  credits comes back fully cleaned.

  Args:
    text: synopsis text; the pattern is case-sensitive and expects lower case.

  Returns:
    The text with the fragments deleted. Surrounding whitespace is left
    untouched; text without any fragment is returned unchanged.
  """
  return _BAD_SUBSTRING_PATTERN.sub('', text)


def make_clean_text(texts: 'pd.Series[str,]') -> 'pd.Series[str,]':
  """Normalise a Series of raw synopses for training.

  Steps, in order: lower-case, strip credit fragments via
  ``delete_bad_substrings``, delete newline/tab/carriage-return characters
  (they are removed, not replaced by a space), delete round and square
  brackets, and finally trim leading/trailing whitespace.

  Args:
    texts: Series of synopsis strings.

  Returns:
    A new Series holding the cleaned strings.
  """
  def _scrub(synopsis_text):
    # Per-string part of the clean-up, applied after lower-casing.
    without_credits = delete_bad_substrings(synopsis_text)
    single_line = re.sub(r'[\n\t\r]','',without_credits)
    return re.sub(r'[\(\)\[\]]','',single_line)

  lowered = texts.str.lower()
  return lowered.map(_scrub).str.strip()

In [None]:
# Replace the raw synopses with their cleaned versions.
synopsis = synopsis.pipe(make_clean_text)

In [None]:
def tokenize_function(text: 'list[str,]',tokenizer=tokenizer,max_length=100): 
  """Tokenise synopses and build next-token-prediction training arrays.

  Every text gets ``EOS_TOKEN`` appended, then is truncated or padded to
  exactly ``max_length`` tokens. Inputs and labels are the same sequence
  shifted by one position, so each returned array has ``max_length - 1``
  columns.

  Args:
    text: iterable of synopsis strings.
    tokenizer: tokenizer to call; defaults to the notebook-level tokenizer.
    max_length: number of tokens per sequence before the one-step shift.

  Returns:
    Tuple ``(input_ids, attention_mask, labels)`` of 2-D numpy arrays. In
    ``labels`` every padding position is set to -100.
  """
  raw_data=[sentence+EOS_TOKEN for sentence in text]
  # padding='max_length' pads every row to max_length; padding=True would only
  # pad to the longest row, leaving short batches narrower than max_length.
  output=tokenizer(raw_data,padding='max_length',truncation=True,max_length=max_length)

  token_ids=np.array(output['input_ids'])
  mask=np.array(output['attention_mask'])

  assert token_ids.ndim==2 and token_ids.shape[1]==max_length,\
  'something wrong with padding/truncation'

  # shift by one position for next token prediction
  input_ids=token_ids[:,:-1]
  attention_mask=mask[:,:-1]

  # Set padding labels to -100 so they are ignored in the loss. The attention
  # mask decides what is padding: the pad token shares its id with the
  # end-of-text token, so comparing ids would also mask the real end-of-text
  # target and the model would never be trained to stop.
  labels=np.where(mask[:,1:]==1,token_ids[:,1:],-100)

  return input_ids,attention_mask,labels

In [None]:
# Tokenise the cleaned synopses into model inputs, masks and shifted labels.
input_ids, attention_mask, labels = tokenize_function(synopsis)

# Number of rows reserved for validation (20% of the data, rounded down).
VAL_FRACTION = 0.2
val_size = int(VAL_FRACTION * len(input_ids))

In [None]:
BATCH_SIZE = 64


def _make_dataset(token_ids, mask, targets, batch_size=BATCH_SIZE):
  """Bundle features and labels into a batched, prefetching tf.data.Dataset."""
  features = {"input_ids": token_ids, "attention_mask": mask}
  dataset = tf.data.Dataset.from_tensor_slices((features, targets))
  return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)


# Training split: every row from index val_size onwards.
train_tensor_inputs, train_tensor_mask, train_tensor_labels = [
    tf.convert_to_tensor(array[val_size:])
    for array in (input_ids, attention_mask, labels)
]
print("len of train dataset %d" % len(train_tensor_inputs.numpy()))
train = _make_dataset(train_tensor_inputs, train_tensor_mask, train_tensor_labels)

# Validation split: the first val_size rows.
val_tensor_inputs, val_tensor_mask, val_tensor_labels = [
    tf.convert_to_tensor(array[:val_size])
    for array in (input_ids, attention_mask, labels)
]
print("len of val dataset %d" % len(val_tensor_inputs.numpy()))
val = _make_dataset(val_tensor_inputs, val_tensor_mask, val_tensor_labels)



len of train dataset 787
len of val dataset 196


In [None]:
# Adam's default learning rate (1e-3) is far too high for fine-tuning a
# pretrained transformer; 5e-5 is the usual starting point.
LEARNING_RATE = 5e-5
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

# Stop as soon as the validation loss fails to improve for one epoch and roll
# the model back to the best weights seen so far.
stop_learning=tf.keras.callbacks.EarlyStopping(patience=1,
                                               monitor='val_loss',
                                               restore_best_weights=True,
                                               verbose=1)

callbacks=[stop_learning]

# The GPT-2 model implementation provides its own loss function, so instead of
# one of the regular Keras losses we pass the model's hf_compute_loss method.
model.compile(optimizer=optimizer, loss=model.hf_compute_loss)

In [None]:
# Upper bound on training epochs; the early-stopping callback may end the run sooner.
EPOCHS = 10

model.fit(
    x=train,
    validation_data=val,
    epochs=EPOCHS,
    callbacks=callbacks,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 3: early stopping


<keras.callbacks.History at 0x7f64beaa7b50>

In [None]:
# Save the fine-tuned model to Google Drive.
# NOTE(review): the path assumes Drive is already mounted at /content/drive;
# no cell in this notebook mounts it.
save_path='/content/drive/MyDrive/synopsis_model'
model.save(save_path) 



In [None]:
def pipe_anime_to_synopsis(text: str,model: tf.keras.Model,tokenizer,max_length=150,num_return_sequences=1):
  """Generate synopsis text that continues the given prompt.

  Builds a Hugging Face "text-generation" pipeline around ``model`` and
  ``tokenizer`` and samples continuations of ``text``.

  Args:
    text: prompt the generated text starts with.
    model: language model handed to the pipeline.
    tokenizer: tokenizer matching ``model``.
    max_length: ``max_length`` value forwarded to the pipeline call.
    num_return_sequences: how many samples to draw.

  Returns:
    List with the ``generated_text`` of every sampled sequence.
  """
  # Use the first GPU when TensorFlow sees one, otherwise fall back to CPU
  # (-1) instead of failing on a runtime without a GPU.
  device = 0 if tf.config.list_physical_devices('GPU') else -1
  synopsis_pipe = transformers.pipeline("text-generation",model=model,tokenizer=tokenizer,device=device)
  generated_synopsis = synopsis_pipe(text, 
                                     max_length=max_length,
                                     use_cache=True,  # key/value caching speeds up decoding
                                     do_sample=True,
                                     num_return_sequences=num_return_sequences)
  # A distinct loop name avoids shadowing the `text` parameter.
  return [candidate['generated_text'] for candidate in generated_synopsis]

In [None]:
# Demo: sample one synopsis that starts with a made-up title.
pipe_anime_to_synopsis(
    text='Neon Genesis Naruton',
    model=model,
    tokenizer=tokenizer,
)