<a href="https://colab.research.google.com/github/nenuchepanu001/pysparktutorial/blob/main/tutorial1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [10]:
mydata = spark.read.format("csv").option("header","true").load("original.csv")
mydata.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [11]:
from pyspark.sql.functions import *
mydata2  = mydata.withColumn("clean_city",when(mydata.City.isNull(),'Unknown').otherwise(mydata.City))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Femal

In [13]:
# to filter data which is notnull
mydata2 =  mydata2.filter(mydata2.JobTitle.isNotNull())
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|Kinsealy-Drinan|
|  8|   Goddart|     Flear|  Mal

In [16]:
#substring taking from 2nd charafcter
mydata2 = mydata2.withColumn('clean_salary',mydata2.Salary.substr(2,100).cast('float'))
mydata2.show(3)

+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+-------------+------------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary|  Latitude|  Longitude|   clean_city|clean_salary|
+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+-------------+------------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|    Nowa Ruda|    57438.18|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|       Bulgan|     62846.6|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|Divnomorskoye|    61489.23|
+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+-------------+------------+
only showing top 3 rows



In [25]:
mean = mydata2.groupBy().avg('clean_salary').take(1)[0][0]
print(mean)

55516.32088199837


In [28]:
from pyspark.sql.functions import lit
# to set static value

mydata2=mydata2.withColumn('new_salary',when(mydata2.clean_salary.isNull(),lit(mean)).otherwise(mydata2.clean_salary))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 3

In [29]:
import numpy as np

In [31]:
latitudes = mydata2.select('Latitude')
latitudes.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|      null|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
+----------+
only showing top 20 rows



In [33]:
latitudes = latitudes.filter(latitudes.Latitude.isNotNull())
latitudes.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
|48.4902808|
+----------+
only showing top 20 rows



In [34]:
latitudes =latitudes.withColumn('latitude2',latitudes.Latitude.cast('float')).select("latitude2")
latitudes.show()

+----------+
| latitude2|
+----------+
| 50.577408|
|  48.82316|
| 44.504723|
| 53.426613|
| 45.190517|
| 32.027935|
|  4.272793|
|     -5.85|
|  39.17238|
|  49.81518|
|  42.10148|
|  49.79233|
| 43.494576|
| 52.744167|
| 38.696247|
|-7.7232566|
| 40.717205|
|  49.16291|
| 40.757683|
|  48.49028|
+----------+
only showing top 20 rows



In [36]:
median =  np.median(latitudes.collect())
print(median)

31.93397331237793


In [37]:
mydata2 = mydata2.withColumn('lat',when(mydata2.Latitude.isNull(),lit(median)).otherwise(mydata2.Latitude))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|     

In [39]:
#avg salary of gender male and female

genders = mydata2.groupBy('gender').agg(avg('new_salary').alias('AvgSalary'))
genders.show()

+------+------------------+
|gender|         AvgSalary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [59]:
df = mydata2.withColumn('female_salary',when(mydata2.gender=='Female',mydata2.new_salary).otherwise(lit(0)))
df = df.withColumn('male_salary',when(mydata2.gender=='Male',mydata2.new_salary).otherwise(lit(0)))
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|   female_salary|     male_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|             0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6015625|   

In [64]:
df = df.groupBy('JobTitle').agg(avg('female_salary').alias('final_female_salary'),avg('male_salary').alias('final_male_salary'))
df.show()

+--------------------+-------------------+------------------+
|            JobTitle|final_female_salary| final_male_salary|
+--------------------+-------------------+------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|
|   Media Manager III| 29586.436197916668|17381.920572916668|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|
|       Geologist III|       31749.046875|    12830.75390625|
|        Geologist II|                0.0|   43293.865234375|
|Database Administ...|                0.0|     52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625|
|Software Engineer II|                0.0|      74782.640625|
|       Accountant IV|    82732.248046875|               0.0|
|    Product Engineer|     41825.48359375|       20464.94375|
|Software Test Eng...|   32218.6083984375|   27122.462890625|
|Safety Technician...|                0.0|   29421.529296875|
|    Jun

