# A Crash Course In PySpark - Udemy

# Latest Config Code

The latest config code for Lecture 6 is included below. This can change when new versions of Spark are released. The following video covers off a similar config script (related to a previous version of Spark). However, it covers the main concepts of this code snippet to help you understand it's purpose.

In [1]:
# Latest Config Code

!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark


In [3]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
# spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

# Ingesting & Cleaning Data

In [4]:
# 1. Upload data file
# "/content/original.csv"

# 2. read data
mydata = spark.read.format("csv").option("header", "true").load("/content/original.csv")
mydata.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [9]:
from pyspark.sql.functions import *

# add new column 'clean_city' with null replace with 'Unknown'
mydata2 = mydata.withColumn("clean_city", when(mydata.City.isNull(), 'Unknown').otherwise(mydata.City))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Femal

In [10]:
# filter rows from 'JobTitle' column which is not null
mydata2 = mydata2.filter(mydata2.JobTitle.isNotNull())
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|Kinsealy-Drinan|
|  8|   Goddart|     Flear|  Mal

In [11]:
# replace null values from 'Salary' column with mean
# but do some preprocessing first

# to remove '$' sign take sub-string from 2nd place to nth place and type cast it to float
mydata2 = mydata2.withColumn("clean_salary", mydata2.Salary.substr(2,100).cast("float"))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil 

In [12]:
mean = mydata2.groupBy().avg("clean_salary").take(1)[0][0]
print(mean)

55516.32088199837


In [13]:
from pyspark.sql.functions import lit # to use the litral value

# replace null with litral value of mean
mydata2 = mydata2.withColumn("new_salary", when(mydata2.clean_salary.isNull(), lit(mean)).otherwise(mydata2.clean_salary))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 3

In [14]:
# repalce null value from 'Latitude' and 'Longitude' column with median
import numpy as np

latitudes = mydata2.select("Latitude")
latitudes.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|      null|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
+----------+
only showing top 20 rows



In [15]:
latitudes = latitudes.filter(latitudes.Latitude.isNotNull())
latitudes.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
|48.4902808|
+----------+
only showing top 20 rows



In [16]:
latitudes = latitudes.withColumn("latitude2", latitudes.Latitude.cast("float")).select("latitude2")
latitudes.show()

+----------+
| latitude2|
+----------+
| 50.577408|
|  48.82316|
| 44.504723|
| 53.426613|
| 45.190517|
| 32.027935|
|  4.272793|
|     -5.85|
|  39.17238|
|  49.81518|
|  42.10148|
|  49.79233|
| 43.494576|
| 52.744167|
| 38.696247|
|-7.7232566|
| 40.717205|
|  49.16291|
| 40.757683|
|  48.49028|
+----------+
only showing top 20 rows



In [17]:
median = np.median(latitudes.collect())
print(median)

31.93397331237793


In [18]:
mydata2 = mydata2.withColumn("lat", when(mydata2.Latitude.isNull(), lit(median)).otherwise(mydata2.Latitude))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|     

In [22]:
# Answering our scenario questions
import pyspark.sql.functions as sqlfunc

# 1. Avg. salary based on genders
genders = mydata2.groupBy("gender").agg(sqlfunc.avg("new_salary").alias('AvgSalary'))
genders.show()

+------+------------------+
|gender|         AvgSalary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [25]:
# 2. Avg. salary based on JobTitle
departments = mydata2.groupBy("JobTitle").agg(sqlfunc.avg("new_salary").alias('AvgSalary'))
departments.show()

+--------------------+------------------+
|            JobTitle|         AvgSalary|
+--------------------+------------------+
|Systems Administr...|  66131.4248046875|
|   Media Manager III|46968.356770833336|
|  Recruiting Manager|61231.947591145836|
|       Geologist III|    44579.80078125|
|        Geologist II|   43293.865234375|
|Database Administ...|     52018.4609375|
|   Financial Analyst|   62959.833203125|
|  Analyst Programmer|     37449.0921875|
|Software Engineer II|      74782.640625|
|       Accountant IV|   82732.248046875|
|    Product Engineer|    62290.42734375|
|Software Test Eng...|  59341.0712890625|
|Safety Technician...|   29421.529296875|
|    Junior Executive|    65262.55078125|
|Systems Administr...|       77059.21875|
|Human Resources A...| 40631.93994140625|
|        VP Marketing|60825.713216145836|
|  Environmental Tech|59367.870768229164|
|Mechanical System...|         75692.375|
| Assistant Professor|    49088.72421875|
+--------------------+------------

In [26]:
# 3. Avg. salary based on City
cities = mydata2.groupBy("City").agg(sqlfunc.avg("new_salary").alias('AvgSalary'))
cities.show()

