<a href="https://colab.research.google.com/github/krishjalwal/ProjectsIntern/blob/main/BDAdrugProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Classification with Pyspark


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('treecode').getOrCreate()


In [3]:

data = spark.read.csv('drug.csv',inferSchema=True,header=True)

In [4]:

data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- cholesterol: string (nullable = true)
 |-- Na_to_K: double (nullable = true)
 |-- drug: string (nullable = true)



In [5]:
data.head()

Row(age=23, sex='F', bp='HIGH', cholesterol='HIGH', Na_to_K=25.355, drug='drugY')

In [6]:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer

In [7]:

data.show()


+---+---+------+-----------+-------+-----+
|age|sex|    bp|cholesterol|Na_to_K| drug|
+---+---+------+-----------+-------+-----+
| 23|  F|  HIGH|       HIGH| 25.355|drugY|
| 47|  M|   LOW|       HIGH| 13.093|drugC|
| 47|  M|   LOW|       HIGH| 10.114|drugC|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|
| 61|  F|   LOW|       HIGH| 18.043|drugY|
| 22|  F|NORMAL|       HIGH|  8.607|drugX|
| 49|  F|NORMAL|       HIGH| 16.275|drugY|
| 41|  M|   LOW|       HIGH| 11.037|drugC|
| 60|  M|NORMAL|       HIGH| 15.171|drugY|
| 43|  M|   LOW|     NORMAL| 19.368|drugY|
| 47|  F|   LOW|       HIGH| 11.767|drugC|
| 34|  F|  HIGH|     NORMAL| 19.199|drugY|
| 43|  M|   LOW|       HIGH| 15.376|drugY|
| 74|  F|   LOW|       HIGH| 20.942|drugY|
| 50|  F|NORMAL|       HIGH| 12.703|drugX|
| 16|  F|  HIGH|     NORMAL| 15.516|drugY|
| 69|  M|   LOW|     NORMAL| 11.455|drugX|
| 43|  M|  HIGH|       HIGH| 13.972|drugA|
| 23|  M|   LOW|       HIGH|  7.298|drugC|
| 32|  F|  HIGH|     NORMAL| 25.974|drugY|
+---+---+--

In [8]:
data.columns


['age', 'sex', 'bp', 'cholesterol', 'Na_to_K', 'drug']

In [9]:

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(data) for column in list(set(data.columns)-set(['drug','Na_to_K','age'])) ]


In [10]:
pipeline = Pipeline(stages=indexers)

In [11]:
df_r = pipeline.fit(data).transform(data)


In [12]:
df_r.show()

+---+---+------+-----------+-------+-----+---------+-----------------+--------+
|age|sex|    bp|cholesterol|Na_to_K| drug|sex_index|cholesterol_index|bp_index|
+---+---+------+-----------+-------+-----+---------+-----------------+--------+
| 23|  F|  HIGH|       HIGH| 25.355|drugY|      1.0|              0.0|     0.0|
| 47|  M|   LOW|       HIGH| 13.093|drugC|      0.0|              0.0|     1.0|
| 47|  M|   LOW|       HIGH| 10.114|drugC|      0.0|              0.0|     1.0|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|      1.0|              0.0|     2.0|
| 61|  F|   LOW|       HIGH| 18.043|drugY|      1.0|              0.0|     1.0|
| 22|  F|NORMAL|       HIGH|  8.607|drugX|      1.0|              0.0|     2.0|
| 49|  F|NORMAL|       HIGH| 16.275|drugY|      1.0|              0.0|     2.0|
| 41|  M|   LOW|       HIGH| 11.037|drugC|      0.0|              0.0|     1.0|
| 60|  M|NORMAL|       HIGH| 15.171|drugY|      0.0|              0.0|     2.0|
| 43|  M|   LOW|     NORMAL| 19.368|drug

In [13]:

assembler = VectorAssembler(
  inputCols=['age',
             'sex_index',
             'bp_index',
             'cholesterol_index',
             'Na_to_K'],
              outputCol="features")

