In [0]:
import os

from delta.tables import DeltaTable
from pyspark.sql.functions import explode


In [0]:
%run ../../utils/utils.py

In [0]:
# Private alias so this cell does not shadow any `datetime` name defined by the %run utils.
import datetime as _datetime

dbutils.widgets.text("run_date", "30072025", "Run Date")
run_date = dbutils.widgets.get("run_date")

# run_date is interpolated into input file names, so fail fast on anything that is not an
# 8-digit DDMMYYYY calendar date (the widget default "30072025" is 30 July 2025).
try:
    if not (len(run_date) == 8 and run_date.isdigit()):
        raise ValueError("expected exactly 8 digits")
    _datetime.datetime.strptime(run_date, "%d%m%Y")
except ValueError as exc:
    raise ValueError(f"run_date must be a DDMMYYYY date, got {run_date!r}") from exc

print(f"Running pipeline for date: {run_date}")

In [0]:
# Load the pipeline config from config/config.yaml, three directories above the working dir.
current_dir = os.getcwd()
config_path = os.path.normpath(
    os.path.join(current_dir, '..', '..', '..', 'config', 'config.yaml')
)
(config, root_path,
 master_data_folder, master_data_files) = load_config(config_path)
operational_folder = config['paths']['operational_data_folder']

In [0]:
def _read_operational_file(file_key, extension, file_path, spark_schema):
    """Read one operational file with Spark, applying ``spark_schema`` when one is given.

    Raises:
        ValueError: if ``extension`` is not csv or json (checked before Spark is used).
    """
    file_format = extension.lower()
    if file_format not in ('csv', 'json'):
        raise ValueError(f"Unsupported file extension for {file_key}: {extension}")

    reader = spark.read.schema(spark_schema) if spark_schema else spark.read
    if file_format == 'csv':
        # Without an explicit schema, let Spark infer CSV column types.
        csv_options = {'header': True} if spark_schema else {'header': True, 'inferSchema': True}
        return reader.csv(file_path, **csv_options)
    return reader.json(file_path)


def load_operational_data(config, run_date, root_path, operational_folder):
    """
    Load every file listed under ``config['operational_files']`` for one run date.

    Each entry must provide ``base_name`` and ``extension``. It may also provide ``schema``
    (converted with ``parse_schema``) and ``primary_key`` (a column name or a list of names).
    The file read is ``<root_path>/<operational_folder>/<base_name>_<run_date>.<extension>``.

    Returns:
        list of dicts ``{"name": file_key, "df": DataFrame, "pk_cols": list}``, one per
        entry, in the iteration order of ``config['operational_files']``.

    Raises:
        ValueError: for an extension other than csv/json.
    """
    datasets = []

    for file_key, file_info in config['operational_files'].items():
        extension = file_info['extension']
        schema_config = file_info.get('schema')  # may be absent -> schema inferred / JSON default
        spark_schema = parse_schema(schema_config) if schema_config else None

        filename = f"{file_info['base_name']}_{run_date}.{extension}"
        file_path = os.path.join(root_path, operational_folder, filename)
        schema_note = "explicit schema" if spark_schema else "no explicit schema"
        print(f"Loading {file_key} from {file_path} ({schema_note})")

        df = _read_operational_file(file_key, extension, file_path, spark_schema)

        # Normalize primary_key: missing or null -> [], single column name -> [name].
        primary_key = file_info.get('primary_key') or []
        if isinstance(primary_key, str):
            primary_key = [primary_key]

        datasets.append({
            "name": file_key,
            "df": df,
            "pk_cols": primary_key,
        })

    return datasets


In [0]:
datasets = load_operational_data(
    config=config,
    run_date=run_date,
    root_path=root_path,
    operational_folder=operational_folder,
)

