In [33]:
import findspark
findspark.init('C://spark-3.1.2-bin-hadoop3.2')
from pyspark.sql import SparkSession
import pyspark.sql.functions as F 
from pyspark.sql.types import StructType,StructField,StringType,BooleanType,DateType,IntegerType

import datetime

import pandas as pd
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
df = spark.read.option('header', True).csv('hdfs://localhost:9000/data/countries-aggregated.csv')
df.show(5)

+----------+-----------+---------+---------+------+
|      Date|    Country|Confirmed|Recovered|Deaths|
+----------+-----------+---------+---------+------+
|2020-01-22|Afghanistan|        0|        0|     0|
|2020-01-23|Afghanistan|        0|        0|     0|
|2020-01-24|Afghanistan|        0|        0|     0|
|2020-01-25|Afghanistan|        0|        0|     0|
|2020-01-26|Afghanistan|        0|        0|     0|
+----------+-----------+---------+---------+------+
only showing top 5 rows



In [5]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Deaths: string (nullable = true)



In [6]:
df_clean = df.withColumn("Confirmed",df.Confirmed.cast(IntegerType())) \
            .withColumn("Recovered",df.Recovered.cast(IntegerType())) \
            .withColumn("Deaths",df.Deaths.cast(IntegerType()))\
            .withColumn("Date",df.Date.cast(DateType()))

In [7]:
df_clean.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)



In [8]:
df_clean.describe().show()

+-------+-----------+------------------+------------------+-----------------+
|summary|    Country|         Confirmed|         Recovered|           Deaths|
+-------+-----------+------------------+------------------+-----------------+
|  count|     233415|            233415|            233415|           233415|
|   mean|       null|398886.92581453634|201284.89042263778|9243.440421566738|
| stddev|       null|  2123927.98766423|1142065.0017284749|41833.15158983137|
|    min|Afghanistan|                 0|                 0|                0|
|    max|   Zimbabwe|          41221266|          30974748|           662106|
+-------+-----------+------------------+------------------+-----------------+



In [9]:
df_clean.select('Country').distinct().sort('Country').show(200,False)

+--------------------------------+
|Country                         |
+--------------------------------+
|Afghanistan                     |
|Albania                         |
|Algeria                         |
|Andorra                         |
|Angola                          |
|Antigua and Barbuda             |
|Argentina                       |
|Armenia                         |
|Australia                       |
|Austria                         |
|Azerbaijan                      |
|Bahamas                         |
|Bahrain                         |
|Bangladesh                      |
|Barbados                        |
|Belarus                         |
|Belgium                         |
|Belize                          |
|Benin                           |
|Bhutan                          |
|Bolivia                         |
|Bosnia and Herzegovina          |
|Botswana                        |
|Brazil                          |
|Brunei                          |
|Bulgaria           

In [50]:
df_clean.createOrReplaceTempView("total_cases")

total_cases = spark.sql('''
          SELECT Country,Max(Confirmed) AS Confirmed,MAX(Recovered) AS Recovered,MAX(Deaths) AS Deaths
          FROM total_cases
          GROUP BY Country
          ORDER BY Country;
          ''')
#df_clean.groupby('Country').agg(max('Confirmed'),max('Recovered'),max('Deaths')).orderBy(df.Country).show(200,False)

In [51]:
confirmed,deaths,recovered = total_cases.agg(F.sum("Confirmed"),F.sum("Deaths"),F.sum("Recovered")).collect()[0]
country = "Global"

In [52]:
data_row = [(country,confirmed,recovered,deaths)]
schema = StructType([StructField("Country",StringType(),True),\
                     StructField("Confirmed",IntegerType(),True),\
                     StructField("Recovered",IntegerType(),True),\
                     StructField("Deaths", IntegerType(), True), \
                    ])
df_global_row = spark.createDataFrame(data=data_row,schema=schema)
df_global_row.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)



In [53]:
total_cases = total_cases.union(df_global_row)

In [56]:
total_cases.show(5)

+-----------+---------+---------+------+
|    Country|Confirmed|Recovered|Deaths|
+-----------+---------+---------+------+
|Afghanistan|   154094|    82586|  7169|
|    Albania|   157436|   130314|  2548|
|    Algeria|   200301|   118409|  5596|
|    Andorra|    15096|    14380|   130|
|     Angola|    50738|    39582|  1345|
+-----------+---------+---------+------+
only showing top 5 rows



In [57]:
df_clean.createOrReplaceTempView("global_cases")

global_cases = spark.sql('''
          SELECT Date,SUM(Confirmed) AS Confirmed,SUM(Recovered) AS Recovered,SUM(Deaths) AS Deaths
          FROM global_cases
          GROUP BY Date
          ORDER BY Date;
          ''')

In [58]:
global_cases.show(5)

+----------+---------+---------+------+
|      Date|Confirmed|Recovered|Deaths|
+----------+---------+---------+------+
|2020-01-22|     1114|       60|    34|
|2020-01-23|     1310|       64|    36|
|2020-01-24|     1882|       78|    52|
|2020-01-25|     2866|       84|    84|
|2020-01-26|     4236|      112|   112|
+----------+---------+---------+------+
only showing top 5 rows



In [68]:
# Database Properties

import psycopg2
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://postgres:Sarthak#123@127.0.0.1/covid_analysis?client_encoding=utf8")

def to_db(df,table_name):
    df = df.toPandas()
    df.to_sql(table_name, engine, index=False, if_exists='replace')
    print("Successfully Created",table_name)

to_db(total_cases,'total_cases')
to_db(global_cases,'global_cases')
to_db(df_clean,'master-table')

Successfully Created total_cases
Successfully Created global_cases
Successfully Created master-table


In [67]:
pdf = pd.read_sql('select * from total_cases', engine)
pdf

Unnamed: 0,Country,Confirmed,Recovered,Deaths
0,Afghanistan,154094,82586,7169
1,Albania,157436,130314,2548
2,Algeria,200301,118409,5596
3,Andorra,15096,14380,130
4,Angola,50738,39582,1345
...,...,...,...,...
191,West Bank and Gaza,372108,312320,3831
192,Yemen,8452,4251,1604
193,Zambia,207960,189658,3633
194,Zimbabwe,126399,82994,4543


In [86]:
# df_new = df.repartition(10)
# print(df_new.rdd.getNumPartitions())
# df_new.write.mode("overwrite").csv("data/example.csv", header=True)