In [132]:
#check ipynb is running
print("Hello, World!")

Hello, World!


In [133]:
#set working directory
import os
os.chdir("/Users/jovita.brundziene/Python/airflow-de-intro-project-jbru")

In [134]:
#check working directory set to project root to use relative pathways later
os.getcwd()

'/Users/jovita.brundziene/Python/airflow-de-intro-project-jbru'

To do:
- Go through repo steps
- Include dev/prod environment parameters
- add parameters to config file
- create a docker image
- create a github action to run pipeline automatically
- create unit tests
- modularise code into at least config, functions and run
- Update requirements file and build it into the script
- Requirements lint?
- Nice to have: package it up as a python package?

### Extract data from local to S3

In [135]:
import boto3
import os
import logging
from botocore.exceptions import ClientError

# Set up logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def upload_parquet_files_to_s3(bucket_name, local_directory, s3_prefix, dry_run=True):
    """
    Uploads .parquet files from a local directory to an S3 bucket under a specified prefix.

    Parameters:
        bucket_name (str): Name of the S3 bucket.
        local_directory (str): Path to the local directory containing .parquet files.
        s3_prefix (str): Path prefix within the S3 bucket.
        dry_run (bool): If True, simulates the upload without actually uploading files.
    """
    # Create an S3 client using boto3
    s3 = boto3.client('s3')

    # Loop through all files in the specified local directory
    for file in os.listdir(local_directory):
        # Only process files with a .parquet extension
        if file.endswith('.parquet'):
            # Construct the full local file path
            local_path = os.path.join(local_directory, file)
            # Define the S3 object key (i.e., path within the bucket)
            s3_key = f'{s3_prefix}/{file}'

            try:
                # Check if the file already exists in the S3 bucket
                s3.head_object(Bucket=bucket_name, Key=s3_key)
                logging.info(f"File already exists in S3: s3://{bucket_name}/{s3_key} — skipping upload.")
            except ClientError as e:
                # If the error code is 404, the file does not exist — proceed with upload
                if e.response['Error']['Code'] == '404':
                    if dry_run:
                        # Simulate the upload in dry run mode
                        print(f"[DRY RUN] Would upload: {local_path} to s3://{bucket_name}/{s3_key}")
                    else:
                        # Attempt to upload the file to S3
                        try:
                            s3.upload_file(local_path, bucket_name, s3_key)
                            logging.info(f"Successfully uploaded: {local_path} to s3://{bucket_name}/{s3_key}")
                        except Exception as upload_error:
                            logging.error(f"Failed to upload: {local_path}. Error: {upload_error}")
                else:
                    # Log unexpected errors during head_object check
                    logging.error(f"Error checking existence of {s3_key}: {e}")


In [None]:
#turn this into a config file
upload_parquet_files_to_s3(
    bucket_name='alpha-hmcts-de-testing-sandbox',
    local_directory='data/example-data',
    s3_prefix='de-intro-project-jb/dev',
    dry_run=True
)

### Load data

In [137]:
import s3fs

def list_parquet_files_from_s3(bucket_name: str, s3_prefix: str) -> list:
    """
    Lists all Parquet files in a given S3 prefix using s3fs.

    Parameters:
        bucket_name (str): S3 bucket name.
        s3_prefix (str): Prefix (folder path) in the bucket.

    Returns:
        list: List of full S3 paths to Parquet files.
    """
    fs = s3fs.S3FileSystem()
    s3_path = f"s3://{bucket_name}/{s3_prefix}"
    files = fs.ls(s3_path)
    return [f for f in files if f.endswith('.parquet')]

In [138]:
from arrow_pd_parser import reader
import pandas as pd

