In [None]:
%run ../notebook_preamble.ipy

In [None]:
import pyproj
import geopandas as gpd
import shapely
import os
from datetime import datetime
from zipfile import ZipFile
import logging

shapely.speedups.enable()

In [None]:
from beis_indicators.utils.nuts_utils import nuts_earliest

In [None]:
current_year = datetime.utcnow().year - 1
years = range(2007, current_year)

## DEFRA Data Collection

Modelled data for air pollution across the UK is compiled by DEFRA. The values are obtained by using the data from monitoring stations and using atmospheric modelling to interpolate the data to a 1km by 1km grid across the whole country.

## PM10

### Collection

In [None]:
def get_pollution_data(year, pollution_type):
    '''get_pollution_data
    Downloads and stores pollution data from https://uk-air.defra.gov.uk/data/pcm-data
    
    Args:
        year (int): Year to collect data from. Check website for coverage.
        pollution_type (str): Name of pollutant. Choices are currently pm10 
            and pm25, no2 or nox.
    '''
    
    base_url = 'https://uk-air.defra.gov.uk/datastore/pcm/map{}.csv'
        
    raw_data_dir = f'{project_dir}/data/raw/defra/'
    if not os.path.isdir(raw_data_dir):
        os.mkdir(raw_data_dir)
    
    if pollution_type in ['pm10', 'pm25']:
        fname = f'{pollution_type}{year}g'
    else:
        fname = f'{pollution_type}{year}'

    df = pd.read_csv(base_url.format(fname), header=5, na_values='MISSING')
    df.rename(columns={fname: pollution_type}, inplace=True)

    df.to_csv(f'{raw_data_dir}map{pollution_type}{year}.csv', index=False)

In [None]:
def load_pollution_data(year, pollution_type='pm10'):
    '''load_pollution_data
    Loads pollution data. If data is not present, downloads from 
    https://uk-air.defra.gov.uk/data/pcm-data
    
    Args:
        year (int): Year of data to load. Check website for coverage.
        pollution_type (str): Name of pollutant. Choices are currently pm10 
            and pm25, no2 or nox.

    Returns:
        (pandas.DataFrame): Modelled pointwise pollution data.
    '''
    fin = f'{project_dir}/data/raw/defra/map{pollution_type}{year}.csv'
    
    if not os.path.isfile(fin):
        get_pollution_data(year, pollution_type)
        
    return pd.read_csv(fin)

In [None]:
get_pollution_data(2007)

In [None]:
def translate_coordinates(x, y, pin, pout):
    '''translate_coordinates
    Translates vectors of spatial coordinates from one projection to another.
    
    Args:
        x (array-like): Vector of horizontal spatial coordinates.
        y (array-like): Vector of vertical spatial coortinates.
        pin (str): Projection of input vectors.
        pout (str): Output projection.
    
    Returns:
        (tuple of array-like): Translated coordinate vectors.
    '''
    proj_in = pyproj.Proj(pin)
    proj_out = pyproj.Proj(pout)
    return pyproj.transform(proj_in, proj_out, x, y)

In [None]:
def coordinates_to_points(df, x_coord_name, y_coord_name):
    '''coordinates_to_points
    Take a DataFrame with coordinate columns and returns a GeoDataFrame with 
    a single Point geometry column.
    
    Args:
        df (pandas.DataFrame): A DataFrame with spatial coordinate data.
        x_coord_name (str): Name of the horizontal coordinate column.
        y_coord_name (str): Name of the vertical coordinate column.
        
    Returns:
        (geopandas.GeoDataFrame): GeoDataFrame with Point objects in `geometry` column.
    '''
    return gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[x_coord_name], df[y_coord_name]))

In [None]:
def load_nuts_regions(year, level=2, projection=4326, resolution=1, countries=['UK']):
    '''load_nuts_regions
    Loads Eurostat NUTS shapefiles.
    
    Args:
        year (int): NUTS version year.
        projection (int): Coordinate projection of shapefile. 
            Choice of EPSG 3035, 3857 or 4326. Default is 4326
        resolution (int): Shapefile resolution in metres.
        countries (list): List of 2 letter country codes to filter by. If None, 
            all regions will be returned. Default is `["UK"]`.
    '''
    
    resolution = str(resolution).zfill(2)

    nuts_dir = (f'{data_path}/raw/shapefiles/'
                f'ref-nuts-{year}-{resolution}m.shp/'
                f'NUTS_RG_{resolution}M_{year}_{projection}_LEVL_{level}.shp')
    
    if not os.path.isdir(nuts_dir):
        with ZipFile(f'{nuts_dir}.zip','r') as archive:
            archive.extractall(nuts_dir)
        
    nuts_fin = (f'{nuts_dir}/'
                f'NUTS_RG_{resolution}M_{year}_{projection}_LEVL_{level}.shp')
    nuts_gdf = gpd.read_file(nuts_fin)
    
    if countries is not None:
        nuts_gdf = nuts_gdf.set_index('CNTR_CODE').loc[countries].reset_index()
        
    return nuts_gdf

