# Packages

In [None]:
!pip install hazm
!pip install transformers
!pip install rouge
! pip install rouge-metric

In [None]:
# Import required packages

import numpy as np
import pandas as pd
from collections import Counter 
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score
from sklearn.utils import shuffle
import tensorflow as tf
from transformers import AutoModelForMaskedLM, AutoTokenizer, TFAutoModelForMaskedLM

import hazm

import plotly.express as px
import plotly.graph_objects as go

from tqdm.notebook import tqdm
from itertools import chain
import gc
from sklearn.preprocessing import LabelEncoder
import os
import re
import json
import copy
import collections
import collections
from rouge import Rouge 

from nltk.translate.bleu_score import SmoothingFunction, corpus_bleu, sentence_bleu
cc = SmoothingFunction()

from transformers import BertConfig, BertTokenizer, TFAutoModel 
from transformers import TFBertModel, TFBertForSequenceClassification, AutoTokenizer
from transformers import glue_convert_examples_to_features
import math 
import tensorflow as tf
from termcolor import colored
import ast
from rouge_metric import PyRouge


from sklearn.model_selection import StratifiedKFold
from statistics import mean
import glob
import os
import glob
import string
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 50)


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Augmented dataset for the semantic-affinity classification; its 'poetry' and
# 'topic' columns are read further down to train the classifier.
# NOTE(review): concat_path is None here, so pd.read_csv raises until it is
# pointed at the real CSV file — TODO set the path before running.
concat_path = None
concatenated = pd.read_csv(concat_path)
# Row count, printed with a thousands separator.
print('length of the augmented dataset for automatic eval: ', colored( f"{len(concatenated):,}", 'blue'))

# Create the classifier

In [None]:
# Pretrained RoBERTa checkpoint that backs the poetry classifier.
MODEL_NAME_OR_PATH = '.../Roberta_0.4_beit'
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME_OR_PATH, use_fast=True)

# Longest tokenized poem in the dataset, plus 5 tokens of headroom.
max_len = 5 + max(
    len(tokenizer.encode(poem)) for poem in concatenated['poetry'].to_list()
)
max_len

In [None]:
def transformer_encoder_(texts):
    """Tokenize a batch of texts into the two named inputs the Keras model takes.

    Every sequence is padded / truncated to the module-level ``max_len``.

    Args:
        texts: list of strings to encode.

    Returns:
        dict with one tensor under 'input_word_ids' and one under 'input_mask'.
    """
    # Called ``encoded`` rather than ``dict`` so the builtin is not shadowed.
    encoded = tokenizer.batch_encode_plus(
        texts,
        max_length=max_len,
        add_special_tokens=True,
        padding='max_length',
        truncation=True,
    )

    token_ids = tf.ragged.constant(encoded.input_ids).to_tensor()
    attention = tf.ragged.constant(encoded.attention_mask).to_tensor()
    return {'input_word_ids': token_ids, 'input_mask': attention}

In [None]:
def build_custome_model():
    """Build and compile the topic classifier on top of the pretrained encoder.

    The encoder is loaded from ``MODEL_NAME_OR_PATH + '/TensorFlow'``. The vector
    at sequence position 0 of its first output feeds a small dense head that
    ends in a 4-way softmax.

    Returns:
        A compiled ``tf.keras.Model`` taking 'input_word_ids' and 'input_mask'
        (both of length ``max_len``) and returning class probabilities.
    """
    bert_encoder = TFAutoModel.from_pretrained(MODEL_NAME_OR_PATH+'/TensorFlow')

    input_word_ids = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="input_word_ids")
    input_mask = tf.keras.Input(shape=(max_len,), dtype=tf.int32, name="input_mask")

    # presumably output [0] is the per-token hidden state tensor — TODO confirm
    embedding = bert_encoder([input_word_ids, input_mask])[0]

    # Keep only the first position of every sequence as its summary vector.
    clf_output = embedding[:, 0, :]

    net = tf.keras.layers.Dense(32, activation='tanh')(clf_output)
    net = tf.keras.layers.Dropout(0.6)(net)

    # 4 output units — presumably one per topic class; verify against the labels.
    output = tf.keras.layers.Dense(4, activation='softmax')(net)

    model = tf.keras.Model(inputs=[input_word_ids, input_mask], outputs=output)

    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
    # The head already applies softmax, so the loss receives probabilities, not
    # logits; from_logits=True would apply softmax a second time.
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    metric = tf.keras.metrics.SparseCategoricalAccuracy('accuracy')

    model.compile(optimizer=optimizer, loss=loss, metrics=[metric])

    return model