def load_parquet_files_from_s3(bucket_name, s3_prefix):
    """
    Loads and parses Parquet files from S3 using PyArrow and a custom parser.

    Parameters:
        bucket_name (str): S3 bucket name.
        s3_prefix (str): Prefix (folder path) in the bucket.

    Returns:
        pd.DataFrame: Combined DataFrame from all Parquet files.
    """
    #from your_module import list_parquet_files_from_s3  # adjust import as needed

    parquet_files = list_parquet_files_from_s3(bucket_name, s3_prefix)
    all_dfs = []

    for file_path in parquet_files:
        df = reader.read(file_path)  # reader handles S3 paths directly
        all_dfs.append(df)

    return pd.concat(all_dfs, ignore_index=True) if all_dfs else pd.DataFrame()


In [None]:
#load data
bucket = "alpha-hmcts-de-testing-sandbox"
prefix = "de-intro-project-jb/dev"

df = load_parquet_files_from_s3(bucket, prefix)
df.head()

### Metadata

In [191]:
#libraries
import pandas as pd
from arrow_pd_parser import reader

#function to load parquet files
def load_parquet(
    parquet_path: str

) -> pd.DataFrame:
    """
    Parameters:
    - parquet_path: Path to the Parquet file

    Returns:
    - Cleaned Pandas DataFrame
    """
    #load parquet with metadata
    df = reader.read(
        input_path = parquet_path
    )

    return df

In [192]:
#people-part1 df
df1 = load_parquet(
    parquet_path = "data/example-data/people-part1.parquet"
)

df1.head()

Unnamed: 0_level_0,User Id,First Name,Last Name,Email,Phone,Date of birth,Job Title,Source extraction date
Index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,e09c4f4cbfEFaFd,Dawn,Trevino,clintongood@example.org,360-423-5286,1972-01-17,"Teacher, primary school",2024-02-29T12:30:10
2,D781D28b845Ab9D,Dale,Mcknight,clairebradshaw@example.org,9062423229,1931-01-31,"Development worker, community",2024-02-29T12:30:10
3,eda7EcaF87b2D80,Herbert,Bean,johnnybooker@example.org,001-149-154-0679x1617,2018-02-10,Ceramics designer,2024-02-29T12:30:10
4,E75ACea5D7AeC3e,Karen,Everett,wkhan@example.org,870.294.7563x20939,1938-06-14,"Civil engineer, consulting",2024-02-29T12:30:10
5,9C4Df1246ddf543,Angela,Shea,reginaldgarner@example.com,242.442.2978,1971-11-22,Health and safety adviser,2024-02-29T12:30:10


In [193]:
#check data types
print(df1.dtypes)

User Id                   string[python]
First Name                string[python]
Last Name                 string[python]
Email                     string[python]
Phone                     string[python]
Date of birth             string[python]
Job Title                 string[python]
Source extraction date    string[python]
dtype: object


In [194]:
#load metadata function
import json
import os

def load_metadata(filename: str) -> dict:
    """
    Load metadata from a JSON file located in the data/metadata folder.
    """
    metadata_path = os.path.join("data", "metadata", filename)
    with open(metadata_path, "r", encoding="utf-8") as f:
        return json.load(f)

# Example usage:
metadata = load_metadata("intro-project-metadata.json")
metadata

