# Revisions Toolkit
This project is intended to help with both obtaining and constructing revisions series for a number of GDP components from the ONS. It's not doing anything revolutionary.

In the 'old' folder it is structured as a Kedro project. Kedro has some little issues it seems with the NAS drive and symlinks, so have had to port it into one big jupyter notebook. Have opened an issue on the Kedro project with a proposed [fix](https://github.com/kedro-org/kedro/issues/3694)


## Outline
For the project:
1. Look to the ONS website and scrape the latest spreadsheets
2. Clean them up and extract the data
3. Construct the revisions series
4. Save the data

## How to run the project
Make sure you've created an environment with the dependencies in `requirements.txt`. If not, you can do this by running the following command:

```miniconda3
conda create --name <env_name> 
pip install -r requirements.txt
```

To make this easier to do, I'd recommend using Visual Studio Code to open the terminal in the correct folder. Otherwise you can copy the path.

The project is now a simple jupyter notebook, so you should be able to run it as usual, by selecting the correct kernel in the top right.

## Common errors
If you're having trouble running the project, you might want to check the following:

1. Make sure you're in the right environment. You can check this by running `conda env list` and seeing which environment is active.

2. The ONS may have changed the format of their spreadsheets, which could cause the scraping to fail. There isn't an easy way to check for this, but you can look at the logs to see if and when the scraping is failing.

3. Sometimes the initial download might fail or be blocked by the ONS website. It might also be an issue with the Bank's firewall, though those issues should be resolved?

## Layout
The rest of this project is structured as follows:

1. Logic
Area for functions and classes which we use later on to be defined.

2. Config
Configuration area for the project

3. Pipelines
Some simple pipelines to structure how we run the ETL

# Section 1: Logic
Contains the functions and classes we need to run the code

In [26]:
from pathlib import PurePosixPath
from typing import Any, Dict

import sys
import logging
import re
import zipfile
import io
import random
import time
import pandas as pd
import os

try:
    import boerequests
    sys.modules["requests"] = boerequests # Replace all requests with boerequests
except:
    import requests as boerequests


class GDPVintage():
    """
    A class representing the GDP vintages (revision triangles) dataset.

    Attributes:
        _writepath (PurePosixPath): The write path for saving the dataset.
        _base_url (str): The base URL for downloading the dataset.
        _name (str): The name of the dataset.
        _extracted_files (List[str]): The list of extracted files from the dataset.
        _release_date (str): The release date of the dataset.

    Methods:
        load() -> Dict[str, pd.DataFrame]:
            Loads the GDP vintages (revision triangles) data from the ONS website.

        save(data: Dict[str, pd.DataFrame]) -> None:
            Saves the data to the specified filepath.

        describe() -> Dict[str, Any]:
            Returns a dictionary that describes the attributes of the dataset.
    """
    def __init__(self, writepath: str, base_url: str, dataset_name: str):
        self._writepath = PurePosixPath(writepath)
        self._base_url = base_url
        self._name = dataset_name
        self._extracted_files = None
        self._release_date = None
        
    def load(self) -> Dict[str, pd.DataFrame]:
        """Loads the GDP vintages (revision triangles) data from the ONS website.
        
        Returns:
            A dictionary containing the dataframes for each sheet in the downloaded release.
        
        Raises:
            FileNotFoundError: If the expected file is missing in the downloaded release.
            ConnectionError: If there is an issue with downloading the release or accessing the data page.
        """
        
        print("Downloading the latest release from the ONS website...")
        
        # Introduce a short random delay to prevent concurrent Airflow tasks from hitting the same URL at the same time
        delay = random.uniform(0, 5)
        time.sleep(delay)
        
        # Send a GET request to the ONS website
        response = boerequests.get(self._base_url, timeout=15)
        
        # Check if the response was successful
        if response.status_code == 200:
            
            # Find the latest release URL from the page content. 
            # This logic assumes the release URL is the first one found on the page.
            pattern = r'<a href="(/file\?uri=.*?)".*?>'
            latest_release_url = re.search(pattern, response.text).group(1)
            latest_release_url = "https://www.ons.gov.uk" + latest_release_url
            print("Found the latest release URL: " + latest_release_url)
            
            # Try get the release date from the URL
            try:
                quarter = re.findall(r"quarter(\d)", latest_release_url)
                year = re.findall(r"(\d{4})", latest_release_url)
                self._release_date = "Q" + quarter[0] + " " + year[0]
            except:
                self._release_date = None
            
            # Send a GET request to the latest release URL and get the response
            response = boerequests.get(latest_release_url, timeout=15)

            # Check if the request was successful
            if response.status_code == 200:

                # Load the data as a dictionary to hold each sheet within a df
                data = {}
                # If the content header is a zip file, load the data using the zipfile module
                if "application/zip" in response.headers.get("content-type"):
                    with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                        for filename in z.namelist():
                            with z.open(filename) as f:
                                data[filename] = pd.read_excel(f, sheet_name=None)
                elif "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" in response.headers.get("content-type"):
                    data[latest_release_url.split("/")[-1]] = pd.read_excel(io.BytesIO(response.content), sheet_name=None)
                
                ## SAVE THE DATA
                print("Saving the raw data...")
                self.save(data)
                
                # Return the data as a dictionary of dataframes
                return data

            else:
                print("[bold red blink]Failed to download the latest release due to status code: " + str(response.status_code), extra={"markup": True})
                raise ConnectionError
        else:
            print("[bold red blink]Failed to access the data page due to status code: " + str(response.status_code), extra={"markup": True})
            raise ConnectionError
        
    def save(self, data: Dict[str, pd.DataFrame]) -> None:
            """
            Saves the data to the specified filepath.

            Args:
                data (Dict[str, pd.DataFrame]): A dictionary containing the data to be saved.
                    The keys represent the filenames, and the values represent the corresponding
                    pandas DataFrame objects.

            Returns:
                None
            """

            for filename, df_dict in data.items():      
                output_file_path = self._writepath / filename
                
                # Create the directory if it doesn't exist
                os.makedirs(output_file_path.parent, exist_ok=True)
                
                with pd.ExcelWriter(output_file_path) as writer:
                    for sheet_name, sheet_df in df_dict.items():
                        sheet_df.to_excel(writer, sheet_name=sheet_name)
                                    
                                    
            self._extracted_files = list(data.keys())  # Log the list of extracted files
            print(f"Successfully saved the latest {self._name} data release from the ONS website.")

    def describe(self) -> Dict[str, Any]:
            """Returns a dictionary that describes the attributes of the dataset.

            Returns:
                A dictionary containing the following attributes:
                - folderpath: The folder path of the dataset.
                - release_date: The release date of the dataset.
                - extracted_files: The list of extracted files from the dataset.
                - name: The name of the dataset.
            """
            
            output_dict = dict(
                folderpath=self._writepath, 
                release_date=self._release_date, 
                extracted_files=self._extracted_files,
                name=self._name
            )
            
            return output_dict
        
# # # # Convenience functions
def get_latest_data(base_url):
    """
    Retrieves the latest data from a given base URL by iterating back in months and testing URLs.

    Args:
        base_url (str): The base URL to construct the test URLs.

    Returns:
        requests.Response: The response object containing the data if a working URL is found.

    Raises:
        ValueError: If no working URL is found.
    """
    # Get the current date
    current_date = pd.Timestamp.now()

    # Adjust the current date to the midpoint of the current month. Might avoid some datetime weirdness
    current_date = current_date.replace(day=15)

    # Iterate back in months from the current date
    for i in range(0, 12):
        time.sleep(0.3)
        # Calculate the date to test
        test_date = current_date - pd.DateOffset(months=i)

        # Construct the URL to test
        test_url = f"{base_url}{test_date.strftime('%b%y').lower()}.zip"

        # Send a request to the URL
        response = boerequests.get(test_url)

        # Check if the response is successful
        if response.status_code == 200:
            return response

    raise ValueError("No working URL found.")

In [25]:
"""Nodes are the building blocks of pipelines, and represent tasks. 
Pipelines are used to combine nodes to build workflows, which range from simple data engineering workflows 
to end-to-end production workflows.
"""

import logging
from typing import List
import pandas as pd
import warnings

def load_data(
    data_dict: dict, 
    list_of_filenames_to_load: List[str]
) -> List[pd.DataFrame]:
    """
    Load the quarterly data from the given data dictionary and return the dataframe.
    
    Parameters:
        data_dict (dict): A dictionary containing the data to be loaded.
        
    Returns:
        pd.DataFrame: The loaded data as a pandas DataFrame.
    """
    print("Loading the data...")
    df_holder = []
    
    for filename, dict_of_sheets in data_dict.items(): # For file extracted from the zip
        for file_to_load in list_of_filenames_to_load: # For the list of files to load in config
            if file_to_load.lower() in filename.lower(): # If the names match
                for sheet_name, df in dict_of_sheets.items(): # For the sheets in the file
                    if "triangle" in sheet_name.lower(): # If the sheet name contains "triangle"
                        df_holder.append(df) # Append to a list of returns
                    elif sheet_name.lower() == "estimate": # If the sheet name is "estimate"
                        df_holder.append(df) # Append to a list of returns
    
    return df_holder


def clean_quarterly_data(data_list: List[pd.DataFrame]) -> pd.DataFrame:
    """
    Cleans the given GDP vintages dataset by performing the following steps:
    1. Drops unnecessary rows.
    2. Replaces the index with the first column.
    3. Drops the first column.
    4. Sets the first row as the column names.
    5. Drops the first row.
    6. Rotates the table.
    7. Replaces empty data with NaNs.

    Parameters:
    - gdp_vintages_ons (pandas.DataFrame): The Q-GDP vintages dataset to be cleaned.

    Returns:
    - pandas.DataFrame: The cleaned Q-GDP vintages dataset.
    """
    print("Cleaning the data...")
    
    clean_data_list = []
    
    for data in data_list:
    
        # Drop some unnecessary rows
        data = data.drop(
            data.index[[0,1,3,4,5,-1]]
        )
        # Replace the index with the first column
        data.index = data.iloc[:, 0]
        
        # Drop the first column now it's been moved
        data = data.drop(data.columns[0], axis=1)
        
        # Make the first row into the names of the columns
        data.columns = data.iloc[0]
        
        # Drop the first row
        data = data.drop(data.index[0])

        # Rotate the table
        data = data.T
        
        # Replace empty data with NaNs
        pd.set_option('future.no_silent_downcasting', True) # Silly warning
        data = data.replace(' ', pd.NA)
        
        # Turn the index into a datetime index which excel can read
        data.index = data.index.map(lambda x: x.replace('Q1', '01').replace('Q2', '04').replace('Q3', '07').replace('Q4', '10'))
       
        with warnings.catch_warnings():  # Suppress the warning about not specifying a format
            warnings.filterwarnings("ignore", category=UserWarning)
            data.index = pd.to_datetime(data.index).to_period('Q') 
        
        
        clean_data_list.append(data)
    
    return clean_data_list


def transform_and_combine(data_list: List[pd.DataFrame]) -> dict:
    """
    Transforms the input DataFrame by constructing revision series for different quarters
    and combines them into a new DataFrame along with the original data.

    Args:
        data (pd.DataFrame): The input DataFrame containing the revisions triangle.

    Returns:
        pd.DataFrame: The combined DataFrame containing the revisions triangle and the revision series.
    """
    t_data_list = []
    print("Transforming the data...")
    
    for data in data_list:
            
        transformed_revisions = pd.DataFrame(index=data.index)
        
        transformed_revisions["First Estimate"] = construct_revision_series(data, 0).values
        transformed_revisions["1st Period"] = construct_revision_series(data, 1).values
        transformed_revisions["2nd Period"] = construct_revision_series(data, 2).values
        transformed_revisions["3rd Period"] = construct_revision_series(data, 3).values
        transformed_revisions["4th Period"] = construct_revision_series(data, 4).values
        transformed_revisions["12th Period"] = construct_revision_series(data, 12).values
        transformed_revisions["36th Period"] = construct_revision_series(data, 36).values
        
        total_gdp_vintage = {
            'Revisions triangle': data,
            'Revisions series': transformed_revisions
        }
        
        t_data_list.append(total_gdp_vintage)
        
        
    return t_data_list


def save_data(data_list: List[dict], input_name_list: List[str], save_path: str) -> None:
    """Save the data to the specified filepath.
    
    Args:
        data (pd.DataFrame): The DataFrame containing the data to be saved.
    
    Returns:
        None
    """
    print("Saving the data...")
    
    save_path = PurePosixPath(save_path)
    
    for data, name in zip(data_list, input_name_list):

        output_path = save_path / (name+'_PROCESSED.xlsx')

        # Create the directory if it doesn't exist
        os.makedirs(output_path.parent, exist_ok=True)

        with pd.ExcelWriter(output_path) as writer:
            for sheet_name, df in data.items():
                df.to_excel(writer, sheet_name=sheet_name, index=True)
            
    

def construct_revision_series(revisions_triangle: pd.DataFrame, periods: int) -> pd.Series:
    """
    Construct the revision series from the revisions triangle, given the specified period.
    Convenience function used in the transform_and_merge node.
    
    Parameters:
        revisions_triangle (pd.DataFrame): The revisions triangle containing the estimates over time.
        periods (int): The number of periods to consider for the revision series.
        
    Returns:
        pd.Series: The revision series calculated based on the specified period.
        
    If the passed period = 0, then the first estimate series is returned.
    """
    revision_series = pd.Series(index=revisions_triangle.index, name=f"{periods}_period_revision_series")
    
    if periods == 0:
        # Assume the user wants the first estimate series
        for idx in revisions_triangle.index:
            # Get the first estimate
            try:
                first_estimate = revisions_triangle.loc[idx].dropna().iloc[0]
            except IndexError:
                first_estimate = pd.NA   
                
            revision_series.loc[idx] = first_estimate         
    else:
        # Calculate the revisions series
        for idx in revisions_triangle.index:
            
            # Get the first estimate            
            # Get the estimate relevant to the period
            try:
                first_estimate = revisions_triangle.loc[idx].dropna().iloc[0]
                final_estimate = revisions_triangle.loc[idx].dropna().iloc[periods]
            except IndexError:
                first_estimate = pd.NA
                final_estimate = pd.NA

            # Calculate the revision series
            if final_estimate is pd.NA:
                revision_series.loc[idx] = pd.NA
            else:
                revision_series.loc[idx] = round(final_estimate - first_estimate, 3)

    return revision_series

In [30]:
def clean_monthly_data(df_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
    """
    Clean the monthly data by performing the following steps:
    1. Set the index of the DataFrame to the first column.
    2. Drop the first two rows.
    3. Drop the first column.
    4. Set the column names to the values in the first row.
    5. Drop the first row.
    6. Drop the second row.
    7. Transpose the DataFrame.
    8. Drop the last column.
    9. Clean the index by removing non-alphanumeric characters and converting it to datetime format.
    
    Args:
        df (pd.DataFrame): The input DataFrame containing the monthly data.
        
    Returns:
        pd.DataFrame: The cleaned DataFrame.
    """
    df_holder = []
        
    for df in df_list:
        df.index = df.iloc[:, 0]
        df = df.drop(index=df.index[0:2])
        df = df.drop(columns=[df.columns[0]])
        df.columns = df.iloc[0, :]
        df = df.drop(index=df.index[0])
        df = df.drop(index=df.index[0])
        df = df.T
        df = df.drop(columns=[df.columns[-1]])
        
        clean_index = df.index.map(lambda x: ' '.join(re.findall(r'[A-Za-z]+|\d+', x)))
        df.index = pd.to_datetime(clean_index, format='%Y %b')

        df_holder.append(df)
        
    return df_holder

# 2. Configuration
Here is where we configure our project, using some of the functions we did before

In [27]:
## Setup for our scraping of the datasets


headline_qgdp_vintages = GDPVintage(
    writepath="data/01_raw/income/",
    base_url="https://www.ons.gov.uk/economy/grossdomesticproductgdp/datasets/revisionstrianglesforukgdpabmi",
    dataset_name="income Vintages"
)

expenditure_qgdp_vintages = GDPVintage(
    writepath="data/01_raw/expenditure/",
    base_url="https://www.ons.gov.uk/economy/grossdomesticproductgdp/datasets/revisionstrianglesforcomponentsfortheexpenditureapproachtothemeasureofukgdp",
    dataset_name="Expenditure Vintages"
)

deflator_qgdp_vintages = GDPVintage(
    writepath="data/01_raw/deflator/",
    base_url="https://www.ons.gov.uk/economy/grossdomesticproductgdp/datasets/revisionstrianglesforukgdpdeflatorquarteronquarterayearago",
    dataset_name="Deflator Vintages"
)

income_mgdp_vintages = GDPVintage(
    writepath="data/01_raw/headline/",
    base_url="https://www.ons.gov.uk/economy/grossdomesticproductgdp/datasets/revisionstrianglesformonthlygdp",
    dataset_name="Monthly Vintages"
)

income_qgdp_vintages = GDPVintage(
    writepath="data/01_raw/investment/",
    base_url="https://www.ons.gov.uk/economy/grossdomesticproductgdp/datasets/revisionstrianglesforcomponentsfortheincomeapproachtothemeasureofukgdp",
    dataset_name="Income Vintages"
)

investment_qgdp_vintages = GDPVintage(
    writepath="data/01_raw/investment/",
    base_url="https://www.ons.gov.uk/economy/grossdomesticproductgdp/datasets/revisionstogrossfixedcapitalformationandbusinessinvestment",
    dataset_name="Investment Vintages"
)

In [28]:
## Setup for loading the correct spreadsheets from the downloaded files

# THIS MATCHES THE FILENAMES IN THE DOWNLOADED ZIP FOLDERS WE WANT TO PROCESS
headline_qgdp_filename_list = [
    "ABMI - Quarterly GDP at Market Prices"
]
headline_qgdp_finalpath = "data/02_intermediate/headline/"

# THIS MATCHES THE FILENAMES IN THE DOWNLOADED ZIP FOLDERS WE WANT TO PROCESS
headline_mgdp_filename_list = [
    "mgdp revision triangle (m on m)", 
    "mgdp revision triangle (3m on 3m)"
]
headline_mgdp_finalpath = "data/02_intermediate/headline/"

# THIS MATCHES THE FILENAMES IN THE DOWNLOADED ZIP FOLDERS WE WANT TO PROCESS
expenditure_qgdp_filename_list = [
    "ABJR - Household Final Consumption",
    "BQKO - Imports of Goods",
    "BQKQ - Exports of Goods",
    "CAFU - Change in Inventories",
    "HAYO - Non-Profit",
    "IKBE - Export of Services",
    "IKBF - Imports of Services",
    "IKBK - Total Imports",
    "NMRY - General Government",
    "NPQT - Gross Fixed Capital",
]
expenditure_finalpath = "data/02_intermediate/expenditure/"

# MATCHES THE FILENAME OF THE DOWNLOADED XL FILE
deflator_qgdp_filename_list = [
    'ybgbgdpimplieddeflator.xlsx'
]
deflator_finalpath = "data/02_intermediate/headline/"

# MATCHES THE FILENAME OF THE DOWNLOADED XL FILE
income_qgdp_filename_list = [
    "CAEQ - Public corporations gross operating surplus.xlsx",
    "CAER - Private non-financial corporations gross operating surplus.xlsx",
    "CGBX - Other Income.xlsx",
    "CMVL - Taxes less subsidies.xlsx",
    "DTWM - Compensation of employees.xlsx",
    "NHCZ - Financial corporations gross operating surplus.xlsx",
]
income_finalpath = "data/02_intermediate/income/"

# MATCHES THE FILENAME OF THE DOWNLOADED XL FILE
investment_qgdp_filename_list = [
    "dataset5rftrt.xls"
]
investment_finalpath = "data/02_intermediate/headline/"

# 3. Pipelines

### Headline Quarterly GDP

In [None]:
# Extract the data and save to local folder
headline_qgdp_raw = headline_qgdp_vintages.load()

In [15]:


# Load up the data
loaded_QGDP_df = load_data(headline_qgdp_raw, headline_qgdp_filename_list)

# Clean the data
clean_QGDP_df = clean_quarterly_data(loaded_QGDP_df)

# Transform the data
transformed_QGDP_df = transform_and_combine(clean_QGDP_df)

# Save the output
save_data(transformed_QGDP_df, headline_qgdp_filename_list, headline_qgdp_finalpath)

Downloading the latest release from the ONS website...
Found the latest release URL: https://www.ons.gov.uk/file?uri=/economy/grossdomesticproductgdp/datasets/revisionstrianglesforukgdpabmi/quarter4octtodec2023firstestimate/abmi.zip
Saving the raw data...
Successfully saved the latest Headline Vintages data release from the ONS website.
Loading the data...
Cleaning the data...
Transforming the data...
Saving the data...


OSError: Cannot save file into a non-existent directory: 'data\02_intermediate\headline'

### Income QGDP

In [None]:
# Extract the data and save to local folder
income_qgdp_raw = income_qgdp_vintages.load()

In [23]:


# Load up the data
income_loaded_QGDP_df = load_data(income_qgdp_raw, income_qgdp_filename_list)

# Clean the data
income_clean_QGDP_df = clean_quarterly_data(income_loaded_QGDP_df)

# Transform the data
income_transformed_QGDP_df = transform_and_combine(income_clean_QGDP_df)

# Save the output
save_data(income_transformed_QGDP_df, income_qgdp_filename_list, income_finalpath)

Downloading the latest release from the ONS website...
Found the latest release URL: https://www.ons.gov.uk/file?uri=/economy/grossdomesticproductgdp/datasets/revisionstrianglesforcomponentsfortheincomeapproachtothemeasureofukgdp/quarter4octtodec2023firstestimate/income.zip


ReadTimeout: HTTPSConnectionPool(host='www.ons.gov.uk', port=443): Read timed out. (read timeout=5)

## Expenditure QGDP

In [None]:
# Extract the data and save to local folder
expenditure_qgdp_raw = expenditure_qgdp_vintages.load()

In [29]:

# Load up the data
expenditure_loaded_QGDP_df = load_data(expenditure_qgdp_raw, expenditure_qgdp_filename_list)

# Clean the data
expenditure_clean_QGDP_df = clean_quarterly_data(expenditure_loaded_QGDP_df)

# Transform the data
expenditure_transformed_QGDP_df = transform_and_combine(expenditure_clean_QGDP_df)

# Save the output
save_data(expenditure_transformed_QGDP_df, expenditure_qgdp_filename_list, expenditure_finalpath)

Downloading the latest release from the ONS website...
Found the latest release URL: https://www.ons.gov.uk/file?uri=/economy/grossdomesticproductgdp/datasets/revisionstrianglesforcomponentsfortheexpenditureapproachtothemeasureofukgdp/quarter4octtodec2023firstestimate/expenditure.zip
Saving the raw data...
Successfully saved the latest Expenditure Vintages data release from the ONS website.
Loading the data...
Cleaning the data...
Transforming the data...


NameError: name 'expenditure_qgdp_finalpath' is not defined

## Deflator QGDP

In [None]:
# Extract the data and save to local folder
deflator_qgdp_raw = deflator_qgdp_vintages.load()

In [None]:


# Load up the data
deflator_loaded_QGDP_df = load_data(deflator_qgdp_raw, deflator_qgdp_filename_list)

# Clean the data
deflator_clean_QGDP_df = clean_quarterly_data(deflator_loaded_QGDP_df)

# Transform the data
deflator_transformed_QGDP_df = transform_and_combine(deflator_clean_QGDP_df)

# Save the output
save_data(deflator_transformed_QGDP_df, deflator_qgdp_filename_list, deflator_qgdp_finalpath)

## Headline Monthly GDP

In [None]:
# Extract the data and save to local folder
headline_mgdp_raw = headline_mgdp_vintages.load()

In [31]:


# Load up the data
headline_loaded_MGDP_df = load_data(headline_mgdp_raw, headline_mgdp_filename_list)

# Clean the data
headline_clean_MGDP_df = clean_monthly_data(headline_loaded_MGDP_df)

# Transform the data
headline_transformed_MGDP_df = transform_and_combine(headline_clean_MGDP_df)

# Save the output
save_data(headline_transformed_MGDP_df, headline_mgdp_filename_list, headline_mgdp_finalpath)

Downloading the latest release from the ONS website...
Found the latest release URL: https://www.ons.gov.uk/file?uri=/economy/grossdomesticproductgdp/datasets/revisionstrianglesformonthlygdp/current/mgdprevisiontriangle.zip
Saving the raw data...
Successfully saved the latest Monthly Vintages data release from the ONS website.
Loading the data...
Transforming the data...
Saving the data...