In [0]:
def validate_dataset(dataset):
    """
    Run the generic data-quality checks on one loaded dataset.

    Args:
        dataset: dict with keys
            - 'name': str, dataset name passed to the checks for reporting
            - 'df': Spark DataFrame to validate
            - 'pk_cols': list of primary-key column names (may be empty)
            - 'critical_cols' (optional): columns that must not contain nulls;
              defaults to 'pk_cols'

    The checks (check_empty / check_duplicates / check_nulls) come from the utils loaded
    via %run; whether a failed check raises or only reports is decided there.
    """
    name = dataset['name']
    df = dataset['df']
    pk_cols = dataset['pk_cols']
    critical_cols = dataset.get('critical_cols', pk_cols)
    print(f"Validating dataset: {name}")

    # Check if DataFrame is empty
    check_empty(df, name)

    # Check duplicates in primary key columns. Skip when no key is configured: what
    # check_duplicates does with an empty column list is not visible from this notebook.
    if pk_cols:
        check_duplicates(df, pk_cols, name)

    # Check nulls in critical columns
    if critical_cols:
        check_nulls(df, critical_cols, name)

In [0]:
# TODO: move to utils once another nested dataset needs the same flattening.
def flatten_team_moves(dataset, target_name='team_moves', key_col='user_id',
                       array_col='team', struct_fields=('team_id', 'effective_from')):
    """
    Explode the nested move array of the target dataset into one row per array element.

    Datasets whose name is not ``target_name`` are returned unchanged (same object). For the
    target dataset, a new dict is returned; the input dict is not modified.

    Args:
        dataset: dict with at least 'name' and 'df' (Spark DataFrame).
        target_name: name of the dataset to flatten.
        key_col: column carried unchanged onto every exploded row.
        array_col: array-of-struct column to explode.
        struct_fields: struct fields selected as top-level columns after exploding.

    Returns:
        dict: the (possibly new) dataset dict.
    """
    if dataset['name'] != target_name:
        return dataset

    df = dataset['df']
    # Already flattened (array column gone, struct fields present as columns): return as-is
    # so re-running the flattening cell does not try to explode a column that no longer exists.
    if array_col not in df.columns and all(field in df.columns for field in struct_fields):
        return dataset

    item_alias = "team_move"
    flattened_df = (
        df.select(key_col, explode(array_col).alias(item_alias))
          .select(key_col, *[f"{item_alias}.{field}" for field in struct_fields])
    )
    # New dict instead of mutating the caller's dataset.
    return {**dataset, 'df': flattened_df}

In [0]:
# Apply the team_moves flattening; other datasets pass through unchanged.
datasets = list(map(flatten_team_moves, datasets))

In [0]:
# Run the generic validate_dataset checks on every loaded dataset.
for dataset in datasets:
    validate_dataset(dataset)

In [0]:
# TODO: add audit columns

In [0]:
def _get_dataset_df(datasets, name):
    """Return the DataFrame of the dataset called ``name``; raise a descriptive error if absent."""
    for ds in datasets:
        if ds['name'] == name:
            return ds['df']
    available = sorted(ds['name'] for ds in datasets)
    raise ValueError(f"Dataset {name!r} was not loaded; available datasets: {available}")


operational_schema = config['operational_layer']['schema']

fact_df = _get_dataset_df(datasets, 'fact_feed')
fact_table = f"{operational_schema}.fact_feed"
team_moves_df = _get_dataset_df(datasets, 'team_moves')
team_moves_table = f"{operational_schema}.team_moves"

In [0]:
# Row counts of the two datasets written below. Each .count() is a Spark action (runs a job).
# Compare against None explicitly rather than relying on DataFrame truthiness.
print(f"Fact feed rows: {fact_df.count() if fact_df is not None else 'Not loaded'}")
print(f"Team moves rows: {team_moves_df.count() if team_moves_df is not None else 'Not loaded'}")

