In [56]:
# Set the PySpark environment variables
import os
os.environ['SPARK_HOME'] = "/home/rajesh/CSL7100/PySpark/spark-3.4.2-bin-hadoop3"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

In [58]:
spark = SparkSession.builder \
    .appName("DataFrame-Gutenberg-Network") \
    .master("local[6]") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "24") \
    .getOrCreate()

### Using RDDs

In [59]:
from pyspark.sql.functions import regexp_extract

books_df = (
    spark.sparkContext
         .wholeTextFiles("/home/rajesh/CSL7100/Assignment1/data/D184MB/*.txt") #reads the whole text files from specified path
         .toDF(["file_path", "text"]) #convert RDD to data frame
         .withColumn(
             "file_name",  # add a new column calle file_name and extract it using regular expression
             regexp_extract("file_path", r"([^/]+$)", 1)
         )
         .select("file_name", "text") #keep file_name and text column
)



In [60]:
from pyspark.sql.functions import regexp_extract, col

#create a new data frame with columns
books_df = (
    books_df
    .withColumn("title",         #create
        regexp_extract("text", r"(?i)Title:\s*(.*)", 1)
    )
    .withColumn("release_date",
        regexp_extract("text", r"(?i)Release Date:\s*(.*)", 1)
    )
    .withColumn("language",
        regexp_extract("text", r"(?i)Language:\s*(.*)", 1)
    )
    .withColumn("encoding",
        regexp_extract("text", r"(?i)Character set encoding:\s*(.*)", 1)
    )
    .select("file_name", "title", "release_date", "language", "encoding")
)

books_df.show(truncate=False)

[Stage 5:>                                                          (0 + 1) / 1]

