In [2]:
# Load the raw NYPD complaint CSV into an RDD of lines.
# Replace CRIME_CSV_PATH with the location of the file on your machine.
CRIME_CSV_PATH = 'file:/Users/zhuorulin/Documents/DataScience/datasets/NYPD_Complaint_Data_Historic.csv'
# use_unicode=False keeps every line undecoded; the fields are decoded after CSV parsing.
crime_csv = sc.textFile(CRIME_CSV_PATH, use_unicode=False)

In [21]:
# Convert csv to DataFrame
from csv import reader # Warning: csv.reader does not support unicode decode
from pyspark.sql import SQLContext
from collections import defaultdict
import datetime
import re

In [4]:
# Parse the raw lines of each partition with csv.reader,
# then decode every field of every parsed row with the utf-8 codec
lines_rdd = (
    crime_csv
    .mapPartitions(reader)
    .map(lambda fields: [field.decode('utf-8') for field in fields])
)
# The first row holds the column names; it also serves as an index -> column name lookup
schemas = lines_rdd.take(1)[0]
# Keep only the rows that differ from the header row
lines = lines_rdd.filter(lambda row: row != schemas)

In [5]:
# Reverse lookup table: column name -> position of that column within a row
colname2idx = defaultdict(None, ((colname, idx) for idx, colname in enumerate(schemas)))
# Example
print(colname2idx['CMPLNT_TO_DT'])

3


# Global Null Checking method

In [6]:
def checkNull(string):
    """Classify one field as missing ('NULL') or present ('VALID').

    Assumes a unicode string. A field counts as missing when it is empty ('')
    or when it is exactly the text 'nan'; everything else is 'VALID'.
    """
    is_empty = len(string) == 0
    if is_empty or string == 'nan':
        return 'NULL'
    return 'VALID'

In [6]:
# Example: label every field of every row with checkNull and show the first labelled row
NULL_TABLE = lines.map(lambda row: list(map(checkNull, row)))
print(NULL_TABLE.take(1))

