# COVID19 Notebook
website: `https://github.com/CSSEGISandData/COVID-19`

```bash
cd ~/cisc_525
hdfs dfs -copyFromLocal COVID-19 /user/student

jupyter notebook
```

In [None]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
import subprocess
from pyspark.sql.functions import col, max as max_
import datetime

In [None]:
# Start (or reuse) the Spark session used by the later cells to read the CSVs.
# NOTE(review): "spark.config.option" / "value" looks like a placeholder
# setting rather than a real option — confirm before relying on it.
spark = (
    SparkSession.builder
    .appName("covid19-app")
    .config("spark.config.option", "value")
    .getOrCreate()
)
# Stand-alone SparkConf carrying the same application name; it is not
# referenced by any of the visible cells.
scfg = SparkConf().setAppName('covid19-app')


In [None]:
# HDFS locations of the two input files: one US daily report and the
# US confirmed-cases time series.
day = (
    'hdfs://localhost:9000/user/student/csse_covid_19_data/'
    'csse_covid_19_daily_reports_us/05-15-2020.csv'
)
time_series = (
    'hdfs://localhost:9000/user/student/csse_covid_19_data/'
    'csse_covid_19_time_series/time_series_covid19_confirmed_US.csv'
)

In [None]:
# Load both CSV files, treating the first line of each as the column names.
# No schema is supplied or inferred, so the values are read as strings.
ts_df = spark.read.csv(time_series, header=True)
day_df = spark.read.csv(day, header=True)

## RDD Section

In [None]:
# Expose the DataFrame's rows as an RDD (resilient distributed dataset),
# which the following cells query with the RDD API.
ts_rdd = ts_df.rdd

In [None]:
# Column layout of the US confirmed-cases time series.
#
# The first eleven columns identify the location:
#   UID, iso2, iso3, code3, FIPS, Admin2, Province_State, Country_Region,
#   Lat, Long_, Combined_Key
# They are followed by one column per day, labelled month/day/year without
# zero padding. In the snapshot described here the day columns ran from
# 1/22/20, 1/23/20, 1/24/20, ... through 5/15/20.

TS_COLUMNS = [
    'UID',
    'iso2',
    'iso3',
    'code3',
    'FIPS',
    'Admin2',
    'Province_State',
    'Country_Region',
    'Lat',
    'Long_',
    'Combined_Key',
]

# Positional index of the first per-day column: directly after the fixed ones.
TS_DATE_START_COLUMN = len(TS_COLUMNS)

# Fetch the first row of the time series; the bare name on the last line
# makes the notebook display it.
ts_first_row = ts_rdd.first()
ts_first_row


In [None]:
# Print every value of the first row, one per line, in column order.
for cell_value in ts_first_row:
    print(cell_value)


In [None]:
# Walk the fixed (non-date) columns and look each one up by name: the row
# can be indexed by column name as well as iterated by position.
for column_name in TS_COLUMNS:
    print('{} = {}'.format(column_name, ts_first_row[column_name]))
    
# ts_by_dates = ts_first[TS_DATE_START_COLUMN:]
# for ts_by_date in ts_by_dates:
#     print(ts_by_date)

In [None]:
# Build the labels of the per-day columns of the time series.
#
# The columns are labelled month/day/two-digit-year with no zero padding
# (e.g. '1/23/20'). strftime has no portable directive for that, so the
# label is formatted by hand.
def _date_column_label(day_value):
    """Return the time-series column label for *day_value*, e.g. '1/23/20'."""
    return '{}/{}/{}'.format(day_value.month, day_value.day, day_value.year - 2000)


# Quick comparison of strftime output (zero padded) with the column label.
start_date = datetime.date(2020, 1, 22)
print(start_date.strftime('%m/%d/%y'))
start_date += datetime.timedelta(days=1)
cur_date_str = _date_column_label(start_date)
# '%m' already zero-pads; the previous '%0m' is not a portable directive.
print(start_date.strftime('%m/%d/%y'))
print(cur_date_str)
print(cur_date_str == '1/23/20')

