<a href="https://colab.research.google.com/github/sharanyasathish/Data-Engineering/blob/master/Spark_Transformations_and_Actions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import findspark

In [3]:
!ls /usr/lib/jvm/

default-java  java-1.11.0-openjdk-amd64  java-11-openjdk-amd64


In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

In [5]:
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [6]:
#The below imports are made to download the dataset and use it.
import sys,tempfile,urllib
import urllib.request
from pyspark.sql.functions import *

In [7]:
#Setting the base directory to store the datasets are twitter tweets about corona and corona dataset
BASE_DIR ='/tmp'
CORONA_DATA_FILE = os.path.join(BASE_DIR,'corona_data.csv')

In [8]:
corona_data=urllib.request.urlretrieve('https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/corona_dataset_latest.csv', CORONA_DATA_FILE)

In [9]:
BASE_DIR = '/tmp'
TWITTER_DATA_FILE = os.path.join(BASE_DIR, 'twitter_data.csv')

In [10]:
tweet_data = urllib.request.urlretrieve('https://raw.githubusercontent.com/srivatsan88/YouTubeLI/master/dataset/coronavirus/tweets.csv', TWITTER_DATA_FILE)

In [11]:
!ls /tmp

blockmgr-348a4b98-0ddf-432c-beb8-8519e19fe2e7
corona_data.csv
hsperfdata_root
spark-fc6d1e97-bb8e-4602-82b7-853a9fdefc2b
spark-ff62dcbc-bbc5-4492-b842-fba867ab9e7c
twitter_data.csv


In [12]:
#InferSchema: Based on the datatype, check the Schema and then assign datatype or else all the features will be of datatype string.
corona_df = spark.read.option("inferSchema","true").csv("/tmp/corona_data.csv", header = True)

In [13]:
corona_df.show()

+---+----------------+--------------------+--------+---------+----------+---------+-----+---------+--------------------+----+
|_c0|           State|             Country|     Lat|     Long|      Date|Confirmed|Death|Recovered|       state_cleaned|City|
+---+----------------+--------------------+--------+---------+----------+---------+-----+---------+--------------------+----+
|  0|            null|            Thailand|    15.0|    101.0|2020-01-22|        2|    0|        0|             Bangkok|null|
|  1|            null|               Japan|    36.0|    138.0|2020-01-22|        2|    0|        0|             Hiraide|null|
|  2|            null|           Singapore|  1.2833| 103.8333|2020-01-22|        0|    0|        0|           Singapore|null|
|  3|            null|               Nepal| 28.1667|    84.25|2020-01-22|        0|    0|        0|           Kathmandu|null|
|  4|            null|            Malaysia|     2.5|    112.5|2020-01-22|        0|    0|        0|             Sarawa

In [14]:
#count is an action 
corona_df.count()

28143

In [15]:
twit_df = spark.read.option("inferSchema","true").csv("/tmp/twitter_data.csv", header=True)

In [16]:
twit_df.count()

1000

In [17]:
twit_df.show()

