# Packages

In [1]:
# Data and Maths
import pandas as pd
import geopandas as gpd
import numpy as np
import math
from dataclasses import dataclass

# Plotting
import matplotlib.pyplot as plt
import seaborn as sns

# Statistics
import scipy.stats as stats
from scipy.stats import binom
RANDOM_SEED = 42

import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# Streamlined Workflow

In [None]:
import HistoricalDataPreProcessing
processed_data = HistoricalDataPreProcessing.main()

In [None]:
processed_data.keys()

In [None]:
from RelativeDifferenceTools import RelativeDifferenceCalculator, RelativeDifferenceSampleGenerator, VehicleEstimator

relative_difference_calculator = RelativeDifferenceCalculator(
    processed_data['car_van_2011'],
    processed_data['car_van_2021'],
    processed_data['vehicle_registrations']
)
relative_difference_data = relative_difference_calculator.calculate()
relative_difference_data.head()

In [None]:
relative_difference_sample_generator = RelativeDifferenceSampleGenerator(
    relative_difference_data=relative_difference_data,
    n_samples=1000
)
relative_difference_samples = relative_difference_sample_generator.generate_samples()
relative_difference_samples.head()

In [None]:
vehicle_estimator = VehicleEstimator(relative_difference_samples)

vehicle_samples = vehicle_estimator.estimate(processed_data['vehicle_registrations_i'], '2023 Q1')
bev_samples = vehicle_estimator.estimate(processed_data['bev_registrations_i'], '2023 Q1')
phev_samples = vehicle_estimator.estimate(processed_data['phev_registrations_i'], '2023 Q1')

In [None]:
vehicle_samples

# Pre-processing

In [None]:
@dataclass
class DataContainer:
    file_path: str
    raw_data: pd.DataFrame = None
    data: pd.DataFrame = None

class BasePreprocessor():
    def __init__(self, data_container: DataContainer) -> None:
        self.data_container = data_container
    
    def preprocess(self, keep_raw: bool, dtype=None, na_values=None) -> None:
        self._load_csv_to_df(self.data_container, keep_raw, dtype, na_values)

    def _load_csv_to_df(self, container: DataContainer, keep_raw: bool, dtype=None, na_values=None) -> None:
        try:
            container.data = pd.read_csv(container.file_path, dtype=dtype, na_values=na_values)
            if keep_raw:
                container.raw_data = container.data.copy()
        except FileNotFoundError as e:
            raise FileNotFoundError(f"File not found: {container.file_path}") from e
    
    def _rename_df_columns(self, column_mapper: dict) -> None:
        self.data_container.data = self.data_container.data.rename(columns=column_mapper)

    def _select_df_columns(self, columns: list) -> None:
        self.data_container.data = self.data_container.data[columns]

    def _drop_duplicate_rows(self, subset) -> None:
        self.data_container.data.drop_duplicates(subset=subset, keep='first', inplace=True)
    
    def _drop_duplicate_rows_by_index(self) -> None:
        self.data_container.data = self.data_container.data[~self.data_container.data.index.duplicated(keep='first')]

    def _set_df_index(self, index_name: str, drop: bool) -> None:
        self.data_container.data = self.data_container.data.set_index(index_name, drop=drop)
    
    def _apply_dtypes(self, start_col: int, end_col: int) -> dict:
        dtypes = {i: str for i in range(start_col)} # first 'first' columns as strings
        dtypes.update({i: float for i in range(start_col, end_col)}) # 'last' columns is currently hard coded. Float is needed for NaNs
        return dtypes

class CarVan2011DataPreprocessor(BasePreprocessor):
    def preprocess(self, keep_raw: bool) -> pd.DataFrame:
        super().preprocess(keep_raw)
        self._rename_df_columns({
            'GEO_CODE': 'LSOA11CD',
            'GEO_LABEL': 'LSOA11NM',
            'Car or van availability : Sum of all cars or vans - Unit : Cars or vans': 'cars',
            'Car or van availability : No cars or vans in household - Unit : Households': 'households_without_cars',
            'Car or van availability : Total\ Car or van availability - Unit : Households': 'households'
            }
        )
        self._drop_duplicate_rows('LSOA11CD')
        self._set_df_index('LSOA11CD', drop=True)
        self._select_df_columns(['LSOA11NM', 'cars', 'households', 'households_without_cars'])
        self._drop_duplicate_rows_by_index()
        print('Pre-processing complete')
        return self.data_container.data

class CarVan2021DataPreprocessor(BasePreprocessor):  
    def __init__(self, data_container: DataContainer, lsoa_lookup_file_name: str) -> None:
        super().__init__(data_container)
        self.lsoa_lookup_file_name = lsoa_lookup_file_name 

    def preprocess(self, keep_raw: bool) -> pd.DataFrame:
        super().preprocess(keep_raw)
        self._count_number_of_cars()
        self._condense_data()
        self._reindex_data()
        self._drop_duplicate_rows_by_index()
        print('Pre-processing complete')
        return self.data_container.data

    def _count_number_of_cars(self) -> None:
        self.data_container.raw_data['cars'] = self.data_container.raw_data['Observation'] * self.data_container.raw_data['Car or van availability (5 categories) Code']

    def _condense_data(self) -> None:
        self.data_container.data = (
            pd.DataFrame(index=self.data_container.raw_data['Lower Layer Super Output Areas Code'].unique(), columns=['LSOA21CD', 'LSOA21NM', 'cars', 'houses_without_cars'])
            .assign(
                LSOA21CD=lambda df: df.index, 
                LSOA21NM=self.data_container.raw_data['Lower Layer Super Output Areas'].unique(),
                cars=self.data_container.raw_data.groupby('Lower Layer Super Output Areas Code')['cars'].sum(),
                houses_without_cars=self.data_container.raw_data.loc[self.data_container.raw_data['Car or van availability (5 categories) Code'] == 0, ['Lower Layer Super Output Areas Code', 'Observation']].set_index('Lower Layer Super Output Areas Code')
            )
        )

    def _reindex_data(self) -> None:
        lsoa_lookup = pd.read_csv(self.lsoa_lookup_file_name)
        data = self.data_container.data
        self.data_container.data = (
            data
            .merge(lsoa_lookup[['LSOA11CD', 'LSOA21CD']], on='LSOA21CD', how='right')
            .assign(cars=lambda df: df.groupby('LSOA11CD')['cars'].transform('sum'))
            .drop(columns=['LSOA21CD'])
        )
        self._set_df_index('LSOA11CD', drop=True)

class VehicleRegistrationsDataPreprocessor(BasePreprocessor):
    def __init__(self, data_container: DataContainer):
        super().__init__(data_container)
        self.data_container = data_container
    
    def preprocess(self, keep_raw: bool) -> pd.DataFrame:
        super().preprocess(keep_raw, dtype=self._apply_dtypes(5, 57), na_values=['[c]', '[x]'])
        self.data_container.data = self._preprocess_by_bodytype('Cars') + self._preprocess_by_bodytype('Other body types')
        self._drop_duplicate_rows_by_index()
        self.data_container.data = self.data_container.data.drop('Miscellaneous')
        self.data_container.data = self.data_container.data.dropna(how='all')
        print('Pre-processing complete')
        return self.data_container.data
    
    def _preprocess_by_bodytype(self, bodytype: str) -> pd.DataFrame:
        bodytype_data = self.data_container.data.query("BodyType == '" + bodytype + "' & Keepership == 'Private' & LicenceStatus == 'Licensed'")
        bodytype_data = bodytype_data.drop(columns = ['BodyType', 'Keepership', 'LicenceStatus', 'LSOA11NM'])
        bodytype_data = bodytype_data.set_index('LSOA11CD', drop=True)
        return bodytype_data
    
