#  word-based model

In [None]:
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing
import numpy as np
import os
from tensorflow import keras
from sklearn.model_selection import train_test_split
import tensorflow.keras.utils as ku
from nltk.translate.bleu_score import sentence_bleu
from matplotlib.pyplot import plot as plt

# read the data

In [None]:
# Read every poem file of the corpus folder into memory.
os.chdir('/content/drive/My Drive/ANN/hw5/shahname')
path_to_file = 'shahname'
# os.listdir() returns names in arbitrary order; sorting makes the
# index-based train/test split that follows reproducible between runs.
listOfFiles = sorted(os.listdir())
poems = []
for f in listOfFiles:
  # The context manager closes each handle; the encoding is made explicit
  # because the text is Persian (assumes the files are UTF-8 -- confirm).
  with open(f, encoding='utf-8') as poem_file:
    poems.append(poem_file.read())
os.chdir('/content/drive/My Drive/ANN/hw5')
# Word list, one entry per line; entries keep their trailing newline.
with open('allShahnameWords.txt', encoding='utf-8') as dictionary_file:
  dictionary = dictionary_file.readlines()
# Extra word forms appended to the word list.
dictionary.append("گهٔ\n")
dictionary.append("رهٔ\n")
dictionary.append("فرهٔ\n")
dictionary.append("**\n")  # hemistich separator (replaces tab characters)
dictionary.append("&&&\n")    # end-of-verse marker (replaces newlines)

In [None]:
# The first 561 poems form the training text; the remaining ones are held
# out for evaluation.
n_train_poems = 561
test = poems[n_train_poems:]
# A single join avoids rebuilding the growing string once per poem.
poems_text = "".join(poems[:n_train_poems])

In [None]:
# sentences = []
# for poem in poems:
#   for verse in poem:
#     s = []
#     words = str.split(verse)
#     for word in words: 
#       s.append(dictionary.index(word+'\n'))
#     sentences.append(s)
# padded_sentences = tf.keras.preprocessing.sequence.pad_sequences(sentences)

# vocab = sorted(set(poems))
# sentences = np.zeros([len(poems)//100, 100])
# for i in range(len(poems)//100):
#   for j in range(100):
#     sentences[i,j] = vocab.index(poems[100*i + j])

# Turn layout characters into explicit tokens: tab -> "**", newline -> "&&&".
poems1 = poems_text.replace("\t"," ** ").replace("\n"," &&& ")
words = str.split(poems1)

# Build the word -> id table once instead of scanning the whole word list
# for every token.  setdefault keeps the FIRST position of a duplicated
# entry, which is what dictionary.index() returned.
word_to_id = {}
for idx, entry in enumerate(dictionary):
  word_to_id.setdefault(entry, idx)

# Cut the token stream into consecutive 15-token rows; leftover tokens at
# the end are dropped.  A token missing from the word list raises KeyError.
n_sentences = len(words)//15
sentences = np.array(
    [word_to_id[w+'\n'] for w in words[:15*n_sentences]],
    dtype=float,
).reshape(n_sentences, 15)

# create dataset

In [None]:
# Inputs are every row without its last token; targets are the same row
# shifted one position to the left (next-token prediction).
X_train = sentences[:,:-1]
y_train = sentences[:,1:]
# Tokenise every held-out poem.  The loop must start at index 0: starting
# at 1 left test[0] as a raw string, so indexing it later yielded single
# characters instead of words.  The isinstance check makes re-running the
# cell harmless.
# NOTE(review): the training text maps tab/newline to "**"/"&&&" before
# splitting, while the held-out poems are split as-is -- confirm that the
# difference is intended for the BLEU reference.
for i, poem in enumerate(test):
  if isinstance(poem, str):
    test[i] = str.split(poem)

# build the model

