In [48]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,desc,sum,avg,array_contains, when
from pyspark.sql.types import StructType, StructField, StringType

In [49]:
sc = SparkSession.builder \
    .master("spark://ravi:7077") \
    .config("spark.sql.debug.maxToStringFields", 1000) \
    .getOrCreate()

In [50]:
books = sc.read.csv('hdfs://ravi:9000/Books.csv',header=True)
users = sc.read.csv('hdfs://ravi:9000/Users.csv',header=True)
ratings = sc.read.csv('hdfs://ravi:9000/Ratings.csv',header=True)

In [51]:
books.show()

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari

In [52]:
users.show()

+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|18.0|
|      3|moscow, yukon ter...|NULL|
|      4|porto, v.n.gaia, ...|17.0|
|      5|farnborough, hant...|NULL|
|      6|santa monica, cal...|61.0|
|      7| washington, dc, usa|NULL|
|      8|timmins, ontario,...|NULL|
|      9|germantown, tenne...|NULL|
|     10|albacete, wiscons...|26.0|
|     11|melbourne, victor...|14.0|
|     12|fort bragg, calif...|NULL|
|     13|barcelona, barcel...|26.0|
|     14|mediapolis, iowa,...|NULL|
|     15|calgary, alberta,...|NULL|
|     16|albuquerque, new ...|NULL|
|     17|chesapeake, virgi...|NULL|
|     18|rio de janeiro, r...|25.0|
|     19|           weston, ,|14.0|
|     20|langhorne, pennsy...|19.0|
+-------+--------------------+----+
only showing top 20 rows



In [53]:
ratings.show()

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
| 276733|2080674722|          0|
| 276736|3257224281|          8|
| 276737|0600570967|          6|
| 276744|038550120X|          7|
| 276745| 342310538|         10|
| 276746|0425115801|          0|
| 276746|0449006522|          0|
| 276746|0553561618|          0|
| 276746|055356451X|          0|
| 276746|0786013990|          0|
| 276746|0786014512|          0|
| 276747|0060517794|          9|
| 276747|0451192001|          0|
| 276747|0609801279|          0|
| 276747|0671537458|          9|
+-------+----------+-----------+
only showing top 20 rows



In [54]:
books_rows = books.count()
books_columns = len(books.columns)
print("Books DataFrame shape: ({}, {})".format(books_rows, books_columns))

Books DataFrame shape: (271360, 8)


In [55]:
ratings_rows = ratings.count()
ratings_columns = len(ratings.columns)
print("Ratings DataFrame shape: ({}, {})".format(ratings_rows, ratings_columns))

Ratings DataFrame shape: (1149780, 3)


In [56]:
users_rows = users.count()
users_columns = len(users.columns)
print("Users DataFrame shape: ({}, {})".format(users_rows, users_columns))

Users DataFrame shape: (278859, 3)


In [57]:

# Calculate the count of null values for each column in the books DataFrame
null_counts = books.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in books.columns])

# Print the count of null values for each column
null_counts.show()

+----+----------+-----------+-------------------+---------+-----------+-----------+-----------+
|ISBN|Book-Title|Book-Author|Year-Of-Publication|Publisher|Image-URL-S|Image-URL-M|Image-URL-L|
+----+----------+-----------+-------------------+---------+-----------+-----------+-----------+
|   0|         0|          1|                  0|        2|          0|          0|          3|
+----+----------+-----------+-------------------+---------+-----------+-----------+-----------+



In [58]:
# Calculate the count of null values for each column in the books DataFrame
null_count = users.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in users.columns])

# Print the count of null values for each column
null_count.show()

+-------+--------+------+
|User-ID|Location|   Age|
+-------+--------+------+
|      1|       0|110518|
+-------+--------+------+



In [59]:
# Calculate the count of null values for each column in the books DataFrame
null_con = ratings.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in ratings.columns])

# Print the count of null values for each column
null_con.show()

+-------+----+-----------+
|User-ID|ISBN|Book-Rating|
+-------+----+-----------+
|      0|   0|          0|
+-------+----+-----------+



In [60]:
from pyspark.sql.functions import count, lit

# Count the number of duplicate rows in the books DataFrame
duplicate_counts = (
    books.select("*")
    .groupBy(*books.columns)
    .agg(count(lit(1)).alias("count"))
    .filter("count > 1")
    .agg(sum("count").alias("total_duplicates"))
    .collect()[0]["total_duplicates"]
)

print(f"Number of duplicate rows: {duplicate_counts}")

Number of duplicate rows: None


In [61]:
# Count the number of duplicate rows in the books DataFrame
duplicate_count= (
    users.select("*")
    .groupBy(*users.columns)
    .agg(count(lit(1)).alias("count"))
    .filter("count > 1")
    .agg(sum("count").alias("total_duplicates"))
    .collect()[0]["total_duplicates"]
)