class EVRegistrationsDataPreprocessor(BasePreprocessor):
    def __init__(self, data_container: DataContainer, fuel_type: str):
        super().__init__(data_container)
        self.data_container = data_container
        self.fuel_type = fuel_type
    
    def preprocess(self, keep_raw: bool) -> pd.DataFrame:
        super().preprocess(keep_raw, dtype=self._apply_dtypes(4, 56), na_values=['[c]', '[x]'])
        self._filter_data()
        self._set_df_index('LSOA11CD', drop=True)
        self._split_by_fuel_type()
        self._drop_duplicate_rows_by_index()
        print('Pre-processing complete')
        return self.data_container.data
    
    def _filter_data(self):
        self.data_container.data = (
            self.data_container.data
            .query("Keepership == 'Private'")
            .drop(columns=['Keepership', 'LSOA11NM'])
        )
    
    def _split_by_fuel_type(self):
        self.data_container.data = (
            self.data_container.data
            .query("Fuel == '" + self.fuel_type + "' or Fuel.isnull()")
            .drop(columns=["Fuel"])
        )

class HouseDataPreprocessor(BasePreprocessor):
    def __init__(self, data_container: DataContainer, lsoa_lookup_file_name: str) -> None:
        super().__init__(data_container)
        self.lsoa_lookup_file_name = lsoa_lookup_file_name 
    
    def preprocess(self, keep_raw: bool, dtype=None, na_values=None) -> pd.DataFrame:
        super().preprocess(keep_raw, dtype, na_values)
        lsoa_lookup = pd.read_csv(self.lsoa_lookup_file_name)
        self.data_container.data = (
            self.data_container.data
            .rename(columns={'Lower layer Super Output Areas Code':'LSOA21CD', 'Lower layer Super Output Areas':'LSOA21NM', 'Observation':'households'})
            .merge(lsoa_lookup.loc[:, ['LSOA11CD', 'LSOA21CD']], on = 'LSOA21CD', how='outer')
            .drop(columns=['LSOA21NM'])
            .set_index('LSOA11CD')
        )
        self._drop_duplicate_rows_by_index()
        print('Pre-processing complete')
        return self.data_container.data

class AccommodationTypeDataPreprocessor(BasePreprocessor):
    def __init__(self, data_container: DataContainer, lsoa_lookup_file_name: str) -> None:
        super().__init__(data_container)
        self.lsoa_lookup_file_name = lsoa_lookup_file_name 
    
    def preprocess(self, keep_raw: bool, dtype=None, na_values=None) -> pd.DataFrame:
        super().preprocess(keep_raw, dtype, na_values)
        lsoa_lookup = pd.read_csv(self.lsoa_lookup_file_name)
        self.data_container.data = (
            self.data_container.data
            .rename(columns={'Lower layer Super Output Areas Code':'LSOA21CD', 'Lower layer Super Output Areas':'LSOA21NM', 'Accommodation type (8 categories)':'accommodation_type'})
            .merge(lsoa_lookup.loc[:, ['LSOA11CD', 'LSOA21CD']], on = 'LSOA21CD', how='outer')
            .drop(columns=['LSOA21NM', 'Accommodation type (8 categories) Code'])
            .set_index('LSOA11CD')
        )
        print('Pre-processing complete')
        return self.data_container.data

In [None]:
# Create Data Containers
car_van_2011_container = DataContainer(file_path='../Data/Vehicle/CarsAndVans2011.csv')
car_van_2021_container = DataContainer(file_path='../Data/Vehicle/CarsAndVans2021.csv')
vehicle_registrations_container = DataContainer(file_path='../Data/Vehicle/df_VEH0125.csv')
bev_registrations_container = DataContainer(file_path='../Data/Vehicle/df_VEH0145.csv')
phev_registrations_container = DataContainer(file_path='../Data/Vehicle/df_VEH0145.csv')
house_2021_container = DataContainer(file_path='../Data/Demographic/LSOA_households.csv')
accomodation_type_2021_container = DataContainer(file_path='../Data/Demographic/LSOA_accommodation_type.csv')

# Load and preprocess data
lsoa_lookup_file = '../Data/Spatial/LSOA/LSOA_(2011)_to_LSOA_(2021)_to_Local_Authority_District_(2022)_Lookup_for_England_and_Wales_(Version_2).csv'
car_van_2011_data_preprocessor = CarVan2011DataPreprocessor(car_van_2011_container)
car_van_2021_data_preprocessor = CarVan2021DataPreprocessor(car_van_2021_container, lsoa_lookup_file)
vehicle_registrations_data_preprocessor = VehicleRegistrationsDataPreprocessor(vehicle_registrations_container)
bev_registrations_data_preprocessor = EVRegistrationsDataPreprocessor(bev_registrations_container, fuel_type='Battery electric')
phev_registrations_data_preprocessor = EVRegistrationsDataPreprocessor(phev_registrations_container, fuel_type='Plug-in hybrid electric (petrol)')
house_2021_data_preprocessor = HouseDataPreprocessor(house_2021_container, lsoa_lookup_file)
accomodation_type_2021_data_preprocessor = AccommodationTypeDataPreprocessor(accomodation_type_2021_container, lsoa_lookup_file)

car_van_2011_data = car_van_2011_data_preprocessor.preprocess(keep_raw=False)
car_van_2021_data = car_van_2021_data_preprocessor.preprocess(keep_raw=True)
vehicle_registrations_data = vehicle_registrations_data_preprocessor.preprocess(keep_raw=False)
bev_registrations_data = bev_registrations_data_preprocessor.preprocess(keep_raw=True)
phev_registrations_data = phev_registrations_data_preprocessor.preprocess(keep_raw=True)
house_2021_data = house_2021_data_preprocessor.preprocess(keep_raw=False)
accomodation_type_2021_data = accomodation_type_2021_data_preprocessor.preprocess(keep_raw=False)

In [None]:
# Define file paths
file_paths = {
    'car_van_2011': '../Data/Vehicle/CarsAndVans2011.csv',
    'car_van_2021': '../Data/Vehicle/CarsAndVans2021.csv',
    'vehicle_registrations': '../Data/Vehicle/df_VEH0125.csv',
    'bev_registrations': '../Data/Vehicle/df_VEH0145.csv',
    'phev_registrations': '../Data/Vehicle/df_VEH0145.csv',
    'house_2021': '../Data/Demographic/LSOA_households.csv',
    'accomodation_type_2021': '../Data/Demographic/LSOA_accommodation_type.csv'
}

# Create Data Containers
data_containers = {key: DataContainer(file_path=value) for key, value in file_paths.items()}

# Load and preprocess data
lsoa_lookup_file = '../Data/Spatial/LSOA/LSOA_(2011)_to_LSOA_(2021)_to_Local_Authority_District_(2022)_Lookup_for_England_and_Wales_(Version_2).csv'