# First and last day covered, both inclusive. The first date column of the
# data is 1/22/20, so it must be part of the list.
FIRST_DATE = datetime.date(2020, 1, 22)
LAST_DATE = datetime.date(2020, 6, 7)

DATE_COLUMNS = []
for day_offset in range((LAST_DATE - FIRST_DATE).days + 1):
    cur_date = FIRST_DATE + datetime.timedelta(days=day_offset)
    cur_date_str = _date_column_label(cur_date)
    print(cur_date_str)
    DATE_COLUMNS.append(cur_date_str)

for date_str in DATE_COLUMNS:
    print(date_str)

In [None]:
# Show the first row's value under each date column.
# Date columns are to extend over time.

start_date = datetime.date(2020, 1, 22)
ts_first = ts_rdd.first()
# Positional slice of that row holding only the per-day values.
ts_by_dates = ts_first[TS_DATE_START_COLUMN:]

# Look each value up by its date label rather than by position.
for date_label in DATE_COLUMNS:
    print('{} = {}'.format(date_label, ts_first[date_label]))

### Group By Province or State


In [None]:
# Grouping by province or state yields a list of (key, rows) pairs: the key
# is the name of the state and rows is an iterable of the data rows that
# belong to that state.

ts_states = ts_rdd.groupBy(lambda x: x['Province_State'])
# sortByKey() takes an `ascending` flag, not a column name; the pairs are
# already keyed by state, so an ascending sort needs no further argument.
sorted_by_states = ts_states.sortByKey(ascending=True)

# Collect once (each collect() runs the whole job) and print the number of
# rows found for every state.
for state_name, state_rows in sorted_by_states.collect():
    print(state_name, len(state_rows))

In [None]:

# Collect the grouped states once and reuse the result; calling collect()
# a second time would run the grouping job again.
state_groups = ts_states.collect()
print(len(state_groups))
# Keys are unique after groupBy, so sorting on the key alone gives the same
# order as sorting the (key, rows) pairs.
for state_name, state_rows in sorted(state_groups, key=lambda group: group[0]):
    print(state_name, len(state_rows))
    for county_row in state_rows:
        print('\t', county_row['Admin2'])

In [None]:
# Group the rows by their Admin2 value. Rows without one are dropped before
# grouping, so they are never shuffled into a group that is thrown away.
ts_admin2s = (
    ts_rdd
    .filter(lambda x: x['Admin2'] is not None)
    .groupBy(lambda x: x['Admin2'])
)

# Collect once and reuse the result for both the count and the listing.
admin2_groups = ts_admin2s.collect()
print(len(admin2_groups))
for admin2_name, admin2_rows in sorted(admin2_groups, key=lambda group: group[0]):
    print(admin2_name, len(admin2_rows))


## Dataframe

In [None]:
# Display the DataFrame object itself (the notebook shows its repr).
ts_df

In [None]:
# List the attribute and method names available on the DataFrame.
dir(ts_df)

In [None]:
# Number of rows in the time series.
ts_df.count()

In [None]:
# UID value of the first row, looked up by column name.
ts_df.first()['UID']

In [None]:
# Count the rows per Province_State, ordered by state name, and bring the
# result back to the driver as a list of rows.
out = ts_df.groupBy('Province_State').count().orderBy('Province_State')
out.collect()


In [None]:
# Province_State value of the first row.
ts_df.first()['Province_State']

In [None]:
# (column name, type) pairs of the DataFrame.
ts_df.dtypes

In [None]:
# Column names of the DataFrame, in order.
ts_df.columns

In [None]:
# A row's field can be read either as an attribute or by column name;
# both prints below show the same UID.
first_row = ts_df.first()
print(first_row.UID)
print(first_row['UID'])

In [None]:
# Show only the Province_State column.
ts_df.select('Province_State').show()

