In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
              
# Explicit schema for the parking-violations CSV: every column is declared as
# a nullable string, in the order listed below.
ticketSchema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "Summons_Number",
        "Plate_ID",
        "Registration_State",
        "Plate_Type",
        "Issue_Date",
        "Violation_Code",
        "Vehicle_Body_Type",
        "Vehicle_Make",
        "Issuing_Agency",
        "Street_Code1",
        "Street_Code2",
        "Street_Code3",
        "Vehicle_Expiration_Date",
        "Violation_Location",
        "Violation_Precinct",
        "Issuer_Precinct",
        "Issuer_Code",
        "Issuer_Command",
        "Issuer_Squad",
        "Violation_Time",
        "Time_First_Observed",
        "Violation_County",
        "Violation_In_Front_Opposite",
        "House_Number",
        "Street_Name",
        "Intersecting_Street",
        "Date_First_Observed",
        "Law_Section",
        "Sub_Division",
        "Violation_Legal_Code",
        "Days_Parking_In_Effect",
        "From_Hours_In_Effect",
        "To_Hours_In_Effect",
        "Vehicle_Color",
        "Unregistered_Vehicle?",
        "Vehicle_Year",
        "Meter_Number",
        "Feet_From_Curb",
        "Violation_Post_Code",
        "Violation_Description",
        "No_Standing_or_Stopping_Violation",
        "Hydrant_Violation",
        "Double_Parking_Violation",
        "Latitude",
        "Longitude",
        "Community_Board",
        "Community_Council",
        "Census_Tract",
        "BIN",
        "BBL",
        "NTA",
    )
])

import re
import usaddress
from pyspark.sql import functions as F

# Lookup tables used to expand address tokens. Keys are lower-case tokens;
# values are the capitalised, spelled-out form.
directions = {
    "se": "Southeast",
    "ne": "Northeast",
    "sw": "Southwest",
    "nw": "Northwest",
    "e": "East",
    "w": "West",
    "n": "North",
    "s": "South",
}
# The spelled-out word itself (lower-cased) is accepted as a key as well.
directions.update({full_name.lower(): full_name for full_name in list(directions.values())})

streetTypes = {
    "st": "Street",
    "ave": "Avenue",
    "blvd": "Boulevard",
    "cv": "Cove",
    "ct": "Court",
    "cr": "Circle",
    "ln": "Lane",
    "dr": "Drive",
}
# Same convention as above: the full word maps to its capitalised form.
streetTypes.update({full_name.lower(): full_name for full_name in list(streetTypes.values())})

@F.udf(returnType=BooleanType())
def has_no_special_chars(value):
  r"""Return True when ``str(value)`` contains no non-word character.

  A "special" character is anything matched by the regex class ``\W``.
  The value is converted with ``str()`` before matching, so a null (``None``)
  input is tested as the text ``"None"`` and therefore yields True.
  """
  # Raw string literal: "\W" in a plain literal is an invalid escape sequence.
  return re.search(r"\W", str(value)) is None
@F.udf(returnType=BooleanType())
def has_special_chars(value):
  r"""Return True when ``str(value)`` contains at least one non-word character.

  A "special" character is anything matched by the regex class ``\W``.
  The value is converted with ``str()`` before matching, so a null (``None``)
  input is tested as the text ``"None"`` and therefore yields False.
  """
  # Raw string literal: "\W" in a plain literal is an invalid escape sequence.
  return re.search(r"\W", str(value)) is not None
  
@F.udf(returnType=BooleanType())
def is_valid_date(value):
  r"""Return True when ``str(value)`` is laid out as ``NN/NN/NNNN``.

  Only the layout is checked (two digits, slash, two digits, slash, four
  digits). The digits are not validated against the calendar, so a value
  such as ``99/99/9999`` is accepted. A null (``None``) input is tested as
  the text ``"None"`` and therefore yields False.
  """
  # Raw string literal: "\d" in a plain literal is an invalid escape sequence.
  return re.match(r"^\d\d/\d\d/\d\d\d\d$", str(value)) is not None