data_preprocessors = {
    'car_van_2011': CarVan2011DataPreprocessor(data_containers['car_van_2011']),
    'car_van_2021': CarVan2021DataPreprocessor(data_containers['car_van_2021'], lsoa_lookup_file),
    'vehicle_registrations': VehicleRegistrationsDataPreprocessor(data_containers['vehicle_registrations']),
    'bev_registrations': EVRegistrationsDataPreprocessor(data_containers['bev_registrations'], fuel_type='Battery electric'),
    'phev_registrations': EVRegistrationsDataPreprocessor(data_containers['phev_registrations'], fuel_type='Plug-in hybrid electric (petrol)'),
    'house_2021': HouseDataPreprocessor(data_containers['house_2021'], lsoa_lookup_file),
    'accomodation_type_2021': AccommodationTypeDataPreprocessor(data_containers['accomodation_type_2021'], lsoa_lookup_file)
}

# Get processed data
keep_raw_values = {
    'car_van_2011': False,
    'car_van_2021': True,
    'vehicle_registrations': False,
    'bev_registrations': True,
    'phev_registrations': True,
    'house_2021': False,
    'accomodation_type_2021': False
}

processed_data = {key: processor.preprocess(keep_raw=keep_raw_values[key]) for key, processor in data_preprocessors.items()}

In [None]:
processed_data['car_van_2011']

In [None]:
car_van_2021_data.head()

In [None]:
vehicle_registrations_data.head()

# Find Common LSOA Codes and Filter Data

In [None]:
class ListUtilities:
    """
    A utility class for list operations.
    """

    @staticmethod
    def intersection_of_lists(*args):
        """
        Returns the intersection of all provided lists.

        Args:
        *args: Variable length argument of lists.

        Returns:
        List containing the intersection of all provided lists.
        """
        
        # If no lists are provided, return an empty list
        if not args:
            return []

        # Start with the set of the first list
        result_set = set(args[0])

        # Iterate over the remaining lists, updating the result_set
        for lst in args[1:]:
            result_set &= set(lst)

        return list(result_set)

In [None]:
common_lsoas = ListUtilities.intersection_of_lists(
    car_van_2011_data.index,
    car_van_2021_data.index,
    vehicle_registrations_data.index,
    bev_registrations_data.index,
    phev_registrations_data.index,
    house_2021_data.index,
    accomodation_type_2021_data.index
)

car_van_2011_data = car_van_2011_data.loc[common_lsoas].sort_index()
car_van_2021_data = car_van_2021_data.loc[common_lsoas].sort_index()
vehicle_registrations_data = vehicle_registrations_data.loc[common_lsoas].sort_index()
bev_registrations_data = bev_registrations_data.loc[common_lsoas].sort_index()
phev_registrations_data = phev_registrations_data.loc[common_lsoas].sort_index()
house_2021_data = house_2021_data.loc[common_lsoas].sort_index()
accomodation_type_2021_data = accomodation_type_2021_data.loc[common_lsoas].sort_index()

# Estimating EV Adoption in LSOAs

In [None]:
class RegistrationInterpolator:
    def __init__(self, sample_rate=4) -> None:
        self.registration_data = None
        self.sample_rate = sample_rate

    # Interpolates missing registration data
    def interpolate(self, registration_data: pd.DataFrame) -> pd.DataFrame:
        self.registration_data = registration_data.T.iloc[::-1]
        interpolated_df = self.registration_data.apply(self._interpolate_column, axis=0)
        interpolated_df = interpolated_df.fillna(0)
        interpolated_df = interpolated_df.astype('Int64')
        return interpolated_df.iloc[::-1].T
    
    def _interpolate_column(self, col) -> pd.Series:
        dates = self._calculate_date_range(self._calculate_t0(col), self._calculate_t_present(col))
        mask = ~col.isna().values
        if mask.any():
            xp = dates[mask]
            fp = col[mask]
            x = dates
            interpolated_array = np.round(np.interp(x, xp, fp))
            interpolated_series = pd.Series(data=interpolated_array, index=col.index)
        else:
            interpolated_series = pd.Series(data=np.nan, index=col.index)
        return interpolated_series

    # apply_dtypes converts select columns from str to float values
    def _apply_dtypes(first_col: int, last_col: int) -> dict:
        dtypes = {i: str for i in range(first_col)}  # first 'first' columns as strings
        dtypes.update({i: float for i in range(first_col, last_col)}) # 'last' columns is currently hard coded. Float is needed for NaNs
        return dtypes
    
    def _quarter_to_decimal(self, year: int, quarter: str) -> float:
        """
        Converts a year and quarter to a decimal year representation.
        """
        quarters = {'Q1': 0, 'Q2': 0.25, 'Q3': 0.5, 'Q4': 0.75}
        return year + quarters.get(quarter, 0)

    def _calculate_t0(self, col) -> float:
        year = int(col.head(1).index[0][:4])
        quarter = col.head(1).index[0][-2:]
        return self._quarter_to_decimal(year, quarter)

    def _calculate_t_present(self, col) -> float:
        year = int(col.tail(1).index[0][:4])
        quarter = col.tail(1).index[0][-2:]
        return self._quarter_to_decimal(year, quarter)

    def _convert_dates_to_numeric(self) -> list:
        dates = []
        for date in self.registration_data.index:
            year = int(date[:4])
            quarter = date[-2:]
            dates.append(self._quarter_to_decimal(year, quarter))
        return dates

    # Returns an array of numeric dates between t0 and t1 at a specified sample rate
    def _calculate_date_range(self, t0: float, t1: float) -> list:
        return np.linspace(t0, t1, int((t1-t0)*self.sample_rate) + 1)

In [None]:
registration_interpolator = RegistrationInterpolator()
vehicle_registrations_data_i = registration_interpolator.interpolate(vehicle_registrations_data)
bev_registrations_data_i = registration_interpolator.interpolate(bev_registrations_data)
phev_registrations_data_i = registration_interpolator.interpolate(phev_registrations_data)

# Calculating Error and Uncertainty in Vehicle Registration Data (The quick way)

In [None]:
class VehicleRegistrationRelativeDifferenceCalculator:
    def __init__(
            self, 
            car_van_2011_data: pd.DataFrame, 
            car_van_2021_data: pd.DataFrame, 
            vehicle_registrations_data: pd.DataFrame, 
        ):
        self.car_van_2011_data = car_van_2011_data
        self.car_van_2021_data = car_van_2021_data
        self.vehicle_registrations_data = vehicle_registrations_data.astype(float)
        self.relative_difference_data = None

    def calculate(self) -> pd.DataFrame:
        self._calculate_relative_differences(2011)
        self._calculate_relative_differences(2021)
        self._merge_relative_difference_data()
        return self.relative_difference_data 

    def _calculate_mean_registered_vehicles_for_year(self, year: int) -> None:
        columns = [f"{year} {quarter}" for quarter in ['Q1', 'Q2', 'Q3', 'Q4']]
        df = getattr(self, f'car_van_{year}_data')
        df['registered_vehicles'] = self.vehicle_registrations_data.loc[:, columns].mean(axis=1).round()
    
    def _calculate_absolute_differences(self, year: int) -> None:
        df = getattr(self, f'car_van_{year}_data')
        df['abs_difference'] = df['cars'] - df['registered_vehicles']
    
    def _calculate_relative_differences(self, year: int) -> None:
        self._calculate_mean_registered_vehicles_for_year(year)
        self._calculate_absolute_differences(year)
        df = getattr(self, f'car_van_{year}_data')
        df['relative_difference'] = df['abs_difference'] / df['registered_vehicles']
    
    def _merge_relative_difference_data(self) -> None:
        relative_difference_2011_df = pd.DataFrame({'LSOA11CD': self.car_van_2011_data['relative_difference'].index, 'relative_difference_2011': self.car_van_2011_data['relative_difference'].values})
        relative_difference_2021_df = pd.DataFrame({'LSOA11CD': self.car_van_2021_data['relative_difference'].index, 'relative_difference_2021': self.car_van_2021_data['relative_difference'].values})
        self.relative_difference_data = pd.merge(relative_difference_2011_df, relative_difference_2021_df, how='outer', on='LSOA11CD').set_index('LSOA11CD')
    
