Using the Polars library in Python for fast data operations

Import the necessary modules

In [1]:
import polars as pl # fast, optimised, memory efficient parallel data frame library
import os # file wrangling
import requests
import zipfile
import get_ca_data as get_ca

In [2]:
def get_postcode_file(url = "https://www.arcgis.com/sharing/rest/content/items/3770c5e8b0c24f1dbe6d2fc6b46a0b18/data",
                      destination_directory = "data\\postcode_lookup"):
    """
    Download and unzip a CSV lookup file for the UK
    Delete any existing files in the destination folder
    Return the path of the extracted file

    The archive is downloaded *before* anything is deleted, so a failed
    request leaves previously extracted files untouched.

    Parameters
    ----------
    url : str
        Address of the zipped lookup file.
    destination_directory : str
        Folder the archive is extracted into. It is created if missing.
        Plain files directly inside it are removed first; sub-folders are
        left alone.

    Returns
    -------
    str
        Path of the single entry found in destination_directory after
        extraction.

    Raises
    ------
    Exception
        If the HTTP status code is not 200.
    RuntimeError
        If destination_directory does not contain exactly one entry after
        extraction, so no unambiguous path can be returned.
    """
    zip_path = "postcode_lookup.zip"

    # Download the file first: if this fails, the existing data is still intact
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        raise Exception(f'API call failed {response.status_code}')
    with open(zip_path, "wb") as f:
        f.write(response.content)

    # os.listdir raises if the folder is missing, so make sure it exists
    os.makedirs(destination_directory, exist_ok=True)

    files = [f for f in os.listdir(destination_directory) if os.path.isfile(os.path.join(destination_directory, f))]

    if files:
        for file in files:
            os.remove(os.path.join(destination_directory, file))
    else:
        print("No files found in the directory.")

    # Unzip the file
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(destination_directory)

    postcodes_files_list = [os.path.join(destination_directory, file) for file in os.listdir(destination_directory)]
    if len(postcodes_files_list) != 1:
        # Previously this only printed a message and then failed with an
        # UnboundLocalError on the return statement.
        raise RuntimeError(
            f"Expected exactly one entry in {destination_directory!r} after extraction, "
            f"found {len(postcodes_files_list)}"
        )
    return postcodes_files_list[0]



In [3]:
def get_ca_la_codes(ca_la_df):
    """
    Return a list of the LA codes which comprise each Combined Authority

    Parameters
    ----------
    ca_la_df : polars.DataFrame
        Lookup table containing an 'LADCD' column.

    Returns
    -------
    list
        The values of the 'LADCD' column, in row order.
    """
    # The original built this list but never returned it, so callers got None.
    return ca_la_df.get_column('LADCD').to_list()

In [6]:
def get_postcode_df(postcode_file, ca_la_codes):
    """
    Lazily scan the postcode lookup CSV and return, as a collected
    DataFrame, only the rows whose 'ladcd' value is in ca_la_codes.

    Five columns are kept; the 2021-vintage LSOA / MSOA column names are
    renamed to the shorter 'lsoacd' / 'msoacd'.
    """
    # source column name -> output column name
    column_map = {
        'pcds': 'pcds',
        'lsoa21cd': 'lsoacd',
        'msoa21cd': 'msoacd',
        'ladcd': 'ladcd',
        'ladnm': 'ladnm',
    }
    source_columns = list(column_map)

    lazy_postcodes = pl.scan_csv(postcode_file)
    lazy_postcodes = lazy_postcodes.select(pl.col(source_columns))
    # Filter on the source name; the rename is applied afterwards
    lazy_postcodes = lazy_postcodes.filter(pl.col('ladcd').is_in(ca_la_codes))
    lazy_postcodes = lazy_postcodes.rename(column_map)
    return lazy_postcodes.collect()


In [5]:
# CA -> LA lookup for 2023, supplied by the get_ca_data module
ca_la_df = get_ca.get_ca_la_df(year = 2023)
# LA codes used below to filter the postcode lookup
ca_la_codes = get_ca_la_codes(ca_la_df)
# Download and unzip the lookup (both arguments repeat the function's defaults)
postcode_file = get_postcode_file(url = "https://www.arcgis.com/sharing/rest/content/items/3770c5e8b0c24f1dbe6d2fc6b46a0b18/data",
                      destination_directory = "data\\postcode_lookup")