def _ordinal(numberText):
  """Return ``numberText`` with its English ordinal suffix appended.

  For example ``"54"`` -> ``"54th"``, ``"1"`` -> ``"1st"``, ``"112"`` -> ``"112th"``.
  Raises ValueError when ``numberText`` cannot be converted with ``int()``.
  """
  number = int(numberText)
  # Numbers ending in 11, 12 or 13 take "th" even though they end in 1, 2 or 3.
  if number % 100 in (11, 12, 13):
    suffix = "th"
  else:
    suffix = {1: "st", 2: "nd", 3: "rd"}.get(number % 10, "th")
  return numberText + suffix


@F.udf(returnType=StringType())
def parse(houseNumber,streetName):
  """Normalise a house number / street name pair into one address string.

  The two inputs are joined with a space (the street name alone is used when
  ``houseNumber`` is empty or null) and tokenised with ``usaddress.parse``;
  each parsed item is consumed as a ``(token, label)`` pair:

  * pre/post directionals are expanded through ``directions``;
  * a purely numeric street name gets an ordinal suffix ("54" -> "54th");
  * street post types are expanded through ``streetTypes``;
  * every other token, and any token missing from the lookup tables, is
    lower-cased.

  Returns the pieces concatenated with a single space *before* each piece,
  so a non-empty result starts with a space. Returns None when
  ``streetName`` is null, since there is nothing to parse.
  """
  if streetName is None:
    # A null street name previously raised TypeError and failed the whole job.
    return None

  rawAddress = houseNumber + " " + streetName if houseNumber else streetName

  normalized = ""
  for token, label in usaddress.parse(rawAddress):
    lowered = token.lower()
    if label in ("StreetNamePreDirectional", "StreetNamePostDirectional"):
      piece = directions.get(lowered, lowered)
    elif label == "StreetName":
      try:
        piece = _ordinal(token)
      except ValueError:
        # Not a plain number (e.g. "Essex" or an already suffixed "54th").
        piece = lowered
    elif label == "StreetNamePostType":
      piece = streetTypes.get(lowered, lowered)
    else:
      piece = lowered
    normalized += " " + piece
  return normalized



In [2]:
# Load the fiscal-year-2015 parking-violations CSV with the explicit schema.
# "<key>:<secret_key>" in the URI are placeholders; never commit real
# credentials in their place.
df = (
    spark.read
    .schema(ticketSchema)
    .option("header", True)
    .csv("s3a://<key>:<secret_key>@nycpkviolations/Parking_Violations_Issued_-_Fiscal_Year_2015.csv")
)

In [3]:
# Count the non-null values of every column in ONE pass over the data.
# F.count(column) ignores nulls, so a single aggregation replaces the previous
# dropna().count() job per column (one full scan of the CSV for each column).
nonNullCounts = df.select([F.count(df[name]).alias(name) for name in df.columns]).first()
for key, nonNullCount in zip(df.columns, nonNullCounts):
  print("Not Null count for "+key+" "+str(nonNullCount))

From the counts above, it is clear that we can drop the columns with a count of 0 from the final table.
Also, Spark's schema inference was not working properly, so I have defined all columns as strings. I will convert them to the right types using DataFrame options.

Going by a first look at the data, we could probably have the following dimensions. The fact for this data would be the actual summons, i.e. the ticket itself. Hence, the primary key for this dataset is the "Summons Number". The logical dimensions for such a data set would be:

1. The vehicle for which a ticket was provided
2. The violation code and the corresponding violation details.
3. The issuer of the ticket.
4. The location of the violation or the meter number.

We will do a quick analysis of these different dimension options and see if these can actually be stored as references.

Let's look at the vehicle dimension. The tag, or Plate ID, of the vehicle would logically be the key for this particular dimension.

In [6]:
# Show the tickets whose Plate_ID contains at least one non-word character.
display(df.filter(has_special_chars(df.Plate_ID)))

