## Drug Classification

### Setting

In [8]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

In [9]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import requests
from io import BytesIO
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType



## Ingesting Data

### A simpler data ingesting method should have worked on an appropriately configured Spark server.

In [None]:
# Initialize a Spark session
spark = SparkSession.builder.appName("DrugClassification").getOrCreate()
# the data set is available at the url below.
URL = "https://openml1.win.tue.nl/dataset43382/dataset_43382.pq"
df_spark = spark.read.parquet(URL)
df_spark.printSchema()
df_spark.show()

### A workaround ingesting data

In [29]:

# the data set is available at the url below.
URL = "http://openml1.win.tue.nl/dataset43382/dataset_43382.pq"

# Download the data
response = requests.get(URL)
data = response.content

# Load data into a DataFrame
with BytesIO(data) as byte_io:
    df_pandas = pd.read_parquet(byte_io)
df_pandas.head()

Unnamed: 0,Age,Sex,BP,Cholesterol,Na_to_K,Drug
0,23,F,HIGH,HIGH,25.355,DrugY
1,47,M,LOW,HIGH,13.093,drugC
2,47,M,LOW,HIGH,10.114,drugC
3,28,F,NORMAL,HIGH,7.798,drugX
4,61,F,LOW,HIGH,18.043,DrugY


In [30]:
data = df_pandas.values.tolist()

In [31]:
# Initialize a Spark session
spark = SparkSession.builder.appName("DrugClassification").getOrCreate()

# Define the schema for the PySpark DataFrame
schema = StructType([
    StructField("Age", IntegerType(), True),
    StructField("Sex", StringType(), True),
    StructField("BP", StringType(), True),
    StructField("Cholesterol", StringType(), True),
    StructField("Na_to_K", DoubleType(), True),
    StructField("Drug", StringType(), True)
])

# Create a PySpark DataFrame using the defined schema
df_spark = spark.createDataFrame(data, schema=schema)

# Show the DataFrame schema
df_spark.printSchema()

# Display the first few rows of the DataFrame
df_spark.show()




