# Data Exploration (Part III - Feature Building)
## Intro
Okay, I'm starting to get the hang of Spark. I've totally abused the Spark SQL capabilities so far, and it's been extremely user friendly doing all this in Jupyter. Honestly, it's not that large of a data set so I haven't had too much complexity navigating the fields so far. There are not many features to build, but going through the date and time section, I could think of a few there that we should pull out for deeper analysis.

Let's set up our environment and load the data as we had it last time again:

In [1]:
# Use findspark package to connect Jupyter to Spark shell
import findspark
findspark.init('/usr/lib/spark')

# Load SparkSession object
import pyspark
from pyspark.sql import SparkSession

# Load other libraries
from datetime import datetime
from pyspark.sql.functions import col, udf, count, isnan, lit, sum, coalesce, concat, to_date, to_timestamp, when, date_format, unix_timestamp
from pyspark.sql.types import DateType
from functools import reduce
import pandas as pd

# Initiate SparkSession as "spark"
spark = SparkSession\
    .builder\
    .getOrCreate()

# Load sql_magic and connect to Spark
%load_ext sql_magic
%config SQL.conn_name = 'spark'

In [2]:
%%time
# Read NYPD Complaint Data
df = spark.read.csv(
    "s3n://2017edmfasatb/nypd_complaints/data/NYPD_Complaint_Data_Historic.csv", 
    header = True, 
    inferSchema = True
)

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 52.3 s


In [3]:
oldColumns = df.schema.names
newColumns = [
    'COMPLAINT_NUMBER',
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'COMPLAINT_END_DATE',
    'COMPLAINT_END_TIME',
    'REPORTED_DATE',
    'OFFENSE_ID',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_INTERNAL_CODE',
    'OFFENSE_INTERNAL_DESCRIPTION',
    'OFFENSE_RESULT',
    'OFFENSE_LEVEL',
    'JURISDICTION',
    'BOROUGH',
    'PRECINCT',
    'SPECIFIC_LOCATION',
    'PREMISE_DESCRIPTION',
    'PARK_NAME',
    'HOUSING_NAME',
    'X_COORD_NYC',
    'Y_COORD_NYC',
    'LAT',
    'LON',
    'LAT_LON'
]

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)
df.printSchema()

