<a href="https://colab.research.google.com/github/tejasvini2805/160122771017_BDA_ASSIGNMENT_2/blob/main/160122771017_BigDataAnalytics_Assignment_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark 3.4.1 with Hadoop 3 from an official Apache mirror
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Extract Spark
!tar -xzf spark-3.4.1-bin-hadoop3.tgz

# Install findspark
!pip install -q findspark

# Set environment variables and initialize Spark
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

findspark.init()


1 Build a Classification Model with Spark with a dataset of your choice

In [17]:
# Step 1: Install Spark and set up
!apt-get install openjdk-11-jdk -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ClassificationModel").getOrCreate()

# Step 2: Load dataset with headers
import pandas as pd

# Correct adult dataset URL (without 404)
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"

# Column names from UCI
columns = ["age", "workclass", "fnlwgt", "education", "education-num",
           "marital-status", "occupation", "relationship", "race", "sex",
           "capital-gain", "capital-loss", "hours-per-week", "native-country", "income"]

df_pd = pd.read_csv(url, names=columns, na_values=" ?", skipinitialspace=True)
df_pd.to_csv("adult.csv", index=False)

# Step 3: Load into Spark
df = spark.read.csv("adult.csv", header=True, inferSchema=True)

# Step 4: Drop nulls and clean columns
df = df.dropna()
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, col_name.strip())

# Step 5: Preprocessing - Index categorical features
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

categorical_cols = ["workclass", "education", "marital-status", "occupation", "relationship", "race", "sex", "native-country"]
indexers = [StringIndexer(inputCol=col, outputCol=col + "_Index") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col + "_Index", outputCol=col + "_Vec") for col in categorical_cols]

# Label indexer
label_indexer = StringIndexer(inputCol="income", outputCol="label")

# Assembling all features
features = ["age", "fnlwgt", "education-num", "capital-gain", "capital-loss", "hours-per-week"] + [col + "_Vec" for col in categorical_cols]
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Step 6: Pipeline
pipeline = Pipeline(stages=indexers + encoders + [label_indexer, assembler])

# Step 7: Prepare final dataset
model = pipeline.fit(df)
df_prepared = model.transform(df)

# Step 8: Train classifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train_data, test_data = df_prepared.randomSplit([0.8, 0.2], seed=42)
classifier = LogisticRegression(featuresCol="features", labelCol="label")
clf_model = classifier.fit(train_data)
predictions = clf_model.transform(test_data)

# Step 9: Evaluate
evaluator = BinaryClassificationEvaluator()
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy (AUC): {accuracy:.4f}")


Test Accuracy (AUC): 0.9064


In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluators for Accuracy, Precision, Recall, and F1 score
evaluator_accuracy = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label")
evaluator_precision = MulticlassClassificationEvaluator(metricName="weightedPrecision", labelCol="label")
evaluator_recall = MulticlassClassificationEvaluator(metricName="weightedRecall", labelCol="label")
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1", labelCol="label")

# Evaluate the model
accuracy = evaluator_accuracy.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)

# Print the evaluation metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (Weighted): {precision:.4f}")
print(f"Recall (Weighted): {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")


Accuracy: 0.8506
Precision (Weighted): 0.8443
Recall (Weighted): 0.8506
F1 Score: 0.8453


2 Build a Clustering Model with Spark with a dataset of your choice

In [22]:


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ClusteringModel").getOrCreate()

# Step 2: Load dataset
import pandas as pd

# URL for Iris dataset (CSV)
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "class"]

# Load the dataset into Pandas and then save it as a CSV
df = pd.read_csv(url, names=columns)
df.to_csv("iris.csv", index=False)

# Step 3: Load into Spark
spark_df = spark.read.csv("iris.csv", header=True, inferSchema=True)

# Step 4: Data Preprocessing - Select relevant features
# We will exclude the 'class' column for clustering
df_features = spark_df.select("sepal_length", "sepal_width", "petal_length", "petal_width")

# Step 5: Assemble features into a single vector column
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol="features")
df_assembled = assembler.transform(df_features)

# Step 6: Apply KMeans clustering
from pyspark.ml.clustering import KMeans

# Define the KMeans model with 3 clusters (since Iris dataset has 3 species)
kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="prediction")

# Train the model
model = kmeans.fit(df_assembled)

# Make predictions
predictions = model.transform(df_assembled)

# Step 7: Evaluate the model - Evaluate clustering using the training cost (similar to WSSSE)
wssse = model.summary.trainingCost
print(f"Within Set Sum of Squared Errors (WSSSE): {wssse}")


