In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# create spark contexts

sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

In [2]:
spark = SparkSession \
    .builder \
    .master("local[*]")\
    .appName("blogtext_preprocessing") \
    .getOrCreate()

In [3]:
blogtext_df = spark.read.csv('/spring2021/project1/blogtext.csv',header=True, inferSchema='true')


In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import nltk
import preproc as pp

# preproc is a python file that has functions defined to do the "text" column preprocessing.

# Register all the functions in Preproc with Spark Context

remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())

In [5]:
# get the raw columns
raw_cols = blogtext_df.columns

rm_stops_df = blogtext_df.select(raw_cols)\
                   .withColumn("stop_text", remove_stops_udf(blogtext_df["text"]))

In [6]:
from functools import reduce
from pyspark.sql import DataFrame

rm_stops_df = reduce(DataFrame.drop, ['text'], rm_stops_df)
raw_cols = rm_stops_df.columns

In [7]:
rm_features_df = rm_stops_df.select(raw_cols)\
                            .withColumn("feat_text", \
                            remove_features_udf(rm_stops_df["stop_text"]))

In [8]:
rm_features_df = reduce(DataFrame.drop, ['stop_text'], rm_features_df)

raw_cols = rm_features_df.columns

In [9]:
tagged_df = rm_features_df.select(raw_cols) \
                          .withColumn("tagged_text", \
                           tag_and_remove_udf(rm_features_df.feat_text))

In [10]:
tagged_df = reduce(DataFrame.drop, ['feat_text'], tagged_df)

raw_cols = tagged_df.columns

In [11]:
lemm_df = tagged_df.select(raw_cols) \
                   .withColumn("text", lemmatize_udf(tagged_df["tagged_text"]))

In [12]:
lemm_df = reduce(DataFrame.drop, ['tagged_text'], lemm_df)

raw_cols = lemm_df.columns

In [13]:
#lemm_df.show(5)

In [14]:
blog_df = lemm_df.select("text")
blog_df.show(5)

+--------------------+
|                text|
+--------------------+
|info find page pd...|
|team member drewe...|
|het kader van ker...|
|           test test|
|thanks yahoo tool...|
+--------------------+
only showing top 5 rows



In [15]:
blog_rdd = blog_df.rdd.map(list)
blog_rdd.take(2)

[['info find page pdf file wait untill team leader process learns html'],
 ['team member drewes van der laag urllink mail ruiyu xie urllink mail bryan aaldering urllink mail']]

In [16]:
import re

blog_rdd = blog_rdd.map(lambda x: re.sub('\[|\]', '', str(x)))
blog_rdd.take(2)

["'info find page pdf file wait untill team leader process learns html'",
 "'team member drewes van der laag urllink mail ruiyu xie urllink mail bryan aaldering urllink mail'"]

In [17]:
blog_rdd = blog_rdd.flatMap(lambda satir: satir.split(" "))
blog_rdd.take(2)


["'info", 'find']

In [18]:
blog_rdd_count = blog_rdd.map(lambda word: (word, 1))
blog_rdd_count.take(2)

[("'info", 1), ('find', 1)]

In [19]:
blog_rdd_count_RBK = blog_rdd_count.reduceByKey(lambda x, y: (x + y))
blog_rdd_count_RBK.take(5)

[('process', 14146),
 ('scott', 5485),
 ('format', 3283),
 ('step', 22933),
 ('risk', 6768)]

In [20]:
blog_rdd1 = blog_rdd_count_RBK.map(lambda x: (x[1],x[0])).sortByKey(ascending=False)
blog_rdd1.take(5)

[(762164, 'get'),
 (499790, 'go'),
 (428069, 'know'),
 (424555, 'time'),
 (419093, 'think')]

In [21]:
blog_df1 = spark.createDataFrame(blog_rdd1)
blog_df1.show(5)


+------+-----+
|    _1|   _2|
+------+-----+
|762164|  get|
|499790|   go|
|428069| know|
|424555| time|
|419093|think|
+------+-----+
only showing top 5 rows



In [22]:
blog_df2 = blog_df1.select("_2")
blog_df2 = blog_df2.withColumnRenamed("_2", "blog_words")
blog_df2.show(5)


+----------+
|blog_words|
+----------+
|       get|
|        go|
|      know|
|      time|
|     think|
+----------+
only showing top 5 rows



In [23]:
pride_rdd = sc.textFile("/spring2021/project1/ddss/PrideClean.txt")


