* @Author: Sankar
* @Date: 2021-05-13 07:52:25
* @Last Modified by: Sankar
* @Last Modified time: 2021-05-14 19:58:09
* @Title : Preprocessing of worldometer.csv  data

In [1]:
from pyspark.sql import *
from pyspark.sql.types import *

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7,application_1620973165995_0022,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
from pyspark.sql.functions import UserDefinedFunction, col, isnan, when, col, count, isnull, mean

In [3]:
# Read the raw worldometer CSV from blob storage: the first row is the header
# and Spark infers the column types.
spark_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('wasbs://hadoopcluster@mydemostorage1234.blob.core.windows.net/raw_input/worldometer_data.csv')
)

In [4]:
# describe() returns a new summary DataFrame; the cell output below is only that
# DataFrame's repr (column names and types), not the statistics themselves.
# NOTE(review): append .show() if the statistics are meant to be displayed.
spark_df.describe()

DataFrame[summary: string, Country/Region: string, Continent: string, Population: string, TotalCases: string, NewCases: string, TotalDeaths: string, NewDeaths: string, TotalRecovered: string, NewRecovered: string, ActiveCases: string, Serious,Critical: string, Tot Cases/1M pop: string, Deaths/1M pop: string, TotalTests: string, Tests/1M pop: string, WHO Region: string]

In [5]:
# Print the column names and types Spark inferred for the raw CSV
# (the read above used inferSchema=True).
spark_df.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- TotalCases: integer (nullable = true)
 |-- NewCases: integer (nullable = true)
 |-- TotalDeaths: integer (nullable = true)
 |-- NewDeaths: integer (nullable = true)
 |-- TotalRecovered: integer (nullable = true)
 |-- NewRecovered: integer (nullable = true)
 |-- ActiveCases: integer (nullable = true)
 |-- Serious,Critical: integer (nullable = true)
 |-- Tot Cases/1M pop: integer (nullable = true)
 |-- Deaths/1M pop: double (nullable = true)
 |-- TotalTests: integer (nullable = true)
 |-- Tests/1M pop: integer (nullable = true)
 |-- WHO Region: string (nullable = true)

In [6]:
# Peek at the first five raw rows (returned as a list of Row objects).
spark_df.head(5)

[Row(Country/Region=u'USA', Continent=u'North America', Population=331198130, TotalCases=5032179, NewCases=None, TotalDeaths=162804, NewDeaths=None, TotalRecovered=2576668, NewRecovered=None, ActiveCases=2292707, Serious,Critical=18296, Tot Cases/1M pop=15194, Deaths/1M pop=492.0, TotalTests=63139605, Tests/1M pop=190640, WHO Region=u'Americas'), Row(Country/Region=u'Brazil', Continent=u'South America', Population=212710692, TotalCases=2917562, NewCases=None, TotalDeaths=98644, NewDeaths=None, TotalRecovered=2047660, NewRecovered=None, ActiveCases=771258, Serious,Critical=8318, Tot Cases/1M pop=13716, Deaths/1M pop=464.0, TotalTests=13206188, Tests/1M pop=62085, WHO Region=u'Americas'), Row(Country/Region=u'India', Continent=u'Asia', Population=1381344997, TotalCases=2025409, NewCases=None, TotalDeaths=41638, NewDeaths=None, TotalRecovered=1377384, NewRecovered=None, ActiveCases=606387, Serious,Critical=8944, Tot Cases/1M pop=1466, Deaths/1M pop=30.0, TotalTests=22149351, Tests/1M pop=

In [7]:
# Helper function to drop unused columns and rename interesting columns.
def selectInterestingColumns(rawDf):
    """Keep only the columns used downstream and give them short names.

    Columns are picked by their position in ``rawDf.columns`` and aliased to
    the names in the mapping below; every other column is dropped.

    Args:
        rawDf: DataFrame expected to have the raw worldometer column layout
            (position 0 is the country, 1 the continent, and so on).

    Returns:
        The DataFrame produced by ``rawDf.select`` over the aliased columns,
        ordered by ascending source position.

    Raises:
        IndexError: if ``rawDf`` has fewer columns than the highest mapped
            position.
    """
    # Source column position -> output column name.
    columnNames = {0: "Country", 1: "Continent", 2: "Population", 3: "TotalCases", 5: "TotalDeaths",
                   7: "TotalRecovered", 9: "ActiveCases"}

    # Walk the positions in sorted order. Plain dict key order is not
    # guaranteed on older Python runtimes, so relying on it could shuffle the
    # output columns between runs or environments.
    cols = [col(rawDf.columns[i]).alias(columnNames[i]) for i in sorted(columnNames)]

    # Selecting only the aliased columns drops everything else.
    return rawDf.select(cols)

In [8]:
# Trim the raw frame down to the renamed columns and mark the result for caching.
df = selectInterestingColumns(spark_df)
df = df.cache()
# Row count of the trimmed frame, shown as the cell output.
df.count()

209

In [9]:
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- TotalCases: integer (nullable = true)
 |-- TotalDeaths: integer (nullable = true)
 |-- TotalRecovered: integer (nullable = true)
 |-- ActiveCases: integer (nullable = true)

In [10]:
# Per-column count of null values in the trimmed frame.
df.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]).show()

