In [1]:
!pip install -q findspark


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

# Job DataSet analysis using PySpark

- Create a pyspark dataframe from the CSV, this sets the firrst row as column headers

In [14]:
job_data = spark.read.format("csv").option("header",
True).load("original.csv")
job_data.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

# Handle missing values and clean the dataset

In [16]:
#check and handle missing values
from pyspark.sql.functions import *

#creating a new column clean city, where if city is null it replaces it with unknown
job_data2 = job_data.withColumn("Clean_City", when(job_data.City.isNull(), "Unknown").otherwise(job_data.City))

#rermoving rows will null job titles
job_data2 =job_data2.filter(job_data2.JobTitle.isNotNull())

#converting salary to a numeric value, remove currency symbols and cast to float
job_data2 = job_data2.withColumn("Clean_Salary", col("Salary").substr(2,100).cast("float"))

#replace null salaries with mean value of salary
mean_salary = job_data2.selectExpr("avg(Clean_Salary) as mean").collect()[0]["mean"]

from pyspark.sql.functions import lit
job_data2 = job_data2.withColumn("New_Salary", when(job_data2.Clean_Salary.isNull(), lit(mean_salary)).otherwise(job_data2.Clean_Salary))
job_data2.show()


+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|Clean_Salary|      New_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 3

In [18]:
#rerplacing null latitude values with the median. Convert a latitude to float
import numpy as np
latitudes =job_data2.select("Latitude").filter(job_data2.Latitude.isNotNull()).withColumn("Latitude", col("Latitude").cast("float"))

median_lat = np.median([row["Latitude"] for row in latitudes.collect()])
job_data2 = job_data2.withColumn("Updated_Latitude", when(job_data2.Latitude.isNull(), lit(median_lat)).otherwise(job_data2.Latitude)) 

job_data2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|Clean_Salary|      New_Salary| Updated_Latitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|     

# SALARY ANALYSIS
- do men or women make morre on average
- analyze salary differences by job title
- identify which city has the highest average salaryr


# Identifying gender salary differences


In [19]:
#SALARRY ANALYSIS BY GENDER
from pyspark.sql.functions import col, avg, when, lit

#groupby the gender and calculate average salary per gender
genders = job_data2.groupBy("Gender").agg(avg("New_Salary").alias("Average_Salary"))
genders.show()

+------+------------------+
|Gender|    Average_Salary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [20]:
#creating gender specific salarry columns
df = job_data2.withColumn("Female_Salary", when(col("Gender") =="Female", col("New_Salary")).otherwise(lit(0)))
df = df.withColumn("Male_Salary", when(col("Gender") == "Male",col("New_Salary")).otherwise(lit(0)))

df.show()


+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|Clean_Salary|      New_Salary| Updated_Latitude|   Female_Salary|     Male_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|             0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6015625|   

In [21]:
# SALARY ANALYSIS BY JOB TITLE
#grouping by job title and computing averages

df = df.groupBy("JobTitle").agg(
 avg("Female_Salary").alias("Final_Female_Salary"),
 avg("Male_Salary").alias("Final_Male_Salary"))
df.show()


+--------------------+-------------------+------------------+
|            JobTitle|Final_Female_Salary| Final_Male_Salary|
+--------------------+-------------------+------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|
|   Media Manager III| 29586.436197916668|17381.920572916668|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|
|       Geologist III|       31749.046875|    12830.75390625|
|        Geologist II|                0.0|   43293.865234375|
|Database Administ...|                0.0|     52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625|
|Software Engineer II|                0.0|      74782.640625|
|       Accountant IV|    82732.248046875|               0.0|
|    Product Engineer|     41825.48359375|       20464.94375|
|Software Test Eng...|   32218.6083984375|   27122.462890625|
|Safety Technician...|                0.0|   29421.529296875|
|    Jun

In [22]:
#computing salary difference between avg male and avg female slaary
df = df.withColumn("Delta", col("Final_Female_Salary") -
col("Final_Male_Salary"))
df.show()



+--------------------+-------------------+------------------+-------------------+
|            JobTitle|Final_Female_Salary| Final_Male_Salary|              Delta|
+--------------------+-------------------+------------------+-------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III| 29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|       31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|                0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|                0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|                0.0|      74782.640625|      -74782.640625|
|       Accounta

