<a href="https://colab.research.google.com/github/taiwotman/TaiwotmanGoogleColab/blob/main/COVID_HOSPITAL_TREATMENT_Predicting_Patient's_Length_of_Stay(LOS)_using_Kaggle_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Multi-Class Classification of COVID-19 Patients based on Length of Stay**

***Description***: **[COVID HOSPITAL TREATMENT](https://www.kaggle.com/arashnic/covid19-hospital-treatment)**

Kaggle dataset available for download: [data](https://www.kaggle.com/arashnic/covid19-hospital-treatment/download)



***Authored by:*** Taiwo O. Adetiloye | [Website](https://taiwotman.github.io)

***Date:*** March 15, 2021

---




### **1. Setup**

**Mount content from Google Drive**


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Install Java 8**

In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

**Install pyspark libraries**

In [4]:
!pip install -q findspark
!pip install pyspark


Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=f3ea9b33ae356adfde835bcb047d68ad85ebfbb62da20904b62329b18228174c
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


**Set JAVA_HOME and SPARK_HOME**

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.7/dist-packages/pyspark"


**Ensure spark is set up and running.**


In [6]:
import findspark 
findspark.find()

'/usr/local/lib/python3.7/dist-packages/pyspark'

In [7]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local')\
.appName("Predicting LOS for High Risk Patient")\
.getOrCreate()

In [8]:
spark

**Read the file from the mounted Drive into a Spark DataFrame**

In [9]:
filepath = "/content/drive/MyDrive/Colab Notebooks/data/host_train.csv" # Change this to your data filepath

df  = spark.read.option("header", "true").csv(filepath)


### **2. Data Preparation**


In [10]:
df.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- Hospital: string (nullable = true)
 |-- Hospital_type: string (nullable = true)
 |-- Hospital_city: string (nullable = true)
 |-- Hospital_region: string (nullable = true)
 |-- Available_Extra_Rooms_in_Hospital: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Ward_Type: string (nullable = true)
 |-- Ward_Facility: string (nullable = true)
 |-- Bed_Grade: string (nullable = true)
 |-- patientid: string (nullable = true)
 |-- City_Code_Patient: string (nullable = true)
 |-- Type of Admission: string (nullable = true)
 |-- Illness_Severity: string (nullable = true)
 |-- Patient_Visitors: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Admission_Deposit: string (nullable = true)
 |-- Stay_Days: string (nullable = true)



In [46]:
print(f"Counts of rows/samples: {df.count()}")
print(f"Counts of columns/features: {len(df.columns)}")

Counts of rows/samples: 318438
Counts of columns/features: 18


In [12]:
df.show(30)

+-------+--------+-------------+-------------+---------------+---------------------------------+------------+---------+-------------+---------+---------+-----------------+-----------------+----------------+----------------+-----+-----------------+---------+
|case_id|Hospital|Hospital_type|Hospital_city|Hospital_region|Available_Extra_Rooms_in_Hospital|  Department|Ward_Type|Ward_Facility|Bed_Grade|patientid|City_Code_Patient|Type of Admission|Illness_Severity|Patient_Visitors|  Age|Admission_Deposit|Stay_Days|
+-------+--------+-------------+-------------+---------------+---------------------------------+------------+---------+-------------+---------+---------+-----------------+-----------------+----------------+----------------+-----+-----------------+---------+
|      1|       8|            2|            3|              2|                                3|radiotherapy|        R|            F|      2.0|    31397|              7.0|        Emergency|         Extreme|               2|51-

**Observations using Dataframe Schema**


\begin{array}{lllll}
Column\:Name&Critical\:Factor&Data\:Type&Transformation\:Required&Transformation\\
\hline
case\_id & No &String&No&N/A\\
Hospital & Yes &String&Yes&String\:to\:integer\\
Hospital\_type & Yes &String&Yes&String\:to\:integer\\
Hospital\_city & Yes &String&Yes&String\:to\:integer\\
Hospital\_region & Yes &String&Yes&String\:to\:integer\\
Available\_Extra\_Rooms\_in\_Hospital & Yes &String&Yes&String\:to\:integer\\
Department & Yes &String&Yes&String\:to\:index\\
Ward\_Type & Yes &String&Yes&String\:to\:index\\
Ward\_Facility & Yes &String&Yes&String\:to\:index\\
Bed\_Grade & Yes &String&Yes&String\:to\:integer\\
patientid & No &String&Yes&String\:to\:integer\\
City\_Code\_Patient & Yes &String&Yes&String\:to\:integer\\
Type\:of\:Admission & Yes &String&Yes&String\:to\:index\\
Illness\_Severity & Yes &String&Yes&String\:to\:index\\
Patient\_Visitors & Yes &String&Yes&String\:to\:integer\\
Age & Yes &String&Yes&String\:to\:index\\
Admission\_Deposit & Yes &String&Yes&String\:to\:integer\\
Stay\_Days & Target\:variable\:or\:label &String&Yes&String\:to\:index\\
\end{array}




**First Level Transformation: Rename selected columns**

In [13]:
## Rename column "Type of Admission" and "patientid"
df2 = df.withColumnRenamed("Type of Admission", "Type_of_Admission")\
      .withColumnRenamed("patientid", "Patient_id")

## Convert all column names to lower case for uniformity
df3 = df2.toDF(*[c.lower() for c in df2.columns])
df3.printSchema()



root
 |-- case_id: string (nullable = true)
 |-- hospital: string (nullable = true)
 |-- hospital_type: string (nullable = true)
 |-- hospital_city: string (nullable = true)
 |-- hospital_region: string (nullable = true)
 |-- available_extra_rooms_in_hospital: string (nullable = true)
 |-- department: string (nullable = true)
 |-- ward_type: string (nullable = true)
 |-- ward_facility: string (nullable = true)
 |-- bed_grade: string (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- city_code_patient: string (nullable = true)
 |-- type_of_admission: string (nullable = true)
 |-- illness_severity: string (nullable = true)
 |-- patient_visitors: string (nullable = true)
 |-- age: string (nullable = true)
 |-- admission_deposit: string (nullable = true)
 |-- stay_days: string (nullable = true)



**Second Level Transformation: Change selected columns from String to Integer**

In [14]:
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F

string_to_integer_list = ['hospital', 'hospital_type', 'hospital_city','hospital_region','available_extra_rooms_in_hospital',
                          'bed_grade','city_code_patient','patient_visitors','admission_deposit']
df4 = df3
# Cast each numeric-looking string column to an integer column
for col_name in string_to_integer_list:
  df4 = df4.withColumn(col_name, F.col(col_name).cast(IntegerType()))

df4.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- hospital: integer (nullable = true)
 |-- hospital_type: integer (nullable = true)
 |-- hospital_city: integer (nullable = true)
 |-- hospital_region: integer (nullable = true)
 |-- available_extra_rooms_in_hospital: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- ward_type: string (nullable = true)
 |-- ward_facility: string (nullable = true)
 |-- bed_grade: integer (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- city_code_patient: integer (nullable = true)
 |-- type_of_admission: string (nullable = true)
 |-- illness_severity: string (nullable = true)
 |-- patient_visitors: integer (nullable = true)
 |-- age: string (nullable = true)
 |-- admission_deposit: integer (nullable = true)
 |-- stay_days: string (nullable = true)



**Third Level Transformation: Change selected columns from String to Index**



In [21]:
from pyspark.ml.feature import StringIndexer

# Categorical columns to encode as numeric indices
string_to_index_list = ['department','ward_facility', 'ward_type','illness_severity','type_of_admission']

def custom_stringindex(df, col_name):
  # Fit a StringIndexer on one column and append the result as "<col_name>_index"
  output = col_name + "_index"
  indexer = StringIndexer(inputCol=col_name, outputCol=output)
  indexed = indexer.fit(df).transform(df)

  return indexed


df5 = df4
for col_name in string_to_index_list:
  df5 = custom_stringindex(df5, col_name)

df5.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- hospital: integer (nullable = true)
 |-- hospital_type: integer (nullable = true)
 |-- hospital_city: integer (nullable = true)
 |-- hospital_region: integer (nullable = true)
 |-- available_extra_rooms_in_hospital: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- ward_type: string (nullable = true)
 |-- ward_facility: string (nullable = true)
 |-- bed_grade: integer (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- city_code_patient: integer (nullable = true)
 |-- type_of_admission: string (nullable = true)
 |-- illness_severity: string (nullable = true)
 |-- patient_visitors: integer (nullable = true)
 |-- age: string (nullable = true)
 |-- admission_deposit: integer (nullable = true)
 |-- stay_days: string (nullable = true)
 |-- department_index: double (nullable = false)
 |-- ward_facility_index: double (nullable = false)
 |-- ward_type_index: double (nullable = false)
 |-- illness_severity_inde
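
The index values that `StringIndexer` assigns are not arbitrary: by default it orders labels by descending frequency, so the most frequent label gets index 0.0. The plain-Python sketch below imitates that ordering using the `illness_severity` counts from the group-by output in the Data Insight section. The helper name `frequency_desc_index` is hypothetical, and the alphabetical tie-break is an assumption about Spark's behaviour rather than something this notebook demonstrates.

```python
def frequency_desc_index(counts):
    """Map each label to an index, most frequent label first.

    Ties are broken alphabetically here (an assumption for this sketch).
    """
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Counts taken from the notebook's group-by output for illness_severity
severity_counts = {"Moderate": 175843, "Minor": 85872, "Extreme": 56723}

severity_index = frequency_desc_index(severity_counts)
print(severity_index)  # {'Moderate': 0.0, 'Minor': 1.0, 'Extreme': 2.0}
```

Because the index follows frequency rather than clinical order, `Extreme` would receive the largest index here only because it is the rarest value.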

### **3. Data Insight**

**Using Group By**

In [22]:
from pyspark.sql.functions import col

df5.groupBy("stay_days") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

df5.groupBy("illness_severity") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

df5.groupBy("ward_type") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

df5.groupBy("ward_facility") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

df5.groupBy("department") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

df5.groupBy("age") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+------------------+-----+
|         stay_days|count|
+------------------+-----+
|             21-30|87491|
|             11-20|78139|
|             31-40|55159|
|             51-60|35018|
|              0-10|23604|
|             41-50|11743|
|             71-80|10254|
|More than 100 Days| 6683|
|             81-90| 4838|
|            91-100| 2765|
|             61-70| 2744|
+------------------+-----+

+----------------+------+
|illness_severity| count|
+----------------+------+
|        Moderate|175843|
|           Minor| 85872|
|         Extreme| 56723|
+----------------+------+

+---------+------+
|ward_type| count|
+---------+------+
|        R|127947|
|        Q|106165|
|        S| 77794|
|        P|  5046|
|        T|  1477|
|        U|     9|
+---------+------+

+-------------+------+
|ward_facility| count|
+-------------+------+
|            F|112753|
|            E| 55351|
|            D| 51809|
|            C| 35463|
|            B| 35156|
|            A| 27906|
+------------

### **4. Machine Learning**

In [23]:
input_variable = ['hospital', 'hospital_type', 'hospital_city','hospital_region','available_extra_rooms_in_hospital',
                  'bed_grade','city_code_patient','patient_visitors','admission_deposit',
                  'department_index', 'ward_facility_index', 'ward_type_index', 'illness_severity_index',
                  'type_of_admission_index']

label = ['stay_days_index']
                    

**Feature engineering**

*Vector Assembler*

In [24]:
from pyspark.ml.feature import VectorAssembler

# handleInvalid="skip" filters out rows with null/NaN values in any input column
assembler = VectorAssembler(
    inputCols=input_variable,
    outputCol="features",
    handleInvalid="skip")

df6 = assembler.transform(df5)



*Partition training and test sets*

In [28]:
# Drop "features" because the pipeline re-runs the assembler; set a seed for reproducibility
(trainingData, testData) = df6.drop("features").randomSplit([0.8, 0.2], seed = 200000)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 250985
Test Dataset Count: 62808
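
The two counts above do not add up to the 318438 rows reported earlier. The short check below works through the arithmetic using only numbers printed in this notebook. The variable names are illustrative. Attributing the gap to `handleInvalid="skip"` in the assembler is a plausible reading, not something the notebook verifies directly.

```python
# Numbers printed earlier in this notebook
total_rows = 318438
train_rows = 250985
test_rows = 62808

rows_after_split = train_rows + test_rows
dropped_rows = total_rows - rows_after_split  # rows missing after assembling features
train_fraction = train_rows / rows_after_split

print(f"Rows after split: {rows_after_split}")   # 313793
print(f"Rows dropped: {dropped_rows}")           # 4645
print(f"Train fraction: {train_fraction:.4f}")   # close to the requested 0.8
```

`randomSplit` assigns rows at random, so the realised training fraction is close to, but not exactly, 0.8.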


**Decision Tree**

In [47]:
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

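# Index the target column; fitting on df6 captures every stay_days label.
labelIndexer = StringIndexer(inputCol="stay_days", outputCol="stay_days_index").fit(df6)
# Define a DecisionTreeClassifier (it is trained when the pipeline is fit).
dt = DecisionTreeClassifier(labelCol="stay_days_index", featuresCol="features")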

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)


In [30]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, assembler, dt, labelConverter])

In [31]:
model = pipeline.fit(trainingData)

In [32]:
predictions = model.transform(testData)

predictions.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- hospital: integer (nullable = true)
 |-- hospital_type: integer (nullable = true)
 |-- hospital_city: integer (nullable = true)
 |-- hospital_region: integer (nullable = true)
 |-- available_extra_rooms_in_hospital: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- ward_type: string (nullable = true)
 |-- ward_facility: string (nullable = true)
 |-- bed_grade: integer (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- city_code_patient: integer (nullable = true)
 |-- type_of_admission: string (nullable = true)
 |-- illness_severity: string (nullable = true)
 |-- patient_visitors: integer (nullable = true)
 |-- age: string (nullable = true)
 |-- admission_deposit: integer (nullable = true)
 |-- stay_days: string (nullable = true)
 |-- department_index: double (nullable = false)
 |-- ward_facility_index: double (nullable = false)
 |-- ward_type_index: double (nullable = false)
 |-- illness_severity_inde

In [33]:
# Select example rows to display.
predictions.select("type_of_admission", "stay_days", "predictedLabel" ).show(20)

+-----------------+---------+--------------+
|type_of_admission|stay_days|predictedLabel|
+-----------------+---------+--------------+
|        Emergency|     0-10|         21-30|
|           Urgent|    11-20|         11-20|
|           Trauma|    31-40|         21-30|
|           Urgent|    21-30|         21-30|
|           Urgent|    21-30|         21-30|
|           Trauma|    21-30|         21-30|
|        Emergency|    11-20|         11-20|
|           Trauma|    21-30|         21-30|
|           Trauma|    11-20|         21-30|
|           Trauma|    21-30|         21-30|
|           Trauma|   91-100|         51-60|
|           Trauma|    41-50|         21-30|
|           Trauma|    21-30|         11-20|
|           Trauma|    21-30|         21-30|
|           Trauma|    11-20|         11-20|
|           Trauma|    11-20|         21-30|
|           Trauma|    21-30|         21-30|
|           Trauma|    51-60|         21-30|
|           Trauma|    21-30|         21-30|
|         

**Multiclass Classification Evaluation**

In [43]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
def customMulticlassClassificationEvaluator(predictions):
  evaluator = MulticlassClassificationEvaluator(
      labelCol="stay_days_index", predictionCol="prediction", metricName="accuracy")
  accuracy = evaluator.evaluate(predictions)
  print("Accuracy = %g " %(accuracy))
  print("Test Error = %g " %(1.0 - accuracy))


customMulticlassClassificationEvaluator(predictions)

Accuracy = 0.391399 
Test Error = 0.608601 
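
To put the accuracy in context, it helps to compare it with a majority-class baseline, that is, always predicting the most frequent `stay_days` bucket. The sketch below computes that baseline from the group-by counts shown in the Data Insight section. Those counts cover the full dataset rather than the test split, so the comparison is approximate, and the variable names are illustrative.

```python
# stay_days counts from the notebook's group-by output (full dataset)
stay_days_counts = {
    "21-30": 87491, "11-20": 78139, "31-40": 55159, "51-60": 35018,
    "0-10": 23604, "41-50": 11743, "71-80": 10254,
    "More than 100 Days": 6683, "81-90": 4838, "91-100": 2765, "61-70": 2744,
}

total = sum(stay_days_counts.values())
majority_label = max(stay_days_counts, key=stay_days_counts.get)
baseline_accuracy = stay_days_counts[majority_label] / total

reported_accuracy = 0.391399  # decision tree accuracy printed above

print(f"Majority class: {majority_label}")
print(f"Baseline accuracy: {baseline_accuracy:.4f}")
print(f"Gain over baseline: {reported_accuracy - baseline_accuracy:.4f}")
```

On these numbers the decision tree beats the roughly 27% baseline by about 12 percentage points.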


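**Observation:** Test accuracy is about 39% (test error about 61%). This may indicate overfitting, but confirming it requires comparing training accuracy with test accuracy. Investigate the root cause.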

**Proposed solution:** Apply the following to the feature set:

1.   Principal component analysis 
2.   Standardization
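
As a rough illustration of the second step, standardization rescales a feature to zero mean and unit standard deviation. The plain-Python sketch below shows the arithmetic on a few hypothetical `admission_deposit` values. The function name `standardize` and the sample values are assumptions for illustration and are not part of the notebook's pipeline.

```python
from statistics import mean, stdev


def standardize(values):
    """Return z-scores: (value - mean) / sample standard deviation."""
    mu = mean(values)
    sigma = stdev(values)
    if sigma == 0:
        # A constant feature carries no spread; map it to zeros
        return [0.0 for _ in values]
    return [(value - mu) / sigma for value in values]


# Hypothetical deposit values, chosen so the result is easy to verify by hand
deposits = [10.0, 20.0, 30.0]
scaled = standardize(deposits)
print(scaled)  # [-1.0, 0.0, 1.0]
```

PCA is sensitive to feature scale, so standardization is typically applied before it rather than after.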



***TO BE CONTINUED...***

