## Datasets Pipeline v0.9
This code is designed to load, aggregate, map, combine and process equivalent datasets from across previous censuses (those presently available via the Nomis API). This includes:

1. Downloading datasets from Nomis (and concatenating these from multiple requests where necessary).
2. Aggregating data from Output Areas to higher geograpies, if required.
3. Normalising these datasets by mapping variables to common categories (eg. 10 year age bands).
4. Creating derived "proxy" datasets for indicators like population density (= population / area).
5. Combining datasets from across multiple censuses, and calculating change-over-time, % breakdowns and area rankings.
6. Generating individual JSON files for each area with 

The output is flat data files suitable for a variety of data visualisation and robo-journalism applications, with a particular focus on showing change-over-time between censuses, and ranking different geographic areas based on a variety of indicators.

### Caveats
Note that the data generated by this pipeline shouldn't be used in production until data accuracy and comparability issues have been overcome. Some important caveats of the present (v2.2) code include:

* Values for median age not accurate. They are just approximations based on 10 year age bands.
* Not all of the variables here are truly comparable across censuses (eg. general health questions changed from 2001 to 2011).

### Feature backlog

* Add years to geography codes in filenames, to account for changes.
* Add automatic aggregation for regions, e&w?
* Add dummy geographies representing 10% quantiles (deciles).
* Add JSON formats for quantiles + aggregates.
* Come up with a function for accurately calculating median age from single-year age data.
* Re-factor code to deal with changing geographies that can't be best-fit aggregated (eg. Output Areas, where some are merged, split, or otherwise modified). The aim would be to show change over time for the codes that remain the same, and to show only the most recent census data for the ones are new (and probably to ignore the ones that are obsolete).

In [1]:
# IMPORTS
import math
import pandas as pd
import numpy as np
import requests
import os
import simplejson as json
from io import StringIO
import pyarrow as pa # Install pyarrow with PIP not Conda

## Running this code
**Notes on config:** The idea is to have a pipeline that can load multiple datasets from multiple APIs, and at multiple geographic levels. The present config structure allows for multiple datasets, but only from a single API (Nomis). Also, at the moment some of the geographic lookups (for names, areas in hectares and parent geographies) are hard-coded in CSV files.

In [11]:
# CONFIG FOR LOADING AND PROCESSING DATA
SETTINGS = {
    'geocodes': {
        'source': {
            '2001': {
                'geography': 'lad', # Output Areas (code needs to match lookup code - see LOOKUPS config)
                'code': 'TYPE464', # 2001 Nomis type code
                'year': '2015', # Reference year for geography (needs to match lookup year - see LOOKUPS config)
                'count': 348 # Number of geographies of this type (required for paginating API requests to Nomis)
            },
            '2011': {
                'geography': 'lad',
                'code': 'TYPE464',
                'year': '2015',
                'count': 348
            }
        },
        'output': {
            'geography': 'lad', # Districts (code needs to match lookup, generated above)
            'year': '2021' # This is the year associated with the geographic boundaries, not the data
        }
    },
    'cols': ['geocode','year','code','value'],
    'apiurl': 'http://www.nomisweb.co.uk/api/v01/dataset/{dataset}.data.csv?date=latest&geography={geocode}&{cell}={cells}&measures=20100&select={selection}&uid={key}&recordlimit={limit}&recordoffset={offset}{query_misc}',
    'apikey': '0x3cfb19ead752b37bb90da0eb3a0fe78baa9fa055',
    'filepath': '{path}/{code}_{geography}_{suffix}.parquet',
    'jsonpath': '{path}/{code}{suffix}.json',
    'flatpath': '{path}/{geography}_{filename}.csv',
    'parent': 'lookup/{child}{cyear}_{parent}{pyear}.csv', # Child > parent lookup for data aggregations
    'lookup': 'lookup/code_lookup.csv' # Lookup file for names, parents, areas, bounds
}

