In [68]:
import pyspark as ps
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType, DateType
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time

In [2]:
spark = (ps.sql.SparkSession
         .builder
         .master('local[4]')
         .appName('lecture')
         .getOrCreate()
        )
sc = spark.sparkContext

In [3]:
park = spark.read.csv('../data/Parking_Violations_Issued_FY_2019.csv',
                         header=True,
                         inferSchema=True)
park.limit(5).toPandas()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,1105232165,GLS6001,NY,PAS,07/03/2018,14,SDN,HONDA,X,47130,...,BLUE,0,2006,-,0,,,,,
1,1121274900,HXM7361,NY,PAS,06/28/2018,46,SDN,NISSA,X,28990,...,GRY,0,2017,-,0,,,,,
2,1130964875,GTR7949,NY,PAS,06/08/2018,24,SUBN,JEEP,X,64,...,GREEN,0,0,-,0,,,,,
3,1130964887,HH1842,NC,PAS,06/07/2018,24,P-U,FORD,X,11310,...,WHITE,0,0,-,0,,,,,
4,1131599342,HDG7076,NY,PAS,06/29/2018,17,SUBN,HYUND,X,47130,...,GREEN,0,2007,-,0,,,,,


In [5]:
park.count()

11467506

In [4]:
park.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [9]:
# from pyspark.sql.functions import col,sum
missing = park.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in park.columns)).toPandas()

In [15]:
missing_t = sc.parallelize([ (k,) + tuple(v[0:]) for k,v in missing.items()]).toDF()

missing_t.orderBy('_2').toPandas()

Unnamed: 0,_1,_2
0,Summons Number,0
1,Plate ID,0
2,Registration State,0
3,Plate Type,0
4,Issue Date,0
5,Violation Code,0
6,Street Code1,0
7,Street Code2,0
8,Street Code3,0
9,Vehicle Expiration Date,0


In [87]:
# Drop rows where house number and intersecting street are both null

park2 = park.filter(~((park['House Number'].isNull()) & (park['Intersecting Street'].isNull())))
park2.count()

11353777

In [88]:
park2.select('Feet From Curb').groupBy('Feet From Curb').count().orderBy('Feet From Curb').show()

+--------------+--------+
|Feet From Curb|   count|
+--------------+--------+
|             0|11076405|
|             1|   22657|
|             2|   30105|
|             3|   34716|
|             4|   33057|
|             5|   59367|
|             6|   35274|
|             7|   24984|
|             8|   21032|
|             9|    9842|
|            10|    5526|
|            11|     472|
|            12|     255|
|            13|      47|
|            14|      16|
|            15|      20|
|            16|       2|
+--------------+--------+



In [93]:
def clean_time(line):
#     if line[-1] == "A":
#         txt_time = int(line[:-1])
#     elif line[-1] == "P":
#         txt_time = int(line[:-1]) + 1200
    try:
        txt_time = line + "M"
        new_time = time.strptime(txt_time, '%I%M%p')
        return new_time
    except:
        pass

# def clean_date(line):
clean_time_udf = F.udf(clean_time, TimestampType())

In [70]:
new_time = park2['Violation Time'].map(lambda x: clean_time(x))

TypeError: 'Column' object is not callable

In [94]:
new_time = park2.select('Violation Time').rdd.map(lambda x: clean_time(x))

In [80]:
new_time = park2.withColumn('new_time', clean_time_udf(park2['Violation Time'])).drop('Violation Time')

In [97]:
new_time.filter(lambda x: x != None).take(10)

[]

In [67]:
clean_time('0830P')

AttributeError: 'PipelinedRDD' object has no attribute 'strptime'