# Fetch All Latest Covid Data
* Create account on RapidAPI and subscribe to "https://rapidapi.com/ShubhGupta/api/covid19-data/" and get XRapidAPIKey
* Store CODE_HOME and XRAPIDAPIKEY as environment variable
* Use "all" endpoint to get latest covid data for all countries and store it in latest-covid-data.csv at ASSIGNMENTPATH

In [37]:
import requests
import json
import csv
import os

# The RapidAPI key is read from the environment so it is never stored in the
# notebook. Fail early with a clear message when it is missing.
XRAPIDAPIKEY = os.environ.get('XRAPIDAPIKEY')
if not XRAPIDAPIKEY:
    raise RuntimeError("XRAPIDAPIKEY environment variable is not set")

# Generated CSV files are written to the current working directory.
CODE_HOME = os.getcwd()

url = "https://covid19-data.p.rapidapi.com/all"

headers = {
    "X-RapidAPI-Host": "covid19-data.p.rapidapi.com",
    "X-RapidAPI-Key": XRAPIDAPIKEY,
}

# requests has no default timeout, so set one to keep the cell from hanging.
# raise_for_status() surfaces HTTP errors (bad key, quota, outage) here rather
# than as a confusing failure on jsondata[0] further down.
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
jsondata = response.json()
print(response.text)

# newline='' is what the csv module expects for files it writes; without it
# extra blank rows appear on platforms that translate line endings.
with open(os.path.join(CODE_HOME, 'latest-covid-data.csv'), "w", newline='') as output_file:
    # The keys of the first record define the CSV header.
    dict_writer = csv.DictWriter(output_file, jsondata[0].keys())
    dict_writer.writeheader()
    dict_writer.writerows(jsondata)