# identifying highest paying city

In [23]:
# calculating average salary by city
city_avg = job_data2.groupBy("City").agg(avg("New_Salary").alias("Average_Salary"))

#sorting city by highest salary - lowest
city_avg = city_avg.sort(col("Average_Salary").desc())
city_avg.show()

+-----------------+--------------+
|             City|Average_Salary|
+-----------------+--------------+
|        Mesopotam|   99948.28125|
|       Zhongcheng|  99942.921875|
|           Caxias| 99786.3984375|
|      Karangtawar| 99638.9921875|
|        Itabaiana|   99502.15625|
|           Pasian|   99421.34375|
|           Webuye|  99368.546875|
|      Yuktae-dong|  99250.828125|
|           Zinder|   99222.84375|
|   Timiryazevskiy|    99142.9375|
|        Sawahbaru| 99013.7109375|
|          Madimba| 98737.8671875|
|         Huangshi|   98690.34375|
|          Gharyan|    98679.3125|
|         Yŏnan-ŭp|  98628.609375|
|     Wringinputih| 98603.8203125|
|Monte da Boavista|   98586.71875|
|          Klukeng| 98439.4921875|
|         Murmashi|   98226.15625|
|        Fox Creek|       98138.0|
+-----------------+--------------+
only showing top 20 rows



# Data Aggregations and exporrting dataframes

In [25]:
df = job_data2.withColumn("Clean_Salary", col("Salary").substr(2,
100).cast("float"))

#performing multiple aggreggations
from pyspark.sql.functions import avg, min, max
df1 = df.groupBy("Gender").agg(
 sum("Clean_Salary").alias("Total_Salary"),
 avg("Clean_Salary").alias("Average_Salary"),
 min("Clean_Salary").alias("Min_Salary"),
 max("Clean_Salary").alias("Max_Salary")
)
df1.show()

+------+--------------------+------------------+----------+----------+
|Gender|        Total_Salary|    Average_Salary|Min_Salary|Max_Salary|
+------+--------------------+------------------+----------+----------+
|Female|2.7281852561523438E7|55677.250125558036|  10616.44|  99948.28|
|  Male|2.8123435678710938E7| 55361.09385573019|  10101.92|  99942.92|
+------+--------------------+------------------+----------+----------+



In [26]:
#grouping gender and city
df1 = df.groupBy("Gender", "City").agg(
 sum("Clean_Salary").alias("Total_Salary"),
 avg("Clean_Salary").alias("Average_Salary"),
 min("Clean_Salary").alias("Min_Salary"),
 max("Clean_Salary").alias("Max_Salary")
)
df1.show()

+------+-----------------+----------------+----------------+----------+----------+
|Gender|             City|    Total_Salary|  Average_Salary|Min_Salary|Max_Salary|
+------+-----------------+----------------+----------------+----------+----------+
|Female|      Trollhättan|106623.369140625|53311.6845703125|  26830.47|   79792.9|
|  Male|          Wenshao| 18941.509765625| 18941.509765625|  18941.51|  18941.51|
|Female|            Lanas| 13765.900390625| 13765.900390625|   13765.9|   13765.9|
|  Male|            Mörön|    77940.078125|    77940.078125|  77940.08|  77940.08|
|Female|             Same|   73369.7265625|   73369.7265625|  73369.73|  73369.73|
|Female|          Sawahan|  24608.83984375|  24608.83984375|  24608.84|  24608.84|
|  Male|Monte da Boavista|     98586.71875|     98586.71875|  98586.72|  98586.72|
|Female|         Nusajaya|    71637.921875|    71637.921875|  71637.92|  71637.92|
|Female|            Kista|   96192.3984375|   96192.3984375|   96192.4|   96192.4|
|  M

# Exporting DataFrames

In [28]:
#we want to save the data that we grouped together (gender and city) to a new dataframe

#writing to a csv
df1.write.csv("df1.csv")


#writing to json
df12.write.json("df1.json")

#writing to parquet
df13.write.parquet("df1.parquet")


AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/jovyan/work/Lesson5/PySpark/df1.csv already exists. Set mode as "overwrite" to overwrite the existing path.