<a href="https://colab.research.google.com/github/samarthk/Learning_pyspark/blob/master/Spark_101.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Java, Spark, and Findspark
This installs Apache Spark 2.3.1, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark


--2020-11-06 22:32:51--  https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 230778742 (220M) [application/x-gzip]
Saving to: ‘spark-2.4.1-bin-hadoop2.7.tgz’


2020-11-06 22:33:04 (17.3 MB/s) - ‘spark-2.4.1-bin-hadoop2.7.tgz’ saved [230778742/230778742]



## Set Environment Variables
Set the locations where Spark and Java are installed.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

## Start a SparkSession
This will start a local Spark session.

In [3]:
# Run a local spark session to test our installation:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("local[*]").setAppName('pyspark')
sc = SparkContext(conf=conf)
sqc = SQLContext(sc)

## Use Spark!
That's all there is to it - you're ready to use Spark!

In [4]:
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



# Programming Ex


Download File

In [None]:
!wget https://data.sfgov.org/api/views/wr8u-xric/rows.csv

In [None]:
!ls -ltr /content/sample_data/

!unzip /content/sample_data/Downloads.zip

#!head -5000 /content/drive/My\ Drive/Data_Files/Fire_Incidents_FULL.csv > Fire_Incidents.csv


total 56276
-rwxr-xr-x 1 root root      930 Jan  1  2000 README.md
-rwxr-xr-x 1 root root     1697 Jan  1  2000 anscombe.json
-rw-r--r-- 1 root root  1706430 Jun 26 16:26 california_housing_train.csv
-rw-r--r-- 1 root root   301141 Jun 26 16:26 california_housing_test.csv
-rw-r--r-- 1 root root 36523880 Jun 26 16:26 mnist_train_small.csv
-rw-r--r-- 1 root root 18289443 Jun 26 16:26 mnist_test.csv
-rw-r--r-- 1 root root   788006 Jul  6 18:22 Downloads.zip
Archive:  /content/sample_data/Downloads.zip
  inflating: Fire_Department_Calls_for_Service.csv  
replace Fire_Incidents.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: Fire_Incidents.csv      


In [None]:
from pyspark.sql.types import *

## San FranciscoooSS Fire Incidents (OpenData Meetup)

In [None]:
fireServiceCallsDF = spark.read.csv('/content/sample_data/Fire_Department_Calls_for_Service.csv', header=True, inferSchema=True)
fireServiceCallsDF.printSchema()

