**Data sources**

In [18]:
fire_file_path='/Users/simith/Downloads/Fire_Department_Calls_for_Service.csv'
law_enf_file_path='/Users/simith/Downloads/Law_Enforcement_Dispatched_Calls_for_Service__Closed.csv'

**Imports from pyspark**

In [19]:
from pyspark.sql.types import StructField,StructType,IntegerType,StringType,BooleanType
import pyspark.sql.functions as F

**Print the schema**

In [20]:

fire_calls_schema = StructType([StructField('call_number', IntegerType(), True),
                     StructField('unit_id', StringType(), True),
                     StructField('incident_number', IntegerType(), True),
                     StructField('call_type', StringType(), True),                  
                     StructField('call_date', StringType(), True),      
                     StructField('watch_date', StringType(), True),
                     StructField('received_dt_time', StringType(), True),
                     StructField('entry_dt_time', StringType(), True),
                     StructField('dispatch_dt_time', StringType(), True),       
                     StructField('response_dt_time', StringType(), True),       
                     StructField('on_scene_date_time', StringType(), True),
                     StructField('transport_dt_time', StringType(), True),   
                     StructField('hospital_dt_time', StringType(), True),   
                     StructField('available_dt_time', StringType(), True),                 
                     StructField('call_final_disposition', StringType(), True), 
                     StructField('address', StringType(), True),       
                     StructField('city', StringType(), True),       
                     StructField('zip_code', IntegerType(), True),       
                     StructField('battalion', StringType(), True),       
                     StructField('station_area', StringType(), True),       
                     StructField('box', StringType(), True),       
                     StructField('original_priority', StringType(), True),
                     StructField('priority', IntegerType(), True),
                     StructField('final_priority', IntegerType(), True),
                     StructField('als_unit', BooleanType(), True),
                     StructField('call_type_group', StringType(), True), 
                     StructField('no_of_alarms', IntegerType(), True),
                     StructField('unit_type', StringType(), True), 
                     StructField('unit_seq_in_call_dispatch', IntegerType(), True),
                     StructField('fire_prevention_district', StringType(), True),
                     StructField('supervisor_district', StringType(), True),
                     StructField('analysis_neighborhood', StringType(), True),
                     StructField('row_id', StringType(), True),
                     StructField('case_location', StringType(), True),
                     StructField('a_neighborhoods',StringType(),True)])

**Load the Fre calls csv file**

In [7]:
fire_df = spark.read.format("csv").option("header","true").load(fire_file_path,schema=fire_calls_schema)

In [12]:
fire_df = fire_df.withColumn("year",F.year(F.to_timestamp('received_dt_time', 'MM/dd/yyyy hh:mm:ss a')))

**Display the new Fire Service calls schema**

In [55]:
fire_df.printSchema()

root
 |-- call_number: integer (nullable = true)
 |-- unit_id: string (nullable = true)
 |-- incident_number: integer (nullable = true)
 |-- call_type: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- watch_date: string (nullable = true)
 |-- received_dt_time: string (nullable = true)
 |-- entry_dt_time: string (nullable = true)
 |-- dispatch_dt_time: string (nullable = true)
 |-- response_dt_time: string (nullable = true)
 |-- on_scene_date_time: string (nullable = true)
 |-- transport_dt_time: string (nullable = true)
 |-- hospital_dt_time: string (nullable = true)
 |-- available_dt_time: string (nullable = true)
 |-- call_final_disposition: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- battalion: string (nullable = true)
 |-- station_area: string (nullable = true)
 |-- box: string (nullable = true)
 |-- original_priority: string (nullable = true)
 |-- priority: i

**Create Bronze table in Delta lake**

In [14]:
fire_df\
    .write.format("delta")\
    .mode("overwrite")\
    .option("overwriteSchema", "true")\
    .save("/tmp/data/1_0_assignment/tables/fire_calls")

23/07/10 14:38:56 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/07/10 14:38:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/07/10 14:38:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/07/10 14:38:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/07/10 14:38:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Call Number, Unit ID, Incident Number, Call Type, Call Date, Watch Date, Received DtTm, Entry DtTm, Dispatch DtTm, Response DtTm, On Scene DtTm, Transport DtTm, Hospital DtTm, Call Final Disposition, Available DtTm, Address, City, Zipcode of Incident, Battalion

In [56]:
df_fire_delta_table = (
    spark
    .read.format("delta")
    .load("/tmp/data/1_0_assignment/tables/fire_calls")
)

df_fire_delta_table.count()

6256076

## Law Enforcement dataset

