## Requirements
1. Apache Spark binary (https://spark.apache.org/)
2. For Windows: winutils (https://medium.com/@dvainrub/how-to-install-apache-spark-2-x-in-your-pc-e2047246ffc3)
3. Setting ```JAVA_HOME```, ```SPARK_HOME```, and ```HADOOP_HOME```
4. Python 3.x (from Anaconda distribution)
5. ```findspark``` https://pypi.org/project/findspark/
6. Jupyter Notebook (available from Anaconda installation)

### References
https://spark.apache.org/docs/2.3.3/sql-programming-guide.html

## Spark Initialization

In [2]:
# Import findspark to read SPARK_HOME and HADOOP_HOME
import findspark
findspark.init()

In [3]:
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [4]:
# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000018641DCB6A0>


## Loading Data using Spark

In [5]:
df = spark.read.json("D://spark-2.3.1-bin-hadoop2.7//examples//src//main//resources//people.json")

In [6]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
df.count()

3

In [11]:
df2 = spark.read.csv("D:/Documents/dataset/Uber-Jan-Feb-FOIL.csv", header=True, inferSchema=True)

In [12]:
df2.show()

+-----------------------+--------+---------------+-----+
|dispatching_base_number|    date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
|                 B02512|1/1/2015|            190| 1132|
|                 B02765|1/1/2015|            225| 1765|
|                 B02764|1/1/2015|           3427|29421|
|                 B02682|1/1/2015|            945| 7679|
|                 B02617|1/1/2015|           1228| 9537|
|                 B02598|1/1/2015|            870| 6903|
|                 B02598|1/2/2015|            785| 4768|
|                 B02617|1/2/2015|           1137| 7065|
|                 B02512|1/2/2015|            175|  875|
|                 B02682|1/2/2015|            890| 5506|
|                 B02765|1/2/2015|            196| 1001|
|                 B02764|1/2/2015|           3147|19974|
|                 B02765|1/3/2015|            201| 1526|
|                 B02617|1/3/2015|           1188|10664|
|                 B02598|1/3/20

In [13]:
df2.schema

StructType(List(StructField(dispatching_base_number,StringType,true),StructField(date,StringType,true),StructField(active_vehicles,IntegerType,true),StructField(trips,IntegerType,true)))

In [14]:
# Datasets can be downloaded from https://catalog.data.gov/dataset/crimes-2001-to-present-398a4

df3 = spark.read.csv("D:/Documents/dataset/Crimes_-_2001_to_present.csv", header=True, inferSchema=True)

In [15]:
df3.count()

6814395

In [16]:
df3.show()

+-------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|     ID|Case Number|                Date|               Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+-------+-----------+--------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|4080779|   HL424670|06/16/2005 02:30:...| 076XX S SANGAMON ST|1320|     CRIMINAL DAMAGE|          TO VEHICLE|              STR

In [15]:
df3.schema

StructType(List(StructField(ID,IntegerType,true),StructField(Case Number,StringType,true),StructField(Date,StringType,true),StructField(Block,StringType,true),StructField(IUCR,StringType,true),StructField(Primary Type,StringType,true),StructField(Description,StringType,true),StructField(Location Description,StringType,true),StructField(Arrest,BooleanType,true),StructField(Domestic,BooleanType,true),StructField(Beat,IntegerType,true),StructField(District,IntegerType,true),StructField(Ward,IntegerType,true),StructField(Community Area,IntegerType,true),StructField(FBI Code,StringType,true),StructField(X Coordinate,IntegerType,true),StructField(Y Coordinate,IntegerType,true),StructField(Year,IntegerType,true),StructField(Updated On,StringType,true),StructField(Latitude,DoubleType,true),StructField(Longitude,DoubleType,true),StructField(Location,StringType,true)))

In [17]:
# Register the DataFrame as a SQL temporary view
df3.createOrReplaceTempView("crimes")

In [19]:
result = spark.sql("SELECT DISTINCT Date FROM crimes")

In [22]:
result.show()

+--------------------+
|                Date|
+--------------------+
|05/13/2005 11:00:...|
|06/10/2005 11:30:...|
|06/18/2005 10:35:...|
|06/18/2005 10:45:...|
|06/17/2005 05:50:...|
|06/19/2005 01:00:...|
|05/19/2005 06:27:...|
|06/17/2005 06:15:...|
|06/19/2005 04:20:...|
|06/19/2005 07:10:...|
|06/05/2005 09:00:...|
|05/20/2005 07:00:...|
|06/21/2005 04:40:...|
|11/23/2004 08:31:...|
|06/22/2005 05:41:...|
|05/23/2005 12:30:...|
|06/17/2005 09:41:...|
|06/22/2005 10:45:...|
|06/23/2005 09:33:...|
|06/24/2005 11:38:...|
+--------------------+
only showing top 20 rows



## Data Mining Process

In [18]:
# Data kejahatan yang terjadi suatu daerah
# Gunakan backtick (`) untuk memanggil nama kolom dengan spasi

query1 = spark.sql("SELECT Block, `Primary Type`, Description, `Location Description`, Location, COUNT(`Primary Type`) \
                    FROM crimes \
                    GROUP BY Block, `Primary Type`, Description, `Location Description`, Location \
                    ORDER BY count('Primary Type') DESC")

In [54]:
query1.show()

+--------------------+------------------+--------------------+--------------------+--------------------+-------------------+
|               Block|      Primary Type|         Description|Location Description|            Location|count(Primary Type)|
+--------------------+------------------+--------------------+--------------------+--------------------+-------------------+
|    001XX N STATE ST|             THEFT|        RETAIL THEFT|    DEPARTMENT STORE|(41.883500187, -8...|               4736|
|  076XX S CICERO AVE|             THEFT|        RETAIL THEFT|    DEPARTMENT STORE|(41.754592961, -8...|               4036|
|008XX N MICHIGAN AVE|             THEFT|        RETAIL THEFT|    DEPARTMENT STORE|(41.897895128, -8...|               2253|
|    100XX W OHARE ST|     OTHER OFFENSE|OTHER WEAPONS VIO...|    AIRPORT/AIRCRAFT|(41.976290414, -8...|               2183|
|   046XX W NORTH AVE|             THEFT|        RETAIL THEFT|    DEPARTMENT STORE|(41.909664252, -8...|               1864|


In [19]:
# Instalasi PixieDust
# 1. Matplotlib
# 2. Bokeh
# 3. pixiedust

import pixiedust

Pixiedust database opened successfully


In [20]:
display(query1)

In [56]:
# Jumlah kriminalitas dalam tiap hari
# Konversi tanggal dari String to Date
# 05/13/2005 --> MM/DD/YYYY

query2 = spark.sql("SELECT TO_DATE(`Date`, 'MM/DD/YYYY') AS date, COUNT(`Date`) \
                    FROM crimes \
                    GROUP BY `Date` \
                    ORDER BY `Date` DESC")

In [57]:
query2.show()

+----------+-----------+
|      date|count(Date)|
+----------+-----------+
|2017-01-01|          1|
|2017-01-01|          2|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          2|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          2|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          3|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          1|
|2017-01-01|          8|
+----------+-----------+
only showing top 20 rows



In [58]:
# Daftar Kejahatan dan Sub Kategori yang paling banyak terjadi

query3 = spark.sql("SELECT `Primary Type`, Description, COUNT(Description) \
                    FROM crimes \
                    GROUP BY `Primary Type`, Description\
                    ORDER BY COUNT(Description) DESC")

In [60]:
query3.show()

+-------------------+--------------------+------------------+
|       Primary Type|         Description|count(Description)|
+-------------------+--------------------+------------------+
|              THEFT|      $500 AND UNDER|            552709|
|            BATTERY|DOMESTIC BATTERY ...|            523311|
|            BATTERY|              SIMPLE|            506627|
|    CRIMINAL DAMAGE|          TO VEHICLE|            365611|
|    CRIMINAL DAMAGE|         TO PROPERTY|            358101|
|              THEFT|           OVER $500|            352326|
|            ASSAULT|              SIMPLE|            293835|
|          NARCOTICS|POSS: CANNABIS 30...|            276990|
|           BURGLARY|      FORCIBLE ENTRY|            264670|
|MOTOR VEHICLE THEFT|          AUTOMOBILE|            248586|
|              THEFT|       FROM BUILDING|            229884|
|              THEFT|        RETAIL THEFT|            170081|
|      OTHER OFFENSE|    TELEPHONE THREAT|            135461|
|  CRIMI

In [62]:
# Save the results to CSV --> partitioned CSV
query3.write \
  .option("header", "true") \
  .csv("file:///D:/Documents/dataset/query3.csv")

In [63]:
# Convert to Pandas
import pandas as pd
queryPandas = query3.toPandas()

In [66]:
# Save to single CSV
queryPandas.to_csv("D:/Documents/dataset/queryPandas.csv", index=False)