In [None]:
model = build_custome_model()
model.summary()

In [None]:
# Stratified k-fold split over the topic labels.
k = 4
# Fixed seed so the shuffled folds are identical on every run.
kfold = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)

df_train = concatenated.copy()

labels = df_train.loc[:, 'topic']

poetry = df_train.loc[:, 'poetry']

# Encode the topic labels as integer class ids for the sparse loss.
le = LabelEncoder()
labels_ = le.fit_transform(labels)

In [None]:
# Train for one epoch per fold, validating on the held-out part of each split.
# NOTE(review): the same `model` instance keeps training across folds, so from
# the second fold on the validation rows may already have been seen in training
# — confirm this is intended before reading the val metrics as held-out scores.
for i, (train_idx, val_idx) in enumerate(kfold.split(poetry.tolist(), labels_), 1):
    print(colored(f'fold {i}', 'blue'))

    # kfold.split yields positional indices, so rows are selected with .iloc
    # (plain [] would look the numbers up as index labels).
    train_input = transformer_encoder_(poetry.iloc[train_idx].tolist())
    validation_input = transformer_encoder_(poetry.iloc[val_idx].tolist())

    history = model.fit(
        x=train_input,
        y=labels_[train_idx],
        validation_data=(validation_input, labels_[val_idx]),
        epochs=1,
        verbose=1,
        batch_size=16,
    )
    # Release the per-fold tensors before the next fold is encoded.
    gc.collect()


# Evaluate

# BLEU

In [None]:
# Load the Molana data that serves as the reference set for BLEU / ROUGE.
# NOTE(review): pd.read_pickle can execute arbitrary code while unpickling —
# only load this file if it comes from a trusted source.
molana = pd.read_pickle('.../Data/molana.pickle')
# Keep everything from position 3 onward of each item as (poetry, text).
# presumably every item has exactly five fields — TODO confirm
molana = pd.DataFrame([i[3:] for i in molana], columns=['poetry', 'text'])

# One reference group holding every whitespace-tokenized poem; this nested list
# is what find_my_bleu hands to corpus_bleu.
# NOTE(review): built before cleaning() is applied to the 'poetry' column below,
# so these tokens come from the uncleaned text — confirm that is intended.
reference = [[i.split() for i in molana.loc[:, 'poetry'].values.tolist()]]

In [None]:
normalizer = hazm.Normalizer()

# Characters to strip: every ASCII punctuation mark except '/', plus a few
# Persian / typographic marks. Note that '.' is part of the set and is removed.
punct = string.punctuation.replace('/', '') + '،؟»«…'


def cleaning(text):
    """Normalize ``text`` with hazm, then delete every character listed in ``punct``."""
    normalized = normalizer.normalize(text)
    # A translation table that maps each unwanted character to None (= delete).
    strip_table = str.maketrans(dict.fromkeys(punct))
    return normalized.translate(strip_table)


molana.loc[:, 'poetry'] = molana.loc[:, 'poetry'].apply(cleaning)

In [None]:
def find_my_bleu(text, w):
    """Score one generated text against the module-level ``reference`` set.

    Args:
        text: candidate string; it is split on whitespace.
        w: n-gram weight tuple forwarded to ``corpus_bleu``.

    Returns:
        The BLEU value from ``corpus_bleu`` with ``method4`` smoothing.
    """
    hypothesis_tokens = text.split()
    return corpus_bleu(
        reference,
        [hypothesis_tokens],
        weights=w,
        smoothing_function=cc.method4,
    )

