# PySpark 101 

In [1]:
import numpy as np
import pandas as pd

import warnings
warnings.filterwarnings("ignore")

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [6]:
# Inject a CSS rule that sets the `--jp-notebook-max-width` variable to 95%.
from IPython.display import display, HTML

notebook_width_css = "<style>:root { --jp-notebook-max-width: 95% !important; }</style>"
display(HTML(notebook_width_css))

## Spark Packages

In [7]:
from pyspark.sql import SparkSession

## Create a Spark session, which establishes the connection to Spark core

In [9]:
# getOrCreate() hands back the already-running session if there is one,
# otherwise it starts a new session.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

## Read the input CSV file

In [8]:
# Load the CSV with pandas (decoded as cp1252) and preview the first two rows.
df = pd.read_csv(
    filepath_or_buffer='data/input/school-decile.csv',
    encoding='cp1252',
)
df.head(n=2)

Unnamed: 0,School Id,Org Name,Telephone,Fax,Email^,Principal*,URL,Physical Address Line1,Physical Address Suburb,Physical Address City,...,Isolation Index,Decile,Total,European,M?ori,Pacific,Asian,MELAA,Other,International
0,1,Te Kura o Te Kao,09 409 7813,,office@tekao.school.nz,Ng?waiata Evans (Acting),http://www.tekuraotekao.school.nz,6603 Far North Road,,Te Kao,...,4.18,2.0,36,0,36,0,0,0,0,0
1,2,Taipa Area School,09 406 0159,09 406 1096,office@taipa.school.nz,Doreen Bailey,http://www.taipa.school.nz,578 State Highway 10,,Taipa,...,2.88,2.0,300,34,246,11,8,0,1,0


In [10]:
# Load the same CSV as a Spark DataFrame. header=True takes the column names
# from the first line; no schema inference is requested here, so the columns
# load as strings.
spark_df = spark.read.csv(
    'data/input/school-decile.csv',
    header=True,
    encoding='cp1252',
)

In [11]:
# Print the first two rows as a text table.
spark_df.show(n=2)

+---------+-----------------+-----------+-----------+--------------------+--------------------+--------------------+----------------------+-----------------------+---------------------+--------------------+---------------------+-------------------+--------------------------+----------------+---------+--------------------+---------+--------------------+--------------+---------------------+----------------+-----------------+----------------+------------------+----------------+------------------+------------+------+--------------------+----------+----------+---------------+------+-----+--------+-----+-------+-----+-----+-----+-------------+
|School Id|         Org Name|  Telephone|        Fax|              Email^|          Principal*|                 URL|Physical Address Line1|Physical Address Suburb|Physical Address City|Postal Address Line1|Postal Address Suburb|Postal Address City|Postal Address Postal Code|      Urban Area| Org Type|          Definition|Authority|    School Donations|

In [16]:
# Project two columns by name and print the first two rows.
spark_df.select('School Id', 'Org Name').show(n=2)

+---------+-----------------+
|School Id|         Org Name|
+---------+-----------------+
|        1| Te Kura o Te Kao|
|        2|Taipa Area School|
+---------+-----------------+
only showing top 2 rows



In [12]:
# Show the Python class of the object returned by spark.read.csv.
type(spark_df)

pyspark.sql.dataframe.DataFrame

## Create a temporary view of the DataFrame so it can be queried with SQL

In [17]:
# Register the DataFrame under a name that SQL queries can reference.
spark_df.createOrReplaceTempView(name='spark_df_view')

In [18]:
# SQL text that reads every column from the temp view, capped at two rows.
select_query = 'select * from spark_df_view limit 2'

In [21]:
# Run the SQL text through the session; the result is another DataFrame.
query_result = spark.sql(sqlQuery=select_query)

In [22]:
# Print the query result; the query's `limit 2` caps it at two rows.
query_result.show()

+---------+-----------------+-----------+-----------+--------------------+--------------------+--------------------+----------------------+-----------------------+---------------------+--------------------+---------------------+-------------------+--------------------------+----------------+---------+--------------------+---------+--------------------+--------------+---------------------+----------------+-----------------+----------------+------------------+----------------+------------------+------------+------+--------------------+----------+----------+---------------+------+-----+--------+-----+-------+-----+-----+-----+-------------+
|School Id|         Org Name|  Telephone|        Fax|              Email^|          Principal*|                 URL|Physical Address Line1|Physical Address Suburb|Physical Address City|Postal Address Line1|Postal Address Suburb|Postal Address City|Postal Address Postal Code|      Urban Area| Org Type|          Definition|Authority|    School Donations|

