In [1]:
import pyspark;
from pyspark import *;
from pyspark.sql import *;
from pyspark.sql.types import *;
from pyspark.sql import SparkSession;
from pyspark.sql.functions import *;

In [2]:
spark = SparkSession.builder.appName("Structured Data").config("spark.driver.memory","4g").config("spark.executor.memory","4g").getOrCreate()


In [3]:
dataFrame = spark.read.csv("C:\\Learning\\Python_Projects\\PySpark\\fire-incidents.csv", inferSchema=True, header=True)

In [4]:
dataFrame.printSchema()

root
 |-- IncidentNumber: string (nullable = true)
 |-- ExposureNumber: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: string (nullable = true)
 |-- CallNumber: string (nullable = true)
 |-- AlarmDtTm: string (nullable = true)
 |-- ArrivalDtTm: string (nullable = true)
 |-- CloseDtTm: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: string (nullable 

In [5]:
dataFrame = dataFrame.withColumn("IncidentNumber", col("IncidentNumber").cast(IntegerType()))

In [6]:
len(dataFrame.columns)

80

In [7]:
dataFrame.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- ExposureNumber: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: string (nullable = true)
 |-- CallNumber: string (nullable = true)
 |-- AlarmDtTm: string (nullable = true)
 |-- ArrivalDtTm: string (nullable = true)
 |-- CloseDtTm: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: string (nullable

In [8]:
dataFrame.select("IncidentNumber", "City", "ZIPCode", "FireFatalities").show(5,False)

+--------------+-------------+-------+--------------+
|IncidentNumber|City         |ZIPCode|FireFatalities|
+--------------+-------------+-------+--------------+
|20104668      |San Francisco|94107  |0             |
|20104708      |San Francisco|94131  |0             |
|20104648      |San Francisco|94109  |0             |
|20104598      |San Francisco|94112  |0             |
|20104575      |San Francisco|94118  |0             |
+--------------+-------------+-------+--------------+
only showing top 5 rows



In [9]:
dataFrame.select("IncidentNumber","IncidentDate").groupBy("IncidentDate").sum("IncidentNumber").show(5,False)

+-----------------------+-------------------+
|IncidentDate           |sum(IncidentNumber)|
+-----------------------+-------------------+
|2020-03-22T00:00:00.000|1342381539         |
|2020-03-09T00:00:00.000|1622441483         |
|2019-10-23T00:00:00.000|1836254154         |
|2019-01-28T00:00:00.000|1692043947         |
|2018-12-17T00:00:00.000|1542535969         |
+-----------------------+-------------------+
only showing top 5 rows



In [10]:
spark.range(5).toDF("Numbers").show()

+-------+
|Numbers|
+-------+
|      0|
|      1|
|      2|
|      3|
|      4|
+-------+



In [11]:
myRow = Row("Hello", None, 1, False)

In [12]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [13]:
myManualSchema = StructType([
    StructField("some", StringType(), True),
    StructField("values", StringType(), True),
    StructField("are good", StringType(), False)
])

myRow = Row("Hey",None,1)
myDataFrame = spark.createDataFrame([myRow], schema = myManualSchema)
myDataFrame.show()

+----+------+--------+
|some|values|are good|
+----+------+--------+
| Hey|  null|       1|
+----+------+--------+



In [14]:
myDataFrame.printSchema()

root
 |-- some: string (nullable = true)
 |-- values: string (nullable = true)
 |-- are good: string (nullable = false)



In [15]:
seed = 5
withReplacement = False
fraction = 0.01
dataFrame.sample(withReplacement, fraction, seed).toPandas()

Unnamed: 0,IncidentNumber,ExposureNumber,ID,Address,IncidentDate,CallNumber,AlarmDtTm,ArrivalDtTm,CloseDtTm,City,...,2017FixItZones,HSOCZones,CentralMarketTenderloinBoundary,CentralMarketTenderloinBoundaryPolygonUpdated,HSOCZonesasof20180605,Neighborhoods,SFFindNeighborhoods,CurrentPoliceDistricts,CurrentSupervisorDistricts,AnalysisNeighborhoods
0,20104375.0,0,201043750,1715 MCALLISTER STREET,2020-09-10T00:00:00.000,202540977,2020-09-10T09:55:28.000,2020-09-10T09:58:03.000,2020-09-10T10:12:28.000,San Francisco,...,,,,,,97.0,97.0,7.0,11.0,39.0
1,20103988.0,0,201039880,522-524 NATOMA STREET,2020-09-09T00:00:00.000,202531199,2020-09-09T11:00:34.000,2020-09-09T11:06:38.000,2020-09-09T11:09:47.000,San Francisco,...,24.0,1.0,1.0,1.0,1.0,32.0,32.0,1.0,10.0,34.0
2,20104053.0,0,201040530,"680 POINT LOBOS AV, PRAIRIE",2020-09-09T00:00:00.000,202531687,2020-09-09T13:20:20.000,2020-09-09T13:26:03.000,2020-09-09T13:59:27.000,Presidio,...,,,,,,6.0,6.0,,4.0,29.0
3,20102762.0,0,201027620,1589 17TH AVENUE,2020-09-06T00:00:00.000,202502130,2020-09-06T15:45:01.000,2020-09-06T15:48:36.000,2020-09-06T16:07:42.000,San Francisco,...,,,,,,44.0,44.0,10.0,8.0,14.0
4,20101279.0,0,201012790,24TH STREET,2020-09-03T00:00:00.000,202470570,2020-09-03T07:35:34.000,2020-09-03T07:39:07.000,2020-09-03T07:45:14.000,San Francisco,...,,,,,,84.0,84.0,3.0,5.0,22.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5419,3000763.0,0,030007630,6th St. / Market St.,2003-01-03T00:00:00.000,030030033,2003-01-03T02:40:10.000,2003-01-03T02:44:22.000,2003-01-03T03:32:51.000,SF,...,17.0,1.0,1.0,1.0,1.0,20.0,20.0,5.0,10.0,34.0
5420,3000789.0,0,030007890,1231 Market St.,2003-01-03T00:00:00.000,030030061,2003-01-03T06:42:05.000,2003-01-03T06:43:56.000,2003-01-03T06:46:23.000,SF,...,,1.0,1.0,1.0,1.0,32.0,32.0,5.0,10.0,34.0
5421,3000862.0,0,030008620,114 Sansome St.,2003-01-03T00:00:00.000,030030179,2003-01-03T12:09:08.000,2003-01-03T12:11:59.000,2003-01-03T13:30:51.000,SF,...,,,,,,108.0,108.0,6.0,3.0,8.0
5422,3001057.0,0,030010570,Farallones St. / San Jose Av.,2003-01-03T00:00:00.000,030030440,2003-01-03T23:38:55.000,2003-01-03T23:42:53.000,2003-01-03T23:43:20.000,SF,...,,,,,,81.0,81.0,10.0,1.0,24.0


In [16]:
dataFrame.count()

541014

In [17]:
len(dataFrame.columns)

80

In [18]:
dataFrame = dataFrame.sortWithinPartitions(col("ZIPCode"))

In [19]:
df2 = dataFrame.repartition(5, col("ExposureNumber") )

In [20]:
df2.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- ExposureNumber: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: string (nullable = true)
 |-- CallNumber: string (nullable = true)
 |-- AlarmDtTm: string (nullable = true)
 |-- ArrivalDtTm: string (nullable = true)
 |-- CloseDtTm: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: string (nullable

In [21]:
dataFrame.select(lit(5), lit("five")).count()

541014

In [22]:
dataFrame.count()

541014

In [23]:
a = ["Ram", "Sai", "Karthik", "Unnam"]
rdd = spark.sparkContext.parallelize(a)

In [24]:
dataFrame.columns

['IncidentNumber',
 'ExposureNumber',
 'ID',
 'Address',
 'IncidentDate',
 'CallNumber',
 'AlarmDtTm',
 'ArrivalDtTm',
 'CloseDtTm',
 'City',
 'ZIPCode',
 'Battalion',
 'StationArea',
 'Box',
 'SuppressionUnits',
 'SuppressionPersonnel',
 'EMSUnits',
 'EMSPersonnel',
 'OtherUnits',
 'OtherPersonnel',
 'FirstUnitOnScene',
 'EstimatedPropertyLoss',
 'EstimatedContentsLoss',
 'FireFatalities',
 'FireInjuries',
 'CivilianFatalities',
 'CivilianInjuries',
 'NumberofAlarms',
 'PrimarySituation',
 'MutualAid',
 'ActionTakenPrimary',
 'ActionTakenSecondary',
 'ActionTakenOther',
 'DetectorAlertedOccupants',
 'PropertyUse',
 'AreaofFireOrigin',
 'IgnitionCause',
 'IgnitionFactorPrimary',
 'IgnitionFactorSecondary',
 'HeatSource',
 'ItemFirstIgnited',
 'HumanFactorsAssociatedwithIgnition',
 'StructureType',
 'StructureStatus',
 'FloorofFireOrigin',
 'FireSpread',
 'NoFlameSpead',
 'Numberoffloorswithminimumdamage',
 'Numberoffloorswithsignificantdamage',
 'Numberoffloorswithheavydamage',
 'Numbe

In [25]:
df2 = dataFrame.withColumn("Address", when(col("Address").isNull(), lit("Nothing is present")).otherwise(col("Address")))

In [26]:
df2.select("Address").where(col("Address") == "Nothing is present").count()

6431

In [27]:
conditionFilter = instr(col("Address"), "present") >=1
df2.select("Address").where(conditionFilter).show(5,False)

+------------------+
|Address           |
+------------------+
|Nothing is present|
|Nothing is present|
|Nothing is present|
|Nothing is present|
|Nothing is present|
+------------------+
only showing top 5 rows



In [28]:
df2.describe().show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------------+---------+------------------+-----------------+------------------+--------------------+-------------------+------------------+-------------------+-------------------+------------------+---------------------+---------------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------------+--------------------+--------------------+--------------------+--------------------+------------------------+----------------+----------------+--------------------+---------------------+-----------------------+-----------------+----------------+----------------------------------+--------------------+---------------+------------------+---------------+------------------+-------------------------------+------------------

In [29]:
df2.toPandas().describe()

Unnamed: 0,IncidentNumber,SuppressionUnits,SuppressionPersonnel,EMSUnits,EMSPersonnel,OtherUnits,OtherPersonnel,EstimatedPropertyLoss,CivilianFatalities,CivilianInjuries,...,2017FixItZones,HSOCZones,CentralMarketTenderloinBoundary,CentralMarketTenderloinBoundaryPolygonUpdated,HSOCZonesasof20180605,Neighborhoods,SFFindNeighborhoods,CurrentPoliceDistricts,CurrentSupervisorDistricts,AnalysisNeighborhoods
count,534854.0,534853.0,534853.0,534853.0,534854.0,534854.0,534854.0,129739.0,534853.0,534853.0,...,93478.0,84675.0,62548.0,62919.0,92969.0,523503.0,523503.0,521276.0,525330.0,525231.0
mean,11465390.0,2.532421,9.129993,0.182661,0.337372,0.036973,0.067966,5430.941,6e-05,0.001105,...,14.647628,2.060278,1.000016,1.000302,2.321505,54.320938,54.320938,5.180534,6.73487,21.29233
std,5091142.0,8.55842,22.144772,0.772124,1.18831,1.393472,1.465132,314012.2,0.008861,0.063994,...,6.421997,1.162598,0.003998,0.075747,1.389569,33.56861,33.56861,2.812717,3.26722,12.604912
min,3000001.0,0.0,0.0,0.0,0.0,0.0,0.0,-25000.0,0.0,0.0,...,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
25%,7060010.0,1.0,4.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,10.0,1.0,1.0,1.0,1.0,25.0,25.0,3.0,3.0,8.0
50%,11114460.0,2.0,9.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,17.0,1.0,1.0,1.0,3.0,50.0,50.0,5.0,7.0,21.0
75%,16054340.0,3.0,10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,18.0,3.0,1.0,1.0,3.0,87.0,87.0,7.0,10.0,34.0
max,20104710.0,3333.0,5960.0,150.0,312.0,1000.0,1000.0,100000000.0,3.0,24.0,...,53.0,5.0,2.0,20.0,5.0,117.0,117.0,10.0,11.0,41.0


In [30]:
print(df2.columns)

['IncidentNumber', 'ExposureNumber', 'ID', 'Address', 'IncidentDate', 'CallNumber', 'AlarmDtTm', 'ArrivalDtTm', 'CloseDtTm', 'City', 'ZIPCode', 'Battalion', 'StationArea', 'Box', 'SuppressionUnits', 'SuppressionPersonnel', 'EMSUnits', 'EMSPersonnel', 'OtherUnits', 'OtherPersonnel', 'FirstUnitOnScene', 'EstimatedPropertyLoss', 'EstimatedContentsLoss', 'FireFatalities', 'FireInjuries', 'CivilianFatalities', 'CivilianInjuries', 'NumberofAlarms', 'PrimarySituation', 'MutualAid', 'ActionTakenPrimary', 'ActionTakenSecondary', 'ActionTakenOther', 'DetectorAlertedOccupants', 'PropertyUse', 'AreaofFireOrigin', 'IgnitionCause', 'IgnitionFactorPrimary', 'IgnitionFactorSecondary', 'HeatSource', 'ItemFirstIgnited', 'HumanFactorsAssociatedwithIgnition', 'StructureType', 'StructureStatus', 'FloorofFireOrigin', 'FireSpread', 'NoFlameSpead', 'Numberoffloorswithminimumdamage', 'Numberoffloorswithsignificantdamage', 'Numberoffloorswithheavydamage', 'Numberoffloorswithextremedamage', 'DetectorsPresent', '

In [31]:
df2.select("Address").show(5,False)

+------------------+
|Address           |
+------------------+
|Nothing is present|
|Nothing is present|
|Nothing is present|
|Nothing is present|
|Nothing is present|
+------------------+
only showing top 5 rows



In [32]:
#This function capitalizes each word that is separated by space
df2.select(initcap(col("Address"))).show(5,False)

+------------------+
|initcap(Address)  |
+------------------+
|Nothing Is Present|
|Nothing Is Present|
|Nothing Is Present|
|Nothing Is Present|
|Nothing Is Present|
+------------------+
only showing top 5 rows



In [33]:
from pyspark.sql.functions import regexp_replace
regex_string = "is|present"
df2.select(regexp_replace(col("Address"), regex_string, "hey there this is replaced...").alias("Regex_Replaced_Col"), "Address").show(5,False)

+-------------------------------------------------------------------+------------------+
|Regex_Replaced_Col                                                 |Address           |
+-------------------------------------------------------------------+------------------+
|Nothing hey there this is replaced... hey there this is replaced...|Nothing is present|
|Nothing hey there this is replaced... hey there this is replaced...|Nothing is present|
|Nothing hey there this is replaced... hey there this is replaced...|Nothing is present|
|Nothing hey there this is replaced... hey there this is replaced...|Nothing is present|
|Nothing hey there this is replaced... hey there this is replaced...|Nothing is present|
+-------------------------------------------------------------------+------------------+
only showing top 5 rows



In [34]:
isContains = col("Address").contains("is")
isPresent = col("Address").contains("present")

df2.withColumn("NewColumn", when(isContains, lit("5")).otherwise(col("Address"))).select("Address","NewColumn").show(20,False)

+------------------+---------+
|Address           |NewColumn|
+------------------+---------+
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
|Nothing is present|5        |
+------------------+---------+
only showing top 20 rows



In [35]:
dateDF = spark.range(10).\
        withColumn("date", current_date()).\
        withColumn("time", current_timestamp())

In [36]:
dateDF.select(date_add(col("date"),120).alias("Addition of date"), date_sub(col("date"),25).alias("Subtraction of date"), "date").show(5,False)

+----------------+-------------------+----------+
|Addition of date|Subtraction of date|date      |
+----------------+-------------------+----------+
|2022-01-24      |2021-09-01         |2021-09-26|
|2022-01-24      |2021-09-01         |2021-09-26|
|2022-01-24      |2021-09-01         |2021-09-26|
|2022-01-24      |2021-09-01         |2021-09-26|
|2022-01-24      |2021-09-01         |2021-09-26|
+----------------+-------------------+----------+
only showing top 5 rows



In [37]:
#Using coalesce function to return non-null value
dataFrame.select(coalesce(col("Address"))).show(5,False)
#the above code is not working

+-----------------+
|coalesce(Address)|
+-----------------+
|null             |
|null             |
|null             |
|null             |
|null             |
+-----------------+
only showing top 5 rows



In [38]:
df3 = dataFrame.limit(50)

In [39]:
df3.columns

['IncidentNumber',
 'ExposureNumber',
 'ID',
 'Address',
 'IncidentDate',
 'CallNumber',
 'AlarmDtTm',
 'ArrivalDtTm',
 'CloseDtTm',
 'City',
 'ZIPCode',
 'Battalion',
 'StationArea',
 'Box',
 'SuppressionUnits',
 'SuppressionPersonnel',
 'EMSUnits',
 'EMSPersonnel',
 'OtherUnits',
 'OtherPersonnel',
 'FirstUnitOnScene',
 'EstimatedPropertyLoss',
 'EstimatedContentsLoss',
 'FireFatalities',
 'FireInjuries',
 'CivilianFatalities',
 'CivilianInjuries',
 'NumberofAlarms',
 'PrimarySituation',
 'MutualAid',
 'ActionTakenPrimary',
 'ActionTakenSecondary',
 'ActionTakenOther',
 'DetectorAlertedOccupants',
 'PropertyUse',
 'AreaofFireOrigin',
 'IgnitionCause',
 'IgnitionFactorPrimary',
 'IgnitionFactorSecondary',
 'HeatSource',
 'ItemFirstIgnited',
 'HumanFactorsAssociatedwithIgnition',
 'StructureType',
 'StructureStatus',
 'FloorofFireOrigin',
 'FireSpread',
 'NoFlameSpead',
 'Numberoffloorswithminimumdamage',
 'Numberoffloorswithsignificantdamage',
 'Numberoffloorswithheavydamage',
 'Numbe

In [40]:
df3.cache()

DataFrame[IncidentNumber: int, ExposureNumber: string, ID: string, Address: string, IncidentDate: string, CallNumber: string, AlarmDtTm: string, ArrivalDtTm: string, CloseDtTm: string, City: string, ZIPCode: string, Battalion: string, StationArea: string, Box: string, SuppressionUnits: int, SuppressionPersonnel: int, EMSUnits: int, EMSPersonnel: int, OtherUnits: int, OtherPersonnel: int, FirstUnitOnScene: string, EstimatedPropertyLoss: int, EstimatedContentsLoss: string, FireFatalities: string, FireInjuries: string, CivilianFatalities: int, CivilianInjuries: int, NumberofAlarms: int, PrimarySituation: string, MutualAid: string, ActionTakenPrimary: string, ActionTakenSecondary: string, ActionTakenOther: string, DetectorAlertedOccupants: string, PropertyUse: string, AreaofFireOrigin: string, IgnitionCause: string, IgnitionFactorPrimary: string, IgnitionFactorSecondary: string, HeatSource: string, ItemFirstIgnited: string, HumanFactorsAssociatedwithIgnition: string, StructureType: string,

In [41]:
from pyspark.sql import *
df4 = df3.withColumn("TimeStamp", current_timestamp().cast(StringType()))

In [42]:
df4.printSchema

<bound method DataFrame.printSchema of DataFrame[IncidentNumber: int, ExposureNumber: string, ID: string, Address: string, IncidentDate: string, CallNumber: string, AlarmDtTm: string, ArrivalDtTm: string, CloseDtTm: string, City: string, ZIPCode: string, Battalion: string, StationArea: string, Box: string, SuppressionUnits: int, SuppressionPersonnel: int, EMSUnits: int, EMSPersonnel: int, OtherUnits: int, OtherPersonnel: int, FirstUnitOnScene: string, EstimatedPropertyLoss: int, EstimatedContentsLoss: string, FireFatalities: string, FireInjuries: string, CivilianFatalities: int, CivilianInjuries: int, NumberofAlarms: int, PrimarySituation: string, MutualAid: string, ActionTakenPrimary: string, ActionTakenSecondary: string, ActionTakenOther: string, DetectorAlertedOccupants: string, PropertyUse: string, AreaofFireOrigin: string, IgnitionCause: string, IgnitionFactorPrimary: string, IgnitionFactorSecondary: string, HeatSource: string, ItemFirstIgnited: string, HumanFactorsAssociatedwithI