In [None]:
import os

# Bootstrap a PySpark session inside this kernel by running Spark's own shell.py.
# Later cells use `sc` and `spark` without defining them; they are expected to
# come from this script.
# The `with` block closes the file handle (the original one-liner leaked it), and
# compiling with the real path makes tracebacks point at shell.py if bootstrapping fails.
_pyspark_shell_path = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
with open(_pyspark_shell_path) as _pyspark_shell_file:
    exec(compile(_pyspark_shell_file.read(), _pyspark_shell_path, 'exec'))

#### Register the config file with the Spark context

In [2]:
# Register config.py with the SparkContext so it is shipped alongside the job.
# The path is resolved relative to the notebook's working directory instead of a
# hard-coded, user-specific absolute path, so the notebook runs on other machines.
# It is presumably the same config.py that `import config as cf` picks up below — confirm.
# NOTE(review): sc.addFile only distributes the file; if executors ever need to
# `import config`, sc.addPyFile is the call that also puts it on their sys.path.
sc.addFile(os.path.abspath('config.py'))

#### Import the relevant libraries

In [3]:
import config as cf
from pyspark.sql import functions as func

#### Get the relevant configurations in variables

In [4]:
def _split_column_list(raw):
    """Split a comma-separated list of column names read from config.

    Surrounding whitespace is stripped and empty entries (e.g. from a trailing
    comma) are dropped. Without this, "a, b," would yield ['a', ' b', ''], and
    neither ' b' nor '' can match a column once spaces in column names have
    been replaced by underscores.
    """
    return [name.strip() for name in raw.split(',') if name.strip()]


data_path = cf.data_path()
primary_response_variables = _split_column_list(cf.primary_response_variables())
secondary_response_variables = _split_column_list(cf.secondary_response_variables())
primary_explanatory_variables = _split_column_list(cf.primary_explanatory_variables())

#### Read the data

In [5]:
# Load the raw CSV, taking column names from the header row. No schema inference
# is requested, so the CSV columns arrive as strings (see printSchema further down).
df = spark.read.option('header', True).csv(data_path)

#### Remove the spaces from the column names so that it's easier to use the columns later on

In [6]:
# Replace spaces in column names with underscores (e.g. 'Violation Time' ->
# 'Violation_Time') so columns can later be referenced as plain identifiers.
# toDF renames columns positionally, so no header has to be parsed as a column
# expression; func.col(name) would treat a '.' in a header as nested-field access
# and fail to resolve it.
df_with_no_spaces_in_colm_names = df.toDF(*[name.replace(' ', '_') for name in df.columns])

#### Get the number of distinct values of the attributes that form the response variables (in statistical terms, response variables are the variables plotted on the y-axis, i.e. the variables whose variation is being observed)

In [7]:
# A single aggregate row holding, for each primary response column, its number of
# distinct values under the alias 'unique_<column>'.
unique_count_of_primary_response_variables = df_with_no_spaces_in_colm_names.select(*[
    func.countDistinct(name).alias('unique_{}'.format(name))
    for name in primary_response_variables
])

In [8]:
# Print the one-row table of distinct-value counts (default row limit and truncation).
unique_count_of_primary_response_variables.show(n=20, truncate=True)

21/10/16 18:52:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------------------------+-----------------+---------------------+------------------+---------------------------+---------------------+
|unique_Registration_State|unique_Plate_Type|unique_Violation_Code|unique_Law_Section|unique_Violation_Legal_Code|unique_Issuing_Agency|
+-------------------------+-----------------+---------------------+------------------+---------------------------+---------------------+
|                       67|               86|                  100|                 8|                          4|                   17|
+-------------------------+-----------------+---------------------+------------------+---------------------------+---------------------+



                                                                                

In [9]:
# Same distinct-value count as for the primary variables, applied to the secondary
# response columns; output columns are named 'unique_<column>'.
unique_count_of_secondary_response_variables = df_with_no_spaces_in_colm_names.select(*[
    func.countDistinct(name).alias('unique_{}'.format(name))
    for name in secondary_response_variables
])

