In [7]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("SF Fire Calls") \
    .getOrCreate()

In [5]:
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

sf_fire_file = "data/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

# Cache the DataFrame since we will be performing some operations on it.
fire_df.cache()
fire_df.count()

In [None]:
# Print the dataframe schema
fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [8]:
few_fire_df = (fire_df
               .select("IncidentNumber", "AvailableDtTm", "CallType")
               .where(F.col("CallType") != "Medical Incident"))

few_fire_df.show(5, truncate=False)

''' sql

select IncidentNumber, AvailableDtTm, CallType
from fire_calls
where CallType != 'Medical Incident'
limit 5;

'''

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



" sql\n\nselect IncidentNumber, AvailableDtTm, CallType\nfrom fire_calls\nwhere CallType != 'Medical Incident'\nlimit 5;\n\n"

In [12]:
# Q-1) How many distinct types of calls were made to the Fire Department?
# To be sure, let's not count "null" strings in that column.

(fire_df
  .select("CallType")
  .where(F.col("CallType").isNotNull())
  .agg(F.countDistinct("CallType").alias("DistinctCallTypes"))
  .show())

'''
SQL:

select count(distinct calltype) as DistinctCallTypes 
from fire_calls 
where calltype is not null 

'''

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [14]:
# Q-2) What are distinct types of calls were made to the Fire Department?
# These are all the distinct type of call to the SF Fire Department

#pyspark
(fire_df
  .select("CallType")
  .where(F.col("CallType").isNotNull())
  .distinct()
  .show())

'''
#sql
select distinct calltype 
from fire_calls 
where calltype is not null 

'''

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
|        Water Rescue|
+--------------------+
only showing top 20 rows



'\n#sql\nselect distinct calltype \nfrom fire_calls \nwhere calldate is not null \n\n'

In [None]:
# Q-3) Find out all response or delayed times greater than 5 mins?
# Rename the column Delay - > ReponseDelayedinMins
# Returns a new DataFrame
# Find out all calls where the response time to the fire site was delayed for more than 5 mins

new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(new_fire_df
  .select("ResponseDelayedinMins")
  .where(F.col("ResponseDelayedinMins") > 5)
  .show(5, False))

'''
#sql
select Delay as ReponseDelayedinMins
from fire_calls 
where Delay > 5
limit 5

'''

In [16]:
# Q-4a) What zip codes accounted for most common calls?
# Let's investigate what zip codes in San Francisco accounted for most fire calls and what type where they.
# Filter out by CallType
# Group them by CallType and Zip code
# Count them and display them in descending order

(fire_df
  .select("CallType", "Zipcode")
  .groupBy("CallType", "Zipcode")
  .agg(F.count("*").alias("calls_qty"))
  .orderBy(F.col("calls_qty").desc()) 
  .show(5, False)
  )


'''
#sql

select calltype, zipcode, count(*) as calls_qty
from fire_calls
group by calltype, zipcode
order by count(*) desc 
limit 5;

'''

+----------------+-------+---------+
|CallType        |Zipcode|calls_qty|
+----------------+-------+---------+
|Medical Incident|94102  |16130    |
|Medical Incident|94103  |14775    |
|Medical Incident|94110  |9995     |
|Medical Incident|94109  |9479     |
|Medical Incident|94124  |5885     |
+----------------+-------+---------+
only showing top 5 rows



'\n#sql\n\nselect \ncalltype, zipcode, count(*) as calls_qty\nfrom fire_calls\ngroup by 1, 2\norder by 3 desc \n\n'

In [31]:
# Q-4b) What San Francisco neighborhoods are in the zip codes 94102 and 94103
# Let's find out the neighborhoods associated with these two zip codes. In all likelihood, these are some of the contested neighborhood with high reported crimes.

(fire_df
  .where(F.col("Zipcode").isin('94102', '94103')) 
  .groupBy("Neighborhood")
  .agg(F.count("IncidentNumber").alias("incident_qty")) 
  .orderBy(F.col("incident_qty").desc()) 
  .show(5, False)
)


'''
#sql

select neighborhood, count(incidentnumber) as incident_qty
from fire_calls
where zipcode in ('94102', '94103')
group by neighborhood
order by count(incidentnumber) desc
limit 5;

'''