class RelativeDifferenceSampleGenerator:
    def __init__(self, relative_difference_data: pd.DataFrame, n_samples: int):
        self.relative_difference_data = relative_difference_data
        self.relative_difference_samples = None
        self.n_samples = n_samples
    
    def generate_samples(self) -> pd.DataFrame:
        self._calculate_relative_difference_mu_and_sigma()
        self._calculate_relative_difference_samples()
        return self.relative_difference_samples

    def _calculate_relative_difference_mu_and_sigma(self) -> None:
        # Priors
        mu_prior = pd.concat([self.relative_difference_data['relative_difference_2011'], self.relative_difference_data['relative_difference_2021']]).mean()
        sigma_prior = pd.concat([self.relative_difference_data['relative_difference_2011'], self.relative_difference_data['relative_difference_2021']]).std()
        var_prior = sigma_prior**2
        precision_prior = 1/var_prior

        # Data
        mu_lsoa = self.relative_difference_data[['relative_difference_2011', 'relative_difference_2021']].mean(axis=1)
        sigma_lsoa = self.relative_difference_data[['relative_difference_2011', 'relative_difference_2021']].std(axis=1)
        var_lsoa = sigma_lsoa**2
        precision_lsoa = 1/var_lsoa

        # Posterior
        mu_post = (2*mu_lsoa*precision_lsoa + mu_prior*precision_prior)/(2*precision_lsoa + precision_prior)
        sigma_post = np.sqrt(1/(2*precision_lsoa + precision_prior))

        self.relative_difference_data['mu_post'] = mu_post
        self.relative_difference_data['sigma_post'] = sigma_post

        # Fill in missing data with mu_prior and sigma_prior (the mean mu and sigma for all LSOAs) if mu_post or sigma_post is NaN
        # This is a result from missing either Census and Registration data in both 2011 and 2021
        self.relative_difference_data.loc[self.relative_difference_data['mu_post'].isna(), 'mu_post'] = mu_lsoa[self.relative_difference_data['mu_post'].isna()]
        self.relative_difference_data.loc[self.relative_difference_data['mu_post'].isna(), 'mu_post'] = mu_prior
        self.relative_difference_data.loc[self.relative_difference_data['sigma_post'].isna(), 'sigma_post'] = sigma_prior

    def _calculate_relative_difference_samples(self) -> None:
        # Initialize an empty DataFrame for samples

        lsoa_list = self.relative_difference_data.index.values
        self.relative_difference_samples = pd.DataFrame(index=np.arange(0, self.n_samples), columns=lsoa_list)

        # Loop over each LSOA to generate samples
        for lsoa in lsoa_list:
            sigma = self.relative_difference_data.sigma_post.loc[lsoa]  # Assuming sigma_post is indexed by LSOA
            mu = self.relative_difference_data.mu_post.loc[lsoa]
            sample_array = np.random.normal(mu, sigma, size=self.n_samples)
            self.relative_difference_samples[lsoa] = sample_array

class VehicleRegistrationEstimator:
    def __init__(self, relative_difference_samples) -> None:
        self.relative_difference_samples = relative_difference_samples
    #     self.registration_data = registration_data
    #     self.adjusted_registration_data = None
    
    def estimate(self, registration_data: pd.DataFrame, quarter: str) -> pd.DataFrame:
        adjusted_registration_data = (1 + self.relative_difference_samples).mul(registration_data[quarter]).round(0).astype('Int64')
        return adjusted_registration_data

In [None]:
vehicle_registration_relative_difference_calculator = VehicleRegistrationRelativeDifferenceCalculator(car_van_2011_data,
                                                                                                      car_van_2021_data,
                                                                                                      vehicle_registrations_data) # Actively choosing not to use interpolated data here

relative_difference_data = vehicle_registration_relative_difference_calculator.calculate()

relative_difference_data.head()

In [None]:
relative_difference_sample_generator = RelativeDifferenceSampleGenerator(relative_difference_data=relative_difference_data,
                                                                         n_samples=1000)

relative_difference_samples = relative_difference_sample_generator.generate_samples()

relative_difference_samples.head()

In [None]:
vehicle_registration_estimator = VehicleRegistrationEstimator(relative_difference_samples)

vehicle_registrations_data_samples = vehicle_registration_estimator.estimate(vehicle_registrations_data_i, '2023 Q1')
bev_registrations_data_samples = vehicle_registration_estimator.estimate(bev_registrations_data_i, '2023 Q1')
phev_registrations_data_samples = vehicle_registration_estimator.estimate(phev_registrations_data_i, '2023 Q1')

In [None]:
vehicle_registrations_data_samples

In [None]:
bev_registrations_data_samples.iloc[:, 1].hist(bins=100)

In [None]:
pd.read_csv('../Data/PreProcessed/adjustment_factors.csv', index_col=0)

In [2]:
import json
test = json.load(open('../Data/PreProcessed/preprocessed_data.json'))

In [17]:
test['car_van_2011'][3]

'L'

In [7]:
import pandas as pd
pd.read_json(test['car_van_2011'])

Unnamed: 0,LSOA11NM,cars,households,households_without_cars
0,City of London 001A,452,876,519
1,City of London 001B,441,830,481
2,City of London 001C,180,817,655
3,City of London 001E,124,467,356
4,Barking and Dagenham 016A,496,543,186
...,...,...,...,...
29658,Cardiff 049E,535,548,135
29659,Cardiff 005G,704,643,177
29660,Cardiff 006F,1001,637,73
29661,Swansea 023E,1127,803,105


# Estimating On-Plot Parking

In [None]:
class AccommodationTypeCounter:
    def __init__(self, accommodation_type_df, houses_df):
        self.df = accommodation_type_df
        self.houses = houses_df
        self.data = None

    def count(self) -> pd.DataFrame:
        types_data = [
            self._filter_by_type('Detached', 'detached'),
            self._filter_by_type('Semi-detached', 'semi_detached'),
            self._filter_by_type('Terraced', 'terraced'),
            self._filter_by_type('In a purpose-built block of flats or tenement', 'purpose_built_flat'),
            self._combine_converted_flats()
        ]
        concatenated = pd.concat(types_data, axis=1)
        # Group by index and calculate mean (for LSOA21CDs that share a LSOA11CD)
        mean = concatenated.groupby(concatenated.index).mean()
        proportions = mean.div(mean.sum(axis=1), axis=0)
        accommodation_type_counts = round(proportions.mul(self.houses['households'], axis=0))

        # Need to drop duplicates
        self.data = accommodation_type_counts
        
        return accommodation_type_counts

    def _filter_by_type(self, accommodation_type: str, new_name: str) -> pd.Series:
        result = self.df[self.df['accommodation_type'] == accommodation_type]['Observation']
        result.name = new_name
        return result

    def _combine_converted_flats(self):
        types = [
            'Part of a converted or shared house, including bedsits',
            'Part of another converted building, for example, former school, church or warehouse',
            'In a commercial building, for example, in an office building, hotel or over a shop'
        ]
        return sum([self._filter_by_type(atype, 'converted_flat') for atype in types])

