In [31]:
import boto3
import pandas as pd
import requests
from io import BytesIO
from datetime import datetime, timedelta
import uuid

In [32]:
# Adapter Layer

def get_data_csvs(url: str, files: list[str], timeout: float = 30.0) -> int:
    """Download each named CSV from a URL template into the working directory.

    Args:
        url: printf-style template containing one ``%s`` placeholder that is
            replaced by each file name in turn.
        files: file names to fetch; each name is also the local path the
            download is written to.
        timeout: seconds passed to ``requests.get`` as its ``timeout``.

    Returns:
        0 once every file has been fetched and written.

    Raises:
        requests.HTTPError: when the server answers with an error status
            (raised by ``Response.raise_for_status``).
    """
    for file in files:
        new_url = url % file
        print(new_url)
        response = requests.get(new_url, timeout=timeout)
        # Fail loudly instead of saving an error page under a ".csv" name.
        response.raise_for_status()
        # Store the raw payload so the bytes on disk do not depend on the
        # platform's default text encoding or newline translation.
        with open(file, 'wb') as fh:
            fh.write(response.content)

    return 0
    
def get_meta_file(bucket_name, meta_file_name, s3_resource):
    """Fetch the meta file from the given S3 bucket.

    Thin wrapper that delegates to ``download_file_from_s3_bucket``, passing
    ``meta_file_name`` through as the file name to download.

    Returns:
        Always 0.
    """
    download_file_from_s3_bucket(
        bucket_name=bucket_name,
        file_name=meta_file_name,
        s3_resource=s3_resource,
    )
    return 0

def read_csvs_to_df(files: list[str]):
    """Load every CSV path in ``files`` with ``pd.read_csv``.

    Returns:
        A list of DataFrames, one per path, in the same order as ``files``.
    """
    return [pd.read_csv(csv_path) for csv_path in files]


def create_bucket(bucket_name, s3_connection):
    """Create an S3 bucket in the region of the default boto3 session.

    Args:
        bucket_name: name of the bucket to create.
        s3_connection: client object exposing ``create_bucket``.

    Returns:
        ``(bucket_name, bucket_response)`` where ``bucket_response`` is
        whatever ``s3_connection.create_bucket`` returned.
    """
    current_region = boto3.session.Session().region_name
    request = {'Bucket': bucket_name}
    # 'us-east-1' buckets are created without a location constraint. A
    # session with no configured region (region_name is None) has no
    # constraint to send either, so it takes the same path instead of
    # passing LocationConstraint=None.
    if current_region not in (None, 'us-east-1'):
        request['CreateBucketConfiguration'] = {
            'LocationConstraint': current_region
        }
    bucket_response = s3_connection.create_bucket(**request)
    print(bucket_name, current_region)
    return bucket_name, bucket_response

def write_csvs_to_s3_bucket(bucket_name, files_names, s3_resource):
    """Upload each local file to the bucket, using its name as the object key."""
    for local_path in files_names:
        target = s3_resource.Object(bucket_name, local_path)
        target.upload_file(Filename=local_path)
        
def download_file_from_s3_bucket(bucket_name, file_name, s3_resource):
    """Download one object from the bucket, saving it locally as ``file_name``.

    ``file_name`` serves both as the object key and as the local path.
    """
    remote_object = s3_resource.Object(bucket_name, file_name)
    remote_object.download_file(file_name)

In [40]:
# Application Layer

def extract(url: str, bucket_name: str, meta_file:str,  files: list[str], s3_resource) -> list[pd.DataFrame]:
    """Fetch the source CSVs and the meta file, then read them all.

    The data files are fetched with ``get_data_csvs`` and the meta file with
    ``get_meta_file``; ``files`` itself is left unmodified.

    Returns:
        The result of ``read_csvs_to_df`` for the data files followed by the
        meta file, i.e. the meta frame comes last.
    """
    get_data_csvs(url, files)
    get_meta_file(bucket_name, meta_file, s3_resource)
    # Build a fresh list so the caller's ``files`` is not touched.
    paths_to_read = files + [meta_file]
    return read_csvs_to_df(paths_to_read)

def transform(df_recovered_cases: pd.DataFrame, meta_file: str, source_date: str):
    """Run both transformation steps in order.

    First ``transform_recovered_cases_file`` is applied to the recovered
    cases frame, then ``transfor_meta_file`` is called with the meta file
    name and ``source_date``. Nothing is returned.
    """
    transform_recovered_cases_file(df_recovered_cases=df_recovered_cases)
    transfor_meta_file(meta_file_name=meta_file, min_date=source_date)

def load_data(s3_resource, bucket_name: str, files: list[str]) -> int:
    """Create the target bucket and upload the given files to it.

    The bucket is created through ``s3_resource.meta.client``; the values
    returned by ``create_bucket`` are not used afterwards.

    Returns:
        Always 0.
    """
    client = s3_resource.meta.client
    _created_name, _create_response = create_bucket(
        bucket_name=bucket_name,
        s3_connection=client,
    )

    # Push every listed CSV into the bucket.
    write_csvs_to_s3_bucket(bucket_name, files, s3_resource)
    return 0