+-------+---------+----------+----------+-----------+--------------+-----------+
|Country|Continent|Population|TotalCases|TotalDeaths|TotalRecovered|ActiveCases|
+-------+---------+----------+----------+-----------+--------------+-----------+
|      0|        1|         1|         0|         21|             4|          4|
+-------+---------+----------+----------+-----------+--------------+-----------+

In [11]:
# Per-column count of null values.
# NOTE(review): this repeats the same null-count query as the previous cell and
# produces the same output; one of the two cells can be removed.
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------+---------+----------+----------+-----------+--------------+-----------+
|Country|Continent|Population|TotalCases|TotalDeaths|TotalRecovered|ActiveCases|
+-------+---------+----------+----------+-----------+--------------+-----------+
|      0|        1|         1|         0|         21|             4|          4|
+-------+---------+----------+----------+-----------+--------------+-----------+

In [12]:
# Show the row(s) whose Continent value is missing.
df.where(df["Continent"].isNull()).show()

+----------------+---------+----------+----------+-----------+--------------+-----------+
|         Country|Continent|Population|TotalCases|TotalDeaths|TotalRecovered|ActiveCases|
+----------------+---------+----------+----------+-----------+--------------+-----------+
|Diamond Princess|     null|      null|       712|         13|           651|         48|
+----------------+---------+----------+----------+-----------+--------------+-----------+

In [13]:
# Diamond Princess is a cruise ship, not a country, so it does not belong in the
# country-level data.
# NOTE(review): na.drop() with no arguments is not limited to that row -- the
# row count goes from 209 to 183 here, so rows with a null in other columns
# (e.g. TotalDeaths, TotalRecovered, ActiveCases) are removed as well.
# If only the cruise-ship row should go, restrict the drop, e.g.
# df.na.drop(subset=["Continent"]) -- TODO confirm the intended behaviour.
df = df.na.drop()
df.count()

183

In [14]:
# Display the leading rows of the cleaned frame as a sanity check before writing it out.
df.show()

+------------+-------------+----------+----------+-----------+--------------+-----------+
|     Country|    Continent|Population|TotalCases|TotalDeaths|TotalRecovered|ActiveCases|
+------------+-------------+----------+----------+-----------+--------------+-----------+
|         USA|North America| 331198130|   5032179|     162804|       2576668|    2292707|
|      Brazil|South America| 212710692|   2917562|      98644|       2047660|     771258|
|       India|         Asia|1381344997|   2025409|      41638|       1377384|     606387|
|      Russia|       Europe| 145940924|    871894|      14606|        676357|     180931|
|South Africa|       Africa|  59381566|    538184|       9604|        387316|     141264|
|      Mexico|North America| 129066160|    462690|      50517|        308848|     103325|
|        Peru|South America|  33016319|    455409|      20424|        310337|     124648|
|       Chile|South America|  19132514|    366671|       9889|        340168|      16614|
|    Colom

In [15]:
# Persist the cleaned frame as CSV under the processed/ prefix in blob storage.
# mode("overwrite") keeps the notebook re-runnable: with the default save mode
# the write raises an error as soon as the target path already exists.
df.write.mode("overwrite").csv('wasbs://hadoopcluster@mydemostorage1234.blob.core.windows.net/processed/worldometer.csv')

In [16]:
# Register the cleaned frame as the table "hvcountrywise".
# mode("overwrite") keeps the notebook re-runnable: with the default save mode
# saveAsTable raises an error when the table already exists.
df.write.mode("overwrite").saveAsTable("hvcountrywise")