The goal is to test the best ways to manage loading data into a small database (using SQLite3) and handling duplicate records.

In [1]:
import sqlite3
import pandas as pd

In [2]:
db = sqlite3.connect('test.db')

In [3]:
# Create a cursor object
cur = db.cursor()

# 1. Use of Primary Keys

The intention of this section is to do some basic experimenting with primary keys and demonstrate their use before implementing any changes in my DB.

When a value for an INTEGER PRIMARY KEY column isn't provided, SQLite automatically assigns the next sequential value.

In [5]:
# Create a simple table

cur.execute("CREATE TABLE airports (airportid INTEGER PRIMARY KEY, name text, city text, country text)")

# Create some sample data...
test_vals = [('LHR', 'London', 'UK'), ('LGW', 'London', 'UK')]

# Load into database.
# Values are bound as parameters instead of pasting str(tuple) into the SQL text,
# so names containing quotes (e.g. "O'Hare") cannot break or alter the statement.
cur.executemany("INSERT INTO airports (name, city, country) VALUES (?, ?, ?)", test_vals)

In [None]:
cur.execute("SELECT * from airports")
output = cur.fetchall()
output

In [4]:
# Delete table to start again

cur.execute("DROP TABLE airports")

<sqlite3.Cursor at 0x11ac6c570>

## Primary Key Based On Two Columns

Suppose we want to create a primary key based on the unique values of two columns.

### Without primary keys

In [7]:
# Create a simple table (no primary key, so nothing stops duplicate rows)

cur.execute("CREATE TABLE cars (make text, model text, inproduction boolean, year text)")

# Add in a few different values with duplicate make and model values

test_vals = [
    ('Honda', 'Civic', True, '2022'),
    ('Honda', 'CRX', False, '2022'),
    ('Honda', 'Civic', True, '2021'),
    ('Nissan', 'Altima', True, '2022'),
]

# Load into database.
# Bound parameters replace the str(tuple) concatenation; Python booleans are stored as 1/0.
cur.executemany("INSERT INTO cars (make, model, inproduction, year) VALUES (?, ?, ?, ?)", test_vals)

In [14]:
cur.execute("SELECT * from cars")
output = cur.fetchall()
output

[('Honda', 'Civic', 1, '2022'),
 ('Honda', 'CRX', 0, '2022'),
 ('Honda', 'Civic', 1, '2021'),
 ('Nissan', 'Altima', 1, '2022'),
 ('Honda', 'CRX', 0, '2022')]

In [15]:
cur.execute("INSERT OR REPLACE INTO cars VALUES('Honda', 'Civic', 0,'2022');")

<sqlite3.Cursor at 0x11ac6c570>

In [16]:
cur.execute("SELECT * from cars")
output = cur.fetchall()
output

[('Honda', 'Civic', 1, '2022'),
 ('Honda', 'CRX', 0, '2022'),
 ('Honda', 'Civic', 1, '2021'),
 ('Nissan', 'Altima', 1, '2022'),
 ('Honda', 'CRX', 0, '2022'),
 ('Honda', 'Civic', 0, '2022')]

Without a primary key, the value is duplicated rather than updated

In [17]:
cur.execute("DROP TABLE cars")

<sqlite3.Cursor at 0x11ac6c570>

### With primary keys

In [18]:
# Create a simple table whose primary key spans make, model and year

cur.execute("CREATE TABLE cars (make text, model text, inproduction boolean, year text, PRIMARY KEY (make, model, year))")

# Add in a few different values with duplicate make and model values

test_vals = [
    ('Honda', 'Civic', True, '2022'),
    ('Honda', 'CRX', False, '2022'),
    ('Honda', 'Civic', True, '2021'),
    ('Nissan', 'Altima', True, '2022'),
]

# Load into database.
# Bound parameters replace the str(tuple) concatenation; Python booleans are stored as 1/0.
cur.executemany("INSERT INTO cars (make, model, inproduction, year) VALUES (?, ?, ?, ?)", test_vals)

In [19]:
cur.execute("SELECT * from cars")
output = cur.fetchall()
output

[('Honda', 'Civic', 1, '2022'),
 ('Honda', 'CRX', 0, '2022'),
 ('Honda', 'Civic', 1, '2021'),
 ('Nissan', 'Altima', 1, '2022')]

In [20]:
cur.execute("INSERT OR REPLACE INTO cars VALUES('Honda', 'Civic', 0,'2022');")

<sqlite3.Cursor at 0x11ac6c570>

