# **PySpark Google Colab Installation**


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install findspark

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [None]:
import os

# Point PySpark at the JDK and at the Spark distribution unpacked by the install cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop2.7",
})

In [None]:
# findspark locates the Spark installation (from the SPARK_HOME variable set in the
# previous cell) and puts its Python libraries on sys.path so `import pyspark` works.
import findspark
findspark.init()


In [None]:
import pyspark

# getOrCreate() returns the already-running SparkContext if one exists, so
# re-executing this cell does not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()

In [None]:
# Build an RDD with one element per line of reviews.txt. This is lazy: the file
# is not read until an action (collect, count, take, ...) is called.
reviews = sc.textFile('reviews.txt')

In [None]:
# Displaying an RDD shows only its description (type, id, origin), not its contents.
reviews

reviews.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [None]:
# collect() is an action: it returns every element to the driver as a Python list.
# Fine for this 10-line file; avoid it on large datasets.
reviews.collect()

['Wow... Loved this place',
 'Crust is not good',
 'Not tasty and the texture was just nasty',
 'Stopped by during the late May bank holiday off Rick Steve recommendation and loved it',
 'The selection on the menu was great and so were the prices',
 'Now I am getting angry and I want my damn pho.',
 "Honeslty it didn't taste THAT fresh",
 'The potatoes were like rubber and you could tell they had been made up ahead of time being kept under a warmer',
 'The fries were great too',
 'A great touch']

In [None]:
# count() is an action that returns the number of elements (here: lines).
reviews.count()

10

In [None]:
# take(n) returns only the first n elements -- a cheap way to peek at an RDD.
reviews.take(2)

['Wow... Loved this place', 'Crust is not good']

## **Important Concepts**

# Map

In [None]:
# Sample input for the map/filter/reduce demos: the integers 2 through 6.
my_list = list(range(2, 7))

In [None]:
def power_numbers(numbers):
    """Return the square of a single number.

    Despite the plural parameter name, `numbers` is one value; map() applies
    this function to each element of a list in turn.
    """
    return pow(numbers, 2)

In [None]:
# In Python 3, map() returns a lazy iterator; the cell output is the map object
# itself, not the squared values.
map(power_numbers,my_list)

<map at 0x7f0d375aa518>

In [None]:
# Wrapping the iterator in list() forces every element to be computed.
list(map(power_numbers,my_list))

[4, 9, 16, 25, 36]

In [None]:
# A map object is a one-shot iterator: it can be consumed only once.
results = map(power_numbers,my_list)

In [None]:
# Iterating consumes `results`. Re-running this cell without re-running the
# previous one prints nothing, because the iterator is already exhausted.
for result in results:
  print(result)

4
9
16
25
36


## **Lambda Expressions**

In [None]:
def power_numbers(numbers):
    """Return the square of a single number.

    Identical to the definition in the Map section; repeated here so the named
    function sits next to its lambda equivalent in the following cells.
    """
    return pow(numbers, 2)

In [None]:
# The same squaring logic as an anonymous function; the cell displays the function object.
lambda numbers:numbers**2

<function __main__.<lambda>>

In [None]:
# Binding a lambda to a name works, but PEP 8 recommends `def` for named functions;
# it is done here only to show that a lambda is an ordinary callable.
my_function = lambda numbers:numbers**2

In [None]:
# Calling the lambda just like a regular function.
my_function(2)

4

In [None]:
# A lambda can be passed to map() anywhere a named function can.
list(map(my_function,my_list))

[4, 9, 16, 25, 36]

# Filter

In [None]:
# Like map(), filter() returns a lazy iterator rather than a list.
filter(lambda number:number>3,my_list)

<filter at 0x7f0d375cb128>

In [None]:
# Keep only the elements for which the predicate returns True (here: values > 3).
list(filter(lambda number:number>3,my_list))

[4, 5, 6]

# Reduce

In [None]:
# Re-display the input list before reducing it.
my_list

[2, 3, 4, 5, 6]

In [None]:
import functools

# reduce() folds the list from the left: (((((0+2)+3)+4)+5)+6) = 20.
# The initial value 0 makes the call well-defined for an empty list, where
# reduce() without an initializer raises TypeError.
functools.reduce(lambda x, y: x + y, my_list, 0)

20

## **Now in Spark**

In [None]:
# Re-display the review RDD; the Spark transformations below operate on these lines.
reviews.collect()