DATASETS = [
    {
        'code': 'population',
        'name': 'Population, male/female',
        'sources': [
            {
                'year': '2001',
                'dataset': 'NM_1634_1', # This is the table code for the Nomis API
                'table': 'KS001', # This is the official ONS table code (not actually used in this app)
                'geocode': SETTINGS['geocodes']['source']['2001']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2001']['count'],
                'cell': 'cell', # This is the cell that contains the observation code on Nomis
                'cells': [1,2], # These are the selected observation codes
                'cellnames': ['male','female'], # These are human readable names for the above observation codes
                'selection': ['geography_code','date_name','cell','obs_value'] # This is the selection of columns to load from Nomis
            },
            {
                'year': '2011',
                'dataset': 'NM_144_1',
                'table': 'KS101EW',
                'geocode': SETTINGS['geocodes']['source']['2011']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2011']['count'],
                'cell': 'cell',
                'cells': [1,2],
                'cellnames': ['male','female'],
                'selection': ['geography_code','date_name','cell','obs_value'],
                'query_misc': '&rural_urban=0'
            }
        ]
    },
    {
        'code': 'age',
        'name': 'Age, 1 year',
        'exclude': True, # Exclude this dataset from ALL_DATASETS
        'sources': [
            {
                'year': '2001',
                'dataset': 'NM_1637_1',
                'table': 'UV004',
                'geocode': SETTINGS['geocodes']['source']['2001']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2001']['count'],
                'cell': 'c_age',
                'cells': list(range(1,82)),
                'cellnames': [
                    '0','1','2','3','4','5','6','7','8','9',
                    '10','11','12','13','14','15','16','17','18','19',
                    '20','21','22','23','24','25','26','27','28','29',
                    '30','31','32','33','34','35','36','37','38','39',
                    '40','41','42','43','44','45','46','47','48','49',
                    '50','51','52','53','54','55','56','57','58','59',
                    '60','61','62','63','64','65','66','67','68','69',
                    '70','71','72','73','74',
                    '75-79','80-84','85-89','90-94','95-99','100plus'
                ],
                'selection': ['geography_code','date_name','c_age','obs_value']
            },
            {
                'year': '2011',
                'dataset': 'NM_503_1',
                'table': 'QS103EW',
                'geocode': SETTINGS['geocodes']['source']['2011']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2011']['count'],
                'cell': 'c_age',
                'cells': list(range(1,102)),
                'cellnames': [
                    '0','1','2','3','4','5','6','7','8','9',
                    '10','11','12','13','14','15','16','17','18','19',
                    '20','21','22','23','24','25','26','27','28','29',
                    '30','31','32','33','34','35','36','37','38','39',
                    '40','41','42','43','44','45','46','47','48','49',
                    '50','51','52','53','54','55','56','57','58','59',
                    '60','61','62','63','64','65','66','67','68','69',
                    '70','71','72','73','74','75','76','77','78','79',
                    '80','81','82','83','84','85','86','87','88','89',
                    '90','91','92','93','94','95','96','97','98','99',
                    '100plus'
                ],
                'cellmaps': {
                    '75-79':['75','76','77','78','79'],
                    '80-84':['80','81','82','83','84'],
                    '85-89':['85','86','87','88','89'],
                    '90-94':['90','91','92','93','94'],
                    '95-99':['95','96','97','98','99']
                },
                'selection': ['geography_code','date_name','c_age','obs_value'],
                'query_misc': '&rural_urban=0'
            }
        ]
    },
    {
        'code': 'ethnicity',
        'name': 'Ethnicity, 5 categories',
        'sources': [
            {
                'year': '2001',
                'dataset': 'NM_1606_1',
                'table': 'KS006',
                'geocode': SETTINGS['geocodes']['source']['2001']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2001']['count'],
                'cell': 'c_ethpuk11',
                'cells': [100,200,8,9,10,11,400,15,16],
                'cellnames': ['white','mixed','indian','pakistani','bangladeshi','asian_other','black','chinese','other'],
                'cellmaps': {
                    'asian': ['indian','pakistani','bangladeshi','asian_other','chinese']
                },
                'selection': ['geography_code','date_name','c_ethpuk11','obs_value']
            },
            {
                'year': '2011',
                'dataset': 'NM_608_1',
                'table': 'KS201EW',
                'geocode': SETTINGS['geocodes']['source']['2011']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2011']['count'],
                'cell': 'cell',
                'cells': [100,200,300,400,500],
                'cellnames': ['white','mixed','asian','black','other'],
                'selection': ['geography_code','date_name','cell','obs_value'],
                'query_misc': '&rural_urban=0'
            }
        ]
    },
    {
        'code': 'health',
        'name': 'Health',
        'sources': [
            {
                'year': '2001',
                'dataset': 'NM_1645_1',
                'table': 'UV020',
                'geocode': SETTINGS['geocodes']['source']['2001']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2001']['count'],
                'cell': 'c_health',
                'cells': [1,2,3],
                'cellnames': ['good','fair','bad'],
                'selection': ['geography_code','date_name','c_health','obs_value']
            },
            {
                'year': '2011',
                'dataset': 'NM_531_1',
                'table': 'QS302EW',
                'geocode': SETTINGS['geocodes']['source']['2011']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2011']['count'],
                'cell': 'c_health',
                'cells': [1,2,3,4,5],
                'cellnames': ['very_good','good','fair','bad','very_bad'],
                'cellmaps': {
                    'good':['very_good','good'],
                    'bad':['bad','very_bad']
                },
                'selection': ['geography_code','date_name','c_health','obs_value'],
                'query_misc': '&rural_urban=0'
            }
        ]
    },
    {
        'code': 'economic',
        'name': 'Economic Activity',
        'sources': [
            {
                'year': '2001',
                'dataset': 'NM_1651_1',
                'table': 'UV028',
                'geocode': SETTINGS['geocodes']['source']['2001']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2001']['count'],
                'cell': 'c_ecopuk11',
                'cells': [2,5,8,11,12,13],
                'cellnames': ['employee','self-employed_employees','self-employed','unemployed','student','inactive'],
                'cellmaps': {
                    'self-employed':['self-employed_employees','self-employed']
                },
                'selection': ['geography_code','date_name','c_ecopuk11','obs_value']
            },
            {
                'year': '2011',
                'dataset': 'NM_556_1',
                'table': 'QS601EW',
                'geocode': SETTINGS['geocodes']['source']['2011']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2011']['count'],
                'cell': 'cell',
                'cells': [2,3,4,5,6,7,8,9,10],
                'cellnames': ['employee_pt','employee_ft','self-employed_employees_pt','self-employed_employees_ft','self-employed_pt','self-employed_ft','unemployed','student','inactive'],
                'cellmaps': {
                    'employee':['employee_pt','employee_ft'],
                    'self-employed':['self-employed_employees_pt','self-employed_employees_ft','self-employed_pt','self-employed_ft']
                },
                'selection': ['geography_code','date_name','cell','obs_value'],
                'query_misc': '&rural_urban=0'
            }
        ]
    },
    {
        'code': 'travel',
        'name': 'Travel to Work',
        'sources': [
            {
                'year': '2001',
                'dataset': 'NM_1659_1',
                'table': 'UV037',
                'geocode': SETTINGS['geocodes']['source']['2001']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2001']['count'],
                'cell': 'transport_powpew11',
                'cells': [1,2,3,4,5,6,7,8,9,10,11],
                'cellnames': ['home','metro','train','bus','taxi','car_van','car_van_passenger','moto','bicycle','foot','other'],
                'cellmaps': {
                    'train_metro':['metro','train'],
                    'car_van':['car_van','car_van_passenger']
                },
                'selection': ['geography_code','date_name','transport_powpew11','obs_value']
            },
            {
                'year': '2011',
                'dataset': 'NM_568_1',
                'table': 'QS703EW',
                'geocode': SETTINGS['geocodes']['source']['2011']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2011']['count'],
                'cell': 'cell',
                'cells': [1,2,3,4,5,6,7,8,9,10,11],
                'cellnames': ['home','metro','train','bus','taxi','moto','car_van','car_van_passenger','bicycle','foot','other'],
                'cellmaps': {
                    'train_metro':['metro','train'],
                    'car_van':['car_van','car_van_passenger']
                },
                'selection': ['geography_code','date_name','cell','obs_value'],
                'query_misc': '&rural_urban=0'
            },
        ]
    },
    {
        'code': 'tenure',
        'name': 'Housing Tenure',
        'sources': [
            {
                'year': '2001',
                'dataset': 'NM_1680_1',
                'table': 'UV063',
                'geocode': SETTINGS['geocodes']['source']['2001']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2001']['count'],
                'cell': 'c_tenhuk11',
                'cells': [1,4,5,8,13],
                'cellnames': ['owned','shared_ownership','rented_social','rented_private','rent_free'],
                'selection': ['geography_code','date_name','c_tenhuk11','obs_value']
            },
            {
                'year': '2011',
                'dataset': 'NM_537_1',
                'table': 'QS405EW',
                'geocode': SETTINGS['geocodes']['source']['2011']['code'],
                'rowcount': SETTINGS['geocodes']['source']['2011']['count'],
                'cell': 'c_tenhuk11',
                'cells': [1,4,5,8,13],
                'cellnames': ['owned','shared_ownership','rented_social','rented_private','rent_free'],
                'selection': ['geography_code','date_name','c_tenhuk11','obs_value'],
                'query_misc': '&rural_urban=0'
            },
        ]
    }
]

