In [None]:
import findspark
# Locate the local Spark installation (presumably via SPARK_HOME — confirm on the
# target machine) and add its Python libraries to sys.path, so that the
# `import pyspark` in later cells resolves to that installation.
findspark.init()

In [None]:
import os

# The session below runs with master("yarn"), which needs the client-side
# Hadoop/YARN configuration. Spark looks for it via HADOOP_CONF_DIR or
# YARN_CONF_DIR. Fail early with an actionable message instead of a bare KeyError.
hadoop_conf_dir = os.environ.get("HADOOP_CONF_DIR") or os.environ.get("YARN_CONF_DIR")
if not hadoop_conf_dir:
    raise EnvironmentError(
        "Set HADOOP_CONF_DIR (or YARN_CONF_DIR) to the Hadoop client configuration "
        "directory before creating a YARN SparkSession."
    )
hadoop_conf_dir

In [None]:
import pyspark

# Show which pyspark installation findspark resolved, plus its version. The
# delta-spark package and the spark-nlp_2.12:4.4.0 jar loaded in the session cell
# need to be compatible with this Spark version.
pyspark.__version__, pyspark.__file__

In [None]:
from pyspark.sql import functions as F
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

In [None]:
from delta import DeltaTable, configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Spark session on YARN with the Delta Lake SQL extension and catalog enabled.
# NOTE(review): spark.kryoserializer.buffer.max only matters when Kryo is the
# configured spark.serializer (Spark NLP's setup guide sets both) — confirm the
# cluster default.
builder = (
    SparkSession.builder
    .appName("MyApp")
    .master("yarn")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.cores", 4)
)

# configure_spark_with_delta_pip adds the Delta Lake package to the builder's
# spark.jars.packages. The Spark NLP package is passed along as an extra package.
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.0"]
).getOrCreate()

sc = spark.sparkContext

# Silence the JVM-side log4j loggers for "org" (Spark, Hadoop, ...) and "akka",
# then the SparkContext's own log level.
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.OFF)
logger.LogManager.getLogger("akka").setLevel(logger.Level.OFF)

sc.setLogLevel("OFF")
sc

In [None]:
# Give broadcast joins a very generous wait time; the value is in seconds. Later
# cells broadcast whole DataFrames. Also start from an empty cache.
spark.conf.set("spark.sql.broadcastTimeout", 100000)
spark.catalog.clearCache()

In [None]:
# Input locations on HDFS: a glob over the part files of each dataset.
HDFS_PROJECT_DIR = "hdfs:/project"
books_data = f"{HDFS_PROJECT_DIR}/books_demo3/part*"
books_ratings = f"{HDFS_PROJECT_DIR}/ratings_demo3/part*"

