# Assignment 3
Name: Lakshmi Biradar
RedID: 825975651

In [123]:
# Location of the USAFacts confirmed-cases CSV that is read as input.
input_path = "covid_confirmed_usafacts (1).csv"
# Destination path where the generated weekly CSV is saved.
output_path = "Output"

In [124]:
#imports
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark.pandas as ps
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import SQLContext
from pyspark.sql.functions import expr
import numpy as np
import warnings
warnings.filterwarnings('ignore')

In [125]:
# Function to compute weekly COVID-19 case counts for California counties

def Covid_rate(covid_df, output):
    """Compute weekly new COVID-19 case counts for California counties and save them as CSV.

    The input CSV is read with a header row. Rows are kept when ``State`` is
    ``"CA"`` and ``countyFIPS`` is non-zero. Every column other than
    ``countyFIPS``, ``County Name``, ``State`` and ``StateFIPS`` is treated as a
    date column (presumably cumulative confirmed cases per day -- the values
    are differenced between consecutive days; confirm against the data source).

    Only complete weeks are reported: from the first Sunday to the last
    Saturday found among the date columns. Each weekly column is labelled with
    the Sunday (``YYYY-MM-DD``) that starts the week.

    Parameters
    ----------
    covid_df : str
        Path of the input CSV file, passed to Spark's CSV reader.
    output : str
        Destination path passed to ``to_csv`` (written with ``num_files=1``).

    Raises
    ------
    ValueError
        If the date columns contain no complete Sunday-to-Saturday week.
    """
    # NOTE(review): the hard-coded "local" master presumably also takes effect when
    # this code is launched through spark-submit on a cluster -- confirm it is intended.
    spark = SparkSession.builder \
        .master("local") \
        .appName("Print") \
        .getOrCreate()
    CassesDF = spark.read.option("header", True).option("inferSchema", True).csv(covid_df)

    # Keep California county rows only (countyFIPS 0 is excluded) and convert to
    # pandas-on-Spark: one frame with the county identifiers, one with the date columns.
    CA_df = CassesDF.filter((CassesDF.State == "CA") & (CassesDF.countyFIPS != 0))
    countyColumns = CA_df[['countyFIPS', 'County Name']].to_pandas_on_spark()
    week_df = CA_df.drop('countyFIPS', 'County Name', 'State', 'StateFIPS').to_pandas_on_spark()
    columns_list = week_df.columns.astype('datetime64[ns]').to_series()

    # Locate the first Sunday (dayofweek == 6) and the last Saturday (dayofweek == 5).
    date_column = columns_list.reset_index()
    Start_index = date_column[date_column['index'].dt.dayofweek == 6].first_valid_index()
    end_index = date_column[date_column['index'].dt.dayofweek == 5].last_valid_index()
    if Start_index is None or end_index is None or end_index < Start_index:
        raise ValueError("Input data does not contain a complete Sunday-to-Saturday week")
    start_date = date_column['index'][Start_index]
    end_date = date_column['index'][end_index]

    # Build a {date -> Sunday of that week} mapping: label Sundays with their own
    # date string, leave other days empty, then forward-fill the Sunday label.
    column_df = columns_list.to_frame(name='Date')
    column_df['Date'] = column_df['Date'].apply(
        lambda x: x.strftime("%Y-%m-%d") if x.dayofweek == 6 else np.nan
    )
    column_df = column_df[start_date:end_date]
    column_df = column_df.ffill()
    rename = column_df['Date'].to_dict()

    first_sunday = start_date.strftime("%Y-%m-%d")
    last_saturday = end_date.strftime("%Y-%m-%d")

    # diff() needs the day before the first Sunday as a baseline; without it the
    # first Sunday's difference is NaN and that day's cases are lost from the
    # first weekly total. The baseline row itself is removed after differencing.
    if Start_index > 0:
        baseline_day = date_column['index'][Start_index - 1].strftime("%Y-%m-%d")
        slice_start = baseline_day
    else:
        # Data begins on a Sunday: no earlier day exists to difference against.
        baseline_day = None
        slice_start = first_sunday

    full_weeks = week_df.loc[:, slice_start:last_saturday]
    full_weeks = full_weeks.T.diff().reset_index()
    if baseline_day is not None:
        full_weeks = full_weeks[full_weeks['index'] != baseline_day]

    # Replace every date with the Sunday starting its week so groupby can sum per week.
    full_weeks['week'] = full_weeks['index'].astype('datetime64[ns]')
    full_weeks = full_weeks.replace({"week": rename})
    full_weeks = full_weeks.drop(columns='index')
    grouped_weeks = full_weeks.groupby(['week']).sum().T

    # NOTE(review): this join assumes both frames carry the same default row index
    # because they were derived from the same filtered DataFrame -- confirm.
    table = ps.merge(countyColumns, grouped_weeks, left_index=True, right_index=True)

    # Write the table to the path provided.
    table.to_csv(output, num_files=1)

    # `display` only exists inside IPython/Jupyter; fall back to print so the
    # function also completes when run as a plain script (e.g. spark-submit).
    try:
        display(table)
    except NameError:
        print(table)

