In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode,regexp_replace,col,lower, monotonically_increasing_id,lit, when

In [2]:
# Start (or reuse) the Spark session that every cell below reads through.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)

# Preprocessing

## Shakespeare

In [3]:
# Read the raw text: one row per line, in a single string column "value".
shakespeare_raw = spark.read.text("shakespeare.txt")

# Tokenise: split every line on runs of whitespace (spaces, tabs, ...) and
# explode the resulting array so that each token gets its own row. Splitting on
# "\s+" instead of a single " " keeps tab-separated words apart.
shakespeare_tokens = shakespeare_raw.select(
    explode(split(col("value"), r"\s+")).alias("word")
)

# Normalise: strip every character that is neither a word character nor
# whitespace (so "beauty's" -> "beautys"), then lower-case. The column is named
# "value" so it can be joined with the verb tables, whose column has that name.
shakespeare = shakespeare_tokens.select(
    lower(regexp_replace(col("word"), r"[^\w\s]", "")).alias("value")
)

# Drop empty tokens only AFTER punctuation removal: a token made purely of
# punctuation (e.g. "--") turns into "" above and must not reach the counts.
# This also removes the "" tokens produced by blank lines and leading whitespace.
shakespeare = shakespeare.filter(col("value") != "")

shakespeare.show()

+-----------+
|      value|
+-----------+
|       1609|
|        the|
|    sonnets|
|         by|
|    william|
|shakespeare|
|          1|
|       from|
|    fairest|
|  creatures|
|         we|
|     desire|
|   increase|
|       that|
|    thereby|
|    beautys|
|       rose|
|      might|
|      never|
|        die|
+-----------+
only showing top 20 rows



## Verbs

In [4]:
# Load the verb list: one verb form per line, in a single column "value".
verbs = spark.read.format("text").load("all_verbs.txt")

In [5]:
# Peek at the first rows of the verb list.
verbs.show(20)

+---------+
|    value|
+---------+
|    abash|
|  abashed|
|  abashed|
|  abashes|
| abashing|
|    abate|
|   abated|
|   abated|
|   abates|
|  abating|
|    abide|
|    abode|
|    abode|
|   abides|
|  abiding|
|   absorb|
| absorbed|
| absorbed|
|  absorbs|
|absorbing|
+---------+
only showing top 20 rows



# Manipulate Shakespeare

In [6]:
# Keep only the Shakespeare tokens that are verbs. A left-semi join filters
# `shakespeare` by membership in `verbs` without multiplying rows: all_verbs.txt
# lists some forms more than once (e.g. "abashed", "abated", "abode"), and an
# inner join would emit one row per matching entry, double-counting those words.
filtered_shakes = shakespeare.join(verbs, "value", "left_semi")
# One marker per occurrence, summed per verb form in the next step.
filtered_shakes = filtered_shakes.withColumn("count", lit(1))

In [7]:
# Occurrences per verb form. Counting the rows of each group is the same as
# summing a per-row 1, and the resulting column is already named "count".
filtered_shakes = filtered_shakes.groupBy("value").count()

In [8]:
# Spot-check the count for a single verb form.
filtered_shakes.where(col("value") == "float").show()

+-----+-----+
|value|count|
+-----+-----+
|float|    2|
+-----+-----+



## Verb dictionary

In [9]:
# Give every line of the verb dictionary an id, then explode the line into one
# row per (row_id, form), removing forms repeated within the same line.
verb_dict = (
    spark.read.text("verb_dict.txt")
    # NOTE: ids from monotonically_increasing_id() are unique and increasing,
    # but only consecutive within a single partition.
    .withColumn("row_id", monotonically_increasing_id())
    .select("row_id", explode(split(col("value"), ",")).alias("form"))
    # Drop empty forms (from a trailing or doubled comma). They could match
    # empty tokens left over from punctuation stripping of the Shakespeare
    # words and inflate that line's count.
    .filter(col("form") != "")
    .dropDuplicates(["row_id", "form"])
)
verb_dict.filter(col("form") == "be").show(truncate=False)

+------+----+
|row_id|form|
+------+----+
|50    |be  |
|49    |be  |
|47    |be  |
|48    |be  |
+------+----+