+------------------------------+------------+
|Neighborhood                  |incident_qty|
+------------------------------+------------+
|Tenderloin                    |17408       |
|South of Market               |14016       |
|Mission                       |5445        |
|Hayes Valley                  |2867        |
|Financial District/South Beach|1536        |
+------------------------------+------------+
only showing top 5 rows



"\n#sql\n\nselect neighborhood, count(incidentnumber) as incident_qty\nfrom fire_calls\nwhere zipcode in ('94102', '94103')\ngroup by neighborhood\norder by count(incidentnumber) desc\nlimit 5;\n\n"

In [34]:
# Q-5) What was the sum of all calls, average, min and max of the response times for calls?
# Let's use the built-in Spark SQL functions to compute the sum, avg, min, and max of few columns:
# Number of Total Alarms
# What were the min and max the delay in response time before the Fire Dept arrived at the scene of the call

(fire_df
    .select(
      F.sum("numalarms").alias("total_alarms"),
      F.min("Delay").alias("min_delay"),
      F.max("Delay").alias("max_delay"),
      F.avg("Delay").alias("avg_delay") )
  .show()
)


'''
#sql

select 
sum(numalarms) as total_alarms,
min(delay) as min_delay,
max(delay) as max_delay,
avg(delay) as avg_delay
from fire_calls

'''

+------------+-----------+---------+-----------------+
|total_alarms|  min_delay|max_delay|        avg_delay|
+------------+-----------+---------+-----------------+
|      176170|0.016666668|  1844.55|3.892364154521585|
+------------+-----------+---------+-----------------+



'\n#sql\n\n\n\n'

In [73]:
# ** Q-6b) What week of the year in 2018 had the most fire calls?**
# Note: Week 1 is the New Years' week and week 25 is the July 4 the week. Loads of fireworks, so it makes sense the higher number of calls.

# Convert "CallDate" field type from string to a date.

fire_df = fire_df.withColumn('CallDate', F.to_timestamp(F.col("CallDate"), 'MM/dd/yyyy'))

(fire_df
  .where(F.year("CallDate") == 2018)
  .groupBy(F.weekofyear("CallDate").alias("week"))
  .agg(F.count("*").alias("calls_qty"))
  .orderBy(F.col("calls_qty").desc())
  .show(1)
 )
 

'''
#sql

select 
weekofyear(calldate) as week,
count(*) as calls_qty
from fire_calls
where year(calldate) = 2018
group by week
order by calls_qty desc
limit 1;

'''

+----+---------+
|week|calls_qty|
+----+---------+
|  22|      259|
+----+---------+
only showing top 1 row



'\n#sql\n\nselect \nweekofyear(calldate) as week,\ncount(*) as calls_qty\nfrom fire_calls\nwhere year(calldate) = 2018\ngroup by week\norder by calls_qty desc\nlimit 1;\n\n'

In [83]:
# ** Q-7) What neighborhoods in San Francisco had the worst response time in 2018?**
# It appears that if you living in Presidio Heights, the Fire Dept arrived in less than 3 mins, while Mission Bay took more than 6 mins.

(fire_df
 .where((F.year("CallDate") == 2018) & (F.col("City") == 'San Francisco'))
 .groupby(F.col("Neighborhood"))
 .agg(F.avg("Delay").alias("avg_delay"))
 .orderBy(F.col("avg_delay").desc())
 .show(3)
 )


'''
#sql

select
neighborhood, avg(delay) as avg_delay
from fire_calls 
where year(calldate) = 2018 and city = 'San Francisco'
group by neighborhood
order by avg_delay desc 
limit 3

'''

+---------------+------------------+
|   Neighborhood|         avg_delay|
+---------------+------------------+
|Treasure Island|11.320833250880241|
|       Presidio| 6.248148073752721|
|      Chinatown| 6.158818309742307|
+---------------+------------------+
only showing top 3 rows



"\n#sql\n\nselect\nneighborhood, avg(delay) as avg_delay\nfrom fire_calls \nwhere year(calldate) = 2018 and city = 'San Francisco'\ngroup by neighborhood\norder by avg_delay desc \nlimit 3\n\n"

In [3]:
# ** Q-8a) How can we use SQL table to store data and read it back?**
#  connect to Snowflake from pyspark and read one table to dataframe

# pyspark --packages net.snowflake:snowflake-jdbc:3.14.5,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4

import os
from pyspark.sql import SparkSession

