In [None]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    input_file_name, regexp_extract, length, col, 
    avg, lower, regexp_replace, udf, split, element_at, desc
)

# Point both the PySpark workers and the driver at the interpreter that is
# running this kernel, so they cannot pick up a different Python installation.
for env_key in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[env_key] = sys.executable

# Local-mode session: 2g of driver memory, adaptive query execution switched
# off, and the driver bound to localhost.
session_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("GutenbergFinal")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.driver.host", "localhost")
)
spark = session_builder.getOrCreate()

print("Spark is alive again!")

Spark is alive again!


In [None]:

# Glob of Project Gutenberg .txt files. Set the GUTENBERG_DATA_GLOB environment
# variable to run the notebook on another machine; the default is the original path.
dataset_path = os.environ.get(
    "GUTENBERG_DATA_GLOB",
    "C:/Users/SUDHEESH/Downloads/D184MB/D184MB/*.txt",
)

# wholetext=True loads every file as a single row, so the header regexes below
# are applied to the complete text of a book.
raw_df = spark.read.text(dataset_path, wholetext=True)
books_df = raw_df.withColumn("file_name", input_file_name()) \
                 .withColumnRenamed("value", "text")

# Extract Metadata
# regexp_extract yields "" (not NULL) when a pattern does not match, so an empty
# string in any of these columns means "header field not found".
# The release-date pattern accepts "March, 1995", "March 1995" and
# "August 1, 2005"; the year is the 4-digit run that ends the captured date.
extracted_df = books_df.withColumn("title", regexp_extract("text", r"Title:\s+(.*)", 1)) \
    .withColumn("release_date", regexp_extract("text", r"Release Date:\s+([A-Za-z]+(?:\s+\d{1,2})?,?\s+\d{4})", 1)) \
    .withColumn("year_str", regexp_extract("release_date", r"(\d{4})$", 1)) \
    .withColumn("language", regexp_extract("text", r"Language:\s+(\w+)", 1)) \
    .withColumn("author", regexp_extract("text", r"Author:\s+(.*)", 1)) \
    .selectExpr("*", "try_cast(year_str as int) as year")

# try_cast turns an empty or non-numeric year_str into NULL, so the NULL check
# alone removes every book whose release year could not be parsed.
clean_metadata_df = extracted_df.filter(col("year").isNotNull())


print("--- Question 10: Final Results ---")

print("\n[A] Number of books released each year:")
books_per_year = clean_metadata_df.groupBy("year").count().orderBy("year")
books_per_year.show(n=100)

# Find the most common language
print("\n[B] Language Analysis:")
# Books without a "Language:" header carry "" here; leave them out so that
# "unknown" can never be reported as the most common language.
lang_df = clean_metadata_df.filter(col("language") != "") \
    .groupBy("language").count().orderBy("count", ascending=False)
most_common = lang_df.first()
if most_common is None:
    print("No language metadata was found in the dataset.")
else:
    print(f"The most common language is: {most_common['language']} (Count: {most_common['count']})")

# Determine the average length of book titles
print("\n[C] Title Length Analysis:")
# Only books with an extracted title contribute; "" would count as length 0.
# avg() over zero rows returns NULL, which arrives here as None.
avg_val = clean_metadata_df.filter(col("title") != "") \
    .select(avg(length("title"))).first()[0]
if avg_val is None:
    print("No titles were found in the dataset.")
else:
    print(f"The average length of book titles is: {avg_val:.2f} characters")

--- Question 10: Final Results ---

[A] Number of books released each year:
+----+-----+
|year|count|
+----+-----+
|1975|    1|
|1978|    1|
|1979|    1|
|1991|    6|
|1992|   13|
|1993|   12|
|1994|   16|
|1995|   59|
|1996|   53|
+----+-----+


[B] Language Analysis:
The most common language is: English (Count: 162)

[C] Title Length Analysis:
The average length of book titles is: 23.75 characters


In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, Normalizer
from pyspark.sql.functions import desc, round
from pyspark.sql.types import DoubleType

# NOTE(review): the `round` imported above shadows Python's builtin round() for
# the rest of the kernel session. It is kept as-is because other cells may rely
# on it; consider importing `pyspark.sql.functions as F` and using F.round.