def get_final_bleu(output_df):
    """Add per-verse and per-record BLEU-1/2/3 columns to ``output_df`` and print the means.

    Args:
        output_df: frame whose 'final_predicted_verses' cells are iterables of
            generated strings. It is modified in place.

    Returns:
        None.
    """
    per_verse_specs = (
        ('bleu_1', (1, 0, 0, 0)),
        ('bleu_2', (0, 1, 0, 0)),
        ('bleu_3', (0, 0, 1, 0)),
    )
    # Highest order first: the order the mean columns are created and printed in.
    mean_specs = (
        ('bleu_3', 'bleu_3_mean', 'mean bleu_3 score: '),
        ('bleu_2', 'bleu_2_mean', 'mean bleu_2 score: '),
        ('bleu_1', 'bleu_1_mean', 'mean bleu_1 score: '),
    )

    print('Started calculating the bleu scores...')
    for column, weights in per_verse_specs:
        output_df.loc[:, column] = output_df.loc[:, 'final_predicted_verses'].apply(
            lambda verses, w=weights: [find_my_bleu(verse, w) for verse in verses]
        )

    print('Now the average score...')
    for source, target, _ in mean_specs:
        output_df.loc[:, target] = output_df.loc[:, source].apply(lambda scores: np.mean(scores))

    for _, target, label in mean_specs:
        print(label, np.mean(output_df.loc[:, target]))

In [None]:
#col_name = 'poetry_generated_Seq2Seq_with_Att'
#col_name = 'poetry_generated_MHA'
# col_name = 'poetry_generated_Seq2Seq_GRU

def get_bleu_phase_1(df, col_name = 'poetry_generated_MHA'):
    """Print mean sentence-level BLEU-1/2/3 and averaged ROUGE for one generated column.

    Args:
        df: frame holding a 'poetry_ground_truth' column and the generated column.
        col_name: name of the column with the generated poetry strings.

    Returns:
        None; all scores are printed.
    """
    ngram_weights = (
        ('BLEU-1 : ', (1, 0, 0, 0)),
        ('BLEU-2 : ', (0, 1, 0, 0)),
        ('BLEU-3 : ', (0, 0, 1, 0)),
    )

    # Ground truth is read from `df` itself (not a global frame), row by row in
    # positional order, so the function works for any frame and any index.
    hyps = df[col_name].tolist()
    refs = df['poetry_ground_truth'].tolist()

    bleu_scores = {label: [] for label, _ in ngram_weights}
    for hyp, ref in zip(hyps, refs):
        # sentence_bleu takes a list of tokenized references and a tokenized
        # hypothesis. Passing a flat token list / a raw string would make it
        # compare characters instead of words.
        references = [ref.split()]
        candidate = hyp.split()
        for label, weights in ngram_weights:
            bleu_scores[label].append(
                sentence_bleu(
                    references,
                    candidate,
                    weights=weights,
                    smoothing_function=cc.method4,
                )
            )

    for label, _ in ngram_weights:
        print(colored(label, 'blue'), np.mean(bleu_scores[label]))

    # Local scorer name chosen so the module-level `rouge` object is not shadowed.
    rouge_scorer = Rouge()
    scores = rouge_scorer.get_scores(hyps, refs, avg=True)

    print(colored('\nROUGE : \n', 'blue'), scores)

# Rouge

In [None]:
# Every Molana poem (as currently stored in `molana`) as a token list; together
# they are passed as the sentences of one reference in find_my_rouge.
reference_rouge = [poem.split() for poem in molana.loc[:, 'poetry'].values.tolist()]
rouge = PyRouge(rouge_n=(1, 2), rouge_l=True, rouge_w=False, rouge_s=False, rouge_su=False)


def find_my_rouge(text):
    """Score one generated text against ``reference_rouge`` with the shared PyRouge object.

    Args:
        text: candidate string; it is split on whitespace and treated as a
            single-sentence hypothesis.

    Returns:
        Whatever ``rouge.evaluate_tokenized`` returns for that hypothesis.
    """
    hypothesis_sentences = [text.split()]
    return rouge.evaluate_tokenized([hypothesis_sentences], [[reference_rouge]])
    


In [None]:
def get_short_rouge(list_dicts):
    """Average a list of ROUGE score dicts field by field.

    Args:
        list_dicts: list of dicts, each with 'rouge-1', 'rouge-2' and 'rouge-l'
            entries that hold 'r', 'p' and 'f' numbers.

    Returns:
        A dict of the same layout whose values are the arithmetic means.

    Raises:
        ZeroDivisionError: if ``list_dicts`` is empty.
    """
    metrics = ('rouge-1', 'rouge-2', 'rouge-l')
    fields = ('r', 'p', 'f')

    # Running sums, accumulated in input order.
    totals = {metric: {field: 0 for field in fields} for metric in metrics}
    for scores in list_dicts:
        for metric in metrics:
            for field in fields:
                totals[metric][field] += scores[metric][field]

    length = len(list_dicts)
    return {
        metric: {field: totals[metric][field] / length for field in fields}
        for metric in metrics
    }

