<a href="https://colab.research.google.com/github/srivatsan88/Mastering-Apache-Spark/blob/master/Spark_Transformations_and_Actions_Part_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-preview2-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# List the installed JVMs so the JAVA_HOME path used in the next cell can be checked.
!ls /usr/lib/jvm/

In [0]:
import os

# Tell Spark where the JDK and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.0.0-preview2-bin-hadoop2.7",
    }
)

In [0]:
import findspark

# pyspark is not pip-installed in this notebook; findspark.init() must run before
# the pyspark import below so the package can be found.
findspark.init()

from pyspark.sql import SparkSession

# Single local session using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
import sys,tempfile, urllib
from pyspark.sql.functions import *

In [0]:
# Scratch directory used for downloaded inputs.
BASE_DIR = '/tmp'

# Local path the coronavirus CSV is downloaded to and read from.
CORONA_DATA_FILE = os.path.join(
    BASE_DIR,
    'corona_data.csv',
)

In [0]:
# `import urllib` on its own does not guarantee that the `request` submodule is loaded;
# import it explicitly so this cell does not depend on another library having done so.
import urllib.request

# Download the dataset to CORONA_DATA_FILE. urlretrieve returns a
# (local_filename, headers) tuple, which is kept in `corona_data`.
corona_data = urllib.request.urlretrieve(
    'https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/corona_dataset_latest.csv',
    CORONA_DATA_FILE,
)

In [0]:
# Confirm that the downloaded CSV is present in /tmp.
!ls /tmp

In [0]:
# Load the downloaded CSV into a DataFrame. The path comes from CORONA_DATA_FILE so the
# download cell and this reader cannot drift apart. The first line is treated as the
# header and column types are inferred from the data.
corona_df = (
    spark.read
    .option("inferSchema", "true")
    .csv(CORONA_DATA_FILE, header=True)
)

In [0]:
# Preview the first rows of the raw dataset (show() prints 20 rows by default).
corona_df.show()

+---+----------------+--------------------+--------+---------+----------+---------+-----+---------+--------------------+----+
|_c0|           State|             Country|     Lat|     Long|      Date|Confirmed|Death|Recovered|       state_cleaned|City|
+---+----------------+--------------------+--------+---------+----------+---------+-----+---------+--------------------+----+
|  0|            null|            Thailand|    15.0|    101.0|2020-01-22|        2|    0|        0|             Bangkok|null|
|  1|            null|               Japan|    36.0|    138.0|2020-01-22|        2|    0|        0|             Hiraide|null|
|  2|            null|           Singapore|  1.2833| 103.8333|2020-01-22|        0|    0|        0|           Singapore|null|
|  3|            null|               Nepal| 28.1667|    84.25|2020-01-22|        0|    0|        0|           Kathmandu|null|
|  4|            null|            Malaysia|     2.5|    112.5|2020-01-22|        0|    0|        0|             Sarawa

In [0]:
# Total number of rows in the raw dataset.
corona_df.count()

28143

In [0]:
import pyspark.sql.functions as F

# Keep only the latest-dated row for every (Country, State_cleaned) pair:
# compute the maximum Date per pair, then inner-join it back onto the full data
# using the pair plus that Date as the join key.
corona_max_df = corona_df.join(
    corona_df.groupBy("Country", "State_cleaned").agg(F.max("Date").alias("Date")),
    on=['Country', 'State_cleaned', 'Date'],
    how="inner",
)

In [0]:
# Preview the latest-snapshot rows; the join keys (Country, state_cleaned, Date) come first.
corona_max_df.show()

+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|             Country|       state_cleaned|      Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|
+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|            Thailand|             Bangkok|2020-03-20|27666|            null|    15.0|    101.0|      322|    1|       42|null|
|               Japan|             Hiraide|2020-03-20|27667|            null|    36.0|    138.0|      963|   33|      191|null|
|           Singapore|           Singapore|2020-03-20|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|
|               Nepal|           Kathmandu|2020-03-20|27669|            null| 28.1667|    84.25|        1|    0|        1|null|
|            Malaysia|             Sarawak|2020-03-20|27670|            null|     2.5|    112.5|     103