In [14]:
output = assembler.transform(df_r)


In [15]:
output.show()

+---+---+------+-----------+-------+-----+---------+-----------------+--------+--------------------+
|age|sex|    bp|cholesterol|Na_to_K| drug|sex_index|cholesterol_index|bp_index|            features|
+---+---+------+-----------+-------+-----+---------+-----------------+--------+--------------------+
| 23|  F|  HIGH|       HIGH| 25.355|drugY|      1.0|              0.0|     0.0|[23.0,1.0,0.0,0.0...|
| 47|  M|   LOW|       HIGH| 13.093|drugC|      0.0|              0.0|     1.0|[47.0,0.0,1.0,0.0...|
| 47|  M|   LOW|       HIGH| 10.114|drugC|      0.0|              0.0|     1.0|[47.0,0.0,1.0,0.0...|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|      1.0|              0.0|     2.0|[28.0,1.0,2.0,0.0...|
| 61|  F|   LOW|       HIGH| 18.043|drugY|      1.0|              0.0|     1.0|[61.0,1.0,1.0,0.0...|
| 22|  F|NORMAL|       HIGH|  8.607|drugX|      1.0|              0.0|     2.0|[22.0,1.0,2.0,0.0...|
| 49|  F|NORMAL|       HIGH| 16.275|drugY|      1.0|              0.0|     2.0|[49.0,1.0,2.

In [16]:
from pyspark.ml.feature import StringIndexer


In [17]:

indexer = StringIndexer(inputCol="drug", outputCol="drugIndex")
output_fixed = indexer.fit(output).transform(output)

In [18]:
final_data = output_fixed.select("features",'drugIndex')
final_data

DataFrame[features: vector, drugIndex: double]

In [19]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [20]:
train_data.show()

+--------------------+---------+
|            features|drugIndex|
+--------------------+---------+
|(5,[0,4],[23.0,8....|      2.0|
|(5,[0,4],[29.0,12...|      2.0|
|(5,[0,4],[34.0,18...|      0.0|
|(5,[0,4],[39.0,9....|      2.0|
|(5,[0,4],[40.0,27...|      0.0|
|(5,[0,4],[47.0,10...|      2.0|
|(5,[0,4],[50.0,7....|      2.0|
|(5,[0,4],[51.0,18...|      0.0|
|(5,[0,4],[59.0,13...|      3.0|
|(5,[0,4],[60.0,13...|      3.0|
|(5,[0,4],[70.0,9....|      3.0|
|(5,[0,4],[70.0,13...|      3.0|
|[16.0,0.0,0.0,1.0...|      0.0|
|[16.0,1.0,0.0,1.0...|      0.0|
|[17.0,0.0,2.0,1.0...|      1.0|
|[18.0,1.0,0.0,1.0...|      0.0|
|[19.0,1.0,0.0,0.0...|      2.0|
|[19.0,1.0,0.0,1.0...|      0.0|
|[20.0,0.0,0.0,1.0...|      0.0|
|[20.0,1.0,0.0,0.0...|      2.0|
+--------------------+---------+
only showing top 20 rows



In [21]:
from pyspark.ml.classification import DecisionTreeClassifier


In [22]:
dtc = DecisionTreeClassifier(labelCol='drugIndex',featuresCol='features')


In [23]:
# Train the model
dtc_model = dtc.fit(train_data)

In [24]:
dtc_predictions = dtc_model.transform(test_data)
predicted = dtc_predictions.select("features","drugIndex", "prediction","rawPrediction")
predicted.show()

+--------------------+---------+----------+--------------------+
|            features|drugIndex|prediction|       rawPrediction|
+--------------------+---------+----------+--------------------+
|(5,[0,4],[31.0,30...|      0.0|       0.0|[63.0,0.0,0.0,0.0...|
|(5,[0,4],[43.0,13...|      2.0|       2.0|[0.0,0.0,19.0,0.0...|
|(5,[0,4],[58.0,18...|      0.0|       0.0|[63.0,0.0,0.0,0.0...|
|(5,[0,4],[66.0,16...|      0.0|       0.0|[63.0,0.0,0.0,0.0...|
|(5,[0,4],[68.0,11...|      3.0|       3.0|[0.0,0.0,0.0,9.0,...|
|(5,[0,4],[74.0,9....|      3.0|       3.0|[0.0,0.0,0.0,9.0,...|
|[15.0,0.0,0.0,1.0...|      0.0|       0.0|[63.0,0.0,0.0,0.0...|
|[15.0,0.0,2.0,0.0...|      1.0|       1.0|[0.0,26.0,0.0,0.0...|
|[15.0,1.0,0.0,1.0...|      0.0|       0.0|[63.0,0.0,0.0,0.0...|
|[16.0,0.0,1.0,0.0...|      4.0|       4.0|[0.0,0.0,0.0,0.0,...|
|[18.0,1.0,0.0,0.0...|      0.0|       0.0|[63.0,0.0,0.0,0.0...|
|[18.0,1.0,2.0,1.0...|      1.0|       1.0|[0.0,26.0,0.0,0.0...|
|[20.0,1.0,2.0,1.0...|   

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [26]:
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="drugIndex", predictionCol="prediction", metricName="accuracy")

In [27]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)


In [29]:
dtc_acc

1.0

In [32]:
def knn(train_data, test_data, k=3):
    # Convert to Pandas for simplicity
    train_data_pd = train_data.select("features", "drugIndex").toPandas()
    test_data_pd = test_data.select("features", "drugIndex").toPandas()

    # Convert features to NumPy arrays
    X_train = np.array([x.toArray() for x in train_data_pd['features']])
    X_test = np.array([x.toArray() for x in test_data_pd['features']])

    # Compute the distances between each test point and all the train points
    distances = pairwise_distances(X_test, X_train)

    predictions = []
    for i in range(distances.shape[0]):
        # Get indices of the k nearest neighbors
        k_neighbors = np.argsort(distances[i])[:k]

        # Get majority class from the k neighbors
        k_labels = train_data_pd.iloc[k_neighbors]['drugIndex'].values

        # Convert k_labels to integers and get majority class
        majority_class = np.bincount(k_labels.astype(int)).argmax()
        predictions.append(majority_class)

    # Compare predictions to the actual values
    true_labels = test_data_pd['drugIndex'].values
    accuracy = np.sum(np.array(predictions) == true_labels) / len(true_labels)

    return accuracy

# Run KNN classifier
knn_acc = knn(train_data, test_data, k=3)
print(f"KNN Accuracy: {knn_acc}")


KNN Accuracy: 0.7321428571428571


In [31]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes Classifier
nb = NaiveBayes(featuresCol="features", labelCol="drugIndex", modelType="multinomial")

# Train the Naive Bayes model
nb_model = nb.fit(train_data)

# Make predictions
nb_predictions = nb_model.transform(test_data)
nb_predicted = nb_predictions.select("features", "drugIndex", "prediction", "rawPrediction")

# Evaluate Naive Bayes model
nb_acc = acc_evaluator.evaluate(nb_predictions)
print(f"Naive Bayes Accuracy: {nb_acc}")


Naive Bayes Accuracy: 0.625


In [33]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import pairwise_distances
import numpy as np

# Initialize Spark session
spark = SparkSession.builder.appName('LandslidingPrediction').getOrCreate()

# Load data
data = spark.read.csv('drug.csv', inferSchema=True, header=True)

# Print schema of the dataset
data.printSchema()

# Show first few rows of the dataset
data.show()

# Index categorical columns using StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(data)
            for column in list(set(data.columns)-set(['drug','Na_to_K','age']))]

# Apply the StringIndexer transformations
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(data).transform(data)

# Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=['age', 'sex_index', 'bp_index', 'cholesterol_index', 'Na_to_K'],
    outputCol="features"
)
output = assembler.transform(df_r)

# Index the target variable 'drug'
indexer = StringIndexer(inputCol="drug", outputCol="drugIndex")
output_fixed = indexer.fit(output).transform(output)

# Select features and target variable
final_data = output_fixed.select("features", 'drugIndex')

# Split the data into train and test datasets (70% train, 30% test)
train_data, test_data = final_data.randomSplit([0.7, 0.3])

# Train and Evaluate Decision Tree Classifier
dtc = DecisionTreeClassifier(labelCol='drugIndex', featuresCol='features')
dtc_model = dtc.fit(train_data)
dtc_predictions = dtc_model.transform(test_data)

# Evaluate Decision Tree classifier
acc_evaluator = MulticlassClassificationEvaluator(labelCol="drugIndex", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print(f"Decision Tree Accuracy: {dtc_acc}")

# Implement KNN Classifier
def knn(train_data, test_data, k=3):
    # Convert to Pandas for simplicity
    train_data_pd = train_data.select("features", "drugIndex").toPandas()
    test_data_pd = test_data.select("features", "drugIndex").toPandas()

    # Convert features to NumPy arrays
    X_train = np.array([x.toArray() for x in train_data_pd['features']])
    X_test = np.array([x.toArray() for x in test_data_pd['features']])

    # Compute the distances between each test point and all the train points
    distances = pairwise_distances(X_test, X_train)

    predictions = []
    for i in range(distances.shape[0]):
        # Get indices of the k nearest neighbors
        k_neighbors = np.argsort(distances[i])[:k]

        # Get majority class from the k neighbors
        k_labels = train_data_pd.iloc[k_neighbors]['drugIndex'].values

        # Convert k_labels to integers and get majority class
        majority_class = np.bincount(k_labels.astype(int)).argmax()
        predictions.append(majority_class)

    # Compare predictions to the actual values
    true_labels = test_data_pd['drugIndex'].values
    accuracy = np.sum(np.array(predictions) == true_labels) / len(true_labels)

    return accuracy

# Run KNN classifier
knn_acc = knn(train_data, test_data, k=3)
print(f"KNN Accuracy: {knn_acc}")

# Train and Evaluate Naive Bayes Classifier
nb = NaiveBayes(featuresCol="features", labelCol="drugIndex", modelType="multinomial")
nb_model = nb.fit(train_data)
nb_predictions = nb_model.transform(test_data)

# Evaluate Naive Bayes classifier
nb_acc = acc_evaluator.evaluate(nb_predictions)
print(f"Naive Bayes Accuracy: {nb_acc}")


root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- cholesterol: string (nullable = true)
 |-- Na_to_K: double (nullable = true)
 |-- drug: string (nullable = true)

+---+---+------+-----------+-------+-----+
|age|sex|    bp|cholesterol|Na_to_K| drug|
+---+---+------+-----------+-------+-----+
| 23|  F|  HIGH|       HIGH| 25.355|drugY|
| 47|  M|   LOW|       HIGH| 13.093|drugC|
| 47|  M|   LOW|       HIGH| 10.114|drugC|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|
| 61|  F|   LOW|       HIGH| 18.043|drugY|
| 22|  F|NORMAL|       HIGH|  8.607|drugX|
| 49|  F|NORMAL|       HIGH| 16.275|drugY|
| 41|  M|   LOW|       HIGH| 11.037|drugC|
| 60|  M|NORMAL|       HIGH| 15.171|drugY|
| 43|  M|   LOW|     NORMAL| 19.368|drugY|
| 47|  F|   LOW|       HIGH| 11.767|drugC|
| 34|  F|  HIGH|     NORMAL| 19.199|drugY|
| 43|  M|   LOW|       HIGH| 15.376|drugY|
| 74|  F|   LOW|       HIGH| 20.942|drugY|
| 50|  F|NORMAL|       HIGH| 12.703|drug