# Distributed Computing - Project 1





In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 1.Prepare Environment

### 1.1 Install Java, Pyspark and Spark NLP

In [2]:
import os

In [3]:
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq

In [4]:
#Install Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-8u312-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)


In [5]:
# Install Pyspark
! pip install --ignore-installed pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.6.2

Collecting pyspark==2.4.4
  Using cached pyspark-2.4.4-py2.py3-none-any.whl
Collecting py4j==0.10.7
  Using cached py4j-0.10.7-py2.py3-none-any.whl (197 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4
Collecting spark-nlp==2.6.2
  Using cached spark_nlp-2.6.2-py2.py3-none-any.whl (128 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.6.2


### 1.2 Start  Spark Session

In [6]:
import sparknlp
spark = sparknlp.start()

from pyspark.ml import Pipeline
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

## 2.Get Classics Corpus

### 2.1 Convert txt files into Python Dataframe

In [7]:
import pandas as pd
import os
import re

In [8]:
directory ="/content/drive/MyDrive/Distributed-Computing/data/classic_literature/" #Change according to path
# directory ="data/classic_literature/" #Change according to path
text_type = 'C'

classics_df = pd.DataFrame(columns=['id', 'type', 'text'])

for filename in os.listdir(directory):
#filename = "data/classic_literature/45.txt"
    file_ext = os.path.basename(filename).rsplit('.',1)[1] #Get file extension
    if file_ext == "txt":
        with open(directory + '/' + filename, 'r') as file:
            text_id = os.path.basename(filename).rsplit('.',1)[0]
            corpus = file.read()
            corpus = re.sub(';', ' ', corpus)
            corpus = corpus.replace('Chapter', '')
            classics_df.loc[len(classics_df.index)] = [text_id, text_type, corpus]

In [9]:
classics_df

Unnamed: 0,id,type,text
0,1342,C,\n\n\n\n\nPRIDE AND PREJUDICE\n\nBy Jane Auste...
1,768,C,\n\n\n\nTranscribed from the 1910 John Murray ...
2,1260,C,\n\n\n\n\nTranscribed from the 1897 Service & ...
3,514,C,\n\n\n\nLITTLE WOMEN\n\n\nby\n\nLouisa May Alc...
4,1905,C,"\n\n\n\n\nTHE GOVERNESS \n\nOR, THE LITTLE FEM..."
5,113,C,\n\n\n\n\n\n\n\n\n\n\nIn Honor of Lisa Hart's ...
6,145,C,\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nMiddlemarch\n\n\...
7,45,C,\n\n\n\n\n ANNE OF GREEN GABL...


### 2.2 Convert Python Dataframe into Spark Dataframe

In [10]:
import re
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

#sc =  pyspark.SparkContext("local[*]", "Test Context")
sqlContext = SQLContext(spark)

In [11]:
df_Spark_classics = sqlContext.createDataFrame(classics_df) #Pyspark SQL dataframe

## 3.Get Fanfictions Corpus

### 3.1 Convert txt files into Python Dataframe

In [12]:
directory ="/content/drive/MyDrive/Distributed-Computing/data/fanfiction/" #Change according to path
# directory ="data/fanfiction/" #Change according to path
text_type = 'F'

fanfictions_df = pd.DataFrame(columns=['id', 'type', 'text'])

for filename in os.listdir(directory):
    file_ext = os.path.basename(filename).rsplit('.',1)[1] #Get file extension
    if file_ext == "txt":
        with open(directory + '/' + filename, 'r') as file:
            text_id = os.path.basename(filename).rsplit('.',1)[0]
            corpus = file.read()
            corpus = corpus.replace('Chapter', '')
            corpus = re.sub(';', ' ', corpus)
            fanfictions_df.loc[len(fanfictions_df.index)] = [text_id, text_type, corpus]

In [13]:
fanfictions_df

Unnamed: 0,id,type,text
0,fanfic_1536152,F,"1\nGoing back was the worst.I had hoped that,..."
1,fanfic_35367502,F,1\n“Lily there’s a boy at the door!”\nThe gin...
2,fanfic_33183868,F,1\n \n\n \n\nThere was something luminescent ...
3,fanfic_7441657,F,"1\nPrologue\nOctober 31, 1981\nThe view out t..."
4,fanfic_24025603,F,"1\nDisclaimer: I, by no means, claim to own a..."
5,fanfic_23824330,F,1\nAmy sits facing the window of her room. He...
6,fanfic_25042705,F,"1\n“But, really,” said Mrs. Bennet rather lou..."
7,fanfic_36819574,F,"1\nI was born in sunlight, and dragged into d..."
8,fanfic_8523001,F,1\nThere was such a cultural veil of secrecy ...


### 3.2 Convert Python Dataframe into Spark Dataframe

In [14]:
df_Spark_fanfictions = sqlContext.createDataFrame(fanfictions_df) #Pyspark SQL dataframe

## 4 Preprocess Texts

### 4.1 Create Preprocessing Pipeline

Create pipeline to preprocess the spark dataframe texts.

Each of these classes receive an input column and creates the output column.
At the end of the pipeline, we will have a dataframe with all of the columns that are created on the fly and their results.

The **last column** generated, in this case **token_features** is the one that has all the words after being preprocessed, removing stop words, etc.

In [15]:
#https://medium.com/spark-nlp/spark-nlp-101-document-assembler-500018f5f6b5
document = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document") \
    .setCleanupMode("shrink_full") #remove new lines and tabs, plus shrinking spaces and blank lines.

#https://nlp.johnsnowlabs.com/api/python/reference/autosummary/sparknlp.annotator.Tokenizer.html
token = Tokenizer()\
    .setInputCols(['document'])\
    .setOutputCol('token')

#https://nlp.johnsnowlabs.com/docs/en/annotators
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized") \
    .setLowercase(True) \
    .setCleanupPatterns(["""[^A-Za-z]"""]) # remove punctuations and alphanumeric chars

finisher = Finisher() \
    .setInputCols(["normalized"]) \
    .setOutputCols(["token_features"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

In [16]:
nlp_pipeline_lr = Pipeline(
        stages=[document, 
                token,
                normalizer,
                finisher])

### 4.2 Apply Pipeline to Spark Dataframes

#### 4.2.1 Classics

In [17]:
processed_classics_df = nlp_pipeline_lr.fit(df_Spark_classics).transform(df_Spark_classics)

#Show Token Features column
# processed_classics_df.select("token_features").show(truncate=200) 

#### 4.2.2 Fanfictions

In [18]:
processed_fanfictions_df = nlp_pipeline_lr.fit(df_Spark_fanfictions).transform(df_Spark_fanfictions)

#Show Token Features column
# processed_fanfictions_df.select("token_features").show(truncate=200) 

------------------------------------------------------------------------------------------------------------

At this point we have two preprocessed Spark Dataframes where each row belongs two one book. The column of interest is **"token_features"**, which contains all the **tokens** of the corpus.

1.   **processed_classics_df**: Contains the eiight classics (one per row)
2.   **processed_fanfictions_df**: Contains the eiight fanfictions (one per row)

--> From here, we can start doing additional processing like TF-IDF or other stuff to obtain the information we want.






In [19]:
from pyspark.sql.functions import concat_ws, split, col, concat, lit

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("temp").getOrCreate()
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col

Convert the token_features column from type [string] -> string for further processing

In [20]:
final_classics_df = processed_classics_df.select(['id', 'type', 'token_features'])
# final_classics_df = final_classics_df.withColumn("token_features", concat_ws(" ", col("token_features")))
# final_classics_df.show(truncate = 200)

In [21]:
final_fanfictions_df = processed_fanfictions_df.select(['id', 'type', 'token_features'])
# final_fanfictions_df = final_fanfictions_df.withColumn("token_features", concat_ws(" ", col("token_features")))
# final_fanfictions_df.show(truncate = 200)

In [22]:
final_df = final_classics_df.union(final_fanfictions_df)
# final_df.show(truncate = 500)

In [23]:
final_df.schema

StructType(List(StructField(id,StringType,true),StructField(type,StringType,true),StructField(token_features,ArrayType(StringType,true),true)))

### POS tagging

In [24]:
#Cut these??
# @udf()
# def get_tokens(lst):
#   tokens, tags = zip(*nltk.pos_tag(lst))
#   return list(tags)

# @udf()
# def get_tags(lst):
#   tokens, tags = zip(*nltk.pos_tag(lst))
#   return list(tokens)

In [25]:
@udf(returnType = StringType())
def get_type(id_string):
  doc_type = id_string.split('|')[1]
  return(doc_type)

@udf(returnType = StringType())
def get_id(id_string):
  id = id_string.split('|')[0]
  clean_id = id.split('_')[-1]
  return(clean_id)

@udf(returnType = StringType())
def get_token(pos_list):
  return(pos_list[0])

@udf(returnType = StringType())
def get_pos(pos_list):
  return(pos_list[1])

spark.udf.register("get_type" , get_type)
spark.udf.register("get_id" , get_id)
spark.udf.register("get_token" , get_token)
spark.udf.register("get_pos" , get_pos)

# Testing
# split_id_string1 = "fanfic_1536152|F"
# split_id_string2 = "1342|C"
# token_list_1 = ["hoped", "VBN"]

# print(get_type(split_id_string1))
# print(get_type(split_id_string2))
# get_id(split_id_string1)
# get_id(split_id_string2)
# get_token(token_list_1)
# get_pos(token_list_1)

<function __main__.get_pos>

Import and Setup NLTK

In [26]:
import nltk
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
# from nltk import word_tokenize

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

Restructure columns for ease of RDD mapping

In [27]:
final_df = final_df.select([concat(col('id'),lit('|'),col('type')).alias("id_type"), 'token_features'])

In [28]:
final_rdd = final_df.rdd

Map-Reduce Process: Term Frequency

In [29]:
map1 = final_rdd.flatMap(lambda x: [((x[0], word), 1) for word in nltk.pos_tag(x[1])])
map1.take(5)

[(('1342|C', ('pride', 'NN')), 1),
 (('1342|C', ('and', 'CC')), 1),
 (('1342|C', ('prejudice', 'NN')), 1),
 (('1342|C', ('by', 'IN')), 1),
 (('1342|C', ('jane', 'NN')), 1)]

In [30]:
reduce = map1.reduceByKey(lambda x, y: x + y)

In [31]:
reduce.take(1)

[(('1342|C', ('austen', 'NNS')), 1)]

In [32]:
tf=reduce.map(lambda x: (x[0][1],(x[0][0],x[1])))

In [33]:
tf.take(1)

[(('austen', 'NNS'), ('1342|C', 1))]

In [34]:
tf_final = tf.map(lambda x: (x[0][0], (x[0][1], x[1][0], x[1][1])))

In [35]:
tf_final.take(1)

[('austen', ('NNS', '1342|C', 1))]

In [36]:
map3=reduce.map(lambda x: (x[0][1],(x[0][0],x[1],1)))

In [37]:
map3.take(1)

[(('austen', 'NNS'), ('1342|C', 1, 1))]

In [38]:
map4=map3.map(lambda x:(x[0],x[1][2]))

In [39]:
map4.take(1)

[(('austen', 'NNS'), 1)]

Map-Reduce Process: Document Frequency

In [40]:
# Number of documents containing the word
reduce2=map4.reduceByKey(lambda x,y:x+y)

In [41]:
reduce2.take(1)

[(('going', 'VBG'), 17)]

Map-Reduce Process: IDF

In [42]:
import math
from pyspark.sql.functions import *
n = 17
idf=reduce2.map(lambda x: (x[0],math.log10(n/x[1])))

In [43]:
idf.take(1)

[(('going', 'VBG'), 0.0)]

In [44]:
idf_final = idf.map(lambda x: (x[0][0], (x[0][1], x[1])))

In [45]:
idf_final.take(1)

[('going', ('VBG', 0.0))]

Map-Reduce Process: TF-IDF computation

In [46]:
rdd=tf.join(idf)

In [47]:
rdd.take(1)

[(('hoped', 'VBN'), (('fanfic_1536152|F', 5), 0.054357662322592676))]

In [48]:
rdd=rdd.map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1],x[1][0][1]*x[1][1])))

In [49]:
rdd=rdd.map(lambda x: (x[0],x[1][0],x[1][1],x[1][2],x[1][3]))

Map-Reduce Process: Final DataFrame

In [50]:
rdd_df = rdd.toDF(["DocumentId","Token","TF","IDF","TFIDF"])
rdd_df.createOrReplaceTempView("df") # create SQL table

In [51]:
rdd_df.show(5)

+----------------+------------+---+--------------------+--------------------+
|      DocumentId|       Token| TF|                 IDF|               TFIDF|
+----------------+------------+---+--------------------+--------------------+
|fanfic_1536152|F|[hoped, VBN]|  5|0.054357662322592676| 0.27178831161296335|
|fanfic_8523001|F|[hoped, VBN]| 10|0.054357662322592676|  0.5435766232259267|
|          1342|C|[hoped, VBN]|  7|0.054357662322592676| 0.38050363625814876|
|          1260|C|[hoped, VBN]|  1|0.054357662322592676|0.054357662322592676|
|          1905|C|[hoped, VBN]|  1|0.054357662322592676|0.054357662322592676|
+----------------+------------+---+--------------------+--------------------+
only showing top 5 rows



In [52]:
# Reformat results to have column id, type, token, part of speech, and tfidf value
results = spark.sql('select get_id(DocumentId) as id, get_type(DocumentId) as type, get_token(Token) as token, get_pos(Token) as pos, TFIDF as tfidf from df')
results.createOrReplaceTempView("results")

In [53]:
results.show(10)

+--------+----+-----+---+--------------------+
|      id|type|token|pos|               tfidf|
+--------+----+-----+---+--------------------+
| 1536152|   F|hoped|VBN| 0.27178831161296335|
| 8523001|   F|hoped|VBN|  0.5435766232259267|
|    1342|   C|hoped|VBN| 0.38050363625814876|
|    1260|   C|hoped|VBN|0.054357662322592676|
|    1905|   C|hoped|VBN|0.054357662322592676|
|33183868|   F|hoped|VBN| 0.16307298696777803|
|25042705|   F|hoped|VBN|  0.5435766232259267|
|36819574|   F|hoped|VBN|  0.2174306492903707|
|     514|   C|hoped|VBN| 0.16307298696777803|
|     113|   C|hoped|VBN| 0.16307298696777803|
+--------+----+-----+---+--------------------+
only showing top 10 rows



### Analysis of tfidf values

Generic Function

In [54]:
import plotly.express as px

def analyze_tfidf(word): 
  word_df = spark.sql(f"select id, token, type, tfidf from results where token = '{word}'")
  word_pd = word_df.toPandas()

  # Generate plotly graph
  fig = px.bar(word_pd, x="tfidf", y="id", color="type", title = f"TFIDF values for '{word}'")
  fig.update_layout(yaxis={'categoryorder':'total ascending'})
  fig.for_each_annotation( lambda a: a.update(text='Classics' if a.text.split('=')[1] == 'C' else 'Fanfiction'))     
  fig.show()

In [55]:
analyze_tfidf('wife')

In [56]:
analyze_tfidf('docile')

In [57]:
analyze_tfidf('marriage')

In [58]:
# baby, raise, nurture, angry, passive, docile, happy, sad, frustrated, beautiful, young, naive, hope, dream

Additional Exploration

Marriage

In [59]:
marriage_results = spark.sql("select id, token, pos, type, tfidf from results where token = 'marriage'")

In [60]:
marriage_results.show()

+--------+--------+---+----+-------------------+
|      id|   token|pos|type|              tfidf|
+--------+--------+---+----+-------------------+
|     768|marriage| NN|   C| 0.3025353506612982|
|      45|marriage| NN|   C| 0.1512676753306491|
|35367502|marriage| NN|   F|0.45380302599194733|
|     514|marriage| NN|   C| 0.7563383766532455|
|     145|marriage| NN|   C|  22.23634827360542|
|24025603|marriage| NN|   F| 2.1177474546290878|
|    1342|marriage| NN|   C|  9.983666571822841|
|    1260|marriage| NN|   C| 3.9329595585968766|
|    1905|marriage| NN|   C| 0.6050707013225964|
|25042705|marriage| NN|   F| 0.1512676753306491|
|36819574|marriage| NN|   F| 0.3025353506612982|
| 8523001|marriage| NN|   F|  9.681131221161543|
|     145|marriage| VB|   C| 2.2599829999758345|
|    1260|marriage| VB|   C| 0.7533276666586115|
|25042705|marriage| VB|   F| 0.7533276666586115|
|24025603|marriage|VBP|   F| 1.2304489213782739|
+--------+--------+---+----+-------------------+



In [61]:
marriage = marriage_results.toPandas()

In [62]:
marriage

Unnamed: 0,id,token,pos,type,tfidf
0,768,marriage,NN,C,0.302535
1,45,marriage,NN,C,0.151268
2,35367502,marriage,NN,F,0.453803
3,514,marriage,NN,C,0.756338
4,145,marriage,NN,C,22.236348
5,24025603,marriage,NN,F,2.117747
6,1342,marriage,NN,C,9.983667
7,1260,marriage,NN,C,3.93296
8,1905,marriage,NN,C,0.605071
9,25042705,marriage,NN,F,0.151268


In [63]:
import plotly.express as px
fig = px.bar(marriage, x="tfidf", y="id", color="type", title = "'Marriage' TFIDF values")
fig.update_layout(yaxis={'categoryorder':'total ascending'})
fig.for_each_annotation( lambda a: a.update(text='Classics' if a.text.split('=')[1] == 'C' else 'Fanfiction'))     
fig.show()

Wife

In [64]:
wife_results = spark.sql("select id, token, type, tfidf from results where token = 'wife'")

In [65]:
wife_results.show()

+--------+-----+----+--------------------+
|      id|token|type|               tfidf|
+--------+-----+----+--------------------+
|     514| wife|   C|  0.9741707327269183|
|     113| wife|   C| 0.18430257105644401|
|     145| wife|   C|  3.5017488500724365|
| 7441657| wife|   F| 0.05265787744469829|
|24025603| wife|   F|  3.2121305241265956|
|     768| wife|   C|  0.6055655906140304|
|      45| wife|   C|  0.5792366518916812|
| 1536152| wife|   F|  0.2369604485011423|
| 8523001| wife|   F|  1.8956835880091385|
|    1342| wife|   C|  1.1584733037833623|
|    1260| wife|   C|   2.027328281620884|
|    1905| wife|   C|  0.2896183259458406|
|33183868| wife|   F| 0.13164469361174572|
|23824330| wife|   F|0.026328938722349145|
|25042705| wife|   F|  0.5792366518916812|
|36819574| wife|   F|  0.2896183259458406|
+--------+-----+----+--------------------+



In [66]:
wife = wife_results.toPandas()

In [67]:
import plotly.express as px
fig = px.bar(wife, x="tfidf", y="id", color="type", title = "'Wife' TFIDF values")
fig.update_layout(yaxis={'categoryorder':'total ascending'})
fig.for_each_annotation( lambda a: a.update(text='Classics' if a.text.split('=')[1] == 'C' else 'Fanfiction'))     
fig.show()

Generic Function

In [68]:
import plotly.express as px

def analyze_tfidf(word): 
  word_df = spark.sql(f"select id, token, type, tfidf from results where token = '{word}'")
  word_pd = word_df.toPandas()

  # Generate plotly graph
  fig = px.bar(word_pd, x="tfidf", y="id", color="type", title = f"TFIDF values for '{word}'")
  fig.update_layout(yaxis={'categoryorder':'total ascending'})
  fig.for_each_annotation( lambda a: a.update(text='Classics' if a.text.split('=')[1] == 'C' else 'Fanfiction'))     
  fig.show()

In [69]:
analyze_tfidf('wife')

In [70]:
# baby, raise, nurture, angry, passive, docile, happy, sad, frustrated, beautiful, young, naive, hope, dream

In [71]:
analyze_tfidf('dream')