In [0]:
# Per-country totals for Australia and China from the latest snapshot.
# sum() with no arguments totals every numeric column left after the select.
(
    corona_max_df
    .select("Country", "State_cleaned", "Confirmed", "Recovered")
    .filter(col("Country").isin('Australia', 'China'))
    .groupBy('Country')
    .sum()
    .show(100)
)

+---------+--------------+--------------+
|  Country|sum(Confirmed)|sum(Recovered)|
+---------+--------------+--------------+
|Australia|           791|            26|
|    China|         81250|         71266|
+---------+--------------+--------------+



In [0]:
# Same totals as a plain groupBy on Country, but cube() also emits a grand-total row,
# which appears with Country = null.
(
    corona_max_df
    .select("Country", "State_cleaned", "Confirmed", "Recovered")
    .filter(col("Country").isin('Australia', 'China'))
    .cube("Country")
    .sum()
    .sort(asc("Country"))
    .show(100)
)

+---------+--------------+--------------+
|  Country|sum(Confirmed)|sum(Recovered)|
+---------+--------------+--------------+
|     null|         82041|         71292|
|Australia|           791|            26|
|    China|         81250|         71266|
+---------+--------------+--------------+



In [0]:
# Two-dimensional cube over Country and State_cleaned for Australia and Canada.
# A null in a grouping column marks a subtotal across that column.
(
    corona_max_df
    .select("Country", "State_cleaned", "Confirmed", "Recovered")
    .filter(col("Country").isin('Australia', 'Canada'))
    .cube("Country", "State_cleaned")
    .sum()
    .sort(asc("Country"))
    .show(100)
)

+---------+--------------------+--------------+--------------+
|  Country|       State_cleaned|sum(Confirmed)|sum(Recovered)|
+---------+--------------------+--------------+--------------+
|     null|          Queensland|           184|             8|
|     null|            Manitoba|            17|             0|
|     null|  Northern Territory|             3|             0|
|     null|      Grand Princess|            10|             0|
|     null|   Western Australia|            64|             0|
|     null|        Saskatchewan|            20|             0|
|     null|            Tasmania|            10|             3|
|     null|From Diamond Prin...|             0|             0|
|     null|    British Columbia|           271|             4|
|     null|         Nova Scotia|            15|             0|
|     null|Newfoundland and ...|             4|             0|
|     null|             Ontario|           308|             5|
|     null|Australian Capita...|             6|        

In [0]:
# Hierarchical rollup for Australia and Canada: one row per (Country, State_cleaned),
# a per-country subtotal (State_cleaned = null) and a grand total (both null).
(
    corona_max_df
    .select("Country", "State_cleaned", "Confirmed", "Recovered")
    .filter(col("Country").isin('Australia', 'Canada'))
    .rollup("Country", "State_cleaned")
    .sum()
    .sort(asc("Country"))
    .show(100)
)

+---------+--------------------+--------------+--------------+
|  Country|       State_cleaned|sum(Confirmed)|sum(Recovered)|
+---------+--------------------+--------------+--------------+
|     null|                null|          1734|            35|
|Australia|  Northern Territory|             3|             0|
|Australia|            Tasmania|            10|             3|
|Australia|     New South Wales|           353|             4|
|Australia|Australian Capita...|             6|             0|
|Australia|   Western Australia|            64|             0|
|Australia|            Victoria|           121|             8|
|Australia|                null|           791|            26|
|Australia|From Diamond Prin...|             0|             0|
|Australia|     South Australia|            50|             3|
|Australia|          Queensland|           184|             8|
|   Canada|       New Brunswick|            11|             0|
|   Canada|        Saskatchewan|            20|        

In [0]:
# Correlation coefficient between the Confirmed and Recovered columns of the
# latest snapshot, returned as a single float.
corona_max_df.corr('Confirmed','Recovered')

0.8052934236742176

In [0]:
# Mark the latest-snapshot DataFrame for caching so repeated actions can reuse it.
# cache() returns the DataFrame itself, which is why the cell echoes its schema.
corona_max_df.cache()

