## India covid-19 data inflow source pipeline

In [1]:
import os, glob
import datetime
from requests import get
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup

In [2]:
#!python3 -m pip install papermill OwmUCHEBBy-BBKPxRekb

# India MHW website daliy data scraping for official COVID-19 reports

## Dashboard Input data Pipeline

### Data Loading 

In [3]:
#test_df = pd.read_html(web_url)[7].iloc[:,1:]

In [4]:
#test_df.info()

In [5]:
## COLLECT DATA FROM MINISTERY OF HEALTH

web_url = "https://www.mohfw.gov.in"
file_store_loc = "/Volumes/Lab/PROJECTS/COVID-19-AnalyticsHub/India/data/mhw_ind_covid_daily_reports/"
def ind_mh_web_scrap(url, data_dir):
    #df = pd.read_html(url, header=0)[1].iloc[:,1:]
    df = pd.read_html(url)[7].iloc[:,1:] #Read covid-19 india 
    df.loc[(df['Name of State / UT'].str.contains("Total")),'Name of State / UT'] = "India"
    df.rename(columns={'Name of State / UT': 'State/UT'}, inplace=True)
    df['report_date'] = pd.Timestamp.now().normalize()
    #df.iloc[-1:,:3] = df.iloc[-1:,:3].apply(lambda x: x.str.replace('*',''))
    file_name = datetime.datetime.now().strftime("%Y-%m-%d")
    df.to_csv(data_dir+file_name+'.csv', index=False)

    
if __name__ == "__main__":
    ind_mh_web_scrap(web_url,file_store_loc)

### Data Transformation

In [11]:
#Static data India state loading
file_loc = '/Volumes/Lab/PROJECTS/COVID-19-AnalyticsHub/India/data/state_geo_table.csv'
ind_state_geo = pd.read_csv(file_loc, index_col=0).reset_index()

In [12]:
ind_state_geo.head()

Unnamed: 0,State/UT,lat,lon
0,Andhra Pradesh,14.750429,78.570023
1,Arunachal Pradesh,28.218,94.7278
2,Assam,26.2006,92.9376
3,Bihar,25.785414,87.479973
4,Chandigarh,30.719997,76.780006


In [92]:
#store all csv files daily reports from directory.
root_dir = '/Volumes/Lab/PROJECTS/COVID-19-AnalyticsHub/India/data/mhw_ind_covid_daily_reports/'
files = sorted([file for file in os.listdir(root_dir)])[2:]
out_df = pd.concat((pd.read_csv(root_dir+ file) \
                            for file in files),ignore_index=False) # Load one by one csv file and concat into pandas dataframe
out_df.columns = ['State/UT', 'Confirmed_IN','Confirmed_FN','Recovered', 'Death', 'report_date']
#df.iloc[-1:,:3] = df.iloc[-1:,:3].apply(lambda x: x.str.replace('*',''))

out_df['report_date'] = pd.to_datetime(out_df['report_date']).dt.date #change into pandas timestamp
out_df['Confirmed_ALL'] = [x+y for x,y in zip(out_df.Confirmed_IN, out_df.Confirmed_FN)]

In [93]:
replace_values = {'Union Territory of Chandigarh' : 'Punjab', 
                  'Union Territory of Jammu and Kashmir' : 'Jammu and Kashmir', 
                  'Union Territory of Ladakh' : 'Ladakh',
                  'Odisha' :'Orissa',
                  'Jammu and Kashmir' : 'Jammu and Kashmir'}                                                                                          
# 
out_df = out_df.replace({"State/UT": replace_values}) 
out_df = out_df.merge(ind_state_geo, on='State/UT', how = 'left')
out_df = out_df[~out_df['State/UT'].str.contains(r'[0-9]')] # remove numeric values
out_df['report_date'] = pd.to_datetime(out_df['report_date']) # convert column into datetime

In [94]:

#out_df = out_df[~out_df['S.N'].str.contains("Total")]
#out_df.fillna(0, inplace=True)

#out_df.rename(columns = {'State/UT':'States'}, inplace = True)

In [95]:

#out_df = out_df[~out_df['S.N'].str.contains("Total")]
#out_df['Confirmed_ALL'] = [x+y for x,y in zip(out_df.Confirmed_IN,out_df.Confirmed_IN)]

In [96]:
#Calculation for day's number
#out_df = out_df.sort_values(by='Confirmed_ALL',ascending=False).groupby(['report_date', 'State/UT']).first().reset_index()
out_df = out_df.sort_values(by='Confirmed_ALL',ascending=False).groupby(['report_date', 'State/UT']).first().reset_index()
out_df_shift = out_df.copy()
out_df_shift['report_date'] = pd.to_datetime(out_df_shift['report_date']) + \
pd.to_timedelta(pd.np.ceil(1.0), unit="D")
out_df_shift = out_df_shift[['report_date', 'State/UT','Recovered', 'Death', 'Confirmed_ALL']]

  


