# **Labs 1 and 2: PySpark**

In these labs we will be using the "[[NeurIPS 2020] Data Science for COVID-19 (DS4C)](https://www.kaggle.com/datasets/kimjihoo/coronavirusdataset?select=PatientInfo.csv)" dataset, retrieved from [Kaggle](https://www.kaggle.com/) on 1/6/2022 for educational, non-commercial purposes under the [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/) license.


The CSV file used in these labs is **PatientInfo.csv**.

In [127]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## PatientInfo.csv

**patient_id**
the ID of the patient

**sex**
the sex of the patient

**age**
the age of the patient

**country**
the country of the patient

**province**
the province of the patient

**city**
the city of the patient

**infection_case**
the infection route or cluster (e.g. overseas inflow, contact with patient)

**infected_by**
the ID of the patient who infected this patient

**contact_number**
the number of people the patient was in contact with

**symptom_onset_date**
the date the symptoms first appeared

**confirmed_date**
the date the infection was confirmed

**released_date**
the date the patient was released

**deceased_date**
the date the patient died

**state**
isolated / released / deceased

### Import PySpark and check its version

In [128]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [129]:
import pyspark
pyspark.__version__

'3.3.0'

### Import and create SparkSession

In [130]:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName('lab1').master('local[*]').getOrCreate()

### Load the PatientInfo.csv file and show the first 5 rows

In [131]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [132]:
# inferSchema with samplingRatio=0.1 infers the column types from roughly 10% of the rows
df_raw = spark_session.read.csv('/content/PatientInfo.csv', header=True, inferSchema=True, samplingRatio=0.1)
df_raw.show(5)

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number| symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|2020-01-22 00:00:00|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|               null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 

### Display the schema of the dataset

In [133]:
df_raw.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: long (nullable = true)
 |-- contact_number: integer (nullable = true)
 |-- symptom_onset_date: timestamp (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)



### Display the statistical summary

In [134]:
df_raw.summary().show()

+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+-------------------+--------+
|summary|          patient_id|   sex| age|   country|province|          city|      infection_case|         infected_by|     contact_number|   state|
+-------+--------------------+------+----+----------+--------+--------------+--------------------+--------------------+-------------------+--------+
|  count|                5165|  4043|3785|      5165|    5165|          5071|                4246|                1345|                785|    5165|
|   mean|2.8636345618679576E9|  null|null|      null|    null|          null|                null|2.2845944015643125E9| 1273903.6675159235|    null|
| stddev| 2.074210725277473E9|  null|null|      null|    null|          null|                null|1.5265072953383324E9|3.569155830456973E7|    null|
|    min|          1000000001|female|  0s|Bangladesh|   Busan|     Andong-si|Anyang Gunpo Past...|        

In [135]:
# Count the total number of rows through a temporary SQL view (df_raw.count() returns the same number)
df_raw.createOrReplaceTempView('my_table')
spark_session.sql('select count(*) from my_table').show()

+--------+
|count(1)|
+--------+
|    5165|
+--------+



### Using the state column, how many people survived (released), and how many did not survive (isolated/deceased)?

In [138]:
# The state column has no nulls, so the two counts add up to the total number of rows
import pyspark.sql.functions as fn
survived_people = df_raw.select('state').where('state = "released"').count()
not_survived = df_raw.select('state').where('state <> "released"').count()
# Equivalent with a column expression:
# not_survived = df_raw.where(fn.col('state') != 'released').count()
# Per-state counts in a single query:
# df_raw.groupBy('state').count().show()

print('survived people number is ',survived_people)
print('not survived people number is ',not_survived)
print('total number is ',not_survived + survived_people)

survived people number is  2929
not survived people number is  2236
total number is  5165


### Display the number of null values in each column

In [139]:
from pyspark.sql.functions import col

# Count the nulls in each column (one Spark job per column)
for column in df_raw.columns:
  n_nulls = df_raw.select(column).where(col(column).isNull()).count()
  print('the number of nulls in the column', column, 'is ', n_nulls)

the number of nulls in the column patient_id is  0
the number of nulls in the column sex is  1122
the number of nulls in the column age is  1380
the number of nulls in the column country is  0
the number of nulls in the column province is  0
the number of nulls in the column city is  94
the number of nulls in the column infection_case is  919
the number of nulls in the column infected_by is  3820
the number of nulls in the column contact_number is  4380
the number of nulls in the column symptom_onset_date is  4476
the number of nulls in the column confirmed_date is  3
the number of nulls in the column released_date is  3578
the number of nulls in the column deceased_date is  5099
the number of nulls in the column state is  0

Notes on the date columns:
- `confirmed_date`: the date the patient entered quarantine
- `released_date`: the date the patient left the hospital
- `deceased_date`: the date the patient died (if they died)


## Data preprocessing

### Fill the nulls in deceased_date with released_date
- You can use the <b>coalesce</b> function

In [140]:
from pyspark.sql.functions import coalesce
from pyspark.sql.types import TimestampType
# 'final date' is deceased_date when present, otherwise released_date (null if both are null)
df_raw = df_raw.withColumn('final date', coalesce(df_raw['deceased_date'], df_raw['released_date']).cast(TimestampType()))
df_raw.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number| symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|         final date|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|2020-01-22 00:00:00|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|2020-02-05 00:00:00|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|               null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|relea

In [141]:
df_raw.select('final date').where(col('final date').isNull()).count()
# These nulls are most likely patients still in hospital (neither released nor deceased)

3514

### Add a column named no_days with the difference in days between the deceased_date and the confirmed_date, then show the top 5 rows. Print the schema.
- <b>Hint: you need to typecast these columns as date first</b>

In [142]:
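from pyspark.sql.functions import datediff
# Days from confirmation to the final date (deceased_date, or released_date for survivors);
# datediff() casts the timestamp columns to dates itself
df_raw = df_raw.withColumn('no_days', datediff(end='final date', start='confirmed_date'))
df_raw.show(5)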

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number| symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|         final date|no_days|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|2020-01-22 00:00:00|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|2020-02-05 00:00:00|     13|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|               null|2020-01-30 00:00:00|2020-03

### Add an is_male column that is True if the patient is male and False otherwise

In [143]:
df_raw = df_raw.withColumn('is_male' , df_raw['sex'] == 'male')
df_raw.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number| symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|         final date|no_days|is_male|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|2020-01-22 00:00:00|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|2020-02-05 00:00:00|     13|   true|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       null|            31|               

### Add an is_dead column that is True if the patient's state is not released and False otherwise

- Use a <b>UDF</b> to perform this task.
- In general, UDFs are not recommended unless no built-in function can do the required operation.
- Python UDFs are slower than built-in functions, because every row has to be serialized and sent to a Python worker process.

In [144]:
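from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def add_is_dead(state):
  # True for every state other than 'released' (i.e. isolated or deceased)
  return state != 'released'

# udf() defaults to StringType, so is_dead holds 'true'/'false' strings;
# a string column also suits StringIndexer later (it does not accept booleans)
is_dead_udf = udf(add_is_dead, StringType())
df_raw = df_raw.withColumn('is_dead', is_dead_udf(df_raw['state']))
df_raw.show()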

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number| symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|         final date|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+-------+-------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|2020-01-22 00:00:00|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|2020-02-05 00:00:00|     13|   true|  false|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       nul

### Convert the age bins from 0s, 10s, 20s, etc. to 0, 10, 20, etc.

In [145]:
import pyspark.sql.functions as fn
# Remove the 's' from labels such as '50s' (still a string; it is cast to double below)
df_raw = df_raw.withColumn('age', fn.translate('age', 's', ''))
df_raw.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+-------+-------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number| symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|         final date|no_days|is_male|is_dead|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+-------------------+-------------------+-------------------+-------------+--------+-------------------+-------+-------+-------+
|1000000001|  male| 50|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       null|            75|2020-01-22 00:00:00|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|2020-02-05 00:00:00|     13|   true|  false|
|1000000002|  male| 30|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       nul

### Cast age and no_days to DoubleType

In [146]:
from pyspark.sql.types import DoubleType
df_raw = df_raw.withColumn('age',df_raw['age'].cast(DoubleType()) )
df_raw = df_raw.withColumn('no_days', df_raw['no_days'].cast(DoubleType()))
df_raw.printSchema()

root
 |-- patient_id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: long (nullable = true)
 |-- contact_number: integer (nullable = true)
 |-- symptom_onset_date: timestamp (nullable = true)
 |-- confirmed_date: timestamp (nullable = true)
 |-- released_date: timestamp (nullable = true)
 |-- deceased_date: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- final date: timestamp (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = true)
 |-- is_dead: string (nullable = true)



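### Drop the columns
`["patient_id", "sex", "infected_by", "contact_number", "released_date", "state", "symptom_onset_date", "confirmed_date", "deceased_date", "country", "no_days", "city", "infection_case"]`

(The cell below keeps `no_days` as a feature and drops the helper column `final date` instead.)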

In [147]:
cols_to_drop = ["patient_id", "sex", "infected_by", "contact_number", "released_date", "state",
                "symptom_onset_date", "confirmed_date", "deceased_date", "country", "final date",
                "city", "infection_case"]
df_raw = df_raw.drop(*cols_to_drop)
df_raw.show()

+----+--------+-------+-------+-------+
| age|province|no_days|is_male|is_dead|
+----+--------+-------+-------+-------+
|50.0|   Seoul|   13.0|   true|  false|
|30.0|   Seoul|   32.0|   true|  false|
|50.0|   Seoul|   20.0|   true|  false|
|20.0|   Seoul|   16.0|   true|  false|
|20.0|   Seoul|   24.0|  false|  false|
|50.0|   Seoul|   19.0|  false|  false|
|20.0|   Seoul|   10.0|   true|  false|
|20.0|   Seoul|   22.0|   true|  false|
|30.0|   Seoul|   16.0|   true|  false|
|60.0|   Seoul|   24.0|  false|  false|
|50.0|   Seoul|   23.0|  false|  false|
|20.0|   Seoul|   20.0|   true|  false|
|80.0|   Seoul|   null|   true|   true|
|60.0|   Seoul|   25.0|  false|  false|
|70.0|   Seoul|   null|   true|  false|
|70.0|   Seoul|   21.0|   true|  false|
|70.0|   Seoul|   10.0|   true|  false|
|20.0|   Seoul|   null|   true|  false|
|70.0|   Seoul|   17.0|  false|  false|
|70.0|   Seoul|   null|  false|  false|
+----+--------+-------+-------+-------+
only showing top 20 rows



### Recount the number of nulls now

In [148]:
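# Use col_name so the imported col() function is not shadowed
col_name = 'is_dead'
# is_dead is a string column, so isNull() is enough here (isnan() only applies to numeric values)
df_raw.select(col_name).where(df_raw[col_name].isNull()).count()

0

In [149]:
# Null count per column, in the order of df_raw.columns (age, province, no_days, is_male, is_dead)
nulls = [df_raw.where(df_raw[c].isNull()).count() for c in df_raw.columns]
print(nulls)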

[1380, 0, 3514, 1122, 0]


## Now do the same using SQL SELECT statements

### From the original Patient DataFrame, create a temporary view (table).

### Use a SELECT statement to select all columns from the view and show the output.

### *Using SQL commands*, limit the output to only 5 rows

### Select the count of males and females in the dataset

### How many people survived, and how many did not?

### Now, let's perform some preprocessing using SQL:
1. Convert the *age* column to double after removing the trailing 's' (*hint: check the SUBSTRING function*)
2. Select only the following columns: `['sex', 'age', 'province', 'state']`
3. Store the result of the query in a new DataFrame

## Machine Learning 
### Create a pipeline model to predict is_dead and evaluate the performance.
- Use <b>StringIndexer</b> to convert <b>string</b> columns to numeric indices.
- Use <b>OneHotEncoder</b> to encode the indexed categorical values as binary vectors.
- Use <b>Imputer</b> to fill missing numeric values with the column mean.

In [153]:
df_raw.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: boolean (nullable = true)
 |-- is_dead: string (nullable = true)



In [166]:
from pyspark.sql.types import StringType
# StringIndexer accepts string or numeric input but not boolean, so cast is_male to string
df_raw = df_raw.withColumn('is_male', df_raw['is_male'].cast(StringType()))
df_raw.printSchema()

root
 |-- age: double (nullable = true)
 |-- province: string (nullable = true)
 |-- no_days: double (nullable = true)
 |-- is_male: string (nullable = true)
 |-- is_dead: string (nullable = true)



In [183]:
from pyspark.ml.feature import VectorAssembler , StringIndexer , OneHotEncoder , Imputer
numeric_cols = [f for (f,d) in df_raw.dtypes if ((d == 'double' ))]
categorical_cols = [f for (f,d) in df_raw.dtypes if ((d=='string' or d=='boolean'))]
print('numerical cols are ', numeric_cols)
print('categorical cols are ', categorical_cols)
index_output = [categorical_col + 'index' for categorical_col in categorical_cols]

numerical cols are  ['age', 'no_days']
categorical cols are  ['province', 'is_male', 'is_dead']


### StringIndexer

In [184]:
# handleInvalid='skip' drops rows with a null or unseen value in any indexed column (e.g. a null is_male)
string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output, handleInvalid='skip')
string_indexer_model = string_indexer.fit(df_raw[categorical_cols])
df_indexed = string_indexer_model.transform(df_raw[categorical_cols])
df_indexed.show()
# provinceindex and is_maleindex (not the target is_deadindex) are the inputs of the OHE

+--------+-------+-------+-------------+------------+------------+
|province|is_male|is_dead|provinceindex|is_maleindex|is_deadindex|
+--------+-------+-------+-------------+------------+------------+
|   Seoul|   true|  false|          0.0|         1.0|         0.0|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|
|   Seoul|  false|  false|          0.0|         0.0|         0.0|
|   Seoul|  false|  false|          0.0|         0.0|         0.0|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|
|   Seoul|  false|  false|          0.0|         0.0|         0.0|
|   Seoul|  false|  false|          0.0|         0.0|         0.0|
|   Seoul|   true|  false|          0.0|         1.0|         

### OneHotEncoder

In [185]:
# Build a new list (instead of aliasing and mutating index_output) without the target is_deadindex
ohe_input = [c for c in index_output if c != 'is_deadindex']
ohe_output = [ohe_in + '_ohe' for ohe_in in ohe_input]
ohe = OneHotEncoder(inputCols=ohe_input, outputCols=ohe_output)
ohe_model = ohe.fit(df_indexed)
df_ohe = ohe_model.transform(df_indexed)
df_ohe.show()

+--------+-------+-------+-------------+------------+------------+-----------------+----------------+
|province|is_male|is_dead|provinceindex|is_maleindex|is_deadindex|provinceindex_ohe|is_maleindex_ohe|
+--------+-------+-------+-------------+------------+------------+-----------------+----------------+
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|
|   Seoul|  false|  false|          0.0|         0.0|         0.0|   (16,[0],[1.0])|   (1,[0],[1.0])|
|   Seoul|  false|  false|          0.0|         0.0|         0.0|   (16,[0],[1.0])|   (1,[0],[1.0])|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0]

### VectorAssembler

In [188]:
# Rows with nulls in the categorical columns were already dropped by StringIndexer(handleInvalid='skip'),
# so the assembler's own handleInvalid is not needed here
vector_assembler = VectorAssembler(inputCols=ohe_output, outputCol='features')
df_vector_out = vector_assembler.transform(df_ohe)
df_vector_out.show()


+--------+-------+-------+-------------+------------+------------+-----------------+----------------+--------------------+
|province|is_male|is_dead|provinceindex|is_maleindex|is_deadindex|provinceindex_ohe|is_maleindex_ohe|            features|
+--------+-------+-------+-------------+------------+------------+-----------------+----------------+--------------------+
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|      (17,[0],[1.0])|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|      (17,[0],[1.0])|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|      (17,[0],[1.0])|
|   Seoul|   true|  false|          0.0|         1.0|         0.0|   (16,[0],[1.0])|       (1,[],[])|      (17,[0],[1.0])|
|   Seoul|  false|  false|          0.0|         0.0|         0.0|   (16,[0],[1.0])|   (1,[0],[1.0])|(17,[0,16],[1.0,1...|
|   Seoul|  fals

In [190]:
df_vector_out.select('features')

DataFrame[features: vector]