In [27]:
# Keep only two columns of the SQL result and print them.
query_result.select('School Id', 'Org Name').show()

+---------+-----------------+
|School Id|         Org Name|
+---------+-----------------+
|        1| Te Kura o Te Kao|
|        2|Taipa Area School|
+---------+-----------------+



## Filter DataFrame rows

In [24]:
# Keep rows whose Decile is above 4 and print the first two.
# NOTE(review): the frame was read without inferSchema at this point, so
# Decile is presumably a string column and the comparison relies on Spark's
# implicit cast — confirm this is intended.
spark_df.where(spark_df['Decile'] > 4).show(n=2)

+---------+--------------------+-----------+-----------+--------------------+----------------+--------------------+----------------------+-----------------------+---------------------+--------------------+---------------------+-------------------+--------------------------+----------------+--------------------+--------------+---------+--------------------+--------------+---------------------+----------------+-----------------+----------------+------------------+----------------+------------------+--------------------+------+--------------------+----------+---------+---------------+------+-----+--------+-----+-------+-----+-----+-----+-------------+
|School Id|            Org Name|  Telephone|        Fax|              Email^|      Principal*|                 URL|Physical Address Line1|Physical Address Suburb|Physical Address City|Postal Address Line1|Postal Address Suburb|Postal Address City|Postal Address Postal Code|      Urban Area|            Org Type|    Definition|Authority|    Sc

## Select DataFrame columns

In [26]:
# Project a single column and print its first four values.
spark_df.select('Telephone').show(n=4)

+-----------+
|  Telephone|
+-----------+
|09 409 7813|
|09 406 0159|
|09 408 0190|
|09 405 0199|
+-----------+
only showing top 4 rows



## Multiple parameterised filters

In [31]:
# Build the two boolean column expressions up front so they can be reused;
# nothing is evaluated until they are passed to filter().
isolation_index_col = spark_df['Isolation Index']
decile_col = spark_df['Decile']

isolation_filter = isolation_index_col > 2
decile_filter = decile_col > 1

In [32]:
# Total number of rows before any filtering.
spark_df.count()

2556

In [34]:
# Keep only the rows that satisfy both conditions.
filtered_spark_df = spark_df.filter(isolation_filter & decile_filter)

In [35]:
# Number of rows that passed both filters.
filtered_spark_df.count()

98

In [36]:
# Preview the first two filtered rows.
filtered_spark_df.show(n=2)

+---------+--------------------+-----------+-----------+--------------------+--------------------+--------------------+----------------------+-----------------------+---------------------+--------------------+---------------------+-------------------+--------------------------+----------+---------+--------------------+---------+--------------------+--------------+---------------------+----------------+-----------------+--------------------+------------------+----------------+----------+---------------+------+------------------+----------+----------+---------------+------+-----+--------+-----+-------+-----+-----+-----+-------------+
|School Id|            Org Name|  Telephone|        Fax|              Email^|          Principal*|                 URL|Physical Address Line1|Physical Address Suburb|Physical Address City|Postal Address Line1|Postal Address Suburb|Postal Address City|Postal Address Postal Code|Urban Area| Org Type|          Definition|Authority|    School Donations|   CoEd S

## Schema of spark dataframe

In [39]:
# Print each column's name, type and nullability.
spark_df.printSchema()