def get_overal_rouge_mean(output_df):
    """Add a per-record mean ROUGE column and print the mean over all records.

    Args:
        output_df: frame whose 'rouge' cells are lists of ROUGE score dicts
            (one per generated verse). A 'rouge_mean' column is added in place.

    Returns:
        The same ``output_df``, now carrying the 'rouge_mean' column.

    Raises:
        ZeroDivisionError: if the frame (or one of its 'rouge' lists) is empty.
    """
    print('Started getting the overall rouge of each record...')
    output_df.loc[:, 'rouge_mean'] = output_df.loc[:, 'rouge'].apply(get_short_rouge)

    print('Started getting the overall rouge of all record...')
    # Averaging the per-record dicts is the same reduction get_short_rouge
    # already implements. Going through the column values instead of
    # .loc[0..n-1] also drops the assumption of a default RangeIndex.
    overall = get_short_rouge(output_df['rouge_mean'].tolist())

    print('overall rouge scores: ')
    print(overall)
    return output_df

In [None]:
#get_overal_rouge_mean(output_df)

# Semantic Similarity

In [None]:
def predict(sent):
    """Return the label the classifier assigns to a single sentence.

    Args:
        sent: the text to classify.

    Returns:
        The original (decoded) label of the highest-scoring class.
    """
    encoded = transformer_encoder_([sent])
    class_scores = model.predict(encoded)
    best_class = np.argmax(class_scores)
    # Map the integer class id back through the fitted LabelEncoder.
    return le.inverse_transform([best_class])[0]


def clean_phase_1(txt):
    """Turn the phase-1 markers of a generated string into plain text.

    Each '<sep>' becomes ' / ', and every '<start>' / '<end>' is removed.

    Args:
        txt: generated string that may contain the markers.

    Returns:
        The string with markers replaced / removed; other text is untouched.
    """
    with_slashes = re.sub('<sep>', ' / ', txt)
    return re.sub('<start>|<end>', '', with_slashes)


In [None]:
def import_results_final_approach(model_name='BERT', on='beit'):
    """
    Return the CSV path holding the results of the heuristic and final section.

    Args:
        model_name: one of 'BERT V3', 'Roberta', 'Distilbert', 'Albert'.
        on: 'beit' selects the beit results file; any other value selects the
            verse results file.

    Returns:
        The path string of the matching results file.

    Raises:
        ValueError: if ``model_name`` has no results file for the requested
            setting (this includes the default 'BERT').
    """
    beit_paths = {
        'BERT V3': '.../Bert V3/Final_BERT_V3_gh_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
        'Roberta': '.../Roberta/Final_Roberta_gh_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
        'Distilbert': '.../DistilBERT/Final_Distilbert_gh_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
        'Albert': '.../Albert/Final_Albert_gh_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
    }
    verse_paths = {
        'BERT V3': '.../Bert V3/Final_BERT_V3_gh_last_Verse_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
        'Albert': '.../Albert/Final_Albert_gh_Verse_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
        'Roberta': '.../Roberta/Final_Roberta_gh_Verse_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
        # NOTE(review): the original literal had no '/' between the folder and
        # the file name ('.../DistilBERTFinal_...'); every other entry has one,
        # so it is restored here — confirm against the real directory layout.
        'Distilbert': '.../DistilBERT/Final_DistilBERT_gh_Verse_M_poetry_format_20_numOfKeywords_4_mesra_first_elements_num_4_mesra_second_elements_num_3_beam_depth_100_first.csv',
    }

    paths = beit_paths if on == 'beit' else verse_paths

    # Fail with a clear message instead of leaving folder_path unbound.
    if model_name not in paths:
        raise ValueError(
            f"no results file registered for model_name={model_name!r} (on={on!r}); "
            f"expected one of {sorted(paths)}"
        )

    folder_path = paths[model_name]
    return folder_path


