In [31]:
import pandas as pd
import numpy as np
import traceback
import time

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, udf, array, count
from pyspark.sql.functions import broadcast,coalesce, lit
from itertools import chain
from pyspark.sql.functions import col, lit, when, isnull


OBJECTIVES:

In this challenge, we would like to gather statistics on the number of parking violations (tickets) per street
segment in NYC over the past 5 years. In particular, for each street segment in NYC, we would like to have the
following:
1. The total number of parking violations for each year from 2015 to 2019.
2. The rate that the total number of violations change over the years using Ordinary Least Squares.

The street address is provided through the House Number; Street Name; and Violation County field.
For the parking violations data set, the Issue Date field should be used to determine which year a violationbelongs to.

In [2]:
sc = SparkContext()
spark = SparkSession(sc)

In [3]:
streets = "nyc_cscl.csv"
violations = "nyc_parking_violation/*.csv"

# streets = "hdfs:///tmp/bdm/nyc_cscl.csv"
# violations = "hdfs:///tmp/bdm/nyc_parking_violations/"

In [49]:
def to_upper(string):
    if string is None:
        return None
    return string.strip().upper()

def get_county_code(county):
    if county is not None:
        # Boro codes: 1 = MN, 2 = BX, 3 = BK, 4 = QN, 5 = SI
        if county.startswith("M") or county.startswith("N"):
            return 1
        if county in ['BRONX', 'BX', 'PBX']:
            return 2
        if county in ['BK', 'K', 'KING', 'KINGS']:
            return 3
        if county.startswith('Q'):
            return 4
        if county == 'R' or county == 'ST':
            return 5
    return -1

def get_year(string): 
    data_val = datetime.strptime(string.strip(), '%m/%d/%Y')    
    return data_val.year

def get_street_number(street_val):
    if street_val is None:
        return 0
    if type(street_val) is int:
        return street_val
    elems = street_val.split("-")
    new_val = "".join(elems)
    if new_val.isdigit():
        return int(new_val)
    else:
        return 0

def as_digit(val):
    if val:
        return int(val)
    return val

In [50]:
def get_violations_df(violations_file, spark):
    get_county_code_udf = udf(get_county_code)
    get_street_number_udf = udf(get_street_number)
    get_year_udf = udf(get_year)
    to_upper_udf = udf(to_upper)
    
    violations_df = spark.read.csv(violations_file, header=True, inferSchema=True)

    violations_df = violations_df.select("Violation County", "House Number", "Street Name", "Issue Date")

    violations_df = violations_df.filter((violations_df['Violation County'].isNotNull()) 
                                         & (violations_df['House Number'].isNotNull()) 
                                         & (violations_df['Street Name'].isNotNull()) 
                                         & (violations_df['Issue Date'].isNotNull())
                                        )

    violations_df = violations_df.withColumn('Violation County', get_county_code_udf(violations_df['Violation County']))
    violations_df = violations_df.withColumn('House Number', get_street_number_udf(violations_df['House Number']))
    violations_df = violations_df.withColumn('Street Name', to_upper_udf(violations_df['Street Name']))
    violations_df = violations_df.withColumn('Issue Date', get_year_udf(violations_df['Issue Date']))

    violations_df = violations_df.withColumnRenamed("Violation County","COUNTY")
    violations_df = violations_df.withColumnRenamed("House Number","HOUSENUM")
    violations_df = violations_df.withColumnRenamed("Street Name","STREETNAME")
    violations_df = violations_df.withColumnRenamed("Issue Date","YEAR")

    violations_df = violations_df.where(violations_df.YEAR.isin(list(range(2015,2020))))
    violations_df = violations_df.repartition(5,'COUNTY')
    violations_df = violations_df.alias('v')
    return violations_df

