<a href="https://colab.research.google.com/github/msharmagcp/spark/blob/main/datatypes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Connec                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [1 InRelease 31.5 kB/88.7 kB 36%] [2 InRe0% [Connecting to archive.ubuntu.com] [1 InRelease 43.1 kB/88.7 kB 49%] [Connec0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                            

In [2]:
mydata = spark.read.format("csv").option("header", "true").load("original.csv")
mydata.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [3]:
from pyspark.sql.functions import *
mydata2 = mydata.withColumn("clean_city", when(mydata.City.isNull(), 'Unknown').otherwise(mydata.City))

In [4]:
mydata2 = mydata2.filter(mydata2.JobTitle.isNotNull())

In [5]:
mydata2 = mydata2.withColumn("clean_salary", mydata2.Salary.substr(2,100).cast('float'))

In [6]:
mean = mydata2.groupBy().avg('clean_salary')
mean.show()

+-----------------+
|avg(clean_salary)|
+-----------------+
|55516.32088199837|
+-----------------+



In [7]:
mean = mydata2.groupBy().avg('clean_salary').take(1)[0][0]
print(mean)

55516.32088199837


In [8]:
from pyspark.sql.functions import lit
mydata2 = mydata2.withColumn('new_salary', when(mydata2.clean_salary.isNull(), lit(mean)).otherwise(mydata2.clean_salary))

In [9]:
import numpy as np
latitudes = mydata2.select('Latitude')

In [10]:
latitudes = latitudes.filter(latitudes.Latitude.isNotNull())

In [11]:
latitudes = latitudes.withColumn('latitude2', latitudes.Latitude.cast('float')).select('latitude2')
median = np.median(latitudes.collect())
print(median)

31.93397331237793


In [12]:
mydata2 = mydata2.withColumn('lat', when(mydata2.Latitude.isNull(), lit(median)).otherwise(mydata.Latitude))

In [13]:
import pyspark.sql.functions as sqlfunc

genders = mydata2.groupBy('gender').agg(sqlfunc.avg('new_salary').alias('avgSalary'))
genders.show()

+------+------------------+
|gender|         avgSalary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [14]:
df = mydata2.withColumn('female_salary', when(mydata2.gender == 'Female', mydata2.new_salary).otherwise(lit(0)))

In [15]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|   female_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...

In [16]:
df = df.withColumn('male_salary', when(mydata2.gender == 'Male', df.new_salary).otherwise(lit(0)))

In [17]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|   female_salary|     male_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|             0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6015625|   

In [18]:
df = df.groupBy('Jobtitle').agg(sqlfunc.avg('female_salary').alias('final_female_salary'), sqlfunc.avg('male_salary').alias('final_male_salary'))

In [19]:
df.show()

+--------------------+-------------------+------------------+
|            Jobtitle|final_female_salary| final_male_salary|
+--------------------+-------------------+------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|
|   Media Manager III| 29586.436197916668|17381.920572916668|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|
|       Geologist III|       31749.046875|    12830.75390625|
|        Geologist II|                0.0|   43293.865234375|
|Database Administ...|                0.0|     52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625|
|Software Engineer II|                0.0|      74782.640625|
|       Accountant IV|    82732.248046875|               0.0|
|    Product Engineer|     41825.48359375|       20464.94375|
|Software Test Eng...|   32218.6083984375|   27122.462890625|
|Safety Technician...|                0.0|   29421.529296875|
|    Jun

In [20]:
df = df.withColumn('delta', df.final_female_salary - df.final_male_salary)

In [21]:
df.show()

+--------------------+-------------------+------------------+-------------------+
|            Jobtitle|final_female_salary| final_male_salary|              delta|
+--------------------+-------------------+------------------+-------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III| 29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|       31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|                0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|                0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|                0.0|      74782.640625|      -74782.640625|
|       Accounta

In [22]:
cityavg = mydata2.groupBy('city').agg(sqlfunc.avg('new_salary').alias('avgsalary'))

In [24]:
cityavg = cityavg.sort(col('avgsalary').desc())

In [25]:
cityavg.show()

+-----------------+-------------+
|             city|    avgsalary|
+-----------------+-------------+
|        Mesopotam|  99948.28125|
|       Zhongcheng| 99942.921875|
|           Caxias|99786.3984375|
|      Karangtawar|99638.9921875|
|        Itabaiana|  99502.15625|
|           Pasian|  99421.34375|
|           Webuye| 99368.546875|
|      Yuktae-dong| 99250.828125|
|           Zinder|  99222.84375|
|   Timiryazevskiy|   99142.9375|
|        Sawahbaru|99013.7109375|
|          Madimba|98737.8671875|
|         Huangshi|  98690.34375|
|          Gharyan|   98679.3125|
|         Yŏnan-ŭp| 98628.609375|
|     Wringinputih|98603.8203125|
|Monte da Boavista|  98586.71875|
|          Klukeng|98439.4921875|
|         Murmashi|  98226.15625|
|        Fox Creek|      98138.0|
+-----------------+-------------+
only showing top 20 rows



In [30]:
df2 = spark.read.csv("original.csv", header=True)
df2.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [39]:
from pyspark.sql.types import * 
schema = StructType([
  StructField('id', IntegerType()), 
  StructField('first_name', StringType()), 
  StructField('last_name', StringType()), 
  StructField('gender', StringType()), 
  StructField('City', StringType()), 
  StructField('JobTitle', StringType()), 
  StructField('Salary', StringType()), 
  StructField('Latitude', StringType()), 
  StructField('Longitude', FloatType())
])

In [40]:
df3 = spark.read.csv("original.csv", header=True, schema = schema)

In [41]:
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [42]:
df3.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'float')]