In [10]:
# Print the one-row table of distinct-value counts (default row limit and truncation).
unique_count_of_secondary_response_variables.show(n=20, truncate=True)



+-----------------------+-------------------+-------------------+
|unique_Violation_County|unique_Issuer_Squad|unique_Vehicle_Year|
+-----------------------+-------------------+-------------------+
|                     12|                 49|                100|
+-----------------------+-------------------+-------------------+



                                                                                

#### Check the NaNs and nulls in the explanatory variables. These variables typically go on the x-axis. Statistically, we are interested in the extent to which the variation in the response variables is associated with the variation in these variables

In [11]:
# For each primary explanatory column, count the rows whose value is NaN or null.
# func.when(...) without otherwise() yields a non-null literal only for matching
# rows, and func.count skips nulls, so the count is exactly the matching rows.
nan_null_count_in_primary_explanatory_variables = df_with_no_spaces_in_colm_names.select(*[
    func.count(func.when(func.isnan(name) | func.col(name).isNull(), func.lit(1)))
        .alias('null_nan_count_{}'.format(name))
    for name in primary_explanatory_variables
])

In [12]:
# Print the one-row table of NaN/null counts (default row limit and truncation).
nan_null_count_in_primary_explanatory_variables.show(n=20, truncate=True)

[Stage 7:>                                                        (0 + 16) / 16]

+-------------------------+-----------------------------+
|null_nan_count_Issue_Date|null_nan_count_Violation_Time|
+-------------------------+-----------------------------+
|                        0|                           63|
+-------------------------+-----------------------------+



                                                                                

#### Create a new categorical explanatory variable (categorical variables are factors with two or more levels, e.g. the colour of a rainbow band is a factor with 7 levels)

In [13]:
# Derive a categorical AM/PM flag from Violation_Time. The sample values shown in
# the next cell look like 'HHMM' followed by 'A' or 'P' (e.g. '0143A'), so the
# 5th character (substring positions are 1-based) is the AM/PM marker.
#  * Only 'A' or 'P' is kept. Any other character at that position (malformed or
#    short values, which previously leaked through as '' or a stray character)
#    becomes null, so the new variable has exactly two levels plus null.
#  * A null Violation_Time already yields null through substring, so no explicit
#    null branch is needed.
#  * func.isnan is dropped. Violation_Time is a string column (see printSchema), so
#    isnan only forced a string-to-double cast that never yields NaN for values
#    like '0143A'; the isnan filter cell further down returns no rows.
#    NOTE(review): with spark.sql.ansi.enabled that implicit cast may raise instead.
_am_pm_marker = func.substring('Violation_Time', 5, 1)
df_with_no_spaces_in_colm_names = df_with_no_spaces_in_colm_names.withColumn(
    'Violation_AM_or_PM',
    func.when(_am_pm_marker.isin('A', 'P'), _am_pm_marker),
)

#### Verify that the new column has been populated correctly

In [14]:
# Spot-check rows that have a Violation_Time: the derived flag should match the
# AM/PM marker inside the raw value.
(
    df_with_no_spaces_in_colm_names
    .select('Violation_Time', 'Violation_AM_or_PM')
    .where(func.col('Violation_Time').isNotNull())
    .show(n=10)
)

+--------------+------------------+
|Violation_Time|Violation_AM_or_PM|
+--------------+------------------+
|         0143A|                 A|
|         0400P|                 P|
|         0233P|                 P|
|         1120A|                 A|
|         0555P|                 P|
|         0852P|                 P|
|         0215A|                 A|
|         0758A|                 A|
|         1005A|                 A|
|         0845A|                 A|
+--------------+------------------+
only showing top 10 rows



In [15]:
# Spot-check rows with a null Violation_Time: the derived flag should be null too.
(
    df_with_no_spaces_in_colm_names
    .select('Violation_Time', 'Violation_AM_or_PM')
    .where(func.col('Violation_Time').isNull())
    .show(n=10)
)

[Stage 11:>                                                         (0 + 4) / 4]

