In [0]:
pip install openpyxl

In [0]:
import os, re, uuid, traceback, pandas as pd
import hashlib
from typing import List, Optional, Tuple
from datetime import datetime, date
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.types import *
from pyspark.sql.functions import col as F_col, count, when, upper, trim
import yaml


In [0]:
%run ./data_extraction

In [0]:
%sql
-- Create the bronze landing table for the order-book pipeline (no-op if it already exists).
-- NOTE(review): catalog/schema/table are hard-coded here, while the Python cells read them
-- from config.yaml (catalog.name / catalog.schema / catalog.table); keep the two in sync.
USE CATALOG orderbooks_main;
CREATE SCHEMA IF NOT EXISTS bronze;

-- NOTE(review): the final notebook cell writes this table with saveAsTable in "overwrite" mode,
-- so the Spark DataFrame built there must stay compatible with these names and types.
CREATE TABLE IF NOT EXISTS bronze.orderbook_data (
  -- business columns, named as produced by normalize_column_names()
  JobNumber                       STRING,
  Office                          STRING,
  office_div                      STRING,
  ProjectTitle                    STRING,
  Client                          STRING,
  location_country                STRING,
  -- *_usd: the workbook's "... (USD)" fee columns
  gross_fee_usd                   DECIMAL(18,2),
  fee_earned_usd                  DECIMAL(18,2),
  gross_fee_yet_to_be_earned_usd  DECIMAL(18,2),
  Currency                        STRING,
  -- NOTE(review): presumably amounts in the row's Currency — confirm with the source workbook
  GrossFee                        DECIMAL(18,2),
  GrossFeeEarned                  DECIMAL(18,2),
  GrossFeeYetToBeEarned           DECIMAL(18,2),
  Status                          STRING,
  -- 0/1 flag derived from the source NewProject text in the Spark conversion cell
  NewProject                      INT,
  StartDate                       DATE,
  anticipated_end_date            DATE,
  ProjectType                     STRING,

  -- metadata
  -- source_file / source_mtime: workbook file name and its modification time
  source_file                     STRING,
  source_mtime                    TIMESTAMP,
  -- data_year / data_month / data_month_num: reporting period parsed from the file name
  data_year                       INT,
  data_month                      STRING,
  data_month_num                  INT,
  -- data_collection_date: date part of the file modification time
  data_collection_date            DATE,
  -- row_hash: MD5 of JobNumber|ProjectTitle|Client|Office (see calculate_row_hash)
  row_hash                        STRING
)
USING DELTA
-- one partition per (year, month number) parsed from the file name
PARTITIONED BY (data_year, data_month_num)
-- Delta optimized writes and automatic compaction of small files
TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact   = true
);

In [0]:
# Load pipeline settings from config.yaml (path resolved against the notebook's working directory).
with open('config.yaml', 'r') as config_file:
    config = yaml.safe_load(config_file)

# Unity Catalog target (<catalog>.<schema>.<table>) and the processing-log table name.
catalog_cfg = config['catalog']
CATALOG_NAME = catalog_cfg['name']
SCHEMA_NAME = catalog_cfg['schema']
TABLE_NAME = catalog_cfg['table']
PROCESSLOG_TABLE = catalog_cfg['process_log_table']

# Source location: workbooks are listed from <volume.path>/<volume.source_folder>.
volume_cfg = config['volume']
VOLUME_PATH = volume_cfg['path']
SOURCE_FOLDER = volume_cfg['source_folder']

# File selection rules. SCAN_ROWS is not referenced in this notebook's visible cells.
processing_cfg = config['processing']
VALID_EXTENSIONS = set(processing_cfg['valid_extensions'])
SCAN_ROWS = processing_cfg['scan_rows']
FILE_PREFIX_PATTERN = processing_cfg['file_prefix_pattern']

TARGET_COLUMNS = config['target_columns']
# Optional section: a missing key yields an empty mapping.
COLUMN_ALIASES = config.get('column_aliases', {})

In [0]:
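# Legacy hard-coded settings, kept for reference only: the live values are loaded from config.yaml in the previous cell.
# Storage Configuration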
# STORAGE_ACCOUNT = "financeorderbook"
# CONTAINER = "orderbooksall" 
# BASE_PATH = f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net"

# # Unity Catalog Configuration
# CATALOG_NAME = "orderbooks_main"
# SCHEMA_NAME = "bronze"
# TABLE_NAME = "orderbook_data"
# PROCESSLOG_TABLE = "file_processing_log"

# # Volume path
# VOLUME_PATH = "/Volumes/orderbooks_main/raw_files"
# FOLDER = "fy25"

# # File Processing Configuration
# SOURCE_FOLDER = "FY25"  # Folder in your container with Excel files
# VALID_EXTENSIONS = {".xlsx", ".xlsm", ".xls"}
# SCAN_ROWS = 20
# FILE_PREFIX_PATTERN = r"^1[._]"

# # Target Columns (matching your existing script)
# TARGET_COLUMNS = [
#     'JobNumber', 'Office', 'Office (Div)', 'ProjectTitle', 'Client', 
#     'Location (Country)', 'Gross Fee (USD)', 'Fee Earned (USD)', 
#     'Gross Fee Yet To Be Earned (USD)', 'Currency', 'GrossFee', 
#     'GrossFeeEarned', 'GrossFeeYetToBeEarned', 'Status', 'NewProject', 
#     'StartDate', 'Anticipated EndDate', 'ProjectType'
# ]

In [0]:
def list_volume_files(volume_path, folder, pattern, extensions):
    """
    Return metadata for the workbooks directly under ``volume_path/folder``.

    An entry is kept when its name matches ``pattern`` via ``re.match`` (i.e.
    anchored at the start of the name) and its lower-cased extension is in
    ``extensions``. Each result is a dict with keys 'path', 'name', 'size' and
    'mtime', where 'mtime' is a naive local ``datetime`` built from the listing's
    millisecond modification timestamp.

    When ``folder`` is empty/None the volume root itself is listed. Any exception
    raised while listing is printed, and an empty list is returned instead.
    """
    target_dir = volume_path if not folder else f"{volume_path}/{folder}"

    try:
        return [
            {
                'path': entry.path,
                'name': entry.name,
                'size': entry.size,
                'mtime': datetime.fromtimestamp(entry.modificationTime / 1000),
            }
            for entry in dbutils.fs.ls(target_dir)
            if re.match(pattern, entry.name)
            and os.path.splitext(entry.name)[1].lower() in extensions
        ]
    except Exception as e:
        print(f"Error listing files: {e}")
        return []

In [0]:
list_volume_files(
    volume_path=VOLUME_PATH,
    folder=SOURCE_FOLDER,
    pattern=FILE_PREFIX_PATTERN,
    extensions=VALID_EXTENSIONS,
)

In [0]:
def extract_date_from_filename(filename):
    """
    Extract year, month name, and month number from filename.

    Examples:
      "1_Order_Book_Mar_2025.xlsx"          -> (2025, "Mar", 3)
      "1. Order Book (Jun 2025).xlsm"       -> (2025, "Jun", 6)
      "1_Order_Book_Summary_Jun_2025.xlsx"  -> (2025, "Jun", 6)

    Year: the first "20" followed by two digits anywhere in the name (so
    "20250331" still yields 2025), or None when absent.

    Month: the name's alphabetic tokens (the name split on digits, spaces and
    punctuation) are compared against month names/abbreviations first, so a
    word that merely contains an abbreviation ("Summary" contains "mar") is not
    mistaken for a month. If no token is a month, a plain substring scan
    (January..December order) is used so run-together names such as
    "OrderBookMar2025" still resolve.

    Returns:
        (year, month_name, month_num), with None for any part not found.
    """
    month_map = {
        'jan': (1, 'Jan'), 'january': (1, 'Jan'),
        'feb': (2, 'Feb'), 'february': (2, 'Feb'),
        'mar': (3, 'Mar'), 'march': (3, 'Mar'),
        'apr': (4, 'Apr'), 'april': (4, 'Apr'),
        'may': (5, 'May'),
        'jun': (6, 'Jun'), 'june': (6, 'Jun'),
        'jul': (7, 'Jul'), 'july': (7, 'Jul'),
        'aug': (8, 'Aug'), 'august': (8, 'Aug'),
        'sep': (9, 'Sep'), 'sept': (9, 'Sep'), 'september': (9, 'Sep'),
        'oct': (10, 'Oct'), 'october': (10, 'Oct'),
        'nov': (11, 'Nov'), 'november': (11, 'Nov'),
        'dec': (12, 'Dec'), 'december': (12, 'Dec')
    }

    # Extract year (4 digits starting with "20")
    year_match = re.search(r'20\d{2}', filename)
    year = int(year_match.group()) if year_match else None

    filename_lower = filename.lower()

    # Whole-word match first: avoids false hits inside longer words ("summary" -> "mar").
    for token in re.findall(r'[a-z]+', filename_lower):
        if token in month_map:
            num, name = month_map[token]
            return year, name, num

    # Fallback for names with no separator around the month (e.g. "OrderBookMar2025").
    for month_key, (num, name) in month_map.items():
        if month_key in filename_lower:
            return year, name, num

    return year, None, None

def calculate_row_hash(row_dict):
    """
    Return the hex MD5 digest identifying a row by its key fields.

    The key is JobNumber, ProjectTitle, Client and Office, each converted with
    ``str()`` (a missing key contributes an empty string) and joined with '|'.
    Only these four fields feed the hash; all other columns are ignored.
    """
    key_parts = (
        str(row_dict.get(field_name, ''))
        for field_name in ('JobNumber', 'ProjectTitle', 'Client', 'Office')
    )
    digest = hashlib.md5('|'.join(key_parts).encode())
    return digest.hexdigest()


def process_excel_file(file_info):
    """
    Read one order-book workbook and append load metadata to every row.

    Args:
        file_info: dict describing the file, as built by ``list_volume_files``;
            the keys 'path', 'name' and 'mtime' (a datetime) are read here.

    Returns:
        pandas.DataFrame: the rows returned by ``read_file`` (provided by the
        ``%run ./data_extraction`` notebook) plus the columns source_file,
        source_mtime, data_year, data_month, data_month_num,
        data_collection_date and row_hash. An empty DataFrame is returned when
        ``read_file`` returns None or no rows, or when any step raises. In the
        error case, the message and traceback are printed rather than propagated.
    """
    file_path = file_info['path']
    file_name = file_info['name']

    try:
        print(f"\n{'='*70}")
        print(f"Processing: {file_name}")
        print(f"{'='*70}")

        # The listing reports Volume files with a "dbfs:" scheme, but the files are
        # readable by pandas through the plain local path. Strip only the leading
        # scheme rather than every occurrence of the substring.
        scheme = "dbfs:"
        local_path = file_path[len(scheme):] if file_path.startswith(scheme) else file_path
        print(local_path)
        df = read_file(local_path)

        # Treat a missing result like an empty one instead of failing on None.empty.
        if df is None or df.empty:
            print(f"⚠ No data extracted from {file_name}")
            return pd.DataFrame()

        print(f"✓ Successfully extracted {len(df)} rows")

        # Provenance columns
        df['source_file'] = file_name
        df['source_mtime'] = file_info['mtime']

        # Reporting period parsed from the file name (None where it cannot be found)
        year, month_name, month_num = extract_date_from_filename(file_name)
        df['data_year'] = year
        df['data_month'] = month_name
        df['data_month_num'] = month_num

        # Collection date is the date part of the file's modification time
        df['data_collection_date'] = file_info['mtime'].date()

        # Per-row key hash over JobNumber/ProjectTitle/Client/Office
        df['row_hash'] = df.apply(lambda row: calculate_row_hash(row.to_dict()), axis=1)

        return df

    except Exception as e:
        print(f"✗ Error processing {file_name}: {e}")
        traceback.print_exc()
        return pd.DataFrame()

def normalize_column_names(df):
    """
    Return a renamed DataFrame whose column names follow the Delta table schema.

    First, the spaced/parenthesised workbook headers ('Office (Div)',
    'Location (Country)', 'Gross Fee (USD)', 'Fee Earned (USD)',
    'Gross Fee Yet To Be Earned (USD)', 'Anticipated EndDate') are mapped to
    their snake_case table names. Then the CamelCase table columns (JobNumber,
    Office, ProjectTitle, Client, Currency, GrossFee, GrossFeeEarned,
    GrossFeeYetToBeEarned, Status, NewProject, StartDate, ProjectType) are left
    as-is and every other column name is lower-cased.
    """
    header_to_table = {
        'Office (Div)': 'office_div',
        'Location (Country)': 'location_country',
        'Gross Fee (USD)': 'gross_fee_usd',
        'Fee Earned (USD)': 'fee_earned_usd',
        'Gross Fee Yet To Be Earned (USD)': 'gross_fee_yet_to_be_earned_usd',
        'Anticipated EndDate': 'anticipated_end_date',
        'StartDate': 'StartDate',
        'ProjectType': 'ProjectType'
    }
    keep_as_is = {
        'JobNumber', 'Office', 'ProjectTitle', 'Client', 'Currency',
        'GrossFee', 'GrossFeeEarned', 'GrossFeeYetToBeEarned',
        'Status', 'NewProject', 'StartDate', 'ProjectType',
    }

    renamed = df.rename(columns=header_to_table)
    return renamed.rename(columns=lambda name: name if name in keep_as_is else name.lower())

In [0]:
# Reuse the notebook's Spark session (or create one) and announce source and target.
spark = SparkSession.builder.getOrCreate()

print(
    "ORDER BOOK DATA EXTRACTION PIPELINE",
    f"Volume: {VOLUME_PATH}/{SOURCE_FOLDER}",
    f"Target: {CATALOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}",
    sep="\n",
)

In [0]:
# Workbooks to ingest in this run.
matching_files = list_volume_files(
    volume_path=VOLUME_PATH,
    folder=SOURCE_FOLDER,
    pattern=FILE_PREFIX_PATTERN,
    extensions=VALID_EXTENSIONS,
)

print(f"\n✓ Found {len(matching_files)} matching files:")
for file_meta in matching_files:
    print(f"  - {file_meta['name']} ({file_meta['size']:,} bytes, modified: {file_meta['mtime']})")
    

In [0]:
# Extract every workbook; keep non-empty results and record why others were skipped.
all_dataframes = []
successful_files = []
failed_files = []

for file_info in matching_files:
    try:
        extracted = process_excel_file(file_info)
        if extracted.empty:
            failed_files.append((file_info['name'], "No data extracted"))
        else:
            all_dataframes.append(extracted)
            successful_files.append(file_info['name'])
    except Exception as e:
        failed_files.append((file_info['name'], str(e)))
        print(f"✗ Failed to process {file_info['name']}: {e}")

In [0]:
# Fail with an explicit message rather than pandas' generic "No objects to concatenate".
if not all_dataframes:
    raise ValueError(f"No data extracted from any source file; failures: {failed_files}")

combined_df = pd.concat(all_dataframes, ignore_index=True)
print(f"✓ Total rows before normalization: {len(combined_df)}")

# Normalize column names to match Delta table schema
combined_df = normalize_column_names(combined_df)


def parse_date_flexible(date_val, formats=('%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y')):
    """
    Parse one cell value into a pandas Timestamp.

    Each explicit format in ``formats`` is tried in order. The default tries
    ISO first, then day-first, then month-first, so an ambiguous "03/04/2025"
    is read as 3 April 2025. If no format fits, pandas' own inference is used
    with ``errors='coerce'``, which gives NaT when the value is unparseable.
    Missing values and empty strings return None.
    """
    if pd.isna(date_val) or date_val == '':
        return None
    for fmt in formats:
        try:
            return pd.to_datetime(date_val, format=fmt, errors='raise')
        except Exception:
            # Not this format (or out of range): try the next one.
            continue
    try:
        return pd.to_datetime(date_val, errors='coerce')
    except Exception:
        return None


# Convert date columns to datetime.date objects, with None for missing values
date_columns = ['StartDate', 'anticipated_end_date', 'data_collection_date']
for col_name in date_columns:
    if col_name not in combined_df.columns:
        continue
    # Wrap in pd.to_datetime so the result is always datetime64. If every value
    # parsed to None, .apply() would return an object Series and .dt would raise.
    parsed = pd.to_datetime(combined_df[col_name].apply(parse_date_flexible), errors='coerce')
    as_date = parsed.dt.date
    # .dt.date leaves NaT for missing values; Spark needs None for SQL NULL
    combined_df[col_name] = as_date.where(pd.notna(as_date), None)

integer_columns = ['data_year', 'data_month_num']
for col_name in integer_columns:
    if col_name in combined_df.columns:
        # Convert to nullable Int64 type to handle NaN values
        combined_df[col_name] = pd.to_numeric(combined_df[col_name], errors='coerce').astype('Int64')
        # Replace NaN with None for Spark
        combined_df[col_name] = combined_df[col_name].where(pd.notna(combined_df[col_name]), None)

print(f"✓ Columns after normalization: {list(combined_df.columns)}")
print(f"✓ Total rows: {len(combined_df)}")
print(f"✓ Date column types:")
for col_name in date_columns:
    if col_name in combined_df.columns:
        print(f"  - {col_name}: {combined_df[col_name].dtype}")

In [0]:
# Monetary columns: coerce to numbers; anything unparseable becomes NaN.
decimal_columns = [
    'gross_fee_usd', 'fee_earned_usd', 'gross_fee_yet_to_be_earned_usd',
    'GrossFee', 'GrossFeeEarned', 'GrossFeeYetToBeEarned'
]

for col_name in decimal_columns:
    if col_name in combined_df.columns:
        # A float64 Series cannot hold None (assigning None stores NaN again), so no
        # NaN -> None step is attempted here.
        # NOTE(review): NaN is expected to become NULL on the Spark side (Arrow
        # conversion or the later DECIMAL cast); confirm if Arrow is disabled or
        # ANSI mode is on.
        combined_df[col_name] = pd.to_numeric(combined_df[col_name], errors='coerce')

# Text columns: store as Python str, with missing values as None (SQL NULL).
string_columns = [
    'JobNumber', 'Office', 'office_div', 'ProjectTitle', 'Client',
    'location_country', 'Currency', 'Status', 'NewProject', 'ProjectType',
    'source_file', 'data_month', 'row_hash'
]

for col_name in string_columns:
    if col_name in combined_df.columns:
        raw_values = combined_df[col_name]
        as_text = raw_values.astype(str)
        # Detect real missing values (NaN/None/NaT/pd.NA) before str() turns them into
        # 'nan'/'None'/'NaT'/'<NA>', and keep nulling literal 'nan'/'None' strings.
        # Series.where is used instead of .replace(x, None) because older pandas
        # versions treat value=None in replace() as a forward-fill request.
        missing = raw_values.isna() | as_text.isin(['nan', 'None'])
        combined_df[col_name] = as_text.where(~missing, None)

In [0]:
# Inspect dtypes and non-null counts before handing the frame to Spark.
combined_df.info(buf=None)

In [0]:
def _finalize_spark_types(sdf):
    """
    Cast a freshly created Spark DataFrame to the bronze table's column types.

    This is applied to the output of both conversion paths below so that they
    agree:
      - monetary columns become DECIMAL(18,2);
      - NewProject becomes a 1/0 flag;
      - data_year and data_month_num become INT;
      - the date columns become DATE;
      - source_mtime becomes TIMESTAMP.
    Columns that are absent are skipped.
    """
    for name in decimal_columns:
        if name in sdf.columns:
            sdf = sdf.withColumn(name, F_col(name).cast(DecimalType(18, 2)))

    if 'NewProject' in sdf.columns:
        # upper() runs before the comparison, so the candidates must be upper-case.
        # Mixed-case literals such as 'New Project' could never match.
        sdf = sdf.withColumn(
            'NewProject',
            when(upper(trim(F_col('NewProject'))).isin(['NEW PROJECT', 'NEWPROJECT']), 1).otherwise(0),
        )

    for name in ('data_year', 'data_month_num'):
        if name in sdf.columns:
            sdf = sdf.withColumn(name, F_col(name).cast(IntegerType()))

    for name in date_columns:
        if name in sdf.columns:
            sdf = sdf.withColumn(name, F_col(name).cast(DateType()))

    if 'source_mtime' in sdf.columns:
        sdf = sdf.withColumn('source_mtime', F_col('source_mtime').cast(TimestampType()))

    return sdf


try:
    spark_df = spark.createDataFrame(combined_df)
    print(f"✓ Spark DataFrame created with {spark_df.count()} rows")
except Exception as e:
    print(f"✗ Error creating Spark DataFrame: {e}")
    print("Attempting alternative conversion...")

    # Fallback: ship every column as text and let _finalize_spark_types restore types.
    # str() renders missing values as these tokens. Turn them back into NULLs so the
    # casts never see e.g. 'None', which is an invalid cast that errors under ANSI mode.
    spark_df = spark.createDataFrame(combined_df.astype(str))
    null_tokens = ['None', 'nan', 'NaT', '<NA>']
    typed_columns = (
        decimal_columns + string_columns + date_columns
        + ['data_year', 'data_month_num', 'source_mtime']
    )
    for col_name in typed_columns:
        if col_name in spark_df.columns:
            spark_df = spark_df.withColumn(
                col_name,
                when(F_col(col_name).isin(null_tokens), lit(None)).otherwise(F_col(col_name)),
            )
    print(f"✓ Alternative conversion successful")

spark_df = _finalize_spark_types(spark_df)
print(f"✓ Decimal columns cast to DecimalType(18, 2)")
if 'NewProject' in spark_df.columns:
    print(f"✓ 'NewProject' column converted to binary")


In [0]:
# Show the full Spark schema, then spot-check the columns whose types are forced during conversion.
spark_df.printSchema()

for checked_column in ('data_year', 'data_month_num', 'NewProject'):
    print(f"{checked_column}: {spark_df.schema[checked_column].dataType}")

In [0]:
# Also persist the converted data as Parquet. The destination can be overridden with
# `output.parquet_path` in config.yaml; without it the original fixed location is used.
# Note: the default sits inside the fy25 source folder. Its name does not match the
# "1." / "1_" file prefix, so it is presumably not picked up as input — confirm if
# FILE_PREFIX_PATTERN changes.
DEFAULT_PARQUET_PATH = "/Volumes/orderbooks_main/raw_files/fy25/processed_orderbook.parquet"
parquet_path = (config.get('output') or {}).get('parquet_path', DEFAULT_PARQUET_PATH)
spark_df.write.mode("overwrite").parquet(parquet_path)


In [0]:
table_full_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}"

try:
    # Replace the table's contents with this run's data, then read back the row count.
    spark_df.write.mode("overwrite").saveAsTable(table_full_name)
    final_count = spark.table(table_full_name).count()
except Exception as e:
    # Report, show the full stack, and re-raise so the run is marked as failed.
    print(f"❌ Error writing to Unity Catalog: {e}")
    traceback.print_exc()
    raise
else:
    print(f"\n SUCCESS!")
    print(f"✓ Data written to: {table_full_name}")
    print(f"✓ Total records in table: {final_count:,}")