In [None]:
class EarlyStoppingCallback(keras.callbacks.Callback):
  """Early stopping on validation accuracy, with an accuracy plot at the end.

  After every epoch the callback records the "accuracy" and "val_accuracy"
  entries of ``logs``.  When "val_accuracy" has failed to improve for more
  than ``patience`` consecutive epochs, training is stopped and the weights
  captured at the best epoch are put back on the model.  When training
  ends, both accuracy histories are plotted.

  Args:
    patience: number of consecutive non-improving epochs that are tolerated
      before training is stopped.
  """

  def __init__(self,patience=0):
    super().__init__()
    self.patience = patience
    self.val_acc = []
    self.trn_acc = []
    self.cur_epoch = 0
    self.best_weights = None

  def on_train_begin(self,logs=None):
    """Reset all monitoring state so the instance can be reused by a new fit."""
    self.best = -np.inf
    self.wait  = 0
    self.stopped_epoch = 0
    self.best_weights = None
    # Start fresh histories; otherwise a second fit() would append to the
    # curves of the previous run.
    self.val_acc = []
    self.trn_acc = []
    self.cur_epoch = 0

  def on_epoch_end(self,epoch,logs =None):
    """Record this epoch's accuracies and decide whether to stop training."""
    logs = logs or {}
    current_acc = logs.get("val_accuracy")
    self.val_acc.append(current_acc)
    self.trn_acc.append(logs.get("accuracy"))
    self.cur_epoch = epoch
    if current_acc is None:
      # No validation metric was reported, so there is nothing to monitor.
      return
    if current_acc > self.best:
      self.best = current_acc
      self.wait = 0
      print("******************")
      print(current_acc)
      self.best_weights = self.model.get_weights()
    else:
      self.wait +=1
      if self.wait > self.patience :
        print("in stop scope")
        self.stopped_epoch = epoch - self.patience
        self.model.stop_training = True
        # best_weights stays None if no epoch ever improved (e.g. NaN metric).
        if self.best_weights is not None:
          self.model.set_weights(self.best_weights)

  def on_train_end(self,logs = None):
    """Report an early stop, if any, and plot the accuracy histories."""
    # Imported here under its usual alias: the module-level name ``plt`` may
    # be bound to the ``plot`` function rather than to the pyplot module.
    import matplotlib.pyplot as plt

    if self.stopped_epoch > 0 :
      print("epoch: %d: early stopping" % self.cur_epoch)
      print(self.best)
    curves = (
        (self.val_acc, "val_acc", 'orange'),
        (self.trn_acc, "train_acc", 'blue'),
    )
    for history, label, color in curves:
      # Missing metrics were stored as None; plot them as gaps.
      values = [np.nan if v is None else v for v in history]
      # The x axis follows the recorded history, so it always matches in length.
      plt.plot(range(len(values)),values,label=label,color=color)
    plt.legend()
    plt.show()

In [None]:
def loss(labels,logits):
  """Sparse categorical cross-entropy between integer labels and raw logits."""
  crossentropy = tf.keras.losses.sparse_categorical_crossentropy
  return crossentropy(labels,logits,from_logits=True)
# Hyper-parameters of the word-level network.
vocab_size = len(dictionary)
embedding_dim = 250
rnn_units = 700
learning_rate = 0.001
er_cal = EarlyStoppingCallback(patience=10)

# Embedding -> LSTM (one output per time step) -> per-step scores over the
# vocabulary (no activation; the loss is computed from logits).
model = keras.models.Sequential([
    tf.keras.layers.Embedding(vocab_size, embedding_dim),
    tf.keras.layers.LSTM(rnn_units,return_sequences = True),
    tf.keras.layers.Dense(vocab_size),
])
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss=loss,
    metrics=['accuracy'],
)
# 22% of the rows are held out by Keras as validation data.
model.fit(
    X_train,
    y_train,
    validation_split=0.22,
    batch_size=256,
    epochs=2,
    callbacks=[er_cal],
)

# generating phase

