In [1]:
# Imports for the notebook (the SparkSession itself is created in the next cell)
import findspark
import requests
import json
import sys
from pyspark.sql.functions import *
from pyspark.sql import functions as sf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType,TimestampType
from pyspark.sql import Row
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import *
from pyspark.context import SparkContext
import pyspark
from urllib.request import Request,urlopen
import boto3
from io import StringIO
from botocore.client import Config

In [2]:
## Build (or reuse) the Spark session named "project"; the PostgreSQL JDBC jar
## path is registered on the driver classpath.
spark = (
    SparkSession.builder
    .appName("project")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

# Low-level context, used further down to build RDDs.
sc = spark.sparkContext

In [3]:
# Endpoint passed to get_api_data() to fetch the COVID-19 summary JSON.
url = "https://api.covid19api.com/summary"
def get_api_data(url):
    """Download the summary JSON at ``url`` and return its per-country rows.

    The response body is parsed with Spark, the ``Countries`` array is
    exploded into one row per element, the selected fields are cast to
    fixed types, and the result is converted to pandas.

    Relies on the module-level ``spark`` (SparkSession) and ``sc``
    (SparkContext) objects.

    Parameters
    ----------
    url : str
        Address of the JSON document; it must contain a ``Countries`` array
        whose elements carry the fields listed in ``column_types`` below.

    Returns
    -------
    pandas.DataFrame
        One row per distinct combination of the ten selected fields, plus a
        ``count`` column added by the group-by holding how many input rows
        shared that combination.

    Side effects
    ------------
    Prints the schema before and after casting, and the first rows of the
    resulting pandas frame.
    """
    # Single source of truth for the selected fields: (name, Spark SQL type),
    # in the order they appear in the output.
    column_types = [
        ('Country', 'string'),
        ('CountryCode', 'string'),
        ('Date', 'date'),
        ('ID', 'string'),
        ('NewConfirmed', 'int'),
        ('NewDeaths', 'int'),
        ('NewRecovered', 'int'),
        ('TotalConfirmed', 'int'),
        ('TotalDeaths', 'int'),
        ('TotalRecovered', 'int'),
    ]
    column_names = [pair[0] for pair in column_types]

    # Use a context manager so the HTTP response (and its socket) is closed
    # even if reading or decoding raises.
    with urlopen(url) as response:
        http = response.read().decode('utf-8')

    # Hand the whole document to Spark as a one-element RDD of JSON text.
    rdd = sc.parallelize([http])
    df_read = spark.read.option("multiline", "true").json(rdd)

    # Explode the Countries array so each element becomes its own row, then
    # pull the nested fields up to top-level columns.
    df_explode = (
        df_read
        .withColumn('Countries', sf.explode(sf.col('Countries')))
        .select(['Countries.' + name for name in column_names])
    )
    df_explode.printSchema()

    # Cast every column to its target type, keeping the column name.
    df_format = df_explode.selectExpr(
        *['cast({0} as {1}) {0}'.format(name, sql_type)
          for name, sql_type in column_types]
    )
    df_format.printSchema()

    # Grouping on every column collapses identical rows and appends `count`.
    df_processed = df_format.groupBy(*column_names).count()

    # Convert the Spark DataFrame to pandas (collects the data to the driver).
    pandas_df = df_processed.toPandas()
    print(pandas_df.head())
    return pandas_df

# Settings for loading the transformed DataFrame into the AWS S3 bucket.
# The two credential strings are intentionally blank in the notebook source;
# avoid committing real keys here.
BUCKET_NAME = "project-group1"
i = "covid_dataset.csv"  # file name for the CSV object (see the commented-out upload_df call)
ACCESS_KEY_ID = ""
ACCESS_SECRET_KEY = ""

def upload_df(pandas_df,i):
    """Serialise ``pandas_df`` to CSV and store it in S3 as ``datasets/<i>``.

    Parameters
    ----------
    pandas_df : pandas.DataFrame
        Frame to upload; written with a header row and without the index.
    i : str
        File name of the object; it is appended to the ``datasets/`` prefix
        to form the S3 key.

    Returns
    -------
    None

    Notes
    -----
    Reads the module-level ``ACCESS_KEY_ID``, ``ACCESS_SECRET_KEY`` and
    ``BUCKET_NAME``. Blank credential placeholders are passed as ``None`` so
    that boto3 uses its default credential lookup instead of attempting to
    sign requests with empty keys.
    """
    s3 = boto3.client(
        's3',
        # '' -> None: let boto3 resolve credentials itself when none are set.
        aws_access_key_id=ACCESS_KEY_ID or None,
        aws_secret_access_key=ACCESS_SECRET_KEY or None,
    )
    csv_buf = StringIO()
    pandas_df.to_csv(csv_buf, header=True, index=False)
    # getvalue() returns the full buffer regardless of the stream position,
    # so no seek is needed before uploading.
    s3.put_object(Bucket=BUCKET_NAME, Body=csv_buf.getvalue(), Key='datasets/' + i)

if __name__ == "__main__":
    # Fetch and transform the summary data; the pandas result is kept in
    # the module-level name pandas_df.
    pandas_df = get_api_data(url=url)
    # The S3 upload step is currently disabled:
    #load_s3 = upload_df(pandas_df,i)
    

root
 |-- Country: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- NewConfirmed: long (nullable = true)
 |-- NewDeaths: long (nullable = true)
 |-- NewRecovered: long (nullable = true)
 |-- TotalConfirmed: long (nullable = true)
 |-- TotalDeaths: long (nullable = true)
 |-- TotalRecovered: long (nullable = true)

root
 |-- Country: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- ID: string (nullable = true)
 |-- NewConfirmed: integer (nullable = true)
 |-- NewDeaths: integer (nullable = true)
 |-- NewRecovered: integer (nullable = true)
 |-- TotalConfirmed: integer (nullable = true)
 |-- TotalDeaths: integer (nullable = true)
 |-- TotalRecovered: integer (nullable = true)

