In [1]:
import numpy as np 
import datetime
import pandas as pd 
import random
import math
import time
import operator 
from fbprophet import Prophet
from datetime import date 
from datetime import timedelta

In [2]:
def getpath():
  """Download the JHU CSSE US confirmed-cases time series.

  Only the confirmed-cases file is fetched. Earlier revisions of this
  function also downloaded the deaths series, the global recoveries series
  and a daily report pinned to 04-17-2020, but none of those frames were
  returned or used, so they only added network round-trips and extra ways
  for the call to fail.

  Returns:
    pandas.DataFrame: contents of ``time_series_covid19_confirmed_US.csv``
    exactly as parsed by ``pd.read_csv``.

  Raises:
    Whatever ``pd.read_csv`` raises when the URL cannot be fetched or parsed.
  """
  data_path = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/"
  return pd.read_csv(data_path + 'time_series_covid19_confirmed_US.csv')

In [3]:
def getlatestdata():
  """Fetch the JHU CSSE US daily report for the previous calendar day.

  The report date is derived from ``date.today()`` minus one day.
  NOTE(review): ``date.today()`` is the local date of the machine running
  the notebook; whether yesterday's file is already published at run time
  is not checked here -- confirm against the upstream publishing schedule.

  Returns:
    pandas.DataFrame: the parsed daily report CSV.
  """
  base_url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports_us/'
  report_day = date.today() - timedelta(days = 1)
  # Daily report files are named MM-DD-YYYY.csv.
  report_url = base_url + report_day.strftime('%m-%d-%Y') + '.csv'
  return pd.read_csv(report_url)

In [4]:
def createdf(confirmed_df):
  """Reshape the wide confirmed-cases frame into a long per-county frame.

  Every column whose name contains "/" is treated as a date column and
  melted into (Confirmed_Date, Confirmed_Cases) pairs. Rows with a latitude
  of exactly 0 or a missing county name are discarded, the date strings are
  parsed to datetimes, and rows are ordered by state then date.

  Args:
    confirmed_df: wide frame with 'Admin2', 'Province_State',
      'Country_Region', 'Lat', 'Long_' and one column per date.

  Returns:
    pandas.DataFrame with columns Latitude, Longitude, Confirmed_Date,
    Confirmed_Cases and County_State ("<county>-<state>").
  """
  date_columns = confirmed_df.columns[confirmed_df.columns.str.contains("/")]
  long_df = confirmed_df.melt(
      id_vars=['Admin2', 'Province_State', 'Country_Region', 'Lat', 'Long_'],
      value_vars=date_columns,
  ).rename(columns={
      "value": "Confirmed_Cases",
      "variable": "Confirmed_Date",
      "Province_State": "State",
      'Admin2': 'County',
      "Country_Region": "Country",
      "Lat": "Latitude",
      "Long_": "Longitude",
  })

  # Keep only rows that have a real location and a named county.
  usable = (long_df['Latitude'] != 0.000000) & long_df['County'].notna()
  long_df = long_df[usable]

  long_df = long_df.assign(Confirmed_Date=pd.to_datetime(long_df['Confirmed_Date']))
  long_df = long_df.sort_values(by=["State", "Confirmed_Date"])
  long_df = long_df.assign(County_State=long_df["County"] + '-' + long_df["State"])
  return long_df.drop(columns=['State', 'County', 'Country'])

In [5]:
def createlist(prediction_df):
  """Return the distinct County_State keys as a plain Python list.

  Keys appear in order of first occurrence in ``prediction_df``.

  Args:
    prediction_df: frame that has a 'County_State' column.

  Returns:
    list: the unique values of that column.
  """
  return prediction_df["County_State"].unique().tolist()

In [6]:
def prediction(distinct_county_state,prediction_df,df):
  """Forecast each county and stack the per-county forecasts into one frame.

  For every key in ``distinct_county_state`` the matching rows of
  ``prediction_df`` are reduced to the two columns ``prophet`` is given
  ('ds' and 'y'), forecast, and tagged with the county key and the
  county's coordinates (taken from its first row).

  Fixes relative to the earlier version:
    * Latitude and Longitude were written into each other's column; each
      column now carries its own value. Tables produced by the old code
      have the two swapped.
    * Results are collected in a list and concatenated once instead of
      calling ``DataFrame.append`` per county, which copied the whole
      accumulated frame on every iteration and no longer exists in
      pandas 2.x.

  Args:
    distinct_county_state: iterable of County_State keys to forecast.
    prediction_df: long frame with Latitude, Longitude, Confirmed_Date,
      Confirmed_Cases and County_State columns.
    df: frame to extend with the forecasts. An empty frame (what ``main``
      passes) contributes nothing to the result.

  Returns:
    pandas.DataFrame: ``df`` followed by one forecast block per county,
    with the original row indices kept. If there is nothing to forecast,
    ``df`` is returned unchanged.

  Raises:
    IndexError: if a key has no rows in ``prediction_df`` (and ``prophet``
      did not already fail on the empty history).
  """
  forecasts = []
  for word in distinct_county_state:
    county_rows = prediction_df.loc[prediction_df["County_State"] == word]
    history = county_rows.drop(['Latitude', 'Longitude', 'County_State'], axis = 1)
    history = history.rename(columns={'Confirmed_Date':'ds','Confirmed_Cases':'y'})
    # assign() returns a new frame, so whatever prophet() handed back is
    # left untouched; keyword order keeps the original column order.
    forecasts.append(prophet(history).assign(
        County_State=word,
        Longitude=county_rows['Longitude'].iloc[0],
        Latitude=county_rows['Latitude'].iloc[0],
    ))

  if not forecasts:
    return df
  # Skip an empty seed frame so it cannot influence the result dtypes.
  if df is not None and not df.empty:
    forecasts.insert(0, df)
  return pd.concat(forecasts)