In [None]:
# For every held-out poem, seed the model with the poem's first word,
# greedily generate as many further words as the poem has, and accumulate
# the BLEU score of the generated text against the original poem.
score = 0
for t in test:
  # Entries are expected to be token lists; tolerate a raw string so this
  # cell does not depend on how the held-out poems were prepared.
  reference = t.split() if isinstance(t, str) else t
  generated_poem = reference[0]
  words_id = [dictionary.index(reference[0]+'\n')]
  for i in range(len(reference)):
    # Feed the whole sequence so far as a (1, length) array and take the
    # most likely word at the last time step.
    next_id = model.predict(np.array([words_id]))[0,-1].argmax()
    words_id.append(next_id)
    generated_poem  +=  " " + dictionary[int(next_id)].strip('\n')
  score += sentence_bleu([reference], str.split(generated_poem))
  # Turn the marker tokens back into the layout characters they stand for.
  generated_poem1 = generated_poem.replace("**","\t").replace("&&&","\n")
  print(generated_poem1)
  print("--------------------")
# Average over the held-out poems (the previous divisor, X_test, is not
# defined anywhere in this section).
print("BLEU score : " , score / len(test))

# character-based  model

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM,Dense,Embedding
from tensorflow.keras.losses import sparse_categorical_crossentropy
import os
from nltk.translate.bleu_score import sentence_bleu
from tensorflow.keras.models import load_model
from tensorflow import keras
from sklearn.metrics import accuracy_score

# create dataset

In [None]:
# Read every poem file of the corpus folder into memory.
os.chdir('/content/drive/My Drive/ANN/shahname')
path_to_file = 'shahname'
# os.listdir() returns names in arbitrary order; sorting makes the
# index-based train/test split below reproducible between runs.
listOfFiles = sorted(os.listdir())
poems = []
for f in listOfFiles:
  # The context manager closes each handle; the encoding is made explicit
  # because the text is Persian (assumes the files are UTF-8 -- confirm).
  with open(f,'r', encoding='utf-8') as poem_file:
    poems.append(poem_file.read())

# The first 561 poems form the training text; the remaining ones are held
# out for evaluation.
n_train_poems = 561
test = poems[n_train_poems:]
# A single join avoids rebuilding the growing string once per poem.
poems_text = "".join(poems[:n_train_poems])

# Character vocabulary: every distinct character of the training text, sorted.
vocab = sorted(set(poems_text))

# Lookup tables in both directions between characters and integer ids.
ind_to_char = np.array(vocab)
char_to_ind = dict(zip(vocab, range(len(vocab))))
encoded_text = np.array([char_to_ind[ch] for ch in poems_text])

# Number of input characters per training example.
seq_len = 200

# Cut the id stream into chunks of seq_len + 1 ids (the extra id lets the
# target be the input shifted by one); an incomplete final chunk is dropped.
char_dataset = tf.data.Dataset.from_tensor_slices(encoded_text)
sequences = char_dataset.batch(seq_len+1, drop_remainder=True)

def create_seq_targets(seq):
    """Split a chunk into an (input, target) pair offset by one position.

    The input is ``seq`` without its last element and the target is ``seq``
    without its first element, so the target at position i is the element
    that follows input position i.
    """
    return seq[:-1], seq[1:]
    
# Turn every chunk into an (input, target) pair for next-character prediction.
dataset = sequences.map(create_seq_targets)

# build model

In [None]:
# Number of (input, target) pairs per training batch.
batch_size = 128

# Group the pairs into batches; a final incomplete batch is dropped so
# every batch has exactly batch_size rows (the model below is built with a
# fixed batch dimension).
dataset = dataset.batch(batch_size, drop_remainder=True)

def sparse_cat_loss(y_true,y_pred):
  """Sparse categorical cross-entropy with ``y_pred`` treated as raw logits."""
  loss_values = sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)
  return loss_values