In [21]:
cur.execute("SELECT * from cars")
output = cur.fetchall()
output

[('Honda', 'CRX', 0, '2022'),
 ('Honda', 'Civic', 1, '2021'),
 ('Nissan', 'Altima', 1, '2022'),
 ('Honda', 'Civic', 0, '2022')]

With the primary key, we're able to simply update the records

# EPC Data Analysis

Knowing the importance of primary keys, I will now examine how data is stored in the Chester Data Analytics SQLite3 database I have created. This database is based off of API requests for EPC data in the Chester area and I'm looking to optimise the ingestion of this data.

In [None]:
# 
cda = sqlite3.connect('./Chester-Data-Analytics/cda.db')

cda_cur = cda.cursor()

In [None]:
# How many records are in the EPC table?
# count(*) counts every row; count(uprn) would leave out any row whose uprn is NULL.

cda_cur.execute("select count(*) from epc")

cda_cur.fetchall()

In [None]:
# How many records have a uprn?

cda_cur.execute("select count(uprn) as uprn_count from epc where length(uprn)>1")

cda_cur.fetchall()

In [None]:
10453/11522

Around 91% of properties with an EPC certificate include a UPRN. In an ideal world, I'd like to use the UPRN wherever possible; however, with almost 10% of the data lacking one, it makes sense to identify other unique values to use together to make a primary key. But which ones?

In [None]:
# How many records are there with a distinct postcode and address combination?

cda_cur.execute("select count(postcode) from (select distinct postcode,address from epc)")

In [None]:
cda_cur.fetchall()

In [None]:
# What about the address1 field? Are there duplicate records with an address1 and postcode combination?

cda_cur.execute("select count(*) as appearances, address1, postcode from epc group by address1, postcode order by appearances desc limit 10")

In [None]:
cda_cur.fetchall()

In [None]:
cda_cur.execute("select address from epc where address1 = 'Apartment 1' and postcode = 'CH1 1RD'")

In [None]:
cda_cur.fetchall()

These are all unique properties based on the second line of the address field, so the address1 field should not be used in the primary key. Otherwise, all the EPC records appear to have no duplicate values based on an address/postcode field combination which makes these the best fields for a primary key.

In [None]:
# Import time module for further testing
import time
import json
import pandas
import base64
import requests
import datetime

In [None]:
path = './Chester-Data-Analytics/API Key.json'

url = 'https://epc.opendatacommunities.org/api/v1/domestic/search'  # URL endpoint for the EPC API


def get_key(path):
    """
    Retrieve the stored EPC API credentials and return them encoded for the Authorization header.

    The username and key are stored in a JSON file in a local directory and hidden by the
    .gitignore to avoid making these public. They are joined as ``username:key`` and
    Base64-encoded as per the EPC documentation.
    :param path: Relative location of JSON file with the 'epc_username' and 'epc_key' entries
    :return: Base64-encoded ``username:key`` as a str
    :raises KeyError: if either entry is missing from the JSON file
    """
    with open(path, encoding='utf-8') as f:
        api_key = json.load(f)
    key = api_key['epc_key']
    username = api_key['epc_username']
    credentials = (username + ':' + key).encode('utf-8')
    # b64encode returns bytes; decode them rather than slicing the "b'...'" repr apart.
    return base64.b64encode(credentials).decode('ascii')

key = get_key(path)

In [None]:
postcode = 'CH2'
energy_bands = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
postcode_area = postcode.split(" ")[0]
page_size = 5000  # rows requested per call; a shorter page means the band is exhausted
results = []
column_names = None  # taken from the most recent successful response
# One query per energy band keeps each query small enough to page through.
for band in energy_bands:
    total_count = 0
    while True:
        response = requests.get(url, params={'postcode': postcode, 'size': page_size, 'from': total_count,
                                             'energy-band': band},
                                headers={'Authorization': 'Basic %s' % key, "Accept": 'application/json'})
        if response.status_code != 200:
            # Stop paging this band: there are no rows to count, and carrying on would
            # either reuse a stale count or loop forever on the same failing request.
            print(response.status_code)
            break
        # NOTE(review): assumes every 200 response carries a JSON body with 'rows' and
        # 'column-names' -- confirm for bands that have no certificates.
        payload = response.json()
        rows = payload['rows']
        column_names = payload['column-names']
        results += rows
        total_count += len(rows)
        if len(rows) < page_size:  # The last page of this band has been reached
            break