+--------------+------------------+
|Violation_Time|Violation_AM_or_PM|
+--------------+------------------+
|          null|              null|
|          null|              null|
|          null|              null|
|          null|              null|
|          null|              null|
|          null|              null|
|          null|              null|
|          null|              null|
|          null|              null|
|          null|              null|
+--------------+------------------+
only showing top 10 rows



                                                                                

In [16]:
# Look for NaN Violation_Time values; for this string column the recorded output
# shows that none exist.
(
    df_with_no_spaces_in_colm_names
    .select('Violation_Time', 'Violation_AM_or_PM')
    .where(func.isnan(func.col('Violation_Time')))
    .show(n=10)
)

[Stage 14:>                                                       (0 + 11) / 11]

+--------------+------------------+
|Violation_Time|Violation_AM_or_PM|
+--------------+------------------+
+--------------+------------------+





#### Convert the dates to the standard YYYY-MM-DD format so that it is easy to extract the weeks, months and years from them

In [17]:
# Parse the MM/dd/yyyy Issue_Date strings into dates and render them back as
# yyyy-MM-dd strings in a new column (the result stays a string, as displayed below).
df_with_no_spaces_in_colm_names = df_with_no_spaces_in_colm_names.withColumn(
    'Issue_Date_standardized',
    func.date_format(func.to_date('Issue_Date', 'MM/dd/yyyy'), 'yyyy-MM-dd'),
)

In [18]:
# Compare raw and standardized dates side by side.
df_with_no_spaces_in_colm_names.select(['Issue_Date', 'Issue_Date_standardized']).show(n=10)

+----------+-----------------------+
|Issue_Date|Issue_Date_standardized|
+----------+-----------------------+
|07/10/2016|             2016-07-10|
|07/08/2016|             2016-07-08|
|08/23/2016|             2016-08-23|
|06/14/2017|             2017-06-14|
|11/21/2016|             2016-11-21|
|06/13/2017|             2017-06-13|
|08/03/2016|             2016-08-03|
|12/21/2016|             2016-12-21|
|11/21/2016|             2016-11-21|
|10/05/2016|             2016-10-05|
+----------+-----------------------+
only showing top 10 rows



#### Now drop the original column and rename the new column to the original name

In [19]:
# Replace the raw Issue_Date with the standardized version, keeping the original
# column name. The guard makes re-running this cell a no-op. Without it, a second
# run would drop the already-standardized Issue_Date and then rename a column that
# no longer exists (withColumnRenamed silently ignores missing columns), so
# Issue_Date would vanish from the frame.
if 'Issue_Date_standardized' in df_with_no_spaces_in_colm_names.columns:
    df_with_no_spaces_in_colm_names = (
        df_with_no_spaces_in_colm_names
        .drop('Issue_Date')
        .withColumnRenamed('Issue_Date_standardized', 'Issue_Date')
    )

In [20]:
# Confirm Issue_Date now holds the yyyy-MM-dd values.
df_with_no_spaces_in_colm_names.select(['Issue_Date']).show(n=10)

+----------+
|Issue_Date|
+----------+
|2016-07-10|
|2016-07-08|
|2016-08-23|
|2017-06-14|
|2016-11-21|
|2017-06-13|
|2016-08-03|
|2016-12-21|
|2016-11-21|
|2016-10-05|
+----------+
only showing top 10 rows



#### Now let's create some more categorical variables from the Issue Date column

In [21]:
# Derive year, month, day-of-week and day-of-month columns from Issue_Date.
# As the output below shows (2016-07-10 -> 1, 2016-07-08 -> 6), Issue_DayofWeek
# numbers Sunday as 1.
df_with_no_spaces_in_colm_names = (
    df_with_no_spaces_in_colm_names
    .withColumn('Issue_Year', func.year('Issue_Date'))
    .withColumn('Issue_Month', func.month('Issue_Date'))
    .withColumn('Issue_DayofWeek', func.dayofweek('Issue_Date'))
    .withColumn('Issue_DayofMonth', func.dayofmonth('Issue_Date'))
)

