In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
# Create the notebook's Spark session, or reuse one that already exists,
# under the application name 'SQL_options'.
session_builder = SparkSession.builder.appName('SQL_options')
spark = session_builder.getOrCreate()
# Last expression of the cell: display the session object.
spark

In [5]:
# Read the recorded-crime CSV from the dataset folder. The first line of the
# file supplies the column names and Spark infers each column's type.
path = 'Datasets/'
crime_csv = path + 'rec-crime-pfa.csv'
crime = spark.read.csv(crime_csv, header=True, inferSchema=True)

# Print the first five rows without truncating long cell values.
crime.show(n=5, truncate=False)

+----------------+-----------------+----------+-------------------------------------------------+-------------------------------------+
|12 months ending|PFA              |Region    |Offence                                          |Rolling year total number of offences|
+----------------+-----------------+----------+-------------------------------------------------+-------------------------------------+
|31/03/2003      |Avon and Somerset|South West|All other theft offences                         |25959                                |
|31/03/2003      |Avon and Somerset|South West|Bicycle theft                                    |3090                                 |
|31/03/2003      |Avon and Somerset|South West|Criminal damage and arson                        |26202                                |
|31/03/2003      |Avon and Somerset|South West|Death or serious injury caused by illegal driving|2                                    |
|31/03/2003      |Avon and Somerset|South West|D

In [3]:
# Print the column names, the types inferred during the CSV read, and nullability.
crime.printSchema()

root
 |-- 12 months ending: string (nullable = true)
 |-- PFA: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Offence: string (nullable = true)
 |-- Rolling year total number of offences: integer (nullable = true)



In [7]:
# Rename the long count column to a short name without spaces; every other
# column is carried over unchanged.
df = crime.withColumnRenamed(
    existing='Rolling year total number of offences',
    new='Offence_Count',
)

# Preview the first five rows as a pandas DataFrame.
preview_rows = df.limit(5)
preview_rows.toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Offence_Count
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202
3,31/03/2003,Avon and Somerset,South West,Death or serious injury caused by illegal driving,2
4,31/03/2003,Avon and Somerset,South West,Domestic burglary,14561


In [8]:
# Register df under the name 'TempView' so that spark.sql(...) queries can
# refer to it by that name; re-running the cell replaces any existing view.
df.createOrReplaceTempView('TempView')

In [11]:
# Plain SQL against the temp view: keep only rows with more than 1000 offences.
high_count_query = 'select * from TempView where Offence_Count>1000'
sql_results = spark.sql(high_count_query)

# Show the first five matching rows as a pandas DataFrame.
sql_results.limit(5).toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Offence_Count
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202
3,31/03/2003,Avon and Somerset,South West,Domestic burglary,14561
4,31/03/2003,Avon and Somerset,South West,Drug offences,2308


In [13]:
# Sum the offence counts per region in SQL. The aggregate has no alias, so
# the result column is named 'sum(Offence_Count)'.
region_totals = spark.sql('select Region, sum(Offence_Count) from TempView group by Region')
region_totals.toPandas()

Unnamed: 0,Region,sum(Offence_Count)
0,Fraud: CIFAS,7678981
1,North West,30235732
2,British Transport Police,3029117
3,Wales,11137260
4,London,42691902
5,South East,30911995
6,Fraud: Action Fraud,5921984
7,Fraud: UK Finance,2925861
8,South West,17985880
9,East,19890612


## SQL TRANSFORMER

In [14]:
from pyspark.ml.feature import SQLTransformer

# Transformer that projects three columns. '__THIS__' is SQLTransformer's
# placeholder for whichever DataFrame is later passed to transform().
sqlTrans = SQLTransformer(
    statement='select PFA, Region, Offence from __THIS__',
)

In [15]:
# Apply the projection to df and print the first five rows
# (long values are truncated by show's default setting).
projected = sqlTrans.transform(df)
projected.show(n=5)

+-----------------+----------+--------------------+
|              PFA|    Region|             Offence|
+-----------------+----------+--------------------+
|Avon and Somerset|South West|All other theft o...|
|Avon and Somerset|South West|       Bicycle theft|
|Avon and Somerset|South West|Criminal damage a...|
|Avon and Somerset|South West|Death or serious ...|
|Avon and Somerset|South West|   Domestic burglary|
+-----------------+----------+--------------------+
only showing top 5 rows



In [25]:
# Transformer that totals Offence_Count per offence type and names the
# aggregate column 'Total'.
sqlTrans2 = SQLTransformer(
    statement='select Offence, SUM(Offence_Count) as Total from __THIS__ group by Offence',
)

In [28]:
# Run the per-offence aggregation on df and print five rows at full width.
offence_totals = sqlTrans2.transform(df)
offence_totals.show(n=5, truncate=False)

