Schema Definition

In [166]:
#Biulding a Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CRUD").getOrCreate()

# Reading CSV File for Performing Transformations
country_latest = spark.read.csv("country_wise_latest.csv", header = True, inferSchema = True)

country_latest.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

In [167]:
#printing the schema 
country_latest.printSchema()



root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: integer (nullable = true)
 |-- 1 week change: integer (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)



SELECT COMMAND

In [168]:
#Select the specific columns
country_latest.select("Country/Region","Confirmed","Deaths").show(10)

+-------------------+---------+------+
|     Country/Region|Confirmed|Deaths|
+-------------------+---------+------+
|        Afghanistan|    36263|  1269|
|            Albania|     4880|   144|
|            Algeria|    27973|  1163|
|            Andorra|      907|    52|
|             Angola|      950|    41|
|Antigua and Barbuda|       86|     3|
|          Argentina|   167416|  3059|
|            Armenia|    37390|   711|
|          Australia|    15303|   167|
|            Austria|    20558|   713|
+-------------------+---------+------+
only showing top 10 rows



FILTER

In [169]:
#Countries with deaths > 10000
country_latest.filter(country_latest["Deaths"] > 10000).show()
##Countries where the number of active cases is greater than recovered cases.
country_latest.filter(country_latest['Active'] > country_latest['Recovered']).show()





+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered| Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Brazil|  2442375| 87618|  1846641| 508116|    23284|       614|        33728|              3.59|                75.61|                  4.74|            2118646|       323729|            15.28|            Americas|
|        France|   220352| 30212|    81212| 108928|     2551|        17|          267|             13.71

In [170]:
#Countries where the confirmed cases are between 50,000 and 100,000 (inclusive).
country_latest.filter((country_latest["Confirmed"] >= 50000) & (country_latest["Confirmed"] <= 100000)).show()

+--------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|      Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|             Belarus|    67251|   538|    60492|  6221|      119|         4|           67|               0.8|                89.95|                  0.89|              66213|         1038|             1.57|              Europe|
|             Belgium|    66428|  9822|    17452| 39154|      402|         1|       

WITH COLUMN

In [171]:
from pyspark.sql.functions import *


country_latest = country_latest.withColumn(
    "DeathRate",
    when(col("Confirmed") != 0, col("Deaths") / col("Confirmed")).otherwise(0)
)

# Then show the result
country_latest.show()


+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|           DeathRate|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...| 0.03499434685492099|
|   

SORT

In [172]:
from pyspark.sql.functions import *
country_latest.orderBy(col('Confirmed').desc()).show(10)


+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered| Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|           DeathRate|
+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+--------------------+
|            US|  4290259|148011|  1325804|2816444|    56336|      1076|        27941|              3.45|                 30.9|                 11.16|            3834677|       455582|            11.88|            Americas|0.034499315775574385|
|        Brazil|  24

DROP

In [173]:
country_latest.drop(" DeathRate").show()


+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|           DeathRate|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...| 0.03499434685492099|
|   

 UNION AND UNION BY NAME

In [174]:
# Reading CSV File for Performing Transformations
world_meter= spark.read.csv("worldometer_data.csv", header = True, inferSchema = True)

world_meter.show()

country_latest.union(world_meter).show()

+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------------+
|Country/Region|    Continent|Population|TotalCases|NewCases|TotalDeaths|NewDeaths|TotalRecovered|NewRecovered|ActiveCases|Serious,Critical|Tot Cases/1M pop|Deaths/1M pop|TotalTests|Tests/1M pop|          WHO Region|
+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------------+
|           USA|North America| 331198130|   5032179|    NULL|     162804|     NULL|       2576668|        NULL|    2292707|           18296|           15194|        492.0|  63139605|      190640|            Americas|
|        Brazil|South America| 212710692|   2917562|    NULL|      98644|     NULL|       2047660|        NULL|     771258|         

GROUP BY

In [175]:
from pyspark.sql.functions import sum

country_latest.groupBy("Country/Region").agg(
    sum("Confirmed").alias("TotalConfirmed"),
    sum("Deaths").alias("TotalDeaths"),
    sum("Recovered").alias("TotalRecovered")
).orderBy("TotalConfirmed", ascending=False).show()


+--------------+--------------+-----------+--------------+
|Country/Region|TotalConfirmed|TotalDeaths|TotalRecovered|
+--------------+--------------+-----------+--------------+
|            US|       4290259|     148011|       1325804|
|        Brazil|       2442375|      87618|       1846641|
|         India|       1480073|      33408|        951166|
|        Russia|        816680|      13334|        602249|
|  South Africa|        452529|       7067|        274925|
|        Mexico|        395489|      44022|        303810|
|          Peru|        389717|      18418|        272547|
|         Chile|        347923|       9187|        319954|
|United Kingdom|        301708|      45844|          1437|
|          Iran|        293606|      15912|        255144|
|      Pakistan|        274289|       5842|        241026|
|         Spain|        272421|      28432|        150376|
|  Saudi Arabia|        268934|       2760|        222936|
|      Colombia|        257101|       8777|        13116