### Import PySpark and create a session

In [None]:
from pyspark.sql import SparkSession
# Start (or reuse) a SparkSession that runs Spark locally on all available cores.
# 'localhost' is not a valid master URL. Spark expects forms such as 'local',
# 'local[*]', 'spark://HOST:PORT' or 'yarn'. To attach to a standalone cluster
# instead, use e.g. 'spark://localhost:7077' (the master must already be running).
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

#### Read the data into the Spark session: CSV format, with `header` set so the first row supplies the column names, loading the original file

In [None]:
# Load original.csv; the first row provides column names. No schema inference
# is requested, so every column comes back as a string.
mydata = spark.read.option("header", "true").csv("original.csv")

#### Show the DataFrame

In [None]:
# Print the first 20 rows, truncating long cell values.
mydata.show(n=20, truncate=True)

#### Show the data types of the DataFrame's columns

In [105]:
# (column name, simple type string) for every field in the schema.
[(field.name, field.dataType.simpleString()) for field in mydata.schema.fields]

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [None]:
from pyspark.sql.functions import *

##### Add a `clean_city` column that holds 'Unknown' where City is null and the City value otherwise

In [None]:
# Null cities become the placeholder 'Unknown'; all other rows keep their City.
mydata2 = mydata.withColumn(
    "clean_city",
    when(col("City").isNull(), lit('Unknown')).otherwise(col("City")),
)

In [None]:
# Print the first 20 rows, truncating long cell values.
mydata2.show(n=20, truncate=True)

#### Keep only rows where JobTitle is not null, replacing the existing DataFrame

In [None]:
# Drop rows that have no job title.
mydata2 = mydata2.where(mydata2["JobTitle"].isNotNull())

In [None]:
# Print the first 20 rows, truncating long cell values.
mydata2.show(n=20, truncate=True)

#### Create a new DataFrame with a `clean_salary` column: drop the first character of Salary and cast the rest to a floating-point number

In [None]:
# Salary is a string whose first character is skipped before parsing.
# Presumably that character is a currency symbol such as '$' (TODO confirm
# against the data). substr() is 1-based: start at 2, take up to 100 characters.
# Cast to double rather than float: FloatType is 32-bit, with only ~7
# significant digits, so six-figure salaries with cents get rounded.
salary_data = mydata2.withColumn(
    "clean_salary",
    mydata2.Salary.substr(2, 100).cast('double'),
)

In [None]:
# Print the first 20 rows, truncating long cell values.
salary_data.show(n=20, truncate=True)

In [None]:
# One-row DataFrame holding the average cleaned salary (lazy: no Spark job runs yet).
mean = salary_data.agg(avg('clean_salary'))

##### Compute the mean of the cleaned salary column and extract the scalar value from the single result row

In [None]:
# A global aggregate always yields exactly one row; take its only value.
mean = salary_data.agg(avg('clean_salary')).first()[0]

In [None]:
# Display the average cleaned salary.
print(f"{mean}")

##### Select the Latitude column, keep only non-null values, and add a new column with those values cast to a floating-point type

In [None]:
import numpy as np
# Single-column DataFrame containing only the raw Latitude strings.
lats = salary_data.select(col('Latitude'))

In [None]:
# Discard rows with a missing latitude.
lats = lats.where(col('Latitude').isNotNull())

In [None]:
# Parse latitude text into a numeric Lat_new column and keep only that column.
# Cast to double so coordinates keep full precision (FloatType is 32-bit).
# Text that cannot be parsed casts to null, so drop nulls/NaNs afterwards;
# otherwise they would reach the median computation.
lats = (
    lats.withColumn('Lat_new', lats.Latitude.cast('double'))
    .select('Lat_new')
    .dropna()
)