In [None]:
def make_air_pollution_nuts(years, pollution_type='pm10', nuts_level=2, aggfunc=np.mean):
    '''make_air_pollution_nuts
    Creates air pollution indicator for NUTS regions over a range of years, using
    modelled point data from DEFRA at a 1km resolution.
    
    Output is a DataFrame of aggregated values at the level of NUTS regions.
    
    Args:
        years (iter of int): Collects pollution data and creates indicators over
            this range of years.
        pollution_type (str): Name of pollutant. Choices are currently pm10 
            and pm25, no2 or nox. Defaults to pm10.
        nuts_level (int): NUTS region level. Can be 1, 2 or 3.
        aggfunc (function): Function used to aggregate point data within a region, 
            for example finding the average, maximum or percentile value. Default 
            is np.mean.
            
    Returns:
        df (pandas.DataFrame): DataFrame of processed indicator.
    '''
    
    pin = 'epsg:27700'
    pout = 'epsg:4326'
    
    out_dir = f'{data_path}/processed/defra/'
    raw_data_dir = f'{project_dir}/data/raw/defra/'
    
    dfs = []
    for year in years:
        pollution = load_pollution_data(year, pollution_type)
        pollution['longitude'], pollution['latitude'] = translate_coordinates(
            pollution['x'].values, pollution['y'].values, pin, pout)
        pollution_gdf = coordinates_to_points(pollution, 'latitude', 'longitude')
        
        nuts_spec_year = nuts_earliest(year)
        nuts = load_nuts_regions(year=nuts_spec_year, level=nuts_level)
        nuts = nuts.to_crs(pout.upper())
        
        pollution_gdf = gpd.sjoin(pollution_gdf, nuts, op='within')
        
        aggregated = (pollution_gdf.groupby('NUTS_ID')[pollution_type]
                      .apply(aggfunc)
                      .reset_index())
        aggregated['nuts_year_spec'] = nuts_spec_year
        aggregated['year'] = year
        
        value_header = f'air_pollution_{aggfunc.__name__}_{pollution_type}'
        headers = {
            pollution_type: value_header,
            'NUTS_ID': 'nuts_id'
        }
        aggregated = aggregated.rename(columns=headers)
        dfs.append(aggregated)
    
    df = pd.concat(dfs)
    df = df[['year', 'nuts_id', 'nuts_year_spec', value_header]]
    df.to_csv(f'{out_dir}{value_header}.nuts{nuts_level}.csv', index=False)
    return df

In [None]:
df = make_air_pollution_nuts(years, pollution_type='pm10', nuts_level=3, aggfunc=np.mean)

## LEPs

In [None]:
def leps_year_spec(year):
    '''leps_year_spec
    Return earliest possible year for the LEP boundaries based
    on a given year.
    
    Args:
        year (int): Year of data.
        
    Returns:
        (int): LEP boundary year specification.
    '''
    if year < 2014:
        return -2014
    elif 2014 <= year < 2017:
        return 2014
    elif year >= 2017:
        return 2017

In [None]:
def make_air_pollution_leps(years, pollution_type='pm10', aggfunc=np.mean):
    '''make_air_pollution_leps
    Creates air pollution indicator for LEP regions over a range of years, using
    modelled point data from DEFRA at a 1km resolution.
    
    Output is a DataFrame of aggregated values at the level of LEP regions.
    
    Args:
        years (iter of int): Collects pollution data and creates indicators over
            this range of years.
        pollution_type (str): Name of pollutant. Choices are currently pm10 
            and pm25, no2 or nox. Defaults to pm10.
        aggfunc (function): Function used to aggregate point data within a region, 
            for example finding the average, maximum or percentile value. Default 
            is np.mean.
            
    Returns:
        df (pandas.DataFrame): DataFrame of processed indicator.
    '''
    
    proj = 'epsg:27700'
    
    out_dir = f'{data_path}/processed/defra/'
    raw_data_dir = f'{project_dir}/data/raw/defra/'
    
    dfs = []
    for year in years:
        pollution = load_pollution_data(year, pollution_type)
        pollution_gdf = coordinates_to_points(pollution, 'x', 'y')
        
        lep_year_spec = leps_year_spec(year)
        leps = load_leps_regions(year=lep_year_spec)
        leps = leps.to_crs(proj.upper())
        
        pollution_gdf = gpd.sjoin(pollution_gdf, leps, op='within')
        
        region_col = f'lep{str(lep_year_spec)[-2:]}cd'
        aggregated = (pollution_gdf.groupby(region_col)[pollution_type]
                      .apply(aggfunc)
                      .reset_index())
        aggregated['lep_year_spec'] = lep_year_spec
        aggregated['year'] = year
        
        value_header = f'air_pollution_{aggfunc.__name__}_{pollution_type}'
        headers = {
            pollution_type: value_header,
            region_col: 'lep_id'
        }
        aggregated = aggregated.rename(columns=headers)
        dfs.append(aggregated)
    
    df = pd.concat(dfs)
    df = df[['year', 'lep_id', 'lep_year_spec', value_header]]
    df.to_csv(f'{out_dir}{value_header}.lep.csv', index=False)
    return df

In [None]:
def load_leps_regions(year):
    '''load_leps_regions
    Loads LEP shapefiles.
    
    Args:
        year (int): LEP version year.
    '''
    year = abs(year)

    if year == 2014:
        fin = 'Local_Enterprise_Partnerships_December_2014_Full_Clipped_Boundaries_in_England'
    elif year == 2017:
        fin = 'Local_Enterprise_Partnerships_April_2017_EN_BFC_V3'
        
    leps_dir = f'{data_path}/raw/shapefiles/{fin}/{fin}.shp'
    
    leps_gdf = gpd.read_file(leps_dir)
    
    return leps_gdf

In [None]:
_ = make_air_pollution_leps(years)