# **Labs 1 and 2 PySpark:**

In these labs we will use the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022 and used for non-commercial educational purposes under the [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/) license.


The csv file that we will be using in this lab is **PatientInfo**.

## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the case of infection

**infected_by**
the ID of who infected the patient


**contact_number**
the number of contacts with people

**symptom_onset_date**
the date of symptom onset

**confirmed_date**
the date the case was confirmed

**released_date**
the date the patient was released

**deceased_date**
the date the patient died

**state**
isolated / released / deceased

In [None]:
# !pip install pyspark

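### Import PySpark and check its version

In [1]:
import pyspark
print(pyspark.__version__)

# Keep wide DataFrame.show() tables on one line instead of wrapping them
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))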

### Import and create SparkSession

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType, BooleanType

In [6]:
spark = SparkSession.builder.getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [7]:
patient_df = spark.read.csv("PatientInfo.csv", header=True, inferSchema=True)
patient_df.show(5)

### Cast the date columns from string to DateType

In [8]:
patient_df = patient_df.withColumn('confirmed_date', col('confirmed_date').cast(DateType()))
patient_df = patient_df.withColumn('released_date', col('released_date').cast(DateType()))
patient_df = patient_df.withColumn('deceased_date', col('deceased_date').cast(DateType()))
patient_df = patient_df.withColumn('symptom_onset_date', col('symptom_onset_date').cast(DateType()))

### Display the schema of the dataset

In [9]:
patient_df.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: date (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [10]:
patient_df.summary().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|      contact_number|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+--------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1346|                 791|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9|1.6772572523506988E7|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9| 3.093097580985502E8|    null|
|    min|          1000000001|female|  0s|Bangladesh|   Busan|     Andong-si|Anyang Gunpo Past...|  

### Using the state column, how many patients were released, and how many were not (isolated or deceased)?

In [11]:
patient_df.groupBy('state').count().show()

+--------+-----+
|   state|count|
+--------+-----+
|isolated| 2158|
|released| 2929|
|deceased|   78|
+--------+-----+



### Display the number of null values in each column

In [12]:
from pyspark.sql.functions import when, count as countFn

In [13]:
# count null for all columns
def count_null(df):
    df.select([countFn(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# count values in columns
def value_counts(df, colum_name):
    df.groupBy(colum_name).count().show()

In [14]:
count_null(df=patient_df)

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4476|             3|         3578|         5099|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



## Data preprocessing

### Fill the nulls in deceased_date with released_date
- You can use the <b>coalesce</b> function.
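To make the row-level behaviour concrete, here is a minimal plain-Python sketch (no Spark required) of what COALESCE does on each row: it returns its first non-null argument. The three rows are hypothetical, not taken from the dataset. A row where both dates are null stays null, which is why deceased_date still contains nulls after the fill.

```python
from datetime import date

def coalesce(*values):
    """Return the first argument that is not None, or None if all are None
    (the per-row behaviour of SQL/Spark COALESCE)."""
    for value in values:
        if value is not None:
            return value
    return None

# (deceased_date, released_date) pairs for three hypothetical patients
rows = [
    (None, date(2020, 2, 5)),   # released patient: takes released_date
    (date(2020, 3, 1), None),   # deceased patient: keeps deceased_date
    (None, None),               # still isolated: stays None
]
filled = [coalesce(deceased, released) for deceased, released in rows]
print(filled)
```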

In [15]:
from pyspark.sql.functions import coalesce, datediff

In [16]:
patient_df1 = patient_df.withColumn('deceased_date', coalesce('deceased_date', 'released_date'))

In [17]:
count_null(df=patient_df1)

+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|patient_id| sex| age|country|province|city|infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|state|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+
|         0|1122|1380|      0|       0|  94|           919|       3819|          4374|              4476|             3|         3578|         3514|    0|
+----------+----+----+-------+--------+----+--------------+-----------+--------------+------------------+--------------+-------------+-------------+-----+



### Add a column named no_days that holds the difference in days between deceased_date and confirmed_date, then show the top 5 rows. Print the schema.
- <b>Hint: you need to typecast these columns as dates first.</b>
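Spark's `datediff(end, start)` returns the number of whole days from `start` to `end`, and a null in either argument gives null. A minimal plain-Python sketch of the same rule, checked against the first two patients in the output below (`datediff` here is a hypothetical helper, not the Spark function):

```python
from datetime import date
from typing import Optional

def datediff(end: Optional[date], start: Optional[date]) -> Optional[int]:
    """Whole days from start to end, like Spark's datediff(end, start).
    Returns None if either date is missing (null in, null out)."""
    if end is None or start is None:
        return None
    return (end - start).days

print(datediff(date(2020, 2, 5), date(2020, 1, 23)))   # 13
print(datediff(date(2020, 3, 2), date(2020, 1, 30)))   # 32 (2020 is a leap year)
```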

In [18]:
# the date columns were already cast to DateType above, so datediff can be applied directly
no_days = datediff(patient_df1['deceased_date'] , patient_df1['confirmed_date'])
patient_df2 = patient_df1.withColumn('no_days', no_days)

In [19]:
patient_df2.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|no_days|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+-------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|   2020-02-05|released|     13|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|   2020-03-02|released|     32|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|

### Add an is_male column that is True if the patient is male, False if female, and null if sex is missing

In [20]:
# True for 'male', False for 'female', null otherwise (e.g. missing sex)
patient_df3 = patient_df2.withColumn('is_male', when(col('sex') == 'male', True).when(col('sex') == 'female', False).otherwise(None))

In [21]:
value_counts(patient_df3, colum_name='is_male')

+-------+-----+
|is_male|count|
+-------+-----+
|   null| 1122|
|   true| 1825|
|  false| 2218|
+-------+-----+



### Add an is_dead column that is True if the patient's state is not released, and False otherwise

- Use a <b>UDF</b> to perform this task.
- In general, UDFs are not recommended unless no built-in function can perform the required operation.
- Python UDFs are slower than built-in functions because every row has to be serialized between the JVM and a Python worker process.
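A built-in alternative would be a column comparison such as `col('state') != 'released'`. One subtlety: Spark comparisons follow SQL three-valued logic, so a null `state` yields null, whereas the Python function used in the UDF below returns `True` for `None`. The plain-Python sketch below contrasts the two rules; `udf_rule` restates the lab's logic and `sql_not_equal` is a hypothetical helper that only models the SQL semantics. Since `state` has no nulls in this dataset (see the null counts above), both approaches would give the same result here.

```python
from typing import Optional

def udf_rule(state: Optional[str]) -> bool:
    """Same logic as the lab's UDF: anything other than 'released' is True."""
    return state != 'released'

def sql_not_equal(left: Optional[str], right: Optional[str]) -> Optional[bool]:
    """Hypothetical model of SQL `left <> right`: null if either side is null."""
    if left is None or right is None:
        return None
    return left != right

for state in ['released', 'isolated', 'deceased', None]:
    print(state, udf_rule(state), sql_not_equal(state, 'released'))
```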

In [22]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

In [23]:
def rename_state(value):
    """Return False for 'released' patients and True for every other state."""
    if value == 'released':
        return False
    else:
        return True

In [24]:
df_udf = udf(rename_state, BooleanType())

In [25]:
patient_df4 = patient_df3.withColumn("is_dead", df_udf(col("state")).alias("is_dead"))

In [26]:
value_counts(patient_df4, colum_name='is_dead')

+-------+-----+
|is_dead|count|
+-------+-----+
|   true| 2236|
|  false| 2929|
+-------+-----+



### Convert the age bins from strings (0s, 10s, 20s, etc.) to numbers (0, 10, 20, etc.)
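The `regexp_replace` call below removes the letter 's' from strings such as '20s'; the numeric cast happens a few cells later. A minimal plain-Python sketch of both steps applied to single values (`parse_age_bin` is a hypothetical helper, not part of the lab):

```python
import re
from typing import Optional

def parse_age_bin(value: Optional[str]) -> Optional[float]:
    """Turn an age bin like '20s' into 20.0; missing ages stay None.
    Mirrors regexp_replace('age', 's', '') followed by a cast to double."""
    if value is None:
        return None
    return float(re.sub('s', '', value))

print([parse_age_bin(v) for v in ['0s', '20s', '100s', None]])
```

One difference worth knowing: `float()` raises `ValueError` on an unparseable string, whereas Spark's cast with ANSI mode off (the Spark 3.x default) returns null instead.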

In [27]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import DoubleType

In [28]:
patient_df5 = patient_df4.withColumn('age', regexp_replace('age','s',''))

In [29]:
value_counts(patient_df5, colum_name='age')

+----+-----+
| age|count|
+----+-----+
|  30|  523|
|   0|   66|
|null| 1380|
| 100|    1|
|  70|  232|
|  60|  482|
|  90|   49|
|  40|  518|
|  20|  899|
|  10|  178|
|  80|  170|
|  50|  667|
+----+-----+



### Cast age and no_days to DoubleType (and is_male and is_dead to IntegerType)

In [30]:
patient_df6 = patient_df5.withColumn('age', col('age').cast(DoubleType()))\
                        .withColumn('no_days', col('no_days').cast(DoubleType()))\
                        .withColumn('is_male', col('is_male').cast(IntegerType()))\
                        .withColumn('is_dead', col('is_dead').cast(IntegerType()))

In [31]:
patient_df6.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: date (nullable = true)
 |-- confirmed_date: date (nullable = true)
 |-- released_date: date (nullable = true)
 |-- deceased_date: date (nullable = true)
 |-- state: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: integer (nullable = true)
 |-- is_dead: integer (nullable = true)



### Drop the columns
["patient_id","sex","infected_by","contact_number","released_date","state",
"symptom_onset_date","confirmed_date","deceased_date","country","no_days",
"city","infection_case"]

In [32]:
drop_col = ["patient_id", "sex", "infected_by", "contact_number", "released_date", "state",
            "symptom_onset_date", "confirmed_date", "deceased_date", "country", "no_days",
            "city", "infection_case"]
patient_df7 = patient_df6.drop(*drop_col)

### Recount the number of nulls now

In [33]:
count_null(patient_df7)

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|1380|       0|   1122|      0|
+----+--------+-------+-------+



## Now do the same using SQL SELECT statements

### From the original patient DataFrame, create a temporary view (table).

In [None]:
# register patient_df as a temporary view named 'table' so spark.sql() can query it
patient_df.createOrReplaceTempView('table')

### Use a SELECT statement to select all columns from the view and show the output.

In [69]:
spark.sql("Select * from table").show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|

### *Using SQL commands*, limit the output to 5 rows

In [68]:
spark.sql("Select * from table limit 5").show()

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|    2020-01-23|   2020-02-05|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|    2020-01-30|   2020-03-02|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|            17|              null|    2020-01-30|   202

### Select the count of males and females in the dataset

In [79]:
spark.sql("Select sex, count(sex) from table group by sex").show()

+------+----------+
|   sex|count(sex)|
+------+----------+
|  null|         0|
|female|      2218|
|  male|      1825|
+------+----------+



### How many patients are in each state (released, isolated, deceased)?

In [80]:
spark.sql("Select state, count(state) from table group by state").show()

+--------+------------+
|   state|count(state)|
+--------+------------+
|isolated|        2158|
|released|        2929|
|deceased|          78|
+--------+------------+



### Now, let's perform some preprocessing using SQL:
1. Convert the *age* column to double after removing the trailing 's' (*hint: check the SUBSTRING function*)
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new DataFrame
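Spark SQL's `SUBSTRING(str, pos, len)` uses 1-based positions, and Spark's implementation treats a position of 0 like position 1, so `SUBSTRING(age, 0, ...)` and `SUBSTRING(age, 1, ...)` return the same result. The plain-Python model below is a sketch of that rule for non-negative positions only; `sql_substring` and `strip_suffix` are hypothetical helpers.

```python
from typing import Optional

def sql_substring(s: Optional[str], pos: int, length: Optional[int]) -> Optional[str]:
    """Model of SUBSTRING(s, pos, length) for pos >= 0 (1-based; pos 0 acts like 1).
    A null string or null length gives a null result."""
    if s is None or length is None:
        return None
    start = pos - 1 if pos > 0 else 0
    end = start + length
    if end <= start:
        return ''
    return s[start:end]

def strip_suffix(age: Optional[str]) -> Optional[str]:
    # SUBSTRING(age, 1, length(age) - 1); length(NULL) is NULL, so nulls propagate
    length = None if age is None else len(age) - 1
    return sql_substring(age, 1, length)

print(sql_substring('50s', 0, 2), sql_substring('50s', 1, 2), strip_suffix('100s'), strip_suffix(None))
```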

In [82]:
# SUBSTRING is 1-based: keep characters 1 .. length(age)-1, i.e. drop the trailing 's'
paientTb = spark.sql("select SUBSTRING(age, 1, length(age) - 1) as age, sex, province, state from table")

In [83]:
paientTb.show()

+---+------+--------+--------+
|age|   sex|province|   state|
+---+------+--------+--------+
| 50|  male|   Seoul|released|
| 30|  male|   Seoul|released|
| 50|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 20|female|   Seoul|released|
| 50|female|   Seoul|released|
| 20|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 30|  male|   Seoul|released|
| 60|female|   Seoul|released|
| 50|female|   Seoul|released|
| 20|  male|   Seoul|released|
| 80|  male|   Seoul|deceased|
| 60|female|   Seoul|released|
| 70|  male|   Seoul|released|
| 70|  male|   Seoul|released|
| 70|  male|   Seoul|released|
| 20|  male|   Seoul|released|
| 70|female|   Seoul|released|
| 70|female|   Seoul|released|
+---+------+--------+--------+
only showing top 20 rows



In [84]:
# createOrReplaceTempView avoids an "already exists" error if the cell is re-run
paientTb.createOrReplaceTempView('table2')

In [88]:
sql_df = spark.sql("select Cast(age as Double) as age, sex, province, state from table2")

In [89]:
sql_df.show()

+----+------+--------+--------+
| age|   sex|province|   state|
+----+------+--------+--------+
|50.0|  male|   Seoul|released|
|30.0|  male|   Seoul|released|
|50.0|  male|   Seoul|released|
|20.0|  male|   Seoul|released|
|20.0|female|   Seoul|released|
|50.0|female|   Seoul|released|
|20.0|  male|   Seoul|released|
|20.0|  male|   Seoul|released|
|30.0|  male|   Seoul|released|
|60.0|female|   Seoul|released|
|50.0|female|   Seoul|released|
|20.0|  male|   Seoul|released|
|80.0|  male|   Seoul|deceased|
|60.0|female|   Seoul|released|
|70.0|  male|   Seoul|released|
|70.0|  male|   Seoul|released|
|70.0|  male|   Seoul|released|
|20.0|  male|   Seoul|released|
|70.0|female|   Seoul|released|
|70.0|female|   Seoul|released|
+----+------+--------+--------+
only showing top 20 rows



## Machine Learning
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to map <b>string</b> columns to numeric indices.
- Use <b>OneHotEncoder</b> to encode the indexed categorical values.
- Use <b>Imputer</b> to fill missing data with the mean.
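Roughly, StringIndexer assigns index 0 to the most frequent label (its default `stringOrderType` is `'frequencyDesc'`), and OneHotEncoder with its default `dropLast=True` turns an index into a vector of length (number of categories - 1), so the last category is encoded as all zeros. The plain-Python sketch below mimics both steps on a hypothetical list of provinces. Breaking frequency ties alphabetically is an assumption of the sketch, and it returns dense lists where Spark produces sparse vectors.

```python
from collections import Counter
from typing import Dict, List

def fit_string_indexer(labels: List[str]) -> Dict[str, int]:
    """Most frequent label gets index 0; ties broken alphabetically (sketch assumption)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index: int, num_categories: int, drop_last: bool = True) -> List[float]:
    """Dense one-hot vector; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

provinces = ['Seoul', 'Busan', 'Seoul', 'Daegu', 'Seoul', 'Busan']  # hypothetical sample
mapping = fit_string_indexer(provinces)
print(mapping)
for name in ['Seoul', 'Busan', 'Daegu']:
    print(name, one_hot(mapping[name], len(mapping)))
```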

In [34]:
# drop rows where is_male is null (i.e. sex is missing)
patient_df8 = patient_df7.dropna(subset='is_male', how='any')
count_null(patient_df8)

+---+--------+-------+-------+
|age|province|is_male|is_dead|
+---+--------+-------+-------+
|261|       0|      0|      0|
+---+--------+-------+-------+



In [35]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression, LinearSVC, FMClassifier
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, OneHotEncoder,StringIndexer, Imputer

In [36]:
X_train, X_test = patient_df8.randomSplit([0.8, 0.2], seed=60)

In [37]:
patient_df8.show(5)

+----+--------+-------+-------+
| age|province|is_male|is_dead|
+----+--------+-------+-------+
|50.0|   Seoul|      1|      0|
|30.0|   Seoul|      1|      0|
|50.0|   Seoul|      1|      0|
|20.0|   Seoul|      1|      0|
|20.0|   Seoul|      0|      0|
+----+--------+-------+-------+
only showing top 5 rows



In [38]:
cat_col = [col_name for (col_name, dt) in patient_df8.dtypes if dt =='string']

num_col = list(set(patient_df8.columns) - set(cat_col))

indexer_col = [col_name+'_INDEX' for (col_name, dt) in patient_df8.dtypes if dt =='string']

ohe_col = [col_name+'_OHE' for (col_name, dt) in patient_df8.dtypes if dt =='string']

In [39]:
num_col, indexer_col, ohe_col, cat_col

(['is_male', 'age', 'is_dead'],
 ['province_INDEX'],
 ['province_OHE'],
 ['province'])

In [40]:
imputer = Imputer(inputCol='age', outputCol='age', strategy='mean')
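With `strategy='mean'`, the Imputer learns the mean of the non-null `age` values when the pipeline is fitted (here on `X_train` only) and substitutes it for the nulls in both the training and the test data. A minimal plain-Python sketch of that fit/transform split; `fit_mean` and `impute` are hypothetical helpers and the ages are made up:

```python
from typing import List, Optional

def fit_mean(values: List[Optional[float]]) -> float:
    """Learn the fill value from the training column, ignoring missing entries."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present)

def impute(values: List[Optional[float]], fill: float) -> List[float]:
    """Replace missing entries with the learned fill value."""
    return [fill if v is None else v for v in values]

train_age = [20.0, None, 40.0, 60.0]   # hypothetical training ages
test_age = [None, 30.0]                # hypothetical test ages

mean_age = fit_mean(train_age)         # learned from the training data only
print(impute(train_age, mean_age), impute(test_age, mean_age))
```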

In [41]:
indexer = StringIndexer(inputCols=cat_col, outputCols=indexer_col, handleInvalid='skip')

In [42]:
ohe = OneHotEncoder(inputCols=indexer_col, outputCols=ohe_col)

In [43]:
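# exclude the label explicitly: num_col comes from a set, so its order is not guaranteed
assembler_col = [c for c in num_col if c != 'is_dead'] + ohe_col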

In [44]:
assembler = VectorAssembler(inputCols=assembler_col, outputCol='features')

In [45]:
assembler.getInputCols(), assembler.getOutputCol()

(['is_male', 'age', 'province_OHE'], 'features')

In [46]:
treeClassifier = DecisionTreeClassifier(featuresCol="features", labelCol="is_dead", predictionCol="prediction", seed=60)
forestClassifier = RandomForestClassifier(featuresCol="features", labelCol="is_dead", predictionCol="prediction", seed=60)
svcClassifier = LinearSVC(featuresCol="features", labelCol="is_dead", predictionCol="prediction")
logClassifier = LogisticRegression(featuresCol="features", labelCol="is_dead", predictionCol="prediction")

In [47]:
pipeline = Pipeline(stages=[imputer, indexer, ohe, assembler, treeClassifier])

In [48]:
evalualuator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='is_dead', metricName='accuracy')

In [49]:
model = pipeline.fit(X_train)

In [50]:
train_ =  model.transform(X_train)
evalualuator.evaluate(train_)

0.9012875536480687
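Accuracy on its own can look flattering when one class dominates, so it is worth comparing it with a majority-class baseline that always predicts the most common label. A small plain-Python sketch of both numbers on hypothetical labels and predictions (not the lab's data); `accuracy` here computes the fraction of correct predictions, which is what the evaluator's `'accuracy'` metric reports:

```python
from collections import Counter
from typing import Sequence

def accuracy(labels: Sequence[int], predictions: Sequence[int]) -> float:
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def majority_baseline(labels: Sequence[int]) -> float:
    """Accuracy of always predicting the most frequent label."""
    most_common_count = Counter(labels).most_common(1)[0][1]
    return most_common_count / len(labels)

labels      = [0, 0, 0, 1, 1, 0, 1, 0, 0, 1]  # hypothetical is_dead values
predictions = [0, 0, 1, 1, 1, 0, 0, 0, 0, 1]  # hypothetical model output

print(accuracy(labels, predictions), majority_baseline(labels))
```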

In [51]:
# train_.select('features', 'prediction')
value_counts(train_, colum_name='prediction')

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 2219|
|       1.0| 1043|
+----------+-----+



In [52]:
test_ = model.transform(X_test)
evalualuator.evaluate(test_)

0.8898847631241997

In [53]:
# test_.select('features', 'prediction').show()
value_counts(test_, colum_name='prediction')

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  533|
|       1.0|  248|
+----------+-----+