PROXY_DATASETS = [
    {
        'code': 'density',
        'name': 'Population Density',
        'source': 'population'
    },
    {
        'code': 'age10yr',
        'name': 'Age, 10 year',
        'source': 'age',
        'cellmaps': {
            '0-9':['0','1','2','3','4','5','6','7','8','9'],
            '10-19':['10','11','12','13','14','15','16','17','18','19'],
            '20-29':['20','21','22','23','24','25','26','27','28','29'],
            '30-39':['30','31','32','33','34','35','36','37','38','39'],
            '40-49':['40','41','42','43','44','45','46','47','48','49'],
            '50-59':['50','51','52','53','54','55','56','57','58','59'],
            '60-69':['60','61','62','63','64','65','66','67','68','69'],
            '70-79':['70','71','72','73','74',],
            '80plus':['75-79','80-84','85-89','90-94','95-99','100plus']
        }
    },
    {
        'code': 'agemed',
        'name': 'Median Age',
        'source': 'age'
    }
]

# Combined list of standard and proxy datasets (excluding age by single year)
ALL_DATASETS = [d for d in DATASETS if 'exclude' not in d] + PROXY_DATASETS

## Functions
All of the functions are defined in the cell below. Note that the functions are currently called sequentially in the later cells to build a single dataset, but could be wrapped in another function in order to load multiple datasets at once.

In [3]:
# SECTION B. FUNCTIONS

# Get all raw sources for a dataset
def get_sources(dataset, settings):
    code = dataset['code']
    cols = settings['cols']
    
    for source in dataset['sources']:
        get_raw(code, cols, source, settings)

# Load a data source from an API to a Parquet file
def get_raw(code, cols, source, settings, override=False):
    year = source['year']
    geography = settings['geocodes']['source'][source['year']]['geography']
    geography_yr = settings['geocodes']['source'][source['year']]['year']
    path = 'raw'
    
    # Set save path
    save_path = settings['filepath'].format(
        path=path,
        code=code,
        geography=geography + geography_yr,
        suffix=str(source['year'])
    )
    
    if not os.path.exists(save_path) or override == True:
        # Split API call into maximum 250,000 cell chunks
        rowcount = source['rowcount'] * len(source['cells'])
        maxrows = 250000
        offsets = []
        offset = 0
        while offset < rowcount:
            offsets.append(offset)
            offset += maxrows
    
        dfs = []
    
        for i, offset in enumerate(offsets, start=1):
            # Format API url
            if 'query_misc' in source:
                query_misc = source['query_misc']
            else:
                query_misc = ''
        
            url = settings['apiurl'].format(
                dataset=source['dataset'],
                geocode=source['geocode'],
                cell=source['cell'],
                cells=','.join(str(x) for x in source['cells']),
                selection=','.join(source['selection']),
                key=settings['apikey'],
                limit=maxrows,
                offset=offset,
                query_misc=query_misc
            )
            
            # Read CSV from API
            save_path_part = save_path + '.part' + str(i)
            df = None
            
            if not os.path.exists(save_path_part):
                print(url)
                response = requests.get(url)
                
                # Convert CSV string to DataFrame + save to Parquet
                df = pd.read_csv(StringIO(response.text))
                df.to_parquet(save_path_part, index=False)
            else:
                df = pd.read_parquet(save_path_part)
            
            print('Loaded ' + code + ' ' + year + ' segment ' + str(i) + '/' + str(len(offsets)))
        
            # Add dataframe to stack
            dfs.append(df)
    
        # Combine loaded dataframes in stack
        df = pd.concat(dfs, ignore_index=True)
        
        # Rename columns
        colmap = {}
        for i in range(len(cols)):
            colmap[source['selection'][i].upper()] = cols[i]
        df = df.rename(columns=colmap)
        
        # Rename cells
        for i in range(len(source['cells'])):
            df.loc[df['code'] == source['cells'][i], 'code'] = source['cellnames'][i]
            
        # Map cells (if map exists)
        if 'cellmaps' in source:
            df = map_raw(df, source['cellmaps'], cols)

        # Create output directory if needed
        if not os.path.exists(path):
            os.mkdir(path)
    
        # Save DataFrame as Parquet file
        print(df)
        df.to_parquet(save_path, index=False)
        print('Combined + mapped ' + save_path)
        
        # Remove Parquet temporary (part) files
        for i, offset in enumerate(offsets, start=1):
            save_path_part = save_path + '.part' + str(i)
            # os.remove(save_path_part)
        
    else:
        print('Already exists ' + save_path)

# Combine (sum) variables using mapping
def map_raw(df, cellmaps, cols):
    # Rename cells in code column using cell mapping
    for key in cellmaps:
        for code in cellmaps[key]:
            df.loc[df['code'] == code, 'code'] = key
    
    # Group by code column
    groups = cols[:len(cols) - 1]
    df = df.groupby(groups, as_index=False).sum()
    
    return df

