# Big data project: Covid-19 M2 Stat & Eco
### Authors: Sandra FERRIERES, Lucas FABRE, Aicha SAAD
### Deadline: April 12th, 2020

## Table of contents

### 1. Data cleaning
##### 1.1. Loading the data
##### 1.2. Drop useless columns
##### 1.3. Missing values
##### 1.4. Other cleaning
##### 1.5. Column types
##### 1.6. Additional explanatory variables creation
##### 1.7. Construct the target variable

### 2. Data analysis
##### 2.1. Univariate analysis
##### 2.2. Bivariate analysis
##### 2.3. Correlations

### 3. Models estimation with pipelines
##### 3.1. Random Forest Classifier
##### 3.2. Naive Bayes classifier

### 4. Model optimization with Scikit-Learn
##### 4.1. Cross-Validation
##### 4.2. Definition of parallel computing structure: RDD
##### 4.3. Determine the best model

In [3]:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
from datetime import date
from pyspark.sql.functions import *

## 1. Data cleaning

#### 1.1. Loading the data

First, we load the data from the website https://www.kaggle.com/sudalairajkumar/novel-corona-virus-2019-dataset/discussion?fbclid=IwAR2yuD9I4j5UBNK8rd-VXYYSKSFylH-voiZrSUkiL6X90GUyum841UpK2_s, extracting the file called COVID19_line_list_data.

In [7]:
filename = "/FileStore/tables/COVID19_line_list_data.csv"

In [8]:
df = spark.read.format("csv").option("header","True").option("inferSchema","True").load(filename)

In [9]:
df.printSchema()

In [10]:
display(df)

id,case_in_country,reporting date,_c3,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link,_c21,_c22,_c23,_c24,_c25,_c26
1,,1/20/2020,,"First confirmed imported COVID-19 pneumonia patient in Shenzhen (from Wuhan): male, 66, shenzheng residence, visited relatives in Wuhan on 12/29/2019, symptoms onset on 01/03/2020, returned to Shenzhen and seek medical care on 01/04/2020, hospitalized on 01/11/2020, sample sent to China CDC for testing on 01/18/2020, confirmed on 01/19/2020. 8 others under medical observation, contact tracing ongoing.","Shenzhen, Guangdong",China,male,66,01/03/20,0.0,01/11/20,12/29/2019,01/04/20,1,0,0,0,,Shenzhen Municipal Health Commission,http://wjw.sz.gov.cn/wzx/202001/t20200120_18987787.htm,,,,,,
2,,1/20/2020,,"First confirmed imported COVID-19 pneumonia patient in Shanghai (from Wuhan): female, 56, Wuhan residence, arrived in Shanghai from Wuhan on 01/12/2020, symptom onset and visited fever clinic on 01/15/2020, laboratory confirmed on 01/20/2020",Shanghai,China,female,56,1/15/2020,0.0,1/15/2020,,01/12/20,0,1,0,0,,Official Weibo of Shanghai Municipal Health Commission,https://www.weibo.com/2372649470/IqogQhgfa?from=page_1001062372649470_profile&wvr=6&mod=weibotime&type=comment,,,,,,
3,,1/21/2020,,"First confirmed imported cases in Zhejiang: patient is male, 46, lives in Wuhan, self-driving from Wuhan to Hangzhou on 01/03/2020, symptom onset 01/04/2020, hospitalized on 01/17/2020, sample deliver to China CDC for testing on 01/20/2020, test positive on 01/21/2020.",Zhejiang,China,male,46,01/04/20,0.0,1/17/2020,,01/03/20,0,1,0,0,,Health Commission of Zhejiang Province,http://www.zjwjw.gov.cn/art/2020/1/21/art_1202101_41786033.html,,,,,,
4,,1/21/2020,,"new confirmed imported COVID-19 pneumonia in Tianjin: female, age 60, recently visited Wuhan, visited fever clinic on 01/19/2020 in Tianjin then quarantined immediately.",Tianjin,China,female,60,,,1/19/2020,,,1,0,0,0,,人民日报官方微博,https://m.weibo.cn/status/4463235401268457?,,,,,,
5,,1/21/2020,,"new confirmed imported COVID-19 pneumonia in Tianjin: male, age 58, visited fever clinic on 01/14/2020.",Tianjin,China,male,58,,,1/14/2020,,,0,0,0,0,,人民日报官方微博,https://m.weibo.cn/status/4463235401268457?,,,,,,
6,,1/21/2020,,"First confirmed imported COVID-19 pneumonia patient in Chongqing (from Wuhan): female, age 44, symptoms onset on 01/15/2020, laboratory confirmed on 01/21/2020.",Chongqing,China,female,44,1/15/2020,0.0,,,,0,1,0,0,,Chongqing Municipal Health Commission,http://wsjkw.cq.gov.cn/tzgg/20200121/249730.html,,,,,,
7,,1/21/2020,,"First confirmed imported COVID-19 pneumonia patient in Sichuan (from Wuhan): Male, 34 years old, works in Wuhan, symptoms onset on 01/11/2020 in Chengdu.",Sichuan,China,male,34,01/11/20,0.0,,,,0,1,0,0,,央视新闻,https://m.weibo.cn/status/4463300522087848?,,,,,,
8,,1/21/2020,,"new confirmed imported COVID-19 pneumonia patient in Beijing: male, 37, visited Wuhan on 01/10/2020, return to Beijing on 01/11/2020, symptoms onset on 01/14/2020, hospitalized on 01/20/2020.",Beijing,China,male,37,1/14/2020,0.0,1/20/2020,01/10/20,01/11/20,1,0,0,0,,Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html,,,,,,
9,,1/21/2020,,"new confirmed imported COVID-19 pneumonia patient in Beijing: male, 39, visited Wuhan on 01/03/2020, return to Beijing on 01/04/2020, symptoms onset on 01/09/2020, hospitalized on 01/14/2020.",Beijing,China,male,39,01/09/20,0.0,1/14/2020,01/03/20,01/04/20,1,0,0,0,,Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html,,,,,,
10,,1/21/2020,,"new confirmed imported COVID-19 pneumonia patient in Beijing: male, 56, visited Wuhan on 01/08/2020, return to Beijing on 01/16/2020, symptoms onset on 01/16/2020, hospitalized on 01/20/2020.",Beijing,China,male,56,1/16/2020,0.0,1/20/2020,01/08/20,1/16/2020,1,0,0,0,,Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html,,,,,,


#### 1.2. Drop useless columns

In [12]:
columns_to_drop = ["_c3", "_c21", "_c22", "_c23", "_c24", "_c25", "_c26"]
df = df.drop(*columns_to_drop)

In [13]:
#check drop done correctly
display(df)

id,case_in_country,reporting date,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link
1,,1/20/2020,"First confirmed imported COVID-19 pneumonia patient in Shenzhen (from Wuhan): male, 66, shenzheng residence, visited relatives in Wuhan on 12/29/2019, symptoms onset on 01/03/2020, returned to Shenzhen and seek medical care on 01/04/2020, hospitalized on 01/11/2020, sample sent to China CDC for testing on 01/18/2020, confirmed on 01/19/2020. 8 others under medical observation, contact tracing ongoing.","Shenzhen, Guangdong",China,male,66,01/03/20,0.0,01/11/20,12/29/2019,01/04/20,1,0,0,0,,Shenzhen Municipal Health Commission,http://wjw.sz.gov.cn/wzx/202001/t20200120_18987787.htm
2,,1/20/2020,"First confirmed imported COVID-19 pneumonia patient in Shanghai (from Wuhan): female, 56, Wuhan residence, arrived in Shanghai from Wuhan on 01/12/2020, symptom onset and visited fever clinic on 01/15/2020, laboratory confirmed on 01/20/2020",Shanghai,China,female,56,1/15/2020,0.0,1/15/2020,,01/12/20,0,1,0,0,,Official Weibo of Shanghai Municipal Health Commission,https://www.weibo.com/2372649470/IqogQhgfa?from=page_1001062372649470_profile&wvr=6&mod=weibotime&type=comment
3,,1/21/2020,"First confirmed imported cases in Zhejiang: patient is male, 46, lives in Wuhan, self-driving from Wuhan to Hangzhou on 01/03/2020, symptom onset 01/04/2020, hospitalized on 01/17/2020, sample deliver to China CDC for testing on 01/20/2020, test positive on 01/21/2020.",Zhejiang,China,male,46,01/04/20,0.0,1/17/2020,,01/03/20,0,1,0,0,,Health Commission of Zhejiang Province,http://www.zjwjw.gov.cn/art/2020/1/21/art_1202101_41786033.html
4,,1/21/2020,"new confirmed imported COVID-19 pneumonia in Tianjin: female, age 60, recently visited Wuhan, visited fever clinic on 01/19/2020 in Tianjin then quarantined immediately.",Tianjin,China,female,60,,,1/19/2020,,,1,0,0,0,,人民日报官方微博,https://m.weibo.cn/status/4463235401268457?
5,,1/21/2020,"new confirmed imported COVID-19 pneumonia in Tianjin: male, age 58, visited fever clinic on 01/14/2020.",Tianjin,China,male,58,,,1/14/2020,,,0,0,0,0,,人民日报官方微博,https://m.weibo.cn/status/4463235401268457?
6,,1/21/2020,"First confirmed imported COVID-19 pneumonia patient in Chongqing (from Wuhan): female, age 44, symptoms onset on 01/15/2020, laboratory confirmed on 01/21/2020.",Chongqing,China,female,44,1/15/2020,0.0,,,,0,1,0,0,,Chongqing Municipal Health Commission,http://wsjkw.cq.gov.cn/tzgg/20200121/249730.html
7,,1/21/2020,"First confirmed imported COVID-19 pneumonia patient in Sichuan (from Wuhan): Male, 34 years old, works in Wuhan, symptoms onset on 01/11/2020 in Chengdu.",Sichuan,China,male,34,01/11/20,0.0,,,,0,1,0,0,,央视新闻,https://m.weibo.cn/status/4463300522087848?
8,,1/21/2020,"new confirmed imported COVID-19 pneumonia patient in Beijing: male, 37, visited Wuhan on 01/10/2020, return to Beijing on 01/11/2020, symptoms onset on 01/14/2020, hospitalized on 01/20/2020.",Beijing,China,male,37,1/14/2020,0.0,1/20/2020,01/10/20,01/11/20,1,0,0,0,,Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html
9,,1/21/2020,"new confirmed imported COVID-19 pneumonia patient in Beijing: male, 39, visited Wuhan on 01/03/2020, return to Beijing on 01/04/2020, symptoms onset on 01/09/2020, hospitalized on 01/14/2020.",Beijing,China,male,39,01/09/20,0.0,1/14/2020,01/03/20,01/04/20,1,0,0,0,,Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html
10,,1/21/2020,"new confirmed imported COVID-19 pneumonia patient in Beijing: male, 56, visited Wuhan on 01/08/2020, return to Beijing on 01/16/2020, symptoms onset on 01/16/2020, hospitalized on 01/20/2020.",Beijing,China,male,56,1/16/2020,0.0,1/20/2020,01/08/20,1/16/2020,1,0,0,0,,Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html


#### *Summary* column

The *summary* column is free text that often states the patient's gender and age. We extract both, because they let us fill missing values in the corresponding columns.

In [16]:
df_summary = df.select('id','summary')

In [17]:
df_summary = df_summary.withColumn("gender", psf.when(psf.lower(psf.col("summary")).rlike("male"),\
                                              psf.when(psf.lower(psf.col("summary")).rlike("female"), "female")\
                                               .otherwise("male"))\
                                                 .otherwise(psf.when(psf.col("summary").isNull(), None)))

In [18]:
df_summary = df_summary.withColumn("age", psf.when(psf.regexp_extract(df_summary["summary"], r'[\s\,]\d\d[\,\.\s]',0).substr(1, 3)=="",None)\
                                   .otherwise(psf.regexp_extract(df_summary["summary"], r'[\s\,]\d\d[\,\.\s]',0).substr(1, 3)))
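
To check the extraction rules on a few sample sentences without a Spark session, the same logic can be sketched in plain Python. This is only an illustration: the helper names `extract_gender` and `extract_age` are ours and do not appear in the notebook, and unlike the `substr(1, 3)` call above, the sketch drops the leading delimiter and returns only the two digits. Note that the pattern takes the first two-digit number delimited by a space, comma or period, so it may occasionally pick up a number that is not an age.

```python
import re

# Same pattern as in the Spark cell: two digits surrounded by a delimiter
AGE_PATTERN = re.compile(r"[\s,]\d\d[,.\s]")


def extract_gender(summary):
    """Return 'female', 'male' or None; 'female' is tested first because it contains 'male'."""
    if summary is None:
        return None
    text = summary.lower()
    if "female" in text:
        return "female"
    if "male" in text:
        return "male"
    return None


def extract_age(summary):
    """Return the first delimited two-digit number as a string, or None if there is none."""
    if summary is None:
        return None
    match = AGE_PATTERN.search(summary)
    if match is None:
        return None
    # match looks like ' 60,' : keep only the two digits
    return match.group(0)[1:3]


sample = "new confirmed imported COVID-19 pneumonia in Tianjin: female, age 60, recently visited Wuhan"
print(extract_gender(sample), extract_age(sample))
```

The "19" of "COVID-19" is not captured because it is preceded by a hyphen, which is not one of the accepted delimiters.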

We can also perform text processing on this column.

In [20]:
import pyspark.ml.feature as ml_feature

In [21]:
# remove punctuation and numbers to keep only useful information
df = df.withColumn("summary", \
                   psf.trim(psf.lower(psf.regexp_replace(psf.col("summary"), r'[^\sa-zA-Z]', '')))
                  )

In [22]:
# remove male and female words from summary column (redundant with gender column)
df = df.withColumn("summary", \
                  psf.regexp_replace(psf.col("summary"), ' female ', ' '))
df = df.withColumn("summary", \
                  psf.regexp_replace(psf.col("summary"), ' male ', ' '))
# remove multiple spaces
df = df.withColumn("summary", \
                  psf.regexp_replace(psf.col("summary"), '\s+', ' '))
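
The cleaning steps above can be reproduced in plain Python to see their effect on one sentence. This is a minimal sketch under our own assumptions: `clean_summary` is a hypothetical helper, and Python's `strip()` removes any whitespace whereas Spark's `trim` removes spaces only.

```python
import re


def clean_summary(summary):
    """Apply the same steps as the Spark cells: strip non-letters, lowercase, drop gender words."""
    text = re.sub(r"[^\sa-zA-Z]", "", summary)  # remove digits and punctuation
    text = text.lower().strip()
    text = text.replace(" female ", " ")        # redundant with the gender column
    text = text.replace(" male ", " ")
    return re.sub(r"\s+", " ", text)            # collapse repeated spaces


raw = ("new confirmed imported COVID-19 pneumonia in Tianjin: male, age 58, "
       "visited fever clinic on 01/14/2020.")
print(clean_summary(raw))
```

Because the gender words are matched with a space on each side, a summary that starts or ends with "male" or "female" keeps that word.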

#### 1.3. Missing values

Missing values are encoded differently depending on the column: either as the string "NA" or as a true null (None). We convert them all to None.

In [25]:
# replace "NA" value by null in string columns
str_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, pst.StringType)]
for column in str_cols :
  df = df.withColumn(column, psf.when(psf.col(column) != 'NA', psf.col(column)))

In [26]:
#count number of missing values in each column
df.select([psf.count(psf.when(psf.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

Before filling the remaining missing values, we recover as much information as we can from the *summary* column:

In [28]:
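# use the age parsed from the summary only where the original age is missing
df_age = df_summary.select("id", psf.col("age").alias("age_from_summary"))
df = df.join(df_age, on="id")
df = df.withColumn("age", psf.coalesce(psf.col("age"), psf.col("age_from_summary"))).drop("age_from_summary")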

In [29]:
display(df.select([psf.count(psf.when(psf.col(c).isNull(), c)).alias(c) for c in df.columns]))

id,case_in_country,reporting date,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link
0,197,1,0,0,0,183,235,522,525,578,956,745,1,4,0,0,813,1,1


Conclusions on how to deal with missing values:
- case_in_country : 0
- reporting date : "" (empty string)
- summary : "" (empty string)
- gender : extract from summary; otherwise fill with the most frequent category
- age : extract from summary; otherwise fill with the mean
- symptom_onset : "" (empty string, unknown date)
- If_onset_approximated : 0
- hosp_visit_date : "" (empty string, meaning never)
- exposure_start : "" (empty string, meaning never)
- exposure_end : "" (empty string, meaning never)
- visiting Wuhan : 0
- from Wuhan : 0
- symptom : "" (empty string)
- source : "" (empty string)
- link : "" (empty string)

In [31]:
# find most common gender
max_val = df.groupBy('gender').count().select("count").rdd.max()[0]
most_common_gender = df.groupBy('gender').count().where(psf.col("count")==max_val) \
                      .select("gender").first().asDict()['gender'] #find most common type

In [32]:
fill_NaN_rule = {"case_in_country" : 0, "reporting date" : "", "summary" : "", 
                 "gender": most_common_gender, "age": df.agg({"age":"avg"}).first().asDict()['avg(age)'], 
                 "symptom_onset": "", "If_onset_approximated": 0, "hosp_visit_date": "", 
                 "exposure_start": "", "exposure_end": "", "visiting Wuhan": 0, "from Wuhan": 0, 
                 "symptom": "", "source": "", "link": ""}
def fill_NaN(df, rule):
  for column in rule.keys():
    df = df.withColumn(column, psf.when(df[column].isNull(),  psf.lit(rule[column])).otherwise(df[column]))
  return df
df = df.fillna(fill_NaN_rule)

In [33]:
# check the results
df.select([psf.count(psf.when(psf.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

#### 1.4. Other cleaning

Looking at the data, we notice that some columns don't contain the expected values: for example, the *death* and *recovered* columns sometimes hold dates, whereas they are supposed to contain only 0 or 1. We treat any such longer value as 1.

In [36]:
binary_columns = ["visiting Wuhan", "from Wuhan", "death", "recovered"]
for bin_col in binary_columns :
  df = df.withColumn(bin_col, psf.when(psf.length(df[bin_col])>1, 1).otherwise(df[bin_col]))

The columns that are supposed to contain dates also use inconsistent formats: the year is sometimes written with two digits ("mm/dd/yy") and sometimes with four ("mm/dd/yyyy"). We expand the two-digit years.

In [38]:
date_columns = ["reporting date", "symptom_onset", "hosp_visit_date", "exposure_start", "exposure_end"]
for date_col in date_columns :
  df = df.withColumn(date_col, psf.regexp_replace(date_col, r'/20$', '/2020'))
  df = df.withColumn(date_col, psf.regexp_replace(date_col, r'/19$', '/2019'))
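
As a quick sanity check of the two regular expressions, here is a plain-Python sketch of the same normalization. The helpers `normalize_year` and `parse_date` are ours, introduced for illustration only; they show that both date formats parse to the same kind of value once the year is expanded, and that a four-digit year is left untouched because the pattern requires a slash right before the final "20" or "19".

```python
import re
from datetime import datetime


def normalize_year(value):
    """Expand a trailing two-digit year (/20 or /19) to four digits."""
    value = re.sub(r"/20$", "/2020", value)
    return re.sub(r"/19$", "/2019", value)


def parse_date(value):
    """Parse a normalized mm/dd/yyyy string; empty strings stand for missing dates."""
    if value == "":
        return None
    return datetime.strptime(normalize_year(value), "%m/%d/%Y")


for raw_date in ["01/03/20", "1/15/2020", "12/29/2019", ""]:
    print(repr(raw_date), "->", parse_date(raw_date))
```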

The symptom column needs to be cleaned (remove useless spaces, leading and trailing spaces, convert to lowercase).

In [40]:
df = df.withColumn("symptom", psf.regexp_replace(psf.col("symptom"), r",\s", ",")) # remove the space after each comma
df = df.withColumn("symptom", psf.lower(psf.trim(df["symptom"]))) # trim leading/trailing spaces and convert to lowercase

In [41]:
df = df.withColumn("symptom", \
                   psf.when(psf.col("summary").rlike("pneumonia"), 
                            psf.flatten(psf.array(psf.split(psf.col("symptom"), r"\,"), psf.array(psf.lit("pneumonia")))))\
                   .otherwise(psf.split(psf.col("symptom"), r"\,"))
                  )
df = df.withColumn("symptom", psf.array_remove(psf.col("symptom"), ""))
df = df.withColumn("symptom",\
                  psf.when(psf.col("symptom") == psf.array(), psf.array(psf.lit("no symptom")))\
                   .otherwise(psf.col("symptom"))
                  )
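
The construction of the symptom list can be summarized by the following plain-Python sketch. `build_symptom_list` is a hypothetical helper that is not part of the notebook; it assumes the symptom and summary values have already had their nulls replaced by empty strings, as done in section 1.3.

```python
import re


def build_symptom_list(symptom, summary):
    """Split the symptom string, add 'pneumonia' if the summary mentions it, default to 'no symptom'."""
    cleaned = re.sub(r",\s", ",", symptom).strip().lower()
    items = cleaned.split(",")
    if "pneumonia" in summary:
        items.append("pneumonia")
    items = [item for item in items if item != ""]  # drop the empty entry left by an empty string
    return items if items else ["no symptom"]


print(build_symptom_list("Fever, sore throat", "arrived at gimpo international airport"))
print(build_symptom_list("", "new confirmed imported covid pneumonia patient in beijing"))
print(build_symptom_list("", "patient is lives in wuhan"))
```

As in the Spark version, a patient whose symptom field already lists pneumonia and whose summary also mentions it ends up with the entry twice.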

We then create one dummy variable per symptom:

In [43]:
list_symptoms = ['pneumonia', 'fever', 'cough', 'sore throat', 'malaise', 'headache', \
                 'chills','fatigue','runny nose','sputum','diarrhea','shortness of breath', \
                 'joint pain','vomiting','no symptom']
for symptom in list_symptoms:
    df = df.withColumn(symptom, psf.when(array_contains(col("symptom"), symptom), 1).otherwise(0).alias(symptom))

In [44]:
display(df)

id,case_in_country,reporting date,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link,pneumonia,fever,cough,sore throat,malaise,headache,chills,fatigue,runny nose,sputum,diarrhea,shortness of breath,joint pain,vomiting,no symptom
1,0,1/20/2020,first confirmed imported covid pneumonia patient in shenzhen from wuhan shenzheng residence visited relatives in wuhan on symptoms onset on returned to shenzhen and seek medical care on hospitalized on sample sent to china cdc for testing on confirmed on others under medical observation contact tracing ongoing,"Shenzhen, Guangdong",China,male,66,01/03/2020,0,01/11/2020,12/29/2019,01/04/2020,1,0,0,0,List(pneumonia),Shenzhen Municipal Health Commission,http://wjw.sz.gov.cn/wzx/202001/t20200120_18987787.htm,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2,0,1/20/2020,first confirmed imported covid pneumonia patient in shanghai from wuhan wuhan residence arrived in shanghai from wuhan on symptom onset and visited fever clinic on laboratory confirmed on,Shanghai,China,female,56,1/15/2020,0,1/15/2020,,01/12/2020,0,1,0,0,List(pneumonia),Official Weibo of Shanghai Municipal Health Commission,https://www.weibo.com/2372649470/IqogQhgfa?from=page_1001062372649470_profile&wvr=6&mod=weibotime&type=comment,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
3,0,1/21/2020,first confirmed imported cases in zhejiang patient is lives in wuhan selfdriving from wuhan to hangzhou on symptom onset hospitalized on sample deliver to china cdc for testing on test positive on,Zhejiang,China,male,46,01/04/2020,0,1/17/2020,,01/03/2020,0,1,0,0,List(no symptom),Health Commission of Zhejiang Province,http://www.zjwjw.gov.cn/art/2020/1/21/art_1202101_41786033.html,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
4,0,1/21/2020,new confirmed imported covid pneumonia in tianjin age recently visited wuhan visited fever clinic on in tianjin then quarantined immediately,Tianjin,China,female,60,,0,1/19/2020,,,1,0,0,0,List(pneumonia),人民日报官方微博,https://m.weibo.cn/status/4463235401268457?,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
5,0,1/21/2020,new confirmed imported covid pneumonia in tianjin age visited fever clinic on,Tianjin,China,male,58,,0,1/14/2020,,,0,0,0,0,List(pneumonia),人民日报官方微博,https://m.weibo.cn/status/4463235401268457?,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
6,0,1/21/2020,first confirmed imported covid pneumonia patient in chongqing from wuhan age symptoms onset on laboratory confirmed on,Chongqing,China,female,44,1/15/2020,0,,,,0,1,0,0,List(pneumonia),Chongqing Municipal Health Commission,http://wsjkw.cq.gov.cn/tzgg/20200121/249730.html,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
7,0,1/21/2020,first confirmed imported covid pneumonia patient in sichuan from wuhan years old works in wuhan symptoms onset on in chengdu,Sichuan,China,male,34,01/11/2020,0,,,,0,1,0,0,List(pneumonia),央视新闻,https://m.weibo.cn/status/4463300522087848?,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
8,0,1/21/2020,new confirmed imported covid pneumonia patient in beijing visited wuhan on return to beijing on symptoms onset on hospitalized on,Beijing,China,male,37,1/14/2020,0,1/20/2020,01/10/2020,01/11/2020,1,0,0,0,List(pneumonia),Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
9,0,1/21/2020,new confirmed imported covid pneumonia patient in beijing visited wuhan on return to beijing on symptoms onset on hospitalized on,Beijing,China,male,39,01/09/2020,0,1/14/2020,01/03/2020,01/04/2020,1,0,0,0,List(pneumonia),Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
10,0,1/21/2020,new confirmed imported covid pneumonia patient in beijing visited wuhan on return to beijing on symptoms onset on hospitalized on,Beijing,China,male,56,1/16/2020,0,1/20/2020,01/08/2020,1/16/2020,1,0,0,0,List(pneumonia),Beijing Municipal Health Commission,http://wjw.beijing.gov.cn/xwzx_20031/wnxw/202001/t20200121_1620353.html,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0


#### 1.5. Column types

First, we deal with date columns :

In [47]:
# parse date strings (MM/dd/yyyy) into timestamp strings (yyyy-MM-dd HH:mm:ss); empty strings are left unchanged
for date_col in date_columns :
  df = df.withColumn(date_col, \
                     psf.when(psf.col(date_col)!="", psf.from_unixtime(psf.unix_timestamp(date_col, 'MM/dd/yyyy')))\
                       .otherwise(psf.col(date_col)))

Some columns contain numbers but are stored as strings (age, visiting Wuhan, from Wuhan, death, recovered, If_onset_approximated): we cast them to integers.

In [49]:
int_cols = ["age", "visiting Wuhan", "from Wuhan", "death", "recovered", "If_onset_approximated"]
for int_col in int_cols :
  df = df.withColumn(int_col, df[int_col].cast("int"))

In [50]:
df.printSchema()

#### 1.6. Additional explanatory variables creation

Some feature ideas:
  - number of symptoms
  - exposure duration
  - visited hospital or not
  - is in quarantine or not
  - group by source
  - group by link

In [53]:
# number of symptoms
df = df.withColumn("nb_symptoms", \
                   psf.when(psf.col("symptom") == psf.array(psf.lit("no symptom")), 0) \
                           .otherwise(psf.size(psf.col("symptom")) )
                  )
# exposure duration
df = df.withColumn("exposure_duration",\
                   psf.when(psf.col("exposure_start")=="", 0)\
                     .otherwise(psf.when(psf.col("exposure_end")=="", 
                                          psf.datediff(psf.lit(date.today()), psf.col("exposure_start")))\
                                .otherwise(psf.datediff(psf.col("exposure_end"), psf.col("exposure_start")))) 
                  )
# visited hospital
df = df.withColumn("visited_hospital",\
                  psf.when(psf.col("hosp_visit_date")=="", 0)\
                  .otherwise(1) )
# quarantine ?
df = df.withColumn("is_quarantine", \
                  psf.when(psf.lower(psf.col("summary")).rlike("quarantine"), 1)\
                  .otherwise(0) )
# still sick: neither dead nor recovered
df = df.withColumn("sick",\
                  when((col("death")== 0 )&(col("recovered")== 0),1)\
                   .otherwise(0) )
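
Two of these features can be illustrated in plain Python. The sketch below is ours (the function names and the explicit `today` argument are assumptions, not part of the notebook): passing the reference date as a parameter, instead of calling `date.today()` inside the computation, would make the exposure duration of still-open exposures reproducible from one run to the next.

```python
from datetime import date


def nb_symptoms(symptoms):
    """Count the symptoms, treating the placeholder ['no symptom'] as zero."""
    return 0 if symptoms == ["no symptom"] else len(symptoms)


def exposure_duration(start, end, today):
    """Days between exposure start and end; an open exposure runs until `today`."""
    if start is None:
        return 0
    return ((end or today) - start).days


reference_day = date(2020, 4, 12)  # arbitrary reference date chosen for the example
print(nb_symptoms(["fever", "sore throat"]))
print(exposure_duration(date(2020, 1, 23), date(2020, 2, 3), reference_day))
print(exposure_duration(date(2020, 1, 23), None, reference_day))
```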

We also measure how often each *source* and each *link* appears in the data, as a proxy for its importance:

In [55]:
# popularity of the source
source_count = df.groupBy('source').count()
df = df.join(source_count, source_count.source == df.source, "left").select(df["*"], psf.col("count").alias("source_importance"))
# popularity of the link
link_count = df.groupBy("link").count()
df = df.join(link_count, link_count.link == df.link, "left").select(df["*"], psf.col("count").alias("link_importance"))
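
The group-count-then-join pattern used here amounts to attaching to each row the frequency of its own source (or link). A minimal plain-Python equivalent, with a hypothetical helper `add_importance` and made-up rows used only for illustration, could look like this:

```python
from collections import Counter


def add_importance(rows, key, new_key):
    """Attach to each row the number of rows sharing the same value of `key`."""
    counts = Counter(row[key] for row in rows)
    return [{**row, new_key: counts[row[key]]} for row in rows]


# Illustrative rows, not taken from the full dataset
rows = [
    {"id": 1, "source": "Straits Times"},
    {"id": 2, "source": "Straits Times"},
    {"id": 3, "source": "Korea Bio Med"},
]
with_importance = add_importance(rows, "source", "source_importance")
for row in with_importance:
    print(row)
```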

#### 1.7. Construct the target variable

In [57]:
df_pipelines = df.withColumn("state", when((df["death"] == 1), 'Died')
      .when((df["recovered"] == 1 ), 'Recovered')
      .when((df["sick"] ==1), 'Sick'))
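
The chained `when` calls are evaluated in order, so a death takes precedence over a recovery, and a row matching none of the conditions gets a null state. The same rule written as a plain-Python function (the name `patient_state` is ours) makes this precedence explicit:

```python
def patient_state(death, recovered):
    """Return the target label using the same precedence as the chained when() calls."""
    if death == 1:
        return "Died"
    if recovered == 1:
        return "Recovered"
    if death == 0 and recovered == 0:
        return "Sick"
    return None  # no condition matched, as with when() without otherwise()


for flags in [(1, 0), (0, 1), (0, 0), (1, 1)]:
    print(flags, "->", patient_state(*flags))
```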

In [58]:
# tokenize text
tokenizer = ml_feature.Tokenizer(inputCol='summary', outputCol='token_summary')
df = tokenizer.transform(df_pipelines)
## remove stopwords
remover = ml_feature.StopWordsRemover()
stopwords = remover.getStopWords() 
# extend list of words to remove 
redundant_words = ["pneumonia", "quarantine"]
stopwords.extend(redundant_words)
# remove all undesired words
remover = ml_feature.StopWordsRemover(inputCol='token_summary', outputCol='summary_clean', stopWords = stopwords)
df = remover.transform(df)
# drop the intermediate token column
df = df.drop("token_summary")

In [59]:
# Length of the cleaned summary (number of words left after cleaning): we consider these
# words the most meaningful, since they remain after the stop-word removal
df = df.withColumn("info_summary",psf.size(psf.col("summary_clean")))
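
For intuition, the tokenize, filter and count steps can be sketched in plain Python. The stop-word list below is a short hypothetical sample, not Spark's default English list, so the counts it produces may differ from those of `StopWordsRemover`; the two extra words are the ones added in the cell above.

```python
# Hypothetical short stop-word list for the example (Spark's default list is much longer)
SAMPLE_STOPWORDS = {"in", "on", "to", "of", "and", "from"}
EXTRA_STOPWORDS = {"pneumonia", "quarantine"}


def clean_tokens(summary, stopwords=SAMPLE_STOPWORDS | EXTRA_STOPWORDS):
    """Lowercase, split on whitespace and drop stop words."""
    tokens = summary.lower().split()
    return [token for token in tokens if token not in stopwords]


text = "new confirmed covid pneumonia patient in beijing visited wuhan on"
tokens = clean_tokens(text)
print(tokens, len(tokens))
```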

In [60]:
display(df_pipelines)

id,case_in_country,reporting date,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link,pneumonia,fever,cough,sore throat,malaise,headache,chills,fatigue,runny nose,sputum,diarrhea,shortness of breath,joint pain,vomiting,no symptom,nb_symptoms,exposure_duration,visited_hospital,is_quarantine,sick,source_importance,link_importance,state
475,25,2020-02-05 00:00:00,new confirmed covid patient in singapore husband of no had not been to china recently at ncid symptom onset fever visited clinic with wife on went to ncid on,Singapore,Singapore,male,49.0,2020-01-24 00:00:00,0,2020-02-03 00:00:00,,,0,0,0,1,List(fever),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0.0,1,0,0,22,4,Recovered
476,26,2020-02-05 00:00:00,new confirmed covid patient in singapore chinese national arrived in singapore from wuhan on daughter of no,Singapore,Singapore,female,42.0,,0,,,2020-01-21 00:00:00,1,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,0,0,0,22,4,Recovered
477,27,2020-02-05 00:00:00,new confirmed covid patient in singapore months son of no now in isolation room in hospital,Singapore,Singapore,male,0.0,,0,,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,0,0,0,22,4,Recovered
478,28,2020-02-05 00:00:00,new confirmed covid patient in singapore married fo no warded in isolationi at ncid symptom onset visited clinic isolated,Singapore,Singapore,male,45.0,2020-02-01 00:00:00,0,2020-02-02 00:00:00,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,1,0,0,22,4,Recovered
545,2,2020-01-24 00:00:00,new confirmed covid patient in south korea arrived at gimpo international airport works in wuhan visited chinese clinic after suffering from sore throat and other symptoms symptom onset confirmed recovered,South Korea,South Korea,male,55.0,2020-01-10 00:00:00,0,2020-01-19 00:00:00,,2020-01-22 00:00:00,1,0,0,1,"List(fever, sore throat)",Korea Bio Med,http://www.koreabiomed.com/news/articleView.html?idxno=7256,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,2,0.0,1,0,0,1,1,Recovered
532,82,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic and and hospital on and admitted,Singapore,Singapore,female,57.0,2020-02-09 00:00:00,0,2020-02-10 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick
533,83,2020-02-19 00:00:00,new confirmed covid patient in singapore travels frequently to malaysia symptom onset went to clinic on ncid,Singapore,Singapore,male,54.0,2020-01-28 00:00:00,0,2020-02-01 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered
534,84,2020-02-19 00:00:00,new confirmed covid patient in singapore linked to case symptom onset went to clinic and referred to ncid on,Singapore,Singapore,female,35.0,2020-02-04 00:00:00,0,2020-02-04 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered
535,85,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic went to hospital,Singapore,Singapore,male,36.0,2020-02-14 00:00:00,0,2020-02-14 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick
127,0,2020-01-25 00:00:00,confirmed imported covid pneumonia patient no in tianjin beijing resident visited wuhan from to arrived in tianjin on symptom onset on visited fever clinic on confirmed on,Tianjin,China,male,30.0,2020-01-24 00:00:00,0,2020-01-24 00:00:00,2020-01-18 00:00:00,2020-01-20 00:00:00,1,0,0,0,List(pneumonia),央视新闻,https://m.weibo.cn/status/4464681265155125?,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,2.0,1,0,1,133,4,Sick


In [61]:
display(df)

id,case_in_country,reporting date,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link,pneumonia,fever,cough,sore throat,malaise,headache,chills,fatigue,runny nose,sputum,diarrhea,shortness of breath,joint pain,vomiting,no symptom,nb_symptoms,exposure_duration,visited_hospital,is_quarantine,sick,source_importance,link_importance,state,summary_clean,info_summary
475,25,2020-02-05 00:00:00,new confirmed covid patient in singapore husband of no had not been to china recently at ncid symptom onset fever visited clinic with wife on went to ncid on,Singapore,Singapore,male,49.0,2020-01-24 00:00:00,0,2020-02-03 00:00:00,,,0,0,0,1,List(fever),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0.0,1,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, husband, china, recently, ncid, symptom, onset, fever, visited, clinic, wife, went, ncid)",17
476,26,2020-02-05 00:00:00,new confirmed covid patient in singapore chinese national arrived in singapore from wuhan on daughter of no,Singapore,Singapore,female,42.0,,0,,,2020-01-21 00:00:00,1,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,0,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, chinese, national, arrived, singapore, wuhan, daughter)",11
477,27,2020-02-05 00:00:00,new confirmed covid patient in singapore months son of no now in isolation room in hospital,Singapore,Singapore,male,0.0,,0,,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,0,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, months, son, isolation, room, hospital)",10
478,28,2020-02-05 00:00:00,new confirmed covid patient in singapore married fo no warded in isolationi at ncid symptom onset visited clinic isolated,Singapore,Singapore,male,45.0,2020-02-01 00:00:00,0,2020-02-02 00:00:00,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,1,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, married, fo, warded, isolationi, ncid, symptom, onset, visited, clinic, isolated)",15
545,2,2020-01-24 00:00:00,new confirmed covid patient in south korea arrived at gimpo international airport works in wuhan visited chinese clinic after suffering from sore throat and other symptoms symptom onset confirmed recovered,South Korea,South Korea,male,55.0,2020-01-10 00:00:00,0,2020-01-19 00:00:00,,2020-01-22 00:00:00,1,0,0,1,"List(fever, sore throat)",Korea Bio Med,http://www.koreabiomed.com/news/articleView.html?idxno=7256,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,2,0.0,1,0,0,1,1,Recovered,"List(new, confirmed, covid, patient, south, korea, arrived, gimpo, international, airport, works, wuhan, visited, chinese, clinic, suffering, sore, throat, symptoms, symptom, onset, confirmed, recovered)",23
532,82,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic and and hospital on and admitted,Singapore,Singapore,female,57.0,2020-02-09 00:00:00,0,2020-02-10 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick,"List(new, confirmed, covid, patient, singapore, symptom, onset, went, clinic, hospital, admitted)",11
533,83,2020-02-19 00:00:00,new confirmed covid patient in singapore travels frequently to malaysia symptom onset went to clinic on ncid,Singapore,Singapore,male,54.0,2020-01-28 00:00:00,0,2020-02-01 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered,"List(new, confirmed, covid, patient, singapore, travels, frequently, malaysia, symptom, onset, went, clinic, ncid)",13
534,84,2020-02-19 00:00:00,new confirmed covid patient in singapore linked to case symptom onset went to clinic and referred to ncid on,Singapore,Singapore,female,35.0,2020-02-04 00:00:00,0,2020-02-04 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered,"List(new, confirmed, covid, patient, singapore, linked, case, symptom, onset, went, clinic, referred, ncid)",13
535,85,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic went to hospital,Singapore,Singapore,male,36.0,2020-02-14 00:00:00,0,2020-02-14 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick,"List(new, confirmed, covid, patient, singapore, symptom, onset, went, clinic, went, hospital)",11
127,0,2020-01-25 00:00:00,confirmed imported covid pneumonia patient no in tianjin beijing resident visited wuhan from to arrived in tianjin on symptom onset on visited fever clinic on confirmed on,Tianjin,China,male,30.0,2020-01-24 00:00:00,0,2020-01-24 00:00:00,2020-01-18 00:00:00,2020-01-20 00:00:00,1,0,0,0,List(pneumonia),央视新闻,https://m.weibo.cn/status/4464681265155125?,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,2.0,1,0,1,133,4,Sick,"List(confirmed, imported, covid, patient, tianjin, beijing, resident, visited, wuhan, arrived, tianjin, symptom, onset, visited, fever, clinic, confirmed)",17


We export a special table in order to create a dashboard later.

In [63]:
df_for_dashboard = df.withColumnRenamed("reporting date", "reporting_date")
df_for_dashboard = df_for_dashboard.select("id", "reporting_date", "state", "country", "visited_hospital")
# write the table as CSV with a header row (inferSchema is a read-only option, so it is not needed here)
df_for_dashboard.write.format("csv").option("header", "True").mode("overwrite").save("df_for_dashboard1")

## 2. Data analysis

In [65]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

#### 2.1. Univariate Analysis

One interesting thing is to see the most common symptoms of COVID-19.

In [68]:
# count the number of times each symptom appears
counts_symptoms = df.select(psf.explode(col('symptom')).alias('col')).groupBy('col').count().collect()
dict_symptoms = {row['col']: row['count'] for row in counts_symptoms}
# we are not interested in the "no symptom" item (pop does not fail if the key is absent)
dict_symptoms.pop("no symptom", None)
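
To make the `explode` + `groupBy` + `count` step concrete, here is a minimal plain-Python sketch of the same logic. The rows in `toy_rows` are made up for illustration and are not taken from the dataset.

```python
from collections import Counter

# Hypothetical rows: each patient has a list of symptoms (as in the 'symptom' column)
toy_rows = [
    ["fever", "cough"],
    ["no symptom"],
    ["fever"],
    ["fever", "sore throat"],
]

# "explode": one item per (patient, symptom) pair, then count each symptom
symptom_counts = Counter(s for symptoms in toy_rows for s in symptoms)

# remove the "no symptom" placeholder; pop() does not fail if the key is absent
symptom_counts.pop("no symptom", None)

print(dict(symptom_counts))
```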

In [69]:
#check deletion
display(df)

id,case_in_country,reporting date,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link,pneumonia,fever,cough,sore throat,malaise,headache,chills,fatigue,runny nose,sputum,diarrhea,shortness of breath,joint pain,vomiting,no symptom,nb_symptoms,exposure_duration,visited_hospital,is_quarantine,sick,source_importance,link_importance,state,summary_clean,info_summary
475,25,2020-02-05 00:00:00,new confirmed covid patient in singapore husband of no had not been to china recently at ncid symptom onset fever visited clinic with wife on went to ncid on,Singapore,Singapore,male,49.0,2020-01-24 00:00:00,0,2020-02-03 00:00:00,,,0,0,0,1,List(fever),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0.0,1,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, husband, china, recently, ncid, symptom, onset, fever, visited, clinic, wife, went, ncid)",17
476,26,2020-02-05 00:00:00,new confirmed covid patient in singapore chinese national arrived in singapore from wuhan on daughter of no,Singapore,Singapore,female,42.0,,0,,,2020-01-21 00:00:00,1,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,0,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, chinese, national, arrived, singapore, wuhan, daughter)",11
477,27,2020-02-05 00:00:00,new confirmed covid patient in singapore months son of no now in isolation room in hospital,Singapore,Singapore,male,0.0,,0,,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,0,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, months, son, isolation, room, hospital)",10
478,28,2020-02-05 00:00:00,new confirmed covid patient in singapore married fo no warded in isolationi at ncid symptom onset visited clinic isolated,Singapore,Singapore,male,45.0,2020-02-01 00:00:00,0,2020-02-02 00:00:00,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,1,0,0,22,4,Recovered,"List(new, confirmed, covid, patient, singapore, married, fo, warded, isolationi, ncid, symptom, onset, visited, clinic, isolated)",15
545,2,2020-01-24 00:00:00,new confirmed covid patient in south korea arrived at gimpo international airport works in wuhan visited chinese clinic after suffering from sore throat and other symptoms symptom onset confirmed recovered,South Korea,South Korea,male,55.0,2020-01-10 00:00:00,0,2020-01-19 00:00:00,,2020-01-22 00:00:00,1,0,0,1,"List(fever, sore throat)",Korea Bio Med,http://www.koreabiomed.com/news/articleView.html?idxno=7256,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,2,0.0,1,0,0,1,1,Recovered,"List(new, confirmed, covid, patient, south, korea, arrived, gimpo, international, airport, works, wuhan, visited, chinese, clinic, suffering, sore, throat, symptoms, symptom, onset, confirmed, recovered)",23
532,82,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic and and hospital on and admitted,Singapore,Singapore,female,57.0,2020-02-09 00:00:00,0,2020-02-10 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick,"List(new, confirmed, covid, patient, singapore, symptom, onset, went, clinic, hospital, admitted)",11
533,83,2020-02-19 00:00:00,new confirmed covid patient in singapore travels frequently to malaysia symptom onset went to clinic on ncid,Singapore,Singapore,male,54.0,2020-01-28 00:00:00,0,2020-02-01 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered,"List(new, confirmed, covid, patient, singapore, travels, frequently, malaysia, symptom, onset, went, clinic, ncid)",13
534,84,2020-02-19 00:00:00,new confirmed covid patient in singapore linked to case symptom onset went to clinic and referred to ncid on,Singapore,Singapore,female,35.0,2020-02-04 00:00:00,0,2020-02-04 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered,"List(new, confirmed, covid, patient, singapore, linked, case, symptom, onset, went, clinic, referred, ncid)",13
535,85,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic went to hospital,Singapore,Singapore,male,36.0,2020-02-14 00:00:00,0,2020-02-14 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick,"List(new, confirmed, covid, patient, singapore, symptom, onset, went, clinic, went, hospital)",11
127,0,2020-01-25 00:00:00,confirmed imported covid pneumonia patient no in tianjin beijing resident visited wuhan from to arrived in tianjin on symptom onset on visited fever clinic on confirmed on,Tianjin,China,male,30.0,2020-01-24 00:00:00,0,2020-01-24 00:00:00,2020-01-18 00:00:00,2020-01-20 00:00:00,1,0,0,0,List(pneumonia),央视新闻,https://m.weibo.cn/status/4464681265155125?,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,2.0,1,0,1,133,4,Sick,"List(confirmed, imported, covid, patient, tianjin, beijing, resident, visited, wuhan, arrived, tianjin, symptom, onset, visited, fever, clinic, confirmed)",17


In [70]:
# filter when count is greater than 5 cases
dict_symptoms_filtered = {k:v for (k,v) in sorted(dict_symptoms.items(), key=lambda item: item[1]) if v > 5 and k != ''}
# plot
%matplotlib inline
fig = plt.figure(figsize=(5,5))
plt.grid()
plt.barh(range(len(dict_symptoms_filtered)), list(dict_symptoms_filtered.values()), align='center', color="brown")
plt.yticks(range(len(dict_symptoms_filtered)), list(dict_symptoms_filtered.keys()))
plt.title("Number of cases by symptom",fontsize=18,weight='bold')
plt.xlabel("Count",fontsize=15)
plt.ylabel("Symptom",fontsize=15)
plt.show()

Some graphs are of better quality when drawn from a pandas dataframe than with Spark's built-in visualization tools. This is why,
for this part only, we use pandas.

In [72]:
pd_df = df.toPandas()

In [73]:
pd_df.hist(column = 'age')
plt.title("Age distribution of infected persons",fontsize=18,weight='bold')
plt.xlabel("Age",fontsize=15)
plt.ylabel("Frequency",fontsize=15)
plt.show()

The graph above shows that the [50,60] age group is the most represented among the infected persons, which is consistent with the empirical results reported so far.

To finish the univariate analysis, we look at the distribution of the target variable: how many patients are still sick, died, or recovered?

In [76]:
sns.countplot(x="state", data=pd_df)
plt.title("Number of cases concerned by status of the disease",fontsize=18,weight='bold')
plt.xlabel("State",fontsize=15)
plt.ylabel("Count",fontsize=15)
plt.show()

#### 2.2 Bivariate analysis

In [78]:
fig,ax = plt.subplots(figsize = (8,5))
sns.boxplot(x='death', y='age', data=pd_df)
plt.title("Boxplot of Age distribution per Death Status",fontsize=18,weight='bold')
plt.xlabel("Death Status",fontsize=15)
plt.ylabel("Age",fontsize=15)
plt.show()

On the one hand, we have seen above that elderly people are infected by the virus more often than younger ones. On the other hand, the boxplot above shows that they are also the ones who die from it most often.

We now look at the link between the death rate and the epicentre of the COVID-19 pandemic (Wuhan).

In [81]:
fig,ax = plt.subplots(figsize = (8,5))
sns.barplot(x='from Wuhan', y='death', data=pd_df, ci=0)
plt.title("Death distribution as function of origin",fontsize=18,weight='bold')
plt.xlabel("from Wuhan",fontsize=15)
plt.ylabel("Death",fontsize=15)
plt.show()

As the chart above shows, the death rate is higher among people who lived in or came from Wuhan than among the others. This held until the end of March, which is roughly the date up to which most of our data were collected.
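
Since `death` is a 0/1 indicator, the height of each bar in the bar charts of this section is the mean of `death` within the group, that is, the group's death rate (seaborn's `barplot` uses the mean as its default estimator). A minimal sketch of that computation on made-up records, not on the real data:

```python
from collections import defaultdict

# Hypothetical (from_wuhan, death) pairs, for illustration only
toy_records = [(1, 1), (1, 0), (1, 0), (1, 1), (0, 0), (0, 0), (0, 1), (0, 0)]

totals = defaultdict(int)   # number of cases per group
deaths = defaultdict(int)   # number of deaths per group
for from_wuhan, death in toy_records:
    totals[from_wuhan] += 1
    deaths[from_wuhan] += death

# death rate per group = mean of the 0/1 indicator
death_rate = {group: deaths[group] / totals[group] for group in totals}
print(death_rate)
```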

In [83]:
fig,ax = plt.subplots(figsize = (8,5))
sns.barplot(x='visiting Wuhan', y='death', data=pd_df, ci=0)
plt.title("Death distribution by Wuhan visit status",fontsize=18,weight='bold')
plt.xlabel("Visiting Wuhan",fontsize=15)
plt.ylabel("Death",fontsize=15)
plt.show()

Contrary to the previous graph, the death rate is not higher for people who visited Wuhan than for the others. This is because the disease spread rapidly around the world, so many people died even though they had never visited Wuhan.

Now let's have a look at the figures for the disease by country according to our data.

In [86]:
# keep only the countries we want to compare
countries = ['South Korea', 'China', 'France', 'Hong Kong', 'Taiwan', 'Japan']
df_country = pd_df[pd_df['country'].isin(countries)]
fig,ax = plt.subplots(figsize = (22,5))
sns.barplot(x='country', y='death', data=df_country, ci=0)
plt.suptitle("Warning: our data contain some NA; small differences with the known rates are possible",fontsize=12)
plt.title("Percentage of deaths per country (on data till end March 2020)", fontsize=18,weight='bold')
plt.xlabel("Country",fontsize=15)
plt.ylabel("Percentage Death",fontsize=15)
plt.show()

In [87]:
g = sns.FacetGrid(pd_df, hue="death", size=6)
g = (g.map(sns.distplot, "exposure_duration", hist=True, rug=True))
plt.title("Exposure duration per death status",fontsize=18,weight='bold')
plt.xlabel("Exposure Duration (in number of days)",fontsize=15)
plt.ylabel("Percentage",fontsize=15)
plt.legend(['recovered','death'])
plt.show()

We can notice some peaks before 10 days; these may correspond to the most vulnerable persons.

In [89]:
fig,ax = plt.subplots(figsize=(8,5))
sns.barplot(x='visited_hospital',y='death',data=pd_df, ci=0)
plt.title("Death distribution as a function of hospital visit status",fontsize=18,weight='bold')
plt.xlabel("visited hospital",fontsize=15)
plt.ylabel("Death",fontsize=15)
plt.show()

From the graph above, we can see that the death rate is higher among people who went to hospital. One possible explanation is that the people who visited a hospital were those at a severe stage of the disease, and therefore those with the highest probability of dying.

Drop the columns that are not used later.

In [92]:
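# the date column is still named "reporting date" in df (it was renamed only in df_for_dashboard);
# drop() silently ignores names that are not in the dataframe (e.g. "deaths")
columns_to_drop2 = ["reporting date", "summary", "symptom_onset", \
                    "If_onset_approximated", "hosp_visit_date", "exposure_start", \
                    "exposure_end","link","deaths","recovered","sick","symptom","location"]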
df = df.drop(*columns_to_drop2)

#### 2.3. Correlations

In [94]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

Correlation matrix between (some) numeric variables.

In [96]:
# convert to vector column first
vector_col="corr_features"
num_cols = ['case_in_country', 'age', 'visiting Wuhan', 'from Wuhan', 'death', 'nb_symptoms', \
            'info_summary', 'exposure_duration', 'visited_hospital', 'is_quarantine', 'source_importance', 'link_importance']
numerics = df.select([c for c in df.columns if c in num_cols])
assembler = VectorAssembler(inputCols=numerics.columns, outputCol=vector_col)
df_vector = assembler.transform(numerics.na.drop()).select(vector_col)
corr_mat= Correlation.corr(df_vector, vector_col, method="pearson").collect()[0][0]

Plot Heatmap

In [98]:
corrmatrix = corr_mat.toArray().tolist()
corr_df = spark.createDataFrame(corrmatrix, numerics.columns)

In [99]:
corr_df = corr_df.toPandas()
f, ax = plt.subplots(figsize=(9, 6))
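# linewidths sets the width of the lines between cells; the rows are labelled with the variable names
heatmap = sns.heatmap(corr_df, vmin = -1, vmax = 1, linewidths=.5,
                      yticklabels=corr_df.columns, ax=ax)
heatmap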

## 3. Models estimation with pipelines

In this part, we try two methods: a random forest classifier and a Naive Bayes classifier. We also wanted to try gradient-boosted trees, but Spark MLlib currently supports only binary classification for this method, whereas our target variable is multiclass. Multinomial logistic regression is also supported by Spark MLlib, but we leave it aside: we are not interested in modelling the probability that an individual becomes sick, recovered or dead, but in which of these states he or she actually ends up in.

### 3.1. Random Forest Classifier

In [103]:
from pyspark.ml import Pipeline
from pyspark.ml import feature as ml_feature  # alias used below for Tokenizer and StopWordsRemover
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [104]:
columns_to_drop3 = ["reporting date", "symptom_onset", \
                    "If_onset_approximated", "hosp_visit_date", "exposure_start", \
                    "exposure_end","link","deaths","recovered","sick","symptom","location"]
df_pip = df_pipelines.drop(*columns_to_drop3)
df_pip = df_pip.dropna()

##### 1) Transformers

In [106]:
display(df_pipelines)

id,case_in_country,reporting date,summary,location,country,gender,age,symptom_onset,If_onset_approximated,hosp_visit_date,exposure_start,exposure_end,visiting Wuhan,from Wuhan,death,recovered,symptom,source,link,pneumonia,fever,cough,sore throat,malaise,headache,chills,fatigue,runny nose,sputum,diarrhea,shortness of breath,joint pain,vomiting,no symptom,nb_symptoms,exposure_duration,visited_hospital,is_quarantine,sick,source_importance,link_importance,state
475,25,2020-02-05 00:00:00,new confirmed covid patient in singapore husband of no had not been to china recently at ncid symptom onset fever visited clinic with wife on went to ncid on,Singapore,Singapore,male,49.0,2020-01-24 00:00:00,0,2020-02-03 00:00:00,,,0,0,0,1,List(fever),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0.0,1,0,0,22,4,Recovered
476,26,2020-02-05 00:00:00,new confirmed covid patient in singapore chinese national arrived in singapore from wuhan on daughter of no,Singapore,Singapore,female,42.0,,0,,,2020-01-21 00:00:00,1,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,0,0,0,22,4,Recovered
477,27,2020-02-05 00:00:00,new confirmed covid patient in singapore months son of no now in isolation room in hospital,Singapore,Singapore,male,0.0,,0,,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,0,0,0,22,4,Recovered
478,28,2020-02-05 00:00:00,new confirmed covid patient in singapore married fo no warded in isolationi at ncid symptom onset visited clinic isolated,Singapore,Singapore,male,45.0,2020-02-01 00:00:00,0,2020-02-02 00:00:00,2020-01-23 00:00:00,2020-02-03 00:00:00,0,0,0,1,List(no symptom),Straits Times,https://www.straitstimes.com/singapore/health/coronavirus-4-more-confirmed-cases-in-singapore-28-cases-so-far,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,11.0,1,0,0,22,4,Recovered
545,2,2020-01-24 00:00:00,new confirmed covid patient in south korea arrived at gimpo international airport works in wuhan visited chinese clinic after suffering from sore throat and other symptoms symptom onset confirmed recovered,South Korea,South Korea,male,55.0,2020-01-10 00:00:00,0,2020-01-19 00:00:00,,2020-01-22 00:00:00,1,0,0,1,"List(fever, sore throat)",Korea Bio Med,http://www.koreabiomed.com/news/articleView.html?idxno=7256,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,2,0.0,1,0,0,1,1,Recovered
532,82,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic and and hospital on and admitted,Singapore,Singapore,female,57.0,2020-02-09 00:00:00,0,2020-02-10 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick
533,83,2020-02-19 00:00:00,new confirmed covid patient in singapore travels frequently to malaysia symptom onset went to clinic on ncid,Singapore,Singapore,male,54.0,2020-01-28 00:00:00,0,2020-02-01 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered
534,84,2020-02-19 00:00:00,new confirmed covid patient in singapore linked to case symptom onset went to clinic and referred to ncid on,Singapore,Singapore,female,35.0,2020-02-04 00:00:00,0,2020-02-04 00:00:00,,,0,0,0,1,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,0,67,4,Recovered
535,85,2020-02-19 00:00:00,new confirmed covid patient in singapore symptom onset went to clinic went to hospital,Singapore,Singapore,male,36.0,2020-02-14 00:00:00,0,2020-02-14 00:00:00,,,0,0,0,0,List(no symptom),Ministry of Health Singapore,https://www.moh.gov.sg/news-highlights/details/five-more-cases-discharged-three-new-cases-of-covid-19-infection-confirmed,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0.0,1,0,1,67,4,Sick
127,0,2020-01-25 00:00:00,confirmed imported covid pneumonia patient no in tianjin beijing resident visited wuhan from to arrived in tianjin on symptom onset on visited fever clinic on confirmed on,Tianjin,China,male,30.0,2020-01-24 00:00:00,0,2020-01-24 00:00:00,2020-01-18 00:00:00,2020-01-20 00:00:00,1,0,0,0,List(pneumonia),央视新闻,https://m.weibo.cn/status/4464681265155125?,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,2.0,1,0,1,133,4,Sick


In [107]:
#create a tfidf feature vector
tokenizer = ml_feature.Tokenizer(inputCol='summary', outputCol='token_summary')


stop = ml_feature.StopWordsRemover()
stopwords = stop.getStopWords() 

remover = ml_feature.StopWordsRemover(inputCol='token_summary', outputCol='tokenized_summary', stopWords = stopwords)


hashingTF = HashingTF(inputCol='tokenized_summary', outputCol='hashed')


# While applying HashingTF only needs a single pass to the data, applying TF-IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.

idf = IDF(inputCol='hashed', outputCol='tfidf')
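
The two passes mentioned in the comment above can be illustrated with a small plain-Python sketch. It is only an illustration under stated assumptions: it uses raw term counts instead of hashing, the toy documents are made up, and the weight follows the formula log((m + 1) / (df(t) + 1)) given in the Spark MLlib documentation, where m is the number of documents and df(t) the number of documents containing term t.

```python
import math
from collections import Counter

# Hypothetical tokenized summaries (illustration only)
docs = [
    ["patient", "wuhan", "fever"],
    ["patient", "singapore"],
    ["patient", "wuhan", "wuhan"],
]

# Pass 1: document frequency of each term, then its IDF weight
m = len(docs)
doc_freq = Counter(term for doc in docs for term in set(doc))
idf_weights = {term: math.log((m + 1) / (n_docs + 1)) for term, n_docs in doc_freq.items()}

# Pass 2: scale each document's term frequencies by the IDF weights
tfidf = [{term: tf * idf_weights[term] for term, tf in Counter(doc).items()} for doc in docs]

for row in tfidf:
    print(row)
```

A term that appears in every document, such as "patient" here, gets a weight of zero, which is why TF-IDF downplays words that do not discriminate between summaries.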



In [108]:
#index the dependent variable
labelIndexer = StringIndexer(inputCol="state", outputCol="indexedLabel")

#index the categorical variable "gender"
genderIndexer = StringIndexer(
  inputCol="gender",
  outputCol="genderindex")

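#assemble all features into a single vector used as model features
#(the 'tfidf' column is not part of this list, so the text features built above do not enter the model)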
colchoice= ['case_in_country', 'age', 'visiting Wuhan', 'from Wuhan', 'pneumonia', 'fever', 'cough', 'sore throat', 'malaise', 'headache', 'chills', 'fatigue', 'runny nose', 'sputum', 'diarrhea', 'shortness of breath', 'joint pain', 'vomiting', 'no symptom', 'nb_symptoms', 'exposure_duration', 'visited_hospital', 'is_quarantine', 'source_importance', 'link_importance','genderindex']
assembler = VectorAssembler(
    inputCols= colchoice,
    outputCol='features')


##### 2) classifier

In [110]:
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=20)

##### 3) Pipeline

In [112]:
pipeline = Pipeline(stages=[tokenizer,remover,hashingTF,idf,labelIndexer,genderIndexer,assembler,rf])

Split the data between train set and test set and predict on test set.

##### 4) estimate rf model

In [115]:
(trainingData, testData) = df_pip.randomSplit([0.7, 0.3]) #split the data, 70% train, 30% test
#drop if label not in both datasets
model_rf = pipeline.fit(trainingData)#fit the pipeline model
predictions_rf = model_rf.transform(testData)#predict on the test sample
#see some predictions
predictions_rf.select("prediction", "indexedLabel", "features").show(10)

As the first predictions above show, the model sometimes predicts the wrong class. We will compare the accuracy of the models and keep the best one. Then, we will use a third-party machine learning library to tune its parameters in order to increase its accuracy.

##### 5) Accuracy of the model rf

In [118]:
evaluator_rf = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy_rf= evaluator_rf.evaluate(predictions_rf)
print("Accuracy of random forest model on the test sample = %g " % (accuracy_rf))
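
The accuracy metric used here is simply the share of test rows whose predicted class equals the true class. A minimal sketch with made-up label indices (not the model's actual output):

```python
# Hypothetical indexed labels and predictions (illustration only)
true_labels = [0.0, 1.0, 0.0, 2.0, 1.0]
predicted = [0.0, 1.0, 1.0, 2.0, 0.0]

# accuracy = number of correct predictions / number of predictions
correct = sum(1 for y, y_hat in zip(true_labels, predicted) if y == y_hat)
accuracy = correct / len(true_labels)
print("accuracy = %g" % accuracy)
```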

### 3.2. Naive Bayes classifier

##### 1) classifier

In [121]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol="indexedLabel", featuresCol="features",smoothing=1.0, modelType="multinomial")

##### 2) pipeline

In [123]:
pipeline = Pipeline(stages=[tokenizer,remover,hashingTF,idf,labelIndexer,genderIndexer,assembler,nb])

##### 3) estimate nb model

In [125]:
model_nb = pipeline.fit(trainingData)#fit the pipeline model
predictions_nb = model_nb.transform(testData)#predict  on the test sample
#see first predictions
predictions_nb.select("prediction", "indexedLabel", "features").show(10)

##### 4) accuracy of the model nb

In [127]:
evaluator_nb = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy_nb= evaluator_nb.evaluate(predictions_nb)
print("Accuracy of Naive Bayes model on the test sample = %g " % (accuracy_nb))

We are going to tune only the parameters of the best model. Therefore, let's choose the one with the higher accuracy (percentage of correct predictions).

In [129]:
print("Accuracy of random forest model on the test sample = %g " % (accuracy_rf))
print("Accuracy of Naive Bayes model on the test sample = %g " % (accuracy_nb))

We can clearly see that the random forest performs better than the Naive Bayes classifier. So we choose the random forest model for tuning.

## 4. Model optimization with Scikit-Learn

### 4.1. Cross-Validation

Cross-validation (CV) is a useful method to evaluate a model while limiting the risk of overfitting, which harms the generalization of the model's results to new data. It generalizes the idea of splitting a sample into a training set and a validation set. There are several versions of CV; one of the best known is K-fold CV. Its principle is to split the initial sample into K sub-samples of (roughly) equal size, to train the model on K-1 of these sub-samples (for a given set of hyper-parameters), and then to test the model and compute its error on the remaining sub-sample, which plays the role of the validation set. This process is repeated until each of the K sub-samples has been used exactly once as the validation set. Finally, for each set of hyper-parameters, the error is averaged over the K folds.
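
The K-fold principle can be sketched in plain Python. The helper `k_fold_indices` is a hypothetical name introduced for this illustration; it splits the indices in order, without shuffling, and gives the first folds one extra sample when the sample size is not a multiple of K.

```python
def k_fold_indices(n_samples, n_folds):
    """Yield (train_indices, validation_indices) for each fold, without shuffling."""
    # the first n_samples % n_folds folds get one extra sample
    fold_sizes = [n_samples // n_folds + (1 if i < n_samples % n_folds else 0)
                  for i in range(n_folds)]
    start = 0
    for size in fold_sizes:
        validation = list(range(start, start + size))
        train = [i for i in range(n_samples) if i < start or i >= start + size]
        yield train, validation
        start += size

folds = list(k_fold_indices(n_samples=10, n_folds=5))
for train, validation in folds:
    print("train:", train, "validation:", validation)
```

Each index appears in exactly one validation fold, so every observation is used once for validation and K-1 times for training.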

In [134]:
# We split randomly our initial sample into k equal-size folds or parts
from sklearn import model_selection
#since our dataset is relatively small, we are going to use 10-fold CV
numFolds = 10
kf = model_selection.KFold(n_splits=numFolds)
(trainingData, testData) = df_pip.randomSplit([0.7, 0.3],seed=1040)
target = 'state'
featureNames = 'features'

In this section we use the *scikit-learn* library. It works on in-memory data such as *Pandas* dataframes rather than on Spark dataframes, so we prepare the data with the same steps as above and then convert the resulting dataframe into a *Pandas* one.

In [136]:
# applying the preparation steps
pipeline_prep = Pipeline(stages=[tokenizer,remover,hashingTF,idf,labelIndexer,genderIndexer,assembler])
model_prep = pipeline_prep.fit(df_pip)
df_prep = model_prep.transform(df_pip)

After the previous command, *df_prep* contains many columns, but its *features* column gathers all the selected feature columns into a single vector.

In [138]:
# converting the resulting dataframe to Pandas
df_for_sklearn = df_prep.toPandas()
# select only trainable feature and target
df_for_sklearn = df_for_sklearn[["id","features","state"]]

We then split the data into a training set and a test set.

In [140]:
from sklearn.model_selection import train_test_split

# Hold out 30% of the data for testing.  We will use the rest for training.
labels = df_for_sklearn[target].values
features = df_for_sklearn[featureNames].values
# train_test_split returns a (train, test) pair for each input array, in the order the arrays are passed
trainingLabels, testLabels, trainingFeatures, testFeatures = train_test_split(labels, features, test_size=0.3)

### 4.2. Definition of parallel computing structure : RDD

In [142]:
import numpy as np
nbTrees = [int(x) for x in np.linspace(start = 10, stop = 50, num = 3)]
# one task per (number of trees, fold index) pair
tasks = [(tree, fold) for tree in nbTrees for fold in range(numFolds)]

# parallel computing: distribute the tasks in an RDD with one partition per task
tasksRDD = sc.parallelize(tasks, numSlices = len(tasks))

In [143]:
print(nbTrees)
print(tasks)
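
Each task is one (number of trees, fold index) pair, so the grid contains len(nbTrees) × numFolds entries. The same list can be sketched with `itertools.product`; the values below are the ones `np.linspace(10, 50, 3)` yields, written out by hand so that the sketch needs neither NumPy nor a cluster. The names `nb_trees_grid`, `num_folds` and `task_grid` are introduced for this illustration only.

```python
from itertools import product

nb_trees_grid = [10, 30, 50]   # candidate numbers of trees
num_folds = 10                 # number of folds in the cross-validation

# one task per (number of trees, fold index) pair
task_grid = list(product(nb_trees_grid, range(num_folds)))

print(len(task_grid))   # number of models to train
print(task_grid[:3])
```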

### 4.3. Determine the best model

In a Random Forest classification model, usually the hyperparameters to optimize are :

1- numTrees: Number of trees in the forest

2- maxDepth: Max number of levels in each decision tree

3- featureSubsetStrategy: Number of features to test as candidates for splitting at each tree node

4- minInstancesPerNode: Minimum number of instances each child must have after a split

5- minInfoGain: Minimum information gain for a split to be considered at a tree node with respect to Gini Index.

However, the most important one is the first, and it is the only one we optimize here, since we do not have many variables anyway.
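
The names listed above are those of Spark MLlib's `RandomForestClassifier`, whereas the tuning below relies on scikit-learn. As a rough guide, the sketch below gives the closest scikit-learn counterpart of each of them. The correspondence is approximate (for instance, the two libraries do not define the gain threshold in exactly the same way), so it should be read as an orientation aid rather than an exact equivalence.

```python
# Approximate correspondence: Spark MLlib name -> closest scikit-learn parameter
spark_to_sklearn = {
    "numTrees": "n_estimators",
    "maxDepth": "max_depth",
    "featureSubsetStrategy": "max_features",
    "minInstancesPerNode": "min_samples_leaf",
    "minInfoGain": "min_impurity_decrease",
}

for spark_name, sklearn_name in spark_to_sklearn.items():
    print("%-22s -> %s" % (spark_name, sklearn_name))
```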

In [146]:
trainingFeaturesBroadcast = sc.broadcast(trainingFeatures)
trainingLabelsBroadcast = sc.broadcast(trainingLabels)

In [147]:
from sklearn import ensemble
#then define the function that each parallel task runs to train one random forest
def trainOneModel(tree, fold):
  """
  Given 1 task (1 hyperparameter tree value + 1 fold index), train the corresponding model.
  Return: accuracy score on the fold's validation data, followed by the task info (tree, fold).
  """
  # Extract indices for this fold
  trainIndex, valIndex = [], []
  fold_ = 0 # index into folds 'kf'

  # Get training data from the broadcast variables
  localTrainingFeatures = trainingFeaturesBroadcast.value
  localTrainingLabels = trainingLabelsBroadcast.value
  
  for trainIndex_, valIndex_ in kf.split(localTrainingFeatures):
    if fold_ == fold:
      trainIndex, valIndex = trainIndex_, valIndex_
      break
    fold_ += 1

  X_train, X_val = localTrainingFeatures[trainIndex], localTrainingFeatures[valIndex]
  Y_train, Y_val = localTrainingLabels[trainIndex], localTrainingLabels[valIndex]
  # Train the model, and score it
  model_rf_sklearn = ensemble.RandomForestClassifier(n_estimators=tree)
  model_rf_sklearn.fit(list(X_train), Y_train)
  score = model_rf_sklearn.score(list(X_val), Y_val)
  return  score, tree, fold

In [148]:
trainedModelAndScores = tasksRDD.map(lambda tree_fold: trainOneModel(tree_fold[0], tree_fold[1]))
trainedModelAndScores.cache()
trainedModelAndScores.count()

In [149]:
#get the score results: a list of (score, tree, fold) tuples
allScores = trainedModelAndScores.collect()
# Average scores over folds
avgScores = dict(map(lambda tree: (tree, 0.0), nbTrees))
for score, tree, fold in allScores:
  avgScores[tree] += score
for tree in nbTrees:
  avgScores[tree] /= numFolds
avgScores

In [150]:
# Find the best model
bestnb_tree = -5
bestScore = -5
for tree in nbTrees:
  if avgScores[tree] > bestScore:
    bestnb_tree = tree
    bestScore = avgScores[tree]
print('the best number of trees is: %g, which gives a score of: %g' % (bestnb_tree, bestScore))
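
The averaging and selection steps above can be condensed as follows. The scores are made up for illustration; with real results, `all_scores` would be the collected list of (score, tree, fold) tuples.

```python
from collections import defaultdict

# Hypothetical (score, number of trees, fold index) tuples
all_scores = [(0.80, 10, 0), (0.90, 10, 1), (0.85, 30, 0), (0.95, 30, 1)]

# group the scores by number of trees, then average over folds
scores_by_tree = defaultdict(list)
for score, tree, fold in all_scores:
    scores_by_tree[tree].append(score)
avg_scores = {tree: sum(s) / len(s) for tree, s in scores_by_tree.items()}

# the best hyperparameter value is the one with the highest average score
best_tree = max(avg_scores, key=avg_scores.get)
print(best_tree, avg_scores[best_tree])
```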