In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
## Setup
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

### Import all necessary packages to work with Spark

import os, IPython
from itertools import chain

import numpy as np
import pandas as pd
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.functions as f
from pyspark import __version__ as current_pyspark_version
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.window import Window




























In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
## Configure the Spark Session
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Spark session for this notebook: dynamic allocation capped at 4 executors
# (2 cores and 1500m memory each), console progress bar off, and Hive support
# enabled so Hive databases can be read and written with spark.sql / saveAsTable.
spark_settings = [
    ("spark.executor.memory", "1500m"),
    ("spark.executor.cores", 2),
    ("spark.dynamicAllocation.enabled", 'true'),
    ("spark.dynamicAllocation.maxExecutors", 4),
    ("spark.shuffle.service.enabled", 'true'),
    ("spark.ui.showConsoleProgress", 'false'),
]

session_builder = SparkSession.builder.appName('epc_uprn_check')
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.enableHiveSupport().getOrCreate()

# Publish a Table Schema representation of pandas DataFrames for notebook
# front ends that support it.
pd.set_option("display.html.table_schema", True)

In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
## Load the Data
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Make energy_performance_certificate the current database so unqualified table
# names resolve there, and list its tables for reference.
spark.sql("USE energy_performance_certificate")
spark.sql("SHOW TABLES").show(truncate=False)

# EPC certificates, read from the current database.
df_std = spark.read.table("all_domestic_certificates_202112_georef_std")

# National Statistics UPRN lookup, used to attach geography codes to certificates.
uprn_lookup_query = "SELECT * FROM national_statistics_uprn_lookup.nsul_jan_2022_gb_std"
uprn_df = spark.sql(uprn_lookup_query)


In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
## Join UPRN to EPC dataframe
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

#Join two spark dataframes with a left join, based on a given column
epc_withgeog = (
    df_std.join(
        uprn_df,
        on=df_std.ons_uprn == uprn_df.uprn,
        how="left")
    .drop("uprn", "gridgb1e", "gridgb1n", "pcds", "oa11cd", "cty21cd", "ced17cd", "wd19cd", 
          "hlth19cd", "ctry11cd", "rgn11cd", "pcon11cd", "eer11cd", "ttwa15cd", 
          "itl21cd", "npark16cd", "lsoa11cd", "wz11cd", "ccg17cd", "bua11cd", 
          "buasd11cd", "ruc11ind", "oac11ind", "lep17cd1", "lep17cd2", "pfa15cd", 
          "imd19ind", "guid", "lmk_key", "address1", "address2", "address3", 
          "postcode", "building_reference_number","potential_energy_efficiency", 
          "local_authority", "constituency", "county", "lodgement_date", "transaction_type", 
          "environment_impact_current", "environment_impact_potential", 
          "energy_consumption_current", "energy_consumption_potential", 
          "co2_emissions_current", "co2_emiss_curr_per_floor_area", "co2_emissions_potential", 
          "lighting_cost_current", "lighting_cost_potential", "heating_cost_current", 
          "heating_cost_potential", "hot_water_cost_current", "hot_water_cost_potential", 
          "total_floor_area", "energy_tariff", "mains_gas_flag", "floor_level", 
          "flat_top_storey", "flat_storey_count", "main_heating_controls", 
          "multi_glaze_proportion", "glazed_type", "glazed_area", "extension_count", 
          "number_habitable_rooms", "number_heated_rooms", "low_energy_lighting", 
          "number_open_fireplaces", "hotwater_description", "hot_water_energy_eff", 
          "hot_water_env_eff", "floor_description", "floor_energy_eff", "floor_env_eff", 
          "windows_description", "windows_energy_eff", "windows_env_eff", "walls_env_eff", 
          "secondheat_description", "sheating_energy_eff", "sheating_env_eff", "roof_env_eff", 
          "mainheat_description", "mainheat_energy_eff", "mainheat_env_eff", 
          "mainheatcont_description", "mainheatc_energy_eff", "mainheatc_env_eff", 
          "lighting_description", "lighting_energy_eff", "lighting_env_eff", "main_fuel", 
          "wind_turbine_count", "heat_loss_corridoor", "unheated_corridor_length", "floor_height", 
          "photo_supply", "solar_water_heating_flag", "mechanical_ventilation", "address", 
          "local_authority_label", "constituency_label", "posttown", "construction_age_band", 
          "lodgement_datetime", "tenure", "fixed_lighting_outlets_count", 
          "low_energy_fixed_light_count", "uprn", "uprn_source", "guid", "ons_uprn_match", 
          "ons_apiversion", "ons_confidencescore","ons_dataversion", "ons_inputaddress", 
          "ons_matchedformattedaddress", "ons_underlyingscore")
)


In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Export dataframe to a Hive table
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

#pyspark/hive/delete_table
# Delete a HIVE table
spark.sql('DROP TABLE IF EXISTS housing_epc.epc_working01')