In [65]:
df = df.withColumn('delta',df.final_female_salary-df.final_male_salary)
df.show()

+--------------------+-------------------+------------------+-------------------+
|            JobTitle|final_female_salary| final_male_salary|              delta|
+--------------------+-------------------+------------------+-------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III| 29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|       31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|                0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|                0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|                0.0|      74782.640625|      -74782.640625|
|       Accounta

In [66]:
cityavg = mydata2.groupBy('City').agg(avg('new_salary').alias('avgSalary'))
cityavg.show()

+-----------------+----------------+
|             City|       avgSalary|
+-----------------+----------------+
|        Sułkowice|  33432.98828125|
|          Klippan|     77039.46875|
|      Trollhättan|53311.6845703125|
|        Shinaihai|    39544.640625|
|         Hongzhou|  35707.30859375|
|         Cipinang| 11617.509765625|
| Viejo Daan Banua|         43927.5|
|         Tsiatsan| 18795.439453125|
|       San Andres|  52426.80078125|
|           Krasna|   72022.7890625|
|      Springfield|40697.3251953125|
|            Město|  27797.98046875|
|Chaloem Phra Kiat|  54840.19921875|
|          Tadotsu|  55595.30078125|
|   Hénin-Beaumont|        55082.75|
|          Kajaani|  20224.83984375|
|           Duozhu|    71416.859375|
|           Abéché|   93375.1796875|
|     Habingkloang|     56892.96875|
|         Malishka|   76783.4765625|
+-----------------+----------------+
only showing top 20 rows



In [68]:
cityavg = cityavg.sort(col('avgSalary').desc())
cityavg.show()

+-----------------+-------------+
|             City|    avgSalary|
+-----------------+-------------+
|        Mesopotam|  99948.28125|
|       Zhongcheng| 99942.921875|
|           Caxias|99786.3984375|
|      Karangtawar|99638.9921875|
|        Itabaiana|  99502.15625|
|           Pasian|  99421.34375|
|           Webuye| 99368.546875|
|      Yuktae-dong| 99250.828125|
|           Zinder|  99222.84375|
|   Timiryazevskiy|   99142.9375|
|        Sawahbaru|99013.7109375|
|          Madimba|98737.8671875|
|         Huangshi|  98690.34375|
|          Gharyan|   98679.3125|
|         Yŏnan-ŭp| 98628.609375|
|     Wringinputih|98603.8203125|
|Monte da Boavista|  98586.71875|
|          Klukeng|98439.4921875|
|         Murmashi|  98226.15625|
|        Fox Creek|      98138.0|
+-----------------+-------------+
only showing top 20 rows



In [69]:
df2 =  spark.read.csv('original.csv',header=True)
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [70]:
df2.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [78]:
from pyspark.sql.types import *
schema = StructType([
    StructField('id',IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('gender', StringType()),
    StructField('City', StringType()),
    StructField('JobTitle', StringType()),
    StructField('Salary', StringType()),
    StructField('Latitude', StringType()),
    StructField('Longitude', FloatType())
 ])

df = spark.read.csv('original.csv',header=True,schema=schema)
df .show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [79]:
df.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'float')]

In [80]:
df.head(6)

[Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude='50.5774075', Longitude=16.49671745300293),
 Row(id=2, first_name='Kimberly', last_name='Von Welden', gender='Female', City='Bulgan', JobTitle='Programmer II', Salary='$62846.60', Latitude='48.8231572', Longitude=103.52182006835938),
 Row(id=3, first_name='Alvera', last_name='Di Boldi', gender='Female', City=None, JobTitle=None, Salary='$57576.52', Latitude='39.9947462', Longitude=116.33977508544922),
 Row(id=4, first_name='Shannon', last_name="O'Griffin", gender='Male', City='Divnomorskoye', JobTitle='Budget/Accounting Analyst II', Salary='$61489.23', Latitude='44.5047212', Longitude=38.1300163269043),
 Row(id=5, first_name='Sherwood', last_name='Macieja', gender='Male', City='Mytishchi', JobTitle='VP Sales', Salary='$63863.09', Latitude=None, Longitude=37.64899444580078),
 Row(id=6, first_name='Maris', last_name='Folk', gender='Female

In [81]:
df.first()

Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude='50.5774075', Longitude=16.49671745300293)