In [51]:
def get_streets_df(streets_file, spark):
    get_street_number_udf = udf(get_street_number)
    to_upper_udf = udf(to_upper)
    as_digit_udf = udf(as_digit)
    
    streets_df = spark.read.csv(streets_file, header=True, inferSchema=True)

    streets_df = streets_df.select("PHYSICALID","BOROCODE", "FULL_STREE", "ST_LABEL","L_LOW_HN", "L_HIGH_HN", 
                                   "R_LOW_HN", "R_HIGH_HN")

    streets_df = streets_df.withColumn('BOROCODE', as_digit_udf(streets_df['BOROCODE']))
    streets_df = streets_df.withColumn('FULL_STREE', to_upper_udf(streets_df['FULL_STREE']))
    streets_df = streets_df.withColumn('ST_LABEL',   to_upper_udf(streets_df['ST_LABEL']))
    streets_df = streets_df.withColumn('L_LOW_HN',  get_street_number_udf(streets_df['L_LOW_HN']))
    streets_df = streets_df.withColumn('L_HIGH_HN', get_street_number_udf(streets_df['L_HIGH_HN']))
    streets_df = streets_df.withColumn('R_LOW_HN',  get_street_number_udf(streets_df['R_LOW_HN']))
    streets_df = streets_df.withColumn('R_HIGH_HN', get_street_number_udf(streets_df['R_HIGH_HN']))

    streets_df = streets_df.withColumnRenamed("L_LOW_HN","OddLo")
    streets_df = streets_df.withColumnRenamed("L_HIGH_HN","OddHi")
    streets_df = streets_df.withColumnRenamed("R_LOW_HN","EvenLo")
    streets_df = streets_df.withColumnRenamed("R_HIGH_HN","EvenHi")
    
    streets_df = streets_df.repartition(5, 'BOROCODE')
    streets_df = streets_df.alias('s')
    return streets_df

In [52]:
violations_df = get_violations_df(violations, spark)
streets_df = get_streets_df(streets, spark)

In [53]:
# violations_simp = pd.DataFrame(violations_df.head(5), columns=violations_df.columns)
# violations_simp

In [54]:
# streets_simp = pd.DataFrame(streets_df.head(5), columns=streets_df.columns)
# streets_simp

In [69]:
# streets_df = streets_df.alias('s')
# violations_df = violations_df.alias('v')

def mapper(row):
    if row['FULL_STREE'] == row['ST_LABEL']:
        yield ( 
                (row['BOROCODE'], row["FULL_STREE"] ), 
                [( row['EvenLo'],row['EvenHi'],row['OddLo'],row['OddHi'], row['PHYSICALID'] )] 
              ) 
    else:
        yield ( 
                (row['BOROCODE'], row["FULL_STREE"]), 
                [( row['EvenLo'],row['EvenHi'],row['OddLo'],row['OddHi'] ,row['PHYSICALID'] )] 
              ) 
        yield ( 
                (row['BOROCODE'], row["ST_LABEL"]), 
                [( row['EvenLo'],row['EvenHi'],row['OddLo'],row['OddHi'], row['PHYSICALID'] ) ]
              ) 
        

streets_dict = streets_df.rdd.flatMap(mapper).reduceByKey(lambda x,y: x+y).collectAsMap()

In [73]:
def get_val(borocode, street, housenum):
    val = streets_dict.get( (borocode, street) )
    if val:
        for item in val:
            if int(housenum) % 2 == 0:
                if int(item[0]) >= int(housenum )and int(housenum) <= int(item[1]):
                    return item[4]
            else:
                if int(item[2]) >= int(housenum) and int(housenum) <= int(item[3]):
                    return item[4]      
    return None

get_val_udf = udf(get_val)

violations_2 = violations_df.withColumn('PHYSICALID', get_val_udf(violations_df['v.County'], 
                                                          violations_df['v.STREETNAME'], violations_df['v.HOUSENUM']
                                                          ))

violations_2 = violations_2.filter( violations_2['PHYSICALID'].isNotNull() )
violations_2.show()

+------+--------+------------------+----+----------+
|COUNTY|HOUSENUM|        STREETNAME|YEAR|PHYSICALID|
+------+--------+------------------+----+----------+
|     4|    8027|       JAMAICA AVE|2015|      6471|
|     4|    8246|          135TH ST|2015|     92936|
|     4|    8246|          135TH ST|2015|     92936|
|     4|   21447|       JAMAICA AVE|2015|      6471|
|     4|    2260|           26TH ST|2015|     97676|
|     4|    4018|           29TH ST|2015|     24685|
|     4|    8909|          162ND ST|2015|     25168|
|     4|    4202|COLLEGE POINT BLVD|2015|     29578|
|     4|   15608|    CROSS BAY BLVD|2015|     73349|
|     4|    4322|       QUEENS BLVD|2015|     12455|
|     4|    3529|     FARRINGTON ST|2015|     32032|
|     4|    4705|           45TH ST|2015|     83319|
|     4|       0|     NORTHERN BLVD|2015|      9677|
|     4|    9120|      ATLANTIC AVE|2015|      5467|
|     4|   13355|     ROOSEVELT AVE|2015|    168241|
|     4|   15922|          102ND ST|2015|     

