In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
import numpy as np

In [0]:
# Spark session for this job: all local cores, app name "EnglandCouncilsJob".
# getOrCreate() returns an already-running session if one exists.
# NOTE(review): on a managed cluster the master setting is presumably ignored — confirm.
spark_session = (
    SparkSession.builder
    .config("spark.app.name", "EnglandCouncilsJob")
    .config("spark.master", "local[*]")
    .getOrCreate()
)

# Directory holding the input CSVs; the combined Parquet export is written here as well.
input_directory = "/FileStore/tables/data"

In [0]:
def read_council_csv(file_name, council_type):
    """Read one England councils CSV and tag every row with its council type.

    Args:
        file_name: CSV file name inside ``{input_directory}/england_councils``.
        council_type: Label stored in the added ``council_type`` column.

    Returns:
        A Spark DataFrame with the CSV's columns (header row, inferred types)
        plus a constant ``council_type`` column.
    """
    return (spark_session.read
            .option('header', 'true')
            .option('inferSchema', 'true')
            .option('sep', ',')
            .csv(f'{input_directory}/england_councils/{file_name}')
            .withColumn('council_type', lit(council_type)))


# The four council lists are read identically and differ only in file name and label.
district_councilsdf = read_council_csv('district_councils.csv', 'District Council')
london_boroughsdf = read_council_csv('london_boroughs.csv', 'London Boroughs')
metropolitan_districtsdf = read_council_csv('metropolitan_districts.csv', 'Metropolitan Districts')
unitary_authoritiesdf = read_council_csv('unitary_authorities.csv', 'Unitary Authorities')

In [0]:
# Stack the four council lists into one frame. unionByName matches columns by
# name; plain union() matches by position and would silently misalign values
# if any CSV listed its columns in a different order.
councils_df = (district_councilsdf
               .unionByName(london_boroughsdf)
               .unionByName(metropolitan_districtsdf)
               .unionByName(unitary_authoritiesdf))

In [0]:
# Average property price per local authority; the key is renamed to `council`
# so it can be joined onto the councils frame.
property_avg_pricedf = (
    spark_session.read
    .options(header='true', inferSchema='true', sep=',')
    .csv(f'{input_directory}/property_avg_price.csv')
    .select(col('local_authority').alias('council'), 'avg_price_nov_2019')
)

In [0]:
# Property sales volume per local authority; the key is renamed to `council`
# so it can be joined onto the councils frame.
property_sales_volumedf = (
    spark_session.read
    .options(header='true', inferSchema='true', sep=',')
    .csv(f'{input_directory}/property_sales_volume.csv')
    .select(col('local_authority').alias('council'), 'sales_volume_sep_2019')
)

In [0]:
# Attach property metrics to every council. Left joins keep councils that have
# no match in a property file (their metric becomes null).
# The metric columns are dropped first so this cell is safe to re-run: it
# rebinds `councils_df` from itself, and without the drop a second run would
# join onto already-enriched rows and produce duplicate
# `avg_price_nov_2019` / `sales_volume_sep_2019` columns.
# (drop() of a column that does not exist yet is a no-op.)
# NOTE(review): assumes each council appears at most once per property file;
# duplicates there would multiply council rows — verify.
councils_df = (councils_df
               .drop('avg_price_nov_2019', 'sales_volume_sep_2019')
               .join(property_avg_pricedf, on='council', how='left')
               .join(property_sales_volumedf, on='council', how='left')
               # Turn floating-point NaN into SQL NULL in numeric columns.
               .replace({np.nan: None}))

In [None]:
# Print the first 20 rows as a text table, truncating long cell values.
councils_df.show(n=20, truncate=True)

In [0]:
# Shape check: number of rows, then number of columns, separated by a space.
print(f'{councils_df.count()} {len(councils_df.columns)}')

316 5


In [0]:
# Export the combined council table as Parquet next to the input data,
# replacing any previous export.
councils_df.write.parquet(f'{input_directory}/councils.parquet', mode='overwrite')

