In [7]:
import pyspark
from pyspark.ml.feature import Imputer
# NOTE: this wildcard also shadows Python builtins such as sum/min/max/round/abs
from pyspark.sql.functions import *

In [8]:
# Obtain the Spark session used by every later cell (getOrCreate reuses an
# already-active session instead of starting a second one).
spark = (
    pyspark.sql.SparkSession
    .builder
    .appName("Stars")
    .getOrCreate()
)

In [9]:
# Load the raw crime CSV: the first row supplies the column names and Spark
# infers column types from the data (without inferSchema every column is a string).
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load('crime.csv')
)

In [10]:
# Inspect the column names and the types Spark inferred from the CSV.
df.printSchema()

root
 |-- incident_id: long (nullable = true)
 |-- offense_id: long (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_EXTENSION: integer (nullable = true)
 |-- OFFENSE_TYPE_ID: string (nullable = true)
 |-- OFFENSE_CATEGORY_ID: string (nullable = true)
 |-- FIRST_OCCURRENCE_DATE: string (nullable = true)
 |-- LAST_OCCURRENCE_DATE: string (nullable = true)
 |-- REPORTED_DATE: string (nullable = true)
 |-- INCIDENT_ADDRESS: string (nullable = true)
 |-- GEO_X: double (nullable = true)
 |-- GEO_Y: double (nullable = true)
 |-- GEO_LON: double (nullable = true)
 |-- GEO_LAT: double (nullable = true)
 |-- DISTRICT_ID: string (nullable = true)
 |-- PRECINCT_ID: integer (nullable = true)
 |-- NEIGHBORHOOD_ID: string (nullable = true)
 |-- IS_CRIME: integer (nullable = true)
 |-- IS_TRAFFIC: integer (nullable = true)
 |-- VICTIM_COUNT: integer (nullable = true)



In [11]:
# Get the number of rows in the DataFrame. count() is an action, so this runs a
# Spark job over the whole input.
df.count()

399572

In [12]:
# Count the missing values in each column (raw counts, not percentages).
# NaN can only occur in floating-point columns, so `isnan` is applied only to
# float/double columns; every other column is checked for nulls alone. Calling
# `isnan` on a string column would force an implicit string -> double cast.
df.select([
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    if dtype in ('float', 'double')
    else count(when(col(name).isNull(), name)).alias(name)
    for name, dtype in df.dtypes
]).show()





+-----------+----------+------------+----------------------+---------------+-------------------+---------------------+--------------------+-------------+----------------+-----+-----+-------+-------+-----------+-----------+---------------+--------+----------+------------+
|incident_id|offense_id|OFFENSE_CODE|OFFENSE_CODE_EXTENSION|OFFENSE_TYPE_ID|OFFENSE_CATEGORY_ID|FIRST_OCCURRENCE_DATE|LAST_OCCURRENCE_DATE|REPORTED_DATE|INCIDENT_ADDRESS|GEO_X|GEO_Y|GEO_LON|GEO_LAT|DISTRICT_ID|PRECINCT_ID|NEIGHBORHOOD_ID|IS_CRIME|IS_TRAFFIC|VICTIM_COUNT|
+-----------+----------+------------+----------------------+---------------+-------------------+---------------------+--------------------+-------------+----------------+-----+-----+-------+-------+-----------+-----------+---------------+--------+----------+------------+
|          0|         0|           0|                     0|              0|                  0|                    0|              194340|            0|            5560| 5560| 5560|  

                                                                                

In [13]:
# LAST_OCCURRENCE_DATE is null in 194,340 of the 399,572 rows (about half), so
# discard it instead of imputing: keep every column except that one.
df = df.select([name for name in df.columns if name != 'LAST_OCCURRENCE_DATE'])

In [14]:
# Collect the numeric columns to impute. `df.dtypes` reports Spark's SQL type
# names ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double',
# 'decimal(p,s)'); spellings such as 'long' or 'integer' never appear there.
# The incident/offense ID columns are numeric, but filling an ID with a mean is
# meaningless, so they are excluded explicitly by name.
ID_COLUMNS = {'incident_id', 'offense_id'}
NUMERIC_DTYPES = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
numeric_columns = [
    name
    for name, dtype in df.dtypes
    if (dtype in NUMERIC_DTYPES or dtype.startswith('decimal'))
    and name not in ID_COLUMNS
]
numeric_columns

['OFFENSE_CODE',
 'OFFENSE_CODE_EXTENSION',
 'GEO_X',
 'GEO_Y',
 'GEO_LON',
 'GEO_LAT',
 'PRECINCT_ID',
 'IS_CRIME',
 'IS_TRAFFIC',
 'VICTIM_COUNT']

In [15]:
# Fill missing numeric values with each column's mean. Using the same names for
# inputCols and outputCols replaces the original columns with the imputed ones.
imputer = Imputer(inputCols=numeric_columns, outputCols=numeric_columns, strategy='mean')
df = imputer.fit(df).transform(df)

# Fill NEIGHBORHOOD_ID with its most frequent non-null value (the mode).
# Ties on the count are broken by name so the chosen value is deterministic.
neighborhood_mode_row = (
    df.where(col('NEIGHBORHOOD_ID').isNotNull())
    .groupBy('NEIGHBORHOOD_ID')
    .count()
    .orderBy(desc('count'), 'NEIGHBORHOOD_ID')
    .first()
)
if neighborhood_mode_row is not None:  # None only if the column is entirely null
    df = df.fillna({'NEIGHBORHOOD_ID': neighborhood_mode_row['NEIGHBORHOOD_ID']})


In [16]:
# Drop the columns that are not kept in the cleaned output (each listed once).
COLUMNS_TO_DROP = [
    "INCIDENT_ADDRESS",
    "DISTRICT_ID",
    "incident_id",
    "offense_id",
    "OFFENSE_CODE",
    "OFFENSE_CODE_EXTENSION",
    "PRECINCT_ID",
]
df = df.drop(*COLUMNS_TO_DROP)

In [17]:
# Verify the cleaning left no missing values: count nulls in every column and
# NaNs in float/double columns (the only types that can hold NaN).
null_counts = df.select([
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    if dtype in ('float', 'double')
    else count(when(col(name).isNull(), name)).alias(name)
    for name, dtype in df.dtypes
])
null_counts.show()

# Fail loudly on a fresh "Run All" if any column still has gaps.
still_missing = {name: n for name, n in null_counts.first().asDict().items() if n}
assert not still_missing, f"columns still contain missing values: {still_missing}"

+---------------+-------------------+---------------------+-------------+-----+-----+-------+-------+---------------+--------+----------+------------+
|OFFENSE_TYPE_ID|OFFENSE_CATEGORY_ID|FIRST_OCCURRENCE_DATE|REPORTED_DATE|GEO_X|GEO_Y|GEO_LON|GEO_LAT|NEIGHBORHOOD_ID|IS_CRIME|IS_TRAFFIC|VICTIM_COUNT|
+---------------+-------------------+---------------------+-------------+-----+-----+-------+-------+---------------+--------+----------+------------+
|              0|                  0|                    0|            0|    0|    0|      0|      0|              0|       0|         0|           0|
+---------------+-------------------+---------------------+-------------+-----+-----+-------+-------+---------------+--------+----------+------------+



                                                                                

In [18]:
# Switch this session to Spark's LEGACY datetime parser policy (presumably the
# default parser rejected these date strings — TODO confirm).
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Parse both date columns from 'MM/dd/yyyy hh:mm:ss a' strings into timestamps,
# replacing each string column in place.
for date_column in ("FIRST_OCCURRENCE_DATE", "REPORTED_DATE"):
    df = df.withColumn(date_column, to_timestamp(date_column, "MM/dd/yyyy hh:mm:ss a"))

In [19]:
# Confirm the date columns are now timestamps and the dropped columns are gone.
df.printSchema()

root
 |-- OFFENSE_TYPE_ID: string (nullable = true)
 |-- OFFENSE_CATEGORY_ID: string (nullable = true)
 |-- FIRST_OCCURRENCE_DATE: timestamp (nullable = true)
 |-- REPORTED_DATE: timestamp (nullable = true)
 |-- GEO_X: double (nullable = true)
 |-- GEO_Y: double (nullable = true)
 |-- GEO_LON: double (nullable = true)
 |-- GEO_LAT: double (nullable = true)
 |-- NEIGHBORHOOD_ID: string (nullable = true)
 |-- IS_CRIME: integer (nullable = true)
 |-- IS_TRAFFIC: integer (nullable = true)
 |-- VICTIM_COUNT: integer (nullable = true)



In [21]:
# Save the cleaned data as CSV with a header row. repartition(1) makes Spark
# write a single part file, but the output is still a *directory* named
# 'crime_clean.csv' that contains that part file.
# mode='overwrite' keeps the notebook re-runnable: the default save mode raises
# an error when the output path already exists.
df.repartition(1).write.csv('crime_clean.csv', header=True, mode='overwrite')

                                                                                