In [77]:
violations_2 = violations_2.groupBy("PHYSICALID", "YEAR").agg(count("*").alias("YEAR_COUNT"))
violations_2.show(10)

+----------+----+----------+
|PHYSICALID|YEAR|YEAR_COUNT|
+----------+----+----------+
|     23724|2015|         1|
|     69833|2015|         1|
|     43038|2015|         1|
|      1900|2015|         1|
|     96699|2015|         1|
|     77522|2015|         1|
|     81602|2015|         1|
|    102112|2015|         1|
|     32232|2015|         1|
|     68066|2015|         1|
+----------+----+----------+
only showing top 10 rows



In [78]:
violations_2.createOrReplaceTempView("violations2_results")

In [80]:
summaries = spark.sql(
    "select PHYSICALID, " +
    "MAX(CASE WHEN (YEAR = 2015) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2015, " +
    "MAX(CASE WHEN (YEAR = 2016) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2016, " +
    "MAX(CASE WHEN (YEAR = 2017) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2017, " +
    "MAX(CASE WHEN (YEAR = 2018) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2018, " +
    "MAX(CASE WHEN (YEAR = 2019) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2019  " +
    "from violations2_results " +
    "group by PHYSICALID " +
    "order by PHYSICALID "
)
summaries.show(10)

+----------+----------+----------+----------+----------+----------+
|PHYSICALID|COUNT_2015|COUNT_2016|COUNT_2017|COUNT_2018|COUNT_2019|
+----------+----------+----------+----------+----------+----------+
|    100016|         1|         0|         0|         0|         0|
|    100070|         1|         0|         0|         0|         0|
|    100106|         1|         0|         0|         0|         0|
|    100172|         1|         0|         0|         0|         0|
|    100181|         1|         0|         0|         0|         0|
|    100272|         1|         0|         0|         0|         0|
|    100322|         1|         0|         0|         0|         0|
|    100735|         1|         0|         0|         0|         0|
|    100921|         1|         0|         0|         0|         0|
|    100983|         1|         0|         0|         0|         0|
+----------+----------+----------+----------+----------+----------+
only showing top 10 rows



In [83]:
summaries = summaries.withColumn('OLS_COEF', 
                getOLS_udf(array('COUNT_2015', 'COUNT_2016', 'COUNT_2017', 'COUNT_2018', 'COUNT_2019')))
summaries = summaries.show(5)

+----------+----------+----------+----------+----------+----------+--------+
|PHYSICALID|COUNT_2015|COUNT_2016|COUNT_2017|COUNT_2018|COUNT_2019|OLS_COEF|
+----------+----------+----------+----------+----------+----------+--------+
|    100016|         1|         0|         0|         0|         0|     0.6|
|    100070|         1|         0|         0|         0|         0|     0.6|
|    100106|         1|         0|         0|         0|         0|     0.6|
|    100172|         1|         0|         0|         0|         0|     0.6|
|    100181|         1|         0|         0|         0|         0|     0.6|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



In [13]:
merged_df = (
    violations_df.join(
        broadcast(streets_df),
        ((col("s.BOROCODE") == col("v.COUNTY")) &
        (
            (col("s.FULL_STREE") == col("v.STREETNAME")) | 
            (col("s.ST_LABEL") == col("v.STREETNAME"))
        ) &
        (
            ((col("v.HOUSENUM") % 2 == 0)  & (col("v.HOUSENUM") >= col("s.EvenLo")) & (col("v.HOUSENUM") <= col("s.EvenHi"))) |  
            ((col("v.HOUSENUM") % 2 == 1)  & (col("v.HOUSENUM") >= col("s.OddLo"))  & (col("v.HOUSENUM") <= col("s.OddHi")))
        )
    ), how='inner')
).select(col("s.PHYSICALID"),col("v.YEAR"))

merged_df = merged_df.alias('m')
merged_df = merged_df.groupBy("m.PHYSICALID", "m.YEAR").agg(count("*").alias("YEAR_COUNT"))

In [14]:
start_time = time.time()
merged_df.show(10)
print("--- %s seconds ---" % (time.time() - start_time))
#  436.2948143482208 seconds 

+----------+----+----------+
|PHYSICALID|YEAR|YEAR_COUNT|
+----------+----+----------+
|     17275|2015|         1|
|     73374|2015|         6|
|     90616|2015|         1|
|    127841|2015|         1|
|    146935|2015|         2|
|      8574|2015|         1|
|     10729|2015|         2|
|     90192|2015|         1|
|     13798|2015|         8|
|     71027|2015|         1|
+----------+----+----------+
only showing top 10 rows