In [10]:
# Merge every dictionary line that lists "be" among its forms into one id, so
# that all of those forms (am, is, are, was, ...) are counted as a single verb.
# The ids are looked up from the data instead of being hard-coded: the values
# produced by monotonically_increasing_id() depend on how the file is
# partitioned and are only consecutive (0, 1, 2, ...) within one partition.
be_row_ids = [
    row["row_id"]
    for row in verb_dict.filter(col("form") == "be").select("row_id").distinct().collect()
]
if be_row_ids:
    # With the current verb_dict.txt these are ids 47-50, merged into 50.
    # NOTE(review): the lookup table labels each id with the FIRST entry of its
    # line; this assumes the highest id's line starts with "be" (true for the
    # current file, whose final table reports "be") -- confirm if the
    # dictionary changes.
    be_target_id = max(be_row_ids)
    updated_df = verb_dict.withColumn(
        "row_id",
        when(col("row_id").isin(be_row_ids), be_target_id).otherwise(col("row_id")),
    )
else:
    # No line contains "be": nothing to merge.
    updated_df = verb_dict

In [11]:
# Sanity check after the merge: id 49 should be empty, id 50 should hold every form of "be".
updated_df.where(col("row_id") == 49).show(truncate=False)
updated_df.where(col("row_id") == 50).show(truncate=False)

+------+----+
|row_id|form|
+------+----+
+------+----+

+------+-----+
|row_id|form |
+------+-----+
|50    |be   |
|50    |be   |
|50    |were |
|50    |being|
|50    |was  |
|50    |be   |
|50    |being|
|50    |been |
|50    |been |
|50    |was  |
|50    |been |
|50    |be   |
|50    |am   |
|50    |been |
|50    |are  |
|50    |is   |
|50    |being|
|50    |being|
+------+-----+



In [12]:
# Merging ids can repeat a (row_id, form) pair, so keep one of each. Then rename
# "form" to "value" so it matches the Shakespeare column for the join below.
updated_df = (
    updated_df
    .dropDuplicates(["row_id", "form"])
    .withColumnRenamed("form", "value")
)
updated_df.where(col("row_id") == 50).show(truncate=False)

+------+-----+
|row_id|value|
+------+-----+
|50    |be   |
|50    |am   |
|50    |were |
|50    |being|
|50    |is   |
|50    |was  |
|50    |been |
|50    |are  |
+------+-----+



In [13]:
# Tag every Shakespeare word with the id of each dictionary line it appears in.
count = shakespeare.join(updated_df, on="value", how="inner")
count.where(col("value") == "die").show(truncate=False)
# Occurrences per dictionary line (i.e. per verb).
result = count.groupBy("row_id").count()
result.show()

+-----+------+
|value|row_id|
+-----+------+
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
|die  |242   |
+-----+------+
only showing top 20 rows

+------+-----+
|row_id|count|
+------+-----+
|   474|   15|
|    26|   26|
|   964|    5|
|    29|   21|
|   418|   13|
|   541|   10|
|    65|  226|
|   558|   29|
|   293|   58|
|   938|   15|
|   270|  278|
|   730|   12|
|   222|   73|
|   278|  198|
|   720|   29|
|   243|    8|
|   926|   62|
|    19|   40|
|   965|   84|
|    54|    6|
+------+-----+
only showing top 20 rows



In [14]:
# Lookup table: one row per dictionary line, mapping its id to the line's
# first entry, which is used as the verb's display name in the final table.
# NOTE(review): these ids come from a second, independent call to
# monotonically_increasing_id() over the same file. They presumably line up with
# verb_dict's ids only because the file is read and partitioned the same way
# both times -- confirm if the read changes.
split_col = split(col("value"), ",")
lookup = (
    spark.read.text("verb_dict.txt")
    .withColumn("row_id", monotonically_increasing_id())
    .select("row_id", split_col.getItem(0).alias("value"))
)

In [15]:
# Peek at the first id -> verb mappings.
lookup.show(n=20)