+------------------------+--------+
|Offence                 |Total   |
+------------------------+--------+
|Public order offences   |10925676|
|Bicycle theft           |5297006 |
|Residential burglary    |1671469 |
|Violence without injury |16590158|
|All other theft offences|30979393|
+------------------------+--------+
only showing top 5 rows



In [29]:
from pyspark.sql.functions import expr

In [34]:
# Add a 'percent' column: each row's Offence_Count as a percentage of the
# hard-coded denominator, rounded to two decimals.
# NOTE(review): 244720928 is presumably the grand total of Offence_Count
# across the whole dataset; confirm before reusing on other data.
percent_expr = expr('round((Offence_Count/244720928)*100,2)')
df.withColumn("percent", percent_expr).show(truncate=False)

+----------------+-----------------+----------+-------------------------------------------------+-------------+-------+
|12 months ending|PFA              |Region    |Offence                                          |Offence_Count|percent|
+----------------+-----------------+----------+-------------------------------------------------+-------------+-------+
|31/03/2003      |Avon and Somerset|South West|All other theft offences                         |25959        |0.01   |
|31/03/2003      |Avon and Somerset|South West|Bicycle theft                                    |3090         |0.0    |
|31/03/2003      |Avon and Somerset|South West|Criminal damage and arson                        |26202        |0.01   |
|31/03/2003      |Avon and Somerset|South West|Death or serious injury caused by illegal driving|2            |0.0    |
|31/03/2003      |Avon and Somerset|South West|Domestic burglary                                |14561        |0.01   |
|31/03/2003      |Avon and Somerset|Sout

In [37]:
# Same percentage calculation, this time via select(). The expression has no
# alias, so the new column takes Spark's generated name.
# NOTE(review): 244720928 is presumably the grand total of Offence_Count; confirm.
percent_expr = expr('round((Offence_Count/244720928)*100,2)')
with_percent = df.select("*", percent_expr)
# toPandas() brings the whole result to the driver; fine at this size only.
with_percent.toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Offence_Count,"round(((CAST(Offence_Count AS DOUBLE) / CAST(244720928 AS DOUBLE)) * CAST(100 AS DOUBLE)), 2)"
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959,0.01
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090,0.00
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202,0.01
3,31/03/2003,Avon and Somerset,South West,Death or serious injury caused by illegal driving,2,0.00
4,31/03/2003,Avon and Somerset,South West,Domestic burglary,14561,0.01
...,...,...,...,...,...,...
46464,31/12/2018,Wiltshire,South West,Stalking and harassment,2380,0.00
46465,31/12/2018,Wiltshire,South West,Theft from the person,347,0.00
46466,31/12/2018,Wiltshire,South West,Vehicle offences,2895,0.00
46467,31/12/2018,Wiltshire,South West,Violence with injury,5701,0.00


In [39]:
# selectExpr() accepts SQL expression strings directly, so the percentage can
# be computed and aliased as 'Percent' without wrapping it in expr().
# NOTE(review): 244720928 is presumably the grand total of Offence_Count; confirm.
with_percent = df.selectExpr(
    "*",
    'round((Offence_Count/244720928)*100,2) as Percent',
)
with_percent.toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Offence_Count,Percent
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959,0.01
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090,0.00
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202,0.01
3,31/03/2003,Avon and Somerset,South West,Death or serious injury caused by illegal driving,2,0.00
4,31/03/2003,Avon and Somerset,South West,Domestic burglary,14561,0.01
...,...,...,...,...,...,...
46464,31/12/2018,Wiltshire,South West,Stalking and harassment,2380,0.00
46465,31/12/2018,Wiltshire,South West,Theft from the person,347,0.00
46466,31/12/2018,Wiltshire,South West,Vehicle offences,2895,0.00
46467,31/12/2018,Wiltshire,South West,Violence with injury,5701,0.00


In [41]:
# Compute the aliased 'Percent' column, then keep only the South West rows.
# NOTE(review): 244720928 is presumably the grand total of Offence_Count; confirm.
with_percent = df.selectExpr(
    "*",
    'round((Offence_Count/244720928)*100,2) as Percent',
)
south_west = with_percent.filter("Region = 'South West' ")
south_west.toPandas()

Unnamed: 0,12 months ending,PFA,Region,Offence,Offence_Count,Percent
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959,0.01
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090,0.00
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202,0.01
3,31/03/2003,Avon and Somerset,South West,Death or serious injury caused by illegal driving,2,0.00
4,31/03/2003,Avon and Somerset,South West,Domestic burglary,14561,0.01
...,...,...,...,...,...,...
5265,31/12/2018,Wiltshire,South West,Stalking and harassment,2380,0.00
5266,31/12/2018,Wiltshire,South West,Theft from the person,347,0.00
5267,31/12/2018,Wiltshire,South West,Vehicle offences,2895,0.00
5268,31/12/2018,Wiltshire,South West,Violence with injury,5701,0.00