# Write out to a Hive table using saveAsTable
table_name = f"housing_epc.epc_working01"
epc_withgeog.write.saveAsTable(table_name, format="parquet")

# Reading Data from a Hive table using PySpark
epc_df = spark.sql("SELECT * FROM housing_epc.epc_joined")

In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
## Recodes for variables
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Derived grouping columns for property type, built form and construction age.

# Property type -> "House" / "Flat"; anything else (including null) -> "voa_x".
# NOTE(review): EPC data appears to label park homes "Park home", while only
# "Park house" was listed originally and may never match. Both spellings are
# accepted so park homes group with houses as intended. Confirm against the data.
epc_df = epc_df.withColumn(
    "epc_property_group",
    f.when(f.col("property_type").isin("House", "Park house", "Park home", "Bungalow"), "House")
    .when(f.col("property_type").isin("Flat", "Maisonette"), "Flat")
    .otherwise("voa_x"),
)

# Built form -> semi-detached / detached / terrace / not recorded; else "voa_x".
terrace_forms = ["End-Terrace", "Mid-Terrace", "Enclosed End-Terrace", "Enclosed Mid-Terrace"]
epc_df = epc_df.withColumn(
    "epc_built_fm_grp",
    f.when(f.col("built_form") == "Semi-Detached", "semi-detached")
    .when(f.col("built_form") == "Detached", "detached")
    .when(f.col("built_form").isin(terrace_forms), "terrace")
    .when(f.col("built_form") == "Not Recorded", "not recorded")
    .otherwise("voa_x"),
)

# Construction age band -> five age groups. Groups are tested in order, and the
# first match wins.
age_band = f.col("construction_age_band")

age_band_groups = [
    ("1900-1929", [
        "1900", "1902", "1910", "1920", "1929",
        "England and Wales: 1900-1929",
    ]),
    ("1930-1982", [
        "1930", "1935", "1940", "1950", "1960", "1965", "1969", "1970", "1980",
        "England and Wales: 1930-1949", "England and Wales: 1950-1966",
        "England and Wales: 1967-1975", "England and Wales: 1976-1982",
    ]),
    ("1983-2011", [
        "1983", "1985", "1988", "1992", "1995", "2000", "2002", "2004", "2005",
        "2006", "2007", "2008", "2009", "2010", "2011",
        "England and Wales: 1983-1990", "England and Wales: 1991-1995",
        "England and Wales: 1996-2002", "England and Wales: 2003-2006",
        "England and Wales: 2007 onwards", "England and Wales: 2007-2011",
    ]),
    ("2012_onwards", [
        "England and Wales: 2012 onwards",
        "2012", "2013", "2014", "2015", "2016", "2017", "2018", "2019", "2020",
        "2021", "2022",
    ]),
]

# NOTE(review): `<= "1899"` is compared against a string literal. If
# construction_age_band is a string column, this is a lexicographic comparison
# and matches any value sorting at or before "1899". Confirm that is intended.
age_group = f.when(
    (age_band <= "1899") | (age_band == "England and Wales: before 1900"), 'pre-1900'
)
for group_label, band_values in age_band_groups:
    age_group = age_group.when(age_band.isin(band_values), group_label)

# No .otherwise(): values matching none of the groups are left null.
epc_df = epc_df.withColumn("epc_property_age", age_group)



In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Export dataframe to a Hive table
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Persist the recoded data to Hive, replacing any table left by a previous run,
# then continue from the persisted table rather than the in-memory lineage.
table_name = "housing_epc.epc_working02"
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
epc_df.write.saveAsTable(table_name, format="parquet")

epc_df = spark.sql(f"SELECT * FROM {table_name}")

In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Remove records that have implausible values (that would affect overall EPC score)
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Keep only certificates whose ratings and costs are all plausible: every
# condition below must hold. Rows with a null in any of these columns are also
# removed, because a null comparison is not true.
#
# The conditions are combined with `&` only. Python's `&` binds more tightly
# than `|`, so the previous mix of `|` and `&` parsed as
# `cur > 0 | (...) | cur < 125`. That is true for every non-null rating, so
# almost nothing was filtered.
is_plausible = (
    (f.col("current_energy_efficiency") > 0)
    & (f.col("current_energy_efficiency") < 125)  # ratings of 125+ treated as implausible
    & (f.col("potential_energy_efficiency") > 0)
    & (f.col("lighting_cost_current") > 0)
    & (f.col("lighting_cost_potential") > 0)
    & (f.col("heating_cost_current") > 0)
    & (f.col("heating_cost_potential") > 0)
    & (f.col("hot_water_cost_current") > 0)
    & (f.col("hot_water_cost_potential") > 0)
    & (f.col("building_reference_number") > 0)
)
epc_df = epc_df.filter(is_plausible)