{'$schema': 'https://moj-analytical-services.github.io/metadata_schema/mojap_metadata/v1.3.0.json',
 'name': 'Intro Project',
 'description': '',
 'file_format': '',
 'sensitive': False,
 'columns': [{'name': 'User ID', 'type': 'string', 'description': ''},
  {'name': 'First Name',
   'type': 'string',
   'description': 'A Users first name'},
  {'name': 'Last Name', 'type': 'string', 'description': 'A Users last name'},
  {'name': 'Email', 'type': 'string', 'description': 'A Users email address'},
  {'name': 'Phone', 'type': 'string', 'description': 'A Users phone number'},
  {'name': 'Date of birth',
   'type': 'timestamp(s)',
   'datetime_format': '%Y-%m-%dT%H:%M:%S',
   'description': 'A Users date of birth'},
  {'name': 'Job Title', 'type': 'string', 'description': 'A Users job title'},
  {'name': 'Source extraction date',
   'type': 'timestamp(s)',
   'datetime_format': '%Y-%m-%dT%H:%M:%S',
   'description': "Timestamp for start of record's presence in database"}],
 'primary_key':

In [195]:
#create a metadata class and validate
from mojap_metadata import Metadata

# Create Metadata object from JSON
metadata_obj = Metadata.from_dict(metadata)

# Check the object
print(metadata_obj.name)          # Project name


Intro Project


In [196]:
#validate metadata against schema
metadata_obj.validate()

In [199]:
#normalise column names to lowercase and replace spaces with underscores.
def normalize_column_names(df):
    df.columns = [c.lower().replace(" ", "_") for c in df.columns]
    return df

df1 = normalize_column_names(df1)

df1.head()


Unnamed: 0_level_0,user_id,first_name,last_name,email,phone,date_of_birth,job_title,source_extraction_date
Index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,e09c4f4cbfEFaFd,Dawn,Trevino,clintongood@example.org,360-423-5286,1972-01-17,"Teacher, primary school",2024-02-29T12:30:10
2,D781D28b845Ab9D,Dale,Mcknight,clairebradshaw@example.org,9062423229,1931-01-31,"Development worker, community",2024-02-29T12:30:10
3,eda7EcaF87b2D80,Herbert,Bean,johnnybooker@example.org,001-149-154-0679x1617,2018-02-10,Ceramics designer,2024-02-29T12:30:10
4,E75ACea5D7AeC3e,Karen,Everett,wkhan@example.org,870.294.7563x20939,1938-06-14,"Civil engineer, consulting",2024-02-29T12:30:10
5,9C4Df1246ddf543,Angela,Shea,reginaldgarner@example.com,242.442.2978,1971-11-22,Health and safety adviser,2024-02-29T12:30:10


In [None]:
# Convert 'Date of birth' from 'YYYY-MM-DD' to ISO timestamp format 'YYYY-MM-DDTHH:MM:SS'
def convert_to_iso_timestamp(date_str):
    if pd.isna(date_str):
        return None
    try:
        # Parse date string and format as ISO timestamp
        return pd.to_datetime(date_str, format='%Y-%m-%d').strftime('%Y-%m-%dT%H:%M:%S')
    except Exception as e:
        print(f'Error converting {date_str}: {e}')
        return None

df1 = df1.apply(convert_to_iso_timestamp)
print(df1['date_of_birth'].head(10))

KeyError: 'Date of birth'

In [187]:

def enforce_metadata_types(df, metadata_obj):
    """
    Enforce column data types in a DataFrame based on mojap-metadata schema.
    Handles both dict-based and object-based columns.
    """
    for col in metadata_obj.columns:
        # Detect if col is dict or object
        if isinstance(col, dict):
            col_name = col["name"].lower().replace(" ", "_")
            col_type = col["type"]
            fmt = col.get("datetime_format", None)
        else:  # Column-like object
            col_name = col.name.lower().replace(" ", "_")
            col_type = col.type
            fmt = getattr(col, "datetime_format", None)

        if col_name not in df.columns:
            print(f"⚠️ Column '{col_name}' not found in DataFrame.")
            continue

        # Apply type casting
        if col_type == "string":
            df[col_name] = df[col_name].astype("string")
        elif col_type.startswith("timestamp"):
            df[col_name] = pd.to_datetime(df[col_name], format=fmt, errors="coerce")
        elif col_type in ["int", "integer"]:
            df[col_name] = pd.to_numeric(df[col_name], errors="coerce").astype("Int64")
        elif col_type in ["float", "double"]:
            df[col_name] = pd.to_numeric(df[col_name], errors="coerce")

    return df


In [None]:
#enforce metadata types
df1 = enforce_metadata_types(df1, metadata_obj)
df1

ValueError: cannot assemble with duplicate keys

In [151]:
print(df1.dtypes)


user_id                   string[python]
first_name                string[python]
last_name                 string[python]
email                     string[python]
phone                     string[python]
date_of_birth             datetime64[ns]
job_title                 string[python]
source_extraction_date    datetime64[ns]
dtype: object