# Read the lookup, keeping only rows whose 'ladcd' is in ca_la_codes
postcodes_df = get_postcode_df(postcode_file, ca_la_codes)

# postcodes_path

In [7]:

# Check the column names after the rename in get_postcode_df
postcodes_df.columns

['pcds', 'lsoacd', 'msoacd', 'ladcd', 'ladnm']

In [None]:
# Query endpoint with the parameters written directly into the URL
# (where=1%3D1 is the URL-encoded form of where=1=1)
url = "https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/LLSOA_Dec_2021_PWC_for_England_and_Wales_2022/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson"

In [4]:
url = "https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/LLSOA_Dec_2021_PWC_for_England_and_Wales_2022/FeatureServer/0/query"
destination_directory = "data/geojson"

# Query-string parameters are kept separate so requests does the URL encoding
params = {'outFields': '*', 'where': '1=1', 'f': 'geojson'}
response = requests.get(url, params=params)

In [5]:
# Preview only: the full body is very large, so display just the first 500 bytes
response.content[:500]

b'{"type":"FeatureCollection","properties":{"exceededTransferLimit":true},"features":[{"type":"Feature","id":1,"geometry":{"type":"Point","coordinates":[-0.0962402479184813,51.5195933020763]},"properties":{"FID":1,"LSOA21CD":"E01000001","GlobalID":"38ad8fe8-682a-4533-b51a-16c9ca366294"}},{"type":"Feature","id":2,"geometry":{"type":"Point","coordinates":[-4.7810063747152,50.4575699557455]},"properties":{"FID":2,"LSOA21CD":"E01018945","GlobalID":"0681c038-f7dd-4a2e-83c1-c8d3b32443f6"}},{"type":"Feature","id":3,"geometry":{"type":"Point","coordinates":[-0.140053563093234,51.6322147689281]},"properties":{"FID":3,"LSOA21CD":"E01000120","GlobalID":"e5ffc70b-0e24-40a9-afe8-13da1ec4032e"}},{"type":"Feature","id":4,"geometry":{"type":"Point","coordinates":[-0.775535025392899,50.8558263836113]},"properties":{"FID":4,"LSOA21CD":"E01031487","GlobalID":"17c7f13f-1a6a-45c5-a5cc-2c50e725c799"}},{"type":"Feature","id":5,"geometry":{"type":"Point","coordinates":[0.603292410965527,50.8731580871539]},"pr

In [None]:
# Save the raw response body as esri.geojson in the current working directory
with open("esri.geojson", "wb") as f:
    f.write(response.content)

