# Converting the Chicago crime data file from CSV to Parquet
Data source: https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2/data

We convert this CSV data to Parquet because Parquet, a compressed columnar format, is generally faster to read and smaller on disk than CSV in the Hadoop ecosystem. It also helps when memory is limited and we cannot cache much data: reading and writing Parquet files from disk is much more efficient than CSV, in both speed and cost.

Import SparkSession and the functions module (we will use to_timestamp to convert strings to timestamps).

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

Create a SparkSession. Here I am using Spark in standalone mode.

In [2]:
spark = SparkSession.builder.master("spark://localhost:7077").appName("Chicago crime data analysis").getOrCreate()

Read the data from HDFS using the Spark data source API. We also infer the schema from the file. (This takes time because Spark has to make an extra pass over the data; a better approach is to provide the schema explicitly.)

In [3]:
crimeData = spark.read.csv("hdfs://localhost:9000/public/data/crime/crime.csv", header=True, inferSchema=True)
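As a minimal sketch of providing the schema explicitly, the reader's `schema` argument accepts a DDL-formatted string as well as a `StructType`. The column names and types below are copied from the schema that inference produces for this file (printed in the next cell); the names `CRIME_COLUMNS`, `build_ddl_schema` and `read_crime_csv` are hypothetical helpers introduced only for illustration.

```python
# (column name, Spark SQL type) pairs, copied from the inferred schema of this file.
CRIME_COLUMNS = [
    ("ID", "INT"), ("Case Number", "STRING"), ("Date", "STRING"),
    ("Block", "STRING"), ("IUCR", "STRING"), ("Primary Type", "STRING"),
    ("Description", "STRING"), ("Location Description", "STRING"),
    ("Arrest", "BOOLEAN"), ("Domestic", "BOOLEAN"), ("Beat", "INT"),
    ("District", "INT"), ("Ward", "INT"), ("Community Area", "INT"),
    ("FBI Code", "STRING"), ("X Coordinate", "INT"), ("Y Coordinate", "INT"),
    ("Year", "INT"), ("Updated On", "STRING"), ("Latitude", "DOUBLE"),
    ("Longitude", "DOUBLE"), ("Location", "STRING"),
]


def build_ddl_schema(columns):
    """Build a DDL schema string; backticks allow spaces in column names."""
    return ", ".join(f"`{name}` {sql_type}" for name, sql_type in columns)


crime_schema = build_ddl_schema(CRIME_COLUMNS)


def read_crime_csv(spark, path):
    """Read the CSV with an explicit schema instead of inferSchema=True."""
    return spark.read.csv(path, header=True, schema=crime_schema)


# With the session from this notebook (not executed here):
# crimeData = read_crime_csv(spark, "hdfs://localhost:9000/public/data/crime/crime.csv")
```

With an explicit schema Spark does not need the extra pass over the file that inference requires, and the column types stay the same from run to run.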

Print the schema of the data.

In [4]:
crimeData.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



Show the data.

In [5]:
crimeData.show(10,False)

+--------+-----------+----------------------+----------------------+----+-------------+----------------------------+-----------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date                  |Block                 |IUCR|Primary Type |Description                 |Location Description   |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+----------------------+----------------------+----+-------------+----------------------------+-----------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|10000092|HY189866   |03/18/2015 07:44:00 PM|047XX W OHIO ST       

Convert the Date and Updated On columns from string to timestamp, and drop redundant columns such as Year and Location.

In [6]:
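# Timestamps use a 12-hour clock, e.g. "03/18/2015 07:44:00 PM".
# A single 'a' is the AM/PM marker; Spark 3.x datetime patterns allow only one 'a'.
timestampFormat = 'MM/dd/yyyy hh:mm:ss a'
crimeData = (crimeData
    .withColumn('ReportedTime', F.to_timestamp('Date', timestampFormat))
    .withColumn('UpdatedTime', F.to_timestamp('Updated On', timestampFormat))
    .drop('Date', 'Updated On', 'Year', 'Location'))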
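The pattern above reads a 12-hour clock (`hh`) followed by an AM/PM marker. As a quick sanity check outside Spark, the same layout can be sketched with Python's standard `datetime.strptime`. This is the standard library rather than Spark's parser, so the directive syntax differs (`%I` for the 12-hour clock, `%p` for AM/PM); the sample value is the Date of the first row shown earlier.

```python
from datetime import datetime

# Python equivalent of the layout MM/dd/yyyy hh:mm:ss plus an AM/PM marker.
PYTHON_FORMAT = "%m/%d/%Y %I:%M:%S %p"

sample = "03/18/2015 07:44:00 PM"
parsed = datetime.strptime(sample, PYTHON_FORMAT)

# 07:44 PM on a 12-hour clock is hour 19 on a 24-hour clock.
print(parsed.isoformat())
```

Using a 24-hour field on this data would ignore the AM/PM marker, so afternoon and evening times would come out wrong; that is why the 12-hour field matters here.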

In [7]:
crimeData.show(10, False)

+--------+-----------+----------------------+----+-------------+----------------------------+-----------------------+------+--------+----+--------+----+--------------+--------+------------+------------+------------+-------------+-------------------+-------------------+
|ID      |Case Number|Block                 |IUCR|Primary Type |Description                 |Location Description   |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Latitude    |Longitude    |ReportedTime       |UpdatedTime        |
+--------+-----------+----------------------+----+-------------+----------------------------+-----------------------+------+--------+----+--------+----+--------------+--------+------------+------------+------------+-------------+-------------------+-------------------+
|10000092|HY189866   |047XX W OHIO ST       |041A|BATTERY      |AGGRAVATED: HANDGUN         |STREET                 |false |false   |1111|11      |28  |25            |04B     |1144606     |1

In [8]:
crimeData.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- ReportedTime: timestamp (nullable = true)
 |-- UpdatedTime: timestamp (nullable = true)



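Rename columns to remove spaces, since column names containing spaces can be rejected when writing Parquet (depending on the Spark version).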

In [9]:
crimeData = (crimeData
    .withColumnRenamed('Case Number', 'CaseNumber')
    .withColumnRenamed('Primary Type', 'PrimaryType')
    .withColumnRenamed('Location Description', 'LocationDescription')
    .withColumnRenamed('Community Area', 'CommunityArea')
    .withColumnRenamed('FBI Code', 'FBICode')
    .withColumnRenamed('X Coordinate', 'XCoordinate')
    .withColumnRenamed('Y Coordinate', 'YCoordinate'))
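The same renaming can also be done without listing each column by hand. The sketch below derives the new names by removing spaces and would pass them to `DataFrame.toDF`; the helper name `remove_spaces` is hypothetical, and the sample list is only a subset of the columns used for illustration.

```python
def remove_spaces(column_names):
    """Return the column names with all spaces removed, preserving order."""
    return [name.replace(" ", "") for name in column_names]


sample = ["ID", "Case Number", "Primary Type", "FBI Code", "X Coordinate"]
renamed = remove_spaces(sample)
print(renamed)

# With a live DataFrame this would be applied as (not executed here):
# crimeData = crimeData.toDF(*remove_spaces(crimeData.columns))
```

This keeps the cell short and picks up any other column with a space in its name, at the cost of being less explicit about which columns change.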

In [10]:
crimeData.show(10,False)

+--------+----------+----------------------+----+-------------+----------------------------+-----------------------+------+--------+----+--------+----+-------------+-------+-----------+-----------+------------+-------------+-------------------+-------------------+
|ID      |CaseNumber|Block                 |IUCR|PrimaryType  |Description                 |LocationDescription    |Arrest|Domestic|Beat|District|Ward|CommunityArea|FBICode|XCoordinate|YCoordinate|Latitude    |Longitude    |ReportedTime       |UpdatedTime        |
+--------+----------+----------------------+----+-------------+----------------------------+-----------------------+------+--------+----+--------+----+-------------+-------+-----------+-----------+------------+-------------+-------------------+-------------------+
|10000092|HY189866  |047XX W OHIO ST       |041A|BATTERY      |AGGRAVATED: HANDGUN         |STREET                 |false |false   |1111|11      |28  |25           |04B    |1144606    |1903566    |41.89139

Write the data to HDFS as a Parquet file using a single partition. (Writing a single partition is not really necessary here, but I wanted to use coalesce.)

In [15]:
crimeData.coalesce(1).write.parquet("hdfs://localhost:9000/public/data/crime/parquet/")
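After writing, it can be worth reading the Parquet output back and comparing row counts. The following is a minimal sketch, not part of the original notebook: `write_and_verify` is a hypothetical helper, and `mode("overwrite")` is an added assumption so the cell can be re-run (without it, the write fails if the output path already exists).

```python
def write_and_verify(spark, df, path):
    """Write df as a single Parquet part file, read it back, and compare row counts."""
    # Assumption: overwrite replaces any existing output at `path`.
    df.coalesce(1).write.mode("overwrite").parquet(path)
    written = spark.read.parquet(path)
    # Counting df re-evaluates its lineage (here, the CSV read) unless df is cached.
    return written.count() == df.count()


# With the session and DataFrame from this notebook (not executed here):
# write_and_verify(spark, crimeData, "hdfs://localhost:9000/public/data/crime/parquet/")
```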