In [82]:
df.describe().show()

+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+-----------------+-----------------+
|summary|               id|first_name|last_name|gender|               City|           JobTitle|   Salary|         Latitude|        Longitude|
+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+-----------------+-----------------+
|  count|             1000|      1000|     1000|  1000|                999|                998|     1000|              999|             1000|
|   mean|            500.5|      null|     null|  null|               null|               null|     null|25.43151724234234|43.33756460386515|
| stddev|288.8194360957494|      null|     null|  null|               null|               null|     null| 24.5790825486909| 69.4206453674681|
|    min|                1|   Abagail|    Abbay|Female|             Abéché|Account Coordinator|$10101.92|       -0.6256517|       -123.04196|
|    m

In [83]:
df.columns

['id',
 'first_name',
 'last_name',
 'gender',
 'City',
 'JobTitle',
 'Salary',
 'Latitude',
 'Longitude']

In [84]:
df.count()

1000

In [85]:
df.distinct().count()

1000

# handling_null_duplicates

In [86]:
df_dropped = df.na.drop()
df_dropped.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145|-6.1644998|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36|45.1905186| 0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Product Manage...|$73697.10| 32.027934| 106.65711|
| 10|      Bran|   Trahear|  Male|       Arbeláez|Mechanical Sys

In [87]:
df_dropped.count()

997

In [88]:
df_null_jobs =  df.filter(df.JobTitle.isNotNull())
df_null_jobs.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145|-6.1644998|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36|45.1905186| 0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Product Man

In [90]:
df_no_duplicates = df.dropDuplicates()
df_no_duplicates.show()

+---+----------+----------+------+------------------+--------------------+---------+-----------+----------+
| id|first_name| last_name|gender|              City|            JobTitle|   Salary|   Latitude| Longitude|
+---+----------+----------+------+------------------+--------------------+---------+-----------+----------+
|159|Georgianne| Henriques|Female|            Jinsha|Environmental Spe...|$63954.52|  30.679359|104.011665|
|215|     Angel|   Robjant|  Male|   Bambous Virieux|Occupational Ther...|$96856.73|-20.3438619| 57.763683|
|377|     Noach|   Golling|  Male|          Yuanqiao| Executive Secretary|$72247.30|    32.5639| 120.39677|
|525|     Terry|    Layton|Female|San Pedro Masahuat|    Dental Hygienist|$10808.16| 13.5432995| -89.03824|
|833|    Damara|   Beaford|Female|           Połomia|Environmental Spe...|$10616.44| 49.9890993| 18.569973|
|172|    Marita|   Whyborn|Female| Sofo-Birnin-Gwari|  Biostatistician II|$45680.97|   11.01088|  6.798413|
|485|    Carola| Himsworth|F

In [91]:
df_select = df.select('first_name','last_name')
df_select.show()

+----------+----------+
|first_name| last_name|
+----------+----------+
|   Melinde| Shilburne|
|  Kimberly|Von Welden|
|    Alvera|  Di Boldi|
|   Shannon| O'Griffin|
|  Sherwood|   Macieja|
|     Maris|      Folk|
|     Masha|    Divers|
|   Goddart|     Flear|
|      Roth|O'Cannavan|
|      Bran|   Trahear|
|    Kylynn|   Lockart|
|       Rey|    Meharg|
|      Kerr|    Braden|
|    Mickie| Whanstall|
|    Kaspar|     Pally|
|    Norbie|    Gwyllt|
|    Claude|    Briant|
|     Thain|    Habbon|
|  Tiffanie|  Pattison|
|    Ettore|  Gerriets|
+----------+----------+
only showing top 20 rows



