In [1]:
!pip3 install pyarrow


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import pandas as pd

In [3]:
# Obtain the Spark session used by every query in this notebook.
spark = (
    SparkSession.builder
    .appName("Read COVID Data")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/04 00:15:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the per-country COVID CSV: the first line is the header and column
# types are inferred from the data.
df = spark.read.csv(
    path='covid_data_countries.csv',
    header=True,
    inferSchema=True,
)

# Show DataFrame
df.show()

+-------------------+--------+------+---------+------------+--------------+
|            Country|   Cases|Deaths|Recovered|Active Cases|Critical Cases|
+-------------------+--------+------+---------+------------+--------------+
|        Afghanistan|  232152|  7985|   210164|       14003|             0|
|            Albania|  334863|  3605|   330233|        1025|             0|
|            Algeria|  272010|  6881|   183061|       82068|             0|
|            Andorra|   48015|   165|        0|       47850|             0|
|             Angola|  107327|  1937|   103419|        1971|             0|
|           Anguilla|    3904|    12|        0|        3892|             0|
|Antigua and Barbuda|    9106|   146|     8954|           6|             0|
|          Argentina|10094643|130733|  9963910|           0|             0|
|            Armenia|  451831|  8777|   435162|        7892|             0|
|              Aruba|   44224|   292|    42438|        1494|             2|
|          A

In [5]:
# Pull every Spark row to the driver and wrap them in a pandas DataFrame that
# keeps the Spark column names; this copy backs the /get-covid-data endpoint.
pandas_covid_data = pd.DataFrame(df.collect(), columns=df.columns)
pandas_covid_data

Unnamed: 0,Country,Cases,Deaths,Recovered,Active Cases,Critical Cases
0,Afghanistan,232152,7985,210164,14003,0
1,Albania,334863,3605,330233,1025,0
2,Algeria,272010,6881,183061,82068,0
3,Andorra,48015,165,0,47850,0
4,Angola,107327,1937,103419,1971,0
...,...,...,...,...,...,...
226,Wallis and Futuna,3550,8,438,3104,0
227,Western Sahara,10,1,9,0,0
228,Yemen,11945,2159,9124,662,0
229,Zambia,349304,4069,341316,3919,0


In [6]:
def calculate_death_to_cases_ratio(df):
    """Return ``df`` with an added ``death_to_cases_ratio`` column (Deaths / Cases)."""
    fatality_ratio = col("Deaths") / col("Cases")
    return df.withColumn("death_to_cases_ratio", fatality_ratio)


calculate_death_to_cases_ratio(df)

DataFrame[Country: string, Cases: int, Deaths: int, Recovered: int, Active Cases: int, Critical Cases: int, death_to_cases_ratio: double]

In [7]:
# 2.1) Most affected country among all the countries (total deaths / total COVID cases).
def most_affected_country(df):
    """Return the row with the highest deaths-to-cases ratio.

    Args:
        df: Spark DataFrame with ``Deaths`` and ``Cases`` columns.

    Returns:
        A one-row pandas DataFrame with the source columns plus
        ``death_to_cases_ratio``; an empty DataFrame (same columns) when
        ``df`` has no rows.
    """
    df_with_ratio = calculate_death_to_cases_ratio(df)
    top_row = df_with_ratio.orderBy(col("death_to_cases_ratio").desc()).first()
    if top_row is None:
        # first() yields None on an empty DataFrame; avoid calling .asDict() on it.
        return pd.DataFrame(columns=df_with_ratio.columns)
    return pd.DataFrame([top_row.asDict()])


# Run the Spark job once and reuse the result for the printout and the display.
most_affected_result = most_affected_country(df)
print("Most affected Country:", most_affected_result['Country'][0])
most_affected_result

Most affected Country: MS Zaandam


Unnamed: 0,Country,Cases,Deaths,Recovered,Active Cases,Critical Cases,death_to_cases_ratio
0,MS Zaandam,9,2,7,0,0,0.222222


In [8]:
# 2.2) Least affected country among all the countries (total deaths / total COVID cases).
def least_affected_country(df):
    """Return the row with the lowest deaths-to-cases ratio.

    Args:
        df: Spark DataFrame with ``Deaths`` and ``Cases`` columns.

    Returns:
        A one-row pandas DataFrame with the source columns plus
        ``death_to_cases_ratio``; an empty DataFrame (same columns) when
        ``df`` has no rows.
    """
    df_with_ratio = calculate_death_to_cases_ratio(df)
    # An ascending sort places nulls first by default, which would report a row
    # with an undefined (null) ratio as the minimum; push nulls to the end.
    lowest_row = df_with_ratio.orderBy(
        col("death_to_cases_ratio").asc_nulls_last()
    ).first()
    if lowest_row is None:
        # first() yields None on an empty DataFrame; avoid calling .asDict() on it.
        return pd.DataFrame(columns=df_with_ratio.columns)
    return pd.DataFrame([lowest_row.asDict()])


# Run the Spark job once and reuse the result for the printout and the display.
least_affected_result = least_affected_country(df)
print("Least affected Country:", least_affected_result['Country'][0])
least_affected_result

Least affected Country: Falkland Islands (Malvinas)


Unnamed: 0,Country,Cases,Deaths,Recovered,Active Cases,Critical Cases,death_to_cases_ratio
0,Falkland Islands (Malvinas),1930,0,1930,0,0,0.0


In [9]:
# 2.3) Country with highest covid cases.
def country_with_highest_cases(df):
    """Return the row with the largest ``Cases`` value.

    Args:
        df: Spark DataFrame with a ``Cases`` column.

    Returns:
        A one-row pandas DataFrame with the source columns; an empty DataFrame
        (same columns) when ``df`` has no rows.
    """
    top_row = df.orderBy(col("Cases").desc()).first()
    if top_row is None:
        # first() yields None on an empty DataFrame; avoid calling .asDict() on it.
        return pd.DataFrame(columns=df.columns)
    return pd.DataFrame([top_row.asDict()])


# Run the Spark job once and reuse the result for the printout and the display.
highest_cases_result = country_with_highest_cases(df)
print("Country with highest COVID cases:", highest_cases_result['Country'][0])
highest_cases_result

Country with highest COVID cases: USA


Unnamed: 0,Country,Cases,Deaths,Recovered,Active Cases,Critical Cases
0,USA,111769834,1218856,109712052,838926,1058


In [10]:
# 2.4) Country with minimum covid cases.
def country_with_minimum_cases(df):
    """Return the row with the smallest ``Cases`` value.

    Args:
        df: Spark DataFrame with a ``Cases`` column.

    Returns:
        A one-row pandas DataFrame with the source columns; an empty DataFrame
        (same columns) when ``df`` has no rows.
    """
    # An ascending sort places nulls first by default; a row with a missing
    # case count must not be reported as the minimum, so sort nulls last.
    bottom_row = df.orderBy(col("Cases").asc_nulls_last()).first()
    if bottom_row is None:
        # first() yields None on an empty DataFrame; avoid calling .asDict() on it.
        return pd.DataFrame(columns=df.columns)
    return pd.DataFrame([bottom_row.asDict()])


# Run the Spark job once and reuse the result for the printout and the display.
minimum_cases_result = country_with_minimum_cases(df)
print("Country with minimum COVID cases:", minimum_cases_result['Country'][0])
minimum_cases_result

Country with minimum COVID cases: MS Zaandam


Unnamed: 0,Country,Cases,Deaths,Recovered,Active Cases,Critical Cases
0,MS Zaandam,9,2,7,0,0


In [11]:
# 2.5) Total cases.
def total_cases(df):
    """Sum the ``Cases`` column and return it as a one-row pandas DataFrame.

    The result has a single column named ``total_cases``.
    """
    aggregated_rows = df.selectExpr("sum(Cases)").collect()
    grand_total = aggregated_rows[0][0]
    record = {"total_cases": grand_total}
    return pd.DataFrame([record])

# Aggregate once and reuse the result for the printout and the display.
total_cases_result = total_cases(df)
print("Total cases:", total_cases_result['total_cases'][0])
total_cases_result


Total cases: 704609872


Unnamed: 0,total_cases
0,704609872


In [12]:

# 2.6) Country that handled COVID most efficiently (total recovered / total COVID cases).
def most_efficient_country(df):
    """Return the country with the highest recovered-to-cases ratio.

    Args:
        df: Spark DataFrame with ``Country``, ``Recovered`` and ``Cases`` columns.

    Returns:
        A one-row pandas DataFrame with a single ``most_efficient_country``
        column; an empty DataFrame (same column) when ``df`` has no rows.
    """
    df_with_ratio = df.withColumn("recovery_to_cases_ratio", col("Recovered") / col("Cases"))
    best_row = df_with_ratio.orderBy(col("recovery_to_cases_ratio").desc()).first()
    if best_row is None:
        # first() yields None on an empty DataFrame; avoid indexing into it.
        return pd.DataFrame(columns=["most_efficient_country"])
    return pd.DataFrame([{"most_efficient_country": best_row["Country"]}])


# Run the Spark job once and reuse the result for the printout and the display.
most_efficient_result = most_efficient_country(df)
print("Country that handled the COVID most efficiently:", most_efficient_result['most_efficient_country'][0])
most_efficient_result

Country that handled the COVID most efficiently: Falkland Islands (Malvinas)


Unnamed: 0,most_efficient_country
0,Falkland Islands (Malvinas)


In [13]:

# 2.7) Country that handled COVID least efficiently (total recovered / total COVID cases).
def least_efficient_country(df):
    """Return the country with the lowest recovered-to-cases ratio.

    Args:
        df: Spark DataFrame with ``Country``, ``Recovered`` and ``Cases`` columns.

    Returns:
        A one-row pandas DataFrame with a single ``least_efficient_country``
        column; an empty DataFrame (same column) when ``df`` has no rows.
    """
    df_with_ratio = df.withColumn("recovery_to_cases_ratio", col("Recovered") / col("Cases"))
    # An ascending sort places nulls first by default, which would report a row
    # with an undefined (null) ratio as the minimum; push nulls to the end.
    worst_row = df_with_ratio.orderBy(
        col("recovery_to_cases_ratio").asc_nulls_last()
    ).first()
    if worst_row is None:
        # first() yields None on an empty DataFrame; avoid indexing into it.
        return pd.DataFrame(columns=["least_efficient_country"])
    return pd.DataFrame([{"least_efficient_country": worst_row["Country"]}])


# Run the Spark job once and reuse the result for the printout and the display.
least_efficient_result = least_efficient_country(df)
print("Country that handled the COVID least efficiently:", least_efficient_result['least_efficient_country'][0])
least_efficient_result


Country that handled the COVID least efficiently: Andorra


Unnamed: 0,least_efficient_country
0,Andorra


In [14]:
# 2.8) Country least suffering from covid ( least critical cases).
def country_least_critical_cases(df):
    """Return the row with the fewest ``Critical Cases``.

    Many rows can share the minimum value (0 in the data shown above), so ties
    are broken by ``Country`` in ascending order to keep the answer
    deterministic.

    Args:
        df: Spark DataFrame with ``Country`` and ``Critical Cases`` columns.

    Returns:
        A one-row pandas DataFrame with the source columns; an empty DataFrame
        (same columns) when ``df`` has no rows.
    """
    # An ascending sort places nulls first by default; a row with a missing
    # count must not be reported as the minimum, so sort nulls last.
    lowest_row = df.orderBy(
        col("Critical Cases").asc_nulls_last(),
        col("Country"),
    ).first()
    if lowest_row is None:
        # first() yields None on an empty DataFrame; avoid calling .asDict() on it.
        return pd.DataFrame(columns=df.columns)
    return pd.DataFrame([lowest_row.asDict()])


# Run the Spark job once and reuse the result for the printout and the display.
least_critical_result = country_least_critical_cases(df)
print("Country least suffering from COVID (least critical cases):", least_critical_result['Country'][0])
least_critical_result


Country least suffering from COVID (least critical cases): Afghanistan


Unnamed: 0,Country,Cases,Deaths,Recovered,Active Cases,Critical Cases
0,Afghanistan,232152,7985,210164,14003,0


In [15]:
# 2.9) Country still suffering from covid (highest critical cases).
def country_highest_critical_cases(df):
    """Return the row with the most ``Critical Cases``.

    Args:
        df: Spark DataFrame with a ``Critical Cases`` column.

    Returns:
        A one-row pandas DataFrame with the source columns; an empty DataFrame
        (same columns) when ``df`` has no rows.
    """
    top_row = df.orderBy(col("Critical Cases").desc()).first()
    if top_row is None:
        # first() yields None on an empty DataFrame; avoid calling .asDict() on it.
        return pd.DataFrame(columns=df.columns)
    return pd.DataFrame([top_row.asDict()])


# Run the Spark job once and reuse the result for the printout and the display.
highest_critical_result = country_highest_critical_cases(df)
print("Country still suffering from COVID (highest critical cases):", highest_critical_result['Country'][0])
highest_critical_result

Country still suffering from COVID (highest critical cases): USA


Unnamed: 0,Country,Cases,Deaths,Recovered,Active Cases,Critical Cases
0,USA,111769834,1218856,109712052,838926,1058


In [16]:
# Restful APIs
def handle_request(path):
    """Resolve an API path to its result.

    Returns the pandas DataFrame produced for a known path, or None when the
    path matches no endpoint.
    """
    # Each route is a zero-argument callable so that only the matching query
    # runs, and globals are looked up at call time.
    routes = {
        '/get-covid-data': lambda: pandas_covid_data,
        '/most-affected-country': lambda: most_affected_country(df),
        '/least-affected-country': lambda: least_affected_country(df),
        '/country-highest-cases': lambda: country_with_highest_cases(df),
        '/country-minimum-cases': lambda: country_with_minimum_cases(df),
        '/total-cases': lambda: total_cases(df),
        '/most-efficient-country': lambda: most_efficient_country(df),
        '/least-efficient-country': lambda: least_efficient_country(df),
        '/country-least-critical-cases': lambda: country_least_critical_cases(df),
        '/country-highest-critical-cases': lambda: country_highest_critical_cases(df),
    }
    resolver = routes.get(path)
    if resolver is None:
        return None
    return resolver()


class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler that serves ``index.html`` at ``/`` and JSON for API paths.

    API paths are resolved through ``handle_request``; unknown paths get a
    404 and failures while producing a result get a 500.
    """

    def _send_body(self, status, content_type, body):
        """Send a complete response: status line, headers and ``body`` (bytes)."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Handle a GET request for the landing page or one of the API paths."""
        # Ignore any query string so '/total-cases?x=1' still matches its route.
        route = self.path.split('?', 1)[0]

        if route == '/':
            # Read the page before sending any header, so a missing file becomes
            # a clean 404 instead of a 200 with a broken body.
            try:
                with open('index.html', 'rb') as file:
                    page = file.read()
            except OSError as exc:
                self.log_error('Cannot read index.html: %r', exc)
                self._send_body(404, 'text/plain', b'404 Not Found')
                return
            self._send_body(200, 'text/html', page)
            return

        try:
            data = handle_request(route)
            if data is None:
                payload = None
            elif isinstance(data, pd.DataFrame):
                payload = data.to_json(orient='records')
            else:
                payload = json.dumps(data)
        except Exception as exc:
            # Report the failure to the client rather than dropping the connection.
            self.log_error('Error while handling %s: %r', route, exc)
            self._send_body(500, 'text/plain', b'500 Internal Server Error')
            return

        if payload is None:
            self._send_body(404, 'text/plain', b'404 Not Found')
        else:
            self._send_body(200, 'application/json', payload.encode('utf-8'))


def run_server(port=8000):
    """Serve the API on ``port`` until interrupted.

    Blocks the calling thread (the notebook kernel) in ``serve_forever``.
    A KeyboardInterrupt (kernel interrupt / Ctrl+C) stops the server.

    Args:
        port: TCP port to listen on, on all interfaces. Defaults to 8000.
    """
    server_address = ('', port)
    httpd = HTTPServer(server_address, RequestHandler)
    print('Starting server on port {}...'.format(port))
    print('Visit http://localhost:{}/ to access the API links.'.format(port))

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Expected way to stop the server; cleanup happens below.
        pass
    finally:
        # Release the listening socket on any exit path so the port can be
        # reused when the cell is run again.
        httpd.server_close()
        print('Server stopped.')


if __name__ == '__main__':
    run_server()


Starting server on port 8000...
Visit http://localhost:8000/ to access the API links.


24/04/04 00:15:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
127.0.0.1 - - [04/Apr/2024 00:16:44] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [04/Apr/2024 00:16:45] "GET /get-covid-data HTTP/1.1" 200 -


Server stopped.