DataFrame[Country: string, state_cleaned: string, Date: string, _c0: int, State: string, Lat: double, Long: double, Confirmed: int, Death: int, Recovered: int, City: string]

In [0]:
# Time repeated count() calls on the cached DataFrame; the first run is expected to be
# the slow one because it is the run that populates the cache.
%timeit corona_max_df.count()

The slowest run took 14.90 times longer than the fastest. This could mean that an intermediate result is being cached.
1 loop, best of 3: 66.5 ms per loop


In [0]:
from pyspark import StorageLevel

# corona_max_df was already cached by the earlier cache() call. Spark keeps an existing
# cache entry when persist() is called again (it only logs a warning), so the old entry
# has to be released first for the explicitly requested storage level to take effect.
corona_max_df.unpersist()
corona_max_df.persist(StorageLevel.MEMORY_AND_DISK)


DataFrame[Country: string, state_cleaned: string, Date: string, _c0: int, State: string, Lat: double, Long: double, Confirmed: int, Death: int, Recovered: int, City: string]

In [0]:
# Re-time count() after the persist() cell, for comparison with the cache() timing above.
%timeit corona_max_df.count()

10 loops, best of 3: 63.1 ms per loop


In [0]:
# Collect the full Spark DataFrame to the driver as a pandas DataFrame.
# NOTE(review): `pd` is the conventional alias for the pandas module; binding a DataFrame
# to it would shadow that alias if `import pandas as pd` were ever added to this notebook.
# Renaming needs a matching change in the cell that calls pd.corr().
pd=corona_df.toPandas()

In [0]:
# Pairwise correlation matrix of the numeric columns.
# The frame also holds string columns (State, Country, Date, ...). pandas >= 2.0 no longer
# drops those silently in corr() and raises instead, so restrict to numeric columns
# explicitly; this works on older and newer pandas alike.
pd.select_dtypes(include="number").corr()

Unnamed: 0,_c0,Lat,Long,Confirmed,Death,Recovered
_c0,1.0,-0.001594,-0.005303,0.046789,0.043155,0.042857
Lat,-0.001594,1.0,-0.383389,0.008531,0.007041,0.000778
Long,-0.005303,-0.383389,1.0,0.098893,0.074918,0.079047
Confirmed,0.046789,0.008531,0.098893,1.0,0.963376,0.853924
Death,0.043155,0.007041,0.074918,0.963376,1.0,0.857292
Recovered,0.042857,0.000778,0.079047,0.853924,0.857292,1.0


In [0]:
# Register the latest-snapshot DataFrame as the temporary view "corona" so it can be
# queried with spark.sql(); an existing view of the same name is replaced.
corona_max_df.createOrReplaceTempView("corona")

In [0]:
# Sanity check: the view returns the same rows as corona_max_df.
spark.sql("select * from corona").show()

+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|             Country|       state_cleaned|      Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|
+--------------------+--------------------+----------+-----+----------------+--------+---------+---------+-----+---------+----+
|            Thailand|             Bangkok|2020-03-20|27666|            null|    15.0|    101.0|      322|    1|       42|null|
|               Japan|             Hiraide|2020-03-20|27667|            null|    36.0|    138.0|      963|   33|      191|null|
|           Singapore|           Singapore|2020-03-20|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|
|               Nepal|           Kathmandu|2020-03-20|27669|            null| 28.1667|    84.25|        1|    0|        1|null|
|            Malaysia|             Sarawak|2020-03-20|27670|            null|     2.5|    112.5|     103

In [0]:
# SQL equivalent of filtering the DataFrame to Australia and Canada, ordered by country.
(
    spark
    .sql("select * from corona where Country in ('Australia','Canada') order by Country")
    .show()
)