In [6]:
def get_geojson(url = "https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/LLSOA_Dec_2021_PWC_for_England_and_Wales_2022/FeatureServer/0/query",
                      destination_directory = "data\\geojson"):
    """
    Download geoJSON from ESRI ONS OG
    Return the path of the downloaded file

    A single request to the query endpoint can be truncated: the response
    then carries an ``exceededTransferLimit`` flag. This function keeps
    requesting further pages (via ``resultOffset``) until the flag is no
    longer set, and writes all features as one FeatureCollection.

    The download happens *before* anything is deleted, so a failed request
    leaves previously saved files untouched.

    Parameters
    ----------
    url : str
        Feature service query endpoint.
    destination_directory : str
        Folder the file is written to. It is created if missing. Plain
        files directly inside it are removed first.

    Returns
    -------
    str
        Path of the written file: ``esri.geojson`` inside destination_directory.

    Raises
    ------
    Exception
        If a request returns a status code other than 200, or the response
        body reports an error.
    """
    import json  # local import: only needed to serialise the merged pages

    query_params = {
         'outFields': '*',
         'where': '1=1',
         'f': 'geojson'
    }

    collection = None   # first page; reused as the output container
    features = []       # features accumulated across all pages
    while True:
        page_params = dict(query_params)
        if features:
            # The first request is sent exactly as before; only follow-up
            # requests carry an offset.
            # NOTE(review): assumes the service pages in a stable order
            # when no orderByFields is given - confirm against the layer.
            page_params['resultOffset'] = len(features)

        # Download the page
        response = requests.get(url, params=page_params, timeout=60)
        if response.status_code != 200:
            raise Exception(f'API call failed {response.status_code}')
        page = response.json()
        if 'error' in page:
            # Failures can be reported in the body of a 200 response
            raise Exception(f"API call failed {page['error']}")

        page_features = page.get('features') or []
        if collection is None:
            collection = page
        elif page_features and page_features[0] == features[0]:
            # Same page came back again: the offset was ignored. Stop rather
            # than loop forever; the first page is still saved.
            print("Server ignored resultOffset; saving the first page only.")
            break
        features.extend(page_features)

        truncated = (page.get('exceededTransferLimit')
                     or (page.get('properties') or {}).get('exceededTransferLimit'))
        if not truncated or not page_features:
            break

    # The merged collection is complete, so the truncation flag no longer applies
    collection['features'] = features
    collection.pop('exceededTransferLimit', None)
    if isinstance(collection.get('properties'), dict):
        collection['properties'].pop('exceededTransferLimit', None)

    # os.listdir raises if the folder is missing, so make sure it exists
    os.makedirs(destination_directory, exist_ok=True)

    files = [f for f in os.listdir(destination_directory) if os.path.isfile(os.path.join(destination_directory, f))]

    if files:
        for file in files:
            os.remove(os.path.join(destination_directory, file))
    else:
        print("No files found in the directory.")

    # The output path is known, so return it directly instead of re-listing
    # the folder (which failed with UnboundLocalError if anything else was there)
    path = os.path.join(destination_directory, "esri.geojson")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(collection, f)
    return path



In [7]:
# Run the download with the default URL and folder; the cell displays the saved path
get_geojson()

No files found in the directory.


'data/geojson\\esri.geojson'

In [8]:
root_dir = 'data/all-domestic-certificates'
# la_list = pl.read_csv('data/ca_la_tbl.csv')['LAD22CD'] # All the LA codes for LA's within Combined Authorities
la_list = get_ca.get_ca_la_df(2023)['LADCD']
# len() gives the count directly; slicing str(shape) only worked for two-digit counts
f'There are {len(la_list)} Local Authorities in Combined Authorities'


'There are 53 Local Authorities in Combined Authorities'

Open the certificates CSV in each folder where the LA code is within a CA (Combined Authority).

Use lazy evaluation to select columns,

change data types and group by UPRN, 

filter for the most recent certificate. 

Collect the data and append dataframe to the list.

Concatenate the list of dataframes

In [9]:
def ingest_certs(la_list, root_dir):
    """
    Read the certificates CSV for every matching folder and combine them.

    A folder under root_dir matches when its name contains one of the
    codes in la_list. For each match that holds a ``certificates.csv``,
    the file is reduced to one row per UPRN (the last row after sorting by
    lodgement datetime), and the per-file results are concatenated.

    Parameters
    ----------
    la_list : iterable of str
        Codes to look for inside the folder names.
    root_dir : str
        Directory containing one sub-folder per local authority.

    Returns
    -------
    polars.DataFrame
        Consolidated certificates from all matching folders.

    Raises
    ------
    ValueError
        If no matching certificates.csv was found, so there is nothing
        to concatenate.
    """
    # List the folders once; the listing does not change between LA codes
    folder_names = os.listdir(root_dir)

    all_dataframes = []
    for item in la_list:
        for folder_name in folder_names:
            # Check if the folder name matches an item in la_list
            if item not in folder_name:
                continue
            file_path = os.path.join(root_dir, folder_name, "certificates.csv")
            # Check if certificates.csv actually exists inside the folder
            if not os.path.exists(file_path):
                continue
            # The query is collected per file and the result kept for concatenation
            all_dataframes.append(_latest_cert_query(file_path).collect())

    if not all_dataframes:
        # pl.concat rejects an empty list; fail with a message that says why
        raise ValueError(
            f"No certificates.csv found under {root_dir!r} for the supplied LA codes"
        )

    # Concatenate list of dataframes into one consolidated DF
    return pl.concat(all_dataframes)