# Lower-case the text and drop every character that is not a-z or whitespace.
# book_name is the last "/"-separated component of the file path, so a book can
# be matched by exact name rather than by substring.
clean_text_df = extracted_df.withColumn("text_clean", lower(col("text"))) \
    .withColumn("text_clean", regexp_replace("text_clean", r"[^a-z\s]", "")) \
    .withColumn("book_name", element_at(split(col("file_name"), "/"), -1))

# Vectorization Pipeline
tokenizer = Tokenizer(inputCol="text_clean", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="normFeatures")

# Build the term-frequency frame once and reuse it for both fitting the IDF
# model and applying it, instead of spelling the same pipeline out twice.
tf_df = hashingTF.transform(remover.transform(tokenizer.transform(clean_text_df)))
tfidf_df = idf.fit(tf_df).transform(tf_df)
normalized_df = normalizer.transform(tfidf_df) \
    .select("file_name", "book_name", "normFeatures") \
    .cache()

# Similarity for '10.txt'
target_file = "10.txt"
# Exact match on the base name: a substring test on the path would also hit
# files such as "110.txt" or "210.txt".
target_row = normalized_df.filter(col("book_name") == target_file) \
    .select("normFeatures").first()

print(f"\n--- Question 11: Top 5 Similar Books to {target_file} ---")
if target_row is None:
    # Without the target vector there is nothing to compare against.
    print(f"{target_file} was not found in the dataset; no similarities computed.")
else:
    target_vec = target_row[0]
    # Vectors were normalized with Normalizer's default p (2.0), so the dot
    # product is the cosine similarity. The explicit DoubleType matters:
    # udf() defaults to StringType when no return type is given.
    sim_udf = udf(lambda v: float(v.dot(target_vec)), DoubleType())

    results = normalized_df.filter(col("book_name") != target_file) \
        .withColumn("similarity", sim_udf(col("normFeatures"))) \
        .select("book_name", round(col("similarity"), 4).alias("similarity")) \
        .orderBy(desc("similarity"))

    results.show(5)


--- Question 11: Top 5 Similar Books to 10.txt ---
+---------+----------+
|book_name|similarity|
+---------+----------+
|   30.txt|       1.0|
|   58.txt|    0.4765|
|   26.txt|      0.45|
|  169.txt|    0.4257|
|  357.txt|    0.3613|
+---------+----------+
only showing top 5 rows


In [None]:

# One row per distinct (author, year, language); rows whose author field is
# the empty string are dropped.
author_data = (
    clean_metadata_df
    .select("author", "year", "language")
    .distinct()
    .filter("author != ''")
)

# Self-join: side "a" is the candidate influencer, side "b" the candidate
# influenced author.
earlier = author_data.alias("a")
later = author_data.alias("b")

# An edge a -> b requires b's year to be strictly after a's and at most one
# year later, the two authors to differ, and both rows to share a language.
is_edge = (
    (col("b.year") > col("a.year"))
    & (col("b.year") <= col("a.year") + 1)
    & (col("a.author") != col("b.author"))
    & (col("a.language") == col("b.language"))
)

influence_edges = (
    earlier.join(later, is_edge)
    .select(
        col("a.author").alias("influencer"),
        col("b.author").alias("influenced"),
    )
    .distinct()
)

print("\n--- Question 12: Refined Influence Network ---")
# Out-degree counts edges grouped by influencer, in-degree by influenced.
for heading, role in (
    ("Top 5 Influencers (Out-Degree):", "influencer"),
    ("Top 5 Influenced (In-Degree):", "influenced"),
):
    print(heading)
    influence_edges.groupBy(role).count().orderBy(desc("count")).show(5)


--- Question 12: Refined Influence Network ---
Top 5 Influencers (Out-Degree):
+--------------------+-----+
|          influencer|count|
+--------------------+-----+
|    G. K. Chesterton|   86|
|        Thomas Hardy|   58|
|         John Milton|   57|
|Edgar Rice Burroughs|   53|
|Electronic Fronti...|   50|
+--------------------+-----+
only showing top 5 rows
Top 5 Influenced (In-Degree):
+--------------------+-----+
|          influenced|count|
+--------------------+-----+
|        Thomas Hardy|   63|
|Mary Roberts Rine...|   61|
|         Henry James|   61|
|       Joseph Conrad|   61|
|  Arthur Conan Doyle|   61|
+--------------------+-----+
only showing top 5 rows
