# Transform

Testing ground for service call data transformation

In [38]:
from pathlib import Path
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

import os
from dotenv import load_dotenv

import folium
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql import Window
from pyspark.sql import types

In [2]:
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/19 10:10:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
pq_path = Path("../tests/resources/SR2020.parquet")
df = spark.read.parquet(str(pq_path))
df.printSchema()

                                                                                

root
 |-- Status: string (nullable = true)
 |-- First 3 Chars of Postal Code: string (nullable = true)
 |-- Intersection Street 1: string (nullable = true)
 |-- Intersection Street 2: string (nullable = true)
 |-- Service Request Type: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- Section: string (nullable = true)
 |-- ward_name: string (nullable = true)
 |-- ward_id: byte (nullable = true)
 |-- creation_datetime: timestamp (nullable = true)



In [None]:
# stop current SparkContext before creating a new one
spark.stop()

Read from cloud storage using `gcs-connector-hadoop3` jar

Before starting a new context, I had to restart the kernel for spark to recognize the gcs-connector jar

In [31]:
# !mkdir ../data/lib
# !gsutil cp gs://hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar  \
#     ../data/lib/gcs-connector-hadoop3-latest.jar  
# !curl -O https://github.com/GoogleCloudDataproc/hadoop-connectors/releases/download/v2.2.11/gcs-connector-hadoop3-2.2.11-shaded.jar
    

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0


In [2]:
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("test_cloud")
    .set("spark.jars", "../data/lib/gcs-connector-hadoop3-latest.jar")
)
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set(
    "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
)
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

23/03/22 14:29:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
load_dotenv()
DATA_LAKE = os.getenv("DATA_LAKE")
gcs_path = f"gs://{DATA_LAKE}/raw/pq/sr2023.parquet"
print(f"gcs_path: {gcs_path}")
df_gcs = spark.read.parquet(gcs_path)
df_gcs.printSchema()

gcs_path: gs://service-calls-data-lake/raw/pq/sr2023.parquet


                                                                                

root
 |-- Status: string (nullable = true)
 |-- First 3 Chars of Postal Code: string (nullable = true)
 |-- Intersection Street 1: string (nullable = true)
 |-- Intersection Street 2: string (nullable = true)
 |-- Service Request Type: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- Section: string (nullable = true)
 |-- ward_name: string (nullable = true)
 |-- ward_id: byte (nullable = true)
 |-- creation_datetime: timestamp (nullable = true)



## Transformations

How should we model our service call data?

Most important fields:

- Service Request Type
- First 3 Chars of Postal Code
- ward_name
- datetime

Ideas:

- most/least common types per month?
- most/least common types by ward/FSA?
- highest frequency ward/FSA by service types, i.e. which ward has the most/least amount of noise complaints?
- combine with population per FSA and find requests per capita?
- distributions of the all/top 80% of request types
    - filter by ward/FSA/season?

Division and section could be queried against to show which municipal entity receive the most requests, although that would highly correlate with the type of requests

In [3]:
# for testing
load_dotenv()
DATA_LAKE = os.getenv("DATA_LAKE")
gcs_path = f"gs://{DATA_LAKE}/raw/pq/SR2021.parquet"
df = spark.read.parquet(gcs_path)
print(f"row count: {df.count()}")

[Stage 1:>                                                          (0 + 1) / 1]

row count: 323880


                                                                                

In [16]:
try:
    df.corr("Service Request Type", "Division")
except Exception as e:
    print(e)

requirement failed: Currently correlation calculation for columns with dataType string not supported.


### Most common request type by ward

In [5]:
# request type count by ward
df_ward_type = (
    df.filter(df.ward_name.isNotNull())
    .select("creation_datetime", "ward_name", "Service Request Type")
    .withColumnRenamed("Service Request Type", "service_request_type")
    .groupBy(["ward_name", "service_request_type"])
    .count()
)
# most common request type's count by ward
df_ward_max = (
    df_ward_type \
    .groupBy("ward_name")
    .max("count")
    .withColumnRenamed("max(count)", "max_count")
)