In [None]:
def choose_one(output_df, base_df):
    """
    choose one label for all the generated poetry
    for each plain text

    For every row, the ground-truth label wins when at least one generated
    verse received it; otherwise the most frequent predicted label is used.

    Args:
        output_df: frame whose 'prediction_for_output' cells are
            ``{label: count}`` dicts. A 'chosen_prediction_for_output' column
            is written in place.
        base_df: frame whose 'prediction_for_gt' column holds the label of the
            matching ground-truth row (rows are paired by position).

    Returns:
        The same ``output_df`` with the new column.
    """
    chosen = []
    # Pair rows by position so the result does not depend on a 0..n-1 index.
    pairs = zip(output_df['prediction_for_output'], base_df['prediction_for_gt'])
    for label_counts, gt in pairs:
        if gt in label_counts:
            final_label = gt
        elif label_counts:
            # Rank by count. Sorting the dict itself would order the label
            # names alphabetically and ignore how often each was predicted.
            final_label = max(label_counts, key=label_counts.get)
        else:
            # No generated verse for this row, so there is nothing to choose.
            final_label = None
        chosen.append(final_label)

    output_df['chosen_prediction_for_output'] = chosen
    return output_df

def automatic_eval(output_df, base_df):
    """
    predict the predicted poetry labels

    Classifies every generated verse, stores the per-row label counts, picks
    one label per row via ``choose_one`` and prints how often that label equals
    the label predicted for the ground truth.

    Args:
        output_df: frame whose 'final_predicted_verses' cells are iterables of
            generated strings; it gains 'prediction_for_output' and
            'chosen_prediction_for_output' columns.
        base_df: frame carrying the 'prediction_for_gt' column.

    Returns:
        ``output_df`` with the added columns.
    """
    verse_labels = output_df.loc[:, 'final_predicted_verses'].apply(
        lambda verses: [predict(sent) for sent in verses]
    )
    # Collapse each row's labels into a {label: count} dict.
    output_df['prediction_for_output'] = verse_labels.apply(lambda labels: dict(Counter(labels)))

    output_df = choose_one(output_df, base_df)

    matches = output_df['chosen_prediction_for_output'] == base_df['prediction_for_gt']
    gherabat = int(matches.sum()) / len(output_df)
    # gherabat is a 0-1 fraction; scale it so the printed number is a real percentage.
    print('semantic similarity is observed in', colored(f"{gherabat * 100:.2f}", 'blue'), 'percent of cases.')

    return output_df

In [None]:
# Use the Roberta / beit results file as the shared base frame for comparison;
# its model-specific output columns are dropped.
base = import_results_final_approach(model_name='Roberta', on='beit')
base_df = pd.read_csv(base).drop(columns=['heuristics', 'final_predicted_verses'])

In [None]:
# Strip the phase-1 markers from the multi-head-attention generations.
base_df.loc[:, 'poetry_generated_MHA'] = base_df.loc[:, 'poetry_generated_MHA'].apply(clean_phase_1)
# Classifier label of every ground-truth poem.
base_df.loc[:, 'prediction_for_gt'] = base_df.loc[:, 'poetry_ground_truth'].apply(predict)

# Models

## Roberta

### beit

In [None]:
# Load the Roberta / beit results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='Roberta', on='beit')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
# get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

### verse

In [None]:

# Load the Roberta / verse results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='Roberta', on='verse')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

## Distilbert

### beit

In [None]:

# Load the Distilbert / beit results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='Distilbert', on='beit')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

### verse

In [None]:


# Load the Distilbert / verse results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='Distilbert', on='verse')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

## Albert

### beit

In [None]:

# Load the Albert / beit results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='Albert', on='beit')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

### verse

In [None]:

# Load the Albert / verse results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='Albert', on='verse')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

## Bert

### beit

In [None]:

# Load the BERT V3 / beit results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='BERT V3', on='beit')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

### verse

In [None]:

# Load the BERT V3 / verse results and parse each stored cell back into a Python literal.
folder_path = import_results_final_approach(model_name='BERT V3', on='verse')
output_df = pd.read_csv(folder_path)
output_df.loc[:, 'final_predicted_verses'] = output_df.loc[:, 'final_predicted_verses'].apply(ast.literal_eval)

# Label the generated verses and report agreement with the ground-truth label.
output_df = automatic_eval(output_df, base_df)