root
 |-- School Id: string (nullable = true)
 |-- Org Name: string (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- Fax: string (nullable = true)
 |-- Email^: string (nullable = true)
 |-- Principal*: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- Physical Address Line1: string (nullable = true)
 |-- Physical Address Suburb: string (nullable = true)
 |-- Physical Address City: string (nullable = true)
 |-- Postal Address Line1: string (nullable = true)
 |-- Postal Address Suburb: string (nullable = true)
 |-- Postal Address City: string (nullable = true)
 |-- Postal Address Postal Code: string (nullable = true)
 |-- Urban Area: string (nullable = true)
 |-- Org Type: string (nullable = true)
 |-- Definition: string (nullable = true)
 |-- Authority: string (nullable = true)
 |-- School Donations: string (nullable = true)
 |-- CoEd Status: string (nullable = true)
 |-- Territorial Authority: string (nullable = true)
 |-- Regional Council: string (nullabl

### Read with `inferSchema`

In [40]:
# Re-read the CSV, this time asking Spark to infer column types from the data.
# NOTE(review): this rebinds `spark_df`; the temp view and the filter
# expressions created earlier were built from the previous DataFrame and are
# presumably not refreshed by this assignment — verify if they are reused.
spark_df = spark.read.csv(
    'data/input/school-decile.csv',
    header=True,
    encoding='cp1252',
    inferSchema=True,
)

In [41]:
# Re-check the schema after reading with inferSchema=True.
spark_df.printSchema()

root
 |-- School Id: integer (nullable = true)
 |-- Org Name: string (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- Fax: string (nullable = true)
 |-- Email^: string (nullable = true)
 |-- Principal*: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- Physical Address Line1: string (nullable = true)
 |-- Physical Address Suburb: string (nullable = true)
 |-- Physical Address City: string (nullable = true)
 |-- Postal Address Line1: string (nullable = true)
 |-- Postal Address Suburb: string (nullable = true)
 |-- Postal Address City: string (nullable = true)
 |-- Postal Address Postal Code: string (nullable = true)
 |-- Urban Area: string (nullable = true)
 |-- Org Type: string (nullable = true)
 |-- Definition: string (nullable = true)
 |-- Authority: string (nullable = true)
 |-- School Donations: string (nullable = true)
 |-- CoEd Status: string (nullable = true)
 |-- Territorial Authority: string (nullable = true)
 |-- Regional Council: string (nullab

## Dataframe details

### dtype

In [46]:
# List (column name, type name) pairs for the DataFrame.
spark_df.dtypes

[('School Id', 'int'),
 ('Org Name', 'string'),
 ('Telephone', 'string'),
 ('Fax', 'string'),
 ('Email^', 'string'),
 ('Principal*', 'string'),
 ('URL', 'string'),
 ('Physical Address Line1', 'string'),
 ('Physical Address Suburb', 'string'),
 ('Physical Address City', 'string'),
 ('Postal Address Line1', 'string'),
 ('Postal Address Suburb', 'string'),
 ('Postal Address City', 'string'),
 ('Postal Address Postal Code', 'string'),
 ('Urban Area', 'string'),
 ('Org Type', 'string'),
 ('Definition', 'string'),
 ('Authority', 'string'),
 ('School Donations', 'string'),
 ('CoEd Status', 'string'),
 ('Territorial Authority', 'string'),
 ('Regional Council', 'string'),
 ('Local Office Name', 'string'),
 ('Education Region', 'string'),
 ('General Electorate', 'string'),
 ('M?ori Electorate', 'string'),
 ('Area Unit', 'string'),
 ('Ward', 'string'),
 ('Col Id', 'int'),
 ('Col Name', 'string'),
 ('Latitude', 'double'),
 ('Longitude', 'double'),
 ('Isolation Index', 'string'),
 ('Decile', 'int')

### describe

In [55]:
# Summary statistics (count, mean, stddev, min, max) for a handful of columns.
summary_columns = ['School Id', 'Telephone', 'Fax', 'Email^', 'Decile']
spark_df.select(summary_columns).describe().show()

+-------+------------------+-------------+-----------+--------------------+------------------+
|summary|         School Id|    Telephone|        Fax|              Email^|            Decile|
+-------+------------------+-------------+-----------+--------------------+------------------+
|  count|              2556|         2550|       2180|                2468|              2542|
|   mean|2029.8724569640062|         NULL|       NULL|                NULL| 6.643587726199843|
| stddev|1320.5517151019496|         NULL|       NULL|                NULL|11.148828442110686|
|    min|                 1|020 418 34857|03 201 6042|Beyza.McEvoy@macl...|                 1|
|    max|              6980|  09 975 7400|09 974 6030|xtend@nayland.sch...|                99|
+-------+------------------+-------------+-----------+--------------------+------------------+



## Modifying columns

### Add new column

In [76]:
# Add a derived column holding Decile multiplied by 10. withColumn returns a
# new DataFrame, so the name is rebound to the result.
spark_df = spark_df.withColumn('temp_col', spark_df['Decile'] * 10)

In [77]:
# Compare the source column with the derived one for the first five rows.
spark_df.select('Decile', 'temp_col').show(n=5)

+------+--------+
|Decile|temp_col|
+------+--------+
|     2|      20|
|     2|      20|
|     2|      20|
|     1|      10|
|     6|      60|
+------+--------+
only showing top 5 rows



In [78]:
# List the column names; 'temp_col' should now appear last.
spark_df.columns

['School Id',
 'Org Name',
 'Telephone',
 'Fax',
 'Email^',
 'Principal*',
 'URL',
 'Physical Address Line1',
 'Physical Address Suburb',
 'Physical Address City',
 'Postal Address Line1',
 'Postal Address Suburb',
 'Postal Address City',
 'Postal Address Postal Code',
 'Urban Area',
 'Org Type',
 'Definition',
 'Authority',
 'School Donations',
 'CoEd Status',
 'Territorial Authority',
 'Regional Council',
 'Local Office Name',
 'Education Region',
 'General Electorate',
 'M?ori Electorate',
 'Area Unit',
 'Ward',
 'Col Id',
 'Col Name',
 'Latitude',
 'Longitude',
 'Isolation Index',
 'Decile',
 'Total',
 'European',
 'M?ori',
 'Pacific',
 'Asian',
 'MELAA',
 'Other',
 'International',
 'temp_col']

### Drop a column

In [79]:
# Remove the temporary column; the result of drop() is assigned back to the
# same name.
spark_df = spark_df.drop('temp_col')

In [80]:
# Confirm 'temp_col' is no longer among the columns.
spark_df.columns

['School Id',
 'Org Name',
 'Telephone',
 'Fax',
 'Email^',
 'Principal*',
 'URL',
 'Physical Address Line1',
 'Physical Address Suburb',
 'Physical Address City',
 'Postal Address Line1',
 'Postal Address Suburb',
 'Postal Address City',
 'Postal Address Postal Code',
 'Urban Area',
 'Org Type',
 'Definition',
 'Authority',
 'School Donations',
 'CoEd Status',
 'Territorial Authority',
 'Regional Council',
 'Local Office Name',
 'Education Region',
 'General Electorate',
 'M?ori Electorate',
 'Area Unit',
 'Ward',
 'Col Id',
 'Col Name',
 'Latitude',
 'Longitude',
 'Isolation Index',
 'Decile',
 'Total',
 'European',
 'M?ori',
 'Pacific',
 'Asian',
 'MELAA',
 'Other',
 'International']

### Rename columns

In [88]:
# Rename several columns in one call using an {old name: new name} mapping.
column_renames = {
    'Other': 'Others',
    'International': 'Internationals',
}
spark_df = spark_df.withColumnsRenamed(column_renames)

In [89]:
# Confirm the two columns now carry their new names.
spark_df.columns

['School Id',
 'Org Name',
 'Telephone',
 'Fax',
 'Email^',
 'Principal*',
 'URL',
 'Physical Address Line1',
 'Physical Address Suburb',
 'Physical Address City',
 'Postal Address Line1',
 'Postal Address Suburb',
 'Postal Address City',
 'Postal Address Postal Code',
 'Urban Area',
 'Org Type',
 'Definition',
 'Authority',
 'School Donations',
 'CoEd Status',
 'Territorial Authority',
 'Regional Council',
 'Local Office Name',
 'Education Region',
 'General Electorate',
 'M?ori Electorate',
 'Area Unit',
 'Ward',
 'Col Id',
 'Col Name',
 'Latitude',
 'Longitude',
 'Isolation Index',
 'Decile',
 'Total',
 'European',
 'M?ori',
 'Pacific',
 'Asian',
 'MELAA',
 'Others',
 'Internationals']

## Drop rows with missing values

In [95]:
# Row count before dropping rows with missing values.
spark_df.count()

2556

In [97]:
# Drop every row that has a null in any column, then count what remains.
# The result goes to a new name so the original frame stays available.
spark_df_dropped = spark_df.dropna()
spark_df_dropped.count()

620