root
 |-- COMPLAINT_NUMBER: integer (nullable = true)
 |-- COMPLAINT_START_DATE: string (nullable = true)
 |-- COMPLAINT_START_TIME: string (nullable = true)
 |-- COMPLAINT_END_DATE: string (nullable = true)
 |-- COMPLAINT_END_TIME: string (nullable = true)
 |-- REPORTED_DATE: string (nullable = true)
 |-- OFFENSE_ID: integer (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_INTERNAL_CODE: integer (nullable = true)
 |-- OFFENSE_INTERNAL_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_RESULT: string (nullable = true)
 |-- OFFENSE_LEVEL: string (nullable = true)
 |-- JURISDICTION: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- PRECINCT: integer (nullable = true)
 |-- SPECIFIC_LOCATION: string (nullable = true)
 |-- PREMISE_DESCRIPTION: string (nullable = true)
 |-- PARK_NAME: string (nullable = true)
 |-- HOUSING_NAME: string (nullable = true)
 |-- X_COORD_NYC: integer (nullable = true)
 |-- Y_COORD_NYC: integer (nullable = true)
 

In [4]:
# Drop rows with any NA values in the specified columns
df_na_drop = df.na.drop(subset=[
    'COMPLAINT_START_DATE',
    'COMPLAINT_START_TIME',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_RESULT',
    'BOROUGH',
    'PRECINCT',
    'LAT',
    'LON'
])

In [5]:
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_DATE', coalesce(df_na_drop['COMPLAINT_END_DATE'], df_na_drop['COMPLAINT_START_DATE']))
df_na_drop = df_na_drop.withColumn('COMPLAINT_END_TIME', coalesce(df_na_drop['COMPLAINT_END_TIME'], df_na_drop['COMPLAINT_START_TIME'])) 

In [6]:
# Combine date and time fields and create new timestamp field for COMPLAINT fields
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_START_TIMESTAMP', 
    to_timestamp(
        concat(df_na_drop['COMPLAINT_START_DATE'], lit(' '), df_na_drop['COMPLAINT_START_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)

df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_END_TIMESTAMP', 
    to_timestamp(
        concat(df_na_drop['COMPLAINT_END_DATE'], lit(' '), df_na_drop['COMPLAINT_END_TIME']),
        'MM/dd/yyyy HH:mm:ss'
    )
)

# Convert REPORTED_DATE
df_na_drop = df_na_drop.withColumn(
    'REPORTED_DATE_TIMESTAMP', 
    to_timestamp(
        df_na_drop['REPORTED_DATE'],
        'MM/dd/yyyy'
    )
)

In [7]:
# List of crimes to keep
crimes_to_keep = [
    'PETIT LARCENY',
    'HARRASSMENT 2',
    'ASSAULT 3 & RELATED OFFENSES',
    'CRIMINAL MISCHIEF & RELATED OF',
    'GRAND LARCENY',
    'OFF. AGNST PUB ORD SENSBLTY &',
    'DANGEROUS DRUGS',
    'ROBBERY',
    'BURGLUARY',
    'FELONY ASSAULT'
]

# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'OFFENSE_DESCRIPTION', 
    when(df_na_drop['OFFENSE_DESCRIPTION'].isin(crimes_to_keep), df_na_drop['OFFENSE_DESCRIPTION']).otherwise('OTHER')
)

In [8]:
# List of premises to keep
premises_to_keep = [
    'STREET',
    'RESIDENCE - APT. HOUSE',
    'RESIDENCE-HOUSE',
    'RESIDENCE - PUBLIC HOUSING',
    'COMMERCIAL BUILDING',
    'DEPARTMENT STORE',
    'TRANSIT - NYC SUBWAY',
    'CHAIN STORE',
    'PUBLIC SCHOOL',
    'GROCERY/BODEGA',
    'RESTAURANT/DINER',
    'BAR/NIGHT CLUB',
    'PARK/PLAYGROUND'
]

# Anything not in the list becomes 'OTHER'
df_na_drop = df_na_drop.withColumn(
    'PREMISE_DESCRIPTION', 
    when(df_na_drop['PREMISE_DESCRIPTION'].isin(premises_to_keep), df_na_drop['PREMISE_DESCRIPTION']).otherwise('OTHER')
)

In [9]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")

In [10]:
%%read_sql
SELECT * FROM df_na_drop LIMIT 10;

Query started at 06:52:51 AM UTC; Query executed in 0.02 m

Unnamed: 0,COMPLAINT_NUMBER,COMPLAINT_START_DATE,COMPLAINT_START_TIME,COMPLAINT_END_DATE,COMPLAINT_END_TIME,REPORTED_DATE,OFFENSE_ID,OFFENSE_DESCRIPTION,OFFENSE_INTERNAL_CODE,OFFENSE_INTERNAL_DESCRIPTION,...,PARK_NAME,HOUSING_NAME,X_COORD_NYC,Y_COORD_NYC,LAT,LON,LAT_LON,COMPLAINT_START_TIMESTAMP,COMPLAINT_END_TIMESTAMP,REPORTED_DATE_TIMESTAMP
0,101109527,12/31/2015,23:45:00,12/31/2015,23:45:00,12/31/2015,113,OTHER,729.0,"FORGERY,ETC.,UNCLASSIFIED-FELO",...,,,1007314,241257,40.828848,-73.916661,"(40.828848333, -73.916661142)",2015-12-31 23:45:00,2015-12-31 23:45:00,2015-12-31
1,153401121,12/31/2015,23:36:00,12/31/2015,23:36:00,12/31/2015,101,OTHER,,,...,,,1043991,193406,40.697338,-73.784557,"(40.697338138, -73.784556739)",2015-12-31 23:36:00,2015-12-31 23:36:00,2015-12-31
2,569369778,12/31/2015,23:30:00,12/31/2015,23:30:00,12/31/2015,117,DANGEROUS DRUGS,503.0,"CONTROLLED SUBSTANCE,INTENT TO",...,,,999463,231690,40.802607,-73.945052,"(40.802606608, -73.945051911)",2015-12-31 23:30:00,2015-12-31 23:30:00,2015-12-31
3,968417082,12/31/2015,23:30:00,12/31/2015,23:30:00,12/31/2015,344,ASSAULT 3 & RELATED OFFENSES,101.0,ASSAULT 3,...,,,1060183,177862,40.654549,-73.726339,"(40.654549444, -73.726338791)",2015-12-31 23:30:00,2015-12-31 23:30:00,2015-12-31
4,641637920,12/31/2015,23:25:00,12/31/2015,23:30:00,12/31/2015,344,ASSAULT 3 & RELATED OFFENSES,101.0,ASSAULT 3,...,,,987606,208148,40.738002,-73.987891,"(40.7380024, -73.98789129)",2015-12-31 23:25:00,2015-12-31 23:30:00,2015-12-31
5,365661343,12/31/2015,23:18:00,12/31/2015,23:25:00,12/31/2015,106,FELONY ASSAULT,109.0,"ASSAULT 2,1,UNCLASSIFIED",...,,,996149,181562,40.665023,-73.957111,"(40.665022689, -73.957110763)",2015-12-31 23:18:00,2015-12-31 23:25:00,2015-12-31
6,608231454,12/31/2015,23:15:00,12/31/2015,23:15:00,12/31/2015,235,DANGEROUS DRUGS,511.0,"CONTROLLED SUBSTANCE, POSSESSI",...,,,987373,201662,40.7202,-73.988735,"(40.720199996, -73.988735082)",2015-12-31 23:15:00,2015-12-31 23:15:00,2015-12-31
7,265023856,12/31/2015,23:15:00,12/31/2015,23:15:00,12/31/2015,118,OTHER,792.0,WEAPONS POSSESSION 1 & 2,...,,,1009041,247401,40.845707,-73.910398,"(40.845707148, -73.910398033)",2015-12-31 23:15:00,2015-12-31 23:15:00,2015-12-31
8,989238731,12/31/2015,23:15:00,12/31/2015,23:30:00,12/31/2015,344,ASSAULT 3 & RELATED OFFENSES,101.0,ASSAULT 3,...,,,1014154,251416,40.856711,-73.8919,"(40.856711291, -73.891899956)",2015-12-31 23:15:00,2015-12-31 23:30:00,2015-12-31
9,415095955,12/31/2015,23:10:00,12/31/2015,23:10:00,12/31/2015,341,PETIT LARCENY,338.0,"LARCENY,PETIT FROM BUILDING,UN",...,,,994327,218211,40.765618,-73.963623,"(40.765617688, -73.96362342)",2015-12-31 23:10:00,2015-12-31 23:10:00,2015-12-31


## Timestamp Columns
Remember, in the last post, we cleaned up the **COMPLAINT\_START**, **COMPLAINT\_END**, and **REPORTED\_DATE** columns. Through these 3 fields, alone, I can probably think of 5 or 6 features off the top of my head that we should build:
- Year, Month, Day, Day Of Week, and Hour of each of these dates (no hour for REPORTED\_DATE)
- Flag of whether incident spanned any amount of time (vs a one-time incident where COMPLAINT\_START = COMPLAINT\_END)
- Length of the incident, if not one-time
- How far after the incident started and ended was the event reported

That will actually give us 18 new features haha... actually, I'm not going to do the datetime fields breakdown of COMPLAINT\_END or REPORTED\_DATE, these don't really matter as much as when the incident actually started. This leaves us with 9 new features - I'll take it.

### Date / Time Fields For COMPLAINT\_START

In [11]:
# Set UDFs to extract specific parts of date and time
extract_year =  udf(lambda x: x.year)
extract_month =  udf(lambda x: x.month)
extract_day =  udf(lambda x: x.day)
extract_hour =  udf(lambda x: x.hour)

# Perform transformation
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_YEAR', extract_year(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_MONTH', extract_month(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_DAY', extract_day(col('COMPLAINT_START_TIMESTAMP')))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_WEEKDAY', date_format(col('COMPLAINT_START_TIMESTAMP'), 'E'))
df_na_drop = df_na_drop.withColumn('COMPLAINT_START_TIMESTAMP_HOUR', extract_hour(col('COMPLAINT_START_TIMESTAMP')))

In [12]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")

In [13]:
%%read_sql
SELECT 
    COMPLAINT_START_TIMESTAMP,
    COMPLAINT_START_TIMESTAMP_YEAR,
    COMPLAINT_START_TIMESTAMP_MONTH,
    COMPLAINT_START_TIMESTAMP_DAY,
    COMPLAINT_START_TIMESTAMP_WEEKDAY,
    COMPLAINT_START_TIMESTAMP_HOUR
FROM df_na_drop 
LIMIT 10;

Query started at 06:52:52 AM UTC; Query executed in 0.04 m

Unnamed: 0,COMPLAINT_START_TIMESTAMP,COMPLAINT_START_TIMESTAMP_YEAR,COMPLAINT_START_TIMESTAMP_MONTH,COMPLAINT_START_TIMESTAMP_DAY,COMPLAINT_START_TIMESTAMP_WEEKDAY,COMPLAINT_START_TIMESTAMP_HOUR
0,2015-12-31 23:45:00,2015,12,31,Thu,23
1,2015-12-31 23:36:00,2015,12,31,Thu,23
2,2015-12-31 23:30:00,2015,12,31,Thu,23
3,2015-12-31 23:30:00,2015,12,31,Thu,23
4,2015-12-31 23:25:00,2015,12,31,Thu,23
5,2015-12-31 23:18:00,2015,12,31,Thu,23
6,2015-12-31 23:15:00,2015,12,31,Thu,23
7,2015-12-31 23:15:00,2015,12,31,Thu,23
8,2015-12-31 23:15:00,2015,12,31,Thu,23
9,2015-12-31 23:10:00,2015,12,31,Thu,23


Perfect. Let's move onto the next task.

### Incident Length Flag & Incident Length
Very simple here. I want to
1. I want to calculate how long the incident lasted
2. If the incident was a single instance in time, I want to flag it in some way in another column

In [14]:
# Take the difference between start and end, expressed in minutes
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_LENGTH', 
    (unix_timestamp(df_na_drop['COMPLAINT_END_TIMESTAMP']) - unix_timestamp(df_na_drop['COMPLAINT_START_TIMESTAMP']))/60
)

In [15]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")

In [16]:
%%read_sql
SELECT 
    COMPLAINT_START_TIMESTAMP,
    COMPLAINT_END_TIMESTAMP,
    COMPLAINT_LENGTH
FROM df_na_drop 
LIMIT 10;

Query started at 06:52:55 AM UTC; Query executed in 0.01 m

Unnamed: 0,COMPLAINT_START_TIMESTAMP,COMPLAINT_END_TIMESTAMP,COMPLAINT_LENGTH
0,2015-12-31 23:45:00,2015-12-31 23:45:00,0.0
1,2015-12-31 23:36:00,2015-12-31 23:36:00,0.0
2,2015-12-31 23:30:00,2015-12-31 23:30:00,0.0
3,2015-12-31 23:30:00,2015-12-31 23:30:00,0.0
4,2015-12-31 23:25:00,2015-12-31 23:30:00,5.0
5,2015-12-31 23:18:00,2015-12-31 23:25:00,7.0
6,2015-12-31 23:15:00,2015-12-31 23:15:00,0.0
7,2015-12-31 23:15:00,2015-12-31 23:15:00,0.0
8,2015-12-31 23:15:00,2015-12-31 23:30:00,15.0
9,2015-12-31 23:10:00,2015-12-31 23:10:00,0.0


In [17]:
# If COMPLAINT_LENGTH = 0, we flag with a new boolean column COMPLAINT_LENGTH_ZERO_TIME
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_LENGTH_ZERO_TIME', when(df_na_drop['COMPLAINT_LENGTH'] == 0, True).otherwise(False)
)