In [22]:
# Display the date next to each derived date-part column.
issue_date_part_columns = ['Issue_Date', 'Issue_Year', 'Issue_Month', 'Issue_DayofWeek', 'Issue_DayofMonth']
df_with_no_spaces_in_colm_names.select(issue_date_part_columns).show(n=10)

+----------+----------+-----------+---------------+----------------+
|Issue_Date|Issue_Year|Issue_Month|Issue_DayofWeek|Issue_DayofMonth|
+----------+----------+-----------+---------------+----------------+
|2016-07-10|      2016|          7|              1|              10|
|2016-07-08|      2016|          7|              6|               8|
|2016-08-23|      2016|          8|              3|              23|
|2017-06-14|      2017|          6|              4|              14|
|2016-11-21|      2016|         11|              2|              21|
|2017-06-13|      2017|          6|              3|              13|
|2016-08-03|      2016|          8|              4|               3|
|2016-12-21|      2016|         12|              4|              21|
|2016-11-21|      2016|         11|              2|              21|
|2016-10-05|      2016|         10|              4|               5|
+----------+----------+-----------+---------------+----------------+
only showing top 10 rows



In [23]:
# Peek at the parking-regulation columns to gauge how often they are populated.
(
    df_with_no_spaces_in_colm_names
    .select(['Days_Parking_In_Effect____', 'From_Hours_In_Effect', 'To_Hours_In_Effect'])
    .show(n=10)
)

+--------------------------+--------------------+------------------+
|Days_Parking_In_Effect____|From_Hours_In_Effect|To_Hours_In_Effect|
+--------------------------+--------------------+------------------+
|                      null|                null|              null|
|                      null|                null|              null|
|                      null|                null|              null|
|                         Y|               0700A|             0700P|
|                         Y|               0700A|             0700P|
|                      null|                null|              null|
|                   BBBBBBB|                 ALL|               ALL|
|                      null|                null|              null|
|                      null|                null|              null|
|                      null|                null|              null|
+--------------------------+--------------------+------------------+
only showing top 10 rows



In [24]:
# Count NaN-or-null rows per parking-regulation column. The when() yields a
# non-null literal only for matching rows, and count() skips nulls.
df_with_no_spaces_in_colm_names.select(*[
    func.count(func.when(func.isnan(name) | func.col(name).isNull(), func.lit(1)))
        .alias('null_nan_count_{}'.format(name))
    for name in ('Days_Parking_In_Effect____', 'From_Hours_In_Effect', 'To_Hours_In_Effect')
]).show()

[Stage 19:===>                                                    (1 + 15) / 16]

+-----------------------------------------+-----------------------------------+---------------------------------+
|null_nan_count_Days_Parking_In_Effect____|null_nan_count_From_Hours_In_Effect|null_nan_count_To_Hours_In_Effect|
+-----------------------------------------+-----------------------------------+---------------------------------+
|                                  2712416|                            5450946|                          5450943|
+-----------------------------------------+-----------------------------------+---------------------------------+





In [25]:
# Peek at the location / registration flag columns.
(
    df_with_no_spaces_in_colm_names
    .select(['Violation_In_Front_Of_Or_Opposite', 'Unregistered_Vehicle?', 'Feet_From_Curb'])
    .show(n=10)
)

+---------------------------------+---------------------+--------------+
|Violation_In_Front_Of_Or_Opposite|Unregistered_Vehicle?|Feet_From_Curb|
+---------------------------------+---------------------+--------------+
|                             null|                 null|             0|
|                             null|                 null|             0|
|                             null|                 null|             0|
|                                O|                 null|             0|
|                                F|                 null|             0|
|                             null|                 null|             0|
|                                F|                    0|             1|
|                             null|                 null|             0|
|                             null|                 null|             0|
|                             null|                 null|             0|
+---------------------------------+----------------

In [26]:
# Count NaN-or-null rows per location / registration flag column.
df_with_no_spaces_in_colm_names.select(*[
    func.count(func.when(func.isnan(name) | func.col(name).isNull(), func.lit(1)))
        .alias('null_nan_count_{}'.format(name))
    for name in ('Violation_In_Front_Of_Or_Opposite', 'Unregistered_Vehicle?', 'Feet_From_Curb')
]).show()