# Group (aggregate) data using lookups
def group_data(dataset, settings):
    code = dataset['code']
    cols = settings['cols']
    path = 'raw'
    geography = settings['geocodes']['output']['geography']
    geography_yr = settings['geocodes']['output']['year']
    
    for source in dataset['sources']:
        source_geo = settings['geocodes']['source'][source['year']]['geography']
        source_geo_yr = settings['geocodes']['source'][source['year']]['year']
        
        if source_geo + source_geo_yr != geography + geography_yr:
            load_path = settings['filepath'].format(
                path=path,
                code=code,
                geography=source_geo + source_geo_yr,
                suffix=str(source['year'])
            )
            lookup_path = settings['parent'].format(
                child=source_geo,
                cyear=source_geo_yr,
                parent=geography,
                pyear=geography_yr
            )
            if not os.path.exists(lookup_path):
                lookup_path = settings['parent'].format(
                    child=source_geo,
                    cyear=geography_yr,
                    parent=geography,
                    pyear=geography_yr
                )
        
            # Load OA data and higher geography lookup for grouping
            df = pd.read_parquet(load_path)
            df.set_index('geocode', inplace=True)
            lookup = pd.read_csv(lookup_path)
            lookup.set_index('code', inplace=True)
        
            # Merge the datasets on the OA code, then drop the OA column
            df = df.merge(lookup, left_index=True, right_index=True)
            df = df.reset_index(drop=True)
        
            # Clean up the columns
            df = df.rename(columns={'parent': 'geocode'})
            df = df[cols]
        
            # Group (aggregate) the data up to the higher geography
            df = df.groupby(cols[:-1], as_index=False).sum()
        
            # Set the filepath and save the aggregated dataset
            save_path = settings['filepath'].format(
                path=path,
                code=code,
                geography=geography + geography_yr,
                suffix=str(source['year'])
            )
            df.to_parquet(save_path, index=False)
            print('Aggregated ' + save_path)

# Combine raw Parquet data source files into a single Tidy Data file
def combine_data(dataset, settings):
    in_path = 'raw'
    out_path = 'processed'
    suffix = 'tidy'
    geography = settings['geocodes']['output']['geography']
    geography_yr = settings['geocodes']['output']['year']
    code = dataset['code']
    cols = settings['cols']
    
    # Load raw datasets into an array
    datasets = []
    for source in dataset['sources']:
        load_path = settings['filepath'].format(
            path=in_path,
            code=code,
            geography=geography + geography_yr,
            suffix=str(source['year'])
        )
        df = pd.read_parquet(load_path)
        datasets.append(df)
    
    # Combine datasets from array
    combined = pd.concat(datasets)
    
    # Sort rows
    combined = combined.sort_values(cols[:-1], ascending=True)
    
    # Create output directory if needed
    if not os.path.exists(out_path):
        os.mkdir(out_path)
    
    # Save combined tidydata file
    save_path = settings['filepath'].format(
        path=out_path,
        code=code,
        geography=geography + geography_yr,
        suffix=suffix
    )
    combined.to_parquet(save_path, index=False)
    print('Combined ' + save_path)

# Pivot data
def pivot_data(dataset, settings):
    cols = settings['cols']
    code = dataset['code']
    path = 'processed'
    geography = settings['geocodes']['output']['geography']
    geography_yr = settings['geocodes']['output']['year']
    
    # Load tidy data Parquet file as dataframe
    load_path = settings['filepath'].format(
        path=path,
        code=code,
        geography=geography + geography_yr,
        suffix='tidy'
    )
    df = pd.read_parquet(load_path)
    
    # Pivot the data
    pivot = df.pivot_table(index=cols[0], columns=cols[1:len(cols) - 1], values='value', fill_value=0)
    
    # Add additional 'value' level to index
    pivot.columns = pd.MultiIndex.from_product([['value']] + pivot.columns.levels)
    
    # Replace empty cells with 0 values (deals with new areas that didn't have data for earlier censuses)
    pivot = pivot.fillna(np.nan, downcast='infer')
    
    # Create output directory if needed
    if not os.path.exists(path):
        os.mkdir(path)
    
    # Save pivoted file
    save_path = settings['filepath'].format(
        path=path,
        code=code,
        geography=geography + geography_yr,
        suffix='pivot'
    )
    
    # Use pyarrow to write multiindex parquet file
    pivot = pa.Table.from_pandas(pivot)
    pa.parquet.write_table(pivot, save_path)
    
    # pivot.to_parquet(save_path)
    print('Data pivoted ' + save_path)

# Add proxy datasets
def add_proxy_data(proxy_datasets, settings):
    path = 'processed'
    geography = settings['geocodes']['output']['geography']
    geography_yr = settings['geocodes']['output']['year']
    suffix = 'pivot'
    
    for dataset in proxy_datasets:
        code = dataset['code']
        cols = settings['cols']
        source = dataset['source']
        
        df = None
        
        # Build density dataset
        if code == 'density':
            print('Calculating density')
            
            # Set data load path
            load_path = settings['filepath'].format(
                path=path,
                code=source,
                geography=geography + geography_yr,
                suffix='pivot'
            )
            # Check if data exists in directory
            if os.path.exists(load_path):
                df = pd.read_parquet(load_path)
            
                # Build dataset
                df = add_proxy_density(df, 'density', dataset, settings)
            
        # Build median age dataset
        elif code == 'agemed':
            print('Calculating median age')
            
            # Set data load path
            load_path = settings['filepath'].format(
                path=path,
                code=source,
                geography=geography + geography_yr,
                suffix='pivot'
            )
            # Check if data exists in directory
            if os.path.exists(load_path):
                df = pd.read_parquet(load_path)
            
                # Build datasets
                df = add_proxy_agemed(df, 'agemed', dataset, settings)
                
        # Build any mapped dataset
        elif dataset['cellmaps']:
            print('Mapping ' + code)
            
            # Load tidy data file
            load_path = settings['filepath'].format(
                path=path,
                code=source,
                geography=geography + geography_yr,
                suffix='tidy'
            )
            # Check if data exists in directory
            if os.path.exists(load_path):
                df = pd.read_parquet(load_path)
                
                # Map data
                df = map_raw(df, dataset['cellmaps'], cols)
                
                # Pivot the data
                df = df.pivot_table(index=cols[0], columns=cols[1:len(cols) - 1], values='value', fill_value=0)
    
                # Add additional 'value' level to index
                df.columns = pd.MultiIndex.from_product([['value']] + df.columns.levels)
    
                # Replace empty cells with 0 values (deals with new areas that didn't have data for earlier censuses)
                df = df.fillna(np.nan, downcast='infer')
                
        
        if isinstance(df, pd.DataFrame):
            # Save pivoted file
            save_path = settings['filepath'].format(
                path=path,
                code=code,
                geography=geography + geography_yr,
                suffix=suffix
            )
            
            # Use pyarrow to write multiindex parquet file
            df = pa.Table.from_pandas(df)
            pa.parquet.write_table(df, save_path)
            # df.to_parquet(save_path)
            print('Proxy data ' + save_path)

