In [1]:
import findspark
findspark.init()

In [2]:
#importing the libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
#Initializing the SPARK SESSION
spark=SparkSession.builder.appName('SF_FireCalls').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
%ls -ltr "/home/karthik/sf-fire-calls.csv"

-rw-r--r-- 1 karthik karthik 44530123 Jul 14 19:07 /home/karthik/sf-fire-calls.csv


In [5]:
filePath='/home/karthik/sf-fire-calls.csv'

In [6]:
#Creating Schema
fire_schema = StructType( [ StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True) ] )

In [7]:
#Creating a DataFrame
data=spark.read.format('csv').option('header','true').load(filePath,schema=fire_schema)

In [8]:
data.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [9]:
data.count()

                                                                                

175296

In [10]:
data.show(5,False)

22/08/12 16:42:33 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------------+-------------------------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallDate  |WatchDate |CallFinalDisposition|AvailableDtTm         |Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood         |Location                             |RowID        |Delay    |
+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+--

================================================================================

#### Q-1) Filter out "Medical Incident" call types and store the remaining data in a new dataframe

In [11]:
new_fire_data=data.select('IncidentNumber','AvailableDtTm','CallType').where(col('CallType')!='Medical Incident')

In [12]:
new_fire_data.show(5,False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [13]:
new_fire_data.count()

61502

#### Q-2) How many distinct types of calls were made to the Fire Department?

#### To be sure, let's not count "null" strings in that column.

In [14]:
data.select('CallType').where(col('CallType').isNotNull()).distinct().show(10,False)

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Marine Fire                  |
|Aircraft Emergency           |
|Administrative               |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Watercraft in Distress       |
|Explosion                    |
+-----------------------------+
only showing top 10 rows



In [15]:
data.select('CallType').where(col('CallType').isNotNull()).distinct().count()

30

#### Q-3) Find out all response or delayed times greater than 5 mins?

#### Rename the column Delay - > ReponseDelayedinMins Returns a new DataFrame Find out all calls where the response time to the fire site was delayed for more than 5 mins

In [21]:
new_data=data.withColumnRenamed('Delay','ResponseDelayedinMins')

In [22]:
new_data.select('ResponseDelayedinMins').show(5)

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|                 2.95|
|                  4.7|
|            2.4333334|
|                  1.5|
|            3.4833333|
+---------------------+
only showing top 5 rows



In [23]:
new_data.select('IncidentNumber','AvailableDtTm','CallType','ResponseDelayedinMins').where(col('ResponseDelayedinMins') > 5).count()

27508

In [24]:
new_data.select('IncidentNumber','AvailableDtTm','CallType','ResponseDelayedinMins').where(col('ResponseDelayedinMins') > 5).show(10,False)

+--------------+----------------------+-----------------------------+---------------------+
|IncidentNumber|AvailableDtTm         |CallType                     |ResponseDelayedinMins|
+--------------+----------------------+-----------------------------+---------------------+
|2003409       |01/11/2002 04:34:23 PM|Medical Incident             |5.35                 |
|2003642       |01/12/2002 01:23:04 PM|Medical Incident             |6.25                 |
|2003818       |01/13/2002 01:51:15 AM|Medical Incident             |5.2                  |
|2004152       |01/14/2002 08:16:54 AM|Citizen Assist / Service Call|5.6                  |
|2004216       |01/14/2002 12:27:03 PM|Medical Incident             |7.25                 |
|2004408       |01/15/2002 06:53:38 AM|Medical Incident             |11.916667            |
|2004521       |01/15/2002 03:18:26 PM|Medical Incident             |5.116667             |
|2004528       |01/15/2002 03:55:41 PM|Medical Incident             |8.633333   

Let's do some ETL:

- Transform the string dates to Spark Timestamp data type so we can make some time-based queries later
- Returns a transformed query
- Cache the new DataFrame

In [25]:
data.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [27]:
fire_df_ts=(new_data.withColumn('IncidentDate',to_date(col('CallDate'),'MM/dd/yyyy')).drop('CallDate')
            .withColumn('onWatchDate',to_date(col('WatchDate'),'MM/dd/yyyy')).drop('WatchDate')
            .withColumn('AvailableDtTs',to_date(col('AvailableDtTm'),'MM/dd/yyyy hh:mm:ss a')).drop('AvailableDtTm') 
           )

In [28]:
fire_df_ts.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =

In [30]:
fire_df_ts.select('IncidentNumber','IncidentDate','onWatchDate','AvailableDtTs').show(5)

+--------------+------------+-----------+-------------+
|IncidentNumber|IncidentDate|onWatchDate|AvailableDtTs|
+--------------+------------+-----------+-------------+
|       2003235|  2002-01-11| 2002-01-10|   2002-01-11|
|       2003241|  2002-01-11| 2002-01-10|   2002-01-11|
|       2003242|  2002-01-11| 2002-01-10|   2002-01-11|
|       2003250|  2002-01-11| 2002-01-10|   2002-01-11|
|       2003259|  2002-01-11| 2002-01-10|   2002-01-11|
+--------------+------------+-----------+-------------+
only showing top 5 rows



In [31]:
fire_df_ts.cache()

DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallFinalDisposition: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, ResponseDelayedinMins: float, IncidentDate: date, onWatchDate: date, AvailableDtTs: date]

