<a href="https://colab.research.google.com/github/poojithakothapalli/BDA/blob/main/BDA_160122771007.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. Build a classification model with Spark in Python, using a dataset of your choice, for big data analysis

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Train and evaluate a decision-tree binary classifier on a small in-memory
# dataset, then print the test predictions and the area under the ROC curve.
spark = SparkSession.builder.appName("UserConversionClassification").getOrCreate()

# Each record is (age, pages_visited, time_spent, label); label is 0 or 1.
data = [
    (25, 5, 3.5, 0),
    (32, 8, 5.2, 1),
    (40, 4, 2.5, 0),
    (23, 12, 6.8, 1),
    (36, 9, 5.9, 1),
    (29, 2, 1.2, 0),
    (50, 11, 6.5, 1),
    (45, 3, 2.1, 0),
    (31, 6, 4.0, 0),
    (28, 10, 6.1, 1)
]
columns = ["age", "pages_visited", "time_spent", "label"]
df = spark.createDataFrame(data, schema=columns)

# Combine the three numeric columns into the single "features" vector column
# that the classifier reads.
assembler = VectorAssembler(
    inputCols=["age", "pages_visited", "time_spent"],
    outputCol="features"
)
df_prepared = assembler.transform(df).select("features", "label")

# Fixed seed so the 70/30 split is repeatable.
train_data, test_data = df_prepared.randomSplit([0.7, 0.3], seed=42)

dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
model = dt.fit(train_data)
predictions = model.transform(test_data)
predictions.select("features", "label", "prediction").show()

# The evaluator's metric is area under the ROC curve, not accuracy, so the
# metric and the columns it reads are spelled out and the result is named
# accordingly.
# NOTE(review): with only ten rows the test split can end up holding a single
# class (the recorded run shows two rows, both label 1); AUC is not informative
# in that situation -- consider a larger dataset or a stratified split.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.2f}")
spark.stop()


+---------------+-----+----------+
|       features|label|prediction|
+---------------+-----+----------+
| [32.0,8.0,5.2]|    1|       1.0|
|[28.0,10.0,6.1]|    1|       1.0|
+---------------+-----+----------+

Test AUC: 1.00


2. Build a clustering model with Spark, using a dataset of your choice

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# Obtain the Spark session for the k-means example
session_builder = SparkSession.builder.appName("CustomerSegmentation")
spark = session_builder.getOrCreate()

# Column names, in the same order as the values of each record below
columns = ["age", "annual_income", "spending_score"]

# In-memory sample records
data = [
    (25, 30, 40),
    (34, 55, 60),
    (22, 20, 30),
    (45, 70, 80),
    (23, 25, 20),
    (40, 60, 70),
    (29, 35, 40),
    (50, 90, 90),
    (21, 18, 15),
    (31, 50, 55),
    (42, 85, 88),
    (33, 48, 53),
    (46, 95, 85),
    (28, 33, 38),
    (26, 31, 36),
]

# Build the DataFrame from the records and their column names
df = spark.createDataFrame(data, schema=columns)

# Pack every column into one vector column named "features" and keep only it
assembler = VectorAssembler(inputCols=columns, outputCol="features")
assembled = assembler.transform(df)
df_features = assembled.select("features")

# Fit k-means with three clusters and a fixed seed
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df_features)

# Add the cluster index for each row and display the table untruncated
predictions = model.transform(df_features)
predictions.show(truncate=False)

# List the fitted cluster centers, one per line
print("Cluster Centers:")
for centroid in model.clusterCenters():
    print(centroid)

# Shut the Spark session down
spark.stop()


+----------------+----------+
|features        |prediction|
+----------------+----------+
|[25.0,30.0,40.0]|0         |
|[34.0,55.0,60.0]|2         |
|[22.0,20.0,30.0]|0         |
|[45.0,70.0,80.0]|1         |
|[23.0,25.0,20.0]|0         |
|[40.0,60.0,70.0]|2         |
|[29.0,35.0,40.0]|0         |
|[50.0,90.0,90.0]|1         |
|[21.0,18.0,15.0]|0         |
|[31.0,50.0,55.0]|2         |
|[42.0,85.0,88.0]|1         |
|[33.0,48.0,53.0]|2         |
|[46.0,95.0,85.0]|1         |
|[28.0,33.0,38.0]|0         |
|[26.0,31.0,36.0]|0         |
+----------------+----------+

Cluster Centers:
[24.85714286 27.42857143 31.28571429]
[45.75 85.   85.75]
[34.5  53.25 59.5 ]


3. Build a recommendation engine with Spark, using a dataset of your choice

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Train an ALS collaborative-filtering model on a small in-memory rating set,
# report RMSE on a held-out split, and print the top 3 items per user.

# Start Spark session
spark = SparkSession.builder.appName("ALSRecommender").getOrCreate()

# Simulated (user_id, item_id, rating) records
data = [
    (1, 101, 5.0),
    (1, 102, 3.0),
    (1, 103, 2.5),
    (2, 101, 2.0),
    (2, 102, 2.5),
    (2, 103, 5.0),
    (2, 104, 4.0),
    (3, 101, 5.0),
    (3, 104, 4.0),
    (3, 105, 1.0),
    (4, 103, 4.0),
    (4, 104, 4.5),
    (4, 105, 5.0),
]

# Define schema
columns = ["user_id", "item_id", "rating"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

# Split into train/test sets (fixed seed so the split is repeatable)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Build ALS model
als = ALS(
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    coldStartStrategy="drop",  # Drop unknowns in prediction
    nonnegative=True,
    implicitPrefs=False,
    rank=10,
    maxIter=10,
    regParam=0.1
)

# Train the model
model = als.fit(train_data)

# Predict ratings for test data
predictions = model.transform(test_data)
predictions.show()

# Evaluate using RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
# With coldStartStrategy="drop" and such a small random split, every test row
# may be filtered out (or the test split may be empty). RMSE over zero rows is
# undefined, so only evaluate when at least one prediction exists.
if predictions.take(1):
    rmse = evaluator.evaluate(predictions)
    print(f"Test RMSE: {rmse:.2f}")
else:
    rmse = float("nan")
    print("Test RMSE: n/a (no test rows left to evaluate)")

# Generate top 3 item recommendations for each user
user_recs = model.recommendForAllUsers(3)
user_recs.show(truncate=False)

# Stop Spark session
spark.stop()


+-------+-------+------+----------+
|user_id|item_id|rating|prediction|
+-------+-------+------+----------+
|      1|    103|   2.5| 2.8751957|
|      4|    104|   4.5| 1.3660398|
|      4|    105|   5.0|0.34150994|
|      2|    104|   4.0| 1.6198617|
+-------+-------+------+----------+

Test RMSE: 3.05
+-------+------------------------------------------------------+
|user_id|recommendations                                       |
+-------+------------------------------------------------------+
|1      |[{101, 4.902092}, {104, 4.01145}, {102, 2.9638023}]   |
|2      |[{103, 4.8664002}, {102, 2.4335005}, {101, 2.030826}] |
|3      |[{101, 4.848009}, {104, 3.972286}, {102, 2.7848659}]  |
|4      |[{103, 3.9582627}, {102, 2.0035877}, {101, 1.7105932}]|
+-------+------------------------------------------------------+