## Argument parser for AWS (as discussed in lecture)
(Not required when running locally.)

In [None]:
def files_from_args():
    """Read the input and output paths from the command line.

    Recognised options are ``-i/--input`` (default ``'input'``) and
    ``-o/--output`` (default ``'output'``). Unrecognised arguments are ignored
    rather than treated as errors, so the function also works when ``sys.argv``
    carries extra launcher arguments (for example the ``-f <connection file>``
    a Jupyter kernel is started with).

    Returns
    -------
    tuple
        ``(input, output)`` as given on the command line or their defaults.
    """
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input', default='input',
                        help='path of the input CSV file')
    parser.add_argument('-o', '--output', default='output',
                        help='path where the output CSV is written')
    # parse_known_args tolerates arguments this parser does not define.
    args, _unknown = parser.parse_known_args()
    return args.input, args.output

# Script entry point: take the input/output locations from the command line
# and run the weekly computation on them.
if __name__ == "__main__":
    input_df, output = files_from_args()
    Covid_rate(covid_df=input_df, output=output)


### Call covid rate function with paths

In [126]:
# Run the weekly computation with the paths configured at the top of the notebook.
if __name__ == "__main__":
    Covid_rate(covid_df=input_path, output=output_path)



Unnamed: 0,countyFIPS,County Name,2020-01-26,2020-02-02,2020-02-09,2020-02-16,2020-02-23,2020-03-01,2020-03-08,2020-03-15,2020-03-22,2020-03-29,2020-04-05,2020-04-12,2020-04-19,2020-04-26,2020-05-03,2020-05-10,2020-05-17,2020-05-24,2020-05-31,2020-06-07,2020-06-14,2020-06-21,2020-06-28,2020-07-05,2020-07-12,2020-07-19,2020-07-26,2020-08-02,2020-08-09,2020-08-16,2020-08-23,2020-08-30,2020-09-06,2020-09-13,2020-09-20,2020-09-27,2020-10-04,2020-10-11,2020-10-18,2020-10-25,2020-11-01,2020-11-08,2020-11-15,2020-11-22,2020-11-29,2020-12-06,2020-12-13,2020-12-20,2020-12-27,2021-01-03,2021-01-10,2021-01-17,2021-01-24,2021-01-31,2021-02-07,2021-02-14,2021-02-21,2021-02-28,2021-03-07,2021-03-14,2021-03-21,2021-03-28,2021-04-04,2021-04-11,2021-04-18,2021-04-25,2021-05-02,2021-05-09,2021-05-16,2021-05-23,2021-05-30,2021-06-06,2021-06-13,2021-06-20,2021-06-27,2021-07-04,2021-07-11,2021-07-18,2021-07-25,2021-08-01,2021-08-08,2021-08-15,2021-08-22,2021-08-29,2021-09-05,2021-09-12,2021-09-19,2021-09-26,2021-10-03,2021-10-10,2021-10-17,2021-10-24,2021-10-31,2021-11-07,2021-11-14,2021-11-21,2021-11-28,2021-12-05,2021-12-12,2021-12-19,2021-12-26,2022-01-02,2022-01-09,2022-01-16,2022-01-23,2022-01-30,2022-02-06,2022-02-13,2022-02-20,2022-02-27,2022-03-06,2022-03-13,2022-03-20,2022-03-27,2022-04-03
0,6001,Alameda County,3,-5,1,5,5,25,134,204,207,265,257,292,281,352,328,424,465,533,466,630,774,1012,1247,1571,1613,1439,1345,1319,1478,1187,1063,777,554,648,517,489,526,532,603,735,1091,1459,2074,2346,4410,5242,5600,4535,5753,6661,5104,3615,2767,2235,1653,1097,935,772,661,570,620,621,673,690,656,560,500,382,341,283,225,299,310,411,596,1089,1684,2215,2592,2331,2245,2303,2961,1898,2568,1489,1155,1154,926,784,804,802,870,744,816,580,1022,965,1628,4145,11931,26014,26286,21594,16576,6503,5393,3227,4302,970,978,736,860,914,573
1,6003,Alpine County,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,7,14,11,11,5,7,7,0,1,3,4,0,0,0,1,4,0,2,2,1,0,2,0,0,0,1,1,0,0,0,0,0,0,1,0,0,0,0,0,2,0,0,0,1,0,6,2,1,1,0,1,2,0,0,0,1,2,0,1,3,1,0,3,2,2,3,1,1,-1,1,0,-3,0,2,0,0
2,6005,Amador County,0,-1,0,0,0,0,1,0,3,1,2,0,1,0,1,1,0,0,0,1,3,12,10,11,50,40,52,53,17,7,8,2,5,9,5,5,7,8,30,27,26,35,52,114,586,563,379,267,334,223,89,93,145,75,63,31,29,21,16,20,31,30,24,11,16,4,7,10,5,14,23,22,12,20,10,13,35,59,88,169,117,92,154,106,144,125,123,161,118,63,108,72,62,48,33,38,59,35,40,58,100,288,370,504,494,333,174,87,75,116,39,22,23,8,1
3,6007,Butte County,0,0,0,0,0,0,0,7,4,4,3,0,3,2,2,10,15,14,19,18,46,83,121,232,237,180,162,194,184,241,487,353,132,98,57,69,69,51,77,99,133,230,295,371,635,839,910,737,915,904,613,420,350,244,168,136,101,77,79,78,71,80,73,117,120,150,129,102,67,70,70,64,49,37,54,67,127,232,294,388,457,461,812,555,1099,808,566,438,362,483,401,384,292,184,163,180,204,154,153,203,747,1439,1980,2161,1766,867,720,436,214,198,62,80,51,51,37
4,6009,Calaveras County,0,-2,0,0,0,2,0,2,2,3,1,2,1,0,0,1,1,2,5,6,9,10,22,14,15,23,26,19,23,27,39,30,14,4,7,4,4,10,7,13,16,29,55,86,120,137,190,119,158,200,142,92,76,74,31,20,18,25,19,22,19,17,18,14,27,30,18,13,19,20,10,5,6,9,18,14,28,15,66,125,141,161,193,79,192,150,119,126,118,70,118,120,109,66,60,60,83,64,66,60,123,300,601,515,533,216,183,76,47,26,2,-8,10,9,5
5,6011,Colusa County,0,0,0,0,0,0,0,1,2,0,0,0,0,0,0,0,4,1,1,3,12,39,54,80,75,65,39,32,18,12,32,16,20,20,6,3,5,3,7,23,22,29,80,98,128,128,107,110,116,130,100,98,49,35,46,12,10,5,3,2,4,5,8,7,10,7,5,5,4,9,5,5,3,5,5,8,13,13,35,63,43,56,58,32,53,32,16,22,12,19,26,23,21,18,32,22,26,15,16,8,29,75,138,156,133,102,81,36,21,15,8,-2,2,2,3
6,6013,Contra Costa County,0,-6,1,1,16,26,79,144,122,199,149,98,95,99,118,133,194,197,239,268,512,870,1161,1479,1483,1452,1360,1239,1125,892,829,699,633,635,606,454,444,425,541,626,1008,1227,1686,1759,3199,3944,3927,3190,4537,5205,4106,2962,2025,1643,1278,990,838,694,595,523,542,504,582,592,503,467,444,373,359,291,303,325,324,381,451,758,1298,1879,2682,2097,2107,2099,3062,2632,2440,1300,1089,1016,871,722,759,691,703,584,639,500,898,719,1047,2598,8053,16721,16196,14179,11414,4622,3836,2011,1598,1100,635,551,804,537,337
7,6015,Del Norte County,0,0,0,0,0,0,0,0,0,2,0,0,0,3,2,7,27,4,0,2,0,10,3,3,20,2,4,9,6,9,3,6,5,7,9,10,13,4,5,13,16,42,58,77,91,104,91,53,55,44,25,14,11,25,29,39,29,31,34,48,44,47,30,25,13,45,36,24,18,7,10,10,8,17,11,21,19,66,133,172,283,309,289,150,307,145,123,13,28,19,32,32,27,12,28,46,48,42,-1,20,65,109,217,244,428,216,331,173,96,61,49,21,-16,3,4
8,6017,El Dorado County,0,0,0,0,0,3,8,8,8,11,7,5,1,8,8,17,7,20,14,24,35,65,91,130,138,92,95,64,62,43,36,42,24,41,85,56,35,38,68,101,123,283,432,598,814,847,779,622,767,734,531,350,172,198,163,105,96,78,47,73,67,99,123,120,131,137,90,72,77,37,29,33,50,44,83,103,156,225,327,441,475,405,535,480,522,348,227,275,235,190,217,197,193,180,148,138,202,176,188,300,850,1489,1747,1611,1438,734,646,368,145,87,73,35,101,43,14
9,6019,Fresno County,1,-6,0,0,0,2,17,72,77,122,127,142,202,403,400,440,547,494,673,854,1355,2680,2517,2974,2843,2624,2108,1744,1653,1264,1169,940,672,660,643,577,750,731,665,756,1499,1910,2963,2505,5186,6671,6661,5134,6454,7021,5110,3310,2258,1760,1308,1089,976,937,804,723,664,483,460,390,361,347,359,256,237,194,137,153,165,156,204,315,527,805,1428,1717,2480,2007,2460,3493,4405,2587,2188,2196,2066,2127,2306,2443,2064,1542,1317,931,1289,1077,1069,1252,3907,10009,11920,20918,14426,6483,5638,2658,1647,1273,724,917,867,705,172