In [34]:
fire_df_ts.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallFinalDisposition',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'ResponseDelayedinMins',
 'IncidentDate',
 'onWatchDate',
 'AvailableDtTs']

#### Q-4) What were the most common call types?

In [48]:
fire_df_ts.select('CallType').groupBy('CallType').count().orderBy('count',ascending=False).show()

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows



#### Q-4a) What zip codes accounted for most common calls?

Let's investigate what zip codes in San Francisco accounted for most fire calls and what type where they.

- Filter out by CallType
- Group them by CallType and Zip code
- Count them and display them in descending order
- It seems like the most common calls were all related to Medical Incident, and the two zip codes are 94102 and 94103.

In [53]:
fire_df_ts.select('CallType','Zipcode').groupBy('CallType','Zipcode').count().orderBy('count',ascending=False).show()

+----------------+-------+-----+
|        CallType|Zipcode|count|
+----------------+-------+-----+
|Medical Incident|  94102|16130|
|Medical Incident|  94103|14775|
|Medical Incident|  94110| 9995|
|Medical Incident|  94109| 9479|
|Medical Incident|  94124| 5885|
|Medical Incident|  94112| 5630|
|Medical Incident|  94115| 4785|
|Medical Incident|  94122| 4323|
|Medical Incident|  94107| 4284|
|Medical Incident|  94133| 3977|
|Medical Incident|  94117| 3522|
|Medical Incident|  94134| 3437|
|Medical Incident|  94114| 3225|
|Medical Incident|  94118| 3104|
|Medical Incident|  94121| 2953|
|Medical Incident|  94116| 2738|
|Medical Incident|  94132| 2594|
|  Structure Fire|  94110| 2267|
|Medical Incident|  94105| 2258|
|  Structure Fire|  94102| 2229|
+----------------+-------+-----+
only showing top 20 rows



#### Q-4b) What San Francisco neighborhoods are in the zip codes 94102 and 94103

In [72]:
fire_df_ts.select('Zipcode','Neighborhood').where( (col('Zipcode')==94102) | (col('Zipcode')==94103)).distinct().orderBy('Zipcode').show(10,False)

+-------+------------------------------+
|Zipcode|Neighborhood                  |
+-------+------------------------------+
|94102  |Financial District/South Beach|
|94102  |Western Addition              |
|94102  |Hayes Valley                  |
|94102  |Tenderloin                    |
|94102  |South of Market               |
|94102  |Nob Hill                      |
|94103  |South of Market               |
|94103  |Castro/Upper Market           |
|94103  |Financial District/South Beach|
|94103  |Mission                       |
+-------+------------------------------+
only showing top 10 rows



#### Q-5) What was the sum of all calls, average, min and max of the response times for calls?

Let's use the built-in Spark SQL functions to compute the sum, avg, min, and max of few columns:

- Number of Total Alarms
- What were the min and max the delay in response time before the Fire Dept arrived at the scene of the call

In [73]:
fire_df_ts.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =

In [87]:
fire_df_ts.select(sum('NumAlarms'),mean('ResponseDelayedinMins'),min('ResponseDelayedinMins'),max('ResponseDelayedinMins') ).show()

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



#### Q-6a) How many distinct years of data is in the CSV file?

We can use the year() SQL Spark function off the Timestamp column data type IncidentDate.

In all, we have fire calls from years 2000-2018

In [107]:
fire_df_ts.select(year('IncidentDate')).distinct().orderBy(year('IncidentDate')).show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [102]:
fire_df_ts.select(year('IncidentDate')).distinct().count()

19

#### Q-6b) What week of the year in 2018 had the most fire calls?