Summons_Number,Plate_ID,Registration_State,Plate_Type,Issue_Date,Violation_Code,Vehicle_Body_Type,Vehicle_Make,Issuing_Agency,Street_Code1,Street_Code2,Street_Code3,Vehicle_Expiration_Date,Violation_Location,Violation_Precinct,Issuer_Precinct,Issuer_Code,Issuer_Command,Issuer_Squad,Violation_Time,Time_First_Observed,Violation_County,Violation_In_Front_Opposite,House_Number,Street_Name,Intersecting_Street,Date_First_Observed,Law_Section,Sub_Division,Violation_Legal_Code,Days_Parking_In_Effect,From_Hours_In_Effect,To_Hours_In_Effect,Vehicle_Color,Unregistered_Vehicle?,Vehicle_Year,Meter_Number,Feet_From_Curb,Violation_Post_Code,Violation_Description,No_Standing_or_Stopping_Violation,Hydrant_Violation,Double_Parking_Violation,Latitude,Longitude,Community_Board,Community_Council,Census_Tract,BIN,BBL,NTA
7710757820,3.46E+87,MD,PAS,09/05/2014,14,DELV,HINO,T,34850,10410,10510,01/01/20140088 12:00:00 PM,18.0,18,18,349867,T503,A1,0617A,,NY,O,33,W 54th St,,01/05/0001 12:00:00 PM,408,c,,YYYYYYY,,,WHITE,,0,,0,17 7,14-No Standing,,,,,,,,,,,
8013466176,N/S,NY,PAS,01/17/2015,74,4DSD,ME/BE,T,60810,40404,40404,01/01/88888888 12:00:00 PM,105.0,105,105,355712,T402,J,0735A,,Q,F,216-02,Rockaway Blvd,,01/05/0001 12:00:00 PM,408,j2,,,,,TAN,,0,,0,15 4,74-Missing Display Plate,,,,,,,,,,,
8013466176,N/S,NY,PAS,01/17/2015,74,4DSD,ME/BE,T,60810,40404,40404,01/01/88888888 12:00:00 PM,105.0,105,105,355712,T402,J,0735A,,Q,F,216-02,Rockaway Blvd,,01/05/0001 12:00:00 PM,408,j2,,,,,TAN,,0,,0,15 4,74-Missing Display Plate,,,,,,,,,,,
7441832494,],NY,PAS,02/06/2015,14,,,T,68190,40404,40404,01/01/20150101 12:00:00 PM,103.0,103,103,340127,T402,L,0747A,,Q,O,,Woodhull Ave,,01/05/0001 12:00:00 PM,408,c,,Y,0700A,1000A,,,0,,0,B 42,14-No Standing,,,,,,,,,,,
7498841137,N/S,NY,MOT,10/30/2014,51,MCY,YAMAH,T,9890,9840,59990,01/01/88888888 12:00:00 PM,108.0,108,108,345395,T740,B,1011A,,Q,F,43-09,43rd St,,01/05/0001 12:00:00 PM,408,e3,,,,,BLACK,,0,,0,01 6,51-Sidewalk,,,,,,,,,,,
7639987477,N/S,NY,PAS,08/18/2014,51,SUBN,ME/BE,T,25220,0,0,01/01/88888888 12:00:00 PM,103.0,103,103,356353,T402,H,0935A,,Q,I,N,187th Pl,40ft W/of Jamaica Av,01/05/0001 12:00:00 PM,408,e3,,,,,SILVE,,0,,0,10 4,51-Sidewalk,,,,,,,,,,,
7061894969,N/S,NY,999,06/26/2015,14,SUBN,HONDA,T,34550,11720,97500,01/01/88888888 12:00:00 PM,14.0,14,14,346109,T106,A,0813A,,NY,F,528,W 39th St,,01/05/0001 12:00:00 PM,408,c,,YYYYYYY,0800A,1000P,BLUE,,0,,0,I 17,14-No Standing,,,,,,,,,,,
7095875560,XAVH72],NJ,PAS,09/12/2014,46,VAN,FORD,T,18850,10410,25390,01/01/88888888 12:00:00 PM,19.0,19,19,352950,T503,X1,0138P,,NY,F,24,E 94th St,,01/05/0001 12:00:00 PM,408,f1,,,,,WHITE,,0,,0,01 7,46A-Double Parking (Non-COM),,,,,,,,,,,
7082173106,"AEE,5519",NY,TRA,11/06/2014,21,LTRL,INTER,T,68930,48830,27530,01/01/88888888 12:00:00 PM,84.0,84,84,347663,T301,Y,0916A,,K,O,120,Pacific St,,01/05/0001 12:00:00 PM,408,d1,,Y,0900A,1030A,WHITE,,0,,0,01 3,21-No Parking (street clean),,,,,,,,,,,
7537475507,626)7JM,NY,COM,09/16/2014,38,DELV,INTER,T,63030,89730,46630,01/01/88888888 12:00:00 PM,84.0,84,84,361034,T301,P,0251P,,K,F,475,Myrtle Ave,,01/05/0001 12:00:00 PM,408,h1,,Y,0830A,0700P,BROWN,,1995,,0,06 3,38-Failure to Display Muni Rec,,,,,,,,,,,