In [None]:
IncidentSchema = StructType(
					[
					StructField('IncidentNumber',  IntegerType(), True),
StructField('ExposureNumber',  IntegerType(), True),
StructField('ID',  IntegerType(), True),
StructField('Address',  StringType(), True),
StructField('IncidentDate',  StringType(), True),
StructField('CallNumber',  IntegerType(), True),
StructField('AlarmDtTm',  StringType(), True),
StructField('ArrivalDtTm',  StringType(), True),
StructField('CloseDtTm',  StringType(), True),
StructField('City',  StringType(), True),
StructField('ZIPCode',  IntegerType(), True),
StructField('Battalion',  StringType(), True),
StructField('StationArea',  StringType(), True),
StructField('Box',  IntegerType(), True),
StructField('SuppressionUnits',  IntegerType(), True),
StructField('SuppressionPersonnel',  IntegerType(), True),
StructField('EMSUnits',  IntegerType(), True),
StructField('EMSPersonnel',  IntegerType(), True),
StructField('OtherUnits',  IntegerType(), True),
StructField('OtherPersonnel',  IntegerType(), True),
StructField('FirstUnitOnScene',  StringType(), True),
StructField('EstimatedPropertyLoss',  IntegerType(), True),
StructField('EstimatedContentsLoss',  IntegerType(), True),
StructField('FireFatalities',  IntegerType(), True),
StructField('FireInjuries',  IntegerType(), True),
StructField('CivilianFatalities',  IntegerType(), True),
StructField('CivilianInjuries',  IntegerType(), True),
StructField('NumberofAlarms',  IntegerType(), True),
StructField('PrimarySituation',  StringType(), True),
StructField('MutualAid',  StringType(), True),
StructField('ActionTakenPrimary',  StringType(), True),
StructField('ActionTakenSecondary',  StringType(), True),
StructField('ActionTakenOther',  StringType(), True),
StructField('DetectorAlertedOccupants',  StringType(), True),
StructField('PropertyUse',  StringType(), True),
StructField('AreaofFireOrigin',  StringType(), True),
StructField('IgnitionCause',  StringType(), True),
StructField('IgnitionFactorPrimary',  StringType(), True),
StructField('IgnitionFactorSecondary',  StringType(), True),
StructField('HeatSource',  StringType(), True),
StructField('ItemFirstIgnited',  StringType(), True),
StructField('HumanFactorsAssociatedwithIgnition',  StringType(), True),
StructField('StructureType',  StringType(), True),
StructField('StructureStatus',  StringType(), True),
StructField('FloorofFireOrigin',  IntegerType(), True),
StructField('FireSpread',  StringType(), True),
StructField('NoFlameSpead',  StringType(), True),
StructField('Numberoffloorswithminimumdamage',  IntegerType(), True),
StructField('Numberoffloorswithsignificantdamage',  IntegerType(), True),
StructField('Numberoffloorswithheavydamage',  IntegerType(), True),
StructField('Numberoffloorswithextremedamage',  IntegerType(), True),
StructField('DetectorsPresent',  StringType(), True),
StructField('DetectorType',  StringType(), True),
StructField('DetectorOperation',  StringType(), True),
StructField('DetectorEffectiveness',  StringType(), True),
StructField('DetectorFailureReason',  StringType(), True),
StructField('AutomaticExtinguishingSystemPresent',  StringType(), True),
StructField('AutomaticExtinguishingSytemType',  StringType(), True),
StructField('AutomaticExtinguishingSytemPerfomance',  StringType(), True),
StructField('AutomaticExtinguishingSytemFailureReason',  StringType(), True),
StructField('NumberofSprinklerHeadsOperating',  IntegerType(), True),
StructField('SupervisorDistrict',  IntegerType(), True),
StructField('AnalysisNeighborhood',  StringType(), True),
StructField('point',  StringType(), True),
StructField('Neighborhoods(old)',  IntegerType(), True),
StructField('ZipCodes',  IntegerType(), True),
StructField('FirePreventionDistricts',  IntegerType(), True),
StructField('PoliceDistricts',  IntegerType(), True),
StructField('SupervisorDistricts',  IntegerType(), True),
StructField('CivicCenterHarmReductionProjectBoundary',  IntegerType(), True),
StructField('2017FixItZones',  IntegerType(), True),
StructField('HSOCZones',  IntegerType(), True),
StructField('CentralMarket/TenderloinBoundary',  IntegerType(), True),
StructField('CentralMarket/TenderloinBoundaryPolygon-Updated',  IntegerType(), True),
StructField('HSOCZonesasof2018-06-05',  IntegerType(), True),
StructField('Neighborhoods',  IntegerType(), True),
StructField('SFFindNeighborhoods',  IntegerType(), True),
StructField('CurrentPoliceDistricts',  IntegerType(), True),
StructField('CurrentSupervisorDistricts',  IntegerType(), True),
StructField('AnalysisNeighborhoods',  IntegerType(), True)
					 ]
					 )