In [None]:
accommodation_type_counter = AccommodationTypeCounter(accomodation_type_2021_data, house_2021_data)
accommodation_type_counts = accommodation_type_counter.count()
accommodation_type_counts.head()

In [None]:
class OnPlotParkingEstimator:
    def __init__(self, accommodation_type_counts: pd.DataFrame, n_samples: int):
        self.accommodation_type_counts = accommodation_type_counts
        self.n_samples = n_samples
        self.dict = {
            'end_terraced': 0.505,
            'mid_terraced': 0.338,
            'semi_detached': 0.822,
            'detached': 0.961,
            'converted_flat': 0.289,
            'purpose_built_flat': 0.256
            }
        self.dict['terraced'] = round(0.3765*self.dict['end_terraced'] + 0.6235*self.dict['mid_terraced'], 3)
        del self.dict['end_terraced']
        del self.dict['mid_terraced']
        self.series = pd.Series(self.dict)
        self.on_plot_parking_samples = None
    
    def estimate(self):
        # Convert series values to a 2D array with matching shape
        probabilities = self.series[self.accommodation_type_counts.columns].values[np.newaxis, :]
        
        # Generate samples for each accommodation type and LSOA using broadcasting
        samples = binom.rvs(n=self.accommodation_type_counts.values.astype(int),
                            p=probabilities,
                            size=(self.n_samples, *self.accommodation_type_counts.shape))
        
        # Sum across accommodation types (axis 2)
        summed_samples = samples.sum(axis=2)
        
        # Convert summed_samples to a DataFrame with appropriate columns and index
        on_plot_parking_df = pd.DataFrame(summed_samples, columns=self.accommodation_type_counts.index)
        
        self.on_plot_parking_samples = on_plot_parking_df
        return on_plot_parking_df

In [None]:
on_plot_parking_estimator = OnPlotParkingEstimator(accommodation_type_counts=accommodation_type_counts,
                                                   n_samples=1000)
on_plot_parking_samples = on_plot_parking_estimator.estimate()
on_plot_parking_samples.head()

In [None]:
class VehiclesWithOnPlotParkingEstimator:
    def __init__(
            self, 
            on_plot_parking_samples,
            car_van_2021_data,
            car_van_2011_data,
            house_data,
            # relative_difference_samples,
            # vehicle_registrations_data,
            ) -> None:
        
        self.on_plot_parking_samples = on_plot_parking_samples
        self.car_van_2021_data = car_van_2021_data
        self.car_van_2011_data = car_van_2011_data
        self.house_data = house_data
        # self.relative_difference_samples = relative_difference_samples
        # self.vehicle_registrations_data = vehicle_registrations_data
        
        self.houses_with_cars = None
        self.cars_per_house_with_car = None
        self.vehicles_with_on_plot_parking_samples = None
        self.proportion_of_vehicles_with_on_plot_parking = None

    def estimate(self, vehicle_registrations_data_samples: pd.DataFrame):
        """ 
        This function assumes that all on-plot parking spaces are filled before
        off-plot parking is used.
        """
        self._estimate_houses_with_cars()
        self._estimate_proportion_of_vehicles_with_on_plot_parking(vehicle_registrations_data_samples)
        return self.vehicles_with_on_plot_parking_samples
    
    def _estimate_houses_with_cars(self):
        cars = self.car_van_2021_data['cars']
        houses = self.house_data['households']
        houses_without_cars = self.car_van_2021_data['houses_without_cars']
        
        houses_with_cars = houses - houses_without_cars
        self.cars_per_house_with_car = cars / houses_with_cars
        self.houses_with_cars = houses_with_cars
    
    def _estimate_proportion_of_vehicles_with_on_plot_parking(self, vehicle_registrations_data_samples: pd.DataFrame):
        self.vehicles_with_on_plot_parking_samples = pd.DataFrame(
            np.minimum(vehicle_registrations_data_samples.values, self.on_plot_parking_samples.values), # Assumes same dimensions
            index=vehicle_registrations_data_samples.index, 
            columns=vehicle_registrations_data_samples.columns
            )
        self.proportion_of_vehicles_with_on_plot_parking = self.vehicles_with_on_plot_parking_samples.divide(vehicle_registrations_data_samples, fill_value=0).replace([np.inf, -np.inf], np.nan)

In [None]:
vehicles_with_on_plot_parking_estimator = VehiclesWithOnPlotParkingEstimator(on_plot_parking_samples=on_plot_parking_samples,
                                                                             car_van_2011_data=car_van_2011_data,
                                                                             car_van_2021_data=car_van_2021_data,
                                                                             house_data=house_2021_data)

vehicles_with_on_plot_parking_samples = vehicles_with_on_plot_parking_estimator.estimate(vehicle_registrations_data_samples=vehicle_registrations_data_samples)
vehicles_with_on_plot_parking_samples.head()

In [None]:
proportion_of_vehicles_with_on_plot_parking_samples = vehicles_with_on_plot_parking_estimator.proportion_of_vehicles_with_on_plot_parking
proportion_of_vehicles_with_on_plot_parking_samples.head()

In [None]:
vehicles_with_on_plot_parking_estimator.vehicles_with_on_plot_parking_samples['E01014375'].hist(bins=100)

In [None]:
vehicles_with_on_plot_parking_estimator.proportion_of_vehicles_with_on_plot_parking['E01014372'].hist(bins=100)

