In [2]:
# Loading the dataset
import pandas as pd

In [3]:
# Loading the dataset
df = pd.read_csv("books_data.csv")

Hadoop MapReduce Job

In [4]:
# Simulating the MapReduce: Word Count
def map_reduce_word_count(data):
    # Mapper
    mapped = []
    for desc in data['description'].dropna():
        words = desc.split()
        mapped.extend([(word.lower(), 1) for word in words])
    # Reducer
    reduced = {}
    for word, count in mapped:
        reduced[word] = reduced.get(word, 0) + count
    return reduced

In [5]:
# Running the MapReduce job
word_counts = map_reduce_word_count(df)
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)

In [6]:
# Displaying top 10 word frequencies
print("Top 10 Word Frequencies in Description:")
print(sorted_word_counts[:10])

Top 10 Word Frequencies in Description:
[('the', 898888), ('and', 639147), ('of', 586098), ('to', 361155), ('a', 353734), ('in', 283222), ('is', 150728), ('for', 140415), ('with', 120235), ('that', 109321)]


Apache Spark Job






In [7]:
# Installing the PySpark
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lower, explode, split



In [8]:
# Initializing the SparkSession
spark = SparkSession.builder.appName("BooksDataProcessing").getOrCreate()

In [9]:
# Loading the dataset into the Spark DataFrame
df = spark.read.csv("books_data.csv", header=True, inferSchema=True)
# Displaying the schema and first few rows
df.printSchema()
df.show(5)

root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- previewLink: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- infoLink: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- ratingsCount: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+------------+
|               Title|         description|             authors|               image|         previewLink|           publisher| publishedDate|            infoLink|          categories|ratingsCount|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+---

In [10]:
# Task 1: Word Count (EDA on 'description')
# Tokenizing the descriptions into words
df_words = df.select(explode(split(lower(col("description")), "\\s+")).alias("word"))

In [11]:
# Counting the word frequencies
word_counts = df_words.groupBy("word").count().orderBy(col("count").desc())
# Showing the top 10 most common words
word_counts.show(10)

+----+------+
|word| count|
+----+------+
| the|770031|
| and|545083|
|  of|503350|
|  to|307905|
|   a|303321|
|  in|241999|
|  is|128667|
| for|120241|
|with|103418|
|that| 91267|
+----+------+
only showing top 10 rows



In [12]:
# Task 2: Group by Categories and Count
category_counts = df.groupBy("categories").count().orderBy(col("count").desc())
category_counts.show()

+--------------------+-----+
|          categories|count|
+--------------------+-----+
|                NULL|40524|
|         ['Fiction']|20020|
|        ['Religion']| 7923|
|         ['History']| 7882|
|['Juvenile Fiction']| 6088|
|['Biography & Aut...| 5207|
|['Business & Econ...| 4663|
|       ['Computers']| 3659|
|  ['Social Science']| 3212|
|['Juvenile Nonfic...| 3190|
|         ['Science']| 2346|
|       ['Education']| 2297|
|         ['Cooking']| 2158|
|['Sports & Recrea...| 1975|
|         ['Medical']| 1828|
|           ['Music']| 1823|
|['Family & Relati...| 1814|
|             ['Art']| 1796|
|['Literary Critic...| 1791|
|['Language Arts &...| 1751|
+--------------------+-----+
only showing top 20 rows



Apache Spark MLlib for Machine Learning

In [15]:
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [16]:
# Assuring the 'ratingsCount' column is cast to a numeric type
df = df.withColumn("ratingsCount", col("ratingsCount").cast(IntegerType()))
# Dropping the rows where 'ratingsCount' is null
df_cleaned = df.na.drop(subset=["ratingsCount"])
# Converting to the feature vector
vector_assembler = VectorAssembler(inputCols=["ratingsCount"], outputCol="features")
df_ml = vector_assembler.transform(df_cleaned)

In [17]:
# Performing the K-Means Clustering
kmeans = KMeans().setK(3).setSeed(1)  # 3 clusters
model = kmeans.fit(df_ml)
# Evaluating the clustering
predictions = model.transform(df_ml)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")

Silhouette Score: 0.9977541695374487


In [18]:
# Showing the cluster assignments
predictions.select("categories", "ratingsCount", "prediction").show(10)

+--------------------+------------+----------+
|          categories|ratingsCount|prediction|
+--------------------+------------+----------+
|        ['Religion']|           5|         0|
|                NULL|           3|         0|
|['Biography & Aut...|           1|         0|
|         ['History']|           1|         0|
|                NULL|           1|         0|
|['Juvenile Fiction']|           2|         0|
|['Sports & Recrea...|           1|         0|
|    ['Architecture']|           1|         0|
|        ['Religion']|           4|         0|
|         ['Fiction']|           1|         0|
+--------------------+------------+----------+
only showing top 10 rows