In [None]:
fireIncidentsDF = spark.read.csv('/content/sample_data/Fire_Incidents.csv', header=True, schema=IncidentSchema)
fireIncidentsDF.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- ExposureNumber: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: string (nullable = true)
 |-- CallNumber: integer (nullable = true)
 |-- AlarmDtTm: string (nullable = true)
 |-- ArrivalDtTm: string (nullable = true)
 |-- CloseDtTm: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: integer (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: integer (nu

In [None]:
# Note that we are removing all space characters from the col names to prevent errors when writing to Parquet later

fireSchema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),       
                     StructField('WatchDate', StringType(), True),       
                     StructField('ReceivedDtTm', StringType(), True),       
                     StructField('EntryDtTm', StringType(), True),       
                     StructField('DispatchDtTm', StringType(), True),       
                     StructField('ResponseDtTm', StringType(), True),       
                     StructField('OnSceneDtTm', StringType(), True),       
                     StructField('TransportDtTm', StringType(), True),                  
                     StructField('HospitalDtTm', StringType(), True),       
                     StructField('CallFinalDisposition', StringType(), True),       
                     StructField('AvailableDtTm', StringType(), True),       
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('ZipcodeofIncident', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumberofAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('Unitsequenceincalldispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('NeighborhoodDistrict', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True)])

In [None]:
print(type(fireIncidentsDF.printSchema()))

for i in fireIncidentsDF):
  print(i)


SyntaxError: ignored

In [None]:
fireServiceCallsDF_sch = spark.read.csv('/content/sample_data/Fire_Department_Calls_for_Service.csv', header=True, schema=fireSchema)
#fireServiceCallsDF_sch.printSchema()
fireServiceCallsDF_sch.show(5)

+----------+------+--------------+-----------------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----------------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+--------------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+--------------+
|CallNumber|UnitID|IncidentNumber|         CallType|  CallDate| WatchDate|        ReceivedDtTm|           EntryDtTm|        DispatchDtTm|        ResponseDtTm|         OnSceneDtTm|       TransportDtTm|        HospitalDtTm|CallFinalDisposition|       AvailableDtTm|             Address|         City|ZipcodeofIncident|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|       CallTypeGroup|NumberofAlarms|UnitType|U

In [None]:
fireServiceCallsDF.count()

4999

In [None]:
fireServiceCallsDF_sch.select('CallType').distinct().show(1000,False)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Explosion                                   |
|Vehicle Fire                                |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Gas Leak (Natural and LP Gases)             |
|Water Rescue                                |
|Electrical Hazard                           |
|High Angle Rescue                           |
|Structure Fire                              |
|Industrial Accidents                        |
|Medical Incident                            |
|Fuel Spill  

In [None]:
fireIncidentsDF.createOrReplaceTempView("fireIncidents")
spark.sql("SELECT COUNT(*) FROM fireIncidents").show()

+--------+
|count(1)|
+--------+
|    4999|
+--------+



In [5]:
fireServiceCallsDF_sch.createOrReplaceTempView("firecalls")
spark.sql("SELECT COUNT(*) FROM firecalls").show()

NameError: ignored

In [None]:
spark.sql("SELECT * FROM firecalls where \
CallDate = '04/19/2020'and IncidentNumber >20046358 ").show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----------------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+--------------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|        ReceivedDtTm|           EntryDtTm|        DispatchDtTm|        ResponseDtTm|         OnSceneDtTm|       TransportDtTm|        HospitalDtTm|CallFinalDisposition|       AvailableDtTm|             Address|         City|ZipcodeofIncident|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|       CallTypeGroup|NumberofAlarms|UnitType|Unit

In [None]:
spark.sql("SELECT * FROM fireIncidents fi where \
 to_date(fi.IncidentDate, 'MM/dd/yyyy')> '06/05/2018' ").show(5)

+--------------+--------------+---------+--------------------+------------+----------+--------------------+--------------------+--------------------+-------------+-------+---------+-----------+----+----------------+--------------------+--------+------------+----------+--------------+----------------+---------------------+---------------------+--------------+------------+------------------+----------------+--------------+--------------------+---------+--------------------+--------------------+----------------+------------------------+--------------------+----------------+-------------+---------------------+-----------------------+----------+----------------+----------------------------------+-------------+---------------+-----------------+----------+------------+-------------------------------+-----------------------------------+-----------------------------+-------------------------------+----------------+------------+-----------------+---------------------+---------------------+------

In [None]:
spark.sql("SELECT distinct upper(CallType) FROM firecalls fc where \
CallDate = '04/19/2020'and IncidentNumber >20046358 ").show(1000,False)

+-------------------------------+
|upper(CallType)                |
+-------------------------------+
|ALARMS                         |
|OUTSIDE FIRE                   |
|TRAFFIC COLLISION              |
|GAS LEAK (NATURAL AND LP GASES)|
|MEDICAL INCIDENT               |
|OTHER                          |
|STRUCTURE FIRE                 |
|CITIZEN ASSIST / SERVICE CALL  |
|SMOKE INVESTIGATION (OUTSIDE)  |
+-------------------------------+



In [None]:
spark.sql("SELECT NeighborhoodDistrict, count(NeighborhoodDistrict) AS Neighborhood_Count \
FROM firecalls \
WHERE year(to_date(CallDate, 'MM/dd/yyyy')) > 2019 \
GROUP BY NeighborhoodDistrict \
ORDER BY Neighborhood_Count DESC ").show(5, False)

+------------------------------+------------------+
|NeighborhoodDistrict          |Neighborhood_Count|
+------------------------------+------------------+
|Tenderloin                    |93                |
|South of Market               |69                |
|Mission                       |63                |
|Outer Richmond                |49                |
|Financial District/South Beach|33                |
+------------------------------+------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import *

# Note that PySpark uses the Java Simple Date Format patterns

from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

from_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'
to_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'


fireServiceCallsTsDF = fireServiceCallsDF \
  .withColumn('CallDateTS', unix_timestamp(fireServiceCallsDF['Call Date'], from_pattern1).cast("timestamp")) \
  .drop('CallDate') \
  .withColumn('WatchDateTS', unix_timestamp(fireServiceCallsDF['Watch Date'], from_pattern1).cast("timestamp")) \
  .drop('WatchDate') \
  .withColumn('ReceivedDtTmTS', unix_timestamp(fireServiceCallsDF['Received DtTm'], from_pattern2).cast("timestamp")) \
  .drop('ReceivedDtTm')


fireServiceCallsTsDF.printSchema()
fireServiceCallsTsDF.show(5)

In [None]:
spark.sql("SELECT * FROM firecalls fc \
left join fireIncidents fi on fc.IncidentNumber =fi.IncidentNumber ").show(10)

+----------+------+--------------+-----------------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----------------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+--------------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+--------------+--------------+--------------+----+-------+------------+----------+---------+-----------+---------+----+-------+---------+-----------+----+----------------+--------------------+--------+------------+----------+--------------+----------------+---------------------+---------------------+--------------+------------+------------------+----------------+--------------+----------------+---------+------------------+--------------------+------