In [None]:
class EVsWithOnPlotParkingEstimator:
    def __init__(
            self, 
            # ev_registrations_data: pd.DataFrame,
            proportion_of_vehicles_with_on_plot_parking: pd.DataFrame
            ) -> None:
        self.vehicle_registrations_data_samples = vehicle_registrations_data_samples
        # self.ev_registrations_data = ev_registrations_data
        self.relative_difference_samples = relative_difference_samples
        self.proportion_of_vehicles_with_on_plot_parking = proportion_of_vehicles_with_on_plot_parking

        self.proportion_of_evs_with_on_plot_parking = None
        self.evs_with_on_plot_parking_samples = None

    def estimate(self, ev_registrations_data_samples: pd.DataFrame):
        self._count_evs_with_on_plot_parking(ev_registrations_data_samples)
        return self.evs_with_on_plot_parking_samples

    def _calculate_proportion_of_evs_with_on_plot_parking(self, mean_total_vehicles, ev_registrations_data_samples):
        # Define alpha and beta values for beta distribution that describes likely starting points for EV on-plot parking access
        mu = 0.9 # prior mean
        var = 0.1 * mu*(1-mu)
        alpha = mu*(mu*(1-mu)/var - 1)
        beta = (1-mu)*(mu*(1-mu)/var - 1)

        c = stats.beta.rvs(a=alpha, b=beta, size=(1000, len(self.proportion_of_vehicles_with_on_plot_parking.columns))) # y intercept
        m = (self.proportion_of_vehicles_with_on_plot_parking - c)/mean_total_vehicles # gradient
        p = m*ev_registrations_data_samples + c
        return np.clip(p, a_min=0, a_max=1)
        
    def _count_evs_with_on_plot_parking(self, ev_registrations_data_samples: pd.DataFrame):
        # Return proportion of EVs with on-plot parking access. 
        # This changes depending on how many EVs there are relative to the total number of vehicles
        # It approaches the overall proportion for all vehicles.
        
        # adjusted_vehicle_registration_data_samples = (1 + self.relative_difference_samples).mul(self.vehicle_registrations_data[quarter], axis=1)
        mean_total_vehicles = self.vehicle_registrations_data_samples.mean()
        # adjusted_ev_registrations_data_samples = (1 + self.relative_difference_samples).mul(ev_registrations_data[quarter], axis=1)
        
        self.proportion_of_evs_with_on_plot_parking = self._calculate_proportion_of_evs_with_on_plot_parking(
            mean_total_vehicles, 
            ev_registrations_data_samples.sample(frac=1) # to change the order of the relative differences otherwise they cancel each other out in the next step
            )
        self.evs_with_on_plot_parking_samples = self.proportion_of_evs_with_on_plot_parking.mul(ev_registrations_data_samples, axis=1) # .round(0)
        return self.evs_with_on_plot_parking_samples

In [None]:
evs_with_on_plot_parking_estimator = EVsWithOnPlotParkingEstimator(proportion_of_vehicles_with_on_plot_parking=proportion_of_vehicles_with_on_plot_parking_samples)

bevs_with_on_plot_parking_samples = evs_with_on_plot_parking_estimator.estimate(bev_registrations_data_samples)
bevs_with_on_plot_parking_samples.head()

In [None]:
bevs_with_on_plot_parking_samples['E01014374'].hist(bins=100)

In [None]:
bevs_with_on_plot_parking_samples['E01014374'].plot.kde()

# Fitting Distributions to the Probabilistic Estimates

In [None]:
from scipy.stats import skewnorm, lognorm, gamma, beta

data = bevs_with_on_plot_parking_samples['E01014374']

params_skewnorm = skewnorm.fit(data)
params_lognorm = lognorm.fit(data)
params_gamma = gamma.fit(data)
params_beta = beta.fit(data)

plt.hist(data, bins=50, density=True, alpha=0.6, color='g', label='Original Data')

# Create an array over which you'll compute the PDFs
xmin, xmax = plt.xlim()
x = np.linspace(xmin, xmax, 1000)

# Plot the PDFs using the parameters obtained from the fit
plt.plot(x, skewnorm.pdf(x, *params_skewnorm), 'k', linewidth=2, label='Fit: SkewNorm')
plt.plot(x, lognorm.pdf(x, *params_lognorm), 'r', linewidth=2, label='Fit: LogNorm')
plt.plot(x, gamma.pdf(x, *params_gamma), 'y', linewidth=2, label='Fit: Gamma')
plt.plot(x, beta.pdf(x, *params_beta), 'b', linewidth=2, label='Fit: Beta')

plt.legend(loc='best')
plt.xlabel('Data Values')
plt.ylabel('Density')
plt.title('Original Data and Fitted Distributions')
plt.grid(True)
plt.show()

In [None]:
from scipy.stats import kstest

D_skewnorm, p_skewnorm = kstest(data, 'skewnorm', params_skewnorm)
D_lognorm, p_lognorm = kstest(data, 'lognorm', params_lognorm)
D_gamma, p_gamma = kstest(data, 'gamma', params_gamma)
D_beta, p_beta = kstest(data, 'beta', params_beta)

print(f"SkewNorm: D={D_skewnorm}, p={p_skewnorm}")
print(f"LogNorm: D={D_lognorm}, p={p_lognorm}")
print(f"Gamma: D={D_gamma}, p={p_gamma}")
print(f"Beta: D={D_beta}, p={p_beta}")

### Cramér–von Mises statistic

In [None]:
import numpy as np
from scipy.stats import skewnorm

def cramer_von_mises_skewnorm(data, params):
    data_sorted = np.sort(data)
    N = len(data_sorted)
    
    # Expected CDF values under the skewed normal distribution
    cdf_theoretical = skewnorm.cdf(data_sorted, *params)
    
    # Observed ECDF values
    i = np.arange(1, N+1)
    ecdf_observed = i / N
    
    # Calculate Cramér–von Mises Statistic
    W2 = 1/12 + np.sum((ecdf_observed - cdf_theoretical)**2)
    
    return W2

params_skewnorm = skewnorm.fit(data)

W2 = cramer_von_mises_skewnorm(data, params_skewnorm)
print(f"Cramér–von Mises Statistic for Skewed Normal: {W2}")

In [None]:
import numpy as np
from scipy.stats import skewnorm, cramervonmises

def cramer_von_mises_skewnorm(data):
    # Estimate the parameters of the skewed normal distribution
    a, loc, scale = skewnorm.fit(data)

    # Apply the Cramér–von Mises test
    result = cramervonmises(data, skewnorm.cdf, args=(a, loc, scale))

    return result

data = bevs_with_on_plot_parking_samples['E01014374']

cvm_result = cramer_von_mises_skewnorm(data)
print(cvm_result)

### Q-Q Plots

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import skewnorm, probplot

# Using probplot with the skewed normal distribution
osm, osr = probplot(data, dist=skewnorm, sparams=skewnorm.fit(data), plot=plt)

plt.title("Q-Q plot for Skewed Normal distribution")
plt.xlabel("Theoretical Quantiles")
plt.ylabel("Observed Quantiles")
plt.show()

In [None]:
vehicle_registrations_data_samples['E01000119']

# Creating LSOA Objects

In [None]:
class LSOA:
    def __init__(self) -> None:
        self.name: str
        self.gis_data: GISData
        self.params: LSOAParams

class GISData:
    def __init__(self) -> None:
        self.latitude:float
        self.longitude:float
        self.boundaries:list[GISBoundary]

class GISBoundary:
    def __init__(self) -> None:
        self.latitudes:list[float]
        self.longitudes:list[float]

class LSOAParams:
    def __init__(self) -> None:
        self.households: str
        self.accommodation_type: dict
        self.on_plot_parking: DataValue
        self.vehicles: DataValue
        self.bevs: DataValue
        self.phevs: DataValue
        self.bevs_with_on_plot_parking: DataValue
        self.phevs_with_on_plot_parking: DataValue

class DataValue:
    def __init__(self) -> None:
        self.mean: float
        self.std: float

# Mapping LSOAs Information to Selected Distribution Substations

### Load LSOA Boundary Data

In [None]:
lsoa_boundaries = gpd.read_file('../Data/Spatial/LSOA/LSOA_2011_EW_BFC_V3_WGS84/LSOA_2011_EW_BFC_V3_WGS84.shp')
lsoa_boundaries = lsoa_boundaries.set_index('LSOA11CD')
lsoa_boundaries = lsoa_boundaries.loc[common_lsoas].sort_index()
lsoa_boundaries.head()

### Load Distribution Substation Data