+---------+--------------------+----------+-----+--------------------+------------------+---------+---------+-----+---------+----+
|  Country|       state_cleaned|      Date|  _c0|               State|               Lat|     Long|Confirmed|Death|Recovered|City|
+---------+--------------------+----------+-----+--------------------+------------------+---------+---------+-----+---------+----+
|Australia|     New South Wales|2020-03-20|27672|     New South Wales|          -33.8688| 151.2093|      353|    6|        4|null|
|Australia|            Victoria|2020-03-20|27673|            Victoria|          -37.8136| 144.9631|      121|    0|        8|null|
|Australia|   Western Australia|2020-03-20|27715|   Western Australia|          -31.9505| 115.8605|       64|    1|        0|null|
|Australia|          Queensland|2020-03-20|27674|          Queensland|          -28.0167|    153.4|      184|    0|        8|null|
|Australia|            Tasmania|2020-03-20|27727|            Tasmania|          -41

In [0]:
# Re-runs the Australia/Canada filter query (this cell repeats the one before it).
(
    spark
    .sql("select * from corona where Country in ('Australia','Canada') order by Country")
    .show()
)

+---------+--------------------+----------+-----+--------------------+------------------+---------+---------+-----+---------+----+
|  Country|       state_cleaned|      Date|  _c0|               State|               Lat|     Long|Confirmed|Death|Recovered|City|
+---------+--------------------+----------+-----+--------------------+------------------+---------+---------+-----+---------+----+
|Australia|     New South Wales|2020-03-20|27672|     New South Wales|          -33.8688| 151.2093|      353|    6|        4|null|
|Australia|            Victoria|2020-03-20|27673|            Victoria|          -37.8136| 144.9631|      121|    0|        8|null|
|Australia|   Western Australia|2020-03-20|27715|   Western Australia|          -31.9505| 115.8605|       64|    1|        0|null|
|Australia|          Queensland|2020-03-20|27674|          Queensland|          -28.0167|    153.4|      184|    0|        8|null|
|Australia|            Tasmania|2020-03-20|27727|            Tasmania|          -41

In [0]:
# SQL form of the DataFrame rollup("Country", "State_cleaned").sum() shown earlier:
# per-state rows, per-country subtotals and a grand total.
(
    spark
    .sql("select Country, State_cleaned, SUM(Confirmed), SUM(Recovered) from corona where Country in ('Australia','Canada') GROUP BY ROLLUP (Country, State_cleaned) order by Country")
    .show()
)

+---------+--------------------+--------------+--------------+
|  Country|       State_cleaned|sum(Confirmed)|sum(Recovered)|
+---------+--------------------+--------------+--------------+
|     null|                null|          1734|            35|
|Australia|  Northern Territory|             3|             0|
|Australia|     New South Wales|           353|             4|
|Australia|          Queensland|           184|             8|
|Australia|   Western Australia|            64|             0|
|Australia|Australian Capita...|             6|             0|
|Australia|                null|           791|            26|
|Australia|            Tasmania|            10|             3|
|Australia|            Victoria|           121|             8|
|Australia|From Diamond Prin...|             0|             0|
|Australia|     South Australia|            50|             3|
|   Canada|        Saskatchewan|            20|             0|
|   Canada|       New Brunswick|            11|        

In [0]:
# SQL form of the DataFrame cube("Country", "State_cleaned").sum() shown earlier:
# every combination of the two grouping columns, including subtotals and the grand total.
(
    spark
    .sql("select Country, State_cleaned, SUM(Confirmed), SUM(Recovered) from corona where Country in ('Australia','Canada') GROUP BY CUBE (Country, State_cleaned) order by Country")
    .show()
)

+-------+--------------------+--------------+--------------+
|Country|       State_cleaned|sum(Confirmed)|sum(Recovered)|
+-------+--------------------+--------------+--------------+
|   null|            Victoria|           121|             8|
|   null|     South Australia|            50|             3|
|   null|       New Brunswick|            11|             0|
|   null|         Nova Scotia|            15|             0|
|   null|     New South Wales|           353|             4|
|   null|  Northern Territory|             3|             0|
|   null|Australian Capita...|             6|             0|
|   null|          Queensland|           184|             8|
|   null|        Saskatchewan|            20|             0|
|   null|Newfoundland and ...|             4|             0|
|   null|                null|          1734|            35|
|   null|            Tasmania|            10|             3|
|   null|    British Columbia|           271|             4|
|   null|             On