In [None]:
# Same column, with the rows sorted by Province_State first.
ts_df.sort('Province_State').select('Province_State').show()

In [None]:

# The CSV was read without a schema, so the counts are strings. Compare and
# order them as integers; ordered as text, '99' would sort above '1000'.
cases_on_day = ts_df['5/15/20'].cast('int')
(ts_df.filter(cases_on_day != 0)
      .select('Province_State', '5/15/20')
      .orderBy(cases_on_day.desc())
      .show())

In [None]:
# Date column to analyse, and the rows that have an Admin2 value for it.
target_date = '6/7/20'
# isNotNull() states the intent directly. Comparing against the string
# 'null' only dropped missing values as a side effect of SQL null comparison.
target = (
    ts_df.select('Admin2', 'Province_State', target_date)
    .where(ts_df['Admin2'].isNotNull())
)

In [None]:
# Attribute access on a DataFrame gives the Column object, not its values.
target.Admin2

In [None]:
from pyspark.sql.types import (StructField, StringType, StructType, IntegerType)

# Explicit schema for the three selected columns: each a nullable string.
data_fields = [
    StructField(column_name, StringType(), True)
    for column_name in ('Admin2', 'Province_State', target_date)
]
data_schema = StructType(data_fields)

# Rebuild the selection as a new DataFrame under that schema and show the
# resulting column types.
newDF = spark.createDataFrame(target.rdd, schema=data_schema)
newDF.dtypes

In [None]:

# Bring the selection to the driver as ['<state>.<admin2>', count] pairs,
# converting the count for the target date from text to int.
target_list = [
    ['{}.{}'.format(row['Province_State'], row['Admin2']), int(row[target_date])]
    for row in target.collect()
]
target_list

In [None]:
def get_key(x):
    """Return the item at index 1 of *x*; used as the sort key for target_list."""
    sort_value = x[1]
    return sort_value

# Rank the entries by count, highest first. sorted() returns a new list, so
# target_list itself keeps its original order.
sorted(target_list, key=get_key, reverse=True)

# 'New York.New York', 206969 Jun 7
# 'New York.New York', 206511 Jun 6
# 'Georgia.Fulton', 4823 Jun 7
# 'Georgia.Fulton', 4822 Jun 6

### End DF demonstration

In [None]:
def count_by_country_region(day):
    """Count the rows of a daily report per country/region, largest first.

    Args:
        day: Path of the daily-report CSV file; its first line is read as
            the header.

    Returns:
        A DataFrame with one row per Country_Region value and a ``count``
        column, ordered by ``count`` descending.
    """
    df = spark.read.option("header", "true").csv(day)
    # Use the same casing as the time-series header ('Country_Region') so the
    # lookup does not depend on Spark's case-insensitive column resolution.
    # NOTE(review): assumes the daily report spells the column the same way — confirm.
    return df.groupBy('Country_Region').count().orderBy(desc('count'))

In [None]:

# Run the per-country/region count on the daily report and show the result.
count_by_country_region(day).show()

In [None]:
import subprocess

# List a directory on HDFS and keep only the path of each entry.
dir_in = "/user/student/covid19/daily"
# Run hdfs directly with an argument list: no shell parses the command, and
# the hdfs exit status is no longer hidden behind a pipe into awk.
args = ["hdfs", "dfs", "-ls", dir_in]
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

s_output, s_err = proc.communicate()
if proc.returncode != 0:
    # Surface the failure instead of silently ending up with an empty list.
    print(s_err.decode(errors="replace"))

# Keep the 8th whitespace-separated field of every listing line (what
# `awk '{print $8}'` selected). Lines with fewer fields, such as a summary
# header, contribute nothing. Entries stay as bytes, as communicate()
# returns them.
all_dart_dirs = [
    fields[7]
    for fields in (line.split() for line in s_output.splitlines())
    if len(fields) >= 8
]