In [None]:
def _ddl(*fields):
    """Join (column name, SQL type) pairs into a Spark DDL schema string."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in fields)


# Column layout of the books part files (read header-less, so order matters).
schema_books = _ddl(
    ("Title", "STRING"),
    ("Description", "STRING"),
    ("Authors", "STRING"),
    ("Image", "STRING"),
    ("PreviewLink", "STRING"),
    ("Publisher", "STRING"),
    ("PublishedDate", "DATE"),
    ("InfoLink", "STRING"),
    ("Categories", "STRING"),
    ("RatingsCount", "INT"),
)

# Column layout of the ratings part files.
schema_ratings = _ddl(
    ("ID", "STRING"),
    ("Title", "STRING"),
    ("Price", "INT"),
    ("User_id", "STRING"),
    ("User_name", "STRING"),
    ("Helpfulness", "STRING"),
    ("Score", "FLOAT"),
    ("Timestamp", "INT"),
    ("Summary", "STRING"),
    ("Review", "STRING"),
)

In [None]:
def _read_pipe_csv(path, ddl_schema):
    """Load header-less, '|'-delimited CSV files at `path` using an explicit DDL schema."""
    reader = spark.read.format("csv").schema(ddl_schema)
    reader = reader.option("header", "false").option("delimiter", "|")
    return reader.load(path)


df_books = _read_pipe_csv(books_data, schema_books)
df_ratings = _read_pipe_csv(books_ratings, schema_ratings)

In [None]:
# Erase the columns the analysis does not need. Afterwards df_books keeps
# Title/Categories, and df_ratings keeps Title/Score/Review.
df_books = df_books.drop(
    "Description", "Authors", "Image", "PreviewLink", "Publisher",
    "PublishedDate", "InfoLink", "RatingsCount",
)
# "Helpfulness" is spelled exactly as in schema_ratings. DataFrame.drop silently
# ignores names it cannot resolve, so a case mismatch would stop dropping the
# column as soon as spark.sql.caseSensitive is enabled.
df_ratings = df_ratings.drop(
    "ID", "Price", "User_id", "User_name", "Helpfulness", "Timestamp", "Summary",
)

In [None]:
# Spread both inputs over the same fixed number of partitions.
NUM_INPUT_PARTITIONS = 150
df_books, df_ratings = (
    frame.repartition(NUM_INPUT_PARTITIONS) for frame in (df_books, df_ratings)
)

In [None]:
# Clean null values in the columns every later step relies on.
df_books = df_books.na.drop(subset=["Categories", "Title"])
df_ratings = df_ratings.na.drop(subset=["Title", "Score", "Review"])

# Drop the header line in case it was loaded as data (the files are read with
# header=false). Filter by value rather than `limit(1)` + `subtract`. An unordered
# limit after a repartition picks an arbitrary review, and subtract (EXCEPT
# DISTINCT) would additionally collapse every duplicate row.
# NOTE(review): assumes the header's Title field is the literal "Title" — confirm
# against the source files.
df_ratings = df_ratings.filter(F.col("Title") != "Title")

## Sentiment analysis of the review text

In [None]:
# Sentiment pipeline stages:
#   Review text -> document -> tokens -> normalized tokens -> Vivekn sentiment.
# The Finisher exposes the sentiment annotation as a plain "Sentiment" column.
# setCleanAnnotations(True) removes the intermediate annotation columns.
document = DocumentAssembler().setInputCol("Review").setOutputCol("document")
token = Tokenizer().setInputCols(["document"]).setOutputCol("token")
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normal")
vivekn = (
    ViveknSentimentModel.pretrained()
    .setInputCols(["document", "normal"])
    .setOutputCol("result_sentiment")
)
finisher = (
    Finisher()
    .setInputCols(["result_sentiment"])
    .setOutputCols(["Sentiment"])
    .setCleanAnnotations(True)
)

pipeline = Pipeline(stages=[document, token, normalizer, vivekn, finisher])
pipelineModel = pipeline.fit(df_ratings)
df_ratings = pipelineModel.transform(df_ratings)

In [None]:
# Persist snapshots of the cleaned books and the sentiment-scored ratings as
# Delta tables, in that order.
# NOTE(review): no cell shown here reads these two paths back.
for frame, target in ((df_books, "/tmp/books"), (df_ratings, "/tmp/ratings1")):
    frame.write.mode("overwrite").format("delta").save(target)

In [None]:
# Replace the positive/negative label with a value: 1 when the first label is
# "positive", otherwise 0 (this includes "negative", other labels and null).
is_positive = F.col("Sentiment").getItem(0) == "positive"
df_ratings = df_ratings.withColumn("Sentiment", F.when(is_positive, 1).otherwise(0)).cache()

In [None]:
# Hash-partition by Title so all reviews of the same title share a partition.
df_ratings = df_ratings.repartition(F.col("Title"))

## Review counts: keep titles with more than 50 reviews and cap the count at 100

In [None]:
# First, count the reviews of each title.
df_count = df_ratings.groupBy("Title").agg(F.count("*").alias("Count"))
# df_ratings is deliberately not unpersisted here: the next cell still joins against it.

# Keep only titles with more than 50 reviews. Not cached, because it is consumed
# once, by the broadcast join in the next cell.
df_count_filtered = df_count.filter(F.col("Count") > 50)

In [None]:
# Keep only ratings of titles that passed the count filter. The count table has one
# row per title, presumably small enough to broadcast, which avoids shuffling
# df_ratings again.
popular_titles = F.broadcast(df_count_filtered)
df_ratings_filtered = df_ratings.join(popular_titles, "Title")

# Now calculate the average score and the average 0/1 sentiment per title. Count
# is constant within a title, so grouping by it as well only carries it along.
df_ratings = df_ratings_filtered.groupBy("Title", "Count").agg(
    F.mean("Score").alias("Scores"),
    F.mean("Sentiment").alias("Sentiments"),
)

# Called before any action has run, so a cached copy of df_count_filtered (if
# any) is released before it is ever materialized.
df_count_filtered.unpersist()


In [None]:
# Snapshot the per-title aggregates (Title, Count, Scores, Sentiments) as a Delta table.
# This action computes the counts and averages defined above.
df_ratings.write.format("delta").mode("overwrite").save("/tmp/ratings_count")

In [None]:
# Upper limit for the amount of reviews: counts above 100 are clamped to 100.
capped_count = F.when(F.col("Count") > 100, 100).otherwise(F.col("Count"))
df_ratings_updated = df_ratings.withColumn("Count", capped_count)

In [None]:
# Snapshot the capped aggregates so the merge below can read them back from Delta.
df_ratings_updated.write.format("delta").mode("overwrite").save("/tmp/ratings_count_updated")

## Merging the Delta tables

In [None]:
# Merge the capped counts back into /tmp/ratings_count with Delta's MERGE INTO.
# Title is the key. Both tables come from a groupBy on Title (with null titles
# dropped earlier), so each title has exactly one row on each side. Also matching
# on the float Scores column only added a fragile float-equality comparison.
df_ratings_updated_temp = spark.read.format("delta").load("/tmp/ratings_count_updated")
delta_table_ratings = DeltaTable.forPath(spark, "/tmp/ratings_count")

(
    delta_table_ratings.alias("ratings1")
    .merge(
        df_ratings_updated_temp.alias("ratings1_updated"),
        "ratings1.Title = ratings1_updated.Title",
    )
    .whenMatchedUpdate(set={"Count": "ratings1_updated.Count"})
    .whenNotMatchedInsertAll()
    .execute()
)
# execute() commits the merge to /tmp/ratings_count as a new table version.
# Reading the table back and overwriting the same path would only rewrite
# identical data.

In [None]:
# Reload the merged per-title aggregates (Title, Count, Scores, Sentiments).
df_ratings = spark.read.load("/tmp/ratings_count", format="delta")

In [None]:
# Weighted values giving the final score of each book:
#   0.04 * Count         (Count was capped at 100 before the merge)
#   0.3 * 2 * Scores     (average review score)
#   0.3 * 10 * Sentiments (average of the 0/1 sentiment, i.e. share of positive reviews)
COUNT_WEIGHT = 0.04
SCORE_WEIGHT = 0.3 * 2
SENTIMENT_WEIGHT = 0.3 * 10

weighted_score = (
    COUNT_WEIGHT * F.col("Count")
    + SCORE_WEIGHT * F.col("Scores")
    + SENTIMENT_WEIGHT * F.col("Sentiments")
)
final_ratings = (
    df_ratings
    .withColumn("Final_score", weighted_score)
    .na.drop(subset=["Final_score"])
)

In [None]:
# Keep only what the ranking needs. Cached because the pivot at the end (no
# explicit value list) makes Spark evaluate the downstream lineage more than once.
final_ratings = final_ratings.select(F.col("Title"), F.col("Final_score")).cache()

In [None]:
# First category of each book:
#   1. strip the list punctuation ([, ] and ') from the raw Categories string,
#   2. split it on ", ",
#   3. take the first element.
# This builds a new frame and leaves df_books untouched, so re-running the cell
# does not try to split an already-split array column.
second_df = (
    df_books
    .select(
        "Title",
        F.split(F.translate("Categories", "[]'", ""), ", ").getItem(0).alias("Category"),
    )
    # A title can occur in several book rows. Keep one (arbitrary) category per
    # title *before* broadcasting, which also keeps the broadcast table smaller.
    .dropDuplicates(["Title"])
)

# Joining tables: attach the category to every scored title. final_ratings has one
# row per title (it derives from a groupBy on Title), so after de-duplicating
# second_df the join yields no duplicate titles.
final_ratings = final_ratings.join(F.broadcast(second_df), on="Title")

In [None]:
# Display label per title, "<Title> - <Final_score with 2 decimals>".
# Cached because the pivot at the end (no explicit value list) evaluates this
# lineage twice: once to collect the distinct categories, once for the result.
# (final_ratings is not unpersisted here: this frame, the joined one, was never
# cached, so the call would be a no-op.)
merged_df = final_ratings.withColumn(
    "Title_final_score",
    F.concat(F.col("Title"), F.lit(" - "), F.format_number(F.col("Final_score"), 2)),
).cache()

In [None]:
from pyspark.sql.window import Window

# Making the ranking: number the titles 1, 2, 3, ... within each category, best
# Final_score first.
window = Window.partitionBy("Category").orderBy(F.desc("Final_score"))

# Extra counter column = rank within the category; it becomes the row key of the pivot.
df = merged_df.withColumn("counter", F.row_number().over(window))
# merged_df is intentionally NOT unpersisted here. Nothing has been computed yet,
# so releasing it now would drop the cache before the pivot and show can use it.
# Unpersist it after the results have been displayed, if memory matters.

In [None]:
# Pivoting: one column per category and one row per rank. Each cell holds
# "<Title> - <score>". groupBy output has no defined order, so sort by the rank
# before dropping it. Otherwise the rows would be displayed in arbitrary order
# and the ranking would be lost.
pivot_df = (
    df.groupBy("counter")
    .pivot("Category")
    .agg(F.first("Title_final_score"))
    .orderBy("counter")
    .drop("counter")
)

In [None]:
# Print the first 20 ranking rows without truncating the "<Title> - <score>" cells.
pivot_df.show(n=20, truncate=False)