# Build density dataset
def add_proxy_density(df, code, source, settings):
    # Get existing columns
    cols = df.columns
    
    # Get years
    years = cols.unique(level='year').values.tolist()
    
    # Add columns for totals
    df = add_totals(df, cols)
    
    # Load lookup for areas & add multi-index to allow clean merge with pivot table
    lookup = pd.read_csv(settings['lookup'])
    lookup.set_index('code', inplace=True)
    areas = lookup[['area']]
    
    areas.columns = pd.MultiIndex.from_product([['value'], ['all'], areas.columns])
    
    # Add column for density
    df = df.merge(areas, left_index=True, right_index=True)
    df.index.name = 'geocode' # The name of the index column gets lost with the merge
    
    # Get new columns (and create empty list to remap names of final output columns)
    cols = df.columns
    
    for year in years:
        # Add population density column (for each year)
        df['value', year, 'density'] = round(df['value', year, 'all'] / df['value','all','area'], 2)
            
    # Remove redundant columns & rename remaining
    df = df.drop(columns=cols)
    df = df.rename(columns={'density': 'all'}, level=2)
    
    return df

# Build median age dataset
def add_proxy_agemed(df, code, source, settings):
    # Get new columns
    cols = df.columns
    
    # Get census years
    years = cols.unique(level='year').values.tolist()
    
    # Ages as strings
    ages = [str(x) for x in list(range(0, 75))]
    
    # Add columns for totals
    df = add_totals(df, cols)
    
    # Add columns for median age
    for year in years:
        df['value', year, 'median'] = None
        df['value', year, 'min'] = 0
        df['value', year, 'max'] = 0
        df['value', year, 'half'] = df['value', year, 'all'] / 2
        
        for age in ages:
            df['value', year, 'min'] = df['value', year, 'max']
            df['value', year, 'max'] +=  df['value', year, age]
            
            df.loc[(df['value', year, 'half'] > df['value', year, 'min']) & (df['value', year, 'half'] <= df['value', year, 'max']), ('value', year, 'median')] = int(age)
    
    # Remove redundant columns & rename remaining
    df = df[[('value', 2001, 'median'), ('value', 2011, 'median')]]
    df = df.rename(columns={'median': 'all'}, level=2)
    
    return df

# Add calculated columns to tidy data dataframe
def add_calcs(dataset, settings):
    code = dataset['code']
    path = 'processed'
    geography = settings['geocodes']['output']['geography']
    geography_yr = settings['geocodes']['output']['year']
    
    # Load pivoted data as dataframe
    load_path = settings['filepath'].format(
        path=path,
        code=code,
        geography=geography + geography_yr,
        suffix='pivot'
    )
    df = pd.read_parquet(load_path)
    
    # Get columns
    cols = df.columns
    
    # Get codes
    codes = cols.unique(level='code').values.tolist()
    
    # Add columns for totals
    if len(codes) > 1:
        df = add_totals(df, cols)
    
    # Add % change to totals since previous census
    df = add_change(df, cols)
    
    # Add columns for % breakdown (incl. change since prev. census)
    if len(codes) > 1:
        df = add_percent(df, cols)
    
    # Add columns for absolute and % ranks
    if len(df.index) > 1:
        df = add_ranks(df)
    
    # Create output directory if needed
    if not os.path.exists(path):
        os.mkdir(path)
    
    # Save combined tidydata file
    save_path = settings['filepath'].format(
        path=path,
        code=code,
        geography=geography + geography_yr,
        suffix='calcs'
    )
    df = pa.Table.from_pandas(df)
    pa.parquet.write_table(df, save_path)
    # df.to_parquet(save_path)
    print('Calculated fields added ' + save_path)

# Generate an indexed "JSON" file with quantiles for each indicator
def make_quantiles(df, quan='deciles'):
    
    # Get column headers
    cols = df.columns
    
    # Dictionary to hold data
    data = {}
    
    for col in cols:
        if 'rank' not in col[1]:
            # Get values in column (excluding NaN values)
            vals = df[col].tolist()
            vals = [d for d in vals if not (math.isinf(d) or math.isnan(d))]
            vals.sort()
            count = len(vals)
            
            quantiles = []
            
            if quan == 'deciles':
                quantiles = [
                    vals[0],
                    vals[math.floor(count * 0.1)],
                    vals[math.floor(count * 0.2)],
                    vals[math.floor(count * 0.3)],
                    vals[math.floor(count * 0.4)],
                    vals[math.floor(count * 0.5)],
                    vals[math.floor(count * 0.6)],
                    vals[math.floor(count * 0.7)],
                    vals[math.floor(count * 0.8)],
                    vals[math.floor(count * 0.9)],
                    vals[-1]
                ]
            elif quan == 'quintiles':
                quantiles = [
                    vals[0],
                    vals[math.floor(count * 0.2)],
                    vals[math.floor(count * 0.4)],
                    vals[math.floor(count * 0.6)],
                    vals[math.floor(count * 0.8)],
                    vals[-1]
                ]
            elif quan == 'quartiles':
                quantiles = [
                    vals[0],
                    vals[math.floor(count * 0.25)],
                    vals[math.floor(count * 0.5)],
                    vals[math.floor(count * 0.75)],
                    vals[-1]
                ]
                
        
            if col[0] not in data:
                data[col[0]] = {}
            
            if col[1] not in data[col[0]]:
                data[col[0]][col[1]] = {}
            
            if col[2] not in data[col[0]][col[1]]:
                data[col[0]][col[1]][col[2]] = {}
            
            if col[3] not in data[col[0]][col[1]][col[2]]:
                data[col[0]][col[1]][col[2]][col[3]] = quantiles
    
    return data

