
# **Build a classification model with Spark on a dataset of your choice**

Description:
1. Environment Preparation: The notebook imports the `pyspark` modules it needs for distributed data processing, feature engineering, model training, and evaluation. It then creates a SparkSession, the entry point for Spark's DataFrame and ML APIs.

2. Data Acquisition and Preprocessing: The red wine quality dataset is downloaded from the UCI Machine Learning Repository and loaded into a Spark DataFrame, a distributed collection of data organized into named columns. For classification, a new column named "label" is created: 1 for wines with a quality score of 7 or higher ("good") and 0 for all others ("not good"). The eleven remaining physicochemical columns are selected as features alongside "label" and combined into a single vector column with VectorAssembler, the input format that Spark ML estimators expect.

3. Model Training and Development: The dataset is randomly split into a training set (about 80%) used to fit the model and a testing set (about 20%) used to evaluate it on unseen data. Logistic Regression, a widely used algorithm for binary classification, is fit on the training set to learn the relationship between the features and the label.

4. Prediction and Performance Assessment: The trained model predicts a label for each wine in the testing set. The predictions are evaluated with two metrics, Area Under the ROC Curve (AUC) and overall accuracy, which indicate how well the model generalizes to new data.

5. Resource Management and Termination: Finally, the SparkSession is stopped to release the resources allocated during the analysis.

In [None]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("WineQualityClassification").getOrCreate()

In [None]:
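import os
# Red wine quality dataset from the UCI Machine Learning Repository
data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
data_file = "winequality-red.csv"
# Download the file only if it is not already present locally
if not os.path.exists(data_file):
    !pip install wget
    import wget
    wget.download(data_url, data_file)
# The file is semicolon-delimited; let Spark infer the column types
df = spark.read.csv(data_file, sep=";", header=True, inferSchema=True)
df.show(5)
df.printSchema()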

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... done
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=d69d1b9f2930573a5e8e6c957a1c0a79d4626c8ab0d5bc77040dceac358383a5
  Stored in directory: /root/.cache/pip/wheels/40/b3/0f/a40dbd1c6861731779f62cc4babcb234387e11d697df70ee97
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|

In [None]:
from pyspark.sql.functions import when
# Create a binary label: 1 for good quality (>= 7), 0 for not good
df = df.withColumn("label", when(df["quality"] >= 7, 1.0).otherwise(0.0))
# Select the features and the new label column
feature_columns = [col for col in df.columns if col != "quality" and col != "label"]
data = df.select(feature_columns + ["label"])
data.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|label|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|  0.0|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|  0.0|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|  0.0|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|                60.0|  

In [None]:
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(            # Vectorize the feature columns
    inputCols=feature_columns,
    outputCol="features"
)
output = feature_assembler.transform(data)
final_df = output.select("features", "label")   # Select the features and label
final_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...|  0.0|
|[7.8,0.88,0.0,2.6...|  0.0|
|[7.8,0.76,0.04,2....|  0.0|
|[11.2,0.28,0.56,1...|  0.0|
|[7.4,0.7,0.0,1.9,...|  0.0|
+--------------------+-----+
only showing top 5 rows
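
To make the assembler step concrete, here is a minimal plain-Python sketch of what `VectorAssembler` does conceptually for a single row: it takes the values of the input columns, in the order given by `inputCols`, and packs them into one vector. The row is the first one printed by `data.show(5)` earlier in this notebook. The `assemble` helper is hypothetical and exists only for illustration; it is not part of Spark's API.

```python
# Plain-Python illustration of assembling one row into a feature vector.
# `assemble` is a hypothetical helper, not a Spark function.

feature_columns = [
    "fixed acidity", "volatile acidity", "citric acid", "residual sugar",
    "chlorides", "free sulfur dioxide", "total sulfur dioxide",
    "density", "pH", "sulphates", "alcohol",
]

# First row of the dataset, as printed by data.show(5).
row = {
    "fixed acidity": 7.4, "volatile acidity": 0.7, "citric acid": 0.0,
    "residual sugar": 1.9, "chlorides": 0.076, "free sulfur dioxide": 11.0,
    "total sulfur dioxide": 34.0, "density": 0.9978, "pH": 3.51,
    "sulphates": 0.56, "alcohol": 9.4, "label": 0.0,
}

def assemble(row, input_cols):
    """Return the values of input_cols, in order, as one list (the 'vector')."""
    return [float(row[name]) for name in input_cols]

features = assemble(row, feature_columns)
print(features)       # 11 values, starting with 7.4, 0.7, 0.0, 1.9
print(row["label"])   # the label stays in its own column
```

The truncated `[7.4,0.7,0.0,1.9,...` shown in the `features` column corresponds to the start of this list.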



In [None]:
# Split the data into training and testing sets
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)
print(f"Number of training samples: {train_data.count()}")
print(f"Number of testing samples: {test_data.count()}")

Number of training samples: 1324
Number of testing samples: 275
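
The two counts sum to 1599 rows, but an exact 80/20 cut would give about 1279 and 320. The difference arises because `randomSplit` treats the weights as sampling probabilities rather than exact proportions. The sketch below illustrates the idea in plain Python, assuming each row draws an independent uniform random number and goes to the split whose weight range contains it. The `random_split` helper is hypothetical; it does not reproduce Spark's partition-aware sampler or its random number generator, so its counts will differ from those above.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by an independent uniform draw.

    Hypothetical helper: mimics the idea behind DataFrame.randomSplit,
    not Spark's actual sampler or random number generator.
    """
    total = sum(weights)
    # Cumulative upper bounds of each split's range, e.g. [0.8, 1.0].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1599))          # same number of rows as the wine dataset
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))      # close to, but rarely exactly, 80/20
```

Fixing the seed makes the split repeatable for the same input, which is why the notebook passes `seed=42`.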


In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)
predictions = model.transform(test_data)              # Make predictions on the test data
predictions.select("features", "label", "prediction").show(10)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|[4.9,0.42,0.0,2.1...|  1.0|       1.0|
|[5.0,0.74,0.0,1.2...|  0.0|       0.0|
|[5.0,1.04,0.24,1....|  0.0|       0.0|
|[5.2,0.32,0.25,1....|  0.0|       0.0|
|[5.3,0.47,0.11,2....|  1.0|       1.0|
|[5.4,0.42,0.27,2....|  1.0|       0.0|
|[5.6,0.31,0.37,1....|  0.0|       0.0|
|[5.6,0.605,0.05,2...|  0.0|       0.0|
|[5.8,0.29,0.26,1....|  0.0|       1.0|
|[5.8,0.61,0.11,1....|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 10 rows
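
The `prediction` column comes from a simple rule: logistic regression computes a linear score from the features, passes it through the sigmoid function to get a probability for class 1, and predicts 1 when that probability exceeds a threshold (0.5 by default in Spark's `LogisticRegression`). The sketch below shows this in plain Python. The coefficients and intercept are made up for illustration and are not the fitted model's values; `predict` is a hypothetical helper.

```python
import math

def sigmoid(z):
    """Map a real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict(features, coefficients, intercept, threshold=0.5):
    """Return (probability of class 1, predicted label) for one row."""
    score = intercept + sum(w * x for w, x in zip(coefficients, features))
    probability = sigmoid(score)
    label = 1.0 if probability > threshold else 0.0
    return probability, label

# Made-up two-feature example; these are NOT the fitted model's coefficients.
coefficients = [1.0, -2.0]
intercept = -0.5

p_hi, y_hi = predict([2.0, 0.25], coefficients, intercept)   # score = 1.0
p_lo, y_lo = predict([0.5, 1.0], coefficients, intercept)    # score = -2.0
print(round(p_hi, 4), y_hi)
print(round(p_lo, 4), y_lo)
```

For the model trained above, the fitted values are available as `model.coefficients` and `model.intercept`.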



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Area Under the ROC Curve (AUC), computed from the raw prediction scores
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC Curve (AUC) on the test data: {auc:.4f}")
# Accuracy, computed from the predicted labels
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)
print(f"Accuracy of the model on the test data: {accuracy:.4f}")

Area Under ROC Curve (AUC) on the test data: 0.8623
Accuracy of the model on the test data: 0.8727
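
The two metrics answer different questions. Accuracy is the fraction of correct predictions. AUC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. Because only wines with quality of 7 or higher are labeled 1, the classes may be imbalanced, so it helps to compare accuracy with the score obtained by always predicting the majority class. The sketch below illustrates all three quantities in plain Python on made-up toy data. The function names are hypothetical, and the pairwise AUC is a conceptual definition, not the procedure Spark uses internally.

```python
def accuracy_score(labels, predictions):
    """Fraction of rows where the predicted label equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def majority_baseline(labels):
    """Accuracy of always predicting the most common class."""
    positives = sum(1 for y in labels if y == 1.0)
    return max(positives, len(labels) - positives) / len(labels)

def pairwise_auc(labels, scores):
    """Probability that a random positive outscores a random negative.

    Ties count as half. This pairwise form is O(n^2) and is meant only
    for illustration on small inputs.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1.0]
    neg = [s for y, s in zip(labels, scores) if y == 0.0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0
               for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Made-up toy data: 2 positives, 6 negatives (not taken from the wine results).
labels = [1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
scores = [0.9, 0.4, 0.6, 0.3, 0.2, 0.2, 0.1, 0.1]
toy_predictions = [1.0 if s > 0.5 else 0.0 for s in scores]

print(accuracy_score(labels, toy_predictions))   # 0.75
print(majority_baseline(labels))                 # 0.75
print(pairwise_auc(labels, scores))              # 11/12, about 0.9167
```

In this toy case the accuracy equals the majority-class baseline even though the scores rank most positives above most negatives, which is why reporting AUC alongside accuracy is useful for a thresholded label like this one.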


In [None]:
# Stop the SparkSession
spark.stop()