def create_model(vocab_size, embed_dim, rnn_neurons, batch_size):
    """Build and compile the character-level network.

    The stack is Embedding -> stateful LSTM returning one output per time
    step -> Dense layer producing one score per vocabulary entry.

    Args:
        vocab_size: number of distinct characters (embedding input size and
            width of the output layer).
        embed_dim: size of each embedding vector.
        rnn_neurons: number of LSTM units.
        batch_size: fixed batch dimension of the input; the sequence length
            is left unspecified.

    Returns:
        The model, compiled with the Adam optimizer, ``sparse_cat_loss`` and
        the accuracy metric.
    """
    layers = [
        Embedding(vocab_size, embed_dim, batch_input_shape=[batch_size, None]),
        LSTM(
            rnn_neurons,
            return_sequences=True,
            stateful=True,
            recurrent_initializer='glorot_uniform',
        ),
        # No activation: the loss is computed from logits.
        Dense(vocab_size),
    ]
    network = Sequential(layers)
    network.compile(optimizer='adam', loss=sparse_cat_loss, metrics=['accuracy'])
    return network
  
  
  
# Hyper-parameters of the character-level network.
vocab_size = len(vocab)   # number of distinct characters
embed_dim = 50            # embedding dimension
rnn_neurons = 100         # number of LSTM units
epochs = 30               # passes over the training dataset

# Build the network for the training batch size and fit it.
model = create_model(vocab_size, embed_dim, rnn_neurons, batch_size)
model.fit(dataset, epochs=epochs)

# save model

In [None]:
# Store the trained model as an HDF5 file in the project folder.
os.chdir('/content/drive/My Drive/ANN')
model.save('shahname_gen2.h5') 


# Rebuild the same architecture with a batch dimension of 1 (text is
# generated one sequence at a time, while the trained model is tied to the
# training batch size) and load the trained weights into it.
model = create_model(vocab_size, embed_dim, rnn_neurons, batch_size=1)
# NOTE(review): the file was written with model.save(), not save_weights();
# confirm that load_weights() accepts it on the Keras version in use.
model.load_weights('shahname_gen2.h5')
model.build(tf.TensorShape([1, None]))

# generate text

In [None]:
def generate_text(model, start_seed,gen_size=100,temp=0.5):
  """Sample ``gen_size`` characters from ``model`` after priming it with a seed.

  The seed is fed to the model as one batch of character ids.  At every step
  the model output is divided by ``temp``, one id is sampled from the scores
  of the last position, and that id alone becomes the next input.

  Args:
    model: network called on a (1, length) batch of character ids; its
      states are reset before generation starts.
    start_seed: text used to prime the model; every character must be a key
      of ``char_to_ind``.
    gen_size: number of characters to sample.
    temp: divisor applied to the model output before sampling.

  Returns:
    A pair ``(text, ids)``: the seed followed by the sampled characters, and
    the list of sampled character ids.
  """
  seed_ids = [char_to_ind[ch] for ch in start_seed]
  input_eval = tf.expand_dims(seed_ids, 0)
  sampled_chars = []
  sampled_ids = []
  model.reset_states()
  for _ in range(gen_size):
      # Drop the batch axis and rescale the scores before sampling.
      scores = tf.squeeze(model(input_eval), 0) / temp
      draws = tf.random.categorical(scores, num_samples=1)
      predicted_id = draws[-1,0].numpy()
      sampled_ids.append(predicted_id)
      sampled_chars.append(ind_to_char[predicted_id])
      # Only the newly sampled id is fed back on the next step.
      input_eval = tf.expand_dims([predicted_id], 0)
  return (start_seed + ''.join(sampled_chars)), sampled_ids

# Keep only the first 200 characters of every held-out poem.
test_t = [poem[0:200] for poem in test]

# Prime the model with the first 15 characters of each excerpt, generate the
# remaining characters, and compare them with the true continuation.
score = 0
val_acc = 0
for excerpt in test_t:
  seed = excerpt[0:15]
  continuation = excerpt[15:]
  generated_poem , predicted_labels = generate_text(model,seed,gen_size=len(excerpt)-15)
  true_labels = [char_to_ind[s] for s in continuation]
  # Share of positions where the sampled character equals the true one.
  val_acc += accuracy_score(true_labels,predicted_labels)
  # BLEU is computed on whitespace-separated words.
  score += sentence_bleu([str.split(excerpt)], str.split(generated_poem))
  print(generated_poem)
  print("-------------------------")
print("BLEU score : " , score / len(test_t))
print("validation accuracy : ", val_acc/len(test_t))