def _latest_cert_query(file_path):
    """Build the lazy query giving the last-lodged certificate row per UPRN for one CSV."""
    # NOTE(review): rows with a null UPRN appear to fall into a single group,
    # so only one of them would survive per file - confirm that is intended.
    return (
        pl.scan_csv(file_path,
                    infer_schema_length=0)  # all as strings
        .select(pl.col(['LMK_KEY',
                        'POSTCODE',
                        'CURRENT_ENERGY_RATING',
                        'LOCAL_AUTHORITY',
                        'PROPERTY_TYPE',
                        'LODGEMENT_DATETIME',
                        'TRANSACTION_TYPE',
                        'ENVIRONMENT_IMPACT_CURRENT',
                        'CO2_EMISSIONS_CURRENT',
                        'TENURE',
                        'UPRN']))
        .with_columns([pl.col('LODGEMENT_DATETIME').str.to_datetime(),
                       pl.col('ENVIRONMENT_IMPACT_CURRENT').cast(pl.Int64),
                       pl.col('CO2_EMISSIONS_CURRENT').cast(pl.Float32),
                       pl.col('UPRN').cast(pl.Int64)])
        # Sort so that the last row in each UPRN group is the latest lodgement
        .sort(pl.col(['UPRN', 'LODGEMENT_DATETIME']))
        .group_by('UPRN').last()
    )

In [10]:
# Build the consolidated certificates table for every LA code in la_list
cons_df = ingest_certs(la_list, root_dir)

In [11]:
# Column names of the consolidated table
cons_df.columns

['UPRN',
 'LMK_KEY',
 'POSTCODE',
 'CURRENT_ENERGY_RATING',
 'LOCAL_AUTHORITY',
 'PROPERTY_TYPE',
 'LODGEMENT_DATETIME',
 'TRANSACTION_TYPE',
 'ENVIRONMENT_IMPACT_CURRENT',
 'CO2_EMISSIONS_CURRENT',
 'TENURE']

In [12]:
# Compact preview: one line per column with its dtype and first values
cons_df.glimpse()


Rows: 4660486
Columns: 11
$ UPRN                                <i64> None, 4210003725, 10001239702, 10001239704, 10001239709, 10001239711, 10001239712, 10001239714, 10001239715, 10001239716
$ LMK_KEY                             <str> '632d66abdfbd4f709d71851927a2d79fce2fd90e4d67fe999bc22b5a9970bcc2', '8126d78434eb72180e023c7d522d811b8a11a9d6f206aa19a867f06df5d4599c', '69223609222016052017263804808356', '1099459139642014100812510823040588', '876abfcefe535b31e3e3e46264de08d97acec07eb72415f2fa967061afc6315d', 'e9c4c64a24ad61bbf2c4de0d3bd97725b8dde5d9b21a12e2cd0888e34ac6af15', '1019963559062013100615514564448457', '1485207013512016100421133096069648', '1389662563632016072714480291278400', 'aade8e478efe85ef7a8b090f1a4eedd4941217246ccb9019e35ef1d944af8505'
$ POSTCODE                            <str> 'BL5 3WB', 'BL3 1PS', 'BL1 5WA', 'BL3 5NP', 'BL3 5NP', 'BL3 5NP', 'BL3 5NP', 'BL3 5NP', 'BL3 5NP', 'BL3 5NP'
$ CURRENT_ENERGY_RATING               <str> 'B', 'C', 'C', 'D', 'C', 'C', 'C', 'D', '

In [13]:
# Estimated size divided by 1e9 (presumably bytes -> decimal gigabytes; confirm the default unit)
cons_df.estimated_size() / 1000000000

0.829428976

In [14]:
# (rows, columns) of the consolidated table
cons_df.shape

(4660486, 11)

In [32]:
# Persist the consolidated certificates table to CSV
cons_df.write_csv('data/epc_subset_polars_last.csv')