In [92]:
df_renamed = df.withColumnRenamed('first_name','fn')
df_renamed.show()

+---+--------+----------+------+---------------+--------------------+---------+----------+----------+
| id|      fn| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+--------+----------+------+---------------+--------------------+---------+----------+----------+
|  1| Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|
|  2|Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|
|  3|  Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.339775|
|  4| Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|
|  5|Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|   Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145|-6.1644998|
|  7|   Masha|    Divers|Female|         Dachun|                null|$25090.87| 24

In [93]:
df_filter = df.filter(df.first_name=='Alvera')
df_filter.show()

+---+----------+---------+------+----+--------+---------+----------+----------+
| id|first_name|last_name|gender|City|JobTitle|   Salary|  Latitude| Longitude|
+---+----------+---------+------+----+--------+---------+----------+----------+
|  3|    Alvera| Di Boldi|Female|null|    null|$57576.52|39.9947462|116.339775|
+---+----------+---------+------+----+--------+---------+----------+----------+



In [95]:
df_filter = df.filter(df.first_name.like('%lvera'))
df_filter.show()

+---+----------+---------+------+----+--------+---------+----------+----------+
| id|first_name|last_name|gender|City|JobTitle|   Salary|  Latitude| Longitude|
+---+----------+---------+------+----+--------+---------+----------+----------+
|  3|    Alvera| Di Boldi|Female|null|    null|$57576.52|39.9947462|116.339775|
+---+----------+---------+------+----+--------+---------+----------+----------+



In [96]:
df_filter = df.filter(df.first_name.endswith('din'))
df_filter.show()

+---+----------+-------------+------+-----------+--------+---------+-----------+---------+
| id|first_name|    last_name|gender|       City|JobTitle|   Salary|   Latitude|Longitude|
+---+----------+-------------+------+-----------+--------+---------+-----------+---------+
|901|     Aldin|Matuszkiewicz|  Male|East London|Operator|$41468.83|-32.9549324|27.931913|
+---+----------+-------------+------+-----------+--------+---------+-----------+---------+



In [98]:
df_filter = df.filter(df.first_name.startswith('Ala'))
df_filter.show()

+---+----------+----------+------+---------+--------------------+---------+-----------+---------+
| id|first_name| last_name|gender|     City|            JobTitle|   Salary|   Latitude|Longitude|
+---+----------+----------+------+---------+--------------------+---------+-----------+---------+
|160|     Alane|Southworth|Female|Eindhoven|Programmer Analyst I|$24042.10| 51.4131558|5.4852796|
|405|  Alasteir|   Sotheby|  Male|  Secunda|  Analyst Programmer|$21326.73|-26.5157792|29.191391|
|751|     Aland|   Seebert|  Male|      Oss| Software Engineer I|$40392.24| 51.7610501| 5.551041|
+---+----------+----------+------+---------+--------------------+---------+-----------+---------+



In [99]:
df_filter = df.filter(df.first_name.isin('Aldin','valma'))
df_filter.show()

+---+----------+-------------+------+-----------+--------+---------+-----------+---------+
| id|first_name|    last_name|gender|       City|JobTitle|   Salary|   Latitude|Longitude|
+---+----------+-------------+------+-----------+--------+---------+-----------+---------+
|901|     Aldin|Matuszkiewicz|  Male|East London|Operator|$41468.83|-32.9549324|27.931913|
+---+----------+-------------+------+-----------+--------+---------+-----------+---------+



In [100]:
df_substr = df.select(df.first_name,df.first_name.substr(1,4).alias('name'))
df_substr.show()

+----------+----+
|first_name|name|
+----------+----+
|   Melinde|Meli|
|  Kimberly|Kimb|
|    Alvera|Alve|
|   Shannon|Shan|
|  Sherwood|Sher|
|     Maris|Mari|
|     Masha|Mash|
|   Goddart|Godd|
|      Roth|Roth|
|      Bran|Bran|
|    Kylynn|Kyly|
|       Rey| Rey|
|      Kerr|Kerr|
|    Mickie|Mick|
|    Kaspar|Kasp|
|    Norbie|Norb|
|    Claude|Clau|
|     Thain|Thai|
|  Tiffanie|Tiff|
|    Ettore|Etto|
+----------+----+
only showing top 20 rows



