In [1]:
import csv
import findspark
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import datediff, to_date, when, lit, col
import re

In [2]:
# Initialise the PySpark context
findspark.init()
sc = pyspark.SparkContext(appName="PlanningApp")
sqlContext = SQLContext(sc)

In [3]:
# Load dataset
df = sqlContext.read.json('planning-applications-weekly-list.json')

Define a helper function for writing to CSV file (escaping strings where necessary):

In [4]:
def write_csv(file_name, rows, column_names, row_format_fun):
    with open(file_name, 'wb') as f:
        csv_writer = csv.writer(f, quoting=csv.QUOTE_ALL)
        csv_writer.writerow(column_names)
        for row in rows:
            csv_writer.writerow(row_format_fun(row))    

1) Discover the schema of the input dataset and output it to a file.

In [5]:
schema = df.schema

write_csv('schema.csv',
          schema,
          ('name', 'dataType', 'nullable'),
          lambda r: (r.name, r.dataType, r.nullable))

2) What is the total number of planning application records in the dataset?

In [6]:
n_records = str(df.count())
print("Total number of planning application records: {0}".format(n_records))

Total number of planning application records: 17805


3) Identify the set of case officers (CASEOFFICER field) and output a unique list of these to a file:

In [7]:
column = 'CASEOFFICER'
case_officers = (df.select(column).
                 distinct().
                 sort(column).
                 rdd.map(lambda r: r[0]).collect())

write_csv('case_officers.csv',
          case_officers,
          (column,),
          lambda r: (r,))

4) Who are the top N agents (AGENT field) submitting the most number of applications?

In [8]:
# Define helper function for replacing empty strings with Null entries.
# We do this because it doesn't appear appropriate to include 'missing data' entries for this query
def replace(column, value):
    return when(column != value, column).otherwise(lit(None))

In [9]:
n_top_agents = 10
column = 'AGENT'
agent_application_counts = (df.select(column).
                            withColumn(column, replace(col(column), '')).
                            groupBy(column).
                            agg({column: 'count'}).
                            orderBy('count(' + column + ')', ascending=False).
                            limit(n_top_agents).
                            rdd.map(lambda r: (r[0], r[1])).
                            collect())

write_csv('top_agents_by_application_counts.csv',
          agent_application_counts,
          ('AGENT','APPLICATION COUNT'),
          lambda r: r)

5) Count the occurrence of each word within the case text (CASETEXT field) across all planning application records. Output each word and the corresponding count to a file:

In [10]:
regex_nonalphabetic = re.compile(r"[^a-z]")
stopword_filter = lambda s: len(s) > 3

counts = (df.select('CASETEXT').
          rdd.map(lambda row: row[0]).
          # Obtain collection of lower-case tokens by splitting at whitespace          
          flatMap(lambda s: s.lower().split()).
          # Segmentation heuristic which permits punctuated word boundaries without any whitespace
          # (w1,w2 w1/w2 w1&w2 etc.)
          # Also ensures that non-alphabetic words are filtered out.
          flatMap(lambda s: regex_nonalphabetic.split(s)).
          # Basic stopword filtering heuristic
          filter(stopword_filter).countByValue())

counts = sorted(sorted(counts.items()),
                key=lambda item: item[1],
                reverse=True)

write_csv('word_counts.csv',
          counts,
          ('WORD', 'COUNT'),
          lambda r: r)

6) Measure the average public consultation duration in days (i.e. the difference between PUBLICCONSULTATIONENDDATE and PUBLICCONSULTATIONSTARTDATE fields):

In [11]:
mean_diff = (df.withColumn("DIFF", datediff(
    to_date('PUBLICCONSULTATIONSTARTDATE', 'dd/MM/YYYY'),
    to_date('PUBLICCONSULTATIONENDDATE', 'dd/MM/YYYY'))).
             selectExpr('MEAN(DIFF)').
             rdd.map(lambda r: r[0]).
             collect()[0])

print("Average consultation duration in days: {:.1f}".format(mean_diff))

Average consultation duration in days: 7.1


In [12]:
# Shut down the PySpark context
sc.stop()