[['VALID', 'VALID', 'VALID', 'NULL', 'NULL', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID', 'NULL', 'NULL', 'VALID', 'VALID', 'VALID', 'VALID', 'VALID']]


In [18]:
# Peek at the KY_CD value of the first data row
ky_cd_idx = colname2idx['KY_CD']
lines.map(lambda row: row[ky_cd_idx]).take(1)

[u'113']

# Date Format Checking
Format Reference: http://strftime.org/

In [15]:
# The format is defined in this cell so it runs on a fresh kernel
# (it must not depend on a cell that is executed later).
date_format = '%m/%d/%Y'
# Parse a sample date string into a datetime object
testdate = datetime.datetime.strptime('10/09/1992', date_format)
testtime = '14:24:32'
# Show the parsed year to confirm the conversion worked
print(testdate.year)

1992

In [16]:
date_format = '%m/%d/%Y'
time_format = '%H:%M:%S'
# If a date is in the wrong format it cannot be converted to a datetime object.
def checkDate(line,date_format='%m/%d/%Y',min_year=2005,max_year=2015):
    """Check that a date string parses and that its year is in the accepted range.

    line: the date string to check
    date_format: strptime format the string must follow (default '%m/%d/%Y')
    min_year, max_year: inclusive bounds on the parsed year (default 2005-2015)

    Returns 'VALID' when the string parses with date_format and its year lies
    within [min_year, max_year]; returns 'INVALID' otherwise, including when
    `line` is not a string at all (e.g. None).
    """
    try:
        date = datetime.datetime.strptime(line,date_format)
    except (ValueError, TypeError):
        # ValueError: text does not follow the format; TypeError: not a string
        return 'INVALID'
    if min_year <= date.year <= max_year:
        return 'VALID'
    return 'INVALID'
def checkTime(line,time_format='%H:%M:%S'):
    """Check that a time string can be parsed with the given format.

    line: the time string to check
    time_format: strptime format the string must follow (default '%H:%M:%S')

    Returns 'VALID' when the string parses, 'INVALID' otherwise, including
    when `line` is not a string at all (e.g. None).
    """
    try:
        datetime.datetime.strptime(line,time_format)
    except (ValueError, TypeError):
        # ValueError: text does not follow the format; TypeError: not a string
        return 'INVALID'
    return 'VALID'
########################################
# Example: extract the CMPLNT_FR_DT column and label each value with checkDate
CMPLNT_FR_DT = (
    lines
    .map(lambda row: row[colname2idx['CMPLNT_FR_DT']])
    .map(lambda value: checkDate(value, date_format))
)

In [17]:
# Testing Check Date Method
# Count how many CMPLNT_FR_DT values received each validity label.
# NOTE(review): the recorded output for this cell lists 'INVALID/FORMAT' and
# 'INVALID/Year', labels that the checkDate defined in this notebook never
# returns -- presumably the output comes from a different version of
# checkDate; re-run the cell to refresh it.
CMPLNT_FR_DT.countByValue()

defaultdict(int,
            {'INVALID/FORMAT': 655, 'INVALID/Year': 18782, 'VALID': 5081794})

# Regular Expression Checker
Regular Expression Reference: https://docs.python.org/2/library/re.html

In [7]:
# Example Checking KY_CD 3-digit code
# Raw string literal so that \d is handed to the regex engine exactly as written
# (a non-raw '\d' relies on Python passing an unrecognised escape through).
regex_3_digits = r'^\d{3}$'

In [8]:
#Require re package
def checkRegex(line,regex):
    """Report whether a string matches a regular expression.

    line: the string to check
    regex: regular expression pattern, applied with re.match (i.e. anchored
           at the start of the string)

    Returns 'VALID' when the pattern matches, 'INVALID' otherwise.
    """
    return 'VALID' if re.match(regex, line) else 'INVALID'

In [12]:
# Show that it actually works: a 4-digit and a 3-digit sample against the 3-digit pattern
for sample in ('1234', '123'):
    print(checkRegex(sample, regex_3_digits))

INVALID
VALID


In [13]:
# Example: checking KY_CD validity
# Extract the KY_CD column, label each value against the 3-digit pattern, then count the labels
KY_CD = (
    lines
    .map(lambda row: row[colname2idx['KY_CD']])
    .map(lambda code: checkRegex(code, regex_3_digits))
)
KY_CD.countByValue()

defaultdict(int, {'VALID': 5101231})

# Columns Cleaning Logic

In [99]:
# Show the column names in the order they appear in each row
print(schemas)

[u'CMPLNT_NUM', u'CMPLNT_FR_DT', u'CMPLNT_FR_TM', u'CMPLNT_TO_DT', u'CMPLNT_TO_TM', u'RPT_DT', u'KY_CD', u'OFNS_DESC', u'PD_CD', u'PD_DESC', u'CRM_ATPT_CPTD_CD', u'LAW_CAT_CD', u'JURIS_DESC', u'BORO_NM', u'ADDR_PCT_CD', u'LOC_OF_OCCUR_DESC', u'PREM_TYP_DESC', u'PARKS_NM', u'HADEVELOPT', u'X_COORD_CD', u'Y_COORD_CD', u'Latitude', u'Longitude', u'Lat_Lon']


## CMPLNT_TO_DT and CMPLNT_TO_TM
- Check Null for both fields
- Combine Date and Time
- Check date format using checkDate() in utilities.py
- Check whether it is later than CMPLNT_FR_DT+CMPLNT_FR_TM

In [85]:
# Execute utilities.py and load the names it defines into this notebook's namespace.
# Any function of the same name defined in an earlier cell is replaced by the version from the file.
%run ./crime-data-process/code/utilities.py

In [59]:
# Format for a date and a time written together, separated by a single space
datetime_format = ' '.join((date_format, time_format))
datetime_format

'%m/%d/%Y %H:%M:%S'

In [92]:
# Implementation
# check_FR_TO checks the "from" (CMPLNT_FR_*) and "to" (CMPLNT_TO_*) date/time fields together
# It returns the validity of all four fields
def check_FR_TO(from_date,from_time,to_date,to_time,date_format,time_format):
    """Validate a complaint's start and end date/time fields as a group.

    from_date, from_time: start date and start time strings
    to_date, to_time: end date and end time strings
    date_format, time_format: strptime formats the date / time strings must follow

    Returns a list of four labels, in the order
    [from_date, from_time, to_date, to_time]:
      - all 'NULL' when either start field is null (per checkNull);
      - start labels + ['NULL', 'NULL'] when either end field is null, where the
        start labels are both 'VALID' or both 'INVALID' (per checkDate/checkTime);
      - all 'INVALID' when any of the four fields fails checkDate/checkTime;
      - all 'VALID' when the start datetime is not later than the end datetime;
      - ['VALID', 'VALID', 'INVALID', 'INVALID'] when the end precedes the start.
    """
    ###########
    #Null check
    ###########
    # If the from datetime is NULL, return all NULL
    if 'NULL' in (checkNull(from_date), checkNull(from_time)):
        return ['NULL','NULL','NULL','NULL']
    # The caller-supplied formats are forwarded so the checks agree with the
    # parsing done further down.
    # NOTE(review): assumes checkDate/checkTime accept the format as second
    # positional argument, as the definitions in this notebook do -- confirm
    # against utilities.py.
    from_is_valid = 'INVALID' not in (checkDate(from_date,date_format),
                                      checkTime(from_time,time_format))
    if 'NULL' in (checkNull(to_date), checkNull(to_time)):
        if from_is_valid:
            return ['VALID','VALID','NULL','NULL']
        return ['INVALID','INVALID','NULL','NULL']
    #Check for invalidity
    #If any field format is invalid return invalid for all of them
    to_is_valid = 'INVALID' not in (checkDate(to_date,date_format),
                                    checkTime(to_time,time_format))
    if not (from_is_valid and to_is_valid):
        return ['INVALID','INVALID','INVALID','INVALID']
    ###Start combining
    #Define datetime format
    datetime_format = date_format+' '+time_format
    from_datetime = datetime.datetime.strptime(from_date+' '+from_time,datetime_format)
    # The end datetime must be built from the "to" fields; building it from the
    # "from" fields makes the comparison below always succeed.
    to_datetime = datetime.datetime.strptime(to_date+' '+to_time,datetime_format)
    if from_datetime <= to_datetime:
        return ['VALID','VALID','VALID','VALID']
    # Or ['INVALID','INVALID','INVALID','INVALID']? Open for discussion
    return ['VALID','VALID','INVALID','INVALID']

In [93]:
# Testing
# Three sample inputs: end one year before start, missing start fields, missing end fields
for fr_dt, fr_tm, to_dt, to_tm in [
    ('10/09/1992', '12:00:00', '10/09/1991', '12:00:00'),
    ('', '', '10/09/1991', '12:00:00'),
    ('10/09/1991', '12:00:00', '', ''),
]:
    print(check_FR_TO(fr_dt, fr_tm, to_dt, to_tm, date_format, time_format))

['INVALID', 'INVALID', 'INVALID', 'INVALID']
['NULL', 'NULL', 'NULL', 'NULL']
['INVALID', 'INVALID', 'NULL', 'NULL']


In [94]:
# Columns 1-4 of every row are CMPLNT_FR_DT, CMPLNT_FR_TM, CMPLNT_TO_DT, CMPLNT_TO_TM
from_to_tuple = lines.map(lambda row: row[1:5])
# Label the four fields of every row in one pass
checkResults_from_to = from_to_tuple.map(
    lambda fields: check_FR_TO(fields[0], fields[1], fields[2], fields[3],
                               date_format, time_format)
)

## OFNS_DESC 

In [103]:
# Extract the OFNS_DESC column and count how often each distinct value occurs
ofns_desc_idx = colname2idx['OFNS_DESC']
OFNS_DESC = lines.map(lambda row: row[ofns_desc_idx])
OFNS_DESC.countByValue()

defaultdict(int,
            {u'': 18840,
             u'ABORTION': 4,
             u'ADMINISTRATIVE CODE': 11383,
             u'ADMINISTRATIVE CODES': 18,
             u'AGRICULTURE & MRKTS LAW-UNCLASSIFIED': 83,
             u'ALCOHOLIC BEVERAGE CONTROL LAW': 750,
             u'ANTICIPATORY OFFENSES': 95,
             u'ARSON': 13984,
             u'ASSAULT 3 & RELATED OFFENSES': 521538,
             u"BURGLAR'S TOOLS": 2477,
             u'BURGLARY': 191406,
             u'CHILD ABANDONMENT/NON SUPPORT': 367,
             u'CRIMINAL MISCHIEF & RELATED OF': 505774,
             u'CRIMINAL TRESPASS': 66544,
             u'DANGEROUS DRUGS': 348469,
             u'DANGEROUS WEAPONS': 124235,
             u'DISORDERLY CONDUCT': 829,
             u'DISRUPTION OF A RELIGIOUS SERV': 47,
             u'ENDAN WELFARE INCOMP': 144,
             u'ESCAPE 3': 167,
             u'FELONY ASSAULT': 184069,
             u'FORGERY': 49303,
             u'FORTUNE TELLING': 11,
             u'FRAUDS'