<a href="https://colab.research.google.com/github/swapnabanoth/001_BDA_Assignment/blob/main/BDA_001.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pandas as pd

# ---- 1. Spark entry point ----------------------------------------------
spark = (
    SparkSession.builder
    .appName("IrisClassification")
    .getOrCreate()
)

# ---- 2. Fetch the dataset and stage it as a local CSV --------------------
# Column names are supplied explicitly through `names=` when reading the remote file.
# Point `data_url` at your own CSV to use different data.
feature_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
columns = feature_columns + ["species"]
data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"

iris_df = pd.read_csv(data_url, names=columns)
iris_df.to_csv("iris.csv", index=False)  # written with a header row, no index column

# Read the staged file back through Spark, letting it infer column types.
raw_df = spark.read.csv("iris.csv", header=True, inferSchema=True)

# ---- 3. Preprocessing ----------------------------------------------------
# Binary target: 1 when the species is 'Iris-setosa', 0 for every other species.
is_setosa = (raw_df["species"] == "Iris-setosa").cast("integer")
labelled_df = raw_df.withColumn("label", is_setosa)

# Pack the four numeric measurements into one vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
df = assembler.transform(labelled_df)

# ---- 4. 80/20 train/test split (seeded) ----------------------------------
splits = df.randomSplit([0.8, 0.2], seed=42)
train_data, test_data = splits

# ---- 5. Fit the logistic-regression classifier ---------------------------
lr = LogisticRegression(labelCol="label", featuresCol="features")
model = lr.fit(train_data)

# ---- 6. Score the held-out rows ------------------------------------------
predictions = model.transform(test_data)

# ---- 7. Accuracy on the held-out rows ------------------------------------
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.2f}")

# Optional: peek at the first few scored rows.
preview = predictions.select("features", "label", "prediction")
preview.show(5)

# ---- 8. Release the Spark session ----------------------------------------
spark.stop()


Accuracy: 1.00
+-----------------+-----+----------+
|         features|label|prediction|
+-----------------+-----+----------+
|[4.4,3.0,1.3,0.2]|    1|       1.0|
|[4.6,3.2,1.4,0.2]|    1|       1.0|
|[4.6,3.6,1.0,0.2]|    1|       1.0|
|[4.8,3.1,1.6,0.2]|    1|       1.0|
|[4.9,3.1,1.5,0.1]|    1|       1.0|
+-----------------+-----+----------+
only showing top 5 rows



In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Spark Session
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# Step 2: Load dataset
# MovieLens "ml-latest-small" dataset; ratings.csv columns: userId, movieId, rating, timestamp
import os
import urllib.request
import zipfile

import pandas as pd

ml_url = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
ml_zip = "ml-latest-small.zip"
ratings_csv = "ml-latest-small/ratings.csv"

# Download and unpack only when the extracted ratings file is missing, so that
# re-running the cell does not hit the network again. The guard is keyed on the
# extracted file (not the zip) so an interrupted earlier run is retried.
if not os.path.exists(ratings_csv):
    urllib.request.urlretrieve(ml_url, ml_zip)
    with zipfile.ZipFile(ml_zip, 'r') as zip_ref:
        zip_ref.extractall(".")

# Read ratings into Spark and keep only the columns ALS needs
ratings_df = spark.read.csv(ratings_csv, header=True, inferSchema=True)
ratings_df = ratings_df.select("userId", "movieId", "rating")

# Step 3: Train-test split (seeded 80/20)
train_data, test_data = ratings_df.randomSplit([0.8, 0.2], seed=42)

# Step 4: ALS model
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",  # drop NaN predictions
    seed=42  # explicit seed so training starts from the same random initialisation on every run
)
model = als.fit(train_data)

# Step 5: Predictions on the held-out ratings
predictions = model.transform(test_data)

# Step 6: Evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"\nRoot-mean-square error (RMSE): {rmse:.4f}")

# Step 7: Recommend Top 5 Movies for Each User
user_recs = model.recommendForAllUsers(5)
print("\nTop 5 movie recommendations for sample users:")
user_recs.show(5, truncate=False)

# Step 8: Stop Spark session
spark.stop()



Root-mean-square error (RMSE): 0.8797

Top 5 movie recommendations for sample users:
+------+-------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                  |
+------+-------------------------------------------------------------------------------------------------+
|1     |[{132333, 5.896185}, {177593, 5.773418}, {5915, 5.77255}, {8542, 5.668288}, {171495, 5.6276536}] |
|2     |[{131724, 4.9002175}, {2693, 4.89408}, {136469, 4.852532}, {78836, 4.72831}, {2936, 4.726223}]   |
|3     |[{5048, 5.0585546}, {6835, 4.914627}, {5746, 4.914627}, {5181, 4.8702173}, {4518, 4.8108983}]    |
|4     |[{4642, 5.288948}, {132333, 5.1431713}, {26326, 5.045396}, {2300, 5.0337276}, {8542, 5.032188}]  |
|5     |[{1188, 4.917164}, {177593, 4.8808775}, {1212, 4.8354225}, {26326, 4.8340344}, {7096, 4.8247476}]|
+------+--------------------------------------------------

In [None]:
# ============================
# Step 1: Setup Spark
# ============================
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# Start Spark session
spark = SparkSession.builder.appName("MovieRecommendationALS").getOrCreate()

# ============================
# Step 2: Load Dataset
# ============================
# Relative to the working directory, which is where the MovieLens archive is
# extracted earlier in this notebook.  <- Change path if needed
ratings_path = "ml-latest-small/ratings.csv"

df = spark.read.csv(ratings_path, header=True, inferSchema=True)
df = df.select("userId", "movieId", "rating")
df.show(5)

# ============================
# Step 3: Train/Test Split
# ============================
# Hold out 20% of the ratings so the RMSE below is measured on data the model
# did not see during training, rather than on the rows it was fitted to.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# ============================
# Step 4: Train ALS Model
# ============================
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    maxIter=10,
    regParam=0.1,
    rank=10,
    nonnegative=True,
    coldStartStrategy="drop",  # drop rows whose prediction would be NaN
    seed=42  # explicit seed so training starts from the same random initialisation on every run
)

model = als.fit(train_data)

# ============================
# Step 5: Make Predictions
# ============================
predictions = model.transform(test_data)

# ============================
# Step 6: Evaluate the Model
# ============================
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"\n Root Mean Squared Error (RMSE): {rmse:.2f}")

# ============================
# Step 7: Generate Recommendations
# ============================
userRecs = model.recommendForAllUsers(5)

print("\n Top-5 movie recommendations for users:")
userRecs.select("userId", "recommendations").show(5, truncate=False)

# ============================
# Step 8: Stop Spark
# ============================
spark.stop()