# trying to join them get get most common type by ward
df_ward = (
    df_ward_max.join(df_ward_type, col("max_count") == col("count") & col(, "inner")
    .select("df_ward_max.ward_name", "Service Request Type", "count")
    .orderBy("ward_name")
)
df_ward.show(n=30, truncate=40)

                                                                                

+------------------------+---------------------------------------+-----+
|               ward_name|                   Service Request Type|count|
+------------------------+---------------------------------------+-----+
|       Beaches-East York|                       CADAVER WILDLIFE|  788|
|               Davenport|Residential: Bin: Repair or Replace Lid| 1141|
|         Don Valley East|                     Property Standards|  349|
|        Don Valley North|                                 Zoning|  413|
|        Don Valley North|Residential: Bin: Repair or Replace Lid|  413|
|         Don Valley West|                        General Pruning|  575|
|       Eglinton-Lawrence|Residential: Bin: Repair or Replace Lid|  984|
|        Etobicoke Centre|                        General Pruning|  845|
|         Etobicoke North|                     Property Standards|  517|
|     Etobicoke-Lakeshore|                        General Pruning|  913|
|Humber River-Black Creek|                     Prop

Seeing some repeats of ward name due to counts of different request types being equal; is that a coincidence or is that really true?

In [8]:
df_ward_max.orderBy('ward_name').count()

25

In [9]:
df_ward.count()

                                                                                

29

Using a `Window` instead, as explained in [this SOF post](https://stackoverflow.com/questions/48829993/groupby-column-and-filter-rows-with-maximum-value-in-pyspark)

- find max service type by ward
- assign that value as a new column, according to ward
- filter for rows where count == max_count
- that row is the max_count service type

In [15]:
w = Window.partitionBy('ward_name')
df_ward_type.withColumn('max_count', F.max('count').over(w)) \
    .where(F.col('count') == F.col('max_count')) \
    .drop('max_count') \
    .show(n=25, truncate=40)

                                                                                

+------------------------+---------------------------------------+-----+
|               ward_name|                   Service Request Type|count|
+------------------------+---------------------------------------+-----+
|       Beaches-East York|                       CADAVER WILDLIFE|  788|
|               Davenport|Residential: Bin: Repair or Replace Lid| 1141|
|         Don Valley East|                     Property Standards|  349|
|        Don Valley North|Residential: Bin: Repair or Replace Lid|  413|
|         Don Valley West|                        General Pruning|  575|
|       Eglinton-Lawrence|Residential: Bin: Repair or Replace Lid|  984|
|        Etobicoke Centre|                        General Pruning|  845|
|         Etobicoke North|                     Property Standards|  517|
|     Etobicoke-Lakeshore|                        General Pruning|  913|
|Humber River-Black Creek|                     Property Standards|  499|
|      Parkdale-High Park|Residential: Bin: Repair 

In [25]:
df_ward_type \
    .filter(df_ward_type.ward_name == 'Willowdale') \
    .orderBy('count', ascending=False) \
    .show(n=5)

+----------+--------------------+-----+
| ward_name|Service Request Type|count|
+----------+--------------------+-----+
|Willowdale|Long Grass and Weeds|  410|
|Willowdale|Residential: Bin:...|  386|
|Willowdale|     General Pruning|  375|
|Willowdale|    CADAVER WILDLIFE|  313|
|Willowdale| INJUR/DIST WILDLIFE|  285|
+----------+--------------------+-----+
only showing top 5 rows



Seems my first approach picked up some erroneous rows. Simply matching the max_count and hoping they were unique was perhaps not the best way to go about it.

## Window function

Characterized by `OVER (PARTITION BY ...)`, there are 3 types:

1. Aggregate
1. Ranking
1. Value

### Aggregate

Aggregates refer to:

- AVG
- MAX
- MIN
- SUM
- COUNT

This can be a replacement for `GROUP BY`, if we specify that group in the `OVER` clause. But unlike `GROUPBY.COUNT` which collapses the table into those groups, window assigns that value to each row of that dataset.

In SQL for above query:

In [4]:
df.createOrReplaceTempView('df_sql_view')

Part one - creating table with count of each type for each ward

In [61]:
query = """
    SELECT
        ward_name ward,
        `Service Request Type` type,
        COUNT(1) count
    FROM
        df_sql_view
    GROUP BY ward_name, `Service Request Type`
    HAVING ward_name IS NOT NULL
    ORDER BY count DESC
"""
df_type_count = spark.sql(query)
df_type_count.createOrReplaceTempView('df_type_count')
df_type_count.show()

+--------------------+--------------------+-----+
|                ward|                type|count|
+--------------------+--------------------+-----+
|           Davenport|Residential: Bin:...| 1141|
|   Eglinton-Lawrence|Residential: Bin:...|  984|
|  Parkdale-High Park|Residential: Bin:...|  971|
| Etobicoke-Lakeshore|     General Pruning|  913|
|  Toronto-St. Paul's|Residential: Bin:...|  895|
|    Etobicoke Centre|     General Pruning|  845|
|    Toronto-Danforth|     General Pruning|  818|
| University-Rosedale|Residential: Bin:...|  810|
| Etobicoke-Lakeshore| INJUR/DIST WILDLIFE|  799|
|   Beaches-East York|    CADAVER WILDLIFE|  788|
|   Beaches-East York| INJUR/DIST WILDLIFE|  784|
|   Beaches-East York|Residential: Bin:...|  774|
|   Eglinton-Lawrence|     General Pruning|  751|
|   Beaches-East York|     General Pruning|  737|
|    Toronto-Danforth|    CADAVER WILDLIFE|  728|
|   York South-Weston|  Property Standards|  716|
|Scarborough South...|  Property Standards|  701|


                                                                                

Part two - using common table expression (CTE) to add a column with max(count) of each ward, and query against it to find the max count service type

In [67]:
query = """
WITH type_max AS (
    SELECT 
        ward,
        type,
        count,
        MAX(count) OVER (PARTITION BY ward) maxcount
    FROM df_type_count
)
SELECT
    ward, type, count
FROM
    type_max
WHERE count == maxcount
"""
df_type_max = spark.sql(query)
df_type_max.show()

+--------------------+--------------------+-----+
|                ward|                type|count|
+--------------------+--------------------+-----+
|   Beaches-East York|    CADAVER WILDLIFE|  788|
|           Davenport|Residential: Bin:...| 1141|
|     Don Valley East|  Property Standards|  349|
|    Don Valley North|Residential: Bin:...|  413|
|     Don Valley West|     General Pruning|  575|
|   Eglinton-Lawrence|Residential: Bin:...|  984|
|    Etobicoke Centre|     General Pruning|  845|
|     Etobicoke North|  Property Standards|  517|
| Etobicoke-Lakeshore|     General Pruning|  913|
|Humber River-Blac...|  Property Standards|  499|
|  Parkdale-High Park|Residential: Bin:...|  971|
|  Scarborough Centre|Residential: Bin:...|  668|
|   Scarborough North|Residential: Bin:...|  531|
|Scarborough South...|  Property Standards|  701|
|Scarborough-Aginc...|     General Pruning|  428|
|Scarborough-Guild...| INJUR/DIST WILDLIFE|  484|
|Scarborough-Rouge...|     General Pruning|  687|


Putting it all together

In [8]:
window_query = """
with type_count as (
    SELECT
        ward_name ward,
        `Service Request Type` type,
        COUNT(1) count
    FROM
        df_sql_view
    GROUP BY ward_name, `Service Request Type`
    HAVING ward_name IS NOT NULL
),
type_max as (
    SELECT 
        ward,
        type,
        count,
        MAX(count) OVER (PARTITION BY ward) maxcount
    FROM type_count
)
SELECT
    ward, type, count, row_number() over(ORDER BY type) row
FROM
    type_max
WHERE count == maxcount
ORDER BY type
"""
df_sql = spark.sql(window_query).show(n=25, truncate=40)

23/03/22 14:47:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/22 14:47:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/22 14:47:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 10:>                                                         (0 + 1) / 1]

23/03/22 14:47:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/22 14:47:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/22 14:47:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/22 14:47:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/22 14:47:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/22 14:47:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---------

                                                                                

### Most common ward by request type

Inverse of the previous question - which neighborhood makes the most calls for each request type?

In [12]:
ward_query = """
with type_count as (
    SELECT
        ward_name ward,
        `Service Request Type` type,
        COUNT(1) count
    FROM
        df_sql_view
    GROUP BY ward_name, `Service Request Type`
    HAVING ward_name IS NOT NULL
),
ward_max as (
    SELECT 
        ward,
        type,
        count,
        MAX(count) OVER (PARTITION BY type) maxcount
    FROM type_count
)
SELECT
    type, ward, count
FROM ward_max
WHERE maxcount = count and count > 100
"""
df_ward_by_type = spark.sql(ward_query).show()
# df_ward_by_type


+--------------------+--------------------+-----+
|                type|                ward|count|
+--------------------+--------------------+-----+
|All / Hazardous W...|  Parkdale-High Park|  656|
|Bin Investigation...|           Davenport|  511|
|Boulevard - Ploug...|Scarborough-Guild...|  102|
|Boulevards - Dama...|    Etobicoke Centre|  194|
|By-Law Contravent...| Etobicoke-Lakeshore|  126|
|    CADAVER WILDLIFE|   Beaches-East York|  788|
|Catch Basin - Blo...|   Eglinton-Lawrence|  134|
|Comment / Suggestion|   Spadina-Fort York|  118|
|Complaint/Investi...|           Davenport|  179|
|Dispute SR Status...|   Eglinton-Lawrence|  211|
|      Dogs off Leash|   Spadina-Fort York|  131|
|Expressway requir...|   Spadina-Fort York|  122|
|     General Pruning| Etobicoke-Lakeshore|  913|
|General Tree Main...| Etobicoke-Lakeshore|  115|
|            Graffiti| University-Rosedale|  290|
|Gypsy Moth Contro...|    Don Valley North|  168|
| INJUR/DIST WILDLIFE| Etobicoke-Lakeshore|  799|


### Top 5 requests by season

Let's define seasons by datetime:

- spring: 3/21-6/21
- summer: 6/21-9/21
- fall: 9/21 - 11/30 (winter is long, okay)
- winter: 12/1 - 3/21

Execution:

- add season column via UDF
- for each season, count rows by service type
- since we want the top $n$ service type by season, we can't just use `max`
    - for each season, sort the count
    - `row_number() OVER(PARTITION BY season)`
    - get row numbers 1-5 for each season


In [34]:
from datetime import datetime

In [26]:
# dt_samp, _ = df.randomSplit([0.1,0.9])
dt_samp = df.sample(0.005).select('creation_datetime')
dt_samp.show()

+-------------------+
|  creation_datetime|
+-------------------+
|2021-01-01 20:37:54|
|2021-01-02 15:34:08|
|2021-01-02 15:42:29|
|2021-01-03 09:25:41|
|2021-01-03 14:00:47|
|2021-01-03 14:52:53|
|2021-01-03 16:42:20|
|2021-01-03 16:59:58|
|2021-01-04 10:08:03|
|2021-01-04 11:08:34|
|2021-01-04 11:29:09|
|2021-01-04 13:47:57|
|2021-01-04 18:38:14|
|2021-01-04 18:49:55|
|2021-01-05 09:35:50|
|2021-01-05 10:15:37|
|2021-01-05 10:17:07|
|2021-01-05 12:47:21|
|2021-01-05 14:24:13|
|2021-01-05 16:50:05|
+-------------------+
only showing top 20 rows



In [27]:
dt_samp.count()

1699

In [36]:
dt1 = dt_samp.head(1)
type(dt1[0].creation_datetime)
dtval = dt1[0].creation_datetime
dtval.year
spring = datetime(dtval.year, month=3, day=21)
dtval < spring

                                                                                

True

In [37]:

def extract_season(creation_time: datetime) -> str:
    """Extracts season from the datetime field"""
    m = creation_time.month
    d = creation_time.day
    if m < 3 or (m == 3 and d < 21) or m > 11:
        return 'winter'
    elif m < 6 or (m == 6 and d < 21):
        return 'spring'
    elif m < 9 or (m == 9 and d < 21):
        return 'summer'
    else:
        return 'fall'

In [39]:
extract_season_udf = F.udf(extract_season, returnType=types.StringType())

In [44]:
dt_samp.withColumn('season', extract_season_udf(dt_samp.creation_datetime)) \
    .sample(0.02).show(n=40)

+-------------------+------+
|  creation_datetime|season|
+-------------------+------+
|2021-02-01 14:33:56|winter|
|2021-02-25 18:34:44|winter|
|2021-03-01 15:46:41|winter|
|2021-03-11 09:23:17|winter|
|2021-04-12 12:58:47|spring|
|2021-04-21 12:33:16|spring|
|2021-04-28 16:21:34|spring|
|2021-05-06 13:34:14|spring|
|2021-05-11 09:03:35|spring|
|2021-05-13 15:09:33|spring|
|2021-05-31 09:32:44|spring|
|2021-06-03 09:24:27|spring|
|2021-07-09 10:38:02|summer|
|2021-07-15 18:13:51|summer|
|2021-08-06 17:47:59|summer|
|2021-08-09 12:12:22|summer|
|2021-08-11 18:22:40|summer|
|2021-08-16 11:00:34|summer|
|2021-08-16 13:15:40|summer|
|2021-08-27 12:03:40|summer|
|2021-09-08 14:12:10|summer|
|2021-09-13 11:03:23|summer|
|2021-09-16 15:05:24|summer|
|2021-09-20 08:09:53|summer|
|2021-10-11 22:39:56|  fall|
|2021-10-12 08:26:15|  fall|
+-------------------+------+