['Wow... Loved this place',
 'Crust is not good',
 'Not tasty and the texture was just nasty',
 'Stopped by during the late May bank holiday off Rick Steve recommendation and loved it',
 'The selection on the menu was great and so were the prices',
 'Now I am getting angry and I want my damn pho.',
 "Honeslty it didn't taste THAT fresh",
 'The potatoes were like rubber and you could tell they had been made up ahead of time being kept under a warmer',
 'The fries were great too',
 'A great touch']

# Map & Lambda Expressions

In [None]:
# map() is a transformation: it returns a new RDD immediately without computing anything.
reviews.map(lambda review: review.upper())

PythonRDD[4] at RDD at PythonRDD.scala:53

In [None]:
# Each element is a whole review line; collect() triggers the upper-casing.
reviews.map(lambda review: review.upper()).collect()

['WOW... LOVED THIS PLACE',
 'CRUST IS NOT GOOD',
 'NOT TASTY AND THE TEXTURE WAS JUST NASTY',
 'STOPPED BY DURING THE LATE MAY BANK HOLIDAY OFF RICK STEVE RECOMMENDATION AND LOVED IT',
 'THE SELECTION ON THE MENU WAS GREAT AND SO WERE THE PRICES',
 'NOW I AM GETTING ANGRY AND I WANT MY DAMN PHO.',
 "HONESLTY IT DIDN'T TASTE THAT FRESH",
 'THE POTATOES WERE LIKE RUBBER AND YOU COULD TELL THEY HAD BEEN MADE UP AHEAD OF TIME BEING KEPT UNDER A WARMER',
 'THE FRIES WERE GREAT TOO',
 'A GREAT TOUCH']

In [None]:
# take(1) returns just the first transformed review.
reviews.map(lambda review: review.upper()).take(1)

['WOW... LOVED THIS PLACE']

# Filter & Lambda Expressions

In [None]:
# filter() is also a lazy transformation: keep reviews longer than 30 characters.
reviews.filter(lambda review:len(review) > 30)

PythonRDD[7] at RDD at PythonRDD.scala:53

In [None]:
# Materialize the filtered reviews (those with more than 30 characters).
reviews.filter(lambda review:len(review) > 30).collect()

['Not tasty and the texture was just nasty',
 'Stopped by during the late May bank holiday off Rick Steve recommendation and loved it',
 'The selection on the menu was great and so were the prices',
 'Now I am getting angry and I want my damn pho.',
 "Honeslty it didn't taste THAT fresh",
 'The potatoes were like rubber and you could tell they had been made up ahead of time being kept under a warmer']

In [None]:
# map() produces exactly one output per input: here a (review, character count) tuple.
reviews.map(lambda review:(review,len(review))).collect()

[('Wow... Loved this place', 23),
 ('Crust is not good', 17),
 ('Not tasty and the texture was just nasty', 40),
 ('Stopped by during the late May bank holiday off Rick Steve recommendation and loved it',
  86),
 ('The selection on the menu was great and so were the prices', 58),
 ('Now I am getting angry and I want my damn pho.', 46),
 ("Honeslty it didn't taste THAT fresh", 35),
 ('The potatoes were like rubber and you could tell they had been made up ahead of time being kept under a warmer',
  110),
 ('The fries were great too', 24),
 ('A great touch', 13)]

# FlatMap & Lambda Expressions

In [None]:
# flatMap() flattens each returned list into the result, so instead of tuples we get
# one flat sequence alternating review text and its length.
reviews.flatMap(lambda review:[review,len(review)]).collect()

['Wow... Loved this place',
 23,
 'Crust is not good',
 17,
 'Not tasty and the texture was just nasty',
 40,
 'Stopped by during the late May bank holiday off Rick Steve recommendation and loved it',
 86,
 'The selection on the menu was great and so were the prices',
 58,
 'Now I am getting angry and I want my damn pho.',
 46,
 "Honeslty it didn't taste THAT fresh",
 35,
 'The potatoes were like rubber and you could tell they had been made up ahead of time being kept under a warmer',
 110,
 'The fries were great too',
 24,
 'A great touch',
 13]

# Creating an RDD from a List

In [None]:
# The plain Python list that will be distributed as an RDD below.
my_list

[2, 3, 4, 5, 6]

In [None]:
# parallelize() turns a local Python collection into an RDD; collect() brings it back.
sc.parallelize(my_list).collect()

[2, 3, 4, 5, 6]