In [24]:
def clean_string(x):
    lower_string = x.lower()
    punctuation = '!"#$%&\'“”()*+,./:'';’<=>?@[\\]^_`{|}~-—1234567890'
    for i in punctuation:
        lower_string = lower_string.replace(i, '')
    return lower_string


In [25]:
pride_rdd = pride_rdd.map(clean_string)
pride_rdd = pride_rdd.flatMap(lambda satir: satir.split(" "))
pride_rdd = pride_rdd.filter(lambda x: x != '')
pride_rdd_count = pride_rdd.map(lambda word: (word, 1))
pride_rdd_count_RBK = pride_rdd_count.reduceByKey(lambda x, y: (x + y))
pride_rdd_count_RBK = pride_rdd_count_RBK.map(lambda x:(x[1], x[0]))
print(pride_rdd_count_RBK.take(10))


[(61, 'chapter'), (858, 'is'), (12, 'single'), (1862, 'in'), (9, 'possession'), (3599, 'of'), (182, 'good'), (308, 'must'), (134, 'however'), (57, 'known')]


In [26]:
from nltk.corpus import stopwords

stopwords = stopwords.words('english')
newStopWords = ['mrs','mr','could','would','though','said','one','like']
stopwords.extend(newStopWords)

#stopwords

In [27]:
pride_rdd_count_RBK = pride_rdd_count_RBK.filter(lambda x: x[0] not in stopwords).sortByKey()
pride_rdd_count_RBK.take(10)

[(1, 'rightful'),
 (1, 'grownup'),
 (1, 'newcomers'),
 (1, 'overscrupulous'),
 (1, 'vexing'),
 (1, 'solace'),
 (1, 'disclosed'),
 (1, 'hat'),
 (1, 'coughing'),
 (1, 'stress')]

In [28]:
pride_df = spark.createDataFrame(pride_rdd_count_RBK)
pride_df.show(5)


+---+--------------+
| _1|            _2|
+---+--------------+
|  1|      rightful|
|  1|       grownup|
|  1|     newcomers|
|  1|overscrupulous|
|  1|        vexing|
+---+--------------+
only showing top 5 rows



In [29]:
pride_df1 = pride_df.select("_2")
pride_df1 = pride_df1.withColumnRenamed("_2", "pride_words")
pride_df1.show(5)


+--------------+
|   pride_words|
+--------------+
|      rightful|
|       grownup|
|     newcomers|
|overscrupulous|
|        vexing|
+--------------+
only showing top 5 rows



In [30]:
pride_blog = pride_df1.crossJoin(blog_df2)
pride_blog.show(5)


+-----------+----------+
|pride_words|blog_words|
+-----------+----------+
|   rightful|       get|
|   rightful|        go|
|   rightful|      know|
|   rightful|      time|
|   rightful|     think|
+-----------+----------+
only showing top 5 rows



In [31]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
pride_blog = pride_blog.withColumn("levenshtein", F.levenshtein(F.col("pride_words"), F.col("blog_words")))
pride_blog.show()


+-----------+----------+-----------+
|pride_words|blog_words|levenshtein|
+-----------+----------+-----------+
|   rightful|       get|          6|
|   rightful|        go|          7|
|   rightful|      know|          8|
|   rightful|      time|          7|
|   rightful|     think|          7|
|   rightful|       say|          8|
|   rightful|      nbsp|          8|
|   rightful|      make|          8|
|   rightful|       day|          8|
|   rightful|     thing|          7|
|   rightful|       see|          8|
|   rightful|      want|          7|
|   rightful|    people|          8|
|   rightful|   urllink|          8|
|   rightful|      come|          8|
|   rightful|      good|          7|
|   rightful|      take|          7|
|   rightful|      work|          8|
|   rightful|      look|          8|
|   rightful|      love|          8|
+-----------+----------+-----------+
only showing top 20 rows



In [32]:
pride_blog.filter("levenshtein < 2").show(20)


+-----------+----------+-----------+
|pride_words|blog_words|levenshtein|
+-----------+----------+-----------+
|  newcomers|  newcomer|          1|
|     solace|    solace|          0|
|  disclosed|  disclose|          1|
|        hat|       eat|          1|
|        hat|      hate|          1|
|        hat|       hit|          1|
|        hat|       hot|          1|
|        hat|       cat|          1|
|        hat|       fat|          1|
|        hat|       hat|          0|
|        hat|       wat|          1|
|        hat|      chat|          1|
|        hat|      heat|          1|
|        hat|       sat|          1|
|        hat|       dat|          1|
|        hat|       tat|          1|
|        hat|       pat|          1|
|        hat|       rat|          1|
|        hat|       bat|          1|
|        hat|       hav|          1|
+-----------+----------+-----------+
only showing top 20 rows