# Export data to indexed JSON format with parent geographies
def export_data(datasets, settings):
    in_path = 'processed'
    out_path = 'final'
    geography = settings['geocodes']['output']['geography']
    geography_yr = settings['geocodes']['output']['year']
    
    # Load geography metadata lookup table
    lookup = pd.read_csv(settings['lookup'])
    lookup.set_index('code', inplace=True)
    index = lookup.to_dict(orient='index')
    
    # Generate child lookup
    child_lookup = {}
    
    for key in index:
        if key not in child_lookup:
            child_lookup[key] = []

        parent = index[key]['parent']
        if parent not in child_lookup:
            child_lookup[parent] = []
        
        child_lookup[parent].append({
            'code': key,
            'name': index[key]['name'],
            'type': index[key]['type']
        })
    print('Lookups generated')
    
    
    # Create dict to contain dataframes
    dfs = {}
    
    # Load and merge datasets
    for dataset in datasets:
        code = dataset['code']
        
        # Load Parquet with calculated fields as dataframe
        load_path = settings['filepath'].format(
            path=in_path,
            code=code,
            geography=geography + geography_yr,
            suffix='calcs'
        )
        new_df = pd.read_parquet(load_path)
        
        dfs[code] = new_df
    print('Datasets loaded')
    
    # Merge datasets into a single dataframe
    df = pd.concat(dfs.values(), axis=1, keys=dfs.keys())
    
    # Save sump of merged datasets to Parquet file
    save_path = settings['filepath'].format(
        path=in_path,
        code='all',
        geography=geography + geography_yr,
        suffix='merged'
    )
    dump = pa.Table.from_pandas(df)
    pa.parquet.write_table(dump, save_path)
    # df.to_parquet(save_path)
    print('Datasets merged ' + save_path)
    
    # Create 'final' output directories if needed
    paths = [out_path, out_path + '/json', out_path + '/json/quantiles', out_path + '/json/place']
    for path in paths:
        if not os.path.exists(path):
            os.mkdir(path)
    
    # Generate deciles, quintiles and quartiles
    quans = ['deciles', 'quintiles', 'quartiles']
    quantiles = {
        'deciles': None,
        'quintiles': None,
        'quartiles': None
    }
    
    if len(df.index) > 20:
        quantiles[quans[0]] = make_quantiles(df, quans[0])
    if len(df.index) > 10:
        quantiles[quans[1]] = make_quantiles(df, quans[1])
    if len(df.index) > 8:
        quantiles[quans[2]] = make_quantiles(df, quans[2])
    
    # Save 'quantiles' datasets
    for quan in quans:
        if not quantiles[quan] == None:
            save_path = settings['jsonpath'].format(
                path=paths[2],
                code=quan + '_' + geography + geography_yr,
                suffix=''
            )
    
            with open(save_path, "w") as write_file:
                json.dump(quantiles[quan], write_file, indent=2, ignore_nan=True)
            print(quan.capitalize() + ' data saved to JSON file ' + save_path)
    
    # Generate individual JSON datasets for each geography
    
    # Get column headers
    cols = df.columns
    
    # Get number of rows
    count = len(df.index)
    progress = 0
    
    # Generate JSON data files from dataframe, row by row
    for i, row in df.iterrows():
        progress += 1
        if progress % 1000 == 0:
            print('Generated JSON files for ' + str(progress) + ' of ' + str(count) + ' areas')
        
        entry = {
            'code': i,
            'name': index[i]['name'],
            'type': settings['geocodes']['output']['geography'],
            'area': index[i]['area'],
            'count': int(count),
            'parents': [],
            'children': child_lookup[i],
            'bounds': [
                [round(index[i]['minx'], 5), round(index[i]['miny'], 5)],
                [round(index[i]['maxx'], 5), round(index[i]['maxy'], 5)]
            ],
            'data': {}
        }
        
        # Generate parent codes
        parent_code = index[i]['parent']
        
        # Get parents, grandparents etc
        while isinstance(parent_code, str):
            parent = {
                'code': parent_code,
                'name': index[parent_code]['name'],
                'type': index[parent_code]['type']
            }
            entry['parents'].append(parent)
            
            parent_code = index[parent_code]['parent']
        
        # Generate children
        # children = lookup.loc[lookup['parent'] == i][['name', 'type']].reset_index(level=0)
        # entry['children'] = children.to_dict(orient='records')
        
        # Generate data in indexed/nested format
        data = {}
        
        for col in cols:
            if col[0] not in data:
                data[col[0]] = {}
            
            if col[1] not in data[col[0]]:
                data[col[0]][col[1]] = {}
            
            if col[2] not in data[col[0]][col[1]]:
                data[col[0]][col[1]][col[2]] = {}
            
            if col[3] not in data[col[0]][col[1]][col[2]]:
                data[col[0]][col[1]][col[2]][col[3]] = row[col]
        
        entry['data'] = data
        
        # Save geography to JSON file
        save_path = settings['jsonpath'].format(
            path=paths[3],
            code=entry['code'],
            suffix=''
        )
        
        with open(save_path, "w") as write_file:
            json.dump(entry, write_file, indent=2)

    print('Geographies saved to individual JSON files ' + out_path + '/{geocode}.json')    

# Add column for totals (incl. change since prev. census)
def add_totals(df, cols):
    years = cols.unique(level='year').values.tolist()
    codes = cols.unique(level='code').values.tolist()
    
    # Generate list of columns to sum
    for year in years:
        sum_cols = [];
        for code in codes:
            sum_cols.append(('value', year, code))
        
        # Sum list to new column (for each year)
        df['value', year, 'all'] = df[sum_cols].sum(axis=1)
        
    # Sort columns
    df = df.sort_index(axis=1)
                
    print('Totals added')
    return df