# Set the PYSPARK_SUBMIT_ARGS environment variable
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages net.snowflake:snowflake-jdbc:3.14.5,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4 pyspark-shell'

# Initialize Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("SnowflakeTest") \
    .getOrCreate()

#print(spark.sparkContext.getConf().getAll()) #for troubleshooting driver issue

# Define Snowflake connection options
sfURL = os.getenv('SF_URL') 
sfAccount = os.getenv('SF_ACCOUNT')
sfUser = os.getenv('SF_USER')
sfPassword = os.getenv('SF_PASSWORD')

options = {
    "sfURL": sfURL,
    "sfAccount": sfAccount,
    "sfUser": sfUser,
    "sfPassword": sfPassword,
    "sfDatabase": "RAW",
    "sfSchema": "SUPERSTORE",
    "sfWarehouse": "WH_SUPERSTORE"
}

# Specify the Snowflake source
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Load data from Snowflake
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**options) \
    .option("query", "SELECT * FROM returned_orders") \
    .load()

# Show the DataFrame
df.show()

+-----------+--------------+
|IS_RETURNED|      ORDER_ID|
+-----------+--------------+
|       true|CA-2016-100762|
|       true|CA-2016-100762|
|       true|CA-2016-100762|
|       true|CA-2016-100762|
|       true|CA-2016-100867|
|       true|CA-2016-102652|
|       true|CA-2016-102652|
|       true|CA-2016-102652|
|       true|CA-2016-102652|
|       true|CA-2016-103373|
|       true|CA-2016-103744|
|       true|CA-2016-103744|
|       true|CA-2016-103940|
|       true|CA-2016-103940|
|       true|CA-2016-103940|
|       true|CA-2016-103940|
|       true|CA-2016-104829|
|       true|CA-2016-105270|
|       true|CA-2016-105270|
|       true|CA-2016-108609|
+-----------+--------------+
only showing top 20 rows



In [13]:
# Add the 'return_date' column with the current date
df_with_date = df.withColumn("return_date", F.current_date())

# Show the updated DataFrame
df_with_date.show()

# Specify the target Snowflake table for writing the data
target_table = "returned_orders_with_date"

# Write the updated DataFrame back to Snowflake. Mode can be "overwrite" or "append"
df_with_date.write \
    .format(SNOWFLAKE_SOURCE_NAME) \
    .options(**options) \
    .option("dbtable", target_table) \
    .mode("overwrite")\
    .save()

+-----------+--------------+-----------+
|IS_RETURNED|      ORDER_ID|return_date|
+-----------+--------------+-----------+
|       true|CA-2016-100762| 2024-02-09|
|       true|CA-2016-100762| 2024-02-09|
|       true|CA-2016-100762| 2024-02-09|
|       true|CA-2016-100762| 2024-02-09|
|       true|CA-2016-100867| 2024-02-09|
|       true|CA-2016-102652| 2024-02-09|
|       true|CA-2016-102652| 2024-02-09|
|       true|CA-2016-102652| 2024-02-09|
|       true|CA-2016-102652| 2024-02-09|
|       true|CA-2016-103373| 2024-02-09|
|       true|CA-2016-103744| 2024-02-09|
|       true|CA-2016-103744| 2024-02-09|
|       true|CA-2016-103940| 2024-02-09|
|       true|CA-2016-103940| 2024-02-09|
|       true|CA-2016-103940| 2024-02-09|
|       true|CA-2016-103940| 2024-02-09|
|       true|CA-2016-104829| 2024-02-09|
|       true|CA-2016-105270| 2024-02-09|
|       true|CA-2016-105270| 2024-02-09|
|       true|CA-2016-108609| 2024-02-09|
+-----------+--------------+-----------+
only showing top

In [4]:
# ** Q-8c) How can read data from Parquet file?**

# Path to the Parquet file
parquet_file_path = './data/supestore_orders.parquet'

# Read the Parquet file
df = spark.read.parquet(parquet_file_path)

# Show the DataFrame
df.show()

+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|      Country|           City|         State|Postal Code| Region|     Product ID|       Category|Sub-Category|        Product Name|             Sales|Quantity|Discount|             Profit|
+------+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+--------------------+------------------+--------+--------+-------------------+
|     1|CA-2018-152156|2018-11-08|2018-11-07|  Second Class|   CG-12520|       Claire Gute|   Consumer|United States|      H