
# `Running PySpark in Colab`

To run Spark in Colab, first install its dependencies in the Colab environment: Apache Spark 2.4.6 prebuilt for Hadoop 2.7, Java 8, and findspark, which locates the Spark installation so that Python can import `pyspark`. All of these can be installed from inside the Colab notebook itself. If you are new to Spark, avoid version 2.4.0, since some users have reported compatibility issues between it and Python.
Run the following cell to install the dependencies:

In [1]:
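# Install Java 8 (headless), quietly
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are kept on archive.apache.org; the original www-us.apache.org mirror URL may no longer work
!wget -q https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark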

In [2]:
!ls /usr/lib/jvm

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


Now that Spark and Java are installed in Colab, set the environment variables that let PySpark find them: `JAVA_HOME` points to the Java 8 directory listed above, and `SPARK_HOME` to the folder extracted from the Spark archive:

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
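If `findspark.init()` later fails, a common cause is that one of these two paths is wrong. As an optional sanity check, here is a small sketch with a hypothetical helper, `missing_spark_paths` (not part of findspark), that reports which of the two variables is unset or does not point to an existing directory. It is plain Python and needs no Spark:

```python
import os


def missing_spark_paths(env=None):
    """Return the names of JAVA_HOME / SPARK_HOME that are unset or not existing directories.

    `env` defaults to the current process environment (os.environ)."""
    env = os.environ if env is None else env
    missing = []
    for name in ("JAVA_HOME", "SPARK_HOME"):
        path = env.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


# In the notebook, run it right after setting the two variables.
print(missing_spark_paths())
```

An empty list means both directories exist.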

Start a local Spark session to test the installation:

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
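import sys, tempfile, urllib.request  # urllib.request provides urlretrieve(), used below
from pyspark.sql.functions import *    # note: shadows Python built-ins such as sum() and max()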

### Stopping the session

In [6]:
# spark.stop()  # uncomment to stop the session and release its resources when you are done

#### `Load the Datasets`

In [7]:
## Download the COVID-19 data into the /tmp directory as 'corona_data.csv'
BASE_DIR = '/tmp'
CORONA_DATA_FILE = os.path.join(BASE_DIR, 'corona_data.csv')

In [8]:
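# raw.githubusercontent.com serves the CSV itself; the github.com/.../blob/... page is HTML
corona_data = urllib.request.urlretrieve('https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/corona_dataset_latest.csv',
                                         CORONA_DATA_FILE)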

In [9]:
## Download the tweets into the /tmp directory as 'twitter_data.csv'
BASE_DIR = '/tmp'
TWITTER_DATA_FILE = os.path.join(BASE_DIR, 'twitter_data.csv')

In [10]:
twitter_data = urllib.request.urlretrieve('https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/tweets.csv',
                                          TWITTER_DATA_FILE)
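A `https://github.com/<owner>/<repo>/blob/<ref>/<path>` URL points to GitHub's HTML page for a file, so calling `urlretrieve` on it saves HTML rather than the CSV; the file contents themselves are served from `raw.githubusercontent.com`. A minimal sketch of a hypothetical helper that rewrites a blob URL, assuming the branch or tag name contains no `/`:

```python
from urllib.parse import urlparse


def github_blob_to_raw(url):
    """Turn https://github.com/<owner>/<repo>/blob/<ref>/<path> into the raw-content URL.

    Assumes <ref> is a single path segment (e.g. 'master'), not a branch name with '/'."""
    parsed = urlparse(url)
    segments = parsed.path.strip("/").split("/")
    if parsed.netloc != "github.com" or len(segments) < 5 or segments[2] != "blob":
        raise ValueError(f"not a GitHub blob URL: {url}")
    owner, repo, _blob, ref, *path = segments
    return "https://raw.githubusercontent.com/" + "/".join([owner, repo, ref, *path])


print(github_blob_to_raw(
    "https://github.com/srivatsan88/YouTubeLI/blob/master/dataset/coronavirus/tweets.csv"))
```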

In [11]:
# Check that both files were saved to /tmp
!ls /tmp

blockmgr-8084cbd6-aa46-4fc8-9f9e-bcffcc348d0f
corona_data.csv
hsperfdata_root
spark-65dcd619-e2ff-4d0a-a4df-69544c35d3d2
spark-e069c2fa-a8c6-4e63-874a-2450f0c18d05
twitter_data.csv


In [13]:
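# Load the data by uploading the CSV files by hand. Colab saves them to the working
# directory (/content), which is where the cells below read them from.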
from google.colab import files
uploaded = files.upload()

Saving corona_dataset_latest.csv to corona_dataset_latest (1).csv
Saving tweets.csv to tweets (1).csv


In [14]:
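# Read the CSV, using the first line as the header and inferring the column types
corona_df = spark.read.options(header=True, inferSchema=True).csv("corona_dataset_latest.csv")
corona_df.show(5)
print(corona_df.count())  # count() is an action: it triggers a Spark job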

+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|_c0|State|  Country|    Lat|    Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|  0| null| Thailand|   15.0|   101.0|2020-01-22 00:00:00|        2|    0|        0|      Bangkok|null|
|  1| null|    Japan|   36.0|   138.0|2020-01-22 00:00:00|        2|    0|        0|      Hiraide|null|
|  2| null|Singapore| 1.2833|103.8333|2020-01-22 00:00:00|        0|    0|        0|    Singapore|null|
|  3| null|    Nepal|28.1667|   84.25|2020-01-22 00:00:00|        0|    0|        0|    Kathmandu|null|
|  4| null| Malaysia|    2.5|   112.5|2020-01-22 00:00:00|        0|    0|        0|      Sarawak|null|
+---+-----+---------+-------+--------+-------------------+---------+-----+---------+-------------+----+
only showing top 5 rows

28143


In [15]:
# Same kind of read, passing the options through option() and a keyword argument
twitter_df = spark.read.option("inferSchema", "true").csv("tweets.csv", header=True)
twitter_df.show(5)
print(twitter_df.count())

+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|_c0| geo|                text|                user|         location|            entities|           sentiment|             country|
+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|  0|null|What is God sayin...|          petodinice|            Lagos|[('about #', 'CAR...|{'neg': 0.0, 'neu...|             Nigeria|
|  1|null|"BREAKING: ""this...| but i took the t...|             -… "|     JerryfranksonJF|      Abuja, Nigeria|"[(""Arsenal's Mi...|
|  2|null| #Coronavirus tes...|              cek422|Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|                 USA|
|  3|null| Get ready for ma...|        InfectiousDz|              NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|                 USA|
|  4|null| The #coronavirus...|          vic_gibson|          
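Row 1 above is split across the wrong columns (the `country` column ends up holding a fragment of another field). The tweet text is quoted and contains doubled quotes (`""`), which is the standard CSV way of escaping a quote, while Spark's CSV reader uses a backslash as its default escape character; that mismatch is probably why this record is mis-parsed. Python's `csv` module treats `""` as an escaped quote by default, as this small example with a made-up line in the same style shows:

```python
import csv
import io

# Made-up record in the style of tweets.csv: the text field is quoted and
# contains doubled quotes ("") as well as a comma.
sample = '1,"BREAKING: ""this is disappointing, but i took the test""",USA\n'

rows = list(csv.reader(io.StringIO(sample)))  # doublequote=True is the default
print(rows)  # [['1', 'BREAKING: "this is disappointing, but i took the test"', 'USA']]
```

In PySpark, adding `.option("escape", '"')` to the reader (and `multiLine=True` if some tweets span several lines) is likely to fix this, though it has not been checked against this file.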

In [16]:
# A transformation: Spark only adds it to the execution plan; nothing runs yet
twitter_df.filter("country = 'USA'")
# show() is an action: it executes the plan and prints the first 10 rows
twitter_df.filter("country = 'USA'").show(10)

+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|            location|            entities|           sentiment|country|
+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|  2|null| #Coronavirus tes...|         cek422|   Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|    USA|
|  3|null| Get ready for ma...|   InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|    USA|
|  5|null| COVID-19 update ...| StewartNgilana|Durban | Port Eli...|[('Italy', 'GPE')...|{'neg': 0.178, 'n...|    USA|
|  6|null| It’s painful to ...|      BWheatnyc|             Florida|                  []|{'neg': 0.098, 'n...|    USA|
|  8|null| Questions about ...|    straightj23|        Columbus, OH|[('NAfME', 'CARDI...|{'neg': 0.0, 'neu...|    USA|
| 13|null| We’re the heck w...| harrytiffanyiv| 
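The difference between transformations and actions can be illustrated with a toy model in plain Python. This is only an analogy: the `LazyTable` class is hypothetical, and Spark's real planner also optimizes the recorded plan before running it. Here `filter()` just records a predicate, and no data is touched until `collect()` is called:

```python
class LazyTable:
    """Toy lazy table: filter() builds a plan, collect() executes it."""

    def __init__(self, rows, predicates=()):
        self.rows = rows
        self.predicates = tuple(predicates)
        self.runs = 0  # how many times this plan has actually been executed

    def filter(self, predicate):
        # Transformation: return a new plan; no rows are examined here.
        return LazyTable(self.rows, self.predicates + (predicate,))

    def collect(self):
        # Action: evaluate every recorded predicate against every row.
        self.runs += 1
        return [row for row in self.rows if all(p(row) for p in self.predicates)]


tweets = [
    {"country": "Nigeria", "location": "Lagos"},
    {"country": "USA", "location": "NYC"},
    {"country": "USA", "location": "Florida"},
]

usa = LazyTable(tweets).filter(lambda row: row["country"] == "USA")
print(usa.runs)       # 0: building the plan did no work
print(usa.collect())  # runs the plan and returns the two USA rows
print(usa.runs)       # 1
```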

In [17]:
twitter_df.filter("country = 'USA' and location like 'New%'").show(10)  # 'New%' matches locations that start with "New"

+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|          location|            entities|           sentiment|country|
+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
| 31|null| I ordered Alex J...|       rcgillan|     New York, USA|[('Alex Jones', '...|{'neg': 0.109, 'n...|    USA|
| 49|null| This week we are...|  JamesWithers3|New York, New York|[('This week', 'D...|{'neg': 0.0, 'neu...|    USA|
|228|null|This is a very co...|baskingntheGlow|     New York City|[('hourly', 'TIME')]|{'neg': 0.12, 'ne...|    USA|
|238|null|I’m reposting thi...|   Veronicaromm|   New Jersey, USA|[('English', 'LAN...|{'neg': 0.0, 'neu...|    USA|
|261|null|Too early ...?  #...|      HJeppesen|      New York, NY|                ['']|{'neg': 0.0, 'neu...|    USA|
|275|null|  The Republican ...|  GenetBataiile|New Hampshire, US

In [18]:
# explain() prints the physical plan without running it; note the PushedFilters in the CSV scan
tw_filter_df = twitter_df.filter("country='USA'")
tw_filter_df.explain()

== Physical Plan ==
*(1) Project [_c0#103, geo#104, text#105, user#106, location#107, entities#108, sentiment#109, country#110]
+- *(1) Filter (isnotnull(country#110) && (country#110 = USA))
   +- *(1) FileScan csv [_c0#103,geo#104,text#105,user#106,location#107,entities#108,sentiment#109,country#110] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/content/tweets.csv], PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,USA)], ReadSchema: struct<_c0:int,geo:string,text:string,user:string,location:string,entities:string,sentiment:strin...


In [19]:
tw_filter_df = twitter_df.filter("country = 'USA' and location like '%New%'")
tw_filter_df.explain()

== Physical Plan ==
*(1) Project [_c0#103, geo#104, text#105, user#106, location#107, entities#108, sentiment#109, country#110]
+- *(1) Filter (((isnotnull(country#110) && isnotnull(location#107)) && (country#110 = USA)) && Contains(location#107, New))
   +- *(1) FileScan csv [_c0#103,geo#104,text#105,user#106,location#107,entities#108,sentiment#109,country#110] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/content/tweets.csv], PartitionFilters: [], PushedFilters: [IsNotNull(country), IsNotNull(location), EqualTo(country,USA), StringContains(location,New)], ReadSchema: struct<_c0:int,geo:string,text:string,user:string,location:string,entities:string,sentiment:strin...
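Both plans show that the `LIKE` predicate does not reach the scan as a pattern: `location like '%New%'` becomes `Contains(location#107, New)` and is pushed down as `StringContains(location,New)`, and the `startswith()` filter further down becomes `StringStartsWith`. To my understanding this comes from a Catalyst optimizer rule (`LikeSimplification`) that rewrites simple patterns into plain string checks. A simplified sketch of that mapping in Python, handling only `%` at the ends of the pattern (no `_`, no escape characters); `simplify_like` and `like_filter` are hypothetical names:

```python
def simplify_like(pattern):
    """Map a simple SQL LIKE pattern to an equivalent (kind, literal) string check.

    Supported: 'abc' (equals), 'abc%' (startswith), '%abc' (endswith), '%abc%' (contains).
    Returns None for anything else, e.g. '_' or '%' inside the literal."""
    literal = pattern.strip("%")
    if "%" in literal or "_" in literal:
        return None
    leading, trailing = pattern.startswith("%"), pattern.endswith("%")
    if leading and trailing:
        return ("contains", literal)
    if trailing:
        return ("startswith", literal)
    if leading:
        return ("endswith", literal)
    return ("equals", literal)


def like_filter(pattern, values):
    """Keep the strings that match a pattern supported by simplify_like()."""
    simplified = simplify_like(pattern)
    if simplified is None:
        raise ValueError(f"pattern not supported by this sketch: {pattern!r}")
    kind, literal = simplified
    checks = {
        "equals": lambda s: s == literal,
        "startswith": lambda s: s.startswith(literal),
        "endswith": lambda s: s.endswith(literal),
        "contains": lambda s: literal in s,
    }
    return [v for v in values if checks[kind](v)]


locations = ["New York, USA", "Pennsylvania, USA", "NYC", "Florida"]
print(like_filter("New%", locations))  # ['New York, USA']
print(like_filter("%USA", locations))  # ['New York, USA', 'Pennsylvania, USA']
```

Like Spark's `LIKE`, these checks are case-sensitive.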


In [20]:
twitter_df.show(5)

+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|_c0| geo|                text|                user|         location|            entities|           sentiment|             country|
+---+----+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+
|  0|null|What is God sayin...|          petodinice|            Lagos|[('about #', 'CAR...|{'neg': 0.0, 'neu...|             Nigeria|
|  1|null|"BREAKING: ""this...| but i took the t...|             -… "|     JerryfranksonJF|      Abuja, Nigeria|"[(""Arsenal's Mi...|
|  2|null| #Coronavirus tes...|              cek422|Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|                 USA|
|  3|null| Get ready for ma...|        InfectiousDz|              NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|                 USA|
|  4|null| The #coronavirus...|          vic_gibson|          

In [21]:
# Column-expression style: keep rows whose location starts with "N"
tw_filter_df = twitter_df.filter(col("location").startswith("N"))
tw_filter_df.explain()

== Physical Plan ==
*(1) Project [_c0#103, geo#104, text#105, user#106, location#107, entities#108, sentiment#109, country#110]
+- *(1) Filter (isnotnull(location#107) && StartsWith(location#107, N))
   +- *(1) FileScan csv [_c0#103,geo#104,text#105,user#106,location#107,entities#108,sentiment#109,country#110] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/content/tweets.csv], PartitionFilters: [], PushedFilters: [IsNotNull(location), StringStartsWith(location,N)], ReadSchema: struct<_c0:int,geo:string,text:string,user:string,location:string,entities:string,sentiment:strin...


In [22]:
twitter_df

DataFrame[_c0: int, geo: string, text: string, user: string, location: string, entities: string, sentiment: string, country: string]

In [23]:
## first() returns the first record as a Row object
twitter_df.first()

Row(_c0=0, geo=None, text='What is God saying to us about #coronavirus ?', user='petodinice', location='Lagos', entities="[('about #', 'CARDINAL')]", sentiment="{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}", country='Nigeria')

In [24]:
twitter_df.take(5)

[Row(_c0=0, geo=None, text='What is God saying to us about #coronavirus ?', user='petodinice', location='Lagos', entities="[('about #', 'CARDINAL')]", sentiment="{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}", country='Nigeria'),
 Row(_c0=1, geo=None, text='"BREAKING: ""this is disappointing', user=' but i took the test"". Arsenal\'s Mikel Arteta tests positive for #coronavirus', location='  -… "', entities='JerryfranksonJF', sentiment='Abuja, Nigeria', country='"[(""Arsenal\'s Mikel Arteta""'),
 Row(_c0=2, geo=None, text=' #Coronavirus testing must be made free to the public if we are going to understand the scope of this crisis. Anything le…', user='cek422', location='Pennsylvania, USA', entities='[]', sentiment="{'neg': 0.173, 'neu': 0.71, 'pos': 0.117, 'compound': -0.3767}", country='USA'),
 Row(_c0=3, geo=None, text=' Get ready for mass event crowd cancellations across the World starting this weekend: cricket in #Australia in empty st…', user='InfectiousDz', locatio

## RDD

In [25]:
## map(): exactly one output element per input element
x = spark.sparkContext.parallelize([1, 4, 8])
y = x.map(lambda v: (v, v * v))
print(x.collect())
print(y.collect())

[1, 4, 8]
[(1, 1), (4, 16), (8, 64)]


In [26]:
## flatMap(): each input returns an iterable, and the results are flattened
x = spark.sparkContext.parallelize([1, 4, 8])
y = x.flatMap(lambda v: (v, v * v))
print(x.collect())
print(y.collect())

[1, 4, 8]
[1, 1, 4, 16, 8, 64]
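The difference between the two cells can be reproduced with plain Python lists, which may make the semantics easier to see: `map()` keeps one output per input, while `flatMap()` expects each input to produce an iterable and flattens the results. This is only an analogy on a local list, not an RDD:

```python
from itertools import chain

data = [1, 4, 8]

# map(): exactly one output element (here a tuple) per input element.
mapped = [(v, v * v) for v in data]

# flatMap(): each input yields an iterable; the iterables are concatenated.
flat_mapped = list(chain.from_iterable((v, v * v) for v in data))

print(mapped)       # [(1, 1), (4, 16), (8, 64)]
print(flat_mapped)  # [1, 1, 4, 16, 8, 64]
```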


In [27]:
x

ParallelCollectionRDD[62] at parallelize at PythonRDD.scala:195

In [28]:
## select 'text' column
twitter_df.select("text").show(10)

+--------------------+
|                text|
+--------------------+
|What is God sayin...|
|"BREAKING: ""this...|
| #Coronavirus tes...|
| Get ready for ma...|
| The #coronavirus...|
| COVID-19 update ...|
| It’s painful to ...|
| 📽️Friends, I wi...|
| Questions about ...|
|How they’re deali...|
+--------------------+
only showing top 10 rows



In [29]:
twitter_df.select("text", "user").show(10)

+--------------------+--------------------+
|                text|                user|
+--------------------+--------------------+
|What is God sayin...|          petodinice|
|"BREAKING: ""this...| but i took the t...|
| #Coronavirus tes...|              cek422|
| Get ready for ma...|        InfectiousDz|
| The #coronavirus...|          vic_gibson|
| COVID-19 update ...|      StewartNgilana|
| It’s painful to ...|           BWheatnyc|
| 📽️Friends, I wi...|             LorseaR|
| Questions about ...|         straightj23|
|How they’re deali...|       _______coolio|
+--------------------+--------------------+
only showing top 10 rows



In [30]:
## map() is one-to-one: each row becomes exactly one list of words (simple word tokenization)
twitter_df.rdd.map(lambda line: line.text.split(" ")).take(3)  # first 3 rows

[['What', 'is', 'God', 'saying', 'to', 'us', 'about', '#coronavirus', '?'],
 ['"BREAKING:', '""this', 'is', 'disappointing'],
 ['',
  '#Coronavirus',
  'testing',
  'must',
  'be',
  'made',
  'free',
  'to',
  'the',
  'public',
  'if',
  'we',
  'are',
  'going',
  'to',
  'understand',
  'the',
  'scope',
  'of',
  'this',
  'crisis.',
  'Anything',
  'le…']]

In [31]:
## flatMap() is one-to-many: each row is still processed on its own, but the word lists are flattened into one RDD of words
twitter_df.rdd.flatMap(lambda line: line.text.split(" ")).take(20)  # first 20 words

['What',
 'is',
 'God',
 'saying',
 'to',
 'us',
 'about',
 '#coronavirus',
 '?',
 '"BREAKING:',
 '""this',
 'is',
 'disappointing',
 '',
 '#Coronavirus',
 'testing',
 'must',
 'be',
 'made',
 'free']
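Two details in these outputs are worth noting: `split(" ")` yields empty strings (`''`) wherever a text starts with a space or contains consecutive spaces, and `#coronavirus` and `#Coronavirus` count as different words. A common next step after a flatMap tokenization is a word count (on an RDD, typically `map(lambda w: (w, 1))` followed by `reduceByKey`). A plain-Python sketch of the same pipeline on two of the tweets shown above, using `split()` without an argument to drop empty tokens and lowercasing as a design choice:

```python
from collections import Counter

texts = [  # two tweets from the output above
    "What is God saying to us about #coronavirus ?",
    " #Coronavirus testing must be made free to the public if we are going to "
    "understand the scope of this crisis. Anything le…",
]

print(texts[1].split(" ")[:2])  # ['', '#Coronavirus']: split(" ") keeps the empty string

# flatMap-style tokenization: split() with no argument drops empty tokens,
# and lower() makes '#Coronavirus' and '#coronavirus' the same word.
words = [word.lower() for text in texts for word in text.split()]

# Local equivalent of counting (word, 1) pairs with reduceByKey.
counts = Counter(words)
print(counts.most_common(3))
```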

In [32]:
# Preview the COVID-19 data
corona_df.show(10)

+---+----------------+---------+--------+---------+-------------------+---------+-----+---------+----------------+----+
|_c0|           State|  Country|     Lat|     Long|               Date|Confirmed|Death|Recovered|   state_cleaned|City|
+---+----------------+---------+--------+---------+-------------------+---------+-----+---------+----------------+----+
|  0|            null| Thailand|    15.0|    101.0|2020-01-22 00:00:00|        2|    0|        0|         Bangkok|null|
|  1|            null|    Japan|    36.0|    138.0|2020-01-22 00:00:00|        2|    0|        0|         Hiraide|null|
|  2|            null|Singapore|  1.2833| 103.8333|2020-01-22 00:00:00|        0|    0|        0|       Singapore|null|
|  3|            null|    Nepal| 28.1667|    84.25|2020-01-22 00:00:00|        0|    0|        0|       Kathmandu|null|
|  4|            null| Malaysia|     2.5|    112.5|2020-01-22 00:00:00|        0|    0|        0|         Sarawak|null|
|  5|British Columbia|   Canada| 49.2827

In [33]:
##
corona_df.filter("Country = 'US'").show(5)

+---+----------------+-------+-------+---------+-------------------+---------+-----+---------+----------------+----------------+
|_c0|           State|Country|    Lat|     Long|               Date|Confirmed|Death|Recovered|   state_cleaned|            City|
+---+----------------+-------+-------+---------+-------------------+---------+-----+---------+----------------+----------------+
| 98|      Washington|     US|47.4009|-121.4905|2020-01-22 00:00:00|        0|    0|        0|      Washington|      Washington|
| 99|        New York|     US|42.1657| -74.9481|2020-01-22 00:00:00|        0|    0|        0|        New York|        New York|
|100|      California|     US|36.1162|-119.6816|2020-01-22 00:00:00|        0|    0|        0|      California|      California|
|101|   Massachusetts|     US|42.2302| -71.5301|2020-01-22 00:00:00|        0|    0|        0|   Massachusetts|   Massachusetts|
|102|Diamond Princess|     US|35.4437|  139.638|2020-01-22 00:00:00|        0|    0|        0|Dia

In [34]:
## Sort the US rows by date, most recent first
corona_df.filter("Country = 'US'").sort(col("Date"), ascending = False).show(5)

+-----+--------------+-------+-------+---------+-------------------+---------+-----+---------+--------------+--------------+
|  _c0|         State|Country|    Lat|     Long|               Date|Confirmed|Death|Recovered| state_cleaned|          City|
+-----+--------------+-------+-------+---------+-------------------+---------+-----+---------+--------------+--------------+
|27764|    Washington|     US|47.4009|-121.4905|2020-03-20 00:00:00|     1524|   83|        0|    Washington|    Washington|
|27769|Grand Princess|     US|37.6489|-122.6655|2020-03-20 00:00:00|       23|    0|        0|Grand Princess|Grand Princess|
|27765|      New York|     US|42.1657| -74.9481|2020-03-20 00:00:00|     8310|   42|        0|      New York|      New York|
|27766|    California|     US|36.1162|-119.6816|2020-03-20 00:00:00|     1177|   23|        0|    California|    California|
|27767| Massachusetts|     US|42.2302| -71.5301|2020-03-20 00:00:00|      413|    1|        0| Massachusetts| Massachusetts|


In [35]:
## orderBy() is an alias of sort(), so this gives the same result
corona_df.filter("Country = 'US'").orderBy(col("Date"), ascending = False).show(5)

+-----+--------------+-------+-------+---------+-------------------+---------+-----+---------+--------------+--------------+
|  _c0|         State|Country|    Lat|     Long|               Date|Confirmed|Death|Recovered| state_cleaned|          City|
+-----+--------------+-------+-------+---------+-------------------+---------+-----+---------+--------------+--------------+
|27764|    Washington|     US|47.4009|-121.4905|2020-03-20 00:00:00|     1524|   83|        0|    Washington|    Washington|
|27769|Grand Princess|     US|37.6489|-122.6655|2020-03-20 00:00:00|       23|    0|        0|Grand Princess|Grand Princess|
|27765|      New York|     US|42.1657| -74.9481|2020-03-20 00:00:00|     8310|   42|        0|      New York|      New York|
|27766|    California|     US|36.1162|-119.6816|2020-03-20 00:00:00|     1177|   23|        0|    California|    California|
|27767| Massachusetts|     US|42.2302| -71.5301|2020-03-20 00:00:00|      413|    1|        0| Massachusetts| Massachusetts|


In [36]:
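# sortWithinPartitions() sorts each partition separately, here by Date and then Confirmed (both descending).
# The result looks globally sorted probably because this small file is read into a single partition;
# use sort()/orderBy() when a global order is required.
corona_df.filter("country = 'US'").sortWithinPartitions([col("Date"), col("Confirmed")], ascending=False).show(10)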

+-----+-------------+-------+-------+---------+-------------------+---------+-----+---------+-------------+-------------+
|  _c0|        State|Country|    Lat|     Long|               Date|Confirmed|Death|Recovered|state_cleaned|         City|
+-----+-------------+-------+-------+---------+-------------------+---------+-----+---------+-------------+-------------+
|27765|     New York|     US|42.1657| -74.9481|2020-03-20 00:00:00|     8310|   42|        0|     New York|     New York|
|27764|   Washington|     US|47.4009|-121.4905|2020-03-20 00:00:00|     1524|   83|        0|   Washington|   Washington|
|27766|   California|     US|36.1162|-119.6816|2020-03-20 00:00:00|     1177|   23|        0|   California|   California|
|27773|   New Jersey|     US|40.2989|  -74.521|2020-03-20 00:00:00|      890|   11|        0|   New Jersey|   New Jersey|
|27776|     Illinois|     US|40.3495| -88.9861|2020-03-20 00:00:00|      585|    5|        0|     Illinois|     Illinois|
|27772|      Florida|   
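Why `sortWithinPartitions()` is not a global sort can be shown with two made-up partitions in plain Python (the numbers are illustrative, not taken from the dataset):

```python
partitions = [[3, 9, 1], [7, 2, 8]]  # made-up values held in two partitions

# sortWithinPartitions(..., ascending=False): each partition is ordered on its own,
# so reading the partitions one after another is not globally ordered.
within = [x for part in partitions for x in sorted(part, reverse=True)]

# sort()/orderBy(..., ascending=False): one order across all partitions.
overall = sorted((x for part in partitions for x in part), reverse=True)

print(within)   # [9, 3, 1, 8, 7, 2]
print(overall)  # [9, 8, 7, 3, 2, 1]
```

With a single partition, the two results are identical.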

In [37]:
# Summary statistics
corona_df.describe().show()

+-------+-----------------+---------+-----------+------------------+------------------+------------------+------------------+------------------+-------------+-----------+
|summary|              _c0|    State|    Country|               Lat|              Long|         Confirmed|             Death|         Recovered|state_cleaned|       City|
+-------+-----------------+---------+-----------+------------------+------------------+------------------+------------------+------------------+-------------+-----------+
|  count|            28143|    19116|      28143|             28143|             28143|             28143|             28143|             28143|        28143|      14573|
|   mean|          14071.0|     null|       null|30.965553459118834|-34.57031257861667|161.88245744945456| 5.494368048893153| 60.17290267562094|         null|       null|
| stddev|8124.328649186959|     null|       null|19.365472826597646| 80.78375872452575| 2519.847217725942|109.29475709869875|1346.6597829124426| 

In [38]:
# data types for each column
corona_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Death: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- state_cleaned: string (nullable = true)
 |-- City: string (nullable = true)



In [39]:
# Rows with more than 10,000 confirmed cases, fewest cases first
corona_df.filter('Confirmed > 10000').sort(col('Confirmed')).show(10)

+-----+------+-------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|  _c0| State|Country|    Lat|    Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+-----+------+-------+-------+--------+-------------------+---------+-----+---------+-------------+----+
|24005|  null|   Iran|   32.0|    53.0|2020-03-12 00:00:00|    10075|  429|     2959|       Tehran|null|
|22912|  null|  Italy|   43.0|    12.0|2020-03-10 00:00:00|    10149|  631|      724|         Rome|null|
|27346|France| France|46.2276|  2.2137|2020-03-19 00:00:00|    10871|  243|       12|       France|null|
| 5401| Hubei|  China|30.9756|112.2707|2020-02-02 00:00:00|    11177|  350|      295|        Hubei|null|
|24482|  null|   Iran|   32.0|    53.0|2020-03-13 00:00:00|    11364|  514|     2959|       Tehran|null|
|26253|  null|  Spain|   40.0|    -4.0|2020-03-17 00:00:00|    11748|  533|     1028|       Toledo|null|
|26723|  null|Germany|   51.0|     9.0|2020-03-18 00:00

In [40]:
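# approxQuantile(column, probabilities, relativeError): relativeError=0.9 is extremely loose, so the
# values returned here say little; use e.g. 0.01, or 0.0 for exact quantiles (more expensive)
corona_df.filter('Confirmed > 10000').approxQuantile('Confirmed', [0.25, 0.5, 0.75, 0.9, 0.95], 0.9)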

[10075.0, 10075.0, 10075.0, 10075.0, 67800.0]
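The third argument of `approxQuantile` is the relative error. The PySpark documentation states that for probability `p` and error `err`, the returned value `x` satisfies `floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)`. A small sketch with a hypothetical helper, `allowed_rank_range`, shows how loose `err = 0.9` is; treating ranks as 1-based and clamping them to `1..N` are assumptions of this sketch:

```python
import math


def allowed_rank_range(n, p, relative_error):
    """Ranks (1-based, clamped to 1..n) that an approximate p-quantile may have,
    following floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)."""
    low = math.floor((p - relative_error) * n)
    high = math.ceil((p + relative_error) * n)
    return max(low, 1), min(high, n)


print(allowed_rank_range(100, 0.5, 0.9))   # (1, 100): any element is acceptable
print(allowed_rank_range(100, 0.5, 0.25))  # (25, 75)
print(allowed_rank_range(100, 0.5, 0.0))   # (50, 50): exact median rank
```

With `err = 0.9` every rank is acceptable, which is consistent with the nearly identical values returned above.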

In [41]:
# Latest date in the dataset, via agg() with a {column: function} dictionary
corona_df.agg({"Date":"max"}).collect() ## collect() is "Action"

[Row(max(Date)=datetime.datetime(2020, 3, 20, 0, 0))]

In [42]:
#
corona_df.agg({"Date":"max", "confirmed":"max"}).collect() ## collect() is "Action"

[Row(max(confirmed)=67800, max(Date)=datetime.datetime(2020, 3, 20, 0, 0))]

In [43]:
## Same aggregation, kept as a DataFrame and displayed with show()
max_date = corona_df.agg({"Date":"max"})
max_date.show()

+-------------------+
|          max(Date)|
+-------------------+
|2020-03-20 00:00:00|
+-------------------+



In [44]:
import pyspark.sql.functions as F
corona_df.groupBy("Country", "State_cleaned").agg(F.max("Date")).show()

+--------------+--------------------+-------------------+
|       Country|       State_cleaned|          max(Date)|
+--------------+--------------------+-------------------+
|      Cameroon|             Yaounde|2020-03-20 00:00:00|
|        Cyprus|             Nicosia|2020-03-20 00:00:00|
|            US|            Michigan|2020-03-20 00:00:00|
|         China|             Qinghai|2020-03-20 00:00:00|
|      Portugal|              Lisbon|2020-03-20 00:00:00|
|            US|            Colorado|2020-03-20 00:00:00|
|United Kingdom|      Cayman Islands|2020-03-20 00:00:00|
|            US|            Missouri|2020-03-20 00:00:00|
|         China|              Hainan|2020-03-20 00:00:00|
|     Australia|Australian Capita...|2020-03-20 00:00:00|
|            US|                Guam|2020-03-20 00:00:00|
|        France|             Reunion|2020-03-20 00:00:00|
|      Colombia|        Cundinamarca|2020-03-20 00:00:00|
|          Cuba|              Havana|2020-03-20 00:00:00|
|     Mauritiu

In [45]:
# Latest record for each (Country, state_cleaned): join each group's max Date back onto the data
corona_df.join(corona_df.groupBy("Country", "State_cleaned").agg(F.max("Date").alias("Date")), 
               on = ['Country', "State_cleaned", "Date"], how = "inner").show(5)

+---------+-------------+-------------------+-----+-----+-------+--------+---------+-----+---------+----+
|  Country|state_cleaned|               Date|  _c0|State|    Lat|    Long|Confirmed|Death|Recovered|City|
+---------+-------------+-------------------+-----+-----+-------+--------+---------+-----+---------+----+
| Thailand|      Bangkok|2020-03-20 00:00:00|27666| null|   15.0|   101.0|      322|    1|       42|null|
|    Japan|      Hiraide|2020-03-20 00:00:00|27667| null|   36.0|   138.0|      963|   33|      191|null|
|Singapore|    Singapore|2020-03-20 00:00:00|27668| null| 1.2833|103.8333|      385|    0|      124|null|
|    Nepal|    Kathmandu|2020-03-20 00:00:00|27669| null|28.1667|   84.25|        1|    0|        1|null|
| Malaysia|      Sarawak|2020-03-20 00:00:00|27670| null|    2.5|   112.5|     1030|    3|       87|null|
+---------+-------------+-------------------+-----+-----+-------+--------+---------+-----+---------+----+
only showing top 5 rows
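The join-on-max pattern can be mirrored in plain Python to make one of its properties explicit: if a group has several rows on its latest date, the inner join keeps all of them. The rows below are illustrative (the Italy values appear in this dataset; the duplicated Nepal row is made up):

```python
rows = [  # (country, state_cleaned, date, confirmed); illustrative, partly made up
    ("Italy", "Rome", "2020-03-19", 41035),
    ("Italy", "Rome", "2020-03-20", 47021),
    ("Nepal", "Kathmandu", "2020-03-20", 1),
    ("Nepal", "Kathmandu", "2020-03-20", 1),  # made-up duplicate on the latest date
]

# groupBy("Country", "State_cleaned").agg(max("Date")); ISO date strings compare correctly.
max_date = {}
for country, state, date, _ in rows:
    key = (country, state)
    if key not in max_date or date > max_date[key]:
        max_date[key] = date

# Inner join on (country, state, date): every row matching its group's max date survives.
latest = [row for row in rows if row[2] == max_date[(row[0], row[1])]]
for row in latest:
    print(row)  # three rows: Italy once, Nepal twice
```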



In [46]:
# Same join, sorted by Confirmed, largest first
corona_df.join(corona_df.groupBy("Country", "State_cleaned").agg(F.max("Date").alias("Date")), 
               on = ['Country', "State_cleaned", "Date"], how = "inner").sort("Confirmed", ascending = False).show(5)

+-------+-------------+-------------------+-----+-----+-------+--------+---------+-----+---------+----+
|Country|state_cleaned|               Date|  _c0|State|    Lat|    Long|Confirmed|Death|Recovered|City|
+-------+-------------+-------------------+-----+-----+-------+--------+---------+-----+---------+----+
|  China|        Hubei|2020-03-20 00:00:00|27820|Hubei|30.9756|112.2707|    67800| 3133|    58382|null|
|  Italy|         Rome|2020-03-20 00:00:00|27682| null|   43.0|    12.0|    47021| 4032|     4440|null|
|  Spain|       Toledo|2020-03-20 00:00:00|27684| null|   40.0|    -4.0|    20410| 1043|     1588|null|
|Germany|       Berlin|2020-03-20 00:00:00|27677| null|   51.0|     9.0|    19848|   67|      180|null|
|   Iran|       Tehran|2020-03-20 00:00:00|27821| null|   32.0|    53.0|    19644| 1433|     6745|null|
+-------+-------------+-------------------+-----+-----+-------+--------+---------+-----+---------+----+
only showing top 5 rows



In [48]:
## Number the rows of each (Country, state_cleaned) group, newest date first
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

ws = Window.partitionBy("Country", "State_cleaned").orderBy(col("Date").desc())
corona_df.withColumn("row_num", row_number().over(ws)).show()

+-----+-----+--------+------------------+-------+-------------------+---------+-----+---------+-------------+----+-------+
|  _c0|State| Country|               Lat|   Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|row_num|
+-----+-----+--------+------------------+-------+-------------------+---------+-----+---------+-------------+----+-------+
|27745| null|Cameroon|3.8480000000000003|11.5021|2020-03-20 00:00:00|       20|    0|        0|      Yaounde|null|      1|
|27268| null|Cameroon|3.8480000000000003|11.5021|2020-03-19 00:00:00|       13|    0|        0|      Yaounde|null|      2|
|26791| null|Cameroon|3.8480000000000003|11.5021|2020-03-18 00:00:00|       10|    0|        0|      Yaounde|null|      3|
|26314| null|Cameroon|3.8480000000000003|11.5021|2020-03-17 00:00:00|       10|    0|        0|      Yaounde|null|      4|
|25837| null|Cameroon|3.8480000000000003|11.5021|2020-03-16 00:00:00|        4|    0|        0|      Yaounde|null|      5|
|25360| null|Cam

In [50]:
# Keep row_num == 1, i.e. the most recent row of each group
corona_df.withColumn("row_num", row_number().over(ws)).where(col("row_num") == 1).show(10)

+-----+--------------------+--------------+------------------+---------+-------------------+---------+-----+---------+--------------------+--------+-------+
|  _c0|               State|       Country|               Lat|     Long|               Date|Confirmed|Death|Recovered|       state_cleaned|    City|row_num|
+-----+--------------------+--------------+------------------+---------+-------------------+---------+-----+---------+--------------------+--------+-------+
|27745|                null|      Cameroon|3.8480000000000003|  11.5021|2020-03-20 00:00:00|       20|    0|        0|             Yaounde|    null|      1|
|27859|             Qinghai|         China|           35.7452|  95.9956|2020-03-20 00:00:00|       18|    0|       18|             Qinghai|    null|      1|
|27762|                null|        Cyprus|           35.1264|  33.4299|2020-03-20 00:00:00|       67|    0|        0|             Nicosia|    null|      1|
|27812|            Michigan|            US|           43.3
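Unlike the join-on-max approach, `row_number()` numbers the rows within each window partition, so filtering on `row_num == 1` keeps exactly one row per group even when several rows share the latest date; which tied row is kept is not guaranteed unless the ordering is made unique. A plain-Python sketch of the same idea, with a hypothetical `with_row_number` helper (the Cameroon and Cyprus values come from the output above; the duplicated Cyprus row is made up):

```python
from itertools import groupby
from operator import itemgetter


def with_row_number(rows):
    """Append a 1-based row number per (country, state) group, newest date first.

    Mirrors Window.partitionBy(country, state).orderBy(date desc) with row_number()."""
    numbered = []
    by_group = sorted(rows, key=itemgetter(0, 1))  # groupby needs equal keys to be adjacent
    for _, group in groupby(by_group, key=itemgetter(0, 1)):
        newest_first = sorted(group, key=itemgetter(2), reverse=True)
        for number, row in enumerate(newest_first, start=1):
            numbered.append(row + (number,))
    return numbered


rows = [  # (country, state_cleaned, date, confirmed)
    ("Cameroon", "Yaounde", "2020-03-19", 13),
    ("Cameroon", "Yaounde", "2020-03-20", 20),
    ("Cyprus", "Nicosia", "2020-03-20", 67),
    ("Cyprus", "Nicosia", "2020-03-20", 67),  # made-up duplicate on the latest date
]

latest = [row for row in with_row_number(rows) if row[-1] == 1]
for row in latest:
    print(row)  # one row per group
```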

In [51]:
# Store the latest record per (Country, state_cleaned) for the aggregations below
corona_max_df = corona_df.join(corona_df.groupBy("Country", "State_cleaned").agg(F.max("Date").alias("Date")), 
               on = ['Country', "State_cleaned", "Date"], how = "inner")
corona_max_df.show(10)

+---------+----------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+
|  Country|   state_cleaned|               Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|
+---------+----------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+
| Thailand|         Bangkok|2020-03-20 00:00:00|27666|            null|    15.0|    101.0|      322|    1|       42|null|
|    Japan|         Hiraide|2020-03-20 00:00:00|27667|            null|    36.0|    138.0|      963|   33|      191|null|
|Singapore|       Singapore|2020-03-20 00:00:00|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|
|    Nepal|       Kathmandu|2020-03-20 00:00:00|27669|            null| 28.1667|    84.25|        1|    0|        1|null|
| Malaysia|         Sarawak|2020-03-20 00:00:00|27670|            null|     2.5|    112.5|     1030|    3|       87|null|
|   Canada|British Colum

In [53]:
## Country level: one row per country, one column per date, each cell holds the summed Confirmed
corona_df.groupby("Country").pivot("Date").agg(F.sum("Confirmed")).show(10)

+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------
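The pivot output is too wide to read here because it has one column per distinct date. Conceptually, `groupby("Country").pivot("Date").agg(F.sum("Confirmed"))` builds a nested mapping from country to date to summed value; a short plain-Python sketch with a hypothetical `pivot_sum` helper and made-up rows makes that concrete:

```python
from collections import defaultdict


def pivot_sum(rows):
    """rows: (country, date, confirmed). Returns {country: {date: summed confirmed}}."""
    table = defaultdict(lambda: defaultdict(int))
    for country, date, confirmed in rows:
        table[country][date] += confirmed
    return {country: dict(by_date) for country, by_date in table.items()}


rows = [  # made-up per-state records
    ("US", "2020-03-19", 10),
    ("US", "2020-03-19", 5),
    ("US", "2020-03-20", 7),
    ("Chad", "2020-03-20", 1),
]
print(pivot_sum(rows))
# {'US': {'2020-03-19': 15, '2020-03-20': 7}, 'Chad': {'2020-03-20': 1}}
```

In Spark, a (country, date) combination with no rows appears as `null` instead of being absent. Per the PySpark documentation, passing the pivot values explicitly as the second argument of `pivot()` avoids the extra job Spark otherwise runs to find the distinct values.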

In [55]:
# crosstab(): how often each (State, Date) pair occurs among the US rows
corona_df.filter("Country == 'US'").crosstab("State", "Date").show(10)

+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+-----------
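`crosstab("State", "Date")` counts how often each (State, Date) pair occurs. Per the PySpark documentation, pairs with no occurrences get a count of zero, and the first column is named after both columns (`State_Date`). Since this dataset appears to hold one row per state and day, the counts here are likely just 0s and 1s; for summed case numbers, the pivot above is the better fit. A plain-Python sketch of the counting, with a hypothetical `crosstab` function and made-up pairs:

```python
from collections import Counter


def crosstab(pairs):
    """pairs: (state, date) tuples. Returns {state: {date: count}} with 0 for missing pairs."""
    counts = Counter(pairs)
    states = sorted({state for state, _ in pairs})
    dates = sorted({date for _, date in pairs})
    return {state: {date: counts[(state, date)] for date in dates} for state in states}


pairs = [  # made-up (State, Date) pairs
    ("New York", "2020-03-19"),
    ("New York", "2020-03-20"),
    ("Guam", "2020-03-20"),
]
print(crosstab(pairs))
# {'Guam': {'2020-03-19': 0, '2020-03-20': 1}, 'New York': {'2020-03-19': 1, '2020-03-20': 1}}
```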

In [57]:
# Total confirmed cases per country (dict form of agg: {column: aggregate function})
corona_max_df.groupBy("Country").agg({'Confirmed':'sum'}).show(10)

+-----------+--------------+
|    Country|sum(Confirmed)|
+-----------+--------------+
|       Chad|             1|
|   Paraguay|            13|
|     Russia|           253|
|    Senegal|            38|
|     Sweden|          1639|
| Cabo Verde|             1|
|     Guyana|             7|
|Philippines|           230|
|   Djibouti|             1|
|  Singapore|           385|
+-----------+--------------+
only showing top 10 rows



In [58]:
# Per-country totals for confirmed, recovered and death counts, largest confirmed total first
corona_max_df.groupBy("Country").agg({'Confirmed':'sum', 'Recovered':'sum', 'Death':'sum'}).orderBy("sum(Confirmed)", ascending=False).show(10)

+--------------+--------------+----------+--------------+
|       Country|sum(Recovered)|sum(Death)|sum(Confirmed)|
+--------------+--------------+----------+--------------+
|         China|         71266|      3253|         81250|
|         Italy|          4440|      4032|         47021|
|         Spain|          1588|      1043|         20410|
|       Germany|           180|        67|         19848|
|          Iran|          6745|      1433|         19644|
|            US|             0|       244|         19100|
|        France|            12|       450|         12726|
|  Korea, South|          1540|        94|          8652|
|   Switzerland|            15|        54|          5294|
|United Kingdom|            67|       178|          4014|
+--------------+--------------+----------+--------------+
only showing top 10 rows
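Passing a dict to `agg` applies one aggregate per column and names each result `function(column)`. That naming is why the cell above can order by the string `"sum(Confirmed)"`. A Python dict holds each key only once, so this form cannot apply two functions to the same column. For that case, or to get friendlier names, the column-expression form is the usual alternative, e.g. `agg(sum("Confirmed").alias("total_confirmed"), max("Confirmed"))`. It uses the functions brought in by `from pyspark.sql.functions import *` earlier. Note that this import also shadows Python's built-in `sum` and `max`. The plain-Python sketch below models the dict form and its naming on hypothetical rows, with countries `A` and `B` invented for illustration.

```python
from collections import defaultdict

# Hypothetical rows (not from the dataset): (Country, Confirmed, Recovered, Death)
columns = ["Country", "Confirmed", "Recovered", "Death"]
rows = [
    ("A", 10, 4, 1),
    ("A", 5, 1, 0),
    ("B", 7, 2, 2),
]

# A small subset of the function names Spark's dict form accepts
FUNCS = {"sum": sum, "max": max, "min": min}

def agg_dict(rows, columns, key, spec):
    """Group rows by `key`, apply {column: function name}, name results 'fn(col)'."""
    groups = defaultdict(list)
    for row in rows:
        record = dict(zip(columns, row))
        groups[record[key]].append(record)
    result = []
    for group_key, records in groups.items():
        out = {key: group_key}
        for col, fn in spec.items():
            out[f"{fn}({col})"] = FUNCS[fn](r[col] for r in records)
        result.append(out)
    return result

summary = agg_dict(rows, columns, "Country",
                   {"Confirmed": "sum", "Recovered": "sum", "Death": "sum"})
# Plain-Python equivalent of .orderBy("sum(Confirmed)", ascending=False)
summary.sort(key=lambda r: r["sum(Confirmed)"], reverse=True)
for r in summary:
    print(r)

# A dict keeps one value per key, so only the last function survives
collapsed = {"Confirmed": "sum", "Confirmed": "max"}
print(collapsed)  # {'Confirmed': 'max'}
```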



In [60]:
# Italy's rows, most recent date first
corona_df.filter("Country == 'Italy'").sort("Date", ascending=False).show(10)

+-----+-----+-------+----+----+-------------------+---------+-----+---------+-------------+----+
|  _c0|State|Country| Lat|Long|               Date|Confirmed|Death|Recovered|state_cleaned|City|
+-----+-----+-------+----+----+-------------------+---------+-----+---------+-------------+----+
|27682| null|  Italy|43.0|12.0|2020-03-20 00:00:00|    47021| 4032|     4440|         Rome|null|
|27205| null|  Italy|43.0|12.0|2020-03-19 00:00:00|    41035| 3405|     4440|         Rome|null|
|26728| null|  Italy|43.0|12.0|2020-03-18 00:00:00|    35713| 2978|     4025|         Rome|null|
|26251| null|  Italy|43.0|12.0|2020-03-17 00:00:00|    31506| 2503|     2941|         Rome|null|
|25774| null|  Italy|43.0|12.0|2020-03-16 00:00:00|    27980| 2158|     2749|         Rome|null|
|25297| null|  Italy|43.0|12.0|2020-03-15 00:00:00|    24747| 1809|     2335|         Rome|null|
|24820| null|  Italy|43.0|12.0|2020-03-14 00:00:00|    21157| 1441|     1966|         Rome|null|
|24343| null|  Italy|43.0|12.0

In [61]:
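# Active = Confirmed - Recovered - Death, sorted descending (only displayed, not saved to corona_max_df)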
corona_max_df.withColumn("Active", corona_max_df.Confirmed - corona_max_df.Recovered - corona_max_df.Death).sort(
    "Active", ascending=False).show(10)

+--------------+----------------+-------------------+-----+--------------+-------+-------------------+---------+-----+---------+--------+------+
|       Country|   state_cleaned|               Date|  _c0|         State|    Lat|               Long|Confirmed|Death|Recovered|    City|Active|
+--------------+----------------+-------------------+-----+--------------+-------+-------------------+---------+-----+---------+--------+------+
|         Italy|            Rome|2020-03-20 00:00:00|27682|          null|   43.0|               12.0|    47021| 4032|     4440|    null| 38549|
|       Germany|          Berlin|2020-03-20 00:00:00|27677|          null|   51.0|                9.0|    19848|   67|      180|    null| 19601|
|         Spain|          Toledo|2020-03-20 00:00:00|27684|          null|   40.0|               -4.0|    20410| 1043|     1588|    null| 17779|
|        France|          France|2020-03-20 00:00:00|27823|        France|46.2276|             2.2137|    12612|  450|       12|  
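`Active` is plain per-row arithmetic, Confirmed - Recovered - Death. For Italy's latest row that gives 47021 - 4440 - 4032 = 38549, which matches the output above. Spark arithmetic on a null operand yields null, so a row with a missing `Recovered` value would get a null `Active` rather than a number. The plain-Python model below uses `None` to stand in for null. Its last call uses a hypothetical row, not one from the dataset.

```python
def active(confirmed, recovered, death):
    """Confirmed - Recovered - Death; None if any input is None (like SQL null)."""
    if None in (confirmed, recovered, death):
        return None
    return confirmed - recovered - death

# Values for 2020-03-20 taken from the output above
print(active(47021, 4440, 4032))  # Italy   -> 38549
print(active(19848, 180, 67))     # Germany -> 19601
print(active(20410, 1588, 1043))  # Spain   -> 17779
# Hypothetical row with a missing Recovered value
print(active(100, None, 2))       # None
```

If missing values should count as zero instead, one option in Spark is to wrap each operand in `coalesce(col("Recovered"), lit(0))` (both functions are in `pyspark.sql.functions`).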

In [62]:
# withColumn returns a new DataFrame, so reassign it to keep the Active column
corona_max_df = corona_max_df.withColumn("Active", corona_max_df.Confirmed - corona_max_df.Recovered - corona_max_df.Death)
corona_max_df.show(10)

+---------+----------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+------+
|  Country|   state_cleaned|               Date|  _c0|           State|     Lat|     Long|Confirmed|Death|Recovered|City|Active|
+---------+----------------+-------------------+-----+----------------+--------+---------+---------+-----+---------+----+------+
| Thailand|         Bangkok|2020-03-20 00:00:00|27666|            null|    15.0|    101.0|      322|    1|       42|null|   279|
|    Japan|         Hiraide|2020-03-20 00:00:00|27667|            null|    36.0|    138.0|      963|   33|      191|null|   739|
|Singapore|       Singapore|2020-03-20 00:00:00|27668|            null|  1.2833| 103.8333|      385|    0|      124|null|   261|
|    Nepal|       Kathmandu|2020-03-20 00:00:00|27669|            null| 28.1667|    84.25|        1|    0|        1|null|     0|
| Malaysia|         Sarawak|2020-03-20 00:00:00|27670|            null|     2.5|    112.5|     10

In [63]:
# Total active cases per country, highest first
corona_max_df.groupBy("Country").sum("Active").orderBy("sum(Active)", ascending=False).show(10)

+--------------+-----------+
|       Country|sum(Active)|
+--------------+-----------+
|         Italy|      38549|
|       Germany|      19601|
|            US|      18856|
|         Spain|      17779|
|        France|      12264|
|          Iran|      11466|
|  Korea, South|       7018|
|         China|       6731|
|   Switzerland|       5225|
|United Kingdom|       3769|
+--------------+-----------+
only showing top 10 rows
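`Active` is a linear combination of the other columns. Summing it per country should therefore match combining the per-country sums from In [58], as long as no value is null. The sketch below checks this with the totals already printed in this notebook. Every one of the ten countries agrees.

```python
# Per-country totals copied from the In [58] output: (Confirmed, Recovered, Death)
totals = {
    "China": (81250, 71266, 3253),
    "Italy": (47021, 4440, 4032),
    "Spain": (20410, 1588, 1043),
    "Germany": (19848, 180, 67),
    "Iran": (19644, 6745, 1433),
    "US": (19100, 0, 244),
    "France": (12726, 12, 450),
    "Korea, South": (8652, 1540, 94),
    "Switzerland": (5294, 15, 54),
    "United Kingdom": (4014, 67, 178),
}

# sum(Active) values copied from the In [63] output
reported_active = {
    "Italy": 38549, "Germany": 19601, "US": 18856, "Spain": 17779,
    "France": 12264, "Iran": 11466, "Korea, South": 7018, "China": 6731,
    "Switzerland": 5225, "United Kingdom": 3769,
}

# sum(Confirmed) - sum(Recovered) - sum(Death) per country
derived = {c: conf - rec - death for c, (conf, rec, death) in totals.items()}
for country, value in sorted(derived.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{country:15s} {value:6d} matches: {value == reported_active[country]}")
```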



### Part 2

In [64]:
# Per-country totals for Australia and China (.sum() with no arguments sums every numeric column)
corona_max_df.select("Country", "State_cleaned", "Confirmed", "Recovered").filter(col("Country").isin('Australia', 'China')).\
                    groupBy('Country').sum().show(10)

+---------+--------------+--------------+
|  Country|sum(Confirmed)|sum(Recovered)|
+---------+--------------+--------------+
|    China|         81250|         71266|
|Australia|           791|            26|
+---------+--------------+--------------+



In [65]:
# cube adds a grand-total row, shown with Country = null
corona_max_df.select("Country", "State_cleaned", "Confirmed", "Recovered").filter(col("Country").isin('Australia', 'China')).\
                    cube('Country').sum().show(10)

+---------+--------------+--------------+
|  Country|sum(Confirmed)|sum(Recovered)|
+---------+--------------+--------------+
|Australia|           791|            26|
|     null|         82041|         71292|
|    China|         81250|         71266|
+---------+--------------+--------------+
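`cube('Country')` returns every row that `groupBy('Country')` does, plus a grand-total row in which `Country` is null. Here that row holds 791 + 81250 = 82041 confirmed and 26 + 71266 = 71292 recovered. A plain-Python model of the extra row, using the two per-country totals above:

```python
# Per-country totals from the output above: (Confirmed, Recovered)
per_country = {"China": (81250, 71266), "Australia": (791, 26)}

def cube_one_column(groups):
    """Per-group rows plus a grand total keyed by None (Spark shows it as null)."""
    rows = dict(groups)
    rows[None] = tuple(sum(values) for values in zip(*groups.values()))
    return rows

grand = cube_one_column(per_country)
for key, (confirmed, recovered) in grand.items():
    print(key, confirmed, recovered)
```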



In [69]:
# Subtotals for every combination of Country and State_cleaned, null Country rows first
corona_max_df.select("Country", "State_cleaned", "Confirmed", "Recovered").filter(col("Country").isin('Australia', 'China')).\
                    cube('Country', 'State_cleaned').sum().sort(asc("Country")).show(10)

+-------+------------------+--------------+--------------+
|Country|     State_cleaned|sum(Confirmed)|sum(Recovered)|
+-------+------------------+--------------+--------------+
|   null|             Jilin|            93|            92|
|   null|           Beijing|           491|           390|
|   null|             Gansu|           134|            98|
|   null|         Guangdong|          1395|          1323|
|   null|          Shandong|           762|           748|
|   null|             Hunan|          1018|          1014|
|   null|Northern Territory|             3|             0|
|   null|             Hebei|           318|           310|
|   null|        Queensland|           184|             8|
|   null|          Liaoning|           126|           122|
+-------+------------------+--------------+--------------+
only showing top 10 rows
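With two columns, `cube` aggregates over every subset of them: (Country, State_cleaned), (Country), (State_cleaned) and the empty set. In general that is 2^n grouping sets for n columns. The null-`Country` rows above come from the (State_cleaned) set: they are per-state subtotals taken across both countries. They appear first because `asc` sorts nulls first. A null produced by subtotalling looks identical to a null already present in the data. `grouping()` and `grouping_id()` from `pyspark.sql.functions` can be used to tell the two apart. The enumeration of grouping sets itself can be sketched in plain Python:

```python
from itertools import combinations

def cube_sets(columns):
    """All grouping sets produced by cube(*columns): every subset, 2**n in total."""
    return [subset
            for size in range(len(columns), -1, -1)
            for subset in combinations(columns, size)]

sets = cube_sets(["Country", "State_cleaned"])
print(sets)
# [('Country', 'State_cleaned'), ('Country',), ('State_cleaned',), ()]
```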



In [None]:
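# Hierarchical subtotals by Country, then State_cleaned ("State" is not available after this select)
corona_max_df.select("Country", "State_cleaned", "Confirmed", "Recovered").filter(col("Country").isin('Australia', 'China')).\
                    rollup("Country", "State_cleaned").sum().show(10)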
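`rollup` is the hierarchical counterpart of `cube`: it produces only the leading prefixes of the column list. A rollup over Country and State_cleaned therefore yields (Country, State_cleaned), (Country) and the grand total. That is n + 1 grouping sets instead of 2^n, and never a state subtotal across countries. As a result, the column order changes which groupings `rollup` produces, while `cube` produces the same set in any order. A plain-Python sketch of the prefix enumeration:

```python
def rollup_sets(columns):
    """Grouping sets produced by rollup(*columns): each leading prefix, n + 1 in total."""
    return [tuple(columns[:size]) for size in range(len(columns), -1, -1)]

print(rollup_sets(["Country", "State_cleaned"]))
# [('Country', 'State_cleaned'), ('Country',), ()]
```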