In [38]:
import re
import os
from mojap_metadata import Metadata
from arrow_pd_parser import writer, reader, caster
from data_linter import validation
from dataengineeringutils3.s3 import get_filepaths_from_s3_folder

from logging import getLogger

logger = getLogger(__name__)

# Environment name ("preprod" / "prod") -> S3 bucket name used for the land base path.
LAND_BUCKETS = {"preprod": "mojap-land-dev", "prod": "mojap-land"}

# Environment name -> S3 bucket name used for the pass / fail / log base paths.
RAW_HIST_BUCKETS = {"preprod": "mojap-raw-hist-dev", "prod": "mojap-raw-hist"}

# Base settings used by create_config. The four "*-base-path" values are
# templates: "{bucket}" is filled in by create_config for the chosen environment.
# Note that the land path has no trailing "/" while fail/pass/log paths do.
# NOTE(review): the key names look like data_linter validation config keys —
# confirm against the data_linter documentation.
BASE_CONFIG = {
    "land-base-path": "s3://{bucket}/corporate/matrix",
    "fail-base-path": "s3://{bucket}/corporate/matrix/fail/",
    "pass-base-path": "s3://{bucket}/corporate/matrix/pass/",
    "log-base-path": "s3://{bucket}/corporate/matrix/log/",
    "compress-data": False,
    "remove-tables-on-pass": True,
    "all-must-pass": False,
}

# Per-table settings; create_config adds "pattern" and "metadata" entries
# alongside these for the requested table.
TABLE_CONFIG = {
    "required": True,
    "allow-unexpected-data": True,
    "allow-missing-cols": True,
}

# Table name -> value stored under the table's "metadata" key by create_config.
# NOTE(review): "fff" looks like a placeholder rather than a real metadata
# path — confirm and replace before relying on validation.
META_PATH = {
    "bookings": "fff",
    "locations": "fff",
}


def create_config(scrape_date, env, table):
    """Build a fresh validation config for one table and scrape date.

    Args:
        scrape_date: Date string used in the table's file pattern
            (e.g. "2023-10-10").
        env: Environment key, must exist in LAND_BUCKETS and RAW_HIST_BUCKETS
            ("preprod" or "prod").
        table: Table name, must be a key of META_PATH.

    Returns:
        A new dict holding every BASE_CONFIG entry with the "{bucket}"
        placeholders of the four base paths filled in, plus a "tables" entry
        mapping ``table`` to TABLE_CONFIG extended with "pattern" and
        "metadata".

    Raises:
        KeyError: if ``env`` or ``table`` is not a known key.
    """
    buckets = {"land": LAND_BUCKETS[env], "raw-hist": RAW_HIST_BUCKETS[env]}
    # Which bucket each templated base path lives in.
    bucket_for_path = {
        "land-base-path": buckets["land"],
        "fail-base-path": buckets["raw-hist"],
        "pass-base-path": buckets["raw-hist"],
        "log-base-path": buckets["raw-hist"],
    }

    # Copy instead of aliasing BASE_CONFIG: formatting the module-level dict in
    # place would destroy the "{bucket}" templates, so every later call (for
    # another env) would silently keep the buckets of the first call.
    config = dict(BASE_CONFIG)
    for key, bucket in bucket_for_path.items():
        config[key] = BASE_CONFIG[key].format(bucket=bucket)

    # Likewise build a new per-table dict so TABLE_CONFIG is never mutated and
    # configs returned for different tables do not share state.
    config["tables"] = {
        table: {
            **TABLE_CONFIG,
            "pattern": f"/{table}/{scrape_date}",
            "metadata": META_PATH[table],
        }
    }
    return config


def _files_for_scrape(base_path, table, scrape_date):
    """Return the files listed under ``base_path`` for ``table``/``scrape_date``."""
    # The pass/fail base paths end with "/" while the land path does not.
    # Strip it so the remainder always keeps the leading "/" that the pattern
    # below starts with; otherwise pass/fail files could never match.
    prefix = base_path.rstrip("/")
    pattern = f"/{table}/{scrape_date}"
    return [
        file
        for file in get_filepaths_from_s3_folder(base_path)
        if re.match(pattern, file.replace(prefix, ""))
    ]