From the output above, we can see that some Plate_ID values contain special characters, so they cannot be real tag numbers. Overall, however, the number of records with a bad Plate_ID is very low compared to the number with a good one, so we can still use this as one of our dimensions.

In [8]:
# Compare the number of rows with a clean Plate_ID against the total row count.
# Nothing is removed from df here; the two counts are only printed.
print(df.filter(has_no_special_chars(df.Plate_ID)).count())
print(df.count())

So roughly 5500 records have a bad Plate ID. For the purpose of this exercise, I decided to drop them. They could be forwarded to the responsible teams so that they can correct how this information gets captured.

In [10]:
# Number of tickets issued per Plate_ID.
display(df.groupBy("Plate_ID").count())

Plate_ID,count
62417JM,711
PGQ1276,14
2314816,3
CEG8498,8
GNG1357,4
T521509C,2
P43ECM,5
DWP1806,15
29889BB,3
FYW7780,4


From the table above, we can see that there are vehicles that have been ticketed multiple times within a given year. The 711 occurrences for a single plate look a bit fishy, as the data covers one year and there are only 365 days in a year.

Let's check the violation dimension.

In [13]:
# Inspect the columns that describe the violation itself.
display(df.select([
    "Violation_Code",
    "Violation_Legal_Code",
    "Law_Section",
    "Violation_Post_Code",
    "Violation_Description",
    "No_Standing_or_Stopping_Violation",
    "Hydrant_Violation",
    "Double_Parking_Violation",
]))

Violation_Code,Violation_Legal_Code,Law_Section,Violation_Post_Code,Violation_Description,No_Standing_or_Stopping_Violation,Hydrant_Violation,Double_Parking_Violation
21,,408,A 77,21-No Parking (street clean),,,
14,,408,CC3,14-No Standing,,,
46,,408,J 32,46A-Double Parking (Non-COM),,,
19,,408,01 4,19-No Stand (bus stop),,,
19,,408,19 7,19-No Stand (bus stop),,,
21,,408,C 32,21-No Parking (street clean),,,
21,,408,10 3,21-No Parking (street clean),,,
71,,408,A 42,71A-Insp Sticker Expired (NYS),,,
69,,408,093,69-Failure to Disp Muni Recpt,,,
71,,408,N 42,71A-Insp Sticker Expired (NYS),,,


We already know from our earlier counts that the data for the No_Standing_or_Stopping_Violation, Hydrant_Violation and Double_Parking_Violation columns is not available. We could possibly parse the violation description to extract them.

In [15]:
# Ticket count for each combination of violation code, law section,
# description and post code.
display(
    df.groupBy([
        "Violation_Code",
        "Law_Section",
        "Violation_Description",
        "Violation_Post_Code",
    ]).count()
)