In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Newest EPC by inspection date
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Keep only the newest certificate for each property (ons_uprn), ranked by
# inspection_date in descending order (nulls last).
#
# A row_number() window is used because orderBy(...) followed by
# dropDuplicates(...) does not guarantee which duplicate survives: the sort
# order is not carried through the de-duplication shuffle.
# Rows with a null ons_uprn share one window partition, so only one of them is
# kept, as with dropDuplicates. Ties on inspection_date are broken arbitrarily.
newest_first = Window.partitionBy("ons_uprn").orderBy(f.col("inspection_date").desc())
epc_df = (
    epc_df
    .withColumn("_newest_rank", f.row_number().over(newest_first))
    .filter(f.col("_newest_rank") == 1)
    .drop("_newest_rank")
)


In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Wall/Roof insulation crosstabs
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Rating values included in the crosstabs. Rows with any other value in the
# rating column, including null, are excluded.
valid_ratings = ["Very Good", "Average", "Good", "Poor", "Very Poor"]


def count_ratings_by_la(df, rating_col, ratings=None):
    """Count rows per local authority (lad21cd) and rating value.

    Args:
        df: Spark DataFrame containing ``lad21cd`` and ``rating_col``.
        rating_col: Name of the rating column to tabulate.
        ratings: Rating values to keep; defaults to ``valid_ratings``.

    Returns:
        Spark DataFrame with columns ``lad21cd``, ``rating_col`` and
        ``count(<rating_col>)``, sorted by ``lad21cd`` ascending.
    """
    # isin() only unpacks a list/set argument, so always pass a list.
    keep = list(valid_ratings if ratings is None else ratings)
    return (
        df.filter(f.col(rating_col).isin(keep))
        .groupBy("lad21cd", rating_col)
        .agg(f.count(rating_col))
        .sort("lad21cd", ascending=True)
    )


# Wall / roof energy efficiency by local authority
la_wall_sdf = count_ratings_by_la(epc_df, "walls_energy_eff")
la_roof_sdf = count_ratings_by_la(epc_df, "roof_energy_eff")

# Wall / roof environmental efficiency by local authority
# (previously referenced an undefined `epc_`, which raised NameError)
la_wall_env_sdf = count_ratings_by_la(epc_df, "walls_env_eff")
la_roof_env_sdf = count_ratings_by_la(epc_df, "roof_env_eff")


In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Export wall energy efficiency by LA to a csv file
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Write wall energy-efficiency counts per local authority as CSV under
# /user/<HADOOP_USER_NAME>/. Spark writes a directory at this path; coalesce(1)
# makes it hold a single part file, and mode("overwrite") replaces earlier output.
username = os.getenv('HADOOP_USER_NAME')
if not username:
    # os.getenv returns None when unset, which would yield /user/None/...
    raise RuntimeError("HADOOP_USER_NAME is not set; cannot build the CSV output path")

csv_file_path = f'/user/{username}/la_wall_ener_eff.csv'
print(csv_file_path)

la_wall_sdf.coalesce(1).write.mode("overwrite").csv(csv_file_path, header=True)


In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Export roof environmental efficiency by LA to a csv file
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Write roof environmental-efficiency counts per local authority as CSV under
# /user/<HADOOP_USER_NAME>/. Spark writes a directory at this path; coalesce(1)
# makes it hold a single part file, and mode("overwrite") replaces earlier output.
username = os.getenv('HADOOP_USER_NAME')
if not username:
    # os.getenv returns None when unset, which would yield /user/None/...
    raise RuntimeError("HADOOP_USER_NAME is not set; cannot build the CSV output path")

csv_file_path = f'/user/{username}/la_roof_env_eff.csv'
print(csv_file_path)

la_roof_env_sdf.coalesce(1).write.mode("overwrite").csv(csv_file_path, header=True)

In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Export roof energy efficiency by LA to a csv file
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Write roof energy-efficiency counts per local authority as CSV under
# /user/<HADOOP_USER_NAME>/. Spark writes a directory at this path; coalesce(1)
# makes it hold a single part file, and mode("overwrite") replaces earlier output.
username = os.getenv('HADOOP_USER_NAME')
if not username:
    # os.getenv returns None when unset, which would yield /user/None/...
    raise RuntimeError("HADOOP_USER_NAME is not set; cannot build the CSV output path")

csv_file_path = f'/user/{username}/la_roof_ener_eff.csv'
print(csv_file_path)

la_roof_sdf.coalesce(1).write.mode("overwrite").csv(csv_file_path, header=True)

In [None]:
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------
# Export wall environmental efficiency by LA to a csv file
# ------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------

# Write wall environmental-efficiency counts per local authority as CSV under
# /user/<HADOOP_USER_NAME>/. Spark writes a directory at this path; coalesce(1)
# makes it hold a single part file, and mode("overwrite") replaces earlier output.
username = os.getenv('HADOOP_USER_NAME')
if not username:
    # os.getenv returns None when unset, which would yield /user/None/...
    raise RuntimeError("HADOOP_USER_NAME is not set; cannot build the CSV output path")

csv_file_path = f'/user/{username}/la_wall_env_eff.csv'
print(csv_file_path)

la_wall_env_sdf.coalesce(1).write.mode("overwrite").csv(csv_file_path, header=True)