def assert_no_files(scrape_date, env, table):
    """Check that the files for a scrape date have moved from land to pass.

    Lists the land, pass and fail locations from ``create_config`` and keeps
    only the files whose path (relative to the base path) starts with
    ``/{table}/{scrape_date}``.

    Args:
        scrape_date: Date string identifying the ingest (e.g. "2023-10-10").
        env: Environment key passed to ``create_config``.
        table: Table name passed to ``create_config``.

    Raises:
        AssertionError: if any matching file is still in land, any matching
            file is in fail, or no matching file is in pass. The same message
            is logged at error level before raising.
    """
    config = create_config(scrape_date, env, table)
    land_files = _files_for_scrape(config["land-base-path"], table, scrape_date)
    pass_files = _files_for_scrape(config["pass-base-path"], table, scrape_date)
    fail_files = _files_for_scrape(config["fail-base-path"], table, scrape_date)

    # Raise explicitly rather than `assert cond, logger.error(...)`: that form
    # used logger.error's None return value as the message and is skipped
    # entirely when Python runs with -O.
    if land_files or fail_files or not pass_files:
        message = f"Failed to validate data for {scrape_date}, see one of {fail_files}"
        logger.error(message)
        raise AssertionError(message)
    logger.info(f"Latest ingest validated against schema for {scrape_date}")

In [40]:
# Scratch check: list the landed files for a single table and scrape date.
scrape_date = "2023-10-10"
table = "bookings"
config = create_config(scrape_date, "preprod", table)
land_files = get_filepaths_from_s3_folder(config["land-base-path"])
# Keep only paths that, relative to the land base path, start with
# "/<table>/<scrape_date>"; the resulting list is the cell's displayed value.
[
    landed
    for landed in land_files
    if re.match(f"/{table}/{scrape_date}", landed.replace(config["land-base-path"], ""))
]

--- Logging error ---
Traceback (most recent call last):
  File "/Users/matt.heery/.pyenv/versions/3.9.13/lib/python3.9/logging/__init__.py", line 434, in format
    return self._format(record)
  File "/Users/matt.heery/.pyenv/versions/3.9.13/lib/python3.9/logging/__init__.py", line 430, in _format
    return self._fmt % record.__dict__
KeyError: 'context'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/matt.heery/.pyenv/versions/3.9.13/lib/python3.9/logging/__init__.py", line 1083, in emit
    msg = self.format(record)
  File "/Users/matt.heery/.pyenv/versions/3.9.13/lib/python3.9/logging/__init__.py", line 927, in format
    return fmt.format(record)
  File "/Users/matt.heery/.pyenv/versions/3.9.13/lib/python3.9/logging/__init__.py", line 666, in format
    s = self.formatMessage(record)
  File "/Users/matt.heery/.pyenv/versions/3.9.13/lib/python3.9/logging/__init__.py", line 635, in formatMessage
    return self

['s3://mojap-land-dev/corporate/matrix/locations/2023-10-10/raw-2023-10-10.jsonl']

In [34]:
# Select a handful of columns from `df` for inspection.
# NOTE(review): `df` is not defined in any cell shown here, and the output
# recorded for this cell is a KeyError stating that none of these labels are
# columns of `df`. The recorded `df.columns` output lists `duration_millis`,
# not `duration_milliseconds` — confirm the intended column names.
df[
    [
        "duration_milliseconds",
        "audit_cancelled_created",
        "audit_cancelled_when",
        "audit_cancelled_event_type",
        "audit_cancelled_event_user_id",
    ]
]

KeyError: "None of [Index(['duration_milliseconds', 'audit_cancelled_created',\n       'audit_cancelled_when', 'audit_cancelled_event_type',\n       'audit_cancelled_event_user_id'],\n      dtype='object')] are in the [columns]"

In [37]:
# Show the column labels actually present on `df` (`df` itself is defined
# outside the cells shown here, so this relies on existing kernel state).
df.columns

Index(['id', 'time_from', 'time_to', 'label', 'location_id', 'location_kind',
       'status', 'has_started', 'has_ended', 'check_in_status',
       'attendee_count', 'owner_is_attendee', 'source', 'version',
       'has_external_notes', 'owner_id', 'owner_name', 'owner_email',
       'booked_by_id', 'booked_by_name', 'booked_by_email', 'organisation_id',
       'organisation_name', 'duration_millis', 'possible_actions_edit',
       'possible_actions_cancel', 'possible_actions_approve',
       'possible_actions_confirm', 'possible_actions_end_early',
       'possible_actions_change_owner', 'possible_actions_start',
       'possible_actions_view_history', 'audit_created_created',
       'audit_created_when', 'audit_created_event_type',
       'audit_created_event_user_id', 'audit_created_event_user_name',
       'audit_created_event_user_email', 'audit_created_statement',
       'audit_checked_in_created', 'audit_checked_in_when',
       'audit_checked_in_event_type', 'audit_checked_in_