<a href="https://colab.research.google.com/github/satyazprakash/100Days_of_Code-TheCompletePythonProBootcamp/blob/main/PySpark_Running_SQL_on_DataFrame.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

# Selecting Data from a Data Frame by applying Multiple Filters on the data

In [30]:
from pyspark.sql.types import *     # Importing Pyspark types

# creating a myscheme for the file being imported using StructType

myschema=StructType([
    StructField("id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("gender",StringType()),
    StructField("city", StringType()),
    StructField("job_title", StringType()),
    StructField("salary", StringType()),
    StructField("latitude", FloatType()),
    StructField("longitude",FloatType())
])

In [9]:
# Reading the CSV file into dataframe df for processing using schema defined as myschema

df = spark.read.csv("original.csv",header=True,schema=myschema)

In [10]:
# Listing out sample 20 records from the data present in dataframe df.

df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|           job_title|   salary|  latitude| longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [11]:
# LIsting out the types of the dataframe df that we have.

df.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('city', 'string'),
 ('job_title', 'string'),
 ('salary', 'string'),
 ('latitude', 'float'),
 ('longitude', 'float')]

In [12]:
# This displays the First Record in the DataFrame

df.first()


Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', city='Nowa Ruda', job_title='Assistant Professor', salary='$57438.18', latitude=50.57740783691406, longitude=16.49671745300293)

In [13]:
# This lists all the column names that are present in the dataframe

df.columns

['id',
 'first_name',
 'last_name',
 'gender',
 'city',
 'job_title',
 'salary',
 'latitude',
 'longitude']

In [14]:
# This shows total no of records that are present in the dataframe.
df.count()

1000

In [15]:
# This shows total no of distinct records that are present in the dataframe.

df.distinct().count()

1000

In [16]:
# Importing the SQL Functions for usage in our PySpark Code

from pyspark.sql.functions import *

# Listing Values from Dataframe

df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|           job_title|   salary|  latitude| longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [25]:
# Creating a Temp table out of data of the dataframe

df.registerTempTable("original")

# Inputting the date of Select query in the dataframe query1
query1 = spark.sql('Select * from original')

# displaying the results of Dataframe Query1
query1.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|           job_title|   salary|  latitude| longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [27]:
# # Inputting the date of Select query in the dataframe query2

query2 = spark.sql('select concat(first_name," ", last_name) as full_name from original where gender = "Female"')

# # displaying the results of Dataframe Query2
query2.show()


+-------------------+
|          full_name|
+-------------------+
|  Melinde Shilburne|
|Kimberly Von Welden|
|    Alvera Di Boldi|
|         Maris Folk|
|       Masha Divers|
|     Kylynn Lockart|
|         Rey Meharg|
|      Claude Briant|
|  Tiffanie Pattison|
|    Lurleen Janczak|
|      Nichol Holtum|
|       Shaun Bridle|
|     Leandra Anfrey|
|    Jaquelyn Hazard|
|  Prudence Honacker|
|       Cherey Liger|
|          Neda Krop|
|    Barbi Fattorini|
|   Lonnie Townshend|
|    Valida Salzberg|
+-------------------+
only showing top 20 rows



# Adding Calculated columns as output


In [31]:
# Adding a new column in dataframe clean_slarry and removed $  from the Salary column and updating the new column value

df = df.withColumn('clean_salary', df.salary.substr(2,100).cast('float'))

# Displaying the result set of the dataframe.
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
| id|first_name| last_name|gender|           city|           job_title|   salary|  latitude| longitude|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|    57576.52|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil En

In [32]:
# Adding a new column in dataframe clean_slarry and removed $  from the Salary column and updating the new column value

df = df.withColumn('monthly_salary', df.clean_salary/12)

# Displaying the result set of the dataframe.
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+
| id|first_name| last_name|gender|           city|           job_title|   salary|  latitude| longitude|clean_salary|    monthly_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18| 4786.514973958333|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|    5237.216796875|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|    57576.52| 4798.043294270833|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489.23|   5124.1025390625|
|  5|  Sherwood|   Macieja|  Male|      Mytishch