+-----------------+----------------+
|             City|       AvgSalary|
+-----------------+----------------+
|        Sułkowice|  33432.98828125|
|          Klippan|     77039.46875|
|      Trollhättan|53311.6845703125|
|        Shinaihai|    39544.640625|
|         Hongzhou|  35707.30859375|
|         Cipinang| 11617.509765625|
| Viejo Daan Banua|         43927.5|
|         Tsiatsan| 18795.439453125|
|       San Andres|  52426.80078125|
|           Krasna|   72022.7890625|
|      Springfield|40697.3251953125|
|            Město|  27797.98046875|
|Chaloem Phra Kiat|  54840.19921875|
|          Tadotsu|  55595.30078125|
|   Hénin-Beaumont|        55082.75|
|          Kajaani|  20224.83984375|
|           Duozhu|    71416.859375|
|           Abéché|   93375.1796875|
|     Habingkloang|     56892.96875|
|         Malishka|   76783.4765625|
+-----------------+----------------+
only showing top 20 rows



In [28]:
# 4. find avg male and female salary w.r.t. job title
# df = mydata2.groupBy("JobTitle", "gender").agg(sqlfunc.avg("new_salary").alias('AvgSalary'))
# df.show()

df = mydata2.withColumn("female_salary", when(mydata2.gender == 'Female', mydata2.new_salary).otherwise(lit(0)))
df = df.withColumn("male_salary", when(mydata2.gender == 'Male', mydata2.new_salary).otherwise(lit(0)))

df = df.groupBy("JobTitle").agg(sqlfunc.avg("female_salary").alias('avg_female_salary'),
                                sqlfunc.avg("male_salary").alias('avg_male_salary'))
df.show()

+--------------------+------------------+------------------+
|            JobTitle| avg_female_salary|   avg_male_salary|
+--------------------+------------------+------------------+
|Systems Administr...|   50590.474609375|  15540.9501953125|
|   Media Manager III|29586.436197916668|17381.920572916668|
|  Recruiting Manager|34848.452473958336|  26383.4951171875|
|       Geologist III|      31749.046875|    12830.75390625|
|        Geologist II|               0.0|   43293.865234375|
|Database Administ...|               0.0|     52018.4609375|
|   Financial Analyst|   23353.776953125|       39606.05625|
|  Analyst Programmer|  16406.1287109375|  21042.9634765625|
|Software Engineer II|               0.0|      74782.640625|
|       Accountant IV|   82732.248046875|               0.0|
|    Product Engineer|    41825.48359375|       20464.94375|
|Software Test Eng...|  32218.6083984375|   27122.462890625|
|Safety Technician...|               0.0|   29421.529296875|
|    Junior Executive|15

In [29]:
# 5. find delta between male and female salary
df = df.withColumn("delta", df.avg_female_salary - df.avg_male_salary)
df.show()

+--------------------+------------------+------------------+-------------------+
|            JobTitle| avg_female_salary|   avg_male_salary|              delta|
+--------------------+------------------+------------------+-------------------+
|Systems Administr...|   50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III|29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager|34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|      31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|               0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|               0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|   23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|  16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|               0.0|      74782.640625|      -74782.640625|
|       Accountant IV|   827

In [30]:
# 6. which city has highest Salary
cityavg = mydata2.groupBy("City").agg(sqlfunc.avg("new_salary").alias('AvgSalary'))
cityavg = cityavg.sort(cityavg.AvgSalary.desc())
cityavg.show()

+-----------------+-------------+
|             City|    AvgSalary|
+-----------------+-------------+
|        Mesopotam|  99948.28125|
|       Zhongcheng| 99942.921875|
|           Caxias|99786.3984375|
|      Karangtawar|99638.9921875|
|        Itabaiana|  99502.15625|
|           Pasian|  99421.34375|
|           Webuye| 99368.546875|
|      Yuktae-dong| 99250.828125|
|           Zinder|  99222.84375|
|   Timiryazevskiy|   99142.9375|
|        Sawahbaru|99013.7109375|
|          Madimba|98737.8671875|
|         Huangshi|  98690.34375|
|          Gharyan|   98679.3125|
|         Yŏnan-ŭp| 98628.609375|
|     Wringinputih|98603.8203125|
|Monte da Boavista|  98586.71875|
|          Klukeng|98439.4921875|
|         Murmashi|  98226.15625|
|        Fox Creek|      98138.0|
+-----------------+-------------+
only showing top 20 rows



# Course content

# Bringing data into dataframes

In [31]:
df2 = spark.read.csv("/content/original.csv", header=True)
df2.show()

# other file formate are simple. eg. spark.read.text(....)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [32]:
df2.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [36]:
from pyspark.sql.types import *

schema = StructType([
    StructField("id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("gender", StringType()),
    StructField("City", StringType()),
    StructField("JobTitle", StringType()),
    StructField("Salary", StringType()),
    StructField("Latitude", FloatType()),
    StructField("Longitude", FloatType())
])

df3 = spark.read.csv("/content/original.csv", header=True, schema=schema)
df3.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [37]:
df3.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'float'),
 ('Longitude', 'float')]