# Adding multiple filters

In [108]:
df_filter = df.filter((df.first_name.isin('Aldin','valma')) | (df.City.like('%ondon')))
df_filter.show()

+---+----------+-------------+------+-----------+--------+---------+-----------+---------+
| id|first_name|    last_name|gender|       City|JobTitle|   Salary|   Latitude|Longitude|
+---+----------+-------------+------+-----------+--------+---------+-----------+---------+
|901|     Aldin|Matuszkiewicz|  Male|East London|Operator|$41468.83|-32.9549324|27.931913|
+---+----------+-------------+------+-----------+--------+---------+-----------+---------+



In [107]:
df_filter = df.filter((df.id>10) & (df.id<100))
df_filter.show()

+---+----------+---------+------+--------------+--------------------+---------+----------+----------+
| id|first_name|last_name|gender|          City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+---------+------+--------------+--------------------+---------+----------+----------+
| 11|    Kylynn|  Lockart|Female|      El Cardo|Nuclear Power Eng...|$13604.63|     -5.85| -79.88333|
| 12|       Rey|   Meharg|Female|   Wangqingtuo|Systems Administr...|$73423.70| 39.172378| 116.93161|
| 13|      Kerr|   Braden|  Male|     Sułkowice|Compensation Analyst|$33432.99|49.8151822| 19.377174|
| 14|    Mickie|Whanstall|  Male|   Springfield|Assistant Media P...|$50838.53|42.1014803|-72.576675|
| 15|    Kaspar|    Pally|  Male|        Chrást|  Analyst Programmer|$40163.03|49.7923299| 13.491532|
| 16|    Norbie|   Gwyllt|  Male|        Xijiao|              Editor|$32492.73|43.4945737|  5.897802|
| 17|    Claude|   Briant|Female|     Mieścisko|Research Assistan...|$51862.48|52.

# running sql on dataframes

