<a href="https://colab.research.google.com/github/msdevanms/pyspark_training/blob/main/ETL_training_2023.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initial Setup - Spark Installation

In [None]:
!pip install pyspark

In [None]:
# Build (or reuse) a local SparkSession named 'Firstprogram' and keep a handle
# to its underlying SparkContext.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName('Firstprogram')
    .getOrCreate()
)
sc = spark.sparkContext

# Load CSV data

In [None]:
! wget 'https://raw.githubusercontent.com/msdevanms/pyspark_training/main/data/data_descriptive.csv'

In [None]:
# Read the CSV using its first line as column names (no schema given, so every column is a string).
mydataframe = spark.read.csv("data_descriptive.csv", header=True)

In [None]:
# Peek at the first 20 rows of the raw data.
mydataframe.show(n=20)

# Inspect Data

In [None]:
# Number of rows in the raw DataFrame.
mydataframe.count()

In [None]:
# The CSV was read without a schema (and without inferSchema), so every column shows as string.
mydataframe.printSchema()

In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column of the raw data.
(mydataframe
    .describe()
    .show())

# Setting Schema

In [None]:
# Re-read the CSV with an explicit schema instead of all-string columns.
# Import only the type classes used here; `import *` hides where names come from.
# Salary and Latitude are deliberately kept as StringType: later cells strip /
# cast them to numbers themselves.
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType

myschema = StructType([
    StructField('id', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('gender', StringType()),
    StructField('City', StringType()),
    StructField('JobTitle', StringType()),
    StructField('Salary', StringType()),
    StructField('Latitude', StringType()),
    StructField('Longitude', FloatType()),
])
df = spark.read.csv("data_descriptive.csv", header=True, schema=myschema)
df.show()

In [None]:
# Summary statistics again, now that id and Longitude are typed.
(df
    .describe()
    .show())

# Data Cleansing - Null and Duplicate removal

In [None]:
# Remove rows that are identical across every column.
no_duplicate_df = df.drop_duplicates()

In [None]:
# Row count after dropping exact duplicate rows.
no_duplicate_df.count()

In [None]:
# De-duplicated rows in ascending id order.
no_duplicate_df.orderBy('id').show()

In [None]:
# Replace null City values with 'Unknown' in a new clean_city column.
# Import only the helpers this notebook uses. `from pyspark.sql.functions import *`
# would shadow Python builtins such as sum, min, max, round, abs and filter with
# Spark column functions.
from pyspark.sql.functions import col, lit, when

no_null_df = no_duplicate_df.withColumn(
    "clean_city",
    when(no_duplicate_df.City.isNull(), 'Unknown').otherwise(no_duplicate_df.City),
)

In [None]:
# Show the new clean_city column without truncating long values.
no_null_df.show(n=20, truncate=False)

In [None]:
# Keep only rows that have a JobTitle.
no_null_df2 = no_null_df.where(no_null_df['JobTitle'].isNotNull())

In [None]:
# The complementary set: rows with no JobTitle.
no_null_df3 = no_null_df.where(no_null_df['JobTitle'].isNull())

In [None]:
# Inspect the rows that lack a JobTitle.
no_null_df3.show(20)

In [None]:
# Number of rows that have a JobTitle.
no_null_df2.count()

998

In [None]:
# Rows with a JobTitle, long values shown in full.
no_null_df2.show(n=20, truncate=False)

In [None]:
# Parse the Salary text (e.g. "$1234.56" — format assumed, confirm against the data)
# into a float clean_salary column.
# Keep only digits and the decimal point before casting. The previous
# substr(2, 100) dropped the first character whatever it was (losing a digit when
# a value has no currency prefix), capped length at a magic 100, and produced null
# for values with a thousands separator.
# NOTE(review): assumes salaries are non-negative; a leading '-' would be stripped.
from pyspark.sql.functions import regexp_replace

with_column_df = no_null_df2.withColumn(
    'clean_salary',
    regexp_replace(no_null_df2.Salary, r'[^0-9.]', '').cast('float'),
)

In [None]:
# Overall average of clean_salary as a plain Python value: a global aggregate
# yields a single row, and [0] picks its only column.
mean = with_column_df.agg({'clean_salary': 'avg'}).first()[0]

In [None]:
# Mean of clean_salary; used below to fill missing salaries.
mean

In [None]:
# Compare summary statistics of the raw Salary text with the parsed clean_salary.
(with_column_df
    .describe()
    .select("summary", "Salary", "clean_salary")
    .show())

In [None]:
from pyspark.sql.functions import lit, when

# new_salary: clean_salary where present, otherwise the overall mean computed above.
with_column_df2 = with_column_df.withColumn(
    'new_salary',
    when(with_column_df['clean_salary'].isNull(), lit(mean))
    .otherwise(with_column_df['clean_salary']),
)

In [None]:
# Check the imputed new_salary column.
with_column_df2.show(20)

In [None]:
# Isolate the latitude column to compute its median.
latitudes = with_column_df2.select(["latitude"])

In [None]:
# Raw latitude values (nulls included).
latitudes.show(20)

In [None]:
# Discard rows whose latitude is missing.
latitudes = latitudes.dropna(subset=["latitude"])

In [None]:
# Latitudes after removing nulls.
latitudes.show(20)

In [None]:
# Convert the latitude strings to numbers in a single 'latitude2' column.
# Cast to double rather than float: a 32-bit float keeps only ~7 significant
# digits. Latitudes with many decimal places would lose precision and gain
# spurious trailing digits when widened for the median used in imputation.
# Rows whose text does not parse (null after the cast) are dropped, so the
# median below only sees real numbers.
latitudes = (
    latitudes
    .withColumn('latitude2', latitudes.latitude.cast('double'))
    .select('latitude2')
    .where('latitude2 IS NOT NULL')
)

In [None]:
# Numeric latitudes ready for the median.
latitudes.show(20)

In [None]:
import numpy as np

# Exact median computed on the driver: collect the single-column DataFrame and
# take each row's only value.
median = np.median([row[0] for row in latitudes.collect()])

In [None]:
# Median latitude, used below to fill missing Latitude values.
print(median)

In [None]:
# Impute missing latitudes with the median computed above into a new 'lat' column.
# Latitude is read as a string column (see the schema). Mixing it with the numeric
# median in a single when/otherwise makes Spark coerce both branches to a common
# type implicitly (string in the default non-ANSI mode). Casting explicitly makes
# 'lat' a numeric double column.
# NOTE(review): in non-ANSI mode, non-numeric Latitude strings become null on the
# cast and are therefore also replaced by the median; ANSI mode would raise instead.
from pyspark.sql.functions import coalesce, col, lit

median_replaced_df = with_column_df2.withColumn(
    'lat',
    coalesce(col('Latitude').cast('double'), lit(median)),
)

In [None]:
# Check the imputed 'lat' column.
median_replaced_df.show(20)

# Aggregation and Group By

In [None]:
import pyspark.sql.functions as sqlfunc

# Average imputed salary (new_salary) for each gender value.
genders = (
    median_replaced_df
    .groupBy('gender')
    .agg(sqlfunc.avg('new_salary').alias('AvgSalary'))
)

In [None]:
# Average salary for each gender value.
genders.show(20)

In [None]:
# Per-row female salary: new_salary on 'Female' rows, the literal 0 on every other row.
df = median_replaced_df.withColumn(
    'female_salary',
    when(median_replaced_df['gender'] == 'Female', median_replaced_df['new_salary'])
    .otherwise(lit(0)),
)

In [None]:
# Inspect the new female_salary column.
df.show(20)

In [None]:
# Per-row male salary: new_salary on 'Male' rows, the literal 0 (lit = literal) elsewhere.
df = df.withColumn(
    'male_salary',
    when(df['gender'] == 'Male', df['new_salary']).otherwise(lit(0)),
)

In [None]:
# Inspect the new male_salary column.
df.show(20)

In [None]:
# Calculate the average female and male salary per job title.
# female_salary / male_salary are 0 on every row of a different gender, so a
# plain avg() would divide by the headcount of *all* rows in the job title and
# understate both averages. Instead, average each column only over rows of the
# matching gender: when() without otherwise() yields null elsewhere, and avg()
# ignores nulls. A job title with no employees of a gender gets null rather than
# a misleading 0.
df = df.groupBy('JobTitle').agg(
    sqlfunc.avg(sqlfunc.when(df.gender == 'Female', df.female_salary)).alias('female_salary'),
    sqlfunc.avg(sqlfunc.when(df.gender == 'Male', df.male_salary)).alias('male_salary'),
)

In [None]:
# Per-JobTitle female and male average salaries.
df.show(20)

In [None]:
# Difference between average female and male salary per job title (female - male).
# Assign first and display separately: DataFrame.show() returns None, so chaining
# it onto the assignment would rebind df to None and break the writes at the end.
df = df.withColumn('delta', df.female_salary - df.male_salary)
df.show()

In [None]:
# Average imputed salary per City.
cityavg = (
    median_replaced_df
    .groupBy('City')
    .agg(sqlfunc.avg('new_salary').alias('avgsalary'))
)

In [None]:
# Highest average salary first.
cityavg = cityavg.orderBy(sqlfunc.desc('avgsalary'))

In [None]:
# Cities ranked by average salary, highest first.
cityavg.show(n=20)

In [None]:
# Persist df (the per-JobTitle female/male salary table built above) in three formats.
# mode('overwrite') makes the cell re-runnable: the default save mode raises an
# error when the output directory already exists. header=True writes the column
# names as the first CSV line (the CSV writer omits them by default).
df.write.mode('overwrite').csv('out_dataframe.csv', header=True)
df.write.mode('overwrite').json('out_dataframe.json')
df.write.mode('overwrite').parquet('out_dataframe.parquet')