[Stage 22:===>                                                    (1 + 15) / 16]

+------------------------------------------------+------------------------------------+-----------------------------+
|null_nan_count_Violation_In_Front_Of_Or_Opposite|null_nan_count_Unregistered_Vehicle?|null_nan_count_Feet_From_Curb|
+------------------------------------------------+------------------------------------+-----------------------------+
|                                         2161235|                             9675432|                            0|
+------------------------------------------------+------------------------------------+-----------------------------+



                                                                                

In [27]:
# Peek at the specific-violation flag columns.
(
    df_with_no_spaces_in_colm_names
    .select(['No_Standing_or_Stopping_Violation', 'Hydrant_Violation', 'Double_Parking_Violation'])
    .show(n=10)
)

+---------------------------------+-----------------+------------------------+
|No_Standing_or_Stopping_Violation|Hydrant_Violation|Double_Parking_Violation|
+---------------------------------+-----------------+------------------------+
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null|                    null|
|                             null|             null

In [28]:
# Count NaN-or-null rows per specific-violation flag column.
df_with_no_spaces_in_colm_names.select(*[
    func.count(func.when(func.isnan(name) | func.col(name).isNull(), func.lit(1)))
        .alias('null_nan_count_{}'.format(name))
    for name in ('No_Standing_or_Stopping_Violation', 'Hydrant_Violation', 'Double_Parking_Violation')
]).show()

[Stage 25:>                                                       (0 + 16) / 16]

+------------------------------------------------+--------------------------------+---------------------------------------+
|null_nan_count_No_Standing_or_Stopping_Violation|null_nan_count_Hydrant_Violation|null_nan_count_Double_Parking_Violation|
+------------------------------------------------+--------------------------------+---------------------------------------+
|                                        10803028|                        10803028|                               10803028|
+------------------------------------------------+--------------------------------+---------------------------------------+



[Stage 25:===>                                                    (1 + 15) / 16]                                                                                

#### Based on the counts above, drop the columns that have a significant number of nulls/NaNs so that they are excluded from any further transformations

In [29]:
# Columns whose NaN/null counts above were judged too high to be useful.
# NOTE(review): Violation_In_Front_Of_Or_Opposite (2,161,235 nulls) is kept while
# Days_Parking_In_Effect____ (2,712,416 nulls) is dropped — confirm the intended cut-off.
sparse_columns_to_drop = [
    'Days_Parking_In_Effect____',
    'From_Hours_In_Effect',
    'To_Hours_In_Effect',
    'Unregistered_Vehicle?',
    'No_Standing_or_Stopping_Violation',
    'Hydrant_Violation',
    'Double_Parking_Violation',
]
df_with_no_spaces_in_colm_names = df_with_no_spaces_in_colm_names.drop(*sparse_columns_to_drop)

In [30]:
# Print the schema after the sparse columns were dropped. Columns loaded from the
# CSV are strings because spark.read.csv was called without schema inference.
df_with_no_spaces_in_colm_names.printSchema()

root
 |-- Summons_Number: string (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Plate_Type: string (nullable = true)
 |-- Violation_Code: string (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Issuing_Agency: string (nullable = true)
 |-- Street_Code1: string (nullable = true)
 |-- Street_Code2: string (nullable = true)
 |-- Street_Code3: string (nullable = true)
 |-- Vehicle_Expiration_Date: string (nullable = true)
 |-- Violation_Location: string (nullable = true)
 |-- Violation_Precinct: string (nullable = true)
 |-- Issuer_Precinct: string (nullable = true)
 |-- Issuer_Code: string (nullable = true)
 |-- Issuer_Command: string (nullable = true)
 |-- Issuer_Squad: string (nullable = true)
 |-- Violation_Time: string (nullable = true)
 |-- Time_First_Observed: string (nullable = true)
 |-- Violation_County: string (nullable = true)
 |-- Violation_In_Fr