In [None]:
# Median latitude, computed on the driver with NumPy.
# Each collected Row holds one value: unwrap it and skip nulls (e.g. text that
# failed the numeric cast), so NumPy gets a flat list of numbers. With no usable
# values, median stays None rather than NumPy's NaN, so a later literal fill
# yields nulls.
# NOTE(review): collect() pulls the whole column onto the driver; for large data
# consider lats.approxQuantile('Lat_new', [0.5], relativeError) instead.
lat_values = [row[0] for row in lats.collect() if row[0] is not None]
median = float(np.median(lat_values)) if lat_values else None
print(median)

#### Add a `lat` column: use the median (as a literal) where Latitude is missing, otherwise the Latitude value

In [None]:
# Numeric latitude with missing values imputed by the median.
# Latitude was read as a string. Without an explicit cast, the double median
# and the string Latitude must be coerced to a common type; under Spark's
# non-ANSI rules (the 3.x default) that type is string, so `lat` ended up as
# text. Casting first keeps `lat` numeric. coalesce() also fills values that
# fail to parse, not only the original nulls.
salary_data = salary_data.withColumn(
    'lat',
    coalesce(salary_data.Latitude.cast('double'), lit(median)),
)

In [None]:
# Print the first 20 rows, truncating long cell values.
salary_data.show(n=20, truncate=True)

In [None]:
import pyspark.sql.functions as sqlfunc

##### Group by gender and compute the average of the cleaned salary column, aliased as `AvgSalary`

In [None]:
# Average cleaned salary per gender, exposed as column AvgSalary.
genders = (
    salary_data
    .groupBy(col('gender'))
    .agg(sqlfunc.mean('clean_salary').alias('AvgSalary'))
)

In [None]:
# Print the first 20 rows, truncating long cell values.
genders.show(n=20, truncate=True)

In [None]:
# Salary for rows whose gender is 'Female'; every other row (other genders or
# null gender) gets null. Those rows must be null, not 0: avg() ignores nulls
# but counts zeros, so zeros would pull a per-JobTitle average of this column
# below the true mean female salary.
salary_data = salary_data.withColumn(
    'female_salary',
    when(salary_data.gender == 'Female', salary_data.clean_salary),
)

In [None]:
# Print the first 20 rows, truncating long cell values.
salary_data.show(n=20, truncate=True)

In [None]:
# Salary for rows whose gender is 'Male'; every other row (other genders or
# null gender) gets null. Those rows must be null, not 0: avg() ignores nulls
# but counts zeros, so zeros would pull a per-JobTitle average of this column
# below the true mean male salary.
salary_data = salary_data.withColumn(
    'male_salary',
    when(salary_data.gender == 'Male', salary_data.clean_salary),
)

In [None]:
# Second name for the same DataFrame; Spark DataFrames are immutable, so
# transformations on agg_df never touch salary_data.
agg_df = salary_data

In [None]:
# Per job title: mean of the female_salary and male_salary columns.
agg_df = agg_df.groupBy('JobTitle').agg(
    sqlfunc.mean('female_salary').alias('female_salary_aggregated'),
    sqlfunc.mean('male_salary').alias('male_salary_aggregated'),
)

In [None]:
# Gap between the female and male averages (positive = female average higher).
agg_df = agg_df.withColumn(
    'Delta',
    col('female_salary_aggregated') - col('male_salary_aggregated'),
)

In [None]:
# Print the first 20 rows, truncating long cell values.
agg_df.show(n=20, truncate=True)

In [None]:
# Average cleaned salary per city.
# Group on clean_city (null City already mapped to 'Unknown') instead of the
# raw City column, so missing cities form a labelled 'Unknown' group rather
# than a null group. The key is aliased back to 'City' so the output columns
# are unchanged: City, avg_salary.
city_avg_salary = (
    salary_data
    .groupBy(salary_data.clean_city.alias('City'))
    .agg(sqlfunc.avg(salary_data.clean_salary).alias('avg_salary'))
)

In [None]:
# Highest-paying cities first.
city_avg_salary = city_avg_salary.orderBy(desc('avg_salary'))

In [None]:
# Print the first 20 rows, truncating long cell values.
city_avg_salary.show(n=20, truncate=True)