Violation_Code,Law_Section,Violation_Description,Violation_Post_Code,count
21,408,21-No Parking (street clean),66,3631
40,408,40-Fire Hydrant,C 77,11243
21,408,21-No Parking (street clean),E 42,5047
20,408,20A-No Parking (Non-COM),20-A,91
16,408,16-No Std (Com Veh) Com Plate,14 7,615
71,408,71A-Insp Sticker Expired (NYS),I 42,4447
20,408,20-No Parking (Com Plate),95 1,2031
20,408,20-No Parking (Com Plate),092,135
38,408,38-Failure to Display Muni Rec,02P,1429
70,408,70A-Reg. Sticker Expired (NYS),02 3,845


From the data above, there appears at first glance to be a 1-1 relationship between violation code, violation post code and violation description. I am not clear as to what the violation post code means, but I notice that it can differ for the same violation code.

In [17]:
# View the violation columns next to the ticket-level columns
# (issue date, issuer, plate, precinct).
display(df.select([
    "Violation_Code",
    "Issue_Date",
    "Issuer_Code",
    "Plate_ID",
    "Violation_Legal_Code",
    "Law_Section",
    "Violation_Precinct",
    "Violation_Post_Code",
    "Violation_Description",
]))

Violation_Code,Issue_Date,Issuer_Code,Plate_ID,Violation_Legal_Code,Law_Section,Violation_Precinct,Violation_Post_Code,Violation_Description
21,10/01/2014,345454,EPC5238,,408,7,A 77,21-No Parking (street clean)
14,03/06/2015,333386,5298MD,,408,25,CC3,14-No Standing
46,07/28/2014,331845,FYW2775,,408,72,J 32,46A-Double Parking (Non-COM)
19,04/13/2015,355669,GWE1987,,408,102,01 4,19-No Stand (bus stop)
19,05/19/2015,341248,T671196C,,408,28,19 7,19-No Stand (bus stop)
21,11/20/2014,357104,JJF6834,,408,67,C 32,21-No Parking (street clean)
21,08/01/2014,355251,FYP8263,,408,79,10 3,21-No Parking (street clean)
71,07/19/2014,353083,FZL7450,,408,103,A 42,71A-Insp Sticker Expired (NYS)
69,09/23/2014,346381,86318MA,,408,5,093,69-Failure to Disp Muni Recpt
71,01/20/2015,361082,GKX8095,,408,113,N 42,71A-Insp Sticker Expired (NYS)


Looking at the data above, I conclude that although violation_code and violation_post_code seem to have a 1-1 relationship with the description and to form a natural key, I don't think that is actually the case here; this data looks more like a fact than a dimension. For now I will keep it out of the dimension table, just to be sure.

Now let's take a look at the data about the issuer of the ticket.

In [20]:
# Ticket count for each combination of the issuer-related columns.
display(
    df.groupBy([
        "Issuer_Code",
        "Issuer_Precinct",
        "Issuer_Command",
        "Issuer_Squad",
        "Issuing_Agency",
    ]).count()
)

Issuer_Code,Issuer_Precinct,Issuer_Command,Issuer_Squad,Issuing_Agency,count
345221,19,T103,B,T,6171
350285,18,T106,B,T,3559
340060,88,T730,C,T,23
355209,20,T103,A,T,4000
356550,10,T800,B2,T,451
359323,60,T302,K,T,63
358658,17,T106,J,T,914
361783,114,T401,P,T,1360
361084,94,T301,S,T,546
357737,19,T103,O,T,4897


From the groupby above, I can clearly see a relation between the issuer code and the issuer's squad and agency. But we can also see that for a few records the data has not been captured accurately. For the purpose of this exercise, I will again be dropping all records that have an issuer_code of 0. I can also forward these records to the ingest team so they can correct the data acquisition.