+---+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|_c0| geo|                text|                user|            location|            entities|           sentiment|             country|
+---+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|null|What is God sayin...|          petodinice|               Lagos|[('about #', 'CAR...|{'neg': 0.0, 'neu...|             Nigeria|
|  1|null|"BREAKING: ""this...| but i took the t...|                -… "|     JerryfranksonJF|      Abuja, Nigeria|"[(""Arsenal's Mi...|
|  2|null| #Coronavirus tes...|              cek422|   Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|                 USA|
|  3|null| Get ready for ma...|        InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|                 USA|
|  4|null| The #coronavirus...|          

In [20]:
#Transformations are how we want to transform our dataset. It creates a new dataframe as output.
twit_df.filter("country = 'USA'")

DataFrame[_c0: int, geo: string, text: string, user: string, location: string, entities: string, sentiment: string, country: string]

In [22]:
twit_df.filter("country = 'USA'").show()

+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|            location|            entities|           sentiment|country|
+---+----+--------------------+---------------+--------------------+--------------------+--------------------+-------+
|  2|null| #Coronavirus tes...|         cek422|   Pennsylvania, USA|                  []|{'neg': 0.173, 'n...|    USA|
|  3|null| Get ready for ma...|   InfectiousDz|                 NYC|[('World', 'ORG')...|{'neg': 0.085, 'n...|    USA|
|  5|null| COVID-19 update ...| StewartNgilana|Durban | Port Eli...|[('Italy', 'GPE')...|{'neg': 0.178, 'n...|    USA|
|  6|null| It’s painful to ...|      BWheatnyc|             Florida|                  []|{'neg': 0.098, 'n...|    USA|
|  8|null| Questions about ...|    straightj23|        Columbus, OH|[('NAfME', 'CARDI...|{'neg': 0.0, 'neu...|    USA|
| 13|null| We’re the heck w...| harrytiffanyiv| 

In [23]:
twit_df.filter("country ='USA' and location like 'New%'").show()

+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|          location|            entities|           sentiment|country|
+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
| 31|null| I ordered Alex J...|       rcgillan|     New York, USA|[('Alex Jones', '...|{'neg': 0.109, 'n...|    USA|
| 49|null| This week we are...|  JamesWithers3|New York, New York|[('This week', 'D...|{'neg': 0.0, 'neu...|    USA|
|228|null|This is a very co...|baskingntheGlow|     New York City|[('hourly', 'TIME')]|{'neg': 0.12, 'ne...|    USA|
|238|null|I’m reposting thi...|   Veronicaromm|   New Jersey, USA|[('English', 'LAN...|{'neg': 0.0, 'neu...|    USA|
|261|null|Too early ...?  #...|      HJeppesen|      New York, NY|                ['']|{'neg': 0.0, 'neu...|    USA|
|275|null|  The Republican ...|  GenetBataiile|New Hampshire, US

In [26]:
tw_filter_df = twit_df.filter("country = 'USA' and location like 'New%'")

In [27]:
tw_filter_df.explain()

== Physical Plan ==
*(1) Project [_c0#126, geo#127, text#128, user#129, location#130, entities#131, sentiment#132, country#133]
+- *(1) Filter (((isnotnull(country#133) AND isnotnull(location#130)) AND (country#133 = USA)) AND StartsWith(location#130, New))
   +- FileScan csv [_c0#126,geo#127,text#128,user#129,location#130,entities#131,sentiment#132,country#133] Batched: false, DataFilters: [isnotnull(country#133), isnotnull(location#130), (country#133 = USA), StartsWith(location#130, N..., Format: CSV, Location: InMemoryFileIndex[file:/tmp/twitter_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(country), IsNotNull(location), EqualTo(country,USA), StringStartsWith(location,New)], ReadSchema: struct<_c0:int,geo:string,text:string,user:string,location:string,entities:string,sentiment:strin...




In [28]:
tw_filter_df.show()

+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
|_c0| geo|                text|           user|          location|            entities|           sentiment|country|
+---+----+--------------------+---------------+------------------+--------------------+--------------------+-------+
| 31|null| I ordered Alex J...|       rcgillan|     New York, USA|[('Alex Jones', '...|{'neg': 0.109, 'n...|    USA|
| 49|null| This week we are...|  JamesWithers3|New York, New York|[('This week', 'D...|{'neg': 0.0, 'neu...|    USA|
|228|null|This is a very co...|baskingntheGlow|     New York City|[('hourly', 'TIME')]|{'neg': 0.12, 'ne...|    USA|
|238|null|I’m reposting thi...|   Veronicaromm|   New Jersey, USA|[('English', 'LAN...|{'neg': 0.0, 'neu...|    USA|
|261|null|Too early ...?  #...|      HJeppesen|      New York, NY|                ['']|{'neg': 0.0, 'neu...|    USA|
|275|null|  The Republican ...|  GenetBataiile|New Hampshire, US

In [29]:
twit_df.first()

Row(_c0=0, geo=None, text='What is God saying to us about #coronavirus ?', user='petodinice', location='Lagos', entities="[('about #', 'CARDINAL')]", sentiment="{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}", country='Nigeria')

In [30]:
twit_df.take(5)

[Row(_c0=0, geo=None, text='What is God saying to us about #coronavirus ?', user='petodinice', location='Lagos', entities="[('about #', 'CARDINAL')]", sentiment="{'neg': 0.0, 'neu': 0.769, 'pos': 0.231, 'compound': 0.2732}", country='Nigeria'),
 Row(_c0=1, geo=None, text='"BREAKING: ""this is disappointing', user=' but i took the test"". Arsenal\'s Mikel Arteta tests positive for #coronavirus', location='  -… "', entities='JerryfranksonJF', sentiment='Abuja, Nigeria', country='"[(""Arsenal\'s Mikel Arteta""'),
 Row(_c0=2, geo=None, text=' #Coronavirus testing must be made free to the public if we are going to understand the scope of this crisis. Anything le…', user='cek422', location='Pennsylvania, USA', entities='[]', sentiment="{'neg': 0.173, 'neu': 0.71, 'pos': 0.117, 'compound': -0.3767}", country='USA'),
 Row(_c0=3, geo=None, text=' Get ready for mass event crowd cancellations across the World starting this weekend: cricket in #Australia in empty st…', user='InfectiousDz', locatio

In [32]:
x = spark.sparkContext.parallelize([1,5,4,8]) #creating a parallel RDD
y = x.map(lambda x: (x, x*x)) #Taking each value and creating an output value for each element.
print(x.collect())
print(y.collect())

[1, 5, 4, 8]
[(1, 1), (5, 25), (4, 16), (8, 64)]


In [33]:
x = spark.sparkContext.parallelize([1,4,8]) #To convert RDD to Datafreame, we can you RDD.toDF function.
y = x.flatMap(lambda x: (x, x*x)) #Not takong each element, it flattens it out.
print(x.collect())
print(y.collect())

[1, 4, 8]
[1, 1, 4, 16, 8, 64]


In [34]:
twit_df.select("text").show()

+--------------------+
|                text|
+--------------------+
|What is God sayin...|
|"BREAKING: ""this...|
| #Coronavirus tes...|
| Get ready for ma...|
| The #coronavirus...|
| COVID-19 update ...|
| It’s painful to ...|
| 📽️Friends, I wi...|
| Questions about ...|
|How they’re deali...|
| BREAKING: Democr...|
| “If we close dow...|
| I pity the poor ...|
| We’re the heck w...|
| I don't think sh...|
| Well written, so...|
| 1/2 CDC Director...|
| In all seriousne...|
| Wash your hands....|
| #CoronaVirusCana...|
+--------------------+
only showing top 20 rows



In [35]:
twit_df.select("text", "user").show()

+--------------------+--------------------+
|                text|                user|
+--------------------+--------------------+
|What is God sayin...|          petodinice|
|"BREAKING: ""this...| but i took the t...|
| #Coronavirus tes...|              cek422|
| Get ready for ma...|        InfectiousDz|
| The #coronavirus...|          vic_gibson|
| COVID-19 update ...|      StewartNgilana|
| It’s painful to ...|           BWheatnyc|
| 📽️Friends, I wi...|             LorseaR|
| Questions about ...|         straightj23|
|How they’re deali...|       _______coolio|
| BREAKING: Democr...|      champagneaylin|
| “If we close dow...|       YorkLawLondon|
| I pity the poor ...|      BeesonMargaret|
| We’re the heck w...|      harrytiffanyiv|
| I don't think sh...|         grammyheath|
| Well written, so...|      barbara_ellena|
| 1/2 CDC Director...|               fatal|
| In all seriousne...|          pwjkmiller|
| Wash your hands....|        Mrrandy123RP|
| #CoronaVirusCana...|           

In [38]:
twit_df.rdd.map(lambda line: line.text.split(" ")).take(5)

[['What', 'is', 'God', 'saying', 'to', 'us', 'about', '#coronavirus', '?'],
 ['"BREAKING:', '""this', 'is', 'disappointing'],
 ['',
  '#Coronavirus',
  'testing',
  'must',
  'be',
  'made',
  'free',
  'to',
  'the',
  'public',
  'if',
  'we',
  'are',
  'going',
  'to',
  'understand',
  'the',
  'scope',
  'of',
  'this',
  'crisis.',
  'Anything',
  'le…'],
 ['',
  'Get',
  'ready',
  'for',
  'mass',
  'event',
  'crowd',
  'cancellations',
  'across',
  'the',
  'World',
  'starting',
  'this',
  'weekend:',
  'cricket',
  'in',
  '#Australia',
  'in',
  'empty',
  'st…'],
 ['',
  'The',
  '#coronavirus',
  'pandemic',
  'is',
  'revealing',
  'just',
  'how',
  'closely',
  'we',
  'are',
  'all',
  'bound',
  'together...[A',
  'thread]',
  '']]

In [40]:
twit_df.rdd.flatMap(lambda line: line.text.split(" ")).take(100)

['What',
 'is',
 'God',
 'saying',
 'to',
 'us',
 'about',
 '#coronavirus',
 '?',
 '"BREAKING:',
 '""this',
 'is',
 'disappointing',
 '',
 '#Coronavirus',
 'testing',
 'must',
 'be',
 'made',
 'free',
 'to',
 'the',
 'public',
 'if',
 'we',
 'are',
 'going',
 'to',
 'understand',
 'the',
 'scope',
 'of',
 'this',
 'crisis.',
 'Anything',
 'le…',
 '',
 'Get',
 'ready',
 'for',
 'mass',
 'event',
 'crowd',
 'cancellations',
 'across',
 'the',
 'World',
 'starting',
 'this',
 'weekend:',
 'cricket',
 'in',
 '#Australia',
 'in',
 'empty',
 'st…',
 '',
 'The',
 '#coronavirus',
 'pandemic',
 'is',
 'revealing',
 'just',
 'how',
 'closely',
 'we',
 'are',
 'all',
 'bound',
 'together...[A',
 'thread]',
 '',
 '',
 'COVID-19',
 'update',
 'as',
 'of',
 'this',
 'morning:1.',
 'Death',
 'toll',
 'in',
 'Italy',
 'passes',
 '1,0002.',
 "Arsenal's",
 'head',
 'coach',
 'Arteta',
 'tests',
 'positive3.',
 'US…',
 '',
 'It’s',
 'painful',
 'to',
 'say,',
 'but',
 'as',
 'an']

In [41]:
corona_df.filter("country = 'US'").sort(col("Date"),ascending = False).show()

+-----+----------------+-------+------------------+---------+----------+---------+-----+---------+----------------+----------------+
|  _c0|           State|Country|               Lat|     Long|      Date|Confirmed|Death|Recovered|   state_cleaned|            City|
+-----+----------------+-------+------------------+---------+----------+---------+-----+---------+----------------+----------------+
|27764|      Washington|     US|           47.4009|-121.4905|2020-03-20|     1524|   83|        0|      Washington|      Washington|
|27784|         Arizona|     US|           33.7298|-111.4312|2020-03-20|       78|    0|        0|         Arizona|         Arizona|
|27765|        New York|     US|           42.1657| -74.9481|2020-03-20|     8310|   42|        0|        New York|        New York|
|27766|      California|     US|           36.1162|-119.6816|2020-03-20|     1177|   23|        0|      California|      California|
|27767|   Massachusetts|     US|           42.2302| -71.5301|2020-03-