# AWS CLI

aws emr create-cluster --os-release-label 2.0.20220406.1 --applications Name=Hadoop Name=Hive Name=Pig Name=Hue Name=Spark --ec2-attributes '{"KeyName":"emr-key","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-0b3fe2bcb07d70b1b","EmrManagedSlaveSecurityGroup":"sg-04c86372c06cb3119","EmrManagedMasterSecurityGroup":"sg-06867e219426fbf24"}' --release-label emr-6.6.0 --log-uri 's3n://aws-logs-870716117016-us-west-1/elasticmapreduce/' --steps '[{"Args":["spark-submit","--deploy-mode","client","s3://649-lab3/asignment.py","-i","s3://649-lab3/covid_confirmed_usafacts.csv","-o","s3://649-run-output/run5/"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"command-runner.jar","Properties":"","Name":"Spark application"}]' --instance-groups '[{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"Master - 1"},{"InstanceCount":2,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m5.xlarge","Name":"Core - 2"}]' --auto-scaling-role EMR_AutoScaling_DefaultRole --bootstrap-actions '[{"Path":"s3://649-lab3/emr_bootstrap.sh","Name":"Custom action"}]' --ebs-root-volume-size 10 --service-role EMR_DefaultRole --enable-debugging --auto-termination-policy '{"IdleTimeout":1800}' --name 'My cluster' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-west-1