<a href="https://colab.research.google.com/github/priyadharshini13/pyspark/blob/main/PySpark_LearningSpark_Ch3_B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark



In [4]:
from pyspark.sql import SparkSession

# Local session using 3 worker threads; getOrCreate() hands back the already-running
# session if this cell is executed again in the same kernel.
spark = (
    SparkSession.builder
    .appName('Ch3 - Common DF Operation')
    .master('local[3]')
    .getOrCreate()
)


# Common DataFrame operations

1. Reading data with the DataFrameReader
2. Writing data with the DataFrameWriter


In [5]:
import pandas as pd

# Small employee table in column-oriented form: outer keys are the column names,
# inner keys ("0".."4") become the row index of the resulting frame.
emp_data = {
    "EMPID": {"0": 1, "1": 2, "2": 3, "3": 4, "4": 5},
    "ENAME": {"0": "Deepika", "1": "Vignesh", "2": "Ganesh", "3": "Haritha", "4": "ilango"},
    "AGE": {"0": 30, "1": 27, "2": 28, "3": 29, "4": 40},
    "SEX": {"0": "FEMALE", "1": "MALE", "2": "MALE", "3": "FEMALE", "4": "MALE"},
}
extract_data_df = pd.DataFrame.from_dict(emp_data)
extract_data_df

Unnamed: 0,EMPID,ENAME,AGE,SEX
0,1,Deepika,30,FEMALE
1,2,Vignesh,27,MALE
2,3,Ganesh,28,MALE
3,4,Haritha,29,FEMALE
4,5,ilango,40,MALE


In [6]:
from pyspark import SparkFiles

# SF fire-calls CSV from the LearningSparkV2 repo, read without a schema.
url = 'https://raw.githubusercontent.com/databricks/LearningSparkV2/master/chapter3/data/sf-fire-calls.csv'
spark.sparkContext.addFile(url)

# addFile() ships the URL to the session; SparkFiles.get() resolves the local copy by file name.
local_csv = SparkFiles.get("sf-fire-calls.csv")
df = spark.read.csv(local_csv, header=True)
df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