In [97]:
out_df

Unnamed: 0,report_date,State/UT,Confirmed_IN,Confirmed_FN,Recovered,Death,Confirmed_ALL,States,Latitude,Longitude
0,2020-03-19,Andhra Pradesh,2,0,0,0,2,Andhra Pradesh (20),14.750429,78.570026
1,2020-03-19,Chandigarh,1,0,0,0,1,Chandigarh (1),30.719997,76.780006
2,2020-03-19,Chhattisgarh,1,0,0,0,1,Chhattisgarh (3),22.090420,82.159987
3,2020-03-19,Delhi,11,1,2,1,12,Delhi (2),28.669993,77.230004
4,2020-03-19,Haryana,3,14,0,0,17,Haryana (10),28.450006,77.019991
...,...,...,...,...,...,...,...,...,...,...
157,2020-03-25,Tamil Nadu,16,2,1,0,18,Tamil Nadu (19),12.920386,79.150042
158,2020-03-25,Telengana,25,10,1,0,35,Telengana (10),17.123184,79.208824
159,2020-03-25,Uttar Pradesh,34,1,11,0,35,Uttar Pradesh (29),27.599981,78.050006
160,2020-03-25,Uttarakhand,3,1,0,0,4,Uttarakhand (12),30.316496,78.032188


In [98]:
final_df = out_df.merge(out_df_shift, on=['report_date','State/UT'], suffixes=('', '_shift'), how='left')

In [99]:
final_df.Death.unique()

array([0, 1, 4, 7, 2, 9, 10, '0', '1', '1#', '9#', '2'], dtype=object)

In [101]:
final_df.head()

Unnamed: 0,report_date,State/UT,Confirmed_IN,Confirmed_FN,Recovered,Death,Confirmed_ALL,States,Latitude,Longitude,Recovered_shift,Death_shift,Confirmed_ALL_shift,new_cases
0,2020-03-19,Andhra Pradesh,2,0,0,0,2,Andhra Pradesh (20),14.750429,78.570026,,,,
1,2020-03-19,Chandigarh,1,0,0,0,1,Chandigarh (1),30.719997,76.780006,,,,
2,2020-03-19,Chhattisgarh,1,0,0,0,1,Chhattisgarh (3),22.09042,82.159987,,,,
3,2020-03-19,Delhi,11,1,2,1,12,Delhi (2),28.669993,77.230004,,,,
4,2020-03-19,Haryana,3,14,0,0,17,Haryana (10),28.450006,77.019991,,,,


In [100]:
final_df = out_df.merge(out_df_shift, on=['report_date','State/UT'], suffixes=('', '_shift'), how='left')
final_df['new_cases'] = final_df.Confirmed_ALL - final_df.Confirmed_ALL_shift
final_df['new_Deaths'] = final_df.Death - final_df.Death_shift
final_df['new_Recovd'] = final_df.Recovered - final_df.Recovered_shift
final_df.report_date['report_date'] = final_df.report_date.dt.date
final_df.fillna(0,inplace=True)


TypeError: unsupported operand type(s) for -: 'str' and 'int'

In [81]:
final_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 162 entries, 0 to 161
Data columns (total 16 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   report_date          162 non-null    datetime64[ns]
 1   State/UT             162 non-null    object        
 2   Confirmed_IN         162 non-null    int64         
 3   Confirmed_FN         162 non-null    int64         
 4   Recovered            162 non-null    int64         
 5   Death                162 non-null    int64         
 6   Confirmed_ALL        162 non-null    int64         
 7   States               162 non-null    object        
 8   Latitude             162 non-null    float64       
 9   Longitude            162 non-null    float64       
 10  Recovered_shift      162 non-null    float64       
 11  Death_shift          162 non-null    float64       
 12  Confirmed_ALL_shift  162 non-null    float64       
 13  new_cases            162 non-null  

In [13]:
final_df.head()

NameError: name 'final_df' is not defined

## Elasticsearch Pipeline

In [82]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

In [83]:
es = Elasticsearch(['https://a149be5c55774db58b9b12c4741d5e95.asia-south1.gcp.elastic-cloud.com:9243'], 
                    http_auth=('elastic', 'hftyQ1VbGu9JJnbIFiEvDW0R'), timeout = 3000)
# delete index if exists
if es.indices.exists('india-covid19-db01'):
    es.indices.delete(index='india-covid19-db01')

settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "location": {
        "type": "geo_point"
      }
    }
  }
}

In [84]:
es.indices.create(index='india-covid19-db01', ignore=400, body=settings)
documents = final_df.to_dict(orient='records')
for i in range(0, len(documents)):
    documents[i].update({'location': {'lat':documents[i]['Latitude'], 'lon': documents[i]['Longitude']}})
bulk(es, documents, index = 'india-covid19-db01', raise_on_error = True)

(162, [])