results = pd.DataFrame(data=results, columns=column_names)

In [None]:
def get_postcode_epc_data(key, postcode, results, db_path='./Chester-Data-Analytics/cda.db'):
    """
    Write EPC results to the ``epc`` table, then delete superseded and duplicate rows.

    This is the "insert everything, then clean up" approach: every row of ``results`` is
    inserted, rows that are not the latest lodgement for their address are deleted, rows
    from older queries are deleted for addresses that still appear more than once, and
    the load is recorded in the ``data_log`` table.
    :param key: Encoded secret key for the API. Not used by this function; kept so that
        existing calls keep working.
    :param postcode: Any UK Postcode or the first part (district portion). The part before
        the first space is written to ``data_log``.
    :param results: Pandas Dataframe of EPC results with the columns address, address1, uprn,
        postcode, current-energy-rating, total-floor-area and lodgement-datetime
    :param db_path: Location of the SQLite database file
    :return: None
    """
    curr_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    con = sqlite3.connect(db_path)
    try:
        cur = con.cursor()
        # Rows are inserted one statement at a time; this function is the baseline that
        # the primary-key approach is timed against, so the loop is left as it is.
        for _, row in results.iterrows():
            cur.execute("INSERT INTO epc VALUES(?,?,?,?,?,?,?,?)",
                        (row.address, row.address1, row.uprn, row['postcode'],
                         row['current-energy-rating'], row['total-floor-area'],
                         row['lodgement-datetime'], curr_time))
        # Keep only the most recently lodged certificate for each address.
        # NOTE(review): possibly redundant with the query_date clean-up below -- confirm.
        cur.execute("""DELETE FROM epc where (address, lodgement_datetime) NOT IN (SELECT
          epc.address, epc.lodgement_datetime
        FROM
          (SELECT
             address, MAX(lodgement_datetime) AS most_recent_epc
           FROM
             epc
           GROUP BY
             address) AS latest_record
        INNER JOIN
          epc
        ON
          epc.address = latest_record.address AND
          epc.lodgement_datetime = latest_record.most_recent_epc)""")
        # Drop duplicate records based on the most recent query_date
        cur.execute("""DELETE FROM epc WHERE query_date < (SELECT max(query_date) FROM epc) AND address IN
                    (SELECT address FROM epc GROUP BY address HAVING COUNT(*) >1)""")
        # Log the load in the data_log table against the postcode district
        postcode_district = postcode.split(" ")[0]
        cur.execute("INSERT INTO data_log VALUES(?,?,?)", (postcode_district, 'epc', curr_time))
        con.commit()
    finally:
        # Release the database file even if one of the statements above raised.
        con.close()
    return

In [None]:
# This uses the old method that imports all values, then deletes duplicates
# Quickly run the test for getting the EPC data from a given postcode then record the time

start = time.perf_counter()
get_postcode_epc_data(key, 'CH2', results)
end = time.perf_counter()

original_time = end-start


In [None]:
#cda_cur.execute("DROP TABLE epckeys")

In [None]:
# Create a second EPC table with a primary key

cda_cur.execute('''CREATE TABLE IF NOT EXISTS epckeys 
                    (address text, address1 text ,uprn text,postcode text, current_energy_rating text, 
                    total_floor_area real, lodgement_datetime text, query_date text, PRIMARY KEY (address, postcode))''')

In [None]:
# Re-write the function so that it relies on the primary key (INSERT OR REPLACE) instead of deleting duplicates after the load

def get_postcode_epc_data_improved(key, postcode, results, db_path='./Chester-Data-Analytics/cda.db'):
    """
    Write EPC results to the ``epckeys`` table using INSERT OR REPLACE.

    Each row of ``results`` is written with INSERT OR REPLACE, so a row that collides with
    an existing key replaces it and no separate duplicate-removal pass is run.
    :param key: Encoded secret key for the API. Not used by this function; kept so that
        existing calls keep working.
    :param postcode: Postcode the results were queried for. Not used by this function.
    :param results: Pandas Dataframe of EPC results with the columns address, address1, uprn,
        postcode, current-energy-rating, total-floor-area and lodgement-datetime
    :param db_path: Location of the SQLite database file
    :return: None
    """
    curr_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    con = sqlite3.connect(db_path)
    try:
        cur = con.cursor()
        # NOTE(review): with INSERT OR REPLACE the last row iterated for a given key wins,
        # which is the newest certificate only if `results` is ordered by lodgement
        # date -- confirm the ordering of the incoming data.
        for _, row in results.iterrows():
            cur.execute("INSERT OR REPLACE INTO epckeys VALUES(?,?,?,?,?,?,?,?)",
                        (row.address, row.address1, row.uprn, row['postcode'],
                         row['current-energy-rating'], row['total-floor-area'],
                         row['lodgement-datetime'], curr_time))
        # The follow-up DELETE that keeps only the latest lodgement per address is
        # deliberately not run here (left out for demonstration).
        con.commit()
    finally:
        # Release the database file even if an insert raised.
        con.close()
    return