In [7]:
# Defining schema
# Explicit imports instead of `from pyspark.sql.types import *`, so it is clear where each
# type comes from and the notebook namespace is not flooded with every type name.
from pyspark.sql.types import (
    BooleanType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Programmatic way to define the schema of the fire-calls CSV; every field is nullable (True).
fire_schema = StructType([
    StructField('CallNumber', IntegerType(), True),
    StructField('UnitID', StringType(), True),
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('CallDate', StringType(), True),
    StructField('WatchDate', StringType(), True),
    StructField('CallFinalDisposition', StringType(), True),
    StructField('AvailableDtTm', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Zipcode', IntegerType(), True),
    StructField('Battalion', StringType(), True),
    StructField('StationArea', StringType(), True),
    StructField('Box', StringType(), True),
    StructField('OriginalPriority', StringType(), True),
    StructField('Priority', StringType(), True),
    StructField('FinalPriority', IntegerType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True),
    StructField('NumAlarms', IntegerType(), True),
    StructField('UnitType', StringType(), True),
    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
    StructField('FirePreventionDistrict', StringType(), True),
    StructField('SupervisorDistrict', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('RowID', StringType(), True),
    StructField('Delay', FloatType(), True),
])


In [8]:
# Same CSV, this time parsed with fire_schema so columns come back typed
# (integers, booleans, floats) rather than as plain strings.
url = 'https://raw.githubusercontent.com/databricks/LearningSparkV2/master/chapter3/data/sf-fire-calls.csv'
spark.sparkContext.addFile(url)

fire_csv_path = SparkFiles.get('sf-fire-calls.csv')
fire_df = spark.read.csv(fire_csv_path, header=True, schema=fire_schema)
fire_df.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [9]:
# Persist the typed DataFrame two ways: as plain Parquet files and as a managed table.
# mode("overwrite") lets the cell be re-run; the default save mode ("errorifexists")
# raises as soon as the target path / table already exists.
parquet_path = 'demo/fire'
fire_df.write.format("parquet").mode("overwrite").save(parquet_path)

parquet_table = 'fire1'
# Managed table; by default its files are stored inside spark-warehouse/
fire_df.write.format("parquet").mode("overwrite").saveAsTable(parquet_table)

# Transformations and actions
 1. Projections and filters
   A projection returns only the columns you select (`select`); a filter returns only the rows that match a relational condition (`filter` / `where`).

In [10]:
from pyspark.sql.functions import col

# Projection (three columns) combined with a filter that drops "Medical Incident" calls.
# Rows whose CallType is null are dropped too: the comparison evaluates to null, not true.
non_medical_cond = col("CallType") != "Medical Incident"
fire_df_filtered = (
    fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .where(non_medical_cond)
)
fire_df_filtered.show()

+--------------+--------------------+--------------------+
|IncidentNumber|       AvailableDtTm|            CallType|
+--------------+--------------------+--------------------+
|       2003235|01/11/2002 01:51:...|      Structure Fire|
|       2003250|01/11/2002 04:16:...|        Vehicle Fire|
|       2003259|01/11/2002 06:01:...|              Alarms|
|       2003279|01/11/2002 08:03:...|      Structure Fire|
|       2003301|01/11/2002 09:46:...|              Alarms|
|       2003304|01/11/2002 09:58:...|              Alarms|
|       2003382|01/11/2002 02:59:...|      Structure Fire|
|       2003408|01/11/2002 04:09:...|      Structure Fire|
|       2003408|01/11/2002 04:09:...|      Structure Fire|
|       2003408|01/11/2002 04:09:...|      Structure Fire|
|       2003429|01/11/2002 05:17:...|Odor (Strange / U...|
|       2003453|01/11/2002 06:48:...|              Alarms|
|       2003497|01/11/2002 09:03:...|      Structure Fire|
|       2003554|01/12/2002 01:56:...|      Structure Fir

In [11]:
# Finding distinct CallTypes.
#   countDistinct and count_distinct compute the same aggregate; count_distinct is the
#   newer snake_case name (added in Spark 3.2).
# NOTE: this wildcard import is kept because later cells may rely on names it brings in
# (e.g. to_timestamp, to_date); prefer explicit imports in new code.
from pyspark.sql.functions import *

# 1. Distinct count on a selected column.
#    Both results get the same alias; left unaliased, Spark labels the count_distinct
#    column "count(CallType)", which reads like a plain (non-distinct) count.
non_null_call_types = fire_df.select('CallType').where(col('CallType').isNotNull())
fire_df_distinct1 = non_null_call_types.agg(countDistinct('CallType').alias('countDistinctCallType'))
fire_df_distinct2 = non_null_call_types.agg(count_distinct('CallType').alias('countDistinctCallType'))
fire_df_distinct1.show()
fire_df_distinct2.show()

# 2. Distinct rows: number of rows left after de-duplicating across all columns.
fire_df_distinct_row = fire_df.distinct().count()
print('Distinct rows', fire_df_distinct_row)

+---------------------+
|countDistinctCallType|
+---------------------+
|                   30|
+---------------------+

+---------------+
|count(CallType)|
+---------------+
|             30|
+---------------+

Distinct rows 175296


In [12]:
# List the distinct (non-null) call types.
# Keep the DataFrame in the variable and call show() separately: DataFrame.show() only
# prints and returns None, so assigning its result would leave the variable as None.
fire_df_distinct_list = fire_df.select('CallType').where(col('CallType').isNotNull()).distinct()
fire_df_distinct_list.show(truncate=False)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

Renaming, adding, and dropping columns


In [13]:
# Rename Delay to a more descriptive column name.
fire_df_rename = fire_df.withColumnRenamed('Delay', 'ResponseDelayedinMins')
fire_df_rename.show(3)

# Calls whose response delay is greater than 5.
# No DataFrame.alias() here: that only aliases the DataFrame (for qualifying columns,
# e.g. in self-joins) and does not rename or label the displayed column.
fire_df_rename1 = fire_df_rename.select('ResponseDelayedinMins').where(col('ResponseDelayedinMins') > 5)
fire_df_rename1.show()


+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+--------------

Timestamp conversion

In [14]:
# Inspect the column types produced by fire_schema; CallDate, WatchDate and AvailableDtTm
# are still strings at this point and are converted to timestamps in the next cell.
fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [15]:
# Explicit imports so this cell does not depend on an earlier wildcard import.
from pyspark.sql.functions import col, to_timestamp

# to_timestamp(column, format=None) converts a string column to Spark's TimestampType.
#   With a format, each string is parsed using that datetime pattern.
#   Without a format, Spark falls back to plain cast rules (like col.cast("timestamp")).
#   Those rules cannot read strings such as "01/11/2002", so such values come back null
#   (or raise an error when ANSI mode is enabled).
# "hh:mm:ss a" = 12-hour clock hour, minutes, seconds, then an AM/PM marker.
# Each new column replaces its source column, and the new columns are appended at the end.
fire_ts_df = (
    fire_df
    .withColumn('IncidentDate', to_timestamp(col('CallDate'), "MM/dd/yyyy"))
    .drop('CallDate')
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("AvailableDtTm")
)
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)

fire_ts_df.printSchema()

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable =

In [16]:
# Explicit imports so this cell does not depend on an earlier wildcard import.
from pyspark.sql.functions import col, to_date

# to_date(column, format) parses a string column into DateType using the given pattern.
# A string like "02-03-2013" is ambiguous on its own; the "MM-dd-yyyy" pattern is what
# makes Spark read it as February 3rd.
# A dedicated name is used so the earlier `df` (the raw fire-calls CSV) is not overwritten.
date_demo_df = spark.createDataFrame([["02-03-2013"], ["05-06-2023"]], ['DateColumnInString'])
date_demo_df.select(
    col('DateColumnInString'),
    to_date(col('DateColumnInString'), "MM-dd-yyyy").alias('DateColumnInDateFormat'),
).show()

+------------------+----------------------+
|DateColumnInString|DateColumnInDateFormat|
+------------------+----------------------+
|        02-03-2013|            2013-02-03|
|        05-06-2023|            2023-05-06|
+------------------+----------------------+

