In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Step 3 Data Preparation').getOrCreate()

In [2]:
df = spark.read.csv('Department 11 Field Interviews.csv', header=True, inferSchema=True)

<b>3.1</b><br/>
The columns have to be renamed

In [3]:
from functools import reduce

In [4]:
old_col = df.schema.names #original column headings
new_col = [i for i in df.head(1)[0]] #new column headings
df = reduce(lambda df, idx: df.withColumnRenamed(old_col[idx], new_col[idx]), range(len(old_col)), df) #replace with lambda function

Now the first row is the same as the header, and can be dropped using 'where' clause

In [5]:
df = df.where("SEQ_NUM != 'SEQ_NUM'")

Now we can drop unneccesary fields

In [6]:
df = df[['SEX', 'PRIORS', 'DESCRIPTION', 'COMPLEXION', 'FIOFS_TYPE', 'TERRORISM', 'SEARCH', 'BASIS', 'STOP_REASONS', 'FIOFS_REASONS',
         'OUTCOME', 'VEH_OCCUPANT', 'ETHNICITY', 'AGE_AT_FIO_CORRECTED']]

In [7]:
df.show(5)

+----+------+-----------+----------+----------+---------+------+--------------------+---------------+--------------------+-------+------------+---------+--------------------+
| SEX|PRIORS|DESCRIPTION|COMPLEXION|FIOFS_TYPE|TERRORISM|SEARCH|               BASIS|   STOP_REASONS|       FIOFS_REASONS|OUTCOME|VEH_OCCUPANT|ETHNICITY|AGE_AT_FIO_CORRECTED|
+----+------+-----------+----------+----------+---------+------+--------------------+---------------+--------------------+-------+------------+---------+--------------------+
|MALE|   YES|   B(Black)|       Med|      IOFS|       NO|    VP|      CONSENT SEARCH|  INVESTIGATIVE|DRUGS, INVESTIGATION|    SFO|      DRIVER|     null|                  59|
|MALE|    NO|   W(White)|       Med|        IO|       NO|  null|REASONABLE SUSPICION|     RADIO CALL|   DRUGS, POSSESSION|      F|        null|     null|                  26|
|MALE|   YES|   B(Black)|      Dark|       IOF|       NO|  null|                null|OTHER (SPECIFY)| INVESTIGATE, PERSON|   

3.1 is repeated for the second dataset.

In [8]:
df2 = spark.read.csv('Department 24 Vehicle Stops.csv', header=True, inferSchema=True)

In [9]:
old_col = df2.schema.names #original column headings
new_col = [i for i in df2.head(1)[0]] #new column headings
df2 = reduce(lambda df2, idx: df2.withColumnRenamed(old_col[idx], new_col[idx]), range(len(old_col)), df2) #replace with lambda function

In [10]:
df2 = df2[df2['YEAR OF STOP'] != 'YEAR OF STOP']

In [11]:
df2 = df2[['RACE OF DRIVER', 'GENDER OF DRIVER', 'DRIVER FRISKED?', 'VEHICLE SEARCHED?', 'CITATION ISSUED?', 'AGE OF DRIVER', 
           'REASON FOR STOP']]

In [12]:
df2.show(5)

+--------------+----------------+---------------+-----------------+----------------+-------------+---------------+
|RACE OF DRIVER|GENDER OF DRIVER|DRIVER FRISKED?|VEHICLE SEARCHED?|CITATION ISSUED?|AGE OF DRIVER|REASON FOR STOP|
+--------------+----------------+---------------+-----------------+----------------+-------------+---------------+
|         White|          Female|             No|               No|              No|         null|        No Data|
|         White|            Male|             No|               No|              No|         null|        No Data|
|         White|            Male|             No|               No|              No|         null|        No Data|
|         White|            Male|             No|               No|              No|         null|        No Data|
|         Asian|          Female|            Yes|               No|              No|         null|        No Data|
+--------------+----------------+---------------+-----------------+-------------

<b>3.2</b><br/>
First we replace all codes for null values with NoneType

In [13]:
from pyspark.sql.functions import when, count

