In [1]:
!pip install -q pyspark


1.Build a Classification Model with Spark with a dataset of your choice

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from sklearn.datasets import load_breast_cancer
import pandas as pd

# Spark session
spark = SparkSession.builder.appName("Classification").getOrCreate()

# Load and convert dataset
data = load_breast_cancer()
df = pd.DataFrame(data.data, columns=data.feature_names)
df['label'] = data.target
spark_df = spark.createDataFrame(df)

# Features
vec_assembler = VectorAssembler(inputCols=data.feature_names.tolist(), outputCol="features")
assembled = vec_assembler.transform(spark_df)

# Model
lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(assembled)

# Prediction
predictions = model.transform(assembled)
predictions.select("label", "prediction", "probability").show(5)


+-----+----------+-----------+
|label|prediction|probability|
+-----+----------+-----------+
|    0|       0.0|  [1.0,0.0]|
|    0|       0.0|  [1.0,0.0]|
|    0|       0.0|  [1.0,0.0]|
|    0|       0.0|  [1.0,0.0]|
|    0|       0.0|  [1.0,0.0]|
+-----+----------+-----------+
only showing top 5 rows



2.Build a Clustering Model with Spark with a dataset of your choice

In [3]:
from pyspark.ml.clustering import KMeans
from sklearn.datasets import load_iris

# Load and convert dataset
iris = load_iris()
iris_df = pd.DataFrame(iris.data, columns=iris.feature_names)
spark_iris_df = spark.createDataFrame(iris_df)

# Feature vector
assembler = VectorAssembler(inputCols=iris.feature_names, outputCol="features")
iris_vec = assembler.transform(spark_iris_df)

# KMeans
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(iris_vec)
centers = model.clusterCenters()

# Predictions
preds = model.transform(iris_vec)
preds.select("features", "prediction").show(5)


+-----------------+----------+
|         features|prediction|
+-----------------+----------+
|[5.1,3.5,1.4,0.2]|         1|
|[4.9,3.0,1.4,0.2]|         1|
|[4.7,3.2,1.3,0.2]|         1|
|[4.6,3.1,1.5,0.2]|         1|
|[5.0,3.6,1.4,0.2]|         1|
+-----------------+----------+
only showing top 5 rows



3.Build a Recommendation Engine with Spark with a dataset of your
choice

In [4]:
!wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip -o ml-latest-small.zip

from pyspark.ml.recommendation import ALS

# Load data
ratings_df = pd.read_csv("ml-latest-small/ratings.csv")
ratings_df = ratings_df.drop("timestamp", axis=1)
spark_ratings_df = spark.createDataFrame(ratings_df)

# ALS Model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(spark_ratings_df)

# Recommendations
user_recs = model.recommendForAllUsers(3)
user_recs.show(5, truncate=False)


--2025-04-24 10:04:31--  https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2025-04-24 10:04:34 (953 KB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  
+------+------------------------------------------------------------+
|userId|recommendations                                             |
+------+------------------------------------------------------------+
|1     |[{132333, 5.74244}, {5490, 5.74244}, {5915, 5.7160797}]     |