def transform_recovered_cases_file(df_recovered_cases: pd.DataFrame, output_file: str = 'time_series_covid19_recovered_global.csv'):
    """Summarise the recovered-cases frame to one value per row and save it.

    For every row, the maximum over all columns from position 4 onward is
    stored as ``Recovered_Cases``. The result keeps only the four location
    columns plus that value, sorted by ``Recovered_Cases`` descending, and
    is written to ``output_file`` without the index.

    Args:
        df_recovered_cases: frame with ``Province/State``,
            ``Country/Region``, ``Lat`` and ``Long`` columns; the columns
            from position 4 onward are the values being summarised
            (presumably one column per date; verify against the source).
            The frame is not modified.
        output_file: CSV path to write; defaults to the recovered-cases
            file name used by the rest of the pipeline.

    Returns:
        Always 0.
    """
    recovered_totals = df_recovered_cases.iloc[:, 4:].max(axis=1)
    df_all_recovered_cases = (
        df_recovered_cases[['Province/State', 'Country/Region', 'Lat', 'Long']]
        .assign(Recovered_Cases=recovered_totals)
        .sort_values(by='Recovered_Cases', ascending=False)
    )
    # Write the summarised frame, not the raw input, so the transformation
    # actually reaches the file.
    df_all_recovered_cases.to_csv(output_file, index=False)
    return 0

def transfor_meta_file(meta_file_name: str, min_date: str):
    """Prepend a new ``(source_date, processing_date)`` row to the meta CSV.

    The file is read, a row holding ``min_date`` and the current local time
    is placed above the existing rows, and the file is overwritten in place.

    Args:
        meta_file_name: path of the meta CSV to read and rewrite.
        min_date: value stored in the new row's ``source_date`` column.

    Returns:
        Always 0.
    """
    now = datetime.now()
    # Unpadded month/day/two-digit-year and time fields, assembled by hand:
    # the '%-m'-style "no padding" strftime flags are a platform extension
    # and are not accepted everywhere (e.g. Windows).
    curr_date = f"{now.month}/{now.day}/{now.year % 100} {now.hour}:{now.minute}:{now.second}"
    df = pd.read_csv(meta_file_name)
    print(min_date, curr_date)
    new_row = pd.DataFrame(
        {'source_date': min_date, 'processing_date': curr_date},
        index=[0],
    )
    # The new row goes first so the latest run sits at the top of the file.
    df_meta_file = pd.concat([new_row, df], ignore_index=True)
    df_meta_file.to_csv(meta_file_name, index=False)
    return 0
    

def etl_covido_data(url: str, files: list[str], s3_resource, bucket_name: str, meta_file: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Run the extract -> transform -> load pipeline.

    Args:
        url: URL template passed to ``extract``.
        files: data file names; ``extract`` must yield exactly three frames
            for them plus one for the meta file, because the result is
            unpacked into four names.
        s3_resource: S3 resource handed to ``extract`` and ``load_data``.
        bucket_name: bucket used for both the meta download and the upload.
        meta_file: name of the meta CSV.

    Returns:
        ``(confirmed, deaths, recovered, meta)`` exactly as returned by
        ``extract``, i.e. the frames read before ``transform`` ran.
    """
    df_confirmed_cases, df_deaths_cases, df_recovered_cases, df_meta_file = extract(url, bucket_name, meta_file, files, s3_resource)

    # The header of the last column of the confirmed-cases frame is used as
    # the source date recorded in the meta file.
    source_date = str(df_confirmed_cases.columns[-1])
    transform(df_recovered_cases, meta_file, source_date)

    # Upload the meta file together with the data files; a new list is
    # built so the caller's ``files`` is left untouched.
    new_files = [*files, meta_file]

    # Load the CSV files into the S3 bucket.
    load_data(s3_resource, bucket_name, new_files)
    return df_confirmed_cases, df_deaths_cases, df_recovered_cases, df_meta_file

In [41]:
# main function entrypoint

def main(
    url: str = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/%s",
    files=None,
    bucket_name: str = 'covido-bucket',
    meta_file: str = 'meta_file.csv',
) -> tuple[pd.DataFrame, ...]:
    """Configure and run the ETL job.

    Called with no arguments it uses the built-in configuration; each
    setting can be overridden by keyword.

    Args:
        url: URL template with a ``%s`` placeholder for the file name.
        files: data file names to process; ``None`` selects the confirmed,
            deaths and recovered global time-series files.
        bucket_name: S3 bucket the job reads the meta file from and uploads
            to.
        meta_file: meta CSV file name used for job date control.

    Returns:
        Whatever ``etl_covido_data`` returns (a tuple of DataFrames).
    """
    if files is None:
        files = [
            "time_series_covid19_confirmed_global.csv",
            "time_series_covid19_deaths_global.csv",
            "time_series_covid19_recovered_global.csv",
        ]

    # Initialize the S3 resource
    s3_resource = boto3.resource('s3')

    # ETL the data
    result: tuple[pd.DataFrame, ...] = etl_covido_data(url, files, s3_resource, bucket_name, meta_file)
    return result
    

In [42]:
# Run the pipeline and keep the four frames returned by main() available to
# later cells. Calling main() has side effects: it goes through the ETL job,
# which downloads the CSVs and uploads files to S3.
df_confirmed_cases, df_deaths_cases, df_recovered_cases, df_meta_file = main() 

https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv
https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv
https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv
6/15/22 6/16/22 7:9:38
covido-bucket us-east-1