In [7]:
def prophet(y, periods=7):
  """Fit a Prophet model to one series and return the tail of its forecast.

  The horizon used to be hard-coded as 7 with a matching hard-coded slice
  of 8 rows; both now derive from ``periods`` so they cannot drift apart.
  With the default, the result is unchanged: the last 8 forecast rows.

  Args:
    y: frame passed straight to ``Prophet.fit`` (callers in this notebook
      supply columns 'ds' and 'y').
    periods: number of periods to extend the history by via
      ``make_future_dataframe``. Defaults to 7.

  Returns:
    pandas.DataFrame: the last ``periods + 1`` rows of the prediction,
    restricted to ds, yhat, yhat_lower, yhat_upper, weekly and trend.
    NOTE(review): the extra row is presumably the final historical date --
    confirm this is the intended anchor for downstream consumers.

  Raises:
    KeyError: if the prediction lacks one of the selected columns (for
      example 'weekly' when no weekly component was fitted -- TODO confirm
      for very short histories).
  """
  m = Prophet(interval_width=0.95)
  m.fit(y)
  future = m.make_future_dataframe(periods=periods)
  forecast = m.predict(future)
  wanted = ['ds','yhat','yhat_lower','yhat_upper','weekly','trend']
  return forecast[wanted].tail(periods + 1)

In [8]:
def createcountydf(confirmed_df):
  """Build the long per-county frame and hand it to Spark.

  Performs the same melt / rename / filter / sort steps as the prediction
  input, but keeps the County, State and Country columns and converts the
  result with ``spark.createDataFrame``.

  Args:
    confirmed_df: wide frame with 'Admin2', 'Province_State',
      'Country_Region', 'Lat', 'Long_' and one column per date (any column
      whose name contains "/").

  Returns:
    Whatever ``spark.createDataFrame`` returns for the reshaped frame.
    ``spark`` is expected to exist as a global in the notebook session.
  """
  id_columns = ['Admin2','Province_State','Country_Region', 'Lat', 'Long_']
  is_date_column = confirmed_df.columns.str.contains("/")
  county_df = pd.melt(confirmed_df, id_vars=id_columns, value_vars=confirmed_df.columns[is_date_column])

  new_names = {
      "value": "Confirmed_Cases",
      "variable": "Confirmed_Date",
      "Province_State": "State",
      'Admin2': 'County',
      "Country_Region": "Country",
      "Lat": "Latitude",
      "Long_": "Longitude",
  }
  county_df = county_df.rename(columns=new_names)

  # Drop rows with a zero latitude, then rows without a county name.
  county_df = county_df.loc[county_df['Latitude'].ne(0.000000)]
  county_df = county_df.loc[county_df['County'].notna()].copy()

  county_df['Confirmed_Date'] = pd.to_datetime(county_df['Confirmed_Date'])
  county_df = county_df.sort_values(by = ["State","Confirmed_Date"])
  county_df["County_State"] = county_df["County"]+'-'+county_df["State"]
  return spark.createDataFrame(county_df)

In [9]:
def save_as_table(finaldf):
  """Overwrite table ``default.cvd`` with the next-7-days prediction frame.

  Args:
    finaldf: object exposing ``.write.mode(...).saveAsTable(...)``
      (``main`` passes the result of ``spark.createDataFrame``).
  """
  writer = finaldf.write.mode("overwrite")
  writer.saveAsTable("default.cvd")
  
def save_as_table_latest(x):
  """Overwrite table ``default.latestus`` with the latest state-wise data.

  The data comes from the newest daily file available on GitHub.

  Args:
    x: object exposing ``.write.mode(...).saveAsTable(...)``
      (``main`` passes the result of ``spark.createDataFrame``).
  """
  writer = x.write.mode("overwrite")
  writer.saveAsTable("default.latestus")
  
def save_as_table_county(y):
  """Overwrite table ``default.latestcounty`` with the latest county-wise data.

  The data comes from the newest file available on GitHub.

  Args:
    y: object exposing ``.write.mode(...).saveAsTable(...)``
      (``main`` passes the result of ``createcountydf``).
  """
  writer = y.write.mode("overwrite")
  writer.saveAsTable("default.latestcounty")

In [10]:
def main():
  """Run the full pipeline: download, forecast, and publish three tables.

  Steps, in order:
    1. Download the confirmed-cases series and reshape it for forecasting.
    2. Forecast every county and convert the result with
       ``spark.createDataFrame``.
    3. Build the county-level frame from the same download.
    4. Fetch yesterday's daily report and convert it with
       ``spark.createDataFrame``.
    5. Save the county frame, the forecasts, then the daily report.

  Relies on a global ``spark`` object being present in the session.
  """
  confirmed_df = getpath()
  prediction_df = createdf(confirmed_df)
  county_keys = createlist(prediction_df)

  # Forecasts are accumulated onto an initially empty frame.
  forecast_pdf = prediction(county_keys, prediction_df, pd.DataFrame())
  forecast_sdf = spark.createDataFrame(forecast_pdf)

  county_sdf = createcountydf(confirmed_df)
  latest_sdf = spark.createDataFrame(getlatestdata())

  save_as_table_county(county_sdf)
  save_as_table(forecast_sdf)
  save_as_table_latest(latest_sdf)
  
  

In [11]:
# Run the whole pipeline when this cell executes.
main() 