print(f"Number of duplicate rows: {duplicate_count}")

Number of duplicate rows: None


In [62]:
# Count the number of duplicate rows in the books DataFrame
duplicate_con= (
    ratings.select("*")
    .groupBy(*ratings.columns)
    .agg(count(lit(1)).alias("count"))
    .filter("count > 1")
    .agg(sum("count").alias("total_duplicates"))
    .collect()[0]["total_duplicates"]
)

print(f"Number of duplicate rows: {duplicate_con}")



Number of duplicate rows: None


                                                                                

In [63]:
####Popularity Based Recommendation System

In [64]:
ratings_with_name = ratings.join(books, ratings.ISBN == books.ISBN, "inner").drop(books.ISBN)

In [65]:
num_rating_df = ratings_with_name.groupBy('Book-Title').agg(count(col('Book-Rating')).alias('num_ratings'))

# Show the resulting DataFrame
num_rating_df.show()



+--------------------+-----------+
|          Book-Title|num_ratings|
+--------------------+-----------+
|Survival Guide to...|          1|
|              Apt. 3|          1|
|     In the Clearing|          2|
|Too Many Men : A ...|          4|
|Circles of Confus...|          1|
|Emily Post's Wedd...|          2|
|Two of a Kind #15...|          1|
|Speculations: The...|          1|
|The broom of the ...|          2|
|  Fran Ellen's House|          2|
|Queen of Spades a...|          1|
|Cooking in a Beds...|          2|
|Witnesses to War:...|          1|
|The Dream of the ...|          2|
|The Wizard's Holi...|          3|
|Madness and Sexua...|          1|
|    Rapture's Legacy|          2|
|Fallen Hero/the S...|          1|
|Red Earth and Pou...|          1|
|Last Train to Mem...|          5|
+--------------------+-----------+
only showing top 20 rows



                                                                                

In [66]:
avg_rating_df = ratings_with_name.groupBy('Book-Title').agg(avg('Book-Rating').alias('avg_ratings'))

# Show the resulting DataFrame
avg_rating_df.show()



+--------------------+------------------+
|          Book-Title|       avg_ratings|
+--------------------+------------------+
|Survival Guide to...|               0.0|
|              Apt. 3|               0.0|
|     In the Clearing|               5.0|
|Too Many Men : A ...|               0.0|
|Circles of Confus...|               0.0|
|Emily Post's Wedd...|               4.5|
|Two of a Kind #15...|               0.0|
|Speculations: The...|               0.0|
|The broom of the ...|               4.0|
|  Fran Ellen's House|               0.0|
|Queen of Spades a...|              10.0|
|Cooking in a Beds...|               6.5|
|Witnesses to War:...|               0.0|
|The Dream of the ...|               1.5|
|The Wizard's Holi...|3.3333333333333335|
|Madness and Sexua...|               5.0|
|    Rapture's Legacy|               0.0|
|Fallen Hero/the S...|               0.0|
|Red Earth and Pou...|               0.0|
|Last Train to Mem...|               3.6|
+--------------------+------------

                                                                                

In [67]:
# Perform join operation
popular_df = num_rating_df.join(avg_rating_df, 'Book-Title', 'inner')

# Show the resulting DataFrame
popular_df.show()