Note: Week 1 is the New Years' week and week 25 is the July 4 the week. Loads of fireworks, so it makes sense the higher number of calls.

In [127]:
fire_df_ts.filter(year('IncidentDate')==2018).groupBy(weekofyear('IncidentDate')).count().orderBy('count',ascending=False).show()


+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                      22|  259|
|                      40|  255|
|                      43|  250|
|                      25|  249|
|                       1|  246|
|                      44|  244|
|                      13|  243|
|                      32|  243|
|                      11|  240|
|                       5|  236|
|                      18|  236|
|                      23|  235|
|                      31|  234|
|                       2|  234|
|                      42|  234|
|                      19|  233|
|                      34|  232|
|                       8|  232|
|                      10|  232|
|                      21|  231|
+------------------------+-----+
only showing top 20 rows



#### Q-7) What neighborhoods in San Francisco had the worst response time in 2018?

In [138]:
fire_df_ts.filter(year('IncidentDate')==2018).groupBy('Neighborhood').mean('ResponseDelayedinMins').orderBy(mean('ResponseDelayedinMins'),ascending=False).show(5,False)

+---------------------+--------------------------+
|Neighborhood         |avg(ResponseDelayedinMins)|
+---------------------+--------------------------+
|Chinatown            |6.190314101143033         |
|Presidio             |5.829227011272873         |
|Treasure Island      |5.453703684111436         |
|McLaren Park         |4.74404764175415          |
|Bayview Hunters Point|4.620561962212182         |
+---------------------+--------------------------+
only showing top 5 rows



#### Q-8a) How can we use Parquet files or SQL table to store data and read it back?

In [139]:
fire_df_ts.write.format('parquet').mode('overwrite').save('/home/karthik/Practice_Folder/SF_FireCall_parquet')

22/08/12 18:08:23 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/12 18:08:23 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/08/12 18:08:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

In [142]:
%ls -ltr '/home/karthik/Practice_Folder/SF_FireCall_parquet/' %

ls: cannot access '%': No such file or directory
/home/karthik/Practice_Folder/SF_FireCall_parquet/:
total 7636
-rw-r--r-- 1 karthik karthik  374745 Aug 12 18:08 part-00007-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1126117 Aug 12 18:08 part-00006-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1080804 Aug 12 18:08 part-00005-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1048302 Aug 12 18:08 part-00004-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1027870 Aug 12 18:08 part-00003-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1004522 Aug 12 18:08 part-00002-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1064782 Aug 12 18:08 part-00001-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1082808 Aug 12 18:08 pa

In [143]:
fire_df_ts.write.format('parquet').mode('overwrite').saveAsTable('fire_calls1')

22/08/12 19:10:42 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
22/08/12 19:10:42 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
22/08/12 19:10:43 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

In [146]:
spark.sql("""CACHE TABLE fire_calls1""")

                                                                                

DataFrame[]

In [147]:
spark.sql("""SELECT * FROM fire_calls1""").show()

+----------+------+--------------+-----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+--------------+---------------------+------------+-----------+-------------+
|CallNumber|UnitID|IncidentNumber|         CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|         RowID|ResponseDelayedinMins|IncidentDate|onWatchDate|AvailableDtTs|
+----------+------+--------------+-----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+------------

#### Q-8c) How to read data from Parquet file?

In [152]:
fire_parq_df=spark.read.format('parquet').load('/home/karthik/Practice_Folder/SF_FireCall_parquet/')

In [150]:
%ls -ltr '/home/karthik/Practice_Folder/SF_FireCall_parquet/' %

ls: cannot access '%': No such file or directory
/home/karthik/Practice_Folder/SF_FireCall_parquet/:
total 7636
-rw-r--r-- 1 karthik karthik  374745 Aug 12 18:08 part-00007-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1126117 Aug 12 18:08 part-00006-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1080804 Aug 12 18:08 part-00005-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1048302 Aug 12 18:08 part-00004-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1027870 Aug 12 18:08 part-00003-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1004522 Aug 12 18:08 part-00002-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1064782 Aug 12 18:08 part-00001-6e1e1155-0f9d-49ea-aa0b-9f2581b6a94e-c000.snappy.parquet
-rw-r--r-- 1 karthik karthik 1082808 Aug 12 18:08 pa

In [154]:
fire_parq_df.show(5)

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+------------------+--------------------+-------------+---------------------+------------+-----------+-------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|      Neighborhood|            Location|        RowID|ResponseDelayedinMins|IncidentDate|onWatchDate|AvailableDtTs|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+-------

In [155]:
spark.stop()