In [None]:
get_final_bleu(output_df)

In [None]:
# ROUGE for every generated verse, then the per-record and overall means.
output_df.loc[:, 'rouge'] = output_df.loc[:, 'final_predicted_verses'].apply(
    lambda verses: list(map(find_my_rouge, verses))
)
output_df = get_overal_rouge_mean(output_df)

In [None]:
output_df.head(1)

# Eval for Phase 1 Models

## MultiheadAttention

In [None]:
col_name = 'poetry_generated_MHA'

# Classifier label of every multi-head-attention generation.
base_df.loc[:, f'{col_name}_pred'] = base_df.loc[:, col_name].apply(predict)

In [None]:
# Share of rows whose generated poem gets the same label as its ground truth.
gherabat = len(base_df[base_df[col_name+'_pred']==base_df['prediction_for_gt']])/len(base_df)
# gherabat is a 0-1 fraction; scale it so the printed number is a real percentage.
print('semantic similarity is observed in', colored(f"{gherabat * 100:.2f}", 'blue'), 'percent of cases.')

In [None]:
get_bleu_phase_1(base_df, col_name = col_name)

In [None]:
base_df

## Seq2Seq with attention

In [None]:
all(base_df.loc[: , 'poetry_ground_truth'] == output_df.loc[:99, 'poetry_ground_truth'])

In [None]:
base_df

In [None]:
col_name = 'poetry_generated_Seq2Seq_with_Att'

# NOTE(review): the CSV path is None here, so pd.read_csv raises until the real
# results file is filled in — TODO set the path.
output_df = pd.read_csv(None)

# Keep as many leading rows as base_df has (instead of a hard-coded 99) and take
# a copy, so the column writes below do not target a view of the loaded frame.
output_df = output_df.iloc[:len(base_df)].copy()
assert all(base_df.loc[:, 'poetry_ground_truth'] == output_df.loc[:, 'poetry_ground_truth'])

# Strip the phase-1 markers, then label every generation with the classifier.
output_df.loc[:, col_name] = output_df.loc[:, col_name].apply(clean_phase_1)
output_df.loc[:, col_name+'_pred'] = output_df.loc[:, col_name].apply(predict)

In [None]:
output_df

In [None]:
# Share of rows whose generated poem gets the same label as its ground truth.
gherabat = len(output_df[output_df[col_name+'_pred']==base_df['prediction_for_gt']])/len(base_df)
# gherabat is a 0-1 fraction; scale it so the printed number is a real percentage.
print('semantic similarity is observed in', colored(f"{gherabat * 100:.2f}", 'blue'), 'percent of cases.')

In [None]:
get_bleu_phase_1(output_df, col_name = col_name)

## Normal RNN

In [None]:
# NOTE(review): the CSV path is None here, so pd.read_csv raises until the real
# results file is filled in — TODO set the path.
output_df = pd.read_csv(None)

# Display the ground-truth column for index labels up to and including 99.
output_df.loc[:99, 'poetry_ground_truth']

In [None]:
col_name = 'poetry_generated_Seq2Seq_GRU'

# Keep as many leading rows as base_df has (instead of a hard-coded 99) and take
# a copy, so the column writes below do not target a view of the loaded frame.
output_df = output_df.iloc[:len(base_df)].copy()
assert all(base_df.loc[:, 'poetry_ground_truth'] == output_df.loc[:, 'poetry_ground_truth'])

# Strip the phase-1 markers, then label every generation with the classifier.
output_df.loc[:, col_name] = output_df.loc[:, col_name].apply(clean_phase_1)
output_df.loc[:, col_name+'_pred'] = output_df.loc[:, col_name].apply(predict)

In [None]:
# Share of rows whose generated poem gets the same label as its ground truth.
gherabat = len(output_df[output_df[col_name+'_pred']==base_df['prediction_for_gt']])/len(base_df)
# gherabat is a 0-1 fraction; scale it so the printed number is a real percentage.
print('semantic similarity is observed in', colored(f"{gherabat * 100:.2f}", 'blue'), 'percent of cases.')

In [None]:
# The notebook defines no `get_bleu`; the phase-1 scorer is get_bleu_phase_1.
get_bleu_phase_1(output_df, col_name = col_name)