23/09/19 13:30:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- BP: string (nullable = true)
 |-- Cholesterol: string (nullable = true)
 |-- Na_to_K: double (nullable = true)
 |-- Drug: string (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+---+---+------+-----------+-------+-----+
|Age|Sex|    BP|Cholesterol|Na_to_K| Drug|
+---+---+------+-----------+-------+-----+
| 23|  F|  HIGH|       HIGH| 25.355|DrugY|
| 47|  M|   LOW|       HIGH| 13.093|drugC|
| 47|  M|   LOW|       HIGH| 10.114|drugC|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|
| 61|  F|   LOW|       HIGH| 18.043|DrugY|
| 22|  F|NORMAL|       HIGH|  8.607|drugX|
| 49|  F|NORMAL|       HIGH| 16.275|DrugY|
| 41|  M|   LOW|       HIGH| 11.037|drugC|
| 60|  M|NORMAL|       HIGH| 15.171|DrugY|
| 43|  M|   LOW|     NORMAL| 19.368|DrugY|
| 47|  F|   LOW|       HIGH| 11.767|drugC|
| 34|  F|  HIGH|     NORMAL| 19.199|DrugY|
| 43|  M|   LOW|       HIGH| 15.376|DrugY|
| 74|  F|   LOW|       HIGH| 20.942|DrugY|
| 50|  F|NORMAL|       HIGH| 12.703|drugX|
| 16|  F|  HIGH|     NORMAL| 15.516|DrugY|
| 69|  M|   LOW|     NORMAL| 11.455|drugX|
| 43|  M|  HIGH|       HIGH| 13.972|drugA|
| 23|  M|   LOW|       HIGH|  7.298|drugC|
| 32|  F|  HIGH|     NORMAL| 25.974|DrugY|
+---+---+--

                                                                                

## Feature Engineering

In [32]:
indexers = [StringIndexer(inputCol=col, outputCol=col+"_indexed").fit(df_spark) for col in ['Sex', 'BP', 'Cholesterol']]
pipeline = Pipeline(stages=indexers)
indexed_df = pipeline.fit(df_spark).transform(df_spark)
indexed_df.show()



+---+---+------+-----------+-------+-----+-----------+----------+-------------------+
|Age|Sex|    BP|Cholesterol|Na_to_K| Drug|Sex_indexed|BP_indexed|Cholesterol_indexed|
+---+---+------+-----------+-------+-----+-----------+----------+-------------------+
| 23|  F|  HIGH|       HIGH| 25.355|DrugY|        1.0|       0.0|                0.0|
| 47|  M|   LOW|       HIGH| 13.093|drugC|        0.0|       1.0|                0.0|
| 47|  M|   LOW|       HIGH| 10.114|drugC|        0.0|       1.0|                0.0|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|        1.0|       2.0|                0.0|
| 61|  F|   LOW|       HIGH| 18.043|DrugY|        1.0|       1.0|                0.0|
| 22|  F|NORMAL|       HIGH|  8.607|drugX|        1.0|       2.0|                0.0|
| 49|  F|NORMAL|       HIGH| 16.275|DrugY|        1.0|       2.0|                0.0|
| 41|  M|   LOW|       HIGH| 11.037|drugC|        0.0|       1.0|                0.0|
| 60|  M|NORMAL|       HIGH| 15.171|DrugY|        0.0|

In [33]:

# Perform one-hot encoding on the indexed categorical columns
encoder = OneHotEncoder(inputCols=['Sex_indexed', 'BP_indexed', 'Cholesterol_indexed'],
                        outputCols=['Sex_encoded', 'BP_encoded', 'Cholesterol_encoded'])
df_encoded = encoder.fit(indexed_df).transform(indexed_df)

# Show the DataFrame schema
df_encoded.printSchema()

# Display the first few rows of the DataFrame
df_encoded.show()


root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- BP: string (nullable = true)
 |-- Cholesterol: string (nullable = true)
 |-- Na_to_K: double (nullable = true)
 |-- Drug: string (nullable = true)
 |-- Sex_indexed: double (nullable = false)
 |-- BP_indexed: double (nullable = false)
 |-- Cholesterol_indexed: double (nullable = false)
 |-- Sex_encoded: vector (nullable = true)
 |-- BP_encoded: vector (nullable = true)
 |-- Cholesterol_encoded: vector (nullable = true)

+---+---+------+-----------+-------+-----+-----------+----------+-------------------+-------------+-------------+-------------------+
|Age|Sex|    BP|Cholesterol|Na_to_K| Drug|Sex_indexed|BP_indexed|Cholesterol_indexed|  Sex_encoded|   BP_encoded|Cholesterol_encoded|
+---+---+------+-----------+-------+-----+-----------+----------+-------------------+-------------+-------------+-------------------+
| 23|  F|  HIGH|       HIGH| 25.355|DrugY|        1.0|       0.0|                0.0|    (1,[]

Transform the string column "Drug" to a column of numerical values named "label"

In [34]:
indexer = StringIndexer(inputCol="Drug", outputCol="label")
df_encoded = indexer.fit(df_encoded).transform(df_encoded)
df_encoded.show()

+---+---+------+-----------+-------+-----+-----------+----------+-------------------+-------------+-------------+-------------------+-----+
|Age|Sex|    BP|Cholesterol|Na_to_K| Drug|Sex_indexed|BP_indexed|Cholesterol_indexed|  Sex_encoded|   BP_encoded|Cholesterol_encoded|label|
+---+---+------+-----------+-------+-----+-----------+----------+-------------------+-------------+-------------+-------------------+-----+
| 23|  F|  HIGH|       HIGH| 25.355|DrugY|        1.0|       0.0|                0.0|    (1,[],[])|(2,[0],[1.0])|      (1,[0],[1.0])|  0.0|
| 47|  M|   LOW|       HIGH| 13.093|drugC|        0.0|       1.0|                0.0|(1,[0],[1.0])|(2,[1],[1.0])|      (1,[0],[1.0])|  4.0|
| 47|  M|   LOW|       HIGH| 10.114|drugC|        0.0|       1.0|                0.0|(1,[0],[1.0])|(2,[1],[1.0])|      (1,[0],[1.0])|  4.0|
| 28|  F|NORMAL|       HIGH|  7.798|drugX|        1.0|       2.0|                0.0|    (1,[],[])|    (2,[],[])|      (1,[0],[1.0])|  1.0|
| 61|  F|   LOW|    

In [35]:

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=['Age', 'Sex_encoded', 'BP_encoded', 'Cholesterol_encoded', 'Na_to_K'],
                            outputCol='features')
df_assembled = assembler.transform(df_encoded)


## Model creation and training and evaluation

In [36]:

# Split the data into training and testing sets
(train_data, test_data) = df_assembled.randomSplit([0.7, 0.3], seed=1234)

# Train a RandomForestClassifier
rf = RandomForestClassifier(labelCol='label', featuresCol='features', numTrees=10)
model = rf.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Stop the Spark session
spark.stop()


Accuracy: 0.9047619047619048