# Generate cols for change since prev. census
def add_change(df, cols):
    years = cols.unique(level='year').values.tolist()
    codes = cols.unique(level='code').values.tolist()
    
    # Generate cols
    if 'all' not in codes:
        codes.append('all')
        
    
    for code in codes:
        year = years[-1]
        year_prev = years[-2]
        
        df['value', 'change', code] = 'null'
        
        val = df['value', year, code]
        val_prev = df['value', year_prev, code]
        
        df['value', 'change', code] = round((100 * (val / val_prev)) - 100, 2)
        df['value', 'change', code] =  df['value', 'change', code].replace(np.nan, 0)
    
    # Sort columns
    df = df.sort_index(axis=1)
                
    print('% change added')
    return df

# Add columns for % breakdown (incl. change since prev. census)
def add_percent(df, cols):
    years = cols.unique(level='year').values.tolist()
    codes = cols.unique(level='code').values.tolist()
    
    # Add columns for totals
    for year in years:
        for code in codes:
            
            # Sum list to new column (for each year)
            df['perc', year, code] = round(100 * (df['value', year, code] / df['value', year, 'all']), 2)
    
    # Generate cols for change since prev. census
    for code in codes:
        year = years[len(years) - 1]
        prev_year = years[len(years) - 2]
        df['perc', 'change', code] = round(df['perc', year, code] - df['perc', prev_year, code], 2)
        
    # Replace empty cells with 0.0 values (treats zero out of zero as 0.0%)
    df = df.fillna(np.nan, downcast='infer')
    
    # Sort columns
    df = df.sort_index(axis=1)
                
    print('Percentage breakdown added')
    return df

# Add columns for absolute and % ranks
def add_ranks(df):
    
    # Iterate through columns
    for column in df:

        # Generate index for rank columns
        new_col = list(column)
        new_col[0] += '_rank'
        
        # Add rank column for each data column
        df[tuple(new_col)] = df[column].rank(method='min', ascending=False, na_option='keep')
        
    # Sort columns
    df = df.sort_index(axis=1)
    
    print('Rankings added')
    return df

# Output observations for all areas (can include a maximum of one wildcard '*')
def export_csv(dataset, obs, settings):
    in_path = 'processed'
    out_path = 'final/csv'
    filename = ''
    geography = settings['geocodes']['output']['geography']
    geography_yr = settings['geocodes']['output']['year']
    
    if obs.count('*') > 1:
        print('Too many wildcards. The maximum is 1.')
        return None
    
    if obs[0] == '*':
        print('Cant have a wildcard in first position.')
        return None
        
    # Load dataset
    load_path = settings['filepath'].format(
        path=in_path,
        code=dataset,
        geography=geography + geography_yr,
        suffix='calcs'
    )
    df = pd.read_parquet(load_path)
    
    # Get columns
    cols = df.columns
    years = cols.unique(level='year').values.tolist()
    codes = cols.unique(level='code').values.tolist()
    
    # Create output directory if needed
    if not os.path.exists(out_path):
        os.mkdir(out_path)
    
    # Output a timeseries
    if obs[1] == '*':
        # Remove 'change' column from 'years'
        years.remove('change')
        
        # Filter the CSV
        df = df.loc[:, df.columns.get_level_values(0).isin([obs[0]])]
        df = df.loc[:, df.columns.get_level_values('year').isin(years)]
        df = df.loc[:, df.columns.get_level_values('code').isin([obs[2]])]
        
        # Overwrite the column headers
        df.columns = years
        
        # Set filename & out_path
        filename = dataset + '-' + obs[0] + '-' + obs[2]
        out_path += '/series'
    
    # Output a breakdown for one variable
    elif obs[2] == '*':
        # Remove 'all' column from 'codes' for multi-value variable
        if len(codes) > 1:
            codes.remove('all')
        
        # Filter the dataframe
        df = df.loc[:, df.columns.get_level_values(0).isin([obs[0]])]
        df = df.loc[:, df.columns.get_level_values('year').isin([obs[1]])]
        df = df.loc[:, df.columns.get_level_values('code').isin(codes)]
        
        # Overwrite the column headers
        df.columns = codes
        
        # Set filename & out_path
        filename = dataset + '-' + obs[0] + '-' + obs[1]
        out_path += '/variable'
    
    # Output a single observation
    else:
        # Filter the dataframe
        df = df.loc[:, df.columns.get_level_values(0).isin([obs[0]])]
        df = df.loc[:, df.columns.get_level_values('year').isin([obs[1]])]
        df = df.loc[:, df.columns.get_level_values('code').isin([obs[2]])]
        
        # Overwrite the column headers
        df.columns = ['value']
        
        # Set filename & out_path
        filename = dataset + '-' + obs[0] + '-' + obs[1] + '-' + obs[2]
        out_path += '/single'
            
    # Create output directory if needed
    if not os.path.exists(out_path):
        os.mkdir(out_path)
    
    save_path = settings['flatpath'].format(
        path=out_path,
        geography=geography + geography_yr,
        filename=filename
    )
    df.to_csv(save_path)
    print('CSV data exported ' + save_path)

## Build a dataset
The cells below should be called sequentially to build the sample dataset. Note that data from each interim step is saved to the local directory. This is for error-checking, and also to avoid having to reload data from the API in order to re-run the subsequent steps. 
1. To download and consistently map the dimensions of the source data from each census.
2. To combine the data together into a single Tidy Data Parquet file.
3. To pivot the data into a Pandas MultiIndex DataFrame (which can be processed like a multi-dimensional dataset).
4. To add calculated fields for percentage breakdowns and rankings.
5. To add parent geography codes and names, and generate JSON data files for each geography.

In [4]:
# Get all sources for all datasets (creates one Parquet per dataset & per census year)
# Only run this cell ONE TIME for each source geography
# Output Area-level data takes a LONG TIME to load, so don't waste your time doing it more than once
for dataset in DATASETS:
    get_sources(dataset, SETTINGS)