In [14]:
missing_values = ["UNKNOWN", "NO DATA ENTERED", -1, "No Data"]
for i in missing_values:
    for j in df.columns: #go through all columns to replace codes with null
        df = df.withColumn(j, when(df[j] == i, None).otherwise(df[j]))
    for j in df2.columns: #repeat for 2nd dataset
        df2 = df2.withColumn(j, when(df2[j] == i, None).otherwise(df2[j]))
df = df.withColumn("AGE_AT_FIO_CORRECTED", when(df["AGE_AT_FIO_CORRECTED"] >= 100, None).otherwise(df["AGE_AT_FIO_CORRECTED"]))
df = df.withColumn("AGE_AT_FIO_CORRECTED", when(df["AGE_AT_FIO_CORRECTED"] <= 10, None).otherwise(df["AGE_AT_FIO_CORRECTED"]))
#replace age values below 10 or over 100 with null

Now we will remove columns with 25% or more empty values

In [15]:
x = []
for c in df.columns:
    x.append((count(when(df[c].isNull(), c))*100.0/df.count()).alias(c))
col_emp = df.select(x).collect()
emp_col = [row.asDict() for row in col_emp] #store each column as key and its emptiness as value
emp_col = emp_col[0]
col_null_g_25p=[i for i in emp_col if emp_col[i] > 25]
df = df.drop(*col_null_g_25p)
df.show()

+----+------+-----------+----------+----------+---------+--------------------+-------+--------------------+
| SEX|PRIORS|DESCRIPTION|COMPLEXION|FIOFS_TYPE|TERRORISM|       FIOFS_REASONS|OUTCOME|AGE_AT_FIO_CORRECTED|
+----+------+-----------+----------+----------+---------+--------------------+-------+--------------------+
|MALE|   YES|   B(Black)|       Med|      IOFS|       NO|DRUGS, INVESTIGATION|    SFO|                  59|
|MALE|    NO|   W(White)|       Med|        IO|       NO|   DRUGS, POSSESSION|      F|                  26|
|MALE|   YES|   B(Black)|      Dark|       IOF|       NO| INVESTIGATE, PERSON|      F|                  18|
|MALE|   YES|   B(Black)|       Med|      IOFS|       NO|                ABDW|      S|                  24|
|MALE|   YES|   B(Black)|     Light|        IO|       NO|                 VAL|      F|                  70|
|MALE|   YES|       null|      Dark|        IO|       NO|ALCOHOL, PUBLIC D...|      F|                  51|
|MALE|   YES|   B(Black)|   

In [16]:
df = df.dropna(how='any')

Now we repeat 3.2 for the second dataset.

In [17]:
x = []
for c in df2.columns:
    x.append((count(when(df2[c].isNull(), c))*100.0/df2.count()).alias(c))
col_emp = df2.select(x).collect()
emp_col = [row.asDict() for row in col_emp] #store each column as key and its emptiness as value
emp_col = emp_col[0]
col_null_g_25p=[i for i in emp_col if emp_col[i] > 25]
df2 = df2.drop(*col_null_g_25p)
df2 = df2.dropna(how='any')

<b>3.3<b/>

In [18]:
df2_ex = spark.read.csv('Department 24 Vehicle Stops.csv', header=True, inferSchema=True)

In [18]:
from pyspark.sql.functions import concat, lit

In [24]:
df2_ex = df2_ex.select("*", concat(df2_ex["LOCATION_LATITUDE"],lit(" "),df2_ex["LOCATION_LONGITUDE"]).alias("COORDINATES"))

In [27]:
df2_ex.select('COORDINATES').collect()

