## Requirements
1. Apache Spark binary (https://spark.apache.org/)
2. For Windows: winutils (https://medium.com/@dvainrub/how-to-install-apache-spark-2-x-in-your-pc-e2047246ffc3)
3. Setting ```JAVA_HOME```, ```SPARK_HOME```, and ```HADOOP_HOME```
4. Python 3.x (from Anaconda distribution)
5. ```findspark``` https://pypi.org/project/findspark/
6. Jupyter Notebook (available from Anaconda installation)

### References
https://spark.apache.org/docs/2.3.3/sql-programming-guide.html

## Spark Initialization

In [1]:
# Import findspark to read SPARK_HOME and HADOOP_HOME
import findspark
findspark.init()

In [2]:
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [3]:
# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x000002A654FBEAC8>


## Loading Data using Spark

In [4]:
df = spark.read.csv("F:/big-data-master/notebook/london_crime_by_lsoa.csv", header=True, inferSchema=True)

In [5]:
df.show()

+---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|          Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
|E01001320|              Ealing|  Theft and Handling|         Other Theft|    0|2012|    5|
|E01001342|              Ealing|Violence Against ...|    Offensive Weapon|    0|2010|    7|
|E01002633|            Hounslow|             Robbery|   Personal Property|    0|

In [6]:
df.count()

13490604

In [9]:
df2 = spark.read.csv("F:/dataset/london/london_crime_by_lsoa.csv", header=True, inferSchema=True)

In [10]:
df2.show()

+---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|          Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
|E01001320|              Ealing|  Theft and Handling|         Other Theft|    0|2012|    5|
|E01001342|              Ealing|Violence Against ...|    Offensive Weapon|    0|2010|    7|
|E01002633|            Hounslow|             Robbery|   Personal Property|    0|

In [11]:
df2.schema

StructType(List(StructField(lsoa_code,StringType,true),StructField(borough,StringType,true),StructField(major_category,StringType,true),StructField(minor_category,StringType,true),StructField(value,IntegerType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))

In [12]:

df3 = spark.read.csv("F:/dataset/london/london_crime_by_lsoa.csv", header=True, inferSchema=True)

In [13]:
df3.count()

13490604

In [14]:
df3.show()

+---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|          Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
|E01001320|              Ealing|  Theft and Handling|         Other Theft|    0|2012|    5|
|E01001342|              Ealing|Violence Against ...|    Offensive Weapon|    0|2010|    7|
|E01002633|            Hounslow|             Robbery|   Personal Property|    0|

In [15]:
df3.schema

StructType(List(StructField(lsoa_code,StringType,true),StructField(borough,StringType,true),StructField(major_category,StringType,true),StructField(minor_category,StringType,true),StructField(value,IntegerType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))

In [17]:
# Register the DataFrame as a SQL temporary view
df3.createOrReplaceTempView("londoncrimes")

In [18]:
result = spark.sql("SELECT DISTINCT major_category FROM londoncrimes")

In [19]:
result.show()

+--------------------+
|      major_category|
+--------------------+
|               Drugs|
|             Robbery|
|  Theft and Handling|
|    Fraud or Forgery|
|Violence Against ...|
|            Burglary|
|Other Notifiable ...|
|     Sexual Offences|
|     Criminal Damage|
+--------------------+



## Data Mining Process

In [25]:
# Data lokasi terjadi nya kejahatan pencurian pada tahun 2016
query1 = spark.sql("SELECT DISTINCT borough from londoncrimes where major_category='Robbery'and year=2016 ")

In [26]:
query1.show()

+--------------------+
|             borough|
+--------------------+
|             Croydon|
|          Wandsworth|
|              Bexley|
|             Lambeth|
|Barking and Dagenham|
|              Camden|
|           Greenwich|
|              Newham|
|       Tower Hamlets|
|            Hounslow|
|              Barnet|
|              Harrow|
|Kensington and Ch...|
|           Islington|
|               Brent|
|            Haringey|
|             Bromley|
|              Merton|
|         Westminster|
|             Hackney|
+--------------------+
only showing top 20 rows



In [45]:
# jenis kejahatan yang terjadi selama bulan puasa pada tahun 2016
# bulan puasa pada tahun 2016 jatuh pada bulan ke 6

query2 = spark.sql("SELECT DISTINCT major_category, minor_category from londoncrimes where year=2016 and month=6")

In [46]:
query2.show()

+--------------------+--------------------+
|      major_category|      minor_category|
+--------------------+--------------------+
|     Sexual Offences|        Other Sexual|
|               Drugs|    Drug Trafficking|
|     Criminal Damage|Criminal Damage T...|
|  Theft and Handling|         Other Theft|
|            Burglary|Burglary in Other...|
|               Drugs| Possession Of Drugs|
|  Theft and Handling|  Other Theft Person|
|    Fraud or Forgery|  Counted per Victim|
|    Fraud or Forgery|Other Fraud & For...|
|Violence Against ...|          Harassment|
|Violence Against ...|      Other violence|
|Violence Against ...|        Wounding/GBH|
|Violence Against ...|    Offensive Weapon|
|Violence Against ...|              Murder|
|Violence Against ...|      Common Assault|
|  Theft and Handling|Theft From Motor ...|
|  Theft and Handling|Handling Stolen G...|
|               Drugs|         Other Drugs|
|     Sexual Offences|                Rape|
|Other Notifiable ...|      Goin

In [75]:
# Jumlah kejahatan terkait obat-obatan yang terjadi selama tahun 2016

query3 = spark.sql("SELECT count(major_category) as `number of crime`, year from londoncrimes where major_category='Drugs' and year=2016 GROUP BY year")

In [76]:
query3.show()

+---------------+----+
|number of crime|year|
+---------------+----+
|         131052|2016|
+---------------+----+



In [61]:
# Save the results to CSV --> partitioned CSV
query3.write \
  .option("header", "true") \
  .csv("file:///F:/Wahyu Ivan Satyagraha/BigData/apache spark query/query3.csv")

In [62]:
# Convert to Pandas
import pandas as pd
queryPandas = query3.toPandas()

In [65]:
# Save to single CSV
queryPandas.to_csv("F:/Wahyu Ivan Satyagraha/BigData/apache spark query/query3pandas.csv", index=False)