--- 234.21456098556519 seconds ---


In [66]:
merged_df.createOrReplaceTempView("merged_results")

In [68]:
summaries = spark.sql(
    "select m.PHYSICALID, " +
    "MAX(CASE WHEN (YEAR = 2015) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2015, " +
    "MAX(CASE WHEN (YEAR = 2016) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2016, " +
    "MAX(CASE WHEN (YEAR = 2017) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2017, " +
    "MAX(CASE WHEN (YEAR = 2018) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2018, " +
    "MAX(CASE WHEN (YEAR = 2019) THEN YEAR_COUNT ELSE 0 END) AS COUNT_2019  " +
    "from merged_results m  " +
    "group by PHYSICALID " +
    "order by PHYSICALID "
)
summaries.show(10)

AnalysisException: "cannot resolve '`PHYSICALID`' given input columns: [m.COUNTY, m.STREETNAME, m.YEAR, m.HOUSENUM, m.ID]; line 1 pos 408;\n'Sort ['PHYSICALID ASC NULLS FIRST], true\n+- 'Aggregate ['PHYSICALID], ['m.PHYSICALID, 'MAX(CASE WHEN (cast(YEAR#1274 as int) = 2015) THEN 'YEAR_COUNT ELSE 0 END) AS COUNT_2015#2063, 'MAX(CASE WHEN (cast(YEAR#1274 as int) = 2016) THEN 'YEAR_COUNT ELSE 0 END) AS COUNT_2016#2064, 'MAX(CASE WHEN (cast(YEAR#1274 as int) = 2017) THEN 'YEAR_COUNT ELSE 0 END) AS COUNT_2017#2065, 'MAX(CASE WHEN (cast(YEAR#1274 as int) = 2018) THEN 'YEAR_COUNT ELSE 0 END) AS COUNT_2018#2066, 'MAX(CASE WHEN (cast(YEAR#1274 as int) = 2019) THEN 'YEAR_COUNT ELSE 0 END) AS COUNT_2019#2067]\n   +- SubqueryAlias `m`\n      +- SubqueryAlias `merged_results`\n         +- Project [COUNTY#1259, HOUSENUM#1264, STREETNAME#1269, YEAR#1274, get_val(County#1259, STREETNAME#1269, HOUSENUM#1264) AS ID#1987]\n            +- SubqueryAlias `v`\n               +- RepartitionByExpression [COUNTY#1259], 5\n                  +- Filter cast(YEAR#1274 as string) IN (cast(2015 as string),cast(2016 as string),cast(2017 as string),cast(2018 as string),cast(2019 as string))\n                     +- Project [COUNTY#1259, HOUSENUM#1264, STREETNAME#1269, Issue Date#1254 AS YEAR#1274]\n                        +- Project [COUNTY#1259, HOUSENUM#1264, Street Name#1248 AS STREETNAME#1269, Issue Date#1254]\n                           +- Project [COUNTY#1259, House Number#1242 AS HOUSENUM#1264, Street Name#1248, Issue Date#1254]\n                              +- Project [Violation County#1236 AS COUNTY#1259, House Number#1242, Street Name#1248, Issue Date#1254]\n                                 +- Project [Violation County#1236, House Number#1242, Street Name#1248, get_year(Issue Date#1149) AS Issue Date#1254]\n                                    +- Project [Violation County#1236, House Number#1242, to_upper(Street Name#1169) AS Street Name#1248, Issue Date#1149]\n                                       +- Project [Violation County#1236, get_street_number(House Number#1168) AS House Number#1242, Street Name#1169, Issue Date#1149]\n                                          +- Project [get_county_code(Violation County#1166) AS Violation County#1236, House Number#1168, Street Name#1169, Issue Date#1149]\n                                             +- Filter (((isnotnull(Violation County#1166) && isnotnull(House Number#1168)) && isnotnull(Street Name#1169)) && isnotnull(Issue Date#1149))\n                                                +- Project [Violation County#1166, House Number#1168, Street Name#1169, Issue Date#1149]\n                                                   +- Relation[Summons Number#1145L,Plate ID#1146,Registration State#1147,Plate Type#1148,Issue Date#1149,Violation Code#1150,Vehicle Body Type#1151,Vehicle Make#1152,Issuing Agency#1153,Street Code1#1154,Street Code2#1155,Street Code3#1156,Vehicle Expiration Date#1157,Violation Location#1158,Violation Precinct#1159,Issuer Precinct#1160,Issuer Code#1161,Issuer Command#1162,Issuer Squad#1163,Violation Time#1164,Time First Observed#1165,Violation County#1166,Violation In Front Of Or Opposite#1167,House Number#1168,... 19 more fields] csv\n"