[Row(COORDINATES='LOCATION OF STOP LAT LOCATION OF STOP LONG'),
 Row(COORDINATES='44.97391713 -93.06089457'),
 Row(COORDINATES='44.95211807 -93.15173337'),
 Row(COORDINATES='44.9738881 -93.03039075'),
 Row(COORDINATES='44.97358832 -93.17215188'),
 Row(COORDINATES='44.95987773 -93.06094546'),
 Row(COORDINATES='44.9536711 -93.10112732'),
 Row(COORDINATES='44.95934727 -93.12122586'),
 Row(COORDINATES='44.93079234 -93.08574357'),
 Row(COORDINATES='44.98110238 -93.02019274'),
 Row(COORDINATES='44.95936066 -93.14154747'),
 Row(COORDINATES='44.95916259 -93.07181545'),
 Row(COORDINATES='44.96015434 -93.09197852'),
 Row(COORDINATES='44.95929424 -93.16188682'),
 Row(COORDINATES='44.95211807 -93.15173337'),
 Row(COORDINATES='44.98475702 -93.2027105'),
 Row(COORDINATES='44.96671118 -93.10117378'),
 Row(COORDINATES='44.95212653 -93.13138291'),
 Row(COORDINATES='44.94342881 -93.10668882'),
 Row(COORDINATES='44.91613019 -93.13952579'),
 Row(COORDINATES='44.94864046 -93.1021296'),
 Row(COORDINATES='44

<b>3.4<b/>

In [19]:
col1 = list(df.columns)
col2 = list(df2.columns)

We will add the columns of each dataframe to the other dataframe.

In [21]:
df_new = df
for i in col2:
    df_new = df_new.withColumn(i, lit(None))

In [23]:
df_new2 = df2
for i in col1:
    df_new2 = df_new2.withColumn(i, lit(None))

Now we can merge the dataframes.

In [25]:
merged = df_new.union(df_new2)

In [26]:
merged.show() #in first 20 rows, all fields added from the 2nd dataset have null values

+------+------+----------+----------+---------+--------------------+-------+-----------+--------------------+--------------+----------------+---------------+-----------------+----------------+
|   SEX|PRIORS|COMPLEXION|FIOFS_TYPE|TERRORISM|       FIOFS_REASONS|OUTCOME|  RACE_DESC|AGE_AT_FIO_CORRECTED|RACE OF DRIVER|GENDER OF DRIVER|DRIVER FRISKED?|VEHICLE SEARCHED?|CITATION ISSUED?|
+------+------+----------+----------+---------+--------------------+-------+-----------+--------------------+--------------+----------------+---------------+-----------------+----------------+
|  MALE|   YES|       Med|      IOFS|       NO|DRUGS, INVESTIGATION|    SFO|   B(Black)|                  59|          null|            null|           null|             null|            null|
|  MALE|    NO|       Med|        IO|       NO|   DRUGS, POSSESSION|      F|   W(White)|                  26|          null|            null|           null|             null|            null|
|  MALE|   YES|      Dark|       IO

<b>3.5<b/>

In [20]:
races = [i["DESCRIPTION"] for i in df.select("DESCRIPTION").distinct().collect()]

In [21]:
from pyspark.sql.functions import *
for i in races:
    df = df.withColumn("DESCRIPTION", when(df["DESCRIPTION"] == i, i[2:len(i)-1]).otherwise(df["DESCRIPTION"]))

In [22]:
df.show()

+------+------+-----------+----------+----------+---------+--------------------+-------+--------------------+
|   SEX|PRIORS|DESCRIPTION|COMPLEXION|FIOFS_TYPE|TERRORISM|       FIOFS_REASONS|OUTCOME|AGE_AT_FIO_CORRECTED|
+------+------+-----------+----------+----------+---------+--------------------+-------+--------------------+
|  MALE|   YES|      Black|       Med|      IOFS|       NO|DRUGS, INVESTIGATION|    SFO|                  59|
|  MALE|    NO|      White|       Med|        IO|       NO|   DRUGS, POSSESSION|      F|                  26|
|  MALE|   YES|      Black|      Dark|       IOF|       NO| INVESTIGATE, PERSON|      F|                  18|
|  MALE|   YES|      Black|       Med|      IOFS|       NO|                ABDW|      S|                  24|
|  MALE|   YES|      Black|     Light|        IO|       NO|                 VAL|      F|                  70|
|  MALE|   YES|      Black|       Med|       IOF|       NO| INVESTIGATE, PERSON|      F|                  30|
|  MALE|  

In [24]:
df.write.csv("Police Interviews(S3).csv")

In [25]:
df2.write.csv("Vehicle Stops(S3).csv")