# Step 8: Show the clusters
predictions.select("sepal_length", "sepal_width", "petal_length", "petal_width", "prediction").show(10)

# Step 9: Show the cluster centers
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)


Within Set Sum of Squared Errors (WSSSE): 78.94084142614598
+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|prediction|
+------------+-----------+------------+-----------+----------+
|         5.1|        3.5|         1.4|        0.2|         1|
|         4.9|        3.0|         1.4|        0.2|         1|
|         4.7|        3.2|         1.3|        0.2|         1|
|         4.6|        3.1|         1.5|        0.2|         1|
|         5.0|        3.6|         1.4|        0.2|         1|
|         5.4|        3.9|         1.7|        0.4|         1|
|         4.6|        3.4|         1.4|        0.3|         1|
|         5.0|        3.4|         1.5|        0.2|         1|
|         4.4|        2.9|         1.4|        0.2|         1|
|         4.9|        3.1|         1.5|        0.1|         1|
+------------+-----------+------------+-----------+----------+
only showing top 10 rows

Cluster Centers: 
[5.9016129  2.

3 Build a Recommendation Engine with Spark with a dataset of your
choice

In [67]:
pip install pyspark




In [70]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
import urllib.request
import zipfile

# Step 1: Initialize Spark Session
spark = SparkSession.builder.master("local").appName("MovieRecommendation").getOrCreate()

# Step 2: Download the MovieLens 100k dataset from a URL (it is hosted online)
url = "http://files.grouplens.org/datasets/movielens/ml-100k.zip"
filename = "/tmp/ml-100k.zip"
urllib.request.urlretrieve(url, filename)

# Step 3: Extract the zip file
with zipfile.ZipFile(filename, 'r') as zip_ref:
    zip_ref.extractall("/tmp")

# Step 4: Load the ratings data into a DataFrame
ratings_file_path = "/tmp/ml-100k/u.data"
ratings_df = spark.read.option("delimiter", "\t").csv(ratings_file_path, header=False, inferSchema=True)

# Step 5: Rename columns for better readability
ratings_df = ratings_df.withColumnRenamed("_c0", "user_id") \
                       .withColumnRenamed("_c1", "item_id") \
                       .withColumnRenamed("_c2", "rating") \
                       .withColumnRenamed("_c3", "timestamp")

# Step 6: Show the first few rows of the dataset
ratings_df.show(5)

# Step 7: Set up the ALS model for collaborative filtering
# Adjust ALS parameters, including regularization and rank (hyperparameters)
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating",
          nonnegative=True, implicitPrefs=False, rank=50, maxIter=10, regParam=0.1, coldStartStrategy="drop")

# Step 8: Split the data into training and test sets
(training_data, test_data) = ratings_df.randomSplit([0.8, 0.2], seed=1234)

# Step 9: Fit the ALS model to the training data
als_model = als.fit(training_data)

# Step 10: Make predictions on the test data
predictions = als_model.transform(test_data)

# Step 11: Ensure no null values in the predictions
predictions_filtered = predictions.filter(predictions["prediction"].isNotNull())

# Step 12: Show some of the predictions
predictions_filtered.show(5)

# Step 13: Calculate RMSE to evaluate the model
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
rmse = evaluator.evaluate(predictions_filtered)
print(f"Root-Mean-Square Error (RMSE) on test data = {rmse}")

# Step 14: Generate top 10 movie recommendations for each user
user_recommendations = als_model.recommendForAllUsers(10)
user_recommendations.show(5)

# Step 15: Generate top 10 user recommendations for each movie
movie_recommendations = als_model.recommendForAllItems(10)
movie_recommendations.show(5)


+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|    196|    242|     3|881250949|
|    186|    302|     3|891717742|
|     22|    377|     1|878887116|
|    244|     51|     2|880606923|
|    166|    346|     1|886397596|
+-------+-------+------+---------+
only showing top 5 rows

+-------+-------+------+---------+----------+
|user_id|item_id|rating|timestamp|prediction|
+-------+-------+------+---------+----------+
|    148|     70|     5|877021271| 3.1155865|
|    148|     71|     5|877019251| 3.6562562|
|    148|     89|     5|877398587| 3.9636848|
|    148|    114|     5|877016735| 4.4826035|
|    148|    177|     2|877020715|  3.580373|
+-------+-------+------+---------+----------+
only showing top 5 rows

Root-Mean-Square Error (RMSE) on test data = 0.9119366885453809
+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{169, 4.9104013}...|
|      3|[{320, 4.5112166}...|
| 