In [None]:
ds = (
    pd.read_csv('../Data/DistributionNetwork/distribution_substations.csv')
    .drop(
        columns = [
            'Transformer Headroom', 'LCT Count Total', 'Energy Storage', 'Heat Pumps', 
            'Total LCT Capacity', 'Total Generation Capacity', 'Solar', 
            'Wind', 'Bio Fuels', 'Water Generation', 'Waste Generation',
            'Storage Generation', 'Fossil Fuels', 'Other Generation']
    )
    .replace('Hidden', np.nan)
    .astype({'Customers':'float64', 'Substation Number':'Int64'})
    .astype({'Substation Number': str})
)
ds.head()

In [None]:
ds_geo = (
    gpd.read_file('../Data/DistributionNetwork/dist_swest_march2023.gpkg')
    .rename(columns={'NR':'Substation Number'})
    .dissolve('Substation Number').reset_index()
    .merge(ds, how='left', on ='Substation Number')
    .rename(columns={'Substation Name':'Name'})
    .fillna(value={'Discount':'Unknown'}) # For the "key_on" part of the choropleth map
    .to_crs('EPSG:4326')
)

ds_geo['Location'] = gpd.GeoSeries(gpd.points_from_xy(ds_geo.LONGITUDE, ds_geo.LATITUDE, crs="EPSG:4326"))

ds_geo.head()

# Experimenting with Data Compression / Data Fitting

In [None]:
jittered_data = ds_bevs + np.random.uniform(0, 1, size=ds_bevs.shape)

In [None]:
plt.hist(ds_bevs, bins=50)
plt.show()

In [None]:
plt.hist(jittered_data, bins=20)
plt.show()

In [None]:
plt.hist(ds_bevs, bins=len(np.unique(ds_bevs))-1, density=True, alpha=0.6, color='b', label='Original Data')
plt.hist(jittered_data, bins=50, density=True, alpha=0.6, color='g', label='Jittered Data')

params_skewnorm = skewnorm.fit(jittered_data)

# Create an array over which you'll compute the PDFs
xmin, xmax = plt.xlim()
x = np.linspace(xmin, xmax, 1000)

# Plot the PDFs using the parameters obtained from the fit
plt.plot(x, skewnorm.pdf(x, *params_skewnorm), 'k', linewidth=2, label='Fit: SkewNorm')

plt.legend(loc='best')
plt.xlabel('Data Values')
plt.ylabel('Density')
plt.title('Original Data and Fitted Distributions')
plt.grid(True)
plt.show()

In [None]:
# Number of samples you wish to generate
num_samples = 1000

samples = skewnorm.rvs(*params_skewnorm, size=num_samples).round(0).astype(int)

# Plot histogram to visualize the skewed distribution
plt.hist(ds_bevs, bins=50, density=True, alpha=0.4, color='b', label='Original Data')
plt.hist(samples, bins=60, density=True, alpha=0.4, color='g', label='Re-sampled Data')
plt.legend(loc='best')
plt.title('Histogram of Skewed Normal Distribution')
plt.show()

In [None]:
skewnorm.fit(jittered_data)

In [None]:
ds_bevs.std()

In [None]:
from scipy.stats import poisson
# Parameter
lambda_val = ds_bevs.mean()  # average rate of value

# Values
x = np.arange(0, 2*lambda_val+1)  # typically sufficient range for plotting
y = poisson.pmf(x, lambda_val)

# Plot
plt.bar(x, y, align='center', alpha=0.7)
plt.xlabel('Number of events')
plt.ylabel('Probability')
plt.title(f'Poisson Distribution\n lambda={lambda_val}')
plt.show()

### Percentiles?

In [None]:
# Calculate 5% percentile bands
percentiles = [np.percentile(ds_bevs, p).astype(int) for p in range(0, 101, 5)]

# Convert to pandas Series
percentile_series = pd.Series(percentiles, index=[f"{i}%" for i in range(0, 101, 5)])

percentile_series

In [None]:
reconstructed_data = []
for i in range(len(percentile_series) - 1):
    start = percentile_series.iloc[i]
    end = percentile_series.iloc[i+1]
    # Number of points for 5% of the data
    num_points = int(0.005 * len(ds_bevs))
    reconstructed_data.extend(np.linspace(start, end, num_points))

reconstructed_data = np.array(reconstructed_data).round(0).astype(int)
reconstructed_data

In [None]:
plt.hist(ds_bevs, bins=50, color='g', alpha=0.4, label='Original Data', density=True)
plt.hist(reconstructed_data, bins=50, color='b', alpha=0.4, label='Reconstructed Data', density=True)
plt.legend(loc='best')
plt.title('A comparison between original data and \n data reconstructed from the percentiles.')
plt.show()

### Value Probabilities (normalised value counts)

In [None]:
# Get unique values and their counts
values, counts = np.unique(ds_bevs, return_counts=True)

# Convert to Pandas Series
counts = pd.Series(counts, index=values)
probabilities = counts / counts.sum()
probabilities

In [None]:
probabilities.plot.bar(alpha=0.5)
plt.xticks(rotation='horizontal')
plt.xlabel('Number of BEVs')
plt.ylabel('Probability')
plt.show()

# Creating a list of substation objects

In [None]:
from enum import Enum

class DistSubstation:
    def __init__(self) -> None:
        self.name:str
        self.id:str # str instead of int (for now)
        self.gisData:GISData
        self.vehicles:Vehicles
        self.evDemands:EVDemands
        self.params:SubstationParams

class SubstationParams:
    def __init__(self) -> None:
        self.numCustomers:int
        self.parentLSOAs:list

class GISData:
    def __init__(self) -> None:
        self.latitude:float
        self.longitude:float
        self.boundaries:gpd.GeoSeries

class Vehicles:
    def __init__(self) -> None:
        self.vehicles:pd.Series
        self.bevs:pd.Series
        self.phevs:pd.Series
        self.bevsWithOnPlotParking:pd.Series
        self.phevsWithOnPlotParking:pd.Series

class EVDemands:
    def __init__(self) -> None:
        self.evDemandList:list[EVDemand]

class Quarter(Enum):
    Q1=0
    Q2=1
    Q3=2
    Q4=3

class EVDemand:
    def __init__(self) -> None:
        self.bev:DataValue
        self.phev:DataValue
        self.year:int
        self.quarter:Quarter

class DataValue:
    def __init__(self) -> None:
        self.mean:float = 0
        self.std:float = 0

# Wrapping this in a SubstationDataMapper Class

This class should recieve a list of substation objects, and return them with the empty attributes filled in.

# Handling Multiple Substation Objects

In [None]:
def create_substation_objects(ds_geo, substation_numbers):
    substations = []
    for substation_number in substation_numbers:
        substation = DistSubstation()
        data = ds_geo[ds_geo['Substation Number'] == substation_number]
        substation.name = data['Name'].values[0]
        substation.id = data['Substation Number'].values[0]

        gis_data = GISData()
        gis_data.latitude = data['LATITUDE'].values[0]
        gis_data.longitude = data['LONGITUDE'].values[0]
        gis_data.boundaries = data['geometry'].values[0]
        substation.gisData = gis_data

        params = SubstationParams()
        params.numCustomers = data['Customers'].values[0]
        params.parentLSOAs = None
        substation.params = params
        substations.append(substation)

    return substations

