This stage merges the raw longlist with the candidate data from the reference sources to create the new list.

It round-trips the data from the existing landscape list to ensure that any manual overrides are included.

In [1]:
import petl as etl
from pipeline_utils.reference.geo import la_code_lookup
from pipeline_utils.reference.onspd import normalise_postcode, postcode_lookup
from pipeline_utils.filesystem.paths import RAW_DATA, DATA
import pipeline_utils.db as database
from config import WORKING

Load the spelling corrections we have inferred from the matching stage.

In [2]:
corrections = etl.fromcsv(WORKING / '2-company-corrections.csv').lookupone('organisation', 'match')

Load the untagged longlist from the raw directory and perform the following operations:

1. Convert numeric data to numbers
2. Correct the spellings of the organisational data
3. Augment with local authority data

In [3]:
raw = etl.fromcsv(
    WORKING / 'funded-organisations.csv'
).convertnumbers(
).convert(
    'organisation', lambda x: corrections.get(x, x)
).convert(
    'Local authority', la_code_lookup
).unpackdict(
    'Local authority'
).rename({
    'LAD24CD': 'funding_geo_code',
    'LAD24NM': 'funding_geo_name',
}).cache()

raw

organisation,Source,Number,funding_geo_code,funding_geo_name
509ArtsLtd,Project Grant,4,E08000032,Bradford
Alan Clay,Project Grant,1,E08000032,Bradford
Anthony Middleton,Project Grant,1,E08000032,Bradford
As Sirens Fall,Project Grant,1,E08000032,Bradford
Ashley Manning,Project Grant,1,E08000032,Bradford


Get the list of sources in the longlist. We'll use this to update the values in the columns later on.

In [4]:
sources = tuple(raw.cut('Source').distinct().values('Source'))
sources

('NPO', 'Project Grant')

Recast the longlist to convert the Source column into a column per entry, and convert any non-None values into True

In [5]:
wide_table = raw.recast(
    variablefield="Source",
    valuefield="Number"
).convert(
    sources,
    lambda x: True if x is not None else None
)
wide_table

organisation,funding_geo_code,funding_geo_name,NPO,Project Grant
509ArtsLtd,E08000032,Bradford,,True
Alan Clay,E08000032,Bradford,,True
Anthony Middleton,E08000032,Bradford,,True
As Sirens Fall,E08000032,Bradford,,True
Ashley Manning,E08000032,Bradford,,True


At this point we will also add in new data from the result of stage 2.
1. location Manually set locations
2. companies Company data from Companies house(direct and fuzzy matched)

In [6]:
location = etl.fromcsv(RAW_DATA / 'landscape-locations.csv').lookupone('organisation', ['latitude', 'longitude'])

FileNotFoundError: [Errno 2] No such file or directory: '/Users/lukestrange/Code/bradford-2025/raw/landscape-locations.csv'

Direct and fuzzy data loaded from database

In [None]:
db = database.connect(read_only=True)
db.query(f'''
         CREATE OR REPLACE TEMP TABLE tCompanies AS
              SELECT match as organisation, CompanyNumber as company_number, type, score
                     FROM read_csv('{ WORKING / '2-company-match-fuzzy.csv' }')
              UNION ALL
              SELECT organisation, charity_company_registration_number AS company_number, 'charity' AS type, 100 AS score
                     FROM read_csv('{ WORKING / '2-charity-match-direct.csv' }')
              UNION ALL
              SELECT *, 'direct' AS type, 100 as score
                     FROM read_csv('{ WORKING / '2-company-match-direct.csv' }');
         CREATE OR REPLACE TEMP TABLE tCharities AS
              SELECT *
                     FROM read_csv('{ WORKING / '2-charity-match-direct.csv' }');
         CREATE OR REPLACE TEMP TABLE tSicCodes AS
              SELECT *
              FROM read_csv('{ WORKING / '2-sic-codes.csv' }');
         CREATE OR REPLACE TEMP TABLE tPostcodes AS
              SELECT pcds AS postcode, lat, long
              FROM read_csv('{ DATA / 'reference/onspd_extract.csv' }')
              WHERE oslaua == 'E08000021';
         CREATE OR REPLACE TEMP TABLE tCompanyExtract AS
              SELECT
                    CompanyName as registered_name,
                    CompanyNumber as company_number,
                    "URI" as uri,
                    "RegAddress.PostTown" as post_town,
                    "RegAddress.PostCode" as postcode,
                    CompanyCategory as company_category,
                    CompanyStatus as company_status,
                    [x for x in [
                            "SICCode.SicText_1",
                            "SICCode.SicText_2",
                            "SICCode.SicText_3",
                            "SICCode.SicText_4"
                    ] if x is not NULL] as sic_code,
                    IncorporationDate as incorporation_date,
                    DissolutionDate as dissolution_date,
                    "Accounts.AccountCategory" as accounts_category,
                    lat AS latitude, long as longitude,
              FROM CompanyData c
              JOIN tPostcodes p
              ON c."RegAddress.PostCode" == p.postcode;
              ;
         ''')