In [82]:
def getOLS(values):
    import statsmodels.api as sm
    X = sm.add_constant(np.arange(len(values)))
    fit = sm.OLS(values, X).fit()
    coef = fit.params[0]
    return float(coef)

getOLS_udf = udf(getOLS)

# summaries = summaries.withColumn('OLS_COEF', 
#                 getOLS_udf(array('COUNT_2015', 'COUNT_2016', 'COUNT_2017', 'COUNT_2018', 'COUNT_2019')))
# summaries = summaries.alias('f')

In [20]:
start_time = time.time()
summaries.show()
print("--- %s seconds ---" % (time.time() - start_time))


+----------+----------+----------+----------+----------+----------+------------------+
|PHYSICALID|COUNT_2015|COUNT_2016|COUNT_2017|COUNT_2018|COUNT_2019|          OLS_COEF|
+----------+----------+----------+----------+----------+----------+------------------+
|        29|         5|         0|         0|         0|         0|               3.0|
|        30|         5|         0|         0|         0|         0|               3.0|
|        50|         8|         0|         0|         0|         0|               4.8|
|        58|         5|         0|         0|         0|         0|               3.0|
|        62|         6|         0|         0|         0|         0|3.5999999999999996|
|        66|         6|         0|         0|         0|         0|3.5999999999999996|
|        67|        26|         0|         0|         0|         0|              15.6|
|       116|         8|         0|         0|         0|         0|               4.8|
|       120|         4|         0|         

In [21]:
streets_df = streets_df.select(col("s.PHYSICALID")) \
                    .join(summaries, "PHYSICALID", how='left') \
                    .distinct() \
                    .orderBy("PHYSICALID") \

streets_df = streets_df.withColumn("COUNT_2015",coalesce("COUNT_2015", lit(0))) 
streets_df = streets_df.withColumn("COUNT_2016",coalesce("COUNT_2016", lit(0))) 
streets_df = streets_df.withColumn("COUNT_2017",coalesce("COUNT_2017", lit(0))) 
streets_df = streets_df.withColumn("COUNT_2018",coalesce("COUNT_2018", lit(0))) 
streets_df = streets_df.withColumn("COUNT_2019",coalesce("COUNT_2019", lit(0))) 
streets_df = streets_df.withColumn("OLS_COEF",  coalesce("OLS_COEF", lit(0.0))) 


In [22]:
streets_df.show()

+----------+----------+----------+----------+----------+----------+--------+
|PHYSICALID|COUNT_2015|COUNT_2016|COUNT_2017|COUNT_2018|COUNT_2019|OLS_COEF|
+----------+----------+----------+----------+----------+----------+--------+
|         3|         0|         0|         0|         0|         0|     0.0|
|         5|         0|         0|         0|         0|         0|     0.0|
|         6|         0|         0|         0|         0|         0|     0.0|
|         8|         0|         0|         0|         0|         0|     0.0|
|        14|         0|         0|         0|         0|         0|     0.0|
|        23|         0|         0|         0|         0|         0|     0.0|
|        24|         0|         0|         0|         0|         0|     0.0|
|        25|         0|         0|         0|         0|         0|     0.0|
|        29|         5|         0|         0|         0|         0|     3.0|
|        30|         5|         0|         0|         0|         0|     3.0|

In [1]:
# streets_df.write.csv('TODO', header=False)

In [2]:
import BDM_Final
start_time = time.time()
%run -i BDM_Final.py output
print("--- %s seconds ---" % (time.time() - start_time))

Output Path:  output
+----------+----------+----------+----------+----------+----------+--------+
|PHYSICALID|COUNT_2015|COUNT_2016|COUNT_2017|COUNT_2018|COUNT_2019|OLS_COEF|
+----------+----------+----------+----------+----------+----------+--------+
|         3|         0|         0|         0|         0|         0|     0.0|
|         5|         0|         0|         0|         0|         0|     0.0|
|         6|         0|         0|         0|         0|         0|     0.0|
|         8|         0|         0|         0|         0|         0|     0.0|
|        14|         0|         0|         0|         0|         0|     0.0|
|        23|         0|         0|         0|         0|         0|     0.0|
|        24|         0|         0|         0|         0|         0|     0.0|
|        25|         0|         0|         0|         0|         0|     0.0|
|        29|         5|         0|         0|         0|         0|     3.0|
|        30|         5|         0|         0|         0