In [7]:
# Distinct postcodes present in the consolidated certificates, as a plain list
postcodes_ca = (
    cons_df
    .select(pl.col('POSTCODE'))
    .unique()
    .to_series()
    .to_list()
)

In [26]:
# Lazy query: postcode -> LSOA / LA lookup, restricted to postcodes found in the certificates
q_pc_lsoa = (
    pl.scan_csv('data/PCD_OA_LSOA_MSOA_LAD_FEB19_UK_LU.csv', infer_schema_length=0)
    .select([
        pl.col(['lsoa11cd', 'ladcd']),
        # renamed to match the certificates' column
        pl.col('pcds').alias('POSTCODE'),
    ])
    .filter(pl.col('POSTCODE').is_in(postcodes_ca))
)

In [27]:
# Execute the lazy postcode lookup query
pc_lsoa_df = q_pc_lsoa.collect()

In [28]:
# (rows, columns) of the filtered postcode lookup
pc_lsoa_df.shape

(327000, 3)

In [30]:
# Distinct LSOA codes covered by the postcode lookup.
# The lookup has one row per postcode, so codes repeat many times; they are
# de-duplicated here because the list is only used as a membership filter.
ca_lsoa = (
    pc_lsoa_df
    .select('lsoa11cd')
    .unique()
    .to_series()
    .to_list()
)

In [51]:
# Lazy query: IMD lookup, restricted to the LSOAs listed in ca_lsoa
q_imd_lsoa = (
    pl.scan_csv(
        'data/Index_of_Multiple_Deprivation_(Dec_2019)_Lookup_in_England.csv',
        infer_schema_length=0,
    )
    .select([
        pl.col(['LSOA11CD', 'LAD19CD']),
        # everything is read as a string, so IMD19 is cast back to an integer
        pl.col('IMD19').cast(pl.Int64),
    ])
    .filter(pl.col('LSOA11CD').is_in(ca_lsoa))
)

In [35]:
# Execute the lazy IMD lookup query
imd_lsoa_df = q_imd_lsoa.collect()

In [52]:
# Attach LSOA / LA codes to each certificate by postcode, then the IMD fields by LSOA.
# Both joins are inner joins, so certificates without a match in either lookup are dropped.
ca_pc_lsoa_imd_df = (
    cons_df.join(pc_lsoa_df, on = 'POSTCODE', how = 'inner')
    .join(imd_lsoa_df, left_on = 'lsoa11cd', right_on = 'LSOA11CD', how = 'inner')
)

In [49]:

# Lower-case every column name (assigns onto the existing frame rather than creating a new one)
ca_pc_lsoa_imd_df.columns = [item.lower() for item in ca_pc_lsoa_imd_df.columns]

In [50]:
# Compact preview of the joined table
ca_pc_lsoa_imd_df.glimpse()

Rows: 4537641
Columns: 14
$ uprn                                <i64> None, 4210003725, 10001239702, 10001239704, 10001239709, 10001239711, 10001239712, 10001239714, 10001239715, 10001239716
$ lmk_key                             <str> 632d66abdfbd4f709d71851927a2d79fce2fd90e4d67fe999bc22b5a9970bcc2, 8126d78434eb72180e023c7d522d811b8a11a9d6f206aa19a867f06df5d4599c, 69223609222016052017263804808356, 1099459139642014100812510823040588, 876abfcefe535b31e3e3e46264de08d97acec07eb72415f2fa967061afc6315d, e9c4c64a24ad61bbf2c4de0d3bd97725b8dde5d9b21a12e2cd0888e34ac6af15, 1019963559062013100615514564448457, 1485207013512016100421133096069648, 1389662563632016072714480291278400, aade8e478efe85ef7a8b090f1a4eedd4941217246ccb9019e35ef1d944af8505
$ postcode                            <str> BL5 3WB, BL3 1PS, BL1 5WA, BL3 5NP, BL3 5NP, BL3 5NP, BL3 5NP, BL3 5NP, BL3 5NP, BL3 5NP
$ current_energy_rating               <str> B, C, C, D, C, C, C, D, E, D
$ local_authority                     <str> E0800000