In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [2]:
# Create (or reuse) the Spark session with the MongoDB Spark connector on the
# classpath. The input URI is the default collection loaded by the read cell;
# the output URI is the default collection targeted by the MongoDB save cell.
# NOTE(review): the connector artifact is the Scala 2.11 build (`_2.11`) of
# connector 2.2.0; it presumably needs a matching Spark 2.x installation --
# verify before upgrading Spark.
# NOTE(review): both URIs use MongoDB's `local` database, which MongoDB uses for
# replication/instance-specific data; consider a dedicated database instead.
MONGO_CONNECTOR_PACKAGE = 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0'
MONGO_INPUT_URI = "mongodb://127.0.0.1/local.nik"        # source collection
MONGO_OUTPUT_URI = "mongodb://127.0.0.1/local.analyses"  # destination collection

my_spark = (
    SparkSession.builder
    .appName("myApp")
    .config('spark.jars.packages', MONGO_CONNECTOR_PACKAGE)
    .config("spark.mongodb.input.uri", MONGO_INPUT_URI)
    .config("spark.mongodb.output.uri", MONGO_OUTPUT_URI)
    .getOrCreate()
)

# Legacy SQLContext wrapper around the same session, kept for code that uses
# `my_spark2`.
my_spark2 = SQLContext(my_spark)

In [3]:
# Load the collection configured by 'spark.mongodb.input.uri' on the session
# builder (mongodb://127.0.0.1/local.nik) into a DataFrame.
# The reader comes straight from the SparkSession: it is equivalent to the one
# exposed by the legacy SQLContext wrapper (my_spark2), and SQLContext is
# deprecated as of Spark 3.0.
data = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [4]:
# Sanity-check the load: pull the first 5 rows back to the driver as Row objects.
data.take(5)

[Row(_id=Row(oid='5c92c2c73ff4e811a09a0a22'), borough='Southwark', major_category='Burglary', minor_category='Burglary in a Dwelling', month='6', value='2', year='2013'),
 Row(_id=Row(oid='5c92c2c73ff4e811a09a0a23'), borough='Southwark', major_category='Burglary', minor_category='Burglary in a Dwelling', month='3', value='0', year='2014'),
 Row(_id=Row(oid='5c92c2c73ff4e811a09a0a24'), borough='Southwark', major_category='Burglary', minor_category='Burglary in a Dwelling', month='6', value='2', year='2015'),
 Row(_id=Row(oid='5c92c2c73ff4e811a09a0a25'), borough='Southwark', major_category='Burglary', minor_category='Burglary in a Dwelling', month='1', value='1', year='2014'),
 Row(_id=Row(oid='5c92c2c73ff4e811a09a0a26'), borough='Southwark', major_category='Burglary', minor_category='Burglary in a Dwelling', month='3', value='1', year='2016')]

In [5]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# NOTE(review): month/value/year are stored as strings (see the head() output:
# month='6', value='2', year='2013'), so min/max for those columns compare
# lexicographically rather than numerically.
summary_stats = data.describe()
summary_stats.show()

+-------+--------------------+--------------+-------------------+-----------------+------------------+------------------+
|summary|             borough|major_category|     minor_category|            month|             value|              year|
+-------+--------------------+--------------+-------------------+-----------------+------------------+------------------+
|  count|             3286436|       3286436|            3286436|          3286436|           3286436|           3286436|
|   mean|                null|          null|               null| 6.49677229716592|0.4774909756684608|2011.9994735869996|
| stddev|                null|          null|               null|3.452671524829749| 1.765015745215443|2.5824079990390745|
|    min|Barking and Dagenham|      Burglary|Assault with Injury|                1|                 0|              2008|
|    max|                null|          null|               null|             null|              null|              null|
+-------+--------------------+--------------+-------------------+-----------------+------------------+------------------+

In [6]:
# For every (borough, year) pair, count the records with a non-null 'value'.
# The aggregate column is named 'count(value)'.
# NOTE(review): this counts rows, not crimes. If 'value' holds a per-record
# crime count (values look like '2', '0', '1'), a total number of crimes would
# need a sum over the (cast) 'value' column instead -- confirm the intent.
crimeBoroughYear = data.groupBy("borough", "year").agg({'value': 'count'})

In [7]:
# Preview the first 5 aggregated (borough, year, count(value)) rows.
crimeBoroughYear.take(5)

[Row(borough='Brent', year='2010', count(value)=16296),
 Row(borough='Islington', year='2015', count(value)=9214),
 Row(borough='Lambeth', year='2016', count(value)=11488),
 Row(borough='Kensington and Chelsea', year='2016', count(value)=9018),
 Row(borough='Kingston upon Thames', year='2009', count(value)=7890)]

In [8]:
# Write the borough/year counts to local CSV. Spark writes a directory named
# 'analyses.csv'; repartition(1) collapses the result into one partition so the
# directory holds a single CSV part file (with a header row).
# mode("overwrite") replaces the output of a previous run: with Spark's default
# save mode ("errorifexists") re-running this cell fails because the path
# already exists.
crimeBoroughYear.repartition(1).write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv("analyses.csv")

In [9]:
# For every (year, month, major_category) triple, count the records with a
# non-null 'value'. The aggregate column is named 'count(value)'.
# NOTE(review): like the borough/year grouping, this counts rows rather than
# summing 'value' -- confirm which is intended.
crimeYearMonthCat = data.groupBy("year", "month", "major_category").agg({'value': 'count'})

In [10]:
# Write the year/month/category counts to local CSV. Spark writes a directory
# named 'analyses2.csv'; repartition(1) is needed for it to contain a single CSV
# part file (with a header row) -- without it Spark writes one part file per
# partition of the aggregated result.
# mode("overwrite") replaces the output of a previous run: with Spark's default
# save mode ("errorifexists") re-running this cell fails because the path
# already exists.
crimeYearMonthCat.repartition(1).write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv("analyses2.csv")

In [11]:
# Save the borough/year counts to the collection configured by
# 'spark.mongodb.output.uri' (mongodb://127.0.0.1/local.analyses).
# NOTE(review): "append" adds documents on every run, so re-running the notebook
# duplicates these results in MongoDB; consider "overwrite" if the collection is
# dedicated to this output.
mongo_writer = crimeBoroughYear.write.format("com.mongodb.spark.sql.DefaultSource")
mongo_writer.mode("append").save()

In [12]:
# Stop the Spark session (and its underlying SparkContext) now that all results
# have been written. my_spark2 wraps this same session, so it cannot be used
# after this cell either.
my_spark.stop()