+---------+---------------------------------------------------+----------------------------+--------+---------------------+
|file_name|title                                              |release_date                |language|encoding             |
+---------+---------------------------------------------------+----------------------------+--------+---------------------+
|10.txt   |The King James Bible                               |March 2, 2011 [EBook #10]   |English |ASCII                |
|101.txt  |Hacker Crackdown                                   |January, 1994               |English |ASCII                |
|102.txt  |The Tragedy of Pudd'nhead Wilson                   |January, 1994               |English |ASCII                |
|103.txt  |Around the World in 80 Days                        |May 15, 2008 [EBook #103]   |English |ASCII                |
|104.txt  |Franklin Delano Roosevelt's First Inaugural Address|May 14, 2008 [EBook #104]   |English |ASCII                |
|105.txt

26/02/13 16:12:36 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 5 (TID 5): Attempting to kill Python Worker
                                                                                

In [48]:
books_df.printSchema()


root
 |-- file_name: string (nullable = true)
 |-- text: string (nullable = true)



In [49]:
row = books_df.first()

print("File name:", row.file_name)
print("Text preview:\n", row.text[:100])   # first 1000 chars only

[Stage 1:>                                                          (0 + 1) / 1]

File name: 10.txt
Text preview:
 The Project Gutenberg EBook of The King James Bible

This eBook is for the use of anyone anywhere 


26/02/13 16:01:36 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1 (TID 1): Attempting to kill Python Worker
                                                                                

In [50]:
#import builtin function for cleaning and formating the text
from pyspark.sql.functions import (
    regexp_replace,
    lower,
    col,
    split,
)
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import split, expr

In [51]:
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import split, expr

books_clean = (
    books_df
    .withColumn(
        "clean_text",
        regexp_replace(                  #remove the header
            col("text"),
            r"(?is)^.*?\*\*\*\s*START OF.*?\*\*\*",
            ""
        )
    )
    .withColumn(
        "clean_text",
        regexp_replace(                 #remove the footer
            col("clean_text"),
            r"(?is)\*\*\*\s*END OF.*?\*\*\*.*$",
            ""
        )
    )
    .withColumn("clean_text", lower(col("clean_text")))         #convert the text to lower case
    .withColumn(
        "clean_text",
        regexp_replace(col("clean_text"), r"[^a-z\s]", " ")     #remove the punctuations
    )
    .withColumn("words", split(col("clean_text"), r"\s+"))      #tokenize into words array
)

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="tokens")
books_clean = remover.transform(books_clean)

books_clean = books_clean.select("file_name", "tokens")  # keep file_name and tokenized words


In [9]:
print("book count = ", books_clean.count())
books_clean.show(5, truncate=100)

                                                                                

book count =  425


26/02/13 14:27:00 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 5 (TID 5): Attempting to kill Python Worker
                                                                                

+---------+----------------------------------------------------------------------------------------------------+
|file_name|                                                                                              tokens|
+---------+----------------------------------------------------------------------------------------------------+
|   10.txt|[, old, testament, king, james, version, bible, first, book, moses, called, genesis, beginning, g...|
|  101.txt|[, hacker, crackdown, law, disorder, electronic, frontier, bruce, sterling, contents, preface, el...|
|  102.txt|[, produced, anonymous, volunteer, tragedy, pudd, nhead, wilson, mark, twain, whisper, reader, ch...|
|  103.txt|[, around, world, eighty, days, contents, chapter, phileas, fogg, passepartout, accept, one, mast...|
|  104.txt|[, inaugural, address, franklin, delano, roosevelt, given, washington, d, c, march, th, president...|
+---------+-------------------------------------------------------------------------------------

In [10]:
books_df.unpersist()

DataFrame[file_name: string, text: string]

In [12]:
books_df.unpersist(blocking=True) if books_df.is_cached else None
del books_df


In [13]:
spark.catalog.clearCache()


In [14]:
books_clean = books_clean.repartition(6, "file_name").cache()
books_clean.count()   # materialize cache


                                                                                

425

2. TF- IDF calculation

In [15]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import udf, col, lit, explode
from pyspark.sql.types import DoubleType
import numpy as np
from pyspark.sql.functions import explode, countDistinct, log, lit, col, count, size


### Calculate the Term Frequency (TF) of each word in each book

In [16]:
from pyspark.sql.functions import explode, col, count

tf_df = (
    books_clean
    .select("file_name", explode(col("tokens")).alias("word"))
    .groupBy("file_name", "word")
    .agg(count("*").alias("term_count"))
)


In [20]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("file_name").orderBy(desc("term_count"))

top10_per_book = (
    tf_df
    .withColumn("rank", row_number().over(window_spec))  # assign rank per book
    .filter("rank <= 10")  # keep top 10
    .drop("rank")  # remove helper column
)

top10_per_book.show(truncate=False)


+---------+---------+----------+
|file_name|word     |term_count|
+---------+---------+----------+
|102.txt  |en       |344       |
|102.txt  |de       |298       |
|102.txt  |tom      |294       |
|102.txt  |said     |252       |
|102.txt  |wilson   |223       |
|102.txt  |one      |202       |
|102.txt  |dat      |156       |
|102.txt  |got      |136       |
|102.txt  |man      |117       |
|102.txt  |old      |103       |
|112.txt  |one      |92        |
|112.txt  |gretchen |84        |
|112.txt  |jurgen   |76        |
|112.txt  |professor|65        |
|112.txt  |back     |54        |
|112.txt  |said     |48        |
|112.txt  |like     |46        |
|112.txt  |time     |46        |
|112.txt  |hand     |45        |
|112.txt  |viola    |42        |
+---------+---------+----------+
only showing top 20 rows



                                                                                

### Calculate the Inverse Document Frequency (IDF) for each word across all books.

In [21]:
from pyspark.sql.functions import countDistinct

N = tf_df.select("file_name").distinct().count()

print("Total files counts N ", N)

Total files counts N  425


In [22]:
#Number of files in which a word appeared

df_word = (
    tf_df            #term frquency df
    .groupBy("word")  #group by word
    .agg(countDistinct("file_name").alias("doc_freq"))  #count unique documents
)

df_word.show(truncate=False)

[Stage 23:>                                                         (0 + 6) / 6]

+------------+--------+
|word        |doc_freq|
+------------+--------+
|spirit      |339     |
|doubt       |342     |
|matters     |299     |
|every       |395     |
|rewritten   |15      |
|eye         |347     |
|ago         |352     |
|still       |387     |
|del         |44      |
|build       |245     |
|old         |385     |
|honeysuckles|4       |
|palings     |21      |
|stood       |358     |
|boxes       |169     |
|grew        |324     |
|made        |396     |
|symbol      |98      |
|chief       |302     |
|lofty       |168     |
+------------+--------+
only showing top 20 rows



                                                                                

In [24]:
#compute IDF 
from pyspark.sql.functions import log

idf_df = df_word.withColumn(
    "idf",
    log(lit(N) / (col("doc_freq") + lit(1)))    # compute log(N/df)
)


In [28]:
idf_df.show(truncate=False)

[Stage 28:>                                                         (0 + 6) / 6]

+------------+--------+-------------------+
|word        |doc_freq|idf                |
+------------+--------+-------------------+
|spirit      |339     |0.22314355131420976|
|doubt       |342     |0.21435872175847698|
|matters     |299     |0.3483066942682158 |
|every       |395     |0.07067495766993637|
|rewritten   |15      |3.2795004466846356 |
|eye         |347     |0.19988668914994248|
|ago         |352     |0.1856211119911201 |
|still       |387     |0.09108382930114349|
|del         |44      |2.245426679154097  |
|build       |245     |0.5467576329920539 |
|old         |385     |0.0962517994595859 |
|honeysuckles|4       |4.442651256490317  |
|palings     |21      |2.961046715566101  |
|stood       |358     |0.16876678043613805|
|boxes       |169     |0.9162907318741551 |
|grew        |324     |0.26826398659467937|
|made        |396     |0.0681528882372264 |
|symbol      |98      |1.456969318789827  |
|chief       |302     |0.3383563634150477 |
|lofty       |168     |0.9221904

                                                                                

### Compute the TF-IDF score for each word in each book (TF * IDF)

In [30]:
from pyspark.sql.functions import col

tf_idf_df = (
    tf_df     # term frequency data
    .join(idf_df.select("word", "idf"),  # join with IDF
          on="word", how="inner")  # join on word
    .withColumn("tf_idf", # create TF-IDF column
                col("term_count") * col("idf"))   # multiply TF and IDF
    .select("file_name", "word", "tf_idf")  # select final columns
)


In [31]:
tf_idf_df.show(truncate=False)



+---------+-----------+------------------+
|file_name|word       |tf_idf            |
+---------+-----------+------------------+
|38.txt   |aaargh     |10.717883976728944|
|200.txt  |aad        |5.358941988364472 |
|80.txt   |aag        |5.358941988364472 |
|200.txt  |aak        |5.358941988364472 |
|124.txt  |abadias    |5.358941988364472 |
|14.txt   |abaiang    |4.260329699696362 |
|25.txt   |abaiang    |4.260329699696362 |
|48.txt   |abaiang    |4.260329699696362 |
|180.txt  |abaiang    |4.260329699696362 |
|87.txt   |abaiang    |4.260329699696362 |
|200.txt  |abailard   |4.953476880256307 |
|267.txt  |abailard   |4.953476880256307 |
|200.txt  |abaissement|5.358941988364472 |
|24.txt   |abandonedly|5.358941988364472 |
|200.txt  |abaris     |5.358941988364472 |
|228.txt  |abas       |24.767384401281536|
|227.txt  |abas       |9.906953760512614 |
|102.txt  |abashed    |1.7346010553881064|
|172.txt  |abashed    |1.7346010553881064|
|224.txt  |abashed    |3.4692021107762128|
+---------+

                                                                                

In [33]:
spark.catalog.clearCache()
del tf_df 
del idf_df


In [34]:
tf_idf_df = tf_idf_df.repartition(6, "file_name").cache() 
tf_idf_df.count() 


                                                                                

2409637

### Book Similaritiy

In [41]:
spark.catalog.clearCache()

In [42]:
del tfidf_vector_df 

In [35]:
from pyspark.sql.functions import sum as spark_sum, sqrt, col

norm_df = (
    tf_idf_df
    .withColumn("square", col("tf_idf") * col("tf_idf"))  # square value
    .groupBy("file_name")   # group per book
    .agg(spark_sum("square").alias("sum_sq"))  # sum of squares
    .withColumn("norm", sqrt(col("sum_sq")))   #compute norm
    .select("file_name", "norm")  # keep needed cols
)


In [36]:
dot_df = (
    tf_idf_df.alias("a")
    .join(tf_idf_df.alias("b"), on="word")  # match common words
    .filter(col("a.file_name") < col("b.file_name"))  # avoid duplicates
    .withColumn("product", col("a.tf_idf") * col("b.tf_idf"))# multiply values
    .groupBy("a.file_name", "b.file_name")  # group book pairs
    .agg(spark_sum("product").alias("dot_product"))  # compute dot
)


In [37]:
similarity_df = (
    dot_df
    .join(norm_df.alias("n1"), dot_df["a.file_name"] == col("n1.file_name"))
    .join(norm_df.alias("n2"), dot_df["b.file_name"] == col("n2.file_name"))
    .withColumn(
        "cosine_similarity",
        col("dot_product") / (col("n1.norm") * col("n2.norm"))  # cosine formula
    )
    .select(
        col("a.file_name").alias("book1"),
        col("b.file_name").alias("book2"),
        "cosine_similarity"
    )
)


In [38]:
from pyspark.sql.functions import desc

target_book = "10.txt"

top5 = (
    similarity_df
    .filter((col("book1") == target_book) | 
            (col("book2") == target_book)) # select related pairs
    .withColumn(
        "similar_book",
        expr(f"CASE WHEN book1 = '{target_book}' THEN book2 ELSE book1 END")
    )
    .orderBy(desc("cosine_similarity"))   # sort by similarity
    .select("similar_book", "cosine_similarity")    # final columns
    .limit(5)  # top 5 only
)

top5.show(truncate=False)


                                                                                

+------------+-------------------+
|similar_book|cosine_similarity  |
+------------+-------------------+
|30.txt      |0.9999885553852694 |
|58.txt      |0.449803428012394  |
|26.txt      |0.41985814420968676|
|357.txt     |0.32755879652499764|
|109.txt     |0.28829094553856716|
+------------+-------------------+



In [43]:
spark.stop()