# **Setup**

*Upload the provided CSV file:*

In [0]:
from google.colab import files
files.upload()

Saving pubs_in_england.csv to pubs_in_england.csv


*Package Installations:*

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

*Environment Setup:*

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
spark

## **Import the csv file contents**

In [0]:
import pyspark.sql.functions as sf
from pyspark.sql.types import *

# Define the schema based on the given column description
schema = StructType([
    StructField("fsa_id", IntegerType()),
    StructField("name", StringType()),
    StructField("address", StringType()),
    StructField("postcode", StringType()),
    StructField("easting", IntegerType()),
    StructField("northing", IntegerType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("local_authority", StringType())
])

dataset = spark.read.csv("pubs_in_england.csv", header=True, schema=schema,
                         mode='DROPMALFORMED')


*Previewing the data and checking the number of records:*

In [83]:
dataset.show(5)
dataset.count()

+------+---------------+--------------------+--------+-------+--------+---------+---------+---------------+
|fsa_id|           name|             address|postcode|easting|northing| latitude|longitude|local_authority|
+------+---------------+--------------------+--------+-------+--------+---------+---------+---------------+
|    24|     Anchor Inn|Upper Street, Str...| CO7 6LW| 604748|  234405| 51.97039| 0.979328|        Babergh|
|    30|      Angel Inn|Egremont Street, ...|CO10 7SA| 582888|  247368|52.094427| 0.668408|        Babergh|
|    63|Black Boy Hotel|7 Market Hill, SU...|CO10 2EA| 587356|  241327|52.038683| 0.730226|        Babergh|
|    64|    Black Horse|Lower Street, Str...| CO7 6JS| 604270|  233920|51.966211| 0.972091|        Babergh|
|    65|     Black Lion|Lion Road, Glemsf...|CO10 7RF| 582750|  248298|52.102815| 0.666893|        Babergh|
+------+---------------+--------------------+--------+-------+--------+---------+---------+---------------+
only showing top 5 rows



51566

**Checking the schema and data statistics**

In [84]:
dataset.printSchema()

root
 |-- fsa_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- easting: integer (nullable = true)
 |-- northing: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- local_authority: string (nullable = true)



In [85]:
dataset.describe().show()

+-------+------------------+----------------+--------------------+--------+------------------+------------------+------------------+-------------------+---------------+
|summary|            fsa_id|            name|             address|postcode|           easting|          northing|          latitude|          longitude|local_authority|
+-------+------------------+----------------+--------------------+--------+------------------+------------------+------------------+-------------------+---------------+
|  count|             51494|           51494|               51494|   51494|             51494|             51494|             51494|              51494|          51494|
|   mean|257687.77865382374|          518.75|                null|    null|425726.03792674874| 315053.4275061172|52.723645153513274|-1.6370478585854669|           null|
| stddev|146752.26052595043|936.482203069907|                null|    null| 97740.61607377358|166810.32438424518|1.5020463529539323|  1.436843719115415|   

The count here is **51494**, which is lower than the **51566** rows counted above. This means **72** rows did not conform to the schema, so we need to remove them:

In [52]:
print("Initial count:", dataset.count())
dataset = dataset.dropna()
print("Cleaned count:", dataset.count())

Initial count: 51566
Cleaned count: 51494
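
To make "not conforming to the schema" concrete, the sketch below applies the same column types to a few hypothetical rows in plain Python and keeps only the rows where every field casts cleanly. It illustrates the idea only and is not Spark's CSV parser; the sample rows and the `conforms` helper are invented for this example.

```python
import csv
import io

# Casts mirroring the notebook's schema: IntegerType -> int, DoubleType -> float, StringType -> str
CASTS = [int, str, str, str, int, int, float, float, str]

# Hypothetical sample data: the second row has a non-numeric easting,
# the third row is missing its last two fields.
SAMPLE = """fsa_id,name,address,postcode,easting,northing,latitude,longitude,local_authority
1,Example Inn,"High Street, Exampletown",AB1 2CD,500000,200000,51.5,0.5,Authority A
2,Sample Arms,"Mill Lane, Sampleville",AB1 2CE,not_a_number,200100,51.6,0.6,Authority A
3,Test Tavern,"Church Road, Testford",AB1 2CF,500200,200200,51.7
"""

def conforms(row):
    """Return True if the row has the expected width and every field casts to its type."""
    if len(row) != len(CASTS):
        return False
    try:
        for cast, value in zip(CASTS, row):
            cast(value)
    except ValueError:
        return False
    return True

reader = csv.reader(io.StringIO(SAMPLE))
next(reader)  # skip the header line
rows = list(reader)
clean = [row for row in rows if conforms(row)]

print("Initial count:", len(rows))
print("Cleaned count:", len(clean))
```

The difference between the two printed counts plays the same role as the 72-row gap in the notebook.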


**Checking data for any missing / duplicate entries**

In [46]:
dataset.filter(dataset.fsa_id == '').show()
dataset.filter(dataset.name == '').show()
dataset.filter(dataset.address == '').show()
dataset.filter(dataset.postcode == '').show()
dataset.filter(dataset.easting == '').show()
dataset.filter(dataset.northing == '').show()
dataset.filter(dataset.latitude == '').show()
dataset.filter(dataset.longitude == '').show()
dataset.filter(dataset.local_authority == '').show()

+------+----+-------+--------+-------+--------+--------+---------+---------------+
|fsa_id|name|address|postcode|easting|northing|latitude|longitude|local_authority|
+------+----+-------+--------+-------+--------+--------+---------+---------------+
+------+----+-------+--------+-------+--------+--------+---------+---------------+

+------+----+-------+--------+-------+--------+--------+---------+---------------+
|fsa_id|name|address|postcode|easting|northing|latitude|longitude|local_authority|
+------+----+-------+--------+-------+--------+--------+---------+---------------+
+------+----+-------+--------+-------+--------+--------+---------+---------------+

+------+----+-------+--------+-------+--------+--------+---------+---------------+
|fsa_id|name|address|postcode|easting|northing|latitude|longitude|local_authority|
+------+----+-------+--------+-------+--------+--------+---------+---------------+
+------+----+-------+--------+-------+--------+--------+---------+---------------+

+
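
An empty-string comparison is a natural check for text columns, but numeric columns hold numbers or nulls rather than strings, so it can help to test for both kinds of missing value at once. The plain-Python sketch below shows one way to express that rule on hypothetical records; `is_missing` is an illustrative helper, not part of the notebook.

```python
# Hypothetical records; None stands in for a null value
records = [
    {"fsa_id": 1, "name": "Example Inn", "postcode": "AB1 2CD", "easting": 500000},
    {"fsa_id": 2, "name": "", "postcode": "AB1 2CE", "easting": 500100},
    {"fsa_id": 3, "name": "Sample Arms", "postcode": "   ", "easting": None},
]

def is_missing(value):
    """Treat None and empty or whitespace-only strings as missing."""
    if value is None:
        return True
    return isinstance(value, str) and value.strip() == ""

# Count the missing values in each column
missing = {
    column: sum(is_missing(record[column]) for record in records)
    for column in records[0]
}
print(missing)
```

A column with a non-zero count would need attention before the analysis.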

In [56]:
dataset = dataset.dropDuplicates()
print("Count after removing duplicates:", dataset.count())

Count after removing duplicates: 51494


The dataset no longer contains malformed, duplicate, or missing data, so we can proceed with the analysis:

# **2) Which local_authority has the least number of pubs?**

In [73]:
# Group by local_authority, count the pubs in each group, and sort ascending
# so that the smallest count comes first.
q2 = (
    dataset
    .groupBy('local_authority')
    .count()
    .select(
        'local_authority',
        sf.col("count").alias("Number of Pubs")
    )
    .orderBy('Number of Pubs')
)

q2.show(1)

# Alternatively, q2.first() (or q2.collect()[0]) returns the top row as a Row object.

+---------------+--------------+
|local_authority|Number of Pubs|
+---------------+--------------+
|  Tower Hamlets|             1|
+---------------+--------------+
only showing top 1 row



Row(local_authority='Tower Hamlets', Number of Pubs=1)
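
Because the smallest count is 1, other local authorities may share that minimum, and `show(1)` displays only one of them. The plain-Python sketch below, on hypothetical data, shows one way to list every group tied for the fewest pubs; the pub and authority names are made up.

```python
from collections import Counter

# Hypothetical (pub name, local authority) pairs
pubs = [
    ("Example Inn", "Authority A"),
    ("Sample Arms", "Authority A"),
    ("Test Tavern", "Authority B"),
    ("Demo Bar", "Authority C"),
]

# Count pubs per authority, then keep every authority whose count equals the minimum
counts = Counter(authority for _, authority in pubs)
fewest = min(counts.values())
tied = sorted(name for name, n in counts.items() if n == fewest)

print(fewest, tied)
```

The same idea carries over to the DataFrame: find the minimum of the count column, then filter for the rows that equal it.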

# **4) Which Street in England has the highest number of pubs?**

In [81]:
# Take the part of the address before the first ", " as the street, count pubs per street,
# and sort in descending order so the street with the most pubs comes first.
q4 = (
    dataset
    .select(sf.split("address", ", ")[0].alias("Street in England"))
    .groupBy('Street in England')
    .count()
    .select(
        'Street in England',
        sf.col("count").alias("Number of Pubs")
    )
    .orderBy('Number of Pubs', ascending=False)
)

q4.show(1)

+-----------------+--------------+
|Street in England|Number of Pubs|
+-----------------+--------------+
|      High Street|           473|
+-----------------+--------------+
only showing top 1 row
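
Some addresses in the preview begin with a house number (for example `7 Market Hill, ...`), so taking the first comma-separated part counts "7 Market Hill" and "Market Hill" as different streets. The sketch below compares the raw split with a variant that strips a leading house number first. It is an optional refinement under that assumption, not the method used above; the addresses, the `street_of` helper, and the regular expression are illustrative.

```python
import re
from collections import Counter

# Hypothetical addresses in the same "street, place" shape as the dataset
addresses = [
    "High Street, Exampletown",
    "12 High Street, Sampleville",
    "7 Market Hill, Exampletown",
    "Market Hill, Testford",
    "High Street, Testford",
]

def street_of(address):
    """Return the first address part with any leading house number removed."""
    first = address.split(", ")[0]
    # Matches prefixes such as "12 ", "12a " or "12-14 "
    return re.sub(r"^\d+[A-Za-z]?(?:-\d+[A-Za-z]?)?\s+", "", first)

raw = Counter(address.split(", ")[0] for address in addresses)
normalised = Counter(street_of(address) for address in addresses)

print(raw.most_common(1))
print(normalised.most_common(1))
```

Whether this normalisation is appropriate depends on how the question defines a street, so the raw count above remains a reasonable answer.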