BinderException: Binder Error: Referenced column "organisation" not found in FROM clause!
Candidate bindings: "column0"

LINE 6:               SELECT organisation, charity_company_registration_number AS compan...
                             ^

In [None]:
db.query('''
    SELECT * from tCompanies;
         ''')

In [None]:
companies = etl.fromdataframe(
    db.query('''
             SELECT m.organisation as organisation,
                c.CompanyName as registered_name,
                c.CompanyNumber as company_number,
                m.type,
                m.score,
                "URI" as uri,
                "RegAddress.PostTown" as post_town,
                "RegAddress.PostCode" as postcode,
                CompanyCategory as company_category,
                CompanyStatus as company_status,
                [x for x in [
                        "SICCode.SicText_1",
                        "SICCode.SicText_2",
                        "SICCode.SicText_3",
                        "SICCode.SicText_4"
                ] if x is not NULL] as sic_code,
                IncorporationDate as incorporation_date,
                DissolutionDate as dissolution_date,
                "Accounts.AccountCategory" as accounts_category
             FROM tCompanies m
             JOIN CompanyData c
             ON m.company_number == c.CompanyNumber;
             ''').df())

In [None]:
company_by_sic = etl.fromdataframe(
    db.query(f'''
             SELECT DISTINCT c.*
             FROM (
               SELECT e.*
               FROM tCompanyExtract e
               LEFT JOIN (SELECT company_number FROM tCompanies) r
               ON e.company_number == r.company_number
               WHERE r.company_number IS NULL
             ) c
             JOIN tSicCodes s
             ON list_contains(c.sic_code, s.sic_code)
             ORDER BY c.company_number;
             ''').df()
)

In [None]:
charities = etl.fromdataframe(
    db.query('''
             SELECT
             l.organisation,
             c.charity_name,
             c.registered_charity_number,
             c.charity_company_registration_number,
             charity_contact_postcode,
             charity_contact_web,
             latest_income AS charity_latest_income,
             latest_expenditure AS charity_latest_expenditure
             FROM Charities c
             JOIN tCharities l
             ON c.registered_charity_number == l.registered_charity_number
             ''').df()
)

In [None]:
db.close()

In [None]:
company_data = companies.dictlookupone('organisation')
charity_data = charities.dictlookupone('organisation')

Create new landscape table

In [None]:
landscape_matched = (
    wide_table
    .addfield('location', lambda r: location.get(r.organisation, ()))
    .unpack('location', newfields=['latitude', 'longitude'])
    .addfield('company_data', lambda r: company_data.get(r.organisation, {}))
    .unpackdict('company_data', keys=[
        'company_category',
        'accounts_category',
        'company_number',
        'company_status',
        'dissolution_date',
        'incorporation_date',
        'post_town',
        'postcode',
        'sic_code',
        'uri',
        'type', 'score'
    ])
    .convert('postcode', normalise_postcode)
    .addfield('charity_data', lambda r: charity_data.get(r.organisation, {}))
    .unpackdict('charity_data', keys=[
        'charity_name',
        'registered_charity_number',
        'charity_company_registration_number',
        'charity_contact_postcode',
        'charity_contact_web',
        'charity_latest_expenditure',
        'charity_latest_income',
    ])
)

landscape = (
    etl
    .cat(landscape_matched, company_by_sic)
    .convert('postcode', lambda x: postcode_lookup.get(x, { 'pcds': x }))
    .unpackdict('postcode', keys=['pcds', 'lat', 'long', 'oslaua'])
    .convert('latitude', lambda x, r: r['lat'], pass_row=True, where=lambda r: r['latitude'] == None and r['lat'] != None)
    .convert('longitude', lambda x, r: r['long'], pass_row=True, where=lambda r: r['longitude'] == None and r['long'] != None)
    .cutout('lat', 'long')
    .convert('organisation', lambda x, r: x or r.registered_name, pass_row=True)
    .cutout('registered_name')
    .convert('sic_code', list)
    .sort('organisation')
)

In [None]:
landscape.selectnotnone('company_number').duplicates('company_number')

In [None]:
landscape.selectnotnone('registered_charity_number').select(lambda r: r.company_number != r.charity_company_registration_number)

In [None]:
landscape.tocsv(DATA / 'culture_landscape.csv')

In [None]:
landscape.cut('sic_code').selectnotnone('sic_code')