In [0]:
# List the data directory to confirm the Parquet export landed there. The path is
# derived from `input_directory` rather than hard-coded, so the two cannot drift apart.
dbutils.fs.ls(f'dbfs:{input_directory}/')

Out[11]: [FileInfo(path='dbfs:/FileStore/tables/data/councils.parquet/', name='councils.parquet/', size=0, modificationTime=1681574160000),
 FileInfo(path='dbfs:/FileStore/tables/data/england_councils/', name='england_councils/', size=0, modificationTime=1681573989000),
 FileInfo(path='dbfs:/FileStore/tables/data/property_avg_price.csv', name='property_avg_price.csv', size=13483, modificationTime=1681573971000),
 FileInfo(path='dbfs:/FileStore/tables/data/property_sales_volume.csv', name='property_sales_volume.csv', size=6964, modificationTime=1681573971000)]

In [0]:
%sql
-- Ensure the `demo` schema exists (CREATE SCHEMA is an alias of CREATE DATABASE)
-- and make it the current one for this session.
CREATE SCHEMA IF NOT EXISTS demo;
USE demo;

In [0]:
# Persist the combined frame as the Parquet-backed table demo.councils.
# mode('overwrite') makes the cell re-runnable: the default save mode
# ('errorifexists') fails as soon as demo.councils already exists.
councils_df.write.format('parquet').mode('overwrite').saveAsTable('demo.councils')

In [0]:
%sql
-- Metropolitan districts with more than 100 sales in Sep 2019,
-- most expensive (Nov 2019 average price) first.
SELECT *
FROM demo.councils AS c
WHERE c.sales_volume_sep_2019 > 100
  AND c.council_type = 'Metropolitan Districts'
ORDER BY c.avg_price_nov_2019 DESC;

council,county,council_type,avg_price_nov_2019,sales_volume_sep_2019
Trafford,Greater Manchester,Metropolitan Districts,296547.0,265
Solihull,West Midlands,Metropolitan Districts,288068.0,237
Stockport,Greater Manchester,Metropolitan Districts,238043.0,358
Birmingham,West Midlands,Metropolitan Districts,190901.0,844
Coventry,West Midlands,Metropolitan Districts,190220.0,329
Leeds,West Yorkshire,Metropolitan Districts,189738.0,854
Manchester,Greater Manchester,Metropolitan Districts,185333.0,406
Dudley,West Midlands,Metropolitan Districts,184010.0,351
Bury,Greater Manchester,Metropolitan Districts,181541.0,179
Sefton,Merseyside,Metropolitan Districts,173595.0,314


In [0]:
%sql
-- Top 3 counties by total Sep 2019 sales volume within each council type.
-- DENSE_RANK gives tied counties the same rank, so a council type may return more than 3 rows.
WITH cte_county AS (
  SELECT county,
    council_type,
    SUM(sales_volume_sep_2019) AS sales_volume,
    DENSE_RANK() OVER (PARTITION BY council_type ORDER BY SUM(sales_volume_sep_2019) DESC) AS rank
  FROM demo.councils
  -- Councils without a sales figure contribute nothing to SUM anyway. Excluding them here
  -- stops a county whose total would be NULL from being ranked: NULLs sort last under DESC,
  -- so such a county could otherwise take a top-3 slot.
  WHERE sales_volume_sep_2019 IS NOT NULL
  GROUP BY county, council_type
)
SELECT *
FROM cte_county
WHERE rank <= 3
-- The window ranking does not order the final result; sort explicitly for a stable display.
ORDER BY council_type, rank, county;

county,council_type,sales_volume,rank
Essex,District Council,1839,1
Hampshire,District Council,1658,2
Kent,District Council,1654,3
Greater London,London Boroughs,6155,1
City of London,London Boroughs,10,2
Greater Manchester,Metropolitan Districts,2879,1
West Yorkshire,Metropolitan Districts,2456,2
West Midlands,Metropolitan Districts,2454,3
Cheshire,Unitary Authorities,1344,1
County Durham,Unitary Authorities,1051,2