In [18]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")

In [19]:
%%read_sql
SELECT 
    COMPLAINT_LENGTH,
    COMPLAINT_LENGTH_ZERO_TIME
FROM df_na_drop 
LIMIT 10;

Query started at 06:52:56 AM UTC; Query executed in 0.01 m

Unnamed: 0,COMPLAINT_LENGTH,COMPLAINT_LENGTH_ZERO_TIME
0,0.0,True
1,0.0,True
2,0.0,True
3,0.0,True
4,5.0,False
5,7.0,False
6,0.0,True
7,0.0,True
8,15.0,False
9,0.0,True


### Lag Time Between Incident And Report Date
This one is kinda weird because report date doesn't have a time component in this dataset, only a date. We'll take the difference between both the COMPLAINT\_START and COMPLAINT\_END dates with the REPORTED\_DATE. My assumption here is that the incident always happens first and then it is reported when it is completely finished. This may be a wrong assumption, but I'll make this assumption for the sake of the subtraction and we will allow negative values if the complaint can be reported before the incident actually finished.

In [20]:
# Take the difference between start and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_START_REPORTED_LAG', 
    (unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_START_TIMESTAMP'])))/60/60/24
)

# Take the difference between end and reported, expressed in days
df_na_drop = df_na_drop.withColumn(
    'COMPLAINT_END_REPORTED_LAG', 
    (unix_timestamp(df_na_drop['REPORTED_DATE_TIMESTAMP']) - unix_timestamp(to_date(df_na_drop['COMPLAINT_END_TIMESTAMP'])))/60/60/24
)