In [0]:
def _validate_keys(df, name, pk_cols, extra_critical_cols=()):
    """Check ``pk_cols`` for duplicates, and ``pk_cols`` + ``extra_critical_cols`` for nulls."""
    pk_cols = list(pk_cols)
    check_duplicates(df, pk_cols, name)
    check_nulls(df, pk_cols + list(extra_critical_cols), name)


def validate_fact_feed(df, pk_cols=('datekey', 'user_id', 'task_id'),
                       extra_critical_cols=('recordedtimehours',)):
    """Validate fact_feed: unique (datekey, user_id, task_id) and no nulls in key/measure columns."""
    _validate_keys(df, 'fact_feed', pk_cols, extra_critical_cols)


def validate_team_moves(df, pk_cols=('user_id', 'effective_from'),
                        extra_critical_cols=('team_id',)):
    """Validate team_moves: unique (user_id, effective_from) and no nulls in key/team columns."""
    _validate_keys(df, 'team_moves', pk_cols, extra_critical_cols)


validate_fact_feed(fact_df)
validate_team_moves(team_moves_df)


In [0]:
# Create the operational schema and an empty fact_feed Delta table if they don't exist.
# The schema name is read from config['operational_layer']['schema'], the same value
# `fact_table` is built from, so the table created here is the one the merge targets.
# (Schema name comes from the trusted pipeline config, not user input.)
operational_schema = config['operational_layer']['schema']
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {operational_schema}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {operational_schema}.fact_feed (
  datekey INT,
  user_id INT,
  task_id INT,
  recordedtimehours DOUBLE
)
USING DELTA
""")

In [0]:
# Upsert fact_feed into its Delta table, matching on the fact_feed primary key
# (datekey, user_id, task_id) -- the same columns validate_fact_feed checks for uniqueness.
# Matched rows are overwritten from the source; unmatched source rows are inserted.
fact_merge_keys = ['datekey', 'user_id', 'task_id']
fact_merge_condition = " AND ".join(f"target.{col} = source.{col}" for col in fact_merge_keys)

(
    DeltaTable.forName(spark, fact_table).alias("target")
    .merge(fact_df.alias("source"), fact_merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# Intentionally no DDL here: the fact_feed CREATE SCHEMA / CREATE TABLE IF NOT EXISTS
# statements run in the cell before the fact_feed merge, so repeating them after the merge
# would be a no-op.

In [0]:
# Create the operational schema and an empty team_moves Delta table if they don't exist.
# The schema name is read from config['operational_layer']['schema'], the same value
# `team_moves_table` is built from, so the table created here is the one the merge targets.
# NOTE(review): user_id is STRING here but INT in the fact_feed DDL -- confirm the intended
# type before these tables are joined.
operational_schema = config['operational_layer']['schema']
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {operational_schema}")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {operational_schema}.team_moves (
  user_id STRING,
  team_id INT,
  effective_from STRING
)
USING DELTA
""")


In [0]:
# Upsert team_moves into its Delta table, matching on the primary key that
# validate_team_moves checks for uniqueness: (user_id, effective_from).
# team_id is deliberately NOT part of the match: a corrected team for an existing move must
# update that row (whenMatchedUpdateAll) rather than insert a second row with the same key.
team_moves_merge_condition = """
  target.user_id = source.user_id AND
  target.effective_from = source.effective_from
"""

(
    DeltaTable.forName(spark, team_moves_table).alias("target")
    .merge(team_moves_df.alias("source"), team_moves_merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# WARNING: destructive manual reset -- if uncommented, this drops both operational tables.
# %sql
# DROP TABLE IF EXISTS openbet_operational.fact_feed;

# DROP TABLE IF EXISTS openbet_operational.team_moves;


In [0]:
%sql
SELECT * FROM openbet_operational.fact_feed;

In [0]:
%sql
SELECT * FROM openbet_operational.team_moves

In [0]:
# TODO: add post-run sanity checks (e.g. row counts and primary-key uniqueness in the target tables)