In [47]:
law_enf_df = spark.read.format("csv").option("header","true").load(law_enf_file_path)

**Create Bronze table**

**Create Silver table here, ingest into data warehouse for querying from applications**

**Ingest gold table into Data warehouse**

In [48]:
law_enf_df=law_enf_df.na.fill(value='UNKNOWN',subset=["police_district"])
law_enf_df_001 = law_enf_df.select('analysis_neighborhood').distinct()


In [49]:
law_enf_df.printSchema()

root
 |-- cad_number: string (nullable = true)
 |-- dup_cad_number: string (nullable = true)
 |-- pd_incident_report: string (nullable = true)
 |-- received_datetime: string (nullable = true)
 |-- entry_datetime: string (nullable = true)
 |-- dispatch_datetime: string (nullable = true)
 |-- enroute_datetime: string (nullable = true)
 |-- onscene_datetime: string (nullable = true)
 |-- close_datetime: string (nullable = true)
 |-- call_type_original: string (nullable = true)
 |-- call_type_original_desc: string (nullable = true)
 |-- call_type_original_notes: string (nullable = true)
 |-- call_type_final: string (nullable = true)
 |-- call_type_final_desc: string (nullable = true)
 |-- call_type_final_notes: string (nullable = true)
 |-- priority_original: string (nullable = true)
 |-- priority_final: string (nullable = true)
 |-- agency: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- onview_flag: string (nullable = true)
 |-- sensitive_call: string (nullable =

In [50]:
law_enf_df = law_enf_df.withColumnRenamed('SF Find Neighborhoods','sf_neighbourhoods').\
            withColumnRenamed('Current Police Districts','curr_police_district').\
            withColumnRenamed('Current Supervisor Districts','curr_supervisor_district').\
            withColumnRenamed('Analysis Neighborhoods','analysis_neighborhood2').\
            withColumnRenamed('Neighborhoods','neighborhood')

law_enf_df = law_enf_df.withColumn("year",F.year(F.to_timestamp('received_datetime', 'MM/dd/yyyy hh:mm:ss a')))
            

In [51]:
law_enf_df.printSchema()

root
 |-- cad_number: string (nullable = true)
 |-- dup_cad_number: string (nullable = true)
 |-- pd_incident_report: string (nullable = true)
 |-- received_datetime: string (nullable = true)
 |-- entry_datetime: string (nullable = true)
 |-- dispatch_datetime: string (nullable = true)
 |-- enroute_datetime: string (nullable = true)
 |-- onscene_datetime: string (nullable = true)
 |-- close_datetime: string (nullable = true)
 |-- call_type_original: string (nullable = true)
 |-- call_type_original_desc: string (nullable = true)
 |-- call_type_original_notes: string (nullable = true)
 |-- call_type_final: string (nullable = true)
 |-- call_type_final_desc: string (nullable = true)
 |-- call_type_final_notes: string (nullable = true)
 |-- priority_original: string (nullable = true)
 |-- priority_final: string (nullable = true)
 |-- agency: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- onview_flag: string (nullable = true)
 |-- sensitive_call: string (nullable =

**Save Law Enforcement data to Bronze table (law_enf_calls)**

In [53]:
law_enf_df\
    .write.format("delta")\
    .mode("overwrite")\
    .option("overwriteSchema", "true")\
    .save("/tmp/data/1_0_assignment/tables/law_enf_calls")

23/07/10 16:06:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/07/10 16:06:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/07/10 16:06:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/07/10 16:06:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/07/10 16:06:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/07/10 16:06:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                     

In [8]:
df_fire_delta_table = (
    spark
    .read.format("delta")
    .load("/tmp/data/1_0_assignment/tables/fire_calls")
)

df_fire_delta_table.count()

6256076

In [54]:
df_law_enf_delta_table = (
    spark
    .read.format("delta")
    .load("/tmp/data/1_0_assignment/tables/law_enf_calls")
)

df_law_enf_delta_table.count()

5202343

### Create data set for querying from applications for Fire and Law Enforcement

**Prepare the fire call records as per the App requirements and ingest into warehouse**

In [11]:
fire_select_list=[F.col('call_number').alias('caller_phone_number'),F.col('call_type').alias('incident_type'),\
                  'zip_code',F.col('analysis_neighborhood').alias('neighbourhood'),\
                  F.col('received_dt_time').alias('received_dt_time'),\
                  F.year(F.to_timestamp('received_dt_time', 'MM/dd/yyyy hh:mm:ss a')).alias('year')]
fire_apps_df = df_fire_delta_table.select(fire_select_list)
fire_apps_df = fire_apps_df.withColumn('call_received_date',\
                                       F.to_date(F.col('received_dt_time'), 'MM/dd/yyyy hh:mm:ss a'))\
                                       .drop(F.col('received_dt_time'))

spark = SparkSession \
    .builder \
    .appName("JDBC Writer App") \
    .config("spark.jars", "/Users/simith/Downloads/postgresql-42.6.0.jar") \
    .getOrCreate()
connection_str="jdbc:postgresql://localhost:5432/test"
table="fire_calls"
properties = {"user":"postgres", "password":"postgres", "driver": "org.postgresql.Driver"}
fire_apps_df.write.jdbc(url=connection_str, table=table, properties=properties)

                                                                                

**Prepare the Law enforcement call records as per the App requirements and ingest into warehouse**

In [12]:
law_enf_select_list=[F.col('cad_number').alias('incident_number'),F.col('call_type_final_desc').alias('incident_type'),\
             'received_datetime','close_datetime',\
                     F.year(F.to_timestamp('received_datetime', 'MM/dd/yyyy hh:mm:ss a')).alias('year'),\
                     F.col('police_district').alias('district'),\
                     F.col('analysis_neighborhood').alias('neighbourhood')]
law_enf_apps_df = df_law_enf_delta_table.select(law_enf_select_list)
law_enf_apps_df = law_enf_apps_df.withColumn('call_received_date',\
                                       F.to_date(F.col('received_datetime'), 'MM/dd/yyyy hh:mm:ss a'))
                                     

spark = SparkSession \
    .builder \
    .appName("JDBC Writer App") \
    .config("spark.jars", "/Users/simith/Downloads/postgresql-42.6.0.jar") \
    .getOrCreate()
connection_str="jdbc:postgresql://localhost:5432/test"
table="law_enforcement_calls"
properties = {"user":"postgres", "password":"postgres", "driver": "org.postgresql.Driver"}
law_enf_apps_df.write.jdbc(url=connection_str, table=table, properties=properties)

                                                                                

In [76]:
fire_apps_df.withColumn("n_date",F.to_date(F.col('call_received_date_time'))).limit(5).show(truncate=False)

+-------------------+-------------+--------+-------------+-----------------------+----+------+
|caller_phone_number|incident_type|zip_code|neighbourhood|call_received_date_time|year|n_date|
+-------------------+-------------+--------+-------------+-----------------------+----+------+
|221210313          |Outside Fire |94102   |Hayes Valley |05/01/2022 02:58:25 AM |2022|null  |
|220190150          |Alarms       |94107   |Potrero Hill |01/19/2022 01:42:12 AM |2022|null  |
|211233271          |Alarms       |94110   |Mission      |05/03/2021 09:28:12 PM |2021|null  |
|212933533          |Alarms       |94102   |Tenderloin   |10/20/2021 10:08:47 PM |2021|null  |
|221202543          |Alarms       |94109   |Russian Hill |04/30/2022 06:35:58 PM |2022|null  |
+-------------------+-------------+--------+-------------+-----------------------+----+------+



23/07/08 11:03:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Call Number, Call Type, Received DtTm, Zipcode of Incident, Neighborhooods - Analysis Boundaries
 Schema: call_number, call_type, received_dt_time, zip_code, neighbourhoods
Expected: call_number but found: Call Number
CSV file: file:///Users/simith/Downloads/Fire_Department_Calls_for_Service.csv


In [77]:
fire_apps_df.select(F.date_format(F.to_date(fire_apps_df.call_received_date_time, 'MM/dd/yyyy hh:mm:ss a'), "MM/dd/yyyy").alias('date')).show()

+----------+
|      date|
+----------+
|05/01/2022|
|01/19/2022|
|05/03/2021|
|10/20/2021|
|04/30/2022|
|05/03/2021|
|07/13/2021|
|10/20/2021|
|04/30/2022|
|07/13/2021|
|01/18/2022|
|01/11/2022|
|01/11/2022|
|01/11/2022|
|01/18/2022|
|04/30/2022|
|07/13/2021|
|01/18/2022|
|07/28/2022|
|07/28/2022|
+----------+
only showing top 20 rows



23/07/08 11:05:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Received DtTm
 Schema: received_dt_time
Expected: received_dt_time but found: Received DtTm
CSV file: file:///Users/simith/Downloads/Fire_Department_Calls_for_Service.csv


**Find the average response time for both law enforcement and fire departments**

In [None]:
**Find the average response time for both law enforcement and fire departments**