+--------------------+-----------+------------------+
|          Book-Title|num_ratings|       avg_ratings|
+--------------------+-----------+------------------+
| A Light in the S...|          4|              2.25|
| Ask Lily (Young ...|          1|               8.0|
| Flight of Fancy:...|          2|               4.0|
| Murder of a Slee...|         11|3.1818181818181817|
| Q-Space (Star Tr...|         17|1.9411764705882353|
| Summonings: Book...|          2|               7.0|
| This Place Has N...|          9|               2.0|
|                !Yo!|         11| 5.181818181818182|
|"A Wing and a Pra...|          1|               7.0|
|"Active Drama Pla...|          2|               3.0|
|"Best Places Nort...|          1|               0.0|
|"Best \Thinking M...|          3|2.3333333333333335|
|"Black Sheep One ...|          2|               0.0|
|"Cannery Row (Ste...|          2|               3.5|
|"Cinderella and O...|          5|               1.8|
|"Deslices histÃ³r...|      

                                                                                

In [68]:
# # Filter and sort popular_df
# popular_df = popular_df.filter(col('num_ratings') >= 250) \
#     .sort(col('avg_ratings').desc()) \
#     .limit(50)

In [None]:
# popular_df = popular_df.join(books, on='Book-Title', how='left') \
#                        .dropDuplicates(['Book-Title']) \
#                        .select('Book-Title', 'Book-Author','Image-URL-L','num_ratings', 'avg_ratings')

In [None]:
from pyspark.sql.functions import col, desc

# Filter and sort popular_df
popular_df = popular_df.filter(col('num_ratings') >= 250) \
                        .sort(col('avg_ratings').desc()) \
                        .limit(50)

popular_df = popular_df.join(books, on='Book-Title', how='left') \
                        .dropDuplicates(['Book-Title']) \
                        .select('Book-Title', 'books.Book-Author', 'Image-URL-L', 'num_ratings', 'avg_ratings')

# Print the DataFrame sorted in descending order of avg_ratings
popular_df.sort(desc('avg_ratings')).show()

In [85]:
popular_df.show()



+--------------------+------------------+--------------------+-----------+------------------+
|          Book-Title|       Book-Author|         Image-URL-L|num_ratings|       avg_ratings|
+--------------------+------------------+--------------------+-----------+------------------+
|Harry Potter and ...|     J. K. Rowling|http://images.ama...|        428| 5.852803738317757|
|Harry Potter and ...|     J. K. Rowling|http://images.ama...|        387|5.8242894056847545|
|Harry Potter and ...|     J. K. Rowling|http://images.ama...|        278| 5.737410071942446|
|Harry Potter and ...|     J. K. Rowling|http://images.ama...|        347| 5.501440922190202|
|Harry Potter and ...|     J. K. Rowling|http://images.ama...|        556| 5.183453237410072|
|The Hobbit : The ...|    J.R.R. TOLKIEN|http://images.ama...|        281|  5.00711743772242|
|The Fellowship of...|    J.R.R. TOLKIEN|http://images.ama...|        368| 4.948369565217392|
|Harry Potter and ...|     J. K. Rowling|http://images.ama..

                                                                                

In [71]:
##Collaborative Filtering Based Recommender System

In [72]:
# Create a DataFrame with the count of 'Book-Rating' grouped by 'User-ID'
count_df = ratings_with_name.groupBy('User-ID').count().withColumnRenamed('count', 'Book-Rating')

# Filter the DataFrame to keep only rows where 'Book-Rating' is greater than 200
x = count_df.filter(count_df['Book-Rating'] > 200)

# Get the 'User-ID' values from the filtered DataFrame
y = x.select('User-ID').rdd.flatMap(lambda x: x).collect()

                                                                                

In [73]:
filtered_rating = ratings_with_name.filter(col('User-ID').isin(y))

In [74]:
from pyspark.sql.functions import col, count

# Group the filtered_rating DataFrame by 'Book-Title' and count the 'Book-Rating'
book_rating_counts = filtered_rating.groupBy('Book-Title').agg(count('Book-Rating').alias('book_rating_count'))

# Filter the book_rating_counts DataFrame to keep only rows where 'book_rating_count' is greater than or equal to 50
r = book_rating_counts.filter(col('book_rating_count') >= 50)

# Get the 'Book-Title' values from the filtered DataFrame
famous_books = r.select('Book-Title').rdd.flatMap(lambda x: x).collect()

                                                                                

In [75]:
final_ratings = filtered_rating.filter(col('Book-Title').isin(famous_books))

In [76]:
# Pivot the DataFrame
pt = final_ratings.groupby('Book-Title').pivot('User-ID').agg({'Book-Rating': 'first'})


                                                                                

In [77]:
# Replace null values with 0
pt = pt.fillna(0)

In [78]:
# Convert PySpark DataFrame to RDD
pt_rdd = pt.rdd.zipWithIndex()

# Filter the RDD to get the row at any index 
row= pt_rdd.filter(lambda x: x[1] == 10).map(lambda x: x[0]).collect()

# If there's a row at index, extract the Book-Title
if row:
    book_title_ = row[0]['Book-Title']
    print(book_title_)
else:
    print("No row found at index")


24/05/06 00:38:45 WARN DAGScheduler: Broadcasting large task binary with size 1341.5 KiB
[Stage 295:>                                                        (0 + 1) / 1]

A Fine Balance


                                                                                

In [79]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col

In [80]:
# Rename the columns with integer names to avoid errors
new_column_names = ["col_" + str(i) for i in range(len(pt.columns))]
pt_renamed = pt.toDF(*new_column_names)


In [81]:
from pyspark.sql.functions import col, udf, isnan, when
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler

# Convert columns to double type
pt_renamed = pt.select([col(c).cast("double").alias(c) for c in pt.columns])

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=pt_renamed.columns[1:], outputCol="features")
vector_df = assembler.transform(pt_renamed)

# Calculate norm of each feature vector
norm_udf = udf(lambda v: float(v.norm(2)), DoubleType())

# Apply the UDF to calculate the norm of the features column
vector_df = vector_df.withColumn('features_norm', norm_udf(col('features')))

# Replace NaN values in 'features_norm' column with 0
vector_df = vector_df.withColumn('features_norm', when(isnan(vector_df['features_norm']), 0).otherwise(vector_df['features_norm']))

# Check if 'features_norm' column exists and its data type is compatible
if 'features_norm' in vector_df.columns and vector_df.schema['features_norm'].dataType == DoubleType():
    # Check for zero norm vectors
    zero_norm_count = vector_df.filter(vector_df.features_norm == 0.0).count()
    print("Number of zero norm vectors:", zero_norm_count)

    # Define a user-defined function (UDF) to calculate cosine similarity
    cosine_similarity_udf = udf(lambda v1, v2: float(v1.dot(v2) / (v1.norm(2) * v2.norm(2))), DoubleType())

    # Cross join to calculate cosine similarity between all pairs of vectors
    similarity_score = vector_df.alias("df1").crossJoin(
        vector_df.alias("df2")
    ).select(
        col("df1.Book-Title").alias("Book-Title1"),
        col("df2.Book-Title").alias("Book-Title2"),
        cosine_similarity_udf("df1.features", "df2.features").alias("cosine_similarity")
    )

    # Show the results
    similarity_score.show()

    # Now, let's proceed with the recommendations code

    def recommend(book_name, similarity_score):
        # Find similar items based on similarity score
        similar_items = similarity_score.filter((col("Book-Title1") == book_name) | (col("Book-Title2") == book_name)) \
                                        .filter(col("Book-Title1") != col("Book-Title2")) \
                                        .orderBy(col("cosine_similarity").desc()) \
                                        .limit(4) \
                                        .collect()

        data = []
        for row in similar_items:
            similar_book = row["Book-Title1"] if row["Book-Title2"] == book_name else row["Book-Title2"]
            temp_df = pt.filter(pt['Book-Title'] == similar_book)
            item = [
                similar_book,
                temp_df.select("Book-Author").distinct().first()[0],
                temp_df.select("Image-URL-M").distinct().first()[0]
            ]
            data.append(item)

        return data

    # Call recommend function
    recommendations = recommend('1984', similarity_score)
else:
    print("'features_norm' column doesn't exist or has incompatible data type.")



24/05/06 00:38:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/05/06 00:39:04 WARN DAGScheduler: Broadcasting large task binary with size 1906.0 KiB
24/05/06 00:39:05 WARN TaskSetManager: Lost task 0.0 in stage 304.0 (TID 705) (10.0.8.186 executor 0): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$Lambda$2231/0x00007a2740aab598`: (struct<100459:double,100644:double,100846:double,100906:double,101209:double,101851:double,101876:double,102275:double,102647:double,102702:double,102967:double,104399:double,104429:double,10447:double,104636:double,104665:double,105374:double,105517:double,105979:double,106225:double,106816:double,107021:double,107301:double,107453:double,107784:double,107951:double,108005:double,10819:double,108285:double,108352:double,109461:double,109574:double,109955:double,110029

Py4JJavaError: An error occurred while calling o4913.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 304.0 failed 4 times, most recent failure: Lost task 0.3 in stage 304.0 (TID 708) (10.0.8.186 executor 0): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$2231/0x00007a2740aab598`: (struct<100459:double,100644:double,100846:double,100906:double,101209:double,101851:double,101876:double,102275:double,102647:double,102702:double,102967:double,104399:double,104429:double,10447:double,104636:double,104665:double,105374:double,105517:double,105979:double,106225:double,106816:double,107021:double,107301:double,107453:double,107784:double,107951:double,108005:double,10819:double,108285:double,108352:double,109461:double,109574:double,109955:double,110029:double,110483:double,110912:double,110934:double,110973:double,111174:double,111947:double,112001:double,112026:double,112881:double,113270:double,113519:double,113817:double,113904:double,113983:double,114178:double,114368:double,114414:double,114444:double,114868:double,114988:double,115002:double,115003:double,115120:double,115490:double,11601:double,116599:double,11676:double,117251:double,117539:double,119575:double,11993:double,120093:double,120548:double,122429:double,122793:double,122881:double,123790:double,123883:double,123981:double,124078:double,124079:double,124363:double,124487:double,124942:double,125039:double,12538:double,125519:double,125692:double,125774:double,126492:double,126604:double,126736:double,127200:double,127233:double,127359:double,127429:double,127914:double,12824:double,128696:double,128835:double,129008:double,129074:double,129358:double,129465:double,129716:double,12982:double,129851:double,130474:double,130554:double,130571:double,13082:double,131027:double,131046:double,131402:double,131837:double,131855:double,132083:double,13273:double,133571:double,133689:double,133747:double,133868:double,135045:double,135149:double,135265:double,135458:double,13552:double,135831:double,136010:double,136139:double,136205:double,136252:double,136382:double,137589:double,137688:double,138097:double,138441:double,13850:double,138578:double,138844:double,139467:double,139742:double,140000:double,140036:double,140358:double,141493:double,141710:double,142093:double,142524:double,143175:double,143253:double,143415:double,143792:double,14422:double,144531:double,144555:double,14521:double,145449:double,145451:double,145619:double,145641:double,146113:double,146230:double,146348:double,147141:double,147451:double,147847:double,147965:double,148199:double,148258:double,148744:double,149069:double,149907:double,149908:double,149934:double,150124:double,150498:double,150968:double,150979:double,151790:double,151806:double,152186:double,152249:double,153563:double,153662:double,153718:double,15408:double,15418:double,154992:double,155014:double,155147:double,155219:double,155495:double,156150:double,156269:double,156467:double,157273:double,157811:double,158295:double,159033:double,159376:double,159858:double,160032:double,160819:double,16106:double,161752:double,162052:double,162639:double,162738:double,163804:double,163973:double,164027:double,164323:double,164465:double,164533:double,164675:double,164828:double,165232:double,165308:double,165319:double,166123:double,16634:double,166596:double,167349:double,167471:double,167800:double,16795:double,168047:double,168064:double,168144:double,168245:double,16916:double,169233:double,16966:double,169663:double,169699:double,170229:double,170513:double,170518:double,170575:double,170634:double,170742:double,170947:double,171118:double,172030:double,172512:double,172742:double,173291:double,173415:double,173632:double,173684:double,173835:double,174216:double,174304:double,174791:double,175003:double,175886:double,177072:double,177090:double,177374:double,177432:double,177458:double,177690:double,178181:double,178199:double,178667:double,178950:double,17950:double,179733:double,179744:double,179772:double,179978:double,180348:double,180586:double,180651:double,18067:double,180917:double,180957:double,181176:double,181687:double,182085:double,182086:double,182987:double,182993:double,183196:double,183995:double,18401:double,184299:double,185233:double,185384:double,186570:double,187145:double,187256:double,187517:double,188010:double,189334:double,189516:double,189835:double,189973:double,190459:double,190708:double,190807:double,19085:double,190925:double,191187:double,192093:double,193499:double,193560:double,194600:double,194669:double,195694:double,196047:double,196077:double,196160:double,196457:double,196502:double,19664:double,197364:double,197659:double,197775:double,198621:double,198699:double,198711:double,199416:double,199772:double,200226:double,200674:double,201017:double,20115:double,201290:double,201447:double,201526:double,201674:double,201768:double,201783:double,20201:double,203240:double,203799:double,203968:double,204167:double,204522:double,204591:double,204864:double,205473:double,205735:double,205980:double,206074:double,206534:double,206567:double,206979:double,207246:double,207349:double,207499:double,207782:double,208141:double,208147:double,208406:double,208410:double,208568:double,208671:double,208829:double,209373:double,209516:double,209756:double,209875:double,210035:double,21014:double,210485:double,210792:double,210959:double,211426:double,211430:double,211847:double,211919:double,21252:double,212645:double,212898:double,212923:double,212965:double,213150:double,213350:double,214272:double,214786:double,21576:double,216012:double,216444:double,216466:double,21659:double,216683:double,217106:double,217121:double,217375:double,217740:double,218552:double,218608:double,219546:double,220278:double,221445:double,222050:double,222204:double,222296:double,222941:double,223087:double,223154:double,224138:double,224349:double,224430:double,224435:double,224646:double,224764:double,225087:double,225199:double,225232:double,225763:double,225810:double,225986:double,225989:double,22625:double,226545:double,226879:double,226965:double,227447:double,227520:double,2276:double,227705:double,228681:double,228764:double,228998:double,229011:double,229313:double,229329:double,229551:double,229741:double,230249:double,230496:double,230522:double,230708:double,231210:double,231237:double,231827:double,231857:double,232131:double,23288:double,232945:double,233911:double,233917:double,234359:double,234623:double,234721:double,234828:double,235105:double,235282:double,235392:double,235842:double,235935:double,236058:double,236172:double,236283:double,236340:double,236757:double,236948:double,236959:double,23768:double,238120:double,238526:double,238541:double,238545:double,238557:double,238699:double,23872:double,238781:double,238961:double,23902:double,239584:double,239594:double,240144:double,240567:double,240568:double,241198:double,241548:double,241666:double,24194:double,241980:double,242006:double,242083:double,242106:double,242299:double,242409:double,242646:double,242824:double,243077:double,244349:double,244688:double,244736:double,245371:double,245410:double,245645:double,245827:double,245963:double,246156:double,246311:double,246513:double,246655:double,246671:double,247429:double,247447:double,247752:double,248718:double,249111:double,24921:double,249628:double,249862:double,249894:double,250184:double,250405:double,250764:double,250962:double,251394:double,251422:double,251613:double,251843:double,251844:double,252071:double,252222:double,252695:double,252820:double,252848:double,253556:double,253821:double,254:double,25409:double,254206:double,254465:double,254899:double,254971:double,255092:double,255218:double,255489:double,25601:double,256167:double,256402:double,256407:double,257028:double,257204:double,258152:double,258185:double,258534:double,258938:double,259260:double,259380:double,259629:double,25981:double,260897:double,261105:double,261829:double,262399:double,262998:double,263163:double,263460:double,263877:double,264031:double,264082:double,264317:double,264321:double,264637:double,265115:double,26516:double,265313:double,26535:double,26544:double,265595:double,26583:double,265889:double,26593:double,266056:double,266226:double,266753:double,266865:double,267635:double,268030:double,268032:double,268110:double,268330:double,268932:double,269566:double,269719:double,269890:double,270713:double,270820:double,271195:double,271284:double,271448:double,271705:double,273979:double,274004:double,274061:double,274301:double,274308:double,275970:double,27617:double,2766:double,277427:double,277639:double,278418:double,28204:double,28360:double,28523:double,28591:double,29259:double,2977:double,29855:double,30276:double,30487:double,30511:double,30533:double,30711:double,30735:double,30972:double,31315:double,31391:double,31556:double,31826:double,31846:double,32195:double,32440:double,32721:double,32773:double,3363:double,33974:double,35050:double,35836:double,35857:double,35859:double,36606:double,36609:double,36836:double,36907:double,37567:double,37712:double,37950:double,38023:double,38273:double,38781:double,39281:double,39467:double,39616:double,39646:double,39773:double,4017:double,40889:double,40943:double,41084:double,41841:double,42914:double,43246:double,43806:double,43842:double,4385:double,44595:double,44728:double,46398:double,47316:double,48494:double,49109:double,49460:double,49889:double,50225:double,51094:double,51350:double,51386:double,51450:double,51883:double,52199:double,52584:double,52614:double,52853:double,52917:double,53174:double,53628:double,53729:double,54218:double,55187:double,55490:double,55492:double,55548:double,55734:double,55892:double,56271:double,56399:double,56447:double,56856:double,56959:double,59172:double,59971:double,60244:double,60277:double,60337:double,60707:double,61619:double,62272:double,6251:double,62891:double,62895:double,6323:double,63394:double,63714:double,63938:double,64436:double,65258:double,6543:double,6563:double,6575:double,66680:double,67840:double,68555:double,69042:double,69078:double,69232:double,69355:double,69405:double,69697:double,69808:double,69971:double,70052:double,70415:double,70594:double,7158:double,71712:double,72352:double,7286:double,72992:double,73394:double,7346:double,73651:double,75591:double,75860:double,76151:double,76223:double,76352:double,76499:double,76626:double,76818:double,76942:double,77809:double,77940:double,78553:double,78783:double,78834:double,78973:double,7915:double,79186:double,79441:double,80538:double,8067:double,80683:double,81045:double,81492:double,81560:double,81977:double,82407:double,8245:double,82831:double,82893:double,82926:double,83637:double,84024:double,85426:double,85526:double,85656:double,85701:double,85757:double,85993:double,86202:double,86243:double,86641:double,8681:double,87141:double,87143:double,87555:double,87746:double,87974:double,88283:double,88677:double,88693:double,88733:double,88937:double,8936:double,89602:double,91203:double,91342:double,91832:double,91931:double,92547:double,92652:double,92810:double,92979:double,93047:double,93085:double,93363:double,93629:double,94242:double,94347:double,94853:double,94923:double,95193:double,95316:double,95359:double,95903:double,95932:double,95991:double,96054:double,96448:double,97874:double,98391:double,98741:double,98758:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:260)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:151)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.ContextAwareIterator.next(ContextAwareIterator.scala:41)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$2231/0x00007a2740aab598`: (struct<100459:double,100644:double,100846:double,100906:double,101209:double,101851:double,101876:double,102275:double,102647:double,102702:double,102967:double,104399:double,104429:double,10447:double,104636:double,104665:double,105374:double,105517:double,105979:double,106225:double,106816:double,107021:double,107301:double,107453:double,107784:double,107951:double,108005:double,10819:double,108285:double,108352:double,109461:double,109574:double,109955:double,110029:double,110483:double,110912:double,110934:double,110973:double,111174:double,111947:double,112001:double,112026:double,112881:double,113270:double,113519:double,113817:double,113904:double,113983:double,114178:double,114368:double,114414:double,114444:double,114868:double,114988:double,115002:double,115003:double,115120:double,115490:double,11601:double,116599:double,11676:double,117251:double,117539:double,119575:double,11993:double,120093:double,120548:double,122429:double,122793:double,122881:double,123790:double,123883:double,123981:double,124078:double,124079:double,124363:double,124487:double,124942:double,125039:double,12538:double,125519:double,125692:double,125774:double,126492:double,126604:double,126736:double,127200:double,127233:double,127359:double,127429:double,127914:double,12824:double,128696:double,128835:double,129008:double,129074:double,129358:double,129465:double,129716:double,12982:double,129851:double,130474:double,130554:double,130571:double,13082:double,131027:double,131046:double,131402:double,131837:double,131855:double,132083:double,13273:double,133571:double,133689:double,133747:double,133868:double,135045:double,135149:double,135265:double,135458:double,13552:double,135831:double,136010:double,136139:double,136205:double,136252:double,136382:double,137589:double,137688:double,138097:double,138441:double,13850:double,138578:double,138844:double,139467:double,139742:double,140000:double,140036:double,140358:double,141493:double,141710:double,142093:double,142524:double,143175:double,143253:double,143415:double,143792:double,14422:double,144531:double,144555:double,14521:double,145449:double,145451:double,145619:double,145641:double,146113:double,146230:double,146348:double,147141:double,147451:double,147847:double,147965:double,148199:double,148258:double,148744:double,149069:double,149907:double,149908:double,149934:double,150124:double,150498:double,150968:double,150979:double,151790:double,151806:double,152186:double,152249:double,153563:double,153662:double,153718:double,15408:double,15418:double,154992:double,155014:double,155147:double,155219:double,155495:double,156150:double,156269:double,156467:double,157273:double,157811:double,158295:double,159033:double,159376:double,159858:double,160032:double,160819:double,16106:double,161752:double,162052:double,162639:double,162738:double,163804:double,163973:double,164027:double,164323:double,164465:double,164533:double,164675:double,164828:double,165232:double,165308:double,165319:double,166123:double,16634:double,166596:double,167349:double,167471:double,167800:double,16795:double,168047:double,168064:double,168144:double,168245:double,16916:double,169233:double,16966:double,169663:double,169699:double,170229:double,170513:double,170518:double,170575:double,170634:double,170742:double,170947:double,171118:double,172030:double,172512:double,172742:double,173291:double,173415:double,173632:double,173684:double,173835:double,174216:double,174304:double,174791:double,175003:double,175886:double,177072:double,177090:double,177374:double,177432:double,177458:double,177690:double,178181:double,178199:double,178667:double,178950:double,17950:double,179733:double,179744:double,179772:double,179978:double,180348:double,180586:double,180651:double,18067:double,180917:double,180957:double,181176:double,181687:double,182085:double,182086:double,182987:double,182993:double,183196:double,183995:double,18401:double,184299:double,185233:double,185384:double,186570:double,187145:double,187256:double,187517:double,188010:double,189334:double,189516:double,189835:double,189973:double,190459:double,190708:double,190807:double,19085:double,190925:double,191187:double,192093:double,193499:double,193560:double,194600:double,194669:double,195694:double,196047:double,196077:double,196160:double,196457:double,196502:double,19664:double,197364:double,197659:double,197775:double,198621:double,198699:double,198711:double,199416:double,199772:double,200226:double,200674:double,201017:double,20115:double,201290:double,201447:double,201526:double,201674:double,201768:double,201783:double,20201:double,203240:double,203799:double,203968:double,204167:double,204522:double,204591:double,204864:double,205473:double,205735:double,205980:double,206074:double,206534:double,206567:double,206979:double,207246:double,207349:double,207499:double,207782:double,208141:double,208147:double,208406:double,208410:double,208568:double,208671:double,208829:double,209373:double,209516:double,209756:double,209875:double,210035:double,21014:double,210485:double,210792:double,210959:double,211426:double,211430:double,211847:double,211919:double,21252:double,212645:double,212898:double,212923:double,212965:double,213150:double,213350:double,214272:double,214786:double,21576:double,216012:double,216444:double,216466:double,21659:double,216683:double,217106:double,217121:double,217375:double,217740:double,218552:double,218608:double,219546:double,220278:double,221445:double,222050:double,222204:double,222296:double,222941:double,223087:double,223154:double,224138:double,224349:double,224430:double,224435:double,224646:double,224764:double,225087:double,225199:double,225232:double,225763:double,225810:double,225986:double,225989:double,22625:double,226545:double,226879:double,226965:double,227447:double,227520:double,2276:double,227705:double,228681:double,228764:double,228998:double,229011:double,229313:double,229329:double,229551:double,229741:double,230249:double,230496:double,230522:double,230708:double,231210:double,231237:double,231827:double,231857:double,232131:double,23288:double,232945:double,233911:double,233917:double,234359:double,234623:double,234721:double,234828:double,235105:double,235282:double,235392:double,235842:double,235935:double,236058:double,236172:double,236283:double,236340:double,236757:double,236948:double,236959:double,23768:double,238120:double,238526:double,238541:double,238545:double,238557:double,238699:double,23872:double,238781:double,238961:double,23902:double,239584:double,239594:double,240144:double,240567:double,240568:double,241198:double,241548:double,241666:double,24194:double,241980:double,242006:double,242083:double,242106:double,242299:double,242409:double,242646:double,242824:double,243077:double,244349:double,244688:double,244736:double,245371:double,245410:double,245645:double,245827:double,245963:double,246156:double,246311:double,246513:double,246655:double,246671:double,247429:double,247447:double,247752:double,248718:double,249111:double,24921:double,249628:double,249862:double,249894:double,250184:double,250405:double,250764:double,250962:double,251394:double,251422:double,251613:double,251843:double,251844:double,252071:double,252222:double,252695:double,252820:double,252848:double,253556:double,253821:double,254:double,25409:double,254206:double,254465:double,254899:double,254971:double,255092:double,255218:double,255489:double,25601:double,256167:double,256402:double,256407:double,257028:double,257204:double,258152:double,258185:double,258534:double,258938:double,259260:double,259380:double,259629:double,25981:double,260897:double,261105:double,261829:double,262399:double,262998:double,263163:double,263460:double,263877:double,264031:double,264082:double,264317:double,264321:double,264637:double,265115:double,26516:double,265313:double,26535:double,26544:double,265595:double,26583:double,265889:double,26593:double,266056:double,266226:double,266753:double,266865:double,267635:double,268030:double,268032:double,268110:double,268330:double,268932:double,269566:double,269719:double,269890:double,270713:double,270820:double,271195:double,271284:double,271448:double,271705:double,273979:double,274004:double,274061:double,274301:double,274308:double,275970:double,27617:double,2766:double,277427:double,277639:double,278418:double,28204:double,28360:double,28523:double,28591:double,29259:double,2977:double,29855:double,30276:double,30487:double,30511:double,30533:double,30711:double,30735:double,30972:double,31315:double,31391:double,31556:double,31826:double,31846:double,32195:double,32440:double,32721:double,32773:double,3363:double,33974:double,35050:double,35836:double,35857:double,35859:double,36606:double,36609:double,36836:double,36907:double,37567:double,37712:double,37950:double,38023:double,38273:double,38781:double,39281:double,39467:double,39616:double,39646:double,39773:double,4017:double,40889:double,40943:double,41084:double,41841:double,42914:double,43246:double,43806:double,43842:double,4385:double,44595:double,44728:double,46398:double,47316:double,48494:double,49109:double,49460:double,49889:double,50225:double,51094:double,51350:double,51386:double,51450:double,51883:double,52199:double,52584:double,52614:double,52853:double,52917:double,53174:double,53628:double,53729:double,54218:double,55187:double,55490:double,55492:double,55548:double,55734:double,55892:double,56271:double,56399:double,56447:double,56856:double,56959:double,59172:double,59971:double,60244:double,60277:double,60337:double,60707:double,61619:double,62272:double,6251:double,62891:double,62895:double,6323:double,63394:double,63714:double,63938:double,64436:double,65258:double,6543:double,6563:double,6575:double,66680:double,67840:double,68555:double,69042:double,69078:double,69232:double,69355:double,69405:double,69697:double,69808:double,69971:double,70052:double,70415:double,70594:double,7158:double,71712:double,72352:double,7286:double,72992:double,73394:double,7346:double,73651:double,75591:double,75860:double,76151:double,76223:double,76352:double,76499:double,76626:double,76818:double,76942:double,77809:double,77940:double,78553:double,78783:double,78834:double,78973:double,7915:double,79186:double,79441:double,80538:double,8067:double,80683:double,81045:double,81492:double,81560:double,81977:double,82407:double,8245:double,82831:double,82893:double,82926:double,83637:double,84024:double,85426:double,85526:double,85656:double,85701:double,85757:double,85993:double,86202:double,86243:double,86641:double,8681:double,87141:double,87143:double,87555:double,87746:double,87974:double,88283:double,88677:double,88693:double,88733:double,88937:double,8936:double,89602:double,91203:double,91342:double,91832:double,91931:double,92547:double,92652:double,92810:double,92979:double,93047:double,93085:double,93363:double,93629:double,94242:double,94347:double,94853:double,94923:double,95193:double,95316:double,95359:double,95903:double,95932:double,95991:double,96054:double,96448:double,97874:double,98391:double,98741:double,98758:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$5(AggregationIterator.scala:260)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:151)
	at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.ContextAwareIterator.next(ContextAwareIterator.scala:41)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 22 more


In [None]:
# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = popular_df.toPandas()

# Save Pandas DataFrame as Pickle file
pandas_df.to_pickle("popular.pkl")

# Print the first few rows of the Pandas DataFrame
print(pandas_df.head())

In [None]:
# Drop duplicates based on the 'Book-Title' column
books_without_duplicates = books.dropDuplicates(['Book-Title'])

# Show the resulting DataFrame
books_without_duplicates.show()

In [None]:
# Save pt DataFrame
pt.write.parquet("/home/hadoop/pt.parquet")

# Save books DataFrame
books.write.parquet("/home/hadoop/books.parquet")

# Save similarity_score DataFrame
similarity_score.write.parquet("/home/hadoop/similarity_scores.parquet")