In [33]:
moby_rdd = sc.textFile("/spring2021/project1/ddss/MobyCleanChaptersOnly.txt")

In [34]:
moby_rdd = moby_rdd.map(clean_string)
moby_rdd = moby_rdd.flatMap(lambda satir: satir.split(" "))
moby_rdd = moby_rdd.filter(lambda x: x != '')
moby_rdd_count = moby_rdd.map(lambda word: (word, 1))
moby_rdd_count_RBK = moby_rdd_count.reduceByKey(lambda x, y: (x + y))
moby_rdd_count_RBK = moby_rdd_count_RBK.map(lambda x:(x[1], x[0]))
print(moby_rdd_count_RBK.take(10))

[(172, 'chapter'), (54, 'call'), (93, 'years'), (77, 'mind'), (313, 'long'), (563, 'no'), (10, 'money'), (4050, 'in'), (7, 'purse'), (49, 'particular')]


In [35]:
moby_rdd_count_RBK = moby_rdd_count_RBK.filter(lambda x: x[0] not in stopwords).sortByKey()
moby_rdd_count_RBK.take(10)

[(1, 'regulating'),
 (1, 'circulation'),
 (1, 'offthen'),
 (1, 'philosophical'),
 (1, 'manhattoes'),
 (1, 'coenties'),
 (1, 'seeposted'),
 (1, 'spiles'),
 (1, 'lath'),
 (1, 'plastertied')]

In [36]:
moby_df = spark.createDataFrame(moby_rdd_count_RBK)
moby_df.show(5)

+---+-------------+
| _1|           _2|
+---+-------------+
|  1|   regulating|
|  1|  circulation|
|  1|      offthen|
|  1|philosophical|
|  1|   manhattoes|
+---+-------------+
only showing top 5 rows



In [37]:
moby_df1 = moby_df.select("_2")
moby_df1 = moby_df1.withColumnRenamed("_2", "moby_words")
moby_df1.show(5)

+-------------+
|   moby_words|
+-------------+
|   regulating|
|  circulation|
|      offthen|
|philosophical|
|   manhattoes|
+-------------+
only showing top 5 rows



In [38]:
moby_blog = moby_df1.crossJoin(blog_df2)
moby_blog.show(5)

+----------+----------+
|moby_words|blog_words|
+----------+----------+
|regulating|       get|
|regulating|        go|
|regulating|      know|
|regulating|      time|
|regulating|     think|
+----------+----------+
only showing top 5 rows



In [39]:
moby_blog = moby_blog.withColumn("levenshtein", F.levenshtein(F.col("moby_words"), F.col("blog_words")))
moby_blog.show()

+----------+----------+-----------+
|moby_words|blog_words|levenshtein|
+----------+----------+-----------+
|regulating|       get|          8|
|regulating|        go|          9|
|regulating|      know|         10|
|regulating|      time|          8|
|regulating|     think|          8|
|regulating|       say|          9|
|regulating|      nbsp|         10|
|regulating|      make|          9|
|regulating|       day|          9|
|regulating|     thing|          7|
|regulating|       see|          9|
|regulating|      want|          8|
|regulating|    people|          8|
|regulating|   urllink|          7|
|regulating|      come|         10|
|regulating|      good|          9|
|regulating|      take|          9|
|regulating|      work|         10|
|regulating|      look|          9|
|regulating|      love|          9|
+----------+----------+-----------+
only showing top 20 rows



In [40]:
moby_blog.filter("levenshtein < 2").show()

+-------------+-------------+-----------+
|   moby_words|   blog_words|levenshtein|
+-------------+-------------+-----------+
|  circulation|  circulation|          0|
|philosophical|philosophical|          0|
|         lath|         late|          1|
|         lath|         math|          1|
|         lath|         path|          1|
|         lath|         bath|          1|
|         lath|          lah|          1|
|         lath|         lash|          1|
|         lath|         hath|          1|
|         lath|         lata|          1|
|         lath|        latch|          1|
|         lath|         oath|          1|
|     counters|      counter|          1|
|        desks|         desk|          1|
|        shady|        shade|          1|
|        shady|        shady|          0|
|        shady|        shaky|          1|
|      attract|      attract|          0|
|         dale|         date|          1|
|         dale|         sale|          1|
+-------------+-------------+-----

In [41]:
gatsby_rdd = sc.textFile("/spring2021/project1/ddss/GatsbyClean.txt")

