<a href="https://colab.research.google.com/github/pravalika-n25/BDA/blob/main/BDA_160122771011.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ***BDA ASSIGNMENT-2 ~ 160122771011***

# **Initializing spark session and Loading Wine quality dataset into colab.**

In [11]:
#initializing spark session
from pyspark.sql import SparkSession

try:
    spark = SparkSession.builder \
        .appName("ClassificationModel") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    print("Spark session initialized successfully.")
except Exception as e:
    print(f"Error initializing Spark session: {e}")
    exit(1)

Spark session initialized successfully.


In [12]:
from google.colab import files
uploaded = files.upload()

file_path = '/content/WineQT.csv'

#reading the dataset using Spark
df = spark.read.csv(file_path, header=True, inferSchema=True)

#Show the first few rows of the dataset
df.show(5)

Saving WineQT.csv to WineQT (2).csv
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality| Id|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|  0|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|  1|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|  2|
|         11.2|            0.28|       0

#**1) Build a Classification Model with Spark with a dataset of your choice.**

In [13]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#Prepare features and label columns (excluding 'quality' and 'Id')
feature_columns = [col for col in df.columns if col not in ['quality', 'Id']]

#Initialize VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

#Initialize Logistic Regression model
lr = LogisticRegression(labelCol='quality', featuresCol='features')

#Creating a pipeline with assembler and logistic regression stages
pipeline = Pipeline(stages=[assembler, lr])

#Splitting data into training and testing sets (80% training, 20% testing)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

#Fit the pipeline model using the training data
model = pipeline.fit(train_data)

#Making predictions on the test data
predictions = model.transform(test_data)

#Evaluate the model's performance using accuracy
evaluator = BinaryClassificationEvaluator(labelCol='quality', rawPredictionCol='prediction')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.2f}")

Accuracy: 1.00


#**2) Build a Clustering Model with Spark with a dataset of your choice.**

In [14]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import ClusteringEvaluator

#Prepare features and label columns (excluding 'quality' and 'Id')
feature_columns = [col for col in df.columns if col not in ['quality', 'Id']]

#Initialize VectorAssembler to create 'features' column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

#Initialize KMeans model (k=3, you can change k depending on the dataset)
kmeans = KMeans(k=3, seed=1234, featuresCol='features', predictionCol='prediction')

#Creating a pipeline with assembler and KMeans model
pipeline = Pipeline(stages=[assembler, kmeans])

#Splitting data into training and testing sets (80% training, 20% testing)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

#Fit the pipeline model using the training data
model = pipeline.fit(train_data)

#Make predictions on the test data
predictions = model.transform(test_data)

#Evaluate the clustering performance using ClusteringEvaluator
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features')

silhouette_score = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette_score:.2f}")

Silhouette Score: 0.77


#**3) Build a Recommendation Engine with Spark with a dataset of your choice.**

In [15]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, BucketedRandomProjectionLSH
from pyspark.sql.functions import col

#Initialising Spark session
spark = SparkSession.builder \
    .appName("WineContentBasedRec") \
    .getOrCreate()

#Loading and inspecting the data
df = spark.read.csv('/content/WineQT (1).csv', header=True, inferSchema=True)
df.printSchema()

#Assembling the numeric features into a vector
feature_cols = [
    'fixed acidity','volatile acidity','citric acid','residual sugar',
    'chlorides','free sulfur dioxide','total sulfur dioxide',
    'density','pH','sulphates','alcohol'
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
dataset = assembler.transform(df).select(col('Id').alias('itemId'), 'features')

#Fit an LSH model for approximate nearest-neighbor search
lsh = BucketedRandomProjectionLSH(
    inputCol='features',
    outputCol='hashes',
    bucketLength=1.0,
    numHashTables=5
)
lsh_model = lsh.fit(dataset)

#Pick a “query” wine (e.g. itemId = 1) and get its feature vector
query_vec = dataset.filter(col('itemId') == 1).select('features').head()[0]

#Find the 5 most similar wines (including itself)
neighbors = lsh_model.approxNearestNeighbors(dataset, query_vec, 6)

#Show recommendations (drop the query wine itself)
neighbors.filter(col('itemId') != 1).orderBy('distCol').select('itemId','distCol').show(5)

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- Id: integer (nullable = true)

+------+------------------+
|itemId|           distCol|
+------+------------------+
|   752|1.5438594827574175|
|  1173|2.5254823558282884|
|  1174|2.5254823558282884|
|  1357|2.5255890604767823|
|   796| 3.338695019854314|
+------+------------------+