http://www.nomisweb.co.uk/api/v01/dataset/NM_1634_1.data.csv?date=latest&geography=TYPE464&cell=1,2&measures=20100&select=geography_code,date_name,cell,obs_value&uid=0x3cfb19ead752b37bb90da0eb3a0fe78baa9fa055&recordlimit=250000&recordoffset=0
Loaded population 2001 segment 1/1
Combined + mapped raw/population_lad2015_2001.parquet
http://www.nomisweb.co.uk/api/v01/dataset/NM_144_1.data.csv?date=latest&geography=TYPE464&cell=1,2&measures=20100&select=geography_code,date_name,cell,obs_value&uid=0x3cfb19ead752b37bb90da0eb3a0fe78baa9fa055&recordlimit=250000&recordoffset=0&rural_urban=0
Loaded population 2011 segment 1/1
Combined + mapped raw/population_lad2015_2011.parquet
http://www.nomisweb.co.uk/api/v01/dataset/NM_1637_1.data.csv?date=latest&geography=TYPE464&c_age=1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,7

In [5]:
# Group (aggregate) data to output geography using lookups
# If your source and output geography codes are the same this function will automatically skip over
for dataset in DATASETS:
    group_data(dataset, SETTINGS)

Aggregated raw/population_lad2021_2001.parquet
Aggregated raw/population_lad2021_2011.parquet
Aggregated raw/age_lad2021_2001.parquet
Aggregated raw/age_lad2021_2011.parquet
Aggregated raw/ethnicity_lad2021_2001.parquet
Aggregated raw/ethnicity_lad2021_2011.parquet
Aggregated raw/health_lad2021_2001.parquet
Aggregated raw/health_lad2021_2011.parquet
Aggregated raw/economic_lad2021_2001.parquet
Aggregated raw/economic_lad2021_2011.parquet
Aggregated raw/travel_lad2021_2001.parquet
Aggregated raw/travel_lad2021_2011.parquet
Aggregated raw/tenure_lad2021_2001.parquet
Aggregated raw/tenure_lad2021_2011.parquet


In [6]:
# Combine years for all datasets (creates one Tidy Data CSV per dataset)
for dataset in DATASETS:
    combine_data(dataset, SETTINGS)

Combined processed/population_lad2021_tidy.parquet
Combined processed/age_lad2021_tidy.parquet
Combined processed/ethnicity_lad2021_tidy.parquet
Combined processed/health_lad2021_tidy.parquet
Combined processed/economic_lad2021_tidy.parquet
Combined processed/travel_lad2021_tidy.parquet
Combined processed/tenure_lad2021_tidy.parquet


In [7]:
# Pivot all datasets (pivots data into multi-index columns)
for dataset in DATASETS:
    pivot_data(dataset, SETTINGS)

Data pivoted processed/population_lad2021_pivot.parquet
Data pivoted processed/age_lad2021_pivot.parquet
Data pivoted processed/ethnicity_lad2021_pivot.parquet
Data pivoted processed/health_lad2021_pivot.parquet
Data pivoted processed/economic_lad2021_pivot.parquet
Data pivoted processed/travel_lad2021_pivot.parquet
Data pivoted processed/tenure_lad2021_pivot.parquet


In [8]:
# Add proxy (derived) datasets (in pivoted format, to match above)
add_proxy_data(PROXY_DATASETS, SETTINGS)

Calculating density
Totals added


  exec(code_obj, self.user_global_ns, self.user_ns)


Proxy data processed/density_lad2021_pivot.parquet
Mapping age10yr
Proxy data processed/age10yr_lad2021_pivot.parquet
Calculating median age
Totals added
Proxy data processed/agemed_lad2021_pivot.parquet


In [9]:
# Perform calculations on all datasets (incl. proxy datasets) for %change, %breakdown and rankings
for dataset in ALL_DATASETS:
    add_calcs(dataset, SETTINGS)

Totals added
% change added
Percentage breakdown added
Rankings added
Calculated fields added processed/population_lad2021_calcs.parquet
Totals added
% change added
Percentage breakdown added




Rankings added
Calculated fields added processed/ethnicity_lad2021_calcs.parquet
Totals added
% change added
Percentage breakdown added
Rankings added
Calculated fields added processed/health_lad2021_calcs.parquet
Totals added
% change added
Percentage breakdown added
Rankings added
Calculated fields added processed/economic_lad2021_calcs.parquet
Totals added
% change added
Percentage breakdown added
Rankings added
Calculated fields added processed/travel_lad2021_calcs.parquet
Totals added
% change added
Percentage breakdown added
Rankings added
Calculated fields added processed/tenure_lad2021_calcs.parquet
% change added
Rankings added
Calculated fields added processed/density_lad2021_calcs.parquet
Totals added
% change added
Percentage breakdown added
Rankings added
Calculated fields added processed/age10yr_lad2021_calcs.parquet
% change added
Rankings added
Calculated fields added processed/agemed_lad2021_calcs.parquet


In [10]:
export_data(ALL_DATASETS, SETTINGS)

  if (await self.run_code(code, result,  async_=asy)):


Lookups generated
Datasets loaded
Datasets merged processed/all_lad2021_merged.parquet
Deciles data saved to JSON file final/json/quantiles/deciles_lad2021.json
Quintiles data saved to JSON file final/json/quantiles/quintiles_lad2021.json
Quartiles data saved to JSON file final/json/quantiles/quartiles_lad2021.json
Geographies saved to individual JSON files final/{geocode}.json


## Export CSV Data
This section of the code allows data to be exported in CSV files covering all geographies. Edit the parameters for the  "export_csv" function in the cell below, as follows:
1. Select the dataset code (eg. 'population', 'age10yr', 'medage', 'density')
2. Select the filter pattern ['val', 'year', 'variable']

Options for each parameter:
* 'val' can be 'value', 'perc', 'value_rank' or 'perc_rank'
* 'year' can be '2001', '2011' or 'change' for % change from 2001 to 2011
* 'variable' options depend on the dataset. Select 'all' to return the sum total
* one of 'year' or 'variable' can be a wildcard '*' to create a timeseries or a breakdown

In [66]:
# Export a filtered CSV file covering all geographies
export_csv('travel', ['value', '2011', '*'], SETTINGS)

CSV data exported final/csv/variable/oa2011_travel-value-2011.csv