[{"countrycode":"AD","country":"Andorra","latitude":"42.5063","longitude":"1.5218","confirmed":852,"deaths":51,"recovered":757,"active":44},{"countrycode":"AE","country":"United Arab Emirates","latitude":"23.424076","longitude":"53.847818","confirmed":39904,"deaths":283,"recovered":22740,"active":16881},{"countrycode":"AF","country":"Afghanistan","latitude":"33.93911","longitude":"67.709953","confirmed":21459,"deaths":384,"recovered":2651,"active":18424},{"countrycode":"AG","country":"Antigua and Barbuda","latitude":"17.0608","longitude":"-61.7964","confirmed":26,"deaths":3,"recovered":20,"active":3},{"countrycode":"AL","country":"Albania","latitude":"41.1533","longitude":"20.1683","confirmed":1299,"deaths":34,"recovered":960,"active":305},{"countrycode":"AM","country":"Armenia","latitude":"40.0691","longitude":"45.0382","confirmed":13675,"deaths":217,"recovered":4451,"active":9007},{"countrycode":"AO","country":"Angola","latitude":"-11.2027","longitude":"17.8739","confirmed":96,"death

# Spark Dataframe
* Create Spark Dataframe from json data fetched above
* Drop unwanted columns

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Start (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("PySparkAssignment.com").getOrCreate()

# Name and Spark type of every key in the fetched records; all are nullable.
# "active" stays a string because the US record carries an empty string
# there, which LongType does not accept.
column_types = [
    ("countrycode", StringType()),
    ("country", StringType()),
    ("latitude", StringType()),
    ("longitude", StringType()),
    ("confirmed", LongType()),
    ("deaths", LongType()),
    ("recovered", LongType()),
    ("active", StringType()),
]
schema = StructType(
    [StructField(name, dtype, True) for name, dtype in column_types]
)

# Load the records and keep only the columns used by the analysis.
df = spark.createDataFrame(jsondata, schema=schema).drop(
    "countrycode", "latitude", "longitude"
)

df.printSchema()

# Row count, reused later when indexing the collected results.
total_countries = df.count()

root
 |-- country: string (nullable = true)
 |-- confirmed: long (nullable = true)
 |-- deaths: long (nullable = true)
 |-- recovered: long (nullable = true)
 |-- active: string (nullable = true)



## To change column type of "active" change empty string to 0

In [39]:
from pyspark.sql.functions import when, col, round
# "active" was loaded as a string because some records hold "" instead of a
# number. Map the empty string to 0, then cast the column to long.
active_or_zero = when(col("active") == "", 0).otherwise(col("active"))
df = df.withColumn("active", active_or_zero.cast("Long"))

df.printSchema()

# Show every row using the real row count instead of a guessed upper bound.
df.show(total_countries)

root
 |-- country: string (nullable = true)
 |-- confirmed: long (nullable = true)
 |-- deaths: long (nullable = true)
 |-- recovered: long (nullable = true)
 |-- active: long (nullable = true)

+--------------------+---------+------+---------+------+
|             country|confirmed|deaths|recovered|active|
+--------------------+---------+------+---------+------+
|             Andorra|      852|    51|      757|    44|
|United Arab Emirates|    39904|   283|    22740| 16881|
|         Afghanistan|    21459|   384|     2651| 18424|
| Antigua and Barbuda|       26|     3|       20|     3|
|             Albania|     1299|    34|      960|   305|
|             Armenia|    13675|   217|     4451|  9007|
|              Angola|       96|     4|       38|    54|
|           Argentina|    24761|   717|     7568| 16476|
|             Austria|    16979|   672|    15875|   432|
|           Australia|     7274|   102|     6740|   432|
|          Azerbaijan|     8191|    98|     4606|  3487|
|Bosnia

***

# Countries Affected by Covid

In [40]:
# Add the death rate (deaths / confirmed, rounded to 4 decimals) as
# "affected" and collect the rows in ascending order of that rate.
affected_country = (
    df.withColumn("affected", round(col("deaths") / col("confirmed"), 4))
    .orderBy("affected")
    .collect()
)

## Most affected

In [41]:
# Rows are sorted by ascending death rate, so the last row is the highest.
# Using [-1] and field names avoids relying on a separately computed row
# count and on column positions.
most_affected = affected_country[-1]
print(f"Country: {most_affected['country']}")
print(f"Death Rate: {most_affected['affected']}")

Country: Yemen
Death Rate: 0.2424


## Least affected

In [42]:
# Rows are sorted by ascending death rate, so the first row is the lowest.
least_affected = affected_country[0]
print(f"Country: {least_affected['country']}")
print(f"Death Rate: {least_affected['affected']}")

Country: Bhutan
Death Rate: 0.0


***

# Countries having Confirmed Cases

In [43]:
# Collect all rows ordered by ascending confirmed-case count.
sort_by_total_cases_df = df.orderBy("confirmed").collect()

## Highest cases

In [44]:
# Rows are sorted by ascending confirmed cases, so the last row is the
# highest. [-1] and field names replace the row-count arithmetic and the
# positional column indices.
highest_cases = sort_by_total_cases_df[-1]
print(f"Country: {highest_cases['country']}")
print(f"Cases: {highest_cases['confirmed']}")

Country: United States of America
Cases: 1979411


## Lowest cases

In [45]:
# Rows are sorted by ascending confirmed cases, so the first row is the lowest.
lowest_cases = sort_by_total_cases_df[0]
print(f"Country: {lowest_cases['country']}")
print(f"Cases: {lowest_cases['confirmed']}")

Country: Lesotho
Cases: 4


***

# Total Confirmed Cases

In [46]:
# Sum "confirmed" over the whole DataFrame; the result is a single row whose
# only column holds the total.
total_cases = df.agg({"confirmed": "sum"}).collect()
print(f"Cases: {total_cases[0][0]}")

Cases: 7230042


***

# Country's Covid Treatment Efficiency
## (recovered/total cases)

In [47]:
# Add the recovery ratio (recovered / confirmed, rounded to 4 decimals) as
# "efficiency" and collect the rows in ascending order of that ratio.
efficientdf = (
    df.withColumn("efficiency", round(col("recovered") / col("confirmed"), 4))
    .orderBy("efficiency")
    .collect()
)

## 1. Highest efficiency

In [48]:
# Rows are sorted by ascending efficiency, so the last row is the highest.
# [-1] and field names replace the row-count arithmetic and the positional
# column indices.
highest_efficiency = efficientdf[-1]
print(f"Country: {highest_efficiency['country']}")
print(f"Efficiently: {highest_efficiency['efficiency']}")

Country: Timor-Leste
Efficiently: 1.0


## 2. Lowest efficiency

In [49]:
# Rows are sorted by ascending efficiency, so the first row is the lowest.
lowest_efficiency = efficientdf[0]
print(f"Country: {lowest_efficiency['country']}")
print(f"Efficiently: {lowest_efficiency['efficiency']}")

Country: Chile
Efficiently: 0.0


***

# Countries still suffering covid with active cases

In [50]:
# Collect all rows ordered by ascending active-case count.
active_cases = df.orderBy("active").collect()

## Highest suffering

In [51]:
# Rows are sorted by ascending active cases, so the last row is the highest.
# [-1] and the field name replace the row-count arithmetic and the positional
# column index.
print(f"Cases: {active_cases[-1]['active']}")

Cases: 304360


## Lowest suffering

In [52]:
# Rows are sorted by ascending active cases, so the first row is the lowest.
fewest_active = active_cases[0]
print(f"Cases: {fewest_active['active']}")

Cases: 0


***

# Save analysis to csv file

In [53]:
# Two-row summary written to covid-analysis.csv. The first row names the
# country behind each metric ('All' for the worldwide total); the second row
# holds the matching value. Every collected list is sorted ascending, so [0]
# is the minimum and [-1] the maximum. Fields are read by name, not position.
analysis = [
    {
        'most_affected': affected_country[-1]['country'],
        'least_affected': affected_country[0]['country'],
        'highest_cases': sort_by_total_cases_df[-1]['country'],
        'lowest_cases': sort_by_total_cases_df[0]['country'],
        'total_cases': 'All',
        'highest_efficiency': efficientdf[-1]['country'],
        'lowest_efficiency': efficientdf[0]['country'],
        'highest_suffering': active_cases[-1]['country'],
        'lowest_suffering': active_cases[0]['country'],
    },
    {
        'most_affected': affected_country[-1]['affected'],
        'least_affected': affected_country[0]['affected'],
        'highest_cases': sort_by_total_cases_df[-1]['confirmed'],
        'lowest_cases': sort_by_total_cases_df[0]['confirmed'],
        'total_cases': total_cases[0][0],
        'highest_efficiency': efficientdf[-1]['efficiency'],
        'lowest_efficiency': efficientdf[0]['efficiency'],
        'highest_suffering': active_cases[-1]['active'],
        'lowest_suffering': active_cases[0]['active'],
    },
]

# newline='' is what the csv module expects for files it writes; without it
# extra blank rows appear on platforms that translate line endings.
with open(os.path.join(CODE_HOME, 'covid-analysis.csv'), "w", newline='') as output_file:
    dict_writer = csv.DictWriter(output_file, analysis[0].keys())
    dict_writer.writeheader()
    dict_writer.writerows(analysis)

***

# Test REST APIs

In [54]:
import requests

# Base URL of the REST service under test.
host = "http://127.0.0.1:5000"


# Every endpoint hit by this smoke test.
endpoints = [
    '/latest-covid-data',
    '/most-affected-country',
    '/least-affected-country',
    '/highest-cases',
    '/lowest-cases',
    '/total-cases',
    '/highest-covid-efficiency',
    '/lowest-covid-efficiency',
    '/highest-active-cases',
    '/lowest-active-cases',
]
payload = {}
headers = {}

for endpoint in endpoints:
    url = host + endpoint
    # requests has no default timeout; without one a stalled server would
    # block this loop indefinitely.
    response = requests.get(url, headers=headers, data=payload, timeout=10)
    print('=============' + endpoint + '=============')
    print(response.text)


[{"countrycode": "AD", "country": "Andorra", "latitude": "42.5063", "longitude": "1.5218", "confirmed": "852", "deaths": "51", "recovered": "757", "active": "44"}, {"countrycode": "AE", "country": "United Arab Emirates", "latitude": "23.424076", "longitude": "53.847818", "confirmed": "39904", "deaths": "283", "recovered": "22740", "active": "16881"}, {"countrycode": "AF", "country": "Afghanistan", "latitude": "33.93911", "longitude": "67.709953", "confirmed": "21459", "deaths": "384", "recovered": "2651", "active": "18424"}, {"countrycode": "AG", "country": "Antigua and Barbuda", "latitude": "17.0608", "longitude": "-61.7964", "confirmed": "26", "deaths": "3", "recovered": "20", "active": "3"}, {"countrycode": "AL", "country": "Albania", "latitude": "41.1533", "longitude": "20.1683", "confirmed": "1299", "deaths": "34", "recovered": "960", "active": "305"}, {"countrycode": "AM", "country": "Armenia", "latitude": "40.0691", "longitude": "45.0382", "confirmed": "13675", "deaths": "217", 