## **Spark DataFrames**

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Entry point for the DataFrame API. getOrCreate() reuses the SparkContext `sc`
# created earlier rather than starting a second one.
spark = (
    SparkSession.builder
    .appName('Python Spark DataFrame Example')
    .getOrCreate()
)

In [None]:
# Load the Kiva CSV, treating the first line as column names and letting Spark infer types.
# NOTE(review): inference left most numeric-looking columns as strings (see the
# printSchema output below) -- possibly irregular rows in the file; confirm.
kiva = spark.read.csv(
    'kiva_el.csv',
    sep=',',
    header=True,
    inferSchema=True,
)

# Checking the Type

In [None]:
# spark.read.csv returns a Spark DataFrame, not a pandas DataFrame.
type(kiva)

pyspark.sql.dataframe.DataFrame

## **Printing the Data Schema**

In [None]:
# Print column names and inferred types. NOTE: in the output, numeric-looking columns
# such as MPI, lat, lon, funded_amount and loan_amount are typed as string, so they
# must be cast before numeric sorting or comparisons.
kiva.printSchema()

root
 |-- LocationName: string (nullable = true)
 |-- ISO: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- world_region: string (nullable = true)
 |-- MPI: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- id: string (nullable = true)
 |-- funded_amount: string (nullable = true)
 |-- loan_amount: string (nullable = true)
 |-- activity: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- use: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- partner_id: string (nullable = true)
 |-- posted_time: string (nullable = true)
 |-- disbursed_time: string (nullable = true)
 |-- funded_time: string (nullable = true)
 |-- term_in_months: string (nullable = true)
 |-- lender_count: double (nullable = true)
 |-- tags: string (nullable = true)
 |-- borrower_genders: string (nulla

In [None]:
# show(n) prints the first n rows as a text table (long values are truncated).
kiva.show(5)

+--------------------+---+-----------+----------+--------------------+----+--------------------+----------+-----------+------+-------------+-----------+-------------+-----------+--------------------+------------+--------+----------+--------------------+--------------------+--------------------+--------------+------------+--------------------+--------------------+------------------+----------+----+--------+---------+
|        LocationName|ISO|    country|    region|        world_region| MPI|                 geo|       lat|        lon|    id|funded_amount|loan_amount|     activity|     sector|                 use|country_code|currency|partner_id|         posted_time|      disbursed_time|         funded_time|term_in_months|lender_count|                tags|    borrower_genders|repayment_interval|      date|year|   month|      day|
+--------------------+---+-----------+----------+--------------------+----+--------------------+----------+-----------+------+-------------+-----------+--------

# Creating a Pandas DataFrame

In [None]:
# toPandas() pulls all rows into driver memory, so limit() first to a small sample.
df = kiva.limit(5).toPandas()

In [None]:
# A pandas DataFrame gets Jupyter's rich table display.
df

Unnamed: 0,LocationName,ISO,country,region,world_region,MPI,geo,lat,lon,id,funded_amount,loan_amount,activity,sector,use,country_code,currency,partner_id,posted_time,disbursed_time,funded_time,term_in_months,lender_count,tags,borrower_genders,repayment_interval,date,year,month,day
0,"Ahuachapan, El Salvador",SLV,El Salvador,Ahuachapan,Latin America and Caribbean,0.03,"(13.8216148, -89.9253233)",13.8216148,-89.9253233,670484,1300.0,1300.0,Agriculture,Agriculture,to pay for the land's rent and buy agricultura...,SV,USD,333.0,2014-02-15 21:03:42+00:00,2014-01-16 08:00:00+00:00,2014-02-21 20:41:13+00:00,14.0,29.0,,"female, female, female",bullet,2014-02-15,2014,February,Saturday
1,"Ahuachapan, El Salvador",SLV,El Salvador,Ahuachapan,Latin America and Caribbean,0.03,"(13.8216148, -89.9253233)",13.8216148,-89.9253233,675795,1800.0,3000.0,Livestock,Agriculture,to buy cows.,SV,USD,333.0,2014-02-24 17:53:22+00:00,2014-02-06 08:00:00+00:00,,14.0,53.0,"#Animals, #Parent, #Supporting Family, user_fa...","male, male, male",bullet,2014-02-24,2014,February,Monday
2,"Ahuachapan, El Salvador",SLV,El Salvador,Ahuachapan,Latin America and Caribbean,0.03,"(13.8216148, -89.9253233)",13.8216148,-89.9253233,686887,1900.0,1900.0,Agriculture,Agriculture,to buy agricultural supplies,SV,USD,333.0,2014-03-20 22:43:26+00:00,2014-02-20 08:00:00+00:00,2014-04-24 12:40:59+00:00,14.0,31.0,"#Hidden Gem, #Supporting Family, user_favorite...","male, male, female",bullet,2014-03-20,2014,March,Thursday
3,"Ahuachapan, El Salvador",SLV,El Salvador,Ahuachapan,Latin America and Caribbean,0.03,"(13.8216148, -89.9253233)",13.8216148,-89.9253233,706942,1925.0,2200.0,Agriculture,Agriculture,to buy agricultural supplies.,SV,USD,333.0,2014-05-07 14:48:20+00:00,2014-04-10 07:00:00+00:00,,14.0,54.0,"volunteer_pick, volunteer_like, user_favorite,...","male, male, male",bullet,2014-05-07,2014,May,Wednesday
4,"Ahuachapan, El Salvador",SLV,El Salvador,Ahuachapan,Latin America and Caribbean,0.03,"(13.8216148, -89.9253233)",13.8216148,-89.9253233,716964,1275.0,1500.0,Farm Supplies,Agriculture,to buy agricultural supplies.,SV,USD,333.0,2014-05-27 20:54:47+00:00,2014-05-15 07:00:00+00:00,,14.0,7.0,"volunteer_pick, volunteer_like, #Biz Durable A...","male, male, male",bullet,2014-05-27,2014,May,Tuesday


In [None]:
# DataFrames are immutable: withColumnRenamed returns a new DataFrame, rebound to `kiva`.
# Re-running is harmless -- renaming a column that no longer exists is a no-op.
kiva = kiva.withColumnRenamed('LocationName','Location')

In [None]:
# The first column header now reads "Location".
kiva.show(5)

+--------------------+---+-----------+----------+--------------------+----+--------------------+----------+-----------+------+-------------+-----------+-------------+-----------+--------------------+------------+--------+----------+--------------------+--------------------+--------------------+--------------+------------+--------------------+--------------------+------------------+----------+----+--------+---------+
|            Location|ISO|    country|    region|        world_region| MPI|                 geo|       lat|        lon|    id|funded_amount|loan_amount|     activity|     sector|                 use|country_code|currency|partner_id|         posted_time|      disbursed_time|         funded_time|term_in_months|lender_count|                tags|    borrower_genders|repayment_interval|      date|year|   month|      day|
+--------------------+---+-----------+----------+--------------------+----+--------------------+----------+-----------+------+-------------+-----------+--------

# Selecting Columns in a DataFrame

In [None]:
# Keep only the columns needed below. loan_amount and funded_amount were inferred as
# strings (see the printSchema output), which makes ORDER BY sort them as text
# ('975.0' ranks above '3000.0'); cast them to double so sorting, filtering and
# aggregation are numeric. Values that cannot be parsed become null.
kiva_select = kiva.select('region', 'activity', 'sector',
                          kiva.loan_amount.cast('double').alias('loan_amount'),
                          kiva.funded_amount.cast('double').alias('funded_amount'))

In [None]:
# Preview the first two rows of the projected DataFrame.
kiva_select.show(2)

+----------+-----------+-----------+-----------+-------------+
|    region|   activity|     sector|loan_amount|funded_amount|
+----------+-----------+-----------+-----------+-------------+
|Ahuachapan|Agriculture|Agriculture|     1300.0|       1300.0|
|Ahuachapan|  Livestock|Agriculture|     3000.0|       1800.0|
+----------+-----------+-----------+-----------+-------------+
only showing top 2 rows



## **Filtering a DataFrame**

In [None]:
# Combine column conditions with & (not Python's `and`); each comparison needs
# its own parentheses because & binds tighter than > and ==.
is_large_livestock_loan = (
    (kiva_select.loan_amount > 100)
    & (kiva_select.activity == 'Livestock')
)
kiva_select.filter(is_large_livestock_loan).show()

+----------+---------+-----------+-----------+-------------+
|    region| activity|     sector|loan_amount|funded_amount|
+----------+---------+-----------+-----------+-------------+
|Ahuachapan|Livestock|Agriculture|     3000.0|       1800.0|
|San Miguel|Livestock|Agriculture|     1600.0|       1600.0|
|San Miguel|Livestock|Agriculture|     1000.0|       1000.0|
|San Miguel|Livestock|Agriculture|      800.0|        800.0|
|San Miguel|Livestock|Agriculture|      500.0|        500.0|
|San Miguel|Livestock|Agriculture|     1500.0|       1500.0|
|San Miguel|Livestock|Agriculture|      700.0|        700.0|
|San Miguel|Livestock|Agriculture|     1000.0|       1000.0|
|San Miguel|Livestock|Agriculture|     1000.0|       1000.0|
|San Miguel|Livestock|Agriculture|      500.0|        500.0|
|San Miguel|Livestock|Agriculture|      650.0|        650.0|
|San Miguel|Livestock|Agriculture|      800.0|        800.0|
|San Miguel|Livestock|Agriculture|      600.0|        600.0|
|San Miguel|Livestock|Ag

In [None]:
# Same filter, but collect() returns every matching Row to the driver as a list.
# Unlike show(), this is unbounded -- avoid on large data or use take(n) instead.
kiva_select.filter((kiva_select.loan_amount > 100) 
              & (kiva_select.activity == 'Livestock')).collect()

# Group By

In [None]:
from pyspark.sql import functions as F

In [None]:
# Total funded amount per (sector, activity) pair; Spark does not guarantee row order.
total_funded = F.sum('funded_amount').alias('Total Funded Amount')
kiva_select.groupBy('sector', 'activity').agg(total_funded).show()

+-------------+--------------------+-------------------+
|       sector|            activity|Total Funded Amount|
+-------------+--------------------+-------------------+
|         Food|Food Production/S...|           214125.0|
|       Retail| Home Products Sales|             2500.0|
|Manufacturing|          Metal Shop|            10150.0|
|     Services|         Auto Repair|            12300.0|
| Personal Use|   Personal Expenses|            17050.0|
| Personal Use|         Home Energy|             5700.0|
|         Food|       Cheese Making|             5800.0|
|  Agriculture|             Flowers|             1400.0|
|  Agriculture|               Dairy|            18575.0|
|       Retail|    Electrical Goods|              650.0|
|         Arts|              Crafts|             3200.0|
| Construction|Construction Supp...|              400.0|
|     Services|            Printing|             1200.0|
|     Clothing|            Clothing|             9000.0|
|         Food|             Fis

## **Running SQL Queries**

In [None]:
from pyspark.sql import SQLContext

In [None]:
# SQLContext is a legacy entry point kept only for backward compatibility; since
# Spark 2.0 the SparkSession (`spark`, created earlier) provides the same .sql()
# method. Keeping the `sqlContext` name lets the following cells run unchanged.
sqlContext = spark

In [None]:
# Expose the DataFrame to SQL as 'kiva_table'. registerTempTable is deprecated since
# Spark 2.0; createOrReplaceTempView is its replacement and replaces any existing view.
kiva_select.createOrReplaceTempView('kiva_table')

In [None]:
# In the schema printed above, funded_amount is a string column, so ordering by it
# directly sorts lexicographically (e.g. '975.0' ranks above '1800.0'). Casting to
# DOUBLE makes both the filter and the sort numeric; it is a no-op if the column is
# already double.
kiva_sql = sqlContext.sql("""
    SELECT *
    FROM kiva_table
    WHERE CAST(funded_amount AS DOUBLE) > 100
    ORDER BY CAST(funded_amount AS DOUBLE) DESC
""")

In [None]:
# Execute the query and print the first 20 rows (the show() default).
kiva_sql.show()

+----------+--------------------+-----------+-----------+-------------+
|    region|            activity|     sector|loan_amount|funded_amount|
+----------+--------------------+-----------+-----------+-------------+
|San Miguel|            Services|   Services|     1600.0|        975.0|
|San Miguel|         Agriculture|Agriculture|      975.0|        975.0|
|San Miguel|              Retail|     Retail|     1600.0|        975.0|
|San Miguel|Personal Housing ...|    Housing|     1500.0|        975.0|
|San Miguel| Home Products Sales|     Retail|     1025.0|        975.0|
|San Miguel|           Livestock|Agriculture|     1200.0|        975.0|
|San Miguel|         Agriculture|Agriculture|      950.0|        950.0|
|San Miguel|         Agriculture|Agriculture|      950.0|        950.0|
|San Miguel|         Auto Repair|   Services|     1500.0|        950.0|
|San Miguel|           Tailoring|   Services|      950.0|        950.0|
|San Miguel|              Cattle|Agriculture|      950.0|       

## **Happy Learning**