In [22]:
# Issuer-related columns for the tickets written by issuer 330581.
display(
    df.filter(df.Issuer_Code == "330581").select([
        "Issuer_Code",
        "Violation_Precinct",
        "Issue_Date",
        "Issuer_Precinct",
        "Issuer_Command",
        "Issuer_Squad",
        "Issuing_Agency",
    ])
)

Issuer_Code,Violation_Precinct,Issue_Date,Issuer_Precinct,Issuer_Command,Issuer_Squad,Issuing_Agency
330581,14,06/10/2015,14,T501,B,T
330581,14,01/31/2015,14,T501,B,T
330581,14,06/04/2015,14,T501,B,T
330581,14,03/21/2015,14,T501,B,T
330581,18,10/05/2014,18,T501,B,T
330581,17,05/03/2015,17,T501,B,T
330581,17,05/03/2015,17,T501,B,T
330581,10,11/16/2014,10,T501,B,T
330581,18,01/24/2015,18,T501,B,T
330581,18,01/24/2015,18,T501,B,T


In [23]:
# Issuer-related columns for the tickets written by issuer 347447.
display(
    df.filter(df.Issuer_Code == "347447").select([
        "Issuer_Code",
        "Violation_Precinct",
        "Issue_Date",
        "Issuer_Precinct",
        "Issuer_Command",
        "Issuer_Squad",
        "Issuing_Agency",
    ])
)

Issuer_Code,Violation_Precinct,Issue_Date,Issuer_Precinct,Issuer_Command,Issuer_Squad,Issuing_Agency
347447,94,09/30/2014,94,T802,B,T
347447,88,11/05/2014,88,T802,B,T
347447,110,08/06/2014,110,T803,B,T
347447,108,08/14/2014,108,T803,B,T
347447,76,10/28/2014,76,T802,B,T
347447,76,10/28/2014,76,T802,B,T
347447,61,09/26/2014,61,T802,B,T
347447,83,09/30/2014,83,T802,B,T
347447,84,11/04/2014,84,T802,B,T
347447,103,08/27/2014,103,T803,B,T


From the data above, filtered for individual issuer codes, it seems that the issuer code is tightly aligned with the command, squad and agency. The precinct seems to be more fact-like data describing where the violation occurred. So issuer_code, issuer_command, issuer_squad and issuing_agency would be the columns for our dimension. It looks like there is a 1-1 relation between issuer code, issuer command, issuer squad and issuing agency, but the issuer precinct changes.

Let's look at the location fields. From the earlier counts we could see that there was no latitude/longitude data available, so I dropped it from my analysis for this exercise. We could maybe use the address to query a service like Google Maps or a GIS system for the latitude and longitude, and enrich the data as it comes in.

In [26]:
# Inspect the street-code and address columns that describe the location.
display(df.select([
    "Street_Code1",
    "Street_Code2",
    "Street_Code3",
    "Violation_County",
    "Street_Name",
    "House_Number",
    "Intersecting_Street",
]))

Street_Code1,Street_Code2,Street_Code3,Violation_County,Street_Name,House_Number,Intersecting_Street
20390,29890,31490,NY,Essex St,133,
27790,19550,19570,NY,Park Ave,1916,
8130,5430,5580,K,31st St,184,
59990,16540,16790,Q,Queens Blvd,120-20,
36090,10410,24690,NY,W 116th St,66,
74230,37980,38030,K,Rutland Rd,1013,
77530,63030,93230,K,Skillman St,155,
49210,0,0,Q,Henderson Ave,S,30ft W/of 189th St
21690,26990,26790,NY,Grand St,199,
35490,35780,22670,Q,Bedell St,137-22,


from the above data we can see that, Street_Code1 is related to the actual Street_Name, the Street_Code2 and Street_Code2 are derived from the house number. They are both set to 0 , when there is not house number or a intersecting street value is set. This ideally could be a dimensional data, were based on the address i could populate the street codes and the latitude/longitude for some analytics. I will not use it for my exercise, but I will add these fields to a seperate table, so we can start building on that set. I can include the precinct and meter_number info in this set, so may be we can show the details of a precin