In [109]:
from pyspark.sql.types import *
schema = StructType([
    StructField('id',IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('gender', StringType()),
    StructField('City', StringType()),
    StructField('JobTitle', StringType()),
    StructField('Salary', StringType()),
    StructField('Latitude', StringType()),
    StructField('Longitude', FloatType())
 ])

df = spark.read.csv('original.csv',header=True,schema=schema)
df .show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [110]:
df.registerTempTable('original')

In [112]:
query1 = spark.sql('select * from original')
query1.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [113]:
query2 = spark.sql('select concat(first_name," ",last_name) as full_name from original where gender="Female"')
query2.show()

+-------------------+
|          full_name|
+-------------------+
|  Melinde Shilburne|
|Kimberly Von Welden|
|    Alvera Di Boldi|
|         Maris Folk|
|       Masha Divers|
|     Kylynn Lockart|
|         Rey Meharg|
|      Claude Briant|
|  Tiffanie Pattison|
|    Lurleen Janczak|
|      Nichol Holtum|
|       Shaun Bridle|
|     Leandra Anfrey|
|    Jaquelyn Hazard|
|  Prudence Honacker|
|       Cherey Liger|
|          Neda Krop|
|    Barbi Fattorini|
|   Lonnie Townshend|
|    Valida Salzberg|
+-------------------+
only showing top 20 rows



# Adding calculated columns

In [114]:
df = df.withColumn('clean_salary',df.Salary.substr(2,100).cast('float'))
df.show(3)

+---+----------+----------+------+---------+-------------------+---------+----------+----------+------------+
| id|first_name| last_name|gender|     City|           JobTitle|   Salary|  Latitude| Longitude|clean_salary|
+---+----------+----------+------+---------+-------------------+---------+----------+----------+------------+
|  1|   Melinde| Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.5774075| 16.496717|    57438.18|
|  2|  Kimberly|Von Welden|Female|   Bulgan|      Programmer II|$62846.60|48.8231572| 103.52182|     62846.6|
|  3|    Alvera|  Di Boldi|Female|     null|               null|$57576.52|39.9947462|116.339775|    57576.52|
+---+----------+----------+------+---------+-------------------+---------+----------+----------+------------+
only showing top 3 rows



In [115]:
df = df.withColumn('monthly_salary',df.clean_salary/12)
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|clean_salary|    monthly_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|    57438.18| 4786.514973958333|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|     62846.6|    5237.216796875|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.339775|    57576.52| 4798.043294270833|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|    61489.23|   5124.1025390625|
|  5|  Sherwood|   Macieja|  Male|      Mytishch

In [116]:
df = df.withColumn('are_they_female',when(df.gender=='Female','Yes').otherwise('no'))
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|clean_salary|    monthly_salary|are_they_female|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|    57438.18| 4786.514973958333|            Yes|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|     62846.6|    5237.216796875|            Yes|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.339775|    57576.52| 4798.043294270833|            Yes|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047

# Aggregation

In [117]:
df = df.withColumn('clean_salary',df.Salary.substr(2,100).cast('float'))
df.show(3)

+---+----------+----------+------+---------+-------------------+---------+----------+----------+------------+-----------------+---------------+
| id|first_name| last_name|gender|     City|           JobTitle|   Salary|  Latitude| Longitude|clean_salary|   monthly_salary|are_they_female|
+---+----------+----------+------+---------+-------------------+---------+----------+----------+------------+-----------------+---------------+
|  1|   Melinde| Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.5774075| 16.496717|    57438.18|4786.514973958333|            Yes|
|  2|  Kimberly|Von Welden|Female|   Bulgan|      Programmer II|$62846.60|48.8231572| 103.52182|     62846.6|   5237.216796875|            Yes|
|  3|    Alvera|  Di Boldi|Female|     null|               null|$57576.52|39.9947462|116.339775|    57576.52|4798.043294270833|            Yes|
+---+----------+----------+------+---------+-------------------+---------+----------+----------+------------+-----------------+---------

In [121]:
df1 = df.groupBy('gender').agg(sum('clean_salary').alias('total'),avg('clean_salary'),min('clean_salary'),max('clean_salary'))
df1.show()

+------+--------------------+-----------------+-----------------+-----------------+
|gender|               total|avg(clean_salary)|min(clean_salary)|max(clean_salary)|
+------+--------------------+-----------------+-----------------+-----------------+
|Female|2.7364519950195312E7|55618.94298820185|         10616.44|         99948.28|
|  Male|2.8123435678710938E7|55361.09385573019|         10101.92|         99942.92|
+------+--------------------+-----------------+-----------------+-----------------+



In [128]:
df1 = df.groupBy('gender','City').agg(sum('clean_salary').alias('total'),avg('clean_salary').alias('avg'),min('clean_salary').alias('min'),max('clean_salary').alias('max'))
df1.show()

+------+-----------------+----------------+----------------+--------+--------+
|gender|             City|           total|             avg|     min|     max|
+------+-----------------+----------------+----------------+--------+--------+
|Female|           Dachun| 25090.869140625| 25090.869140625|25090.87|25090.87|
|Female|      Trollhättan|106623.369140625|53311.6845703125|26830.47| 79792.9|
|  Male|          Wenshao| 18941.509765625| 18941.509765625|18941.51|18941.51|
|Female|            Lanas| 13765.900390625| 13765.900390625| 13765.9| 13765.9|
|  Male|            Mörön|    77940.078125|    77940.078125|77940.08|77940.08|
|Female|             Same|   73369.7265625|   73369.7265625|73369.73|73369.73|
|Female|          Sawahan|  24608.83984375|  24608.83984375|24608.84|24608.84|
|  Male|Monte da Boavista|     98586.71875|     98586.71875|98586.72|98586.72|
|Female|         Nusajaya|    71637.921875|    71637.921875|71637.92|71637.92|
|Female|            Kista|   96192.3984375|   96192.

# writing dataframe to files

In [129]:
#df1.write.csv('df1.csv')
#df1.write.json('df1.json')
df1.write.parquet('df1.parquet')

# Challenge

In [131]:
df = spark.read.csv('challenge.csv',header=True)
df.show()

+---------------+--------------+-----------------+----------+
|     ip_address|       Country|      Domain Name|Bytes_used|
+---------------+--------------+-----------------+----------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|
| 119.239.207.13|         China|         youtu.be|        51|
|  68.69.217.210|         China|        adobe.com|        10|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|
|   211.13.10.68|     Indonesia|          hud.gov|        29|
|   239.80.21.97|      Suriname|       smh.com.au|       218|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|
| 127.242.24.138|         China| surveymonkey.com|       123|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|
|   237.54.11.63|         China|       amazon.com|        83|
| 252.141.157.25|         Japan|      cornell.edu|       374|
|185.220.128.248|       Belgium|       weebly.com|       389|
|   151.77.19.45|   Afghanistan|independent.co.uk|       282|
|  9.161

In [133]:
df_filter = df.withColumn('is_mexico',when(df.Country=='Mexico','Yes').otherwise('No'))
df_filter.show()

+---------------+--------------+-----------------+----------+---------+
|     ip_address|       Country|      Domain Name|Bytes_used|is_mexico|
+---------------+--------------+-----------------+----------+---------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|       No|
| 119.239.207.13|         China|         youtu.be|        51|       No|
|  68.69.217.210|         China|        adobe.com|        10|       No|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|       No|
|   211.13.10.68|     Indonesia|          hud.gov|        29|       No|
|   239.80.21.97|      Suriname|       smh.com.au|       218|       No|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|       No|
| 127.242.24.138|         China| surveymonkey.com|       123|       No|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|       No|
|   237.54.11.63|         China|       amazon.com|        83|       No|
| 252.141.157.25|         Japan|      cornell.edu|       374|   

In [136]:
df2 = df_filter.groupBy('is_mexico').agg(sum(df_filter.Bytes_used).alias('total_used'))
df2.show()

+---------+----------+
|is_mexico|total_used|
+---------+----------+
|       No|  508076.0|
|      Yes|    6293.0|
+---------+----------+



In [141]:
df3 = df_filter.groupBy('Country').agg(countDistinct(df_filter.ip_address).alias('no_of_ips'))
df3.show()

+-----------+---------+
|    Country|no_of_ips|
+-----------+---------+
|       Chad|        1|
|     Russia|       56|
|   Paraguay|        1|
|      Yemen|        1|
|     Sweden|       28|
|Philippines|       65|
|   Malaysia|        5|
|     Turkey|        1|
|     Malawi|        2|
|    Germany|        5|
|    Comoros|        1|
|Afghanistan|        5|
|     Rwanda|        1|
|      Sudan|        1|
|     France|       21|
|     Greece|        8|
|  Sri Lanka|        3|
|   Dominica|        1|
|  Argentina|       14|
|    Belgium|        1|
+-----------+---------+
only showing top 20 rows



In [142]:
df3.sort(col('no_of_ips').desc()).show()

+--------------+---------+
|       Country|no_of_ips|
+--------------+---------+
|         China|      172|
|     Indonesia|      114|
|   Philippines|       65|
|        Russia|       56|
|        Brazil|       35|
|        Poland|       31|
|        Sweden|       28|
|         Japan|       25|
|      Portugal|       23|
|Czech Republic|       23|
|        France|       21|
|          Peru|       19|
|      Colombia|       17|
| United States|       15|
|       Ukraine|       14|
|     Argentina|       14|
|        Mexico|       13|
|      Thailand|       12|
|       Nigeria|       11|
|        Canada|       11|
+--------------+---------+
only showing top 20 rows

