# Spark setup codes
---

In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark
!pip install pyspark



In [4]:
# use findspark to initate the findspark and find the spark
import findspark
findspark.init()

In [5]:
# to validate if the findspark as found the correct instance
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

In [7]:
# setting up a sparksession, create a app
# create a config to run the spark UI on port 4050
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [8]:
# start the spark
spark

In [9]:
#for having the SPARK UI to be ported to port 4050
from pyngrok import ngrok
print(ngrok.connect(4050))

t=2020-12-09T11:46:37+0000 lvl=warn msg="can't bind default web address, trying alternatives" obj=web addr=127.0.0.1:4040
NgrokTunnel: "http://257587862047.ngrok.io" -> "http://localhost:4050"


# Spark Data frames
---

In [10]:
# Use !wget to download csv files from a URL
!wget https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/mail_addresses.csv

--2020-12-09 11:46:38--  https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/mail_addresses.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1588 (1.6K) [text/plain]
Saving to: ‘mail_addresses.csv.1’


2020-12-09 11:46:38 (27.8 MB/s) - ‘mail_addresses.csv.1’ saved [1588/1588]



In [11]:
# Load data in to data frame
df = spark.read.csv('mail_addresses.csv', header=True, inferSchema=True)

In [12]:
#show schema
df.printSchema()

#show columns as a list
df.columns

#describe the measure columns
df.describe().show()

root
 |-- id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- attention: string (nullable = true)
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_province: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)

+-------+------------------+------------------+--------------------+--------------------+---------+---------+--------------+-----------------+-------+
|summary|                id|       location_id|           attention|           address_1|address_2|     city|state_province|      postal_code|country|
+-------+------------------+------------------+--------------------+--------------------+---------+---------+--------------+-----------------+-------+
|  count|                21|                21|                  20|                  21|        2|       21|            21|               21|     21|
|   mean|              1

# ways to get data, slice and dice dataframes
---

In [13]:
#select a column as singular column
df.select('city').show()

+-------------------+
|               city|
+-------------------+
|       Redwood City|
|          San Mateo|
|          San Mateo|
|          San Mateo|
|          San Mateo|
|         Menlo Park|
|         Menlo Park|
|         Menlo Park|
|       Redwood City|
|       Redwood City|
|       Redwood City|
|       Redwood City|
|       Redwood City|
|       Redwood City|
|      San Francisco|
|          Sunnyvale|
|South San Francisco|
|       Redwood City|
|          San Mateo|
|            Fairfax|
+-------------------+
only showing top 20 rows



In [14]:
#select multiple columns
df.select(['city','country']).show()

+-------------------+-------+
|               city|country|
+-------------------+-------+
|       Redwood City|     US|
|          San Mateo|     US|
|          San Mateo|     US|
|          San Mateo|     US|
|          San Mateo|     US|
|         Menlo Park|     US|
|         Menlo Park|     US|
|         Menlo Park|     US|
|       Redwood City|     US|
|       Redwood City|     US|
|       Redwood City|     US|
|       Redwood City|     US|
|       Redwood City|     US|
|       Redwood City|     US|
|      San Francisco|     US|
|          Sunnyvale|     US|
|South San Francisco|     US|
|       Redwood City|     US|
|          San Mateo|     US|
|            Fairfax|     US|
+-------------------+-------+
only showing top 20 rows



In [15]:
#create a temp view out of dataframe , kind of temp table
df.createOrReplaceTempView('mail_address')

In [16]:
sql_result = spark.sql("select * from mail_address where city = 'San Mateo'")

In [17]:
#filter the data frame based on a condition
##sql_result.filter("location_id > 5").show()

#filter the data frame based on a condition, and select particular columns{this is using condition as SQL}
sql_result.filter("location_id > 1 and location_id <=5 ").select(['city','state_province','country']).show()

+---------+--------------+-------+
|     city|state_province|country|
+---------+--------------+-------+
|San Mateo|            CA|     US|
|San Mateo|            CA|     US|
|San Mateo|            CA|     US|
|San Mateo|            CA|     US|
+---------+--------------+-------+



In [22]:
#filtering based on data frame, and multiple conditions not like sql
sql_result.filter((sql_result['location_id'] > 1) & (sql_result['location_id'] <= 5)).select(['city','state_province','country']).show()

+---------+--------------+-------+
|     city|state_province|country|
+---------+--------------+-------+
|San Mateo|            CA|     US|
|San Mateo|            CA|     US|
|San Mateo|            CA|     US|
|San Mateo|            CA|     US|
+---------+--------------+-------+



In [23]:
#using collect() function to store the values as a list object
collect_result = sql_result.filter((sql_result['location_id'] > 1) & (sql_result['location_id'] <= 5)).select(['city','state_province','country']).collect()

In [36]:
#accesing the values in the collected list object
collect_result[0].asDict()['city']

'San Mateo'

# spark group by operations
---

In [48]:
#load data as dataframe
# use below code to laod sales data in to data frame
# !wget https://raw.githubusercontent.com/plotly/datasets/master/sales_success.csv
df = spark.read.csv('sales_success.csv', header=True, inferSchema=True)
# rename the column and make it persistent
df = df.withColumnRenamed('_c0','id')

In [69]:
# group by followed by type of aggregation and pass in the measure values names as items
# the below code will aggreagate the data for all the measure/integet/numeric columns available
# df.groupby('region').sum().show()  
df.groupby('region').sum('calls', 'sales').show()

+------+----------+----------+
|region|sum(calls)|sum(sales)|
+------+----------+----------+
| South|       381|       193|
|  East|       407|       160|
|  West|       321|       189|
| North|       631|       382|
+------+----------+----------+



In [71]:
# .agg method, takes in a dictionary
#df.agg({'sales':'sum'}).show()

# to specify the group by and have .agg method on top of it
# assign the group by to a valiable and then perform the .agg aggregation

by_region = df.groupBy('region')

In [72]:
# by passing in a dictionary in .agg method we can specify what kind of aggregation is required for what measure column
by_region.agg({'sales':'sum','calls':'sum','id':'count'}).show()

+------+----------+---------+----------+
|region|sum(calls)|count(id)|sum(sales)|
+------+----------+---------+----------+
| South|       381|       12|       193|
|  East|       407|       12|       160|
|  West|       321|        9|       189|
| North|       631|       18|       382|
+------+----------+---------+----------+

