In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext("local", "Basic extra")
spark = (SparkSession
         .builder
         .appName('Basic extra')
         .getOrCreate())

### Use DataFrame is more code-readable
Create a dataset and compute average age for each name value

In [2]:
# use RDD of tuples: (name, age)
dataRDD = sc.parallelize(
    [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]
)
agesRDD = (dataRDD
           .map(lambda x: (x[0], (x[1], 1)))
           .reduceByKey(lambda x,y: (x[0] + y[0], x[1] + y[1]))
           .map(lambda x: (x[0], x[1][0] / x[1][1]))
           )
agesRDD.collect()


[('Brooke', 22.5), ('Denny', 31.0), ('Jules', 30.0), ('TD', 35.0)]

Now, perform the same functionality with DataFrame

In [3]:
from pyspark.sql.functions import avg
dataRDD = spark.createDataFrame(
    [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)],
    ("name", "age")
)
dataRDD.groupby("Name").agg(avg(dataRDD.age)).show()


+------+--------+
|  Name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Jules|    30.0|
|    TD|    35.0|
| Denny|    31.0|
+------+--------+



### Data types

In [4]:
from pyspark.sql.types import *

Define the schema programmatically

In [5]:
schema = StructType([ # a table structure
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", StringType(), False)
])
data = [
    ("Peter", "Mr", "twitter.com"),
    ("Mary", "Ms", "facebook.com"),
    ("John", "Mr", "twitter.com")
]

dataDF = spark.createDataFrame(
    data,
    schema
)
dataDF.show()

+------+-----+------------+
|author|title|       pages|
+------+-----+------------+
| Peter|   Mr| twitter.com|
|  Mary|   Ms|facebook.com|
|  John|   Mr| twitter.com|
+------+-----+------------+



Define the schema by DDL