In [34]:
# Adding a new column in dataframe clean_slarry and removed $  from the Salary column and updating the new column value

df = df.withColumn('are_they_female', when (df.gender== "Female","Yes").otherwise("No"))

# Displaying the result set of the dataframe.
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+---------------+
| id|first_name| last_name|gender|           city|           job_title|   salary|  latitude| longitude|clean_salary|    monthly_salary|are_they_female|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18| 4786.514973958333|            Yes|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|    5237.216796875|            Yes|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|    57576.52| 4798.043294270833|            Yes|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504

# Group By and Aggregrations


In [37]:
# Group By Gender Male/Female and what is the total Salary.

import pyspark.sql.functions as sqlfunc

df1 = df.groupBy('gender').agg(sqlfunc.sum('clean_salary'))
df1.show()


+------+--------------------+
|gender|   sum(clean_salary)|
+------+--------------------+
|Female|2.7364519950195312E7|
|  Male|2.8123435678710938E7|
+------+--------------------+



In [43]:
# Group By Gender agg on  total, sum, min, max, average


df1 = df.groupBy('gender').agg(sqlfunc.sum('clean_salary').alias('total'),
(sqlfunc.avg('clean_salary').alias('avg')),
(sqlfunc.min('clean_salary').alias('min')),
(sqlfunc.max('clean_salary').alias('max')))

# show the output present in Dataframe
df1.show()


+------+--------------------+-----------------+--------+--------+
|gender|               total|              avg|     min|     max|
+------+--------------------+-----------------+--------+--------+
|Female|2.7364519950195312E7|55618.94298820185|10616.44|99948.28|
|  Male|2.8123435678710938E7|55361.09385573019|10101.92|99942.92|
+------+--------------------+-----------------+--------+--------+



In [44]:
# Group By Multipls Column Gender,city agg on  total, sum, min, max, average


df1 = df.groupBy('gender','city').agg(sqlfunc.sum('clean_salary').alias('total'),
(sqlfunc.avg('clean_salary').alias('avg')),
(sqlfunc.min('clean_salary').alias('min')),
(sqlfunc.max('clean_salary').alias('max')))

# show the output present in Dataframe
df1.show()

+------+-----------------+----------------+----------------+--------+--------+
|gender|             city|           total|             avg|     min|     max|
+------+-----------------+----------------+----------------+--------+--------+
|Female|           Dachun| 25090.869140625| 25090.869140625|25090.87|25090.87|
|Female|      Trollhättan|106623.369140625|53311.6845703125|26830.47| 79792.9|
|  Male|          Wenshao| 18941.509765625| 18941.509765625|18941.51|18941.51|
|Female|            Lanas| 13765.900390625| 13765.900390625| 13765.9| 13765.9|
|  Male|            Mörön|    77940.078125|    77940.078125|77940.08|77940.08|
|Female|             Same|   73369.7265625|   73369.7265625|73369.73|73369.73|
|Female|          Sawahan|  24608.83984375|  24608.83984375|24608.84|24608.84|
|  Male|Monte da Boavista|     98586.71875|     98586.71875|98586.72|98586.72|
|Female|         Nusajaya|    71637.921875|    71637.921875|71637.92|71637.92|
|Female|            Kista|   96192.3984375|   96192.

# Writing Dataframe to files


In [51]:
# Writing the contents of the dataframe to file of different formats.

#df1.write.csv('df1.csv', header = True, sep = ",", mode = "overwright", compression = "gzip")
#df1.write.json('df1.json')
#df1.write.parquet('df1.parquet')

# Write DataFrame to File with header, overwrite mode, seperator and Compression as gzip
df1.write.csv('df1.csv', header = True, sep = ",", mode = "overwrite", compression = "gzip")
df1.write.json('df1.json', mode = "overwrite", compression = "gzip")
df1.write.parquet('df1.parquet', mode = "overwrite", compression = "gzip")mb

In [53]:
import time

while True:
  print("Sleeping")
  time.sleep(60)


Sleeping


KeyboardInterrupt: 