In [21]:
# Add table to SQL Context
df_na_drop.createOrReplaceTempView("df_na_drop")

In [22]:
%%read_sql
SELECT 
    COMPLAINT_START_TIMESTAMP,
    COMPLAINT_END_TIMESTAMP,
    REPORTED_DATE_TIMESTAMP,
    COMPLAINT_START_REPORTED_LAG,
    COMPLAINT_END_REPORTED_LAG
FROM df_na_drop 
LIMIT 10;

Query started at 06:52:57 AM UTC; Query executed in 0.01 m

Unnamed: 0,COMPLAINT_START_TIMESTAMP,COMPLAINT_END_TIMESTAMP,REPORTED_DATE_TIMESTAMP,COMPLAINT_START_REPORTED_LAG,COMPLAINT_END_REPORTED_LAG
0,2015-12-31 23:45:00,2015-12-31 23:45:00,2015-12-31,0.0,0.0
1,2015-12-31 23:36:00,2015-12-31 23:36:00,2015-12-31,0.0,0.0
2,2015-12-31 23:30:00,2015-12-31 23:30:00,2015-12-31,0.0,0.0
3,2015-12-31 23:30:00,2015-12-31 23:30:00,2015-12-31,0.0,0.0
4,2015-12-31 23:25:00,2015-12-31 23:30:00,2015-12-31,0.0,0.0
5,2015-12-31 23:18:00,2015-12-31 23:25:00,2015-12-31,0.0,0.0
6,2015-12-31 23:15:00,2015-12-31 23:15:00,2015-12-31,0.0,0.0
7,2015-12-31 23:15:00,2015-12-31 23:15:00,2015-12-31,0.0,0.0
8,2015-12-31 23:15:00,2015-12-31 23:30:00,2015-12-31,0.0,0.0
9,2015-12-31 23:10:00,2015-12-31 23:10:00,2015-12-31,0.0,0.0


