In [1]:
# unzipping the file contents into dataset folder
!wget https://personal.utdallas.edu/~pxn210006/cnn_dailymail/train.zip
!unzip /content/train.zip -d /content/dataset

--2022-04-28 00:21:07--  https://personal.utdallas.edu/~pxn210006/cnn_dailymail/train.zip
Resolving personal.utdallas.edu (personal.utdallas.edu)... 129.110.46.112
Connecting to personal.utdallas.edu (personal.utdallas.edu)|129.110.46.112|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 479197291 (457M) [application/zip]
Saving to: ‘train.zip’


2022-04-28 00:23:14 (3.74 MB/s) - ‘train.zip’ saved [479197291/479197291]

Archive:  /content/train.zip
  inflating: /content/dataset/train.csv  


In [2]:
# installing spark in colab and creating spark session

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

findspark.find()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

sc = spark.sparkContext

In [3]:
# loading csv dataset file in spark dataframe
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
dataset_schema = StructType([ \
    StructField("text",StringType(),True), \
    StructField("summary",StringType(),True), \
  ])
train_df = spark.read.csv("/content/dataset/train.csv", schema = dataset_schema)
train_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- summary: string (nullable = true)



In [4]:
# showing dataset
train_df = train_df.na.drop()
train_df.show()

+--------------------+--------------------+
|                text|             summary|
+--------------------+--------------------+
|                  id|             article|
|0001d1afc246a7964...|By . Associated P...|
|Church members in...| Grand Forks and ...|
|0002095e55fcbd3a2...|"(CNN) -- Ralph M...|
|          Ralph Mata| an internal affa...|
|He also arranged ...| a complaint alle...|
|00027e965c8264c35...|A drunk driver wh...|
|Was using phone w...|     Isle of Wight .|
|Crashed head-on i...| who died in hosp...|
|0002c17436637c4fe...|(CNN) -- With a b...|
|Targeting Russia'...|          she says .|
|0003ad6ef0c37534f...|Fleetwood are the...|
|        Peterborough|        Bristol City|
|0004306354494f090...|He's been accused...|
|0005d61497d21ff37...|By . Daily Mail R...|
|0006021f772fad0aa...|"By . Daily Mail ...|
|Passenger Chris D...|                  46|
|00083697263e215e5...|There are a numbe...|
|000940f2bb357ac04...|"Canberra, Austra...|
|Even if the fligh...| informati

In [5]:
# filtering long and shorter texts
from  pyspark.sql.functions import *

max_art_len = 520
min_art_len = 20
max_sum_len = 30

train_df = train_df.filter(length(col("Summary")) <= max_sum_len)
train_df = train_df.filter(length(col("Text")) >= min_art_len)
train_df = train_df.filter(length(col("Text")) <= max_art_len)
train_df.show()

+--------------------+--------------------+
|                text|             summary|
+--------------------+--------------------+
|He also arranged ...| a complaint alle...|
|Was using phone w...|     Isle of Wight .|
|Crashed head-on i...| who died in hosp...|
|Targeting Russia'...|          she says .|
|Passenger Chris D...|                  46|
|Even if the fligh...| information is r...|
|stepping up Speci...| including Britis...|
|Pennsylvania had ...| with Ohio a clos...|
|China kept the vi...|      Xinhua said ."|
|Anti-Morsy politi...| according to sta...|
|Maryellen followe...|        California .|
|Authorities say M...|                  20|
|""Rebel tours"" w...|          unofficial|
|Journalists were ...|                beat|
|The rift between ...|          it says ."|
|Terrorists with b...| panel's chairman...|
|Biological attack...|       report says .|
|Number of nations...|       panel says ."|
|He stabbed fiancé...|                 arm|
|He and pilot then...|  personal

In [6]:
# cleaning text
import string
def func_text_cleaner(text):
    text = text.lower()
    printable = set(string.printable)
    text = "".join(list(filter(lambda x: x in printable, text)))
    return text

columns = ['article', 'summary']
cleaned_dataset = train_df.rdd.map(lambda X:func_text_cleaner(X[0])+func_text_cleaner(X[1]))
cleaned_dataset = train_df.rdd.toDF(columns)
cleaned_dataset.show()

+--------------------+--------------------+
|             article|             summary|
+--------------------+--------------------+
|He also arranged ...| a complaint alle...|
|Was using phone w...|     Isle of Wight .|
|Crashed head-on i...| who died in hosp...|
|Targeting Russia'...|          she says .|
|Passenger Chris D...|                  46|
|Even if the fligh...| information is r...|
|stepping up Speci...| including Britis...|
|Pennsylvania had ...| with Ohio a clos...|
|China kept the vi...|      Xinhua said ."|
|Anti-Morsy politi...| according to sta...|
|Maryellen followe...|        California .|
|Authorities say M...|                  20|
|""Rebel tours"" w...|          unofficial|
|Journalists were ...|                beat|
|The rift between ...|          it says ."|
|Terrorists with b...| panel's chairman...|
|Biological attack...|       report says .|
|Number of nations...|       panel says ."|
|He stabbed fiancé...|                 arm|
|He and pilot then...|  personal