In [42]:
gatsby_rdd = gatsby_rdd.map(clean_string)
gatsby_rdd = gatsby_rdd.flatMap(lambda satir: satir.split(" "))
gatsby_rdd = gatsby_rdd.filter(lambda x: x != '')
gatsby_rdd_count = gatsby_rdd.map(lambda word: (word, 1))
gatsby_rdd_count_RBK = gatsby_rdd_count.reduceByKey(lambda x, y: (x + y))
gatsby_rdd_count_RBK = gatsby_rdd_count_RBK.map(lambda x:(x[1], x[0]))
print(gatsby_rdd_count_RBK.take(10))

[(37, 'once'), (56, 'again'), (3, 'wear'), (7, 'gold'), (5, 'hat'), (10, 'move'), (392, 'her'), (2, 'bounce'), (17, 'high'), (7, 'till')]


In [43]:
gatsby_rdd_count_RBK = gatsby_rdd_count_RBK.filter(lambda x: x[0] not in stopwords).sortByKey()
gatsby_rdd_count_RBK.take(10)

[(1, 'highbouncing'),
 (1, 'parke'),
 (1, 'dinvilliers'),
 (1, 'advantages'),
 (1, 'inclined'),
 (1, 'natures'),
 (1, 'victim'),
 (1, 'veteran'),
 (1, 'bores'),
 (1, 'abnormal')]

In [44]:
gatsby_df = spark.createDataFrame(gatsby_rdd_count_RBK)
gatsby_df.show(5)

+---+------------+
| _1|          _2|
+---+------------+
|  1|highbouncing|
|  1|       parke|
|  1| dinvilliers|
|  1|  advantages|
|  1|    inclined|
+---+------------+
only showing top 5 rows



In [45]:
gatsby_df1 = gatsby_df.select("_2")
gatsby_df1 = gatsby_df1.withColumnRenamed("_2", "gatsby_words")
gatsby_df1.show(5)

+------------+
|gatsby_words|
+------------+
|highbouncing|
|       parke|
| dinvilliers|
|  advantages|
|    inclined|
+------------+
only showing top 5 rows



In [46]:
gatsby_blog = gatsby_df1.crossJoin(blog_df2)
gatsby_blog.show(5)

+------------+----------+
|gatsby_words|blog_words|
+------------+----------+
|highbouncing|       get|
|highbouncing|        go|
|highbouncing|      know|
|highbouncing|      time|
|highbouncing|     think|
+------------+----------+
only showing top 5 rows



In [47]:
gatsby_blog = gatsby_blog.withColumn("levenshtein", F.levenshtein(F.col("gatsby_words"), F.col("blog_words")))
gatsby_blog.show()

+------------+----------+-----------+
|gatsby_words|blog_words|levenshtein|
+------------+----------+-----------+
|highbouncing|       get|         11|
|highbouncing|        go|         10|
|highbouncing|      know|         11|
|highbouncing|      time|         11|
|highbouncing|     think|          9|
|highbouncing|       say|         12|
|highbouncing|      nbsp|         11|
|highbouncing|      make|         12|
|highbouncing|       day|         12|
|highbouncing|     thing|          8|
|highbouncing|       see|         12|
|highbouncing|      want|         11|
|highbouncing|    people|         11|
|highbouncing|   urllink|         10|
|highbouncing|      come|         11|
|highbouncing|      good|         10|
|highbouncing|      take|         12|
|highbouncing|      work|         11|
|highbouncing|      look|         11|
|highbouncing|      love|         11|
+------------+----------+-----------+
only showing top 20 rows



In [48]:
gatsby_blog.filter("levenshtein < 2").show()

+------------+-----------+-----------+
|gatsby_words| blog_words|levenshtein|
+------------+-----------+-----------+
|       parke|       park|          1|
|       parke|     parker|          1|
|       parke|     parked|          1|
|  advantages|  advantage|          1|
|    inclined|    incline|          1|
|    inclined|   inclined|          0|
|     natures|     nature|          1|
|      victim|     victim|          0|
|     veteran|    veteran|          0|
|       bores|       bore|          1|
|       bores|      bored|          1|
|       bores|      bore'|          1|
|    abnormal|   abnormal|          0|
|     appears|     appear|          1|
|  politician| politician|          0|
| confidences| confidence|          1|
| fundamental|fundamental|          0|
|   tolerance|  tolerance|          0|
|    glimpses|    glimpse|          1|
|      exempt|     exempt|          0|
+------------+-----------+-----------+
only showing top 20 rows