Okay. I think we have our final dataframe! I'm not really sure what other features I can engineer at this point without getting deeper into actual analysis, so let's save this dataframe back out to S3 as a temporary store.

In [24]:
df_na_drop.printSchema()

root
 |-- COMPLAINT_NUMBER: integer (nullable = true)
 |-- COMPLAINT_START_DATE: string (nullable = true)
 |-- COMPLAINT_START_TIME: string (nullable = true)
 |-- COMPLAINT_END_DATE: string (nullable = true)
 |-- COMPLAINT_END_TIME: string (nullable = true)
 |-- REPORTED_DATE: string (nullable = true)
 |-- OFFENSE_ID: integer (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_INTERNAL_CODE: integer (nullable = true)
 |-- OFFENSE_INTERNAL_DESCRIPTION: string (nullable = true)
 |-- OFFENSE_RESULT: string (nullable = true)
 |-- OFFENSE_LEVEL: string (nullable = true)
 |-- JURISDICTION: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- PRECINCT: integer (nullable = true)
 |-- SPECIFIC_LOCATION: string (nullable = true)
 |-- PREMISE_DESCRIPTION: string (nullable = true)
 |-- PARK_NAME: string (nullable = true)
 |-- HOUSING_NAME: string (nullable = true)
 |-- X_COORD_NYC: integer (nullable = true)
 |-- Y_COORD_NYC: integer (nullable = true)
 

In [25]:
df_clean = df_na_drop[[
    'COMPLAINT_NUMBER', 
    'COMPLAINT_START_TIMESTAMP',
    'COMPLAINT_END_TIMESTAMP',
    'REPORTED_DATE_TIMESTAMP',
    'COMPLAINT_START_TIMESTAMP_YEAR',
    'COMPLAINT_START_TIMESTAMP_MONTH',
    'COMPLAINT_START_TIMESTAMP_DAY',
    'COMPLAINT_START_TIMESTAMP_WEEKDAY',
    'COMPLAINT_START_TIMESTAMP_HOUR',
    'COMPLAINT_LENGTH',
    'COMPLAINT_LENGTH_ZERO_TIME',
    'COMPLAINT_START_REPORTED_LAG',
    'COMPLAINT_END_REPORTED_LAG',
    'OFFENSE_DESCRIPTION',
    'OFFENSE_RESULT',
    'OFFENSE_LEVEL',
    'JURISDICTION',
    'BOROUGH',
    'PRECINCT',
    'SPECIFIC_LOCATION',
    'PREMISE_DESCRIPTION',
    'LAT',
    'LON'
]]

In [26]:
# Save CSV back to S3
df_clean.write.parquet('s3n://2017edmfasatb/nypd_complaints/data/df_clean.parquet')

Py4JJavaError: An error occurred while calling o266.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:509)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 8.0 failed 4 times, most recent failure: Lost task 8.3 in stage 8.0 (TID 33, ip-10-0-0-154.ec2.internal, executor 2): org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-11-ca9fdfbab274>", line 2, in <lambda>
AttributeError: 'NoneType' object has no attribute 'year'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
	... 8 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1569)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1557)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1556)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1556)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:815)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:815)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:815)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1784)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1739)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1728)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:631)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
	... 45 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/hadoop/appcache/application_1504379514127_0006/container_1504379514127_0006_01_000005/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-11-ca9fdfbab274>", line 2, in <lambda>
AttributeError: 'NoneType' object has no attribute 'year'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
	... 8 more