In [None]:
# Check performance of the improved method

start = time.perf_counter()
get_postcode_epc_data_improved(key, 'CH2', results)
end = time.perf_counter()

updated_time = end-start

print(f"Original time: {original_time} seconds")

print(f"Updated time: {updated_time} seconds")

print(f"The improved method is {round(((original_time-updated_time)/updated_time)*100,2)}% faster than the original method")


Using the primary key appears to make a very impressive reduction in time.

However, we still need to remove duplicate records and find all differences between the two methods.

In [None]:
# Check to verify that the same amount of values are in each table

cda_cur.execute("Select count(postcode) from epc")

print(cda_cur.fetchall())

cda_cur.execute("Select count(postcode) from epckeys")

print(cda_cur.fetchall())



In [None]:
cda_cur.execute("Select count(*) as num, address from epc group by address order by num desc limit 10")

print(cda_cur.fetchall())

In [None]:
cda_cur.execute("Select count(*) as num, address from epckeys group by address order by num desc, address desc limit 10")

print(cda_cur.fetchall())

In [None]:
cda_cur.execute("select * from epckeys where address = 'Chapel View, Church Road, Saughall'")

print(cda_cur.fetchall())

In [None]:
# Without the drop duplicates part of the improved function, there are two properties with duplicate EPC records

In [None]:
# This finds all instances where the lodgement date isn't the most recent. Can be deleted by replacing 'SELECT *' with DELETE.

cda_cur.execute("""SELECT * FROM epckeys where (address, lodgement_datetime) NOT IN (SELECT 
      epckeys.address, epckeys.lodgement_datetime
    FROM
      (SELECT
         address, MAX(lodgement_datetime) AS most_recent_epc
       FROM
         epckeys
       GROUP BY
         address) AS latest_record
    INNER JOIN
      epckeys
    ON
      epckeys.address = latest_record.address AND
      epckeys.lodgement_datetime = latest_record.most_recent_epc)""")

to_be_deleted = cda_cur.fetchall()

In [None]:
to_be_deleted

Now to identify all the differences between the query results.

In [None]:
# Rows in epc that have no counterpart in epckeys.
# The match uses both address and postcode, the same pair that forms the epckeys primary
# key, so an address that exists under a different postcode still shows up as a difference.
# uprn is qualified because both joined tables have a column of that name.
cda_cur.execute("""SELECT * FROM epc
    LEFT JOIN epckeys ON epckeys.address = epc.address AND epckeys.postcode = epc.postcode
    WHERE epckeys.address IS NULL
    ORDER BY epc.uprn DESC""")

differences = cda_cur.fetchall()

In [None]:
differences

In [None]:
cda_cur.execute("SELECT * from epc where address = '4 Park Cottages, Kinseys Lane, Ince'")

cda_cur.fetchall()

In [None]:
results[results['address']=='4 Park Cottages, Kinseys Lane, Ince']

In [8]:
cur.execute("select * from cars")

<sqlite3.Cursor at 0x11ac6c570>

In [9]:
cur.fetchall()

[('Honda', 'Civic', 1, '2022'),
 ('Honda', 'CRX', 0, '2022'),
 ('Honda', 'Civic', 1, '2021'),
 ('Nissan', 'Altima', 1, '2022')]

In [22]:
# Test out best use of insert or update

cur.execute("""insert or replace into cars (make, model, inproduction, year) 
values ( 'Honda', 'CRX', coalesce((select inproduction from cars where make = 'Honda' and model = 'CRX' and year ='2022'),1), '2022'
)""")

<sqlite3.Cursor at 0x11ac6c570>

In [23]:
cur.execute("select * from cars")

cur.fetchall()

[('Honda', 'Civic', 1, '2021'),
 ('Nissan', 'Altima', 1, '2022'),
 ('Honda', 'Civic', 0, '2022'),
 ('Honda', 'CRX', 0, '2022')]

In [24]:
cur.close()