+------+---------+
|row_id|    value|
+------+---------+
|     0|    abash|
|     1|    abate|
|     2|    abide|
|     3|   absorb|
|     4|   accept|
|     5|accompany|
|     6|     ache|
|     7|  achieve|
|     8|  acquire|
|     9|      act|
|    10|      add|
|    11|  address|
|    12|   adjust|
|    13|   admire|
|    14|    admit|
|    15|   advise|
|    16|   afford|
|    17|    agree|
|    18|   alight|
|    19|    allow|
+------+---------+
only showing top 20 rows



In [16]:
# Swap each numeric row_id for its verb name from the lookup table.
final = (
    lookup
    .join(result, on="row_id", how="inner")
    .select("value", "count")
)

In [17]:
# Peek at the first verb counts.
final.show(n=20)

+---------+-----+
|    value|count|
+---------+-----+
|   insult|   15|
|    apply|   26|
|      vie|    5|
|    argue|   21|
|  imitate|   13|
|   misuse|   10|
|  beseech|  226|
|      nod|   29|
|     envy|   58|
|translate|   15|
|    drink|  278|
| sanctify|   12|
|     deal|   73|
|      eat|  198|
|sacrifice|   29|
|   differ|    8|
|   thrive|   62|
|    allow|   40|
|     view|   84|
| beautify|    6|
+---------+-----+
only showing top 20 rows



In [25]:
# Spot-check one verb: counts now cover all of its forms.
final.where(col("value") == "float").show(truncate=False)

+-----+-----+
|value|count|
+-----+-----+
|float|5    |
+-----+-----+



In [19]:
# Pull every (verb, count) Row back to the driver; no RDD round-trip is needed.
final.collect()

[Row(value='insult', count=15),
 Row(value='apply', count=26),
 Row(value='vie', count=5),
 Row(value='argue', count=21),
 Row(value='imitate', count=13),
 Row(value='misuse', count=10),
 Row(value='beseech', count=226),
 Row(value='nod', count=29),
 Row(value='envy', count=58),
 Row(value='translate', count=15),
 Row(value='drink', count=278),
 Row(value='sanctify', count=12),
 Row(value='deal', count=73),
 Row(value='eat', count=198),
 Row(value='sacrifice', count=29),
 Row(value='differ', count=8),
 Row(value='thrive', count=62),
 Row(value='allow', count=40),
 Row(value='view', count=84),
 Row(value='beautify', count=6),
 Row(value='free', count=189),
 Row(value='endure', count=81),
 Row(value='split', count=23),
 Row(value='earn', count=11),
 Row(value='illustrate', count=3),
 Row(value='rain', count=62),
 Row(value='learn', count=132),
 Row(value='contend', count=21),
 Row(value='charge', count=225),
 Row(value='chase', count=37),
 Row(value='heave', count=15),
 Row(value='exclud

In [20]:
# Peek at the first verb counts (first 20 rows, long values truncated).
final.show(20, True)

+---------+-----+
|    value|count|
+---------+-----+
|   insult|   15|
|    apply|   26|
|      vie|    5|
|    argue|   21|
|  imitate|   13|
|   misuse|   10|
|  beseech|  226|
|      nod|   29|
|     envy|   58|
|translate|   15|
|    drink|  278|
| sanctify|   12|
|     deal|   73|
|      eat|  198|
|sacrifice|   29|
|   differ|    8|
|   thrive|   62|
|    allow|   40|
|     view|   84|
| beautify|    6|
+---------+-----+
only showing top 20 rows



In [23]:
# Most frequent verbs first.
final.orderBy(col("count").desc()).show()

+-----+-----+
|value|count|
+-----+-----+
|   be|26727|
| have| 7848|
|   do| 6416|
| come| 3610|
| make| 2892|
|   go| 2575|
| love| 2501|
|  let| 2384|
|  say| 2356|
| know| 2251|
|enter| 2118|
|  see| 2062|
| give| 1952|
| like| 1757|
| take| 1594|
|speak| 1541|
|think| 1504|
| tell| 1356|
| hear| 1336|
|  eye| 1109|
+-----+-----+
only showing top 20 rows



In [24]:
# Confirm that `final` is a Spark DataFrame.
final.__class__

pyspark.sql.dataframe.DataFrame