In [6]:
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING,\
      `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
]
authorDF = spark.createDataFrame(
    data,
    schema
)
authorDF.show()
print(authorDF.printSchema())

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (

### Columns and Expressions

In [7]:
from pyspark.sql.functions import expr, concat
authorDF.select('Id', 'Hits', expr('Hits * 2')).show()
authorDF.select('Id', 'Hits', authorDF.Hits * 2).show()
authorDF.withColumn('Big Hitters', expr('Hits > 10000')).show()
authorDF.withColumn('AuthorsId', concat(expr('First'), expr('Last'), expr('Id'))).show()
# ==
# authorDF.withColumn('AuthorsId', concat(authorDF.First, authorDF.Last, authorDF.Id)).show()
authorDF.sort(authorDF.Hits.desc()).show(2)

+---+-----+----------+
| Id| Hits|(Hits * 2)|
+---+-----+----------+
|  1| 4535|      9070|
|  2| 8908|     17816|
|  3| 7659|     15318|
|  4|10568|     21136|
|  5|40578|     81156|
|  6|25568|     51136|
+---+-----+----------+

+---+-----+----------+
| Id| Hits|(Hits * 2)|
+---+-----+----------+
|  1| 4535|      9070|
|  2| 8908|     17816|
|  3| 7659|     15318|
|  4|10568|     21136|
|  5|40578|     81156|
|  6|25568|     51136|
+---+-----+----------+

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|     

### Rows


In [8]:
from pyspark.sql import Row

rows = [
    Row('Naruto', 'Leaf village'),
    Row('Bee', 'Cloud village')
]
ninjaDF = spark.createDataFrame(rows, ('Name', 'Village'))
ninjaDF.show()
rows = ninjaDF.collect()
row_0 = rows[0]
print(row_0.Name, '/', row_0.Village)

+------+-------------+
|  Name|      Village|
+------+-------------+
|Naruto| Leaf village|
|   Bee|Cloud village|
+------+-------------+

Naruto / Leaf village


### DataFrame API

In [9]:
fire_path = '../datasets/sf-fire-calls.csv'
fireDF = (spark
          .read
          .option("samplingRatio", 0.01)
          .option("header", True)
          .csv(fire_path))
fireDF.show(5)
print(fireDF.printSchema())

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Projections and filters

In [10]:
from pyspark.sql.functions import col
few_fireDF = (fireDF
              .select("IncidentNumber", "AvailableDtTm", "CallType")
              .filter(col('CallType') != 'Medical Incident')
              )
few_fireDF.show(3)

(fireDF
 .select('CallType')
 .where(col('CallType').isNotNull())
 .distinct()
 .show(10, False)
)

+--------------+--------------------+--------------+
|IncidentNumber|       AvailableDtTm|      CallType|
+--------------+--------------------+--------------+
|       2003235|01/11/2002 01:51:...|Structure Fire|
|       2003250|01/11/2002 04:16:...|  Vehicle Fire|
|       2003259|01/11/2002 06:01:...|        Alarms|
+--------------+--------------------+--------------+
only showing top 3 rows

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



### Add, Rename, Drop columns


In [11]:
new_fireDF = fireDF.withColumnRenamed('Delay', 'ResponseDelayedinMins')
(new_fireDF
 .select('ResponseDelayedinMins')
 .where(col('ResponseDelayedinMins') > 5)
 .show(5, False)
 )

from pyspark.sql.functions import to_timestamp, year, weekofyear
new_fireDF = (new_fireDF
              .withColumn('IncidentDate', to_timestamp(col('CallDate'), 'MM/dd/yyyy'))
              .drop('CallDate')
              .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
              .drop('WatchDate')
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
              .drop('AvailableDtTm')
              )

(new_fireDF
 .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
 .show(5, False))

(new_fireDF
 .select(year(col('IncidentDate')).alias('yearIncidentDate'))
 .distinct()
 .sort(col('yearIncidentDate'))
 .show())

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|6.25                 |
|7.25                 |
|11.916667            |
|8.633333             |
|95.28333             |
+---------------------+
only showing top 5 rows

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows

+----------------+
|yearIncidentDate|
+----------------+
|            2000|
|            2001|
|            2002|
|            2003|
|            2004|
|            2005|
|    

### Aggregations
The most common types of fire calls

In [12]:
from pyspark.sql.functions import count
(new_fireDF
 .select('CallType')
 .where(col('CallType').isNotNull())
 .groupBy('CallType')
 .agg(count('CallType').alias('CallTypeCount'))
 .orderBy(col('CallTypeCount').desc())
 .show(5, False)
 )

+-----------------------------+-------------+
|CallType                     |CallTypeCount|
+-----------------------------+-------------+
|Medical Incident             |113794       |
|Structure Fire               |23319        |
|Alarms                       |19406        |
|Traffic Collision            |7013         |
|Citizen Assist / Service Call|2524         |
+-----------------------------+-------------+
only showing top 5 rows



Sum of alarms, the average response time, and the minimum and maximum response times

In [13]:
import pyspark.sql.functions as F

(new_fireDF
 .select(F.sum('NumAlarms'), F.avg('ResponseDelayedinMins'),
         F.min('ResponseDelayedinMins'), F.max('ResponseDelayedinMins'))
 .show()
)

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|      176170.0|        3.8923641541750413|               0.016666668|                      99.9|
+--------------+--------------------------+--------------------------+--------------------------+



All the different types of fire calls in 2018

In [14]:
(new_fireDF
 .where(year('IncidentDate')==2018)
 .select('CallType')
 .distinct()
 .show(10, False)
)

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Explosion                    |
|Vehicle Fire                 |
|Suspicious Package           |
|Other                        |
|Outside Fire                 |
+-----------------------------+
only showing top 10 rows



What months within the year 2018 saw the highest number of fire calls?

In [15]:
(new_fireDF
 .select('IncidentDate')
 .where(year('IncidentDate')==2018)
 .groupBy(F.month('IncidentDate').alias('IncidentMonth'))
 .count()
 .orderBy(col('count').desc())
 .show(1)
)


+-------------+-----+
|IncidentMonth|count|
+-------------+-----+
|           10| 1068|
+-------------+-----+
only showing top 1 row



Neighborhood in San Francisco generated the most fire calls in 2018

In [16]:
(new_fireDF
 .select('Neighborhood', 'City', 'IncidentDate')
 .where((col('City')=='San Francisco') & (year('IncidentDate')==2018))
 .groupBy('Neighborhood')
 .count()
 .orderBy(col('count').desc())
 .show(5, truncate=False)
)


+------------------------------+-----+
|Neighborhood                  |count|
+------------------------------+-----+
|Tenderloin                    |1393 |
|South of Market               |1052 |
|Mission                       |911  |
|Financial District/South Beach|764  |
|Bayview Hunters Point         |513  |
+------------------------------+-----+
only showing top 5 rows



In [17]:
(new_fireDF
 .select('Neighborhood', 'City', 'IncidentDate')
 .where(F.expr("City = 'San Francisco' and year(IncidentDate) = 2018"))
 .groupBy('Neighborhood')
 .count()
 .orderBy(col('count').desc())
 .show(5, truncate=False)
)

+------------------------------+-----+
|Neighborhood                  |count|
+------------------------------+-----+
|Tenderloin                    |1393 |
|South of Market               |1052 |
|Mission                       |911  |
|Financial District/South Beach|764  |
|Bayview Hunters Point         |513  |
+------------------------------+-----+
only showing top 5 rows



Neighborhoods had the worst response times to fire calls in 2018

In [18]:
(new_fireDF
.select('Neighborhood', 'ResponseDelayedinMins', 'IncidentDate')
.where(year(F.col('IncidentDate'))==2018)
.groupBy('Neighborhood')
.agg(F.max(F.col('ResponseDelayedinMins')).alias('MaxResponseDelayedinMins'))
.orderBy('MaxResponseDelayedinMins', ascending=False)
.show(5, truncate=False)
)

+---------------------+------------------------+
|Neighborhood         |MaxResponseDelayedinMins|
+---------------------+------------------------+
|South of Market      |94.71667                |
|Bayview Hunters Point|92.816666               |
|Inner Richmond       |90.433334               |
|Russian Hill         |9.983334                |
|Mission              |9.95                    |
+---------------------+------------------------+
only showing top 5 rows



Week in the year in 2018 had the most fire calls

In [19]:
(new_fireDF
 .select(weekofyear(F.col('IncidentDate')).alias('week'))
 .groupBy('week')
 .count()
 .orderBy('count', ascending=False)
 .show(5, truncate=False))


+----+-----+
|week|count|
+----+-----+
|40  |3645 |
|35  |3549 |
|38  |3548 |
|44  |3546 |
|20  |3519 |
+----+-----+
only showing top 5 rows



Is there a correlation between neighborhood, zip code, and number of fire calls?

In [20]:
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.stat import ChiSquareTest

stat_DF = (new_fireDF
           .select('Neighborhood', 'Zipcode')
           .where(F.col('Neighborhood').isNotNull() & (F.col('Zipcode').isNotNull()))
           )

str_indexer = StringIndexer(inputCols=['Neighborhood', 'Zipcode'],
                            outputCols=['NeighborhoodIndex', 'ZipcodeIndex'])
vec_assembler = VectorAssembler(inputCols=['ZipcodeIndex'],
                                outputCol='ZipcodeVector')
transformed_DF = (vec_assembler.transform(str_indexer
                        .fit(stat_DF)
                        .transform(stat_DF))
).select(F.col('NeighborhoodIndex'), 'ZipcodeVector')

test_result = ChiSquareTest.test(transformed_DF,
                   featuresCol='ZipcodeVector',
                   labelCol='NeighborhoodIndex')
test_result.show()

+-------+----------------+--------------------+
|pValues|degreesOfFreedom|          statistics|
+-------+----------------+--------------------+
|  [0.0]|          [1066]|[2730627.6506358217]|
+-------+----------------+--------------------+



`pValues = 0.0` => significant.