In [None]:
class MultiSubstationDataMapper:
    SAMPLE_SIZE = 1000
    PERCENTILE_INCREMENT = 5

    def __init__(self, ds_data: pd.DataFrame, lsoa_boundaries: gpd.GeoDataFrame, house_data: pd.DataFrame) -> None:
        self.ds_data = ds_data.set_index('Substation Number')
        self.lsoa_boundaries = lsoa_boundaries
        self.house_data = house_data

    def map_to_substation(self, substations: object, data: dict):
        for substation in substations:
            parent_lsoas, intersections = self._find_parent_lsoas(substation)
            substation.params.parentLSOAs = parent_lsoas
            vehicles_instance = Vehicles()
            for key in data:
                setattr(vehicles_instance, key, self._allocate_data_from_lsoa_to_ds(data[key], substation, parent_lsoas, intersections))
            substation.vehicles = vehicles_instance

    def _find_parent_lsoas(self, substation: object):
        intersections = self.lsoa_boundaries.geometry.intersection(self.ds_data.loc[substation.id].geometry)
        pip_mask = ~intersections.is_empty
        parent_lsoas = self.lsoa_boundaries[pip_mask].index.values
        return parent_lsoas, intersections

    def _allocate_data_from_lsoa_to_ds(self, data: pd.DataFrame, substation: object, parent_lsoas: list, intersections):
        data_filtered = data[parent_lsoas]
        household_intersection = self._calculate_household_intersection(substation, parent_lsoas, intersections)
        data_from_lsoas = np.empty(shape=(len(parent_lsoas), self.SAMPLE_SIZE))
        for i, lsoa in enumerate(parent_lsoas):  # For each intersecting LSOA
            n = np.maximum(data_filtered[lsoa].astype(int), 0)
            p = np.clip(household_intersection.loc[lsoa], 0, 1)
            data_from_lsoas[i] = binom.rvs(n=n, p=p, size=(1, self.SAMPLE_SIZE))
        data_from_lsoas = np.add.reduce(data_from_lsoas).flatten().astype(int)
        return self._calculate_percentiles(data=data_from_lsoas)

    def _calculate_household_intersection(self, substation: object, parent_lsoas: list, intersections):
        ds_customers_in_lsoas = self._calculate_ds_customers_in_lsoas(substation, parent_lsoas, intersections)
        households = self.house_data.loc[parent_lsoas].households
        return ds_customers_in_lsoas.divide(households)

    def _calculate_ds_customers_in_lsoas(self, substation: object, parent_lsoas: list, intersections):
        relative_intersections = self._calculate_relative_intersection(substation, parent_lsoas, intersections)
        ds_customers_in_lsoas = relative_intersections * substation.params.numCustomers
        ds_customers_in_lsoas = ds_customers_in_lsoas.fillna(0) # If numCustomers == NaN, assume numCustomers = 0
        return ds_customers_in_lsoas # Can sometimes be NaN which causes problems. Need to code an alternative way to calulate based purely on area intersection!

    def _calculate_relative_intersection(self, substation: object, parent_lsoas: list, intersections):
        intersection_areas = intersections.loc[parent_lsoas].area
        substation_area = self.ds_data.loc[substation.id].geometry.area
        return intersection_areas / substation_area

    def _calculate_percentiles(self, data):
        percentiles = [np.percentile(data, p).astype(int) for p in range(0, 101, self.PERCENTILE_INCREMENT)]
        percentile_series = pd.Series(percentiles, index=[f"{i}%" for i in range(0, 101, self.PERCENTILE_INCREMENT)])
        return percentile_series

In [None]:
quarter = '2023 Q1'

# Vehicle Registrations
vehicle_registration_estimator = VehicleRegistrationEstimator(relative_difference_samples)
vehicle_registrations_data_samples = vehicle_registration_estimator.estimate(vehicle_registrations_data_i, quarter)
bev_registrations_data_samples = vehicle_registration_estimator.estimate(bev_registrations_data_i, quarter)
phev_registrations_data_samples = vehicle_registration_estimator.estimate(phev_registrations_data_i, quarter)

# EVs with On Plot Parking
evs_with_on_plot_parking_estimator = EVsWithOnPlotParkingEstimator(proportion_of_vehicles_with_on_plot_parking_samples)
bevs_with_on_plot_parking_samples = evs_with_on_plot_parking_estimator.estimate(bev_registrations_data_samples)
phevs_with_on_plot_parking_samples = evs_with_on_plot_parking_estimator.estimate(phev_registrations_data_samples)

# proportion_of_vehicles_with_on_plot_parking_samples is assumed to be stationary and can therefore be precomputed

In [None]:
substations = create_substation_objects(ds_geo, ds_geo['Substation Number'].sample(100).values)

substation_data_mapper = MultiSubstationDataMapper(
    ds_data=ds_geo,
    lsoa_boundaries=lsoa_boundaries,
    house_data=house_2021_data
)

data = {
    'vehicles': vehicle_registrations_data_samples, 
    'bevs': bev_registrations_data_samples,
    'phevs': phev_registrations_data_samples,
    'bevsWithOnPlotParking': bevs_with_on_plot_parking_samples,
    'phevsWithOnPlotParking': phevs_with_on_plot_parking_samples
}

substation_data_mapper.map_to_substation(substations=substations, data=data)

In [None]:
substations[2]

In [None]:
substations[2].vehicles.vehicles

In [None]:
substations[2].vehicles.bevs

In [None]:
substations[2].vehicles.bevsWithOnPlotParking

In [None]:
substations[2].params.parentLSOAs

In [None]:
bev_registrations_data_samples[substations[2].params.parentLSOAs]

In [None]:
substations[0].id

# Looking for evidence in real world monitor data

In [None]:
substation_list = list(pd.read_csv('../Data/DistributionNetwork/substation_list.csv', header=None, dtype=str).sort_values(by=0, ascending=True).iloc[:, 0].values)
substation_list

In [None]:
list(ds_geo['Substation Number'].values)

In [None]:
common_substation_list = intersection_of_lists(substation_list, list(ds_geo['Substation Number'].values))
common_substation_list

In [None]:
len(common_substation_list)

In [None]:
substations = create_substation_objects(ds_geo, common_substation_list)

substation_data_mapper = MultiSubstationDataMapper(
    ds_data=ds_geo,
    lsoa_boundaries=lsoa_boundaries,
    house_data=house_2021_data
)

data = {
    'vehicles': vehicle_registrations_data_samples, 
    'bevs': bev_registrations_data_samples,
    'phevs': phev_registrations_data_samples,
    'bevsWithOnPlotParking': bevs_with_on_plot_parking_samples,
    'phevsWithOnPlotParking': phevs_with_on_plot_parking_samples
}

substation_data_mapper.map_to_substation(substations=substations, data=data)

In [None]:
[f"{i}%" for i in range(0, 101, 5)]

In [None]:
bevs_with_on_plot_parking_selected_substations = pd.DataFrame(index=[f"{i}%" for i in range(0, 101, 5)], columns=common_substation_list)
for substation in substations:
    bevs_with_on_plot_parking_selected_substations[substation.id] = substation.vehicles.bevsWithOnPlotParking

In [None]:
bevs_with_on_plot_parking_selected_substations

In [None]:
bevs_with_on_plot_parking_selected_substations.to_csv('../Data/test.csv')