In [7]:
# tokenizing text into individual words
from pyspark.ml.feature import Tokenizer

article_tokenizer = Tokenizer(inputCol="article", outputCol="article_words") 
summary_tokenizer = Tokenizer(inputCol="summary", outputCol="summary_words")

tokenized_dataset = article_tokenizer.transform(cleaned_dataset)
tokenized_dataset = summary_tokenizer.transform(tokenized_dataset)

tokenized_dataset.show()

+--------------------+--------------------+--------------------+--------------------+
|             article|             summary|       article_words|       summary_words|
+--------------------+--------------------+--------------------+--------------------+
|He also arranged ...| a complaint alle...|[he, also, arrang...|[, a, complaint, ...|
|Was using phone w...|     Isle of Wight .|[was, using, phon...|[, isle, of, wigh...|
|Crashed head-on i...| who died in hosp...|[crashed, head-on...|[, who, died, in,...|
|Targeting Russia'...|          she says .|[targeting, russi...|    [, she, says, .]|
|Passenger Chris D...|                  46|[passenger, chris...|              [, 46]|
|Even if the fligh...| information is r...|[even, if, the, f...|[, information, i...|
|stepping up Speci...| including Britis...|[stepping, up, sp...|[, including, bri...|
|Pennsylvania had ...| with Ohio a clos...|[pennsylvania, ha...|[, with, ohio, a,...|
|China kept the vi...|      Xinhua said ."|[china, kep

In [8]:
# creating a vocabulary to index mapping which contains all words from summary and articles
tokenized_dataset = tokenized_dataset.select("article_words", "summary_words")
tokenized_dataset = tokenized_dataset.dropna()

tokenized_data = tokenized_dataset.collect()
vocabulary_index_mapping = {}

for row in tokenized_data:
  for word in row["article_words"]:
      if word not in vocabulary_index_mapping:
          vocabulary_index_mapping[word]=len(vocabulary_index_mapping)
          
  for word in row["summary_words"]:
      if word not in vocabulary_index_mapping:
          vocabulary_index_mapping[word]=len(vocabulary_index_mapping)

print(vocabulary_index_mapping['is'])

63


In [9]:
# dowloading pre trained GloVe word embedding file
!wget https://personal.utdallas.edu/~pxn210006/cnn_dailymail/glove.zip
!unzip glove.zip -d /content/glove_word_embeddings

--2022-04-28 00:26:23--  https://personal.utdallas.edu/~pxn210006/cnn_dailymail/glove.zip
Resolving personal.utdallas.edu (personal.utdallas.edu)... 129.110.46.112
Connecting to personal.utdallas.edu (personal.utdallas.edu)|129.110.46.112|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 137847651 (131M) [application/zip]
Saving to: ‘glove.zip’


2022-04-28 00:26:58 (3.78 MB/s) - ‘glove.zip’ saved [137847651/137847651]

Archive:  glove.zip
  inflating: /content/glove_word_embeddings/glove.6B.100d.txt  


In [10]:
# creating mapping of word and its embedding vector
import numpy as np

def load_word_embeddings(glove_file):
  vocabulary_embedding_mapping = {}

  with open(glove_file) as g_file:     
    for line in g_file:
        row = line.strip().split(' ')
        word = row[0].lower()
        if word not in vocabulary_embedding_mapping:
            vocabulary_embedding_mapping[word] = np.asarray(row[1:], np.float32)
  
  return vocabulary_embedding_mapping;

vocabulary_embedding_mapping = load_word_embeddings('/content/glove_word_embeddings/glove.6B.100d.txt')

In [11]:
# building vocabulary and its embeddings
vocabulary_words = []
embeddings_word = []
special_tags = ['<UNK>','<PAD>','<EOS>']

for word in vocabulary_index_mapping:
    if word in vocabulary_embedding_mapping:
        vocabulary_words.append(word)
        embeddings_word.append(vocabulary_embedding_mapping[word])
        
for special_tag in special_tags:
    vocabulary_words.append(special_tag)
    embeddings_word.append(np.random.rand(len(embeddings_word[0]),))
    
vocabulary_index_mapping = {word:idx for idx,word in enumerate(vocabulary_words)}
embeddings_word = np.asarray(embeddings_word, np.float32)

print("Size of vacabulary: {}".format(len(vocabulary_index_mapping)))

Size of vacabulary: 45395


In [12]:
# build word vector for articles and summaries
articles_vector = []
summaries_vector = []

unk_vector = vocabulary_index_mapping['<UNK>']
for row in tokenized_data:
    # Replace out of vocab words with index for '<UNK>' tag
    article_vector = []
    summary_vector = []

    for word in row["article_words"]:
      article_vector.append(vocabulary_index_mapping.get(word, unk_vector))
    for word in row["summary_words"]:
      summary_vector.append(vocabulary_index_mapping.get(word, unk_vector))
    
    articles_vector.append(article_vector)
    summaries_vector.append(summary_vector)

print(article_vector)

[55, 2624, 114, 3663, 220, 662, 253, 289, 376]


In [13]:
# randomly shuffle the data
import random
random.seed(101)

article_indexes = [index for index in range(len(articles_vector))]
random.shuffle(article_indexes)

articles_vector = [articles_vector[index] for index in article_indexes]
summaries_vector = [summaries_vector[index] for index in article_indexes]

In [14]:
# Use first 10000 data for testing, the next 10000 data for validation, and rest for training
sum_vec = len(summaries_vector)
art_vec = len(summaries_vector)
test_data_summaries = summaries_vector[0:sum_vec//10]
test_data_articles = articles_vector[0:art_vec // 10]

validation_data_summaries = summaries_vector[sum_vec//10:sum_vec//5]
validation_data_articles = articles_vector[art_vec//10:art_vec//5]

training_data_summaries = summaries_vector[sum_vec//5:]
training_data_articles = articles_vector[art_vec//5:]

In [15]:
# creates batch of articles and summaries
def create_batch(articles, summaries, dataset_batch_size=32):
    
    article_lengths = [len(article) for article in articles]
    sorted_indexes = np.flip(np.argsort(article_lengths),axis=0)
    articles = [articles[index] for index in sorted_indexes]
    summaries = [summaries[index] for index in sorted_indexes]
    
    article_batches = []
    summary_batches = []
    article_batches_article_length = []
    summary_batches_summary_length = []
    
    i = 0
    while i < (len(articles)-dataset_batch_size):
        
        max_article_length = len(articles[i])
        
        article_batch = []
        summary_batch = []
        article_batch_article_length = []
        summary_batch_summary_length = []
        
        for j in range(dataset_batch_size):
            
            padded_article = articles[i+j]
            padded_summary = summaries[i+j]
            
            article_batch_article_length.append(len(articles[i+j]))
            summary_batch_summary_length.append(len(summaries[i+j])+1)
     
            while len(padded_article) < max_article_length:
                padded_article.append(vocabulary_index_mapping['<PAD>'])

            padded_summary.append(vocabulary_index_mapping['<EOS>']) #End of Sentence Marker
            while len(padded_summary) < max_sum_len+1:
                padded_summary.append(vocabulary_index_mapping['<PAD>'])
            
        
            article_batch.append(padded_article)
            summary_batch.append(padded_summary)
        
        article_batches.append(article_batch)
        summary_batches.append(summary_batch)
        article_batches_article_length.append(article_batch_article_length)
        summary_batches_summary_length.append(summary_batch_summary_length)
        
        i += dataset_batch_size
        
    return article_batches, summary_batches, article_batches_article_length, summary_batches_summary_length

In [16]:
# getting batches for train, test and validation
train_article_batches, train_summary_batches, train_article_batches_article_length, train_summary_batches_summary_length \
= create_batch(training_data_articles, training_data_summaries)

validation_article_batches, validation_summary_batches, validation_article_batches_article_length, validation_summary_batches_summary_length \
= create_batch(validation_data_articles, validation_data_summaries)

test_article_batches, test_summary_batches, test_article_batches_article_length, test_summary_batches_summary_length \
= create_batch(test_data_articles, test_data_summaries)

In [17]:
# loading all batches and mapping dictionary to json file
import json

final_dictionary = {}

final_dictionary["vocabulary_index_mapping"] = vocabulary_index_mapping
final_dictionary["embeddings_word"] = embeddings_word.tolist()
final_dictionary["train_article_batches"] = train_article_batches
final_dictionary["test_article_batches"] = test_article_batches
final_dictionary["validation_article_batches"] = validation_article_batches
final_dictionary["train_summary_batches"] = train_summary_batches
final_dictionary["test_summary_batches"] = test_summary_batches
final_dictionary["validation_summary_batches"] = validation_summary_batches
final_dictionary["train_article_batches_article_length"] = train_article_batches_article_length
final_dictionary["validation_article_batches_article_length"] = validation_article_batches_article_length
final_dictionary["test_article_batches_article_length"] = test_article_batches_article_length
final_dictionary["train_summary_batches_summary_length"] = train_summary_batches_summary_length
final_dictionary["validation_summary_batches_summary_length"] = validation_summary_batches_summary_length
final_dictionary["test_summary_batches_summary_length"] = test_summary_batches_summary_length

with open('CNN_Dailymail_Data.json', 'w') as output_file:
    json.dump(final_dictionary, output_file)