# Section 1: Imports and Configuration

In [0]:
import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict, Any

import boto3
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, input_file_name, regexp_extract, concat
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, ArrayType, MapType, DoubleType

# Configuration constants
# Catalog and schema under which every table in this file is created.
CATALOG_NAME = "tulip_sandbox"
SCHEMA_NAME = "sitewise"
# Source bucket and key prefix; both can be overridden through environment
# variables. BASE_PREFIX is concatenated directly with a subdirectory name
# elsewhere in this file, so it must keep its trailing slash.
S3_BUCKET = os.getenv('BUCKET_NAME', 'hannover-messe-tulip')
BASE_PREFIX = os.getenv('BASE_PREFIX', 'iot-sitewise/')
# Fully qualified name of the table that records which files were handled.
METADATA_TABLE = f"{CATALOG_NAME}.{SCHEMA_NAME}.file_metadata"
# Executed when this cell/module is evaluated. `spark` is not defined in this
# file — presumably the notebook-provided SparkSession (TODO confirm).
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME}")

# Known subdirectories and their file types
# Keys are subdirectory names under BASE_PREFIX; values are used both as the
# Spark reader format and to filter S3 keys by extension.
KNOWN_DIRECTORIES = {
    "agg": "avro",
    "index": "avro",
    "raw": "avro"
}

# File path regex patterns
# agg/raw keys:
#   .../startYear=YYYY/startMonth=M/startDay=D/seriesBucket=<bucket>/<agg|raw>_<id>_<timestamp>_GOOD.avro
# Capture groups: year, month, day, series bucket, id, timestamp.
AGG_RAW_PATTERN = r"startYear=(\d{4})/startMonth=(\d{1,2})/startDay=(\d{1,2})/seriesBucket=([^/]+)/(?:agg|raw)_([^_]+)_(\d+)_GOOD\.avro$"
# index keys:
#   .../series=<id>/startYear=YYYY/startMonth=M/startDay=D/index_<id>_<timestamp>_GOOD[.avro]
# Capture groups: series id, year, month, day, timestamp. The "\1" back-reference
# requires the id in the file name to equal the id in the "series=" segment.
# The ".avro" suffix is optional: list_s3_files only forwards keys that end in
# ".avro", so a pattern anchored on a bare "_GOOD" could never match them.
INDEX_PATTERN = r"series=([^/]+)/startYear=(\d{4})/startMonth=(\d{1,2})/startDay=(\d{1,2})/index_\1_(\d+)_GOOD(?:\.avro)?$"

def get_aws_credentials():
    """Read the AWS key pair from the process environment.

    Returns:
        A tuple ``(access_key, secret_key)`` taken from the
        ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` variables.

    Raises:
        ValueError: if either variable is unset or empty.
    """
    key_pair = (
        os.getenv('AWS_ACCESS_KEY_ID'),
        os.getenv('AWS_SECRET_ACCESS_KEY'),
    )

    # Both halves must be present and non-empty to be usable.
    if all(key_pair):
        return key_pair

    raise ValueError("AWS credentials not found in environment variables")



# Section 2: Data Classes

In [0]:
@dataclass
class S3Config:
    """Connection settings for the source S3 bucket."""
    # Bucket name and the explicit key pair used to build the client.
    bucket: str
    aws_access_key_id: str
    aws_secret_access_key: str
    # Read from AWS_REGION once, when the class body is evaluated.
    region: str = os.getenv('AWS_REGION', 'us-east-1')
    # Key prefix under which the known subdirectories live.
    base_prefix: str = BASE_PREFIX

    @classmethod
    def from_env(cls) -> 'S3Config':
        """Build a config from S3_BUCKET and the credentials in the environment.

        Raises:
            ValueError: propagated from get_aws_credentials() when the key
                pair is missing.
        """
        key_id, secret = get_aws_credentials()
        return cls(S3_BUCKET, key_id, secret)

@dataclass
class FileMetadata:
    """One row of the file-metadata table: what was read and how it went."""
    # S3 object key of the file.
    file_path: str
    file_type: str  # 'avro' or 'jsonl'
    # Subdirectory the file was listed under.
    directory: str
    # Object attributes as reported by the S3 listing.
    last_modified: datetime
    size: int
    # Fully qualified destination table ("" when no table could be resolved).
    table_name: str
    # default_factory so the timestamp is taken when each record is created;
    # a plain `= datetime.now()` default is evaluated once at class definition
    # and would be shared by every instance.
    processed_at: datetime = field(default_factory=datetime.now)
    # Row count on success / error text on failure; at most one is set.
    record_count: Optional[int] = None
    error_message: Optional[str] = None
    # Partition values parsed from the object key, when available.
    year: Optional[int] = None
    month: Optional[int] = None
    day: Optional[int] = None
    series_bucket: Optional[str] = None
    series_id: Optional[str] = None
    timestamp: Optional[int] = None

# Section 3: Utility Functions

In [0]:
def sanitize_table_name(name: str) -> str:
    """Lower-case a directory name and turn hyphens and spaces into underscores."""
    underscore_map = str.maketrans({'-': '_', ' ': '_'})
    lowered = name.lower()
    return lowered.translate(underscore_map)

def get_s3_client(config: S3Config) -> boto3.client:
    """Build a boto3 S3 client from the key pair and region held in *config*."""
    connection_settings = {
        'aws_access_key_id': config.aws_access_key_id,
        'aws_secret_access_key': config.aws_secret_access_key,
        'region_name': config.region,
    }
    return boto3.client('s3', **connection_settings)

def extract_partition_info(file_path: str, directory: str) -> Dict[str, Any]:
    """Parse partition values out of an object key.

    Args:
        file_path: S3 object key to inspect.
        directory: Subdirectory the key was listed under; selects the pattern.

    Returns:
        For "agg"/"raw": year, month, day, series_bucket, asset_id, timestamp.
        For "index": series_id, year, month, day, timestamp.
        An empty dict when the directory is unknown or the key does not match.
    """
    if directory == "index":
        found = re.search(INDEX_PATTERN, file_path)
        if found is None:
            return {}
        series_id, year, month, day, stamp = found.groups()
        return {
            "series_id": series_id,
            "year": int(year),
            "month": int(month),
            "day": int(day),
            "timestamp": int(stamp)
        }

    if directory in ("agg", "raw"):
        found = re.search(AGG_RAW_PATTERN, file_path)
        if found is None:
            return {}
        year, month, day, bucket, asset, stamp = found.groups()
        return {
            "year": int(year),
            "month": int(month),
            "day": int(day),
            "series_bucket": bucket,
            "asset_id": asset,
            "timestamp": int(stamp)
        }

    return {}

def list_s3_files(client: boto3.client, config: S3Config) -> Dict[str, List[Dict[str, Any]]]:
    """List the objects under each known subdirectory, grouped by subdirectory.

    Args:
        client: S3 client used for the paginated listing.
        config: Supplies the bucket name and the base key prefix.

    Returns:
        A dict with one entry per KNOWN_DIRECTORIES key. Each value is a list
        of dicts holding 'key', 'type', 'directory', 'last_modified', 'size'
        and whatever extract_partition_info() parsed from the key.
    """
    suffix_for_type = {'avro': '.avro', 'jsonl': '.jsonl'}
    listing = {name: [] for name in KNOWN_DIRECTORIES}

    for directory, file_type in KNOWN_DIRECTORIES.items():
        pages = client.get_paginator('list_objects_v2').paginate(
            Bucket=config.bucket,
            Prefix=f"{config.base_prefix}{directory}/",
        )
        wanted_suffix = suffix_for_type.get(file_type)

        for page in pages:
            # Pages without a 'Contents' entry carry no objects.
            for entry in page.get('Contents', []) if 'Contents' in page else []:
                object_key = entry['Key']

                # Skip directories
                if object_key.endswith('/'):
                    continue

                # Only keep objects whose extension matches the directory's type
                if wanted_suffix is None or not object_key.endswith(wanted_suffix):
                    continue

                record = {
                    'key': object_key,
                    'type': file_type,
                    'directory': directory,
                    'last_modified': entry['LastModified'],
                    'size': entry['Size'],
                }
                record.update(extract_partition_info(object_key, directory))
                listing[directory].append(record)

    return listing

# Section 4: Unity Catalog Table Management

In [0]:
def create_metadata_table():
    """Create the (empty) metadata Delta table unless it already exists."""
    # (column name, Spark type, nullable)
    column_specs = [
        ("file_path", StringType(), False),
        ("file_type", StringType(), False),
        ("directory", StringType(), False),
        ("last_modified", TimestampType(), False),
        ("size", LongType(), False),
        ("table_name", StringType(), False),
        ("processed_at", TimestampType(), False),
        ("record_count", LongType(), True),
        ("error_message", StringType(), True),
        ("year", LongType(), True),
        ("month", LongType(), True),
        ("day", LongType(), True),
        ("series_bucket", StringType(), True),
        ("series_id", StringType(), True),
        ("timestamp", LongType(), True),
    ]
    schema = StructType([
        StructField(column, data_type, nullable)
        for column, data_type, nullable in column_specs
    ])

    # mode("ignore") turns the write into a no-op when the table is present.
    writer = spark.createDataFrame([], schema).write.format("delta").mode("ignore")
    writer.saveAsTable(METADATA_TABLE)

def update_metadata(metadata: List[FileMetadata]):
    """Append processed-file records to the metadata table.

    Args:
        metadata: Records to append. An empty list is a no-op.

    The DataFrame is built with an explicit schema instead of letting Spark
    infer one from the records: optional fields such as ``timestamp`` or
    ``error_message`` are frequently ``None`` in every record of a batch, and
    a column with no non-null value gives schema inference nothing to work
    from. The schema mirrors the one used by create_metadata_table().
    """
    if not metadata:
        return

    schema = StructType([
        StructField("file_path", StringType(), False),
        StructField("file_type", StringType(), False),
        StructField("directory", StringType(), False),
        StructField("last_modified", TimestampType(), False),
        StructField("size", LongType(), False),
        StructField("table_name", StringType(), False),
        StructField("processed_at", TimestampType(), False),
        StructField("record_count", LongType(), True),
        StructField("error_message", StringType(), True),
        StructField("year", LongType(), True),
        StructField("month", LongType(), True),
        StructField("day", LongType(), True),
        StructField("series_bucket", StringType(), True),
        StructField("series_id", StringType(), True),
        StructField("timestamp", LongType(), True)
    ])

    # Order each record's values to match the schema's column order.
    column_names = schema.fieldNames()
    rows = [tuple(getattr(record, name) for name in column_names) for record in metadata]

    metadata_df = spark.createDataFrame(rows, schema)
    metadata_df.write.format("delta").mode("append").saveAsTable(METADATA_TABLE)


# Section 5: Data Table Management

In [0]:
def get_processed_files() -> List[str]:
    """Return every distinct file_path currently stored in the metadata table."""
    recorded_paths = spark.table(METADATA_TABLE).select("file_path").distinct()

    paths = []
    for row in recorded_paths.collect():
        paths.append(row.file_path)
    return paths

def infer_schema_with_fallback(df: DataFrame) -> DataFrame:
    """Normalise column types so files with slightly different schemas line up.

    Per column, based on its current Spark type:
      * null/void            -> string (reported as problematic)
      * int, bigint, smallint, tinyint, float, decimal(p,s) -> double
      * array<...>           -> array<string>
      * map<...>             -> map<string,string>
      * anything else        -> unchanged
    If the type cannot be inspected at all, the column falls back to string.

    Args:
        df: DataFrame whose columns should be normalised.

    Returns:
        A DataFrame with the same column names, each cast to its target type.
    """
    widened_to_double = ('int', 'bigint', 'smallint', 'tinyint', 'float')
    target_types = []  # (column name, target Spark type), in schema order
    problematic_columns = []

    for field in df.schema.fields:
        try:
            type_name = field.dataType.simpleString()
            if type_name in ('null', 'void'):
                problematic_columns.append(f"{field.name}: NullType/void type")
                target = StringType()
            # simpleString() renders decimals as "decimal(p,s)", so they have
            # to be recognised by prefix rather than by an exact match.
            elif type_name in widened_to_double or type_name.startswith('decimal'):
                target = DoubleType()
            elif isinstance(field.dataType, ArrayType):
                target = ArrayType(StringType())
            elif isinstance(field.dataType, MapType):
                target = MapType(StringType(), StringType())
            else:
                # Includes timestamps, strings, doubles, structs, ...
                target = field.dataType
        except Exception as e:
            problematic_columns.append(f"{field.name}: {str(e)}")
            print(f"Warning: Could not determine type for column {field.name}, defaulting to StringType: {str(e)}")
            target = StringType()
        target_types.append((field.name, target))

    if problematic_columns:
        print("\nType inference issues found in the following columns:")
        print(problematic_columns)
        print("\nThese columns will be defaulted to StringType.\n")

    # Cast all columns to their target types
    for column_name, target in target_types:
        df = df.withColumn(column_name, col(column_name).cast(target))

    return df

def _add_column_if_missing(df: DataFrame, name: str, data_type) -> DataFrame:
    """Return *df* with a null-filled column *name* of *data_type* unless it already has one."""
    if name in df.columns:
        return df
    return df.withColumn(name, lit(None).cast(data_type))


def _try_add_unique_constraint(table_name: str) -> None:
    """Attempt to declare unique_id as unique; log instead of raising on failure."""
    try:
        spark.sql(f"""
            ALTER TABLE {table_name} 
            ADD CONSTRAINT unique_id_constraint 
            UNIQUE (unique_id)
        """)
    except Exception as e:
        # The table itself has already been created at this point; a rejected
        # constraint must not make the caller treat table creation as failed.
        print(f"Warning: could not add unique constraint to {table_name}: {str(e)}")


def create_table_for_directory(directory: str, file_type: str) -> str:
    """Create the Delta table for a directory if it doesn't exist.

    The schema is derived from the first object listed under the directory's
    prefix, normalised by infer_schema_with_fallback(), and extended with the
    directory's partition columns and a ``unique_id`` column. Only the schema
    is written: the sample file's rows are not inserted, because they would
    carry null partition values and be inserted again when the file is
    processed normally.

    Args:
        directory: Subdirectory name under BASE_PREFIX ("agg", "raw", "index",
            or anything else, which is treated as unpartitioned).
        file_type: Spark reader format; "ndjson" is read as line-delimited JSON.

    Returns:
        The fully qualified table name.

    Raises:
        ValueError: if no object exists under the directory's prefix.
    """
    table_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.{sanitize_table_name(directory)}"

    try:
        spark.table(table_name)
    except Exception:
        print(f"Creating new table {table_name}")
    else:
        print(f"Table {table_name} already exists")
        return table_name

    # Use the first file to infer schema
    s3_client = get_s3_client(S3Config.from_env())
    response = s3_client.list_objects_v2(
        Bucket=S3_BUCKET,
        Prefix=f"{BASE_PREFIX}{directory}/",
        MaxKeys=1
    )
    if 'Contents' not in response:
        raise ValueError(f"No files found in {directory}")

    sample_key = response['Contents'][0]['Key']
    print(f"Using {sample_key} as sample file for schema inference")

    sample_path = f"s3a://{S3_BUCKET}/{sample_key}"
    if file_type == "ndjson":
        sample_df = spark.read \
            .format("json") \
            .option("multiline", "false") \
            .option("lineSep", "\n") \
            .option("primitivesAsString", "true") \
            .option("inferSchema", "false") \
            .load(sample_path)
    else:
        sample_df = spark.read \
            .format(file_type) \
            .option("inferSchema", "false") \
            .load(sample_path)

    # Apply robust schema inference with fallback
    sample_df = infer_schema_with_fallback(sample_df)

    # Partition columns (in partitionBy order) and the columns that make up
    # unique_id. NOTE(review): the unique_id sources ("asset_id", "timestamp")
    # are assumed to exist in the file schema — confirm against real files.
    if directory in ("agg", "raw"):
        partition_columns = [
            ("year", LongType()),
            ("month", LongType()),
            ("day", LongType()),
            ("series_bucket", StringType()),
        ]
        id_sources = ["series_bucket", "asset_id", "timestamp"]
    elif directory == "index":
        partition_columns = [
            ("series_id", StringType()),
            ("year", LongType()),
            ("month", LongType()),
            ("day", LongType()),
        ]
        id_sources = ["series_id", "timestamp"]
    else:  # asset_metadata
        partition_columns = []
        id_sources = ["asset_id"]

    for column_name, data_type in partition_columns:
        sample_df = _add_column_if_missing(sample_df, column_name, data_type)

    # Add unique identifier for deduplication
    if "unique_id" not in sample_df.columns:
        if len(id_sources) == 1:
            unique_id = col(id_sources[0])
        else:
            pieces = []
            for source in id_sources:
                if pieces:
                    pieces.append(lit("_"))
                pieces.append(col(source))
            unique_id = concat(*pieces)
        sample_df = sample_df.withColumn("unique_id", unique_id)

    # limit(0) keeps the schema but drops the sample rows.
    writer = sample_df.limit(0).write.format("delta").mode("ignore")
    if partition_columns:
        writer = writer.partitionBy(*[column_name for column_name, _ in partition_columns])
    writer.saveAsTable(table_name)

    _try_add_unique_constraint(table_name)

    return table_name

def _partition_layout(directory: str):
    """Return (partition column specs, unique_id source columns) for a partitioned directory."""
    if directory == "index":
        return (
            [("series_id", StringType()), ("year", LongType()), ("month", LongType()), ("day", LongType())],
            ["series_id", "timestamp"],
        )
    # "agg" and "raw" share one layout.
    return (
        [("year", LongType()), ("month", LongType()), ("day", LongType()), ("series_bucket", StringType())],
        ["series_bucket", "asset_id", "timestamp"],
    )


def _build_file_metadata(file_info: Dict[str, Any], directory: str, file_type: str,
                         table_name: str, partition_names=(), **outcome) -> FileMetadata:
    """Build the metadata record for one file; *outcome* carries record_count or error_message."""
    partition_values = {name: file_info.get(name) for name in partition_names}
    return FileMetadata(
        file_path=file_info['key'],
        file_type=file_type,
        directory=directory,
        last_modified=file_info['last_modified'],
        size=file_info['size'],
        table_name=table_name,
        **partition_values,
        **outcome
    )


def _process_partitioned_files(directory: str, file_type: str, table_name: str,
                               new_files: List[Dict[str, Any]],
                               metadata_records: List[FileMetadata]) -> None:
    """Load agg/raw/index files one partition at a time and MERGE them into the table.

    Appends one record per file to *metadata_records* (success or error).
    """
    partition_specs, id_sources = _partition_layout(directory)
    partition_names = [name for name, _ in partition_specs]

    # Group files by partition; values missing from the key parse become ''.
    files_by_partition = {}
    for file_info in new_files:
        partition_key = tuple(file_info.get(name, '') for name in partition_names)
        files_by_partition.setdefault(partition_key, []).append(file_info)

    for partition_key, partition_files in files_by_partition.items():
        description = ", ".join(
            f"{name}={value}" for name, value in zip(partition_names, partition_key)
        )
        print(f"Processing partition: {description}")

        file_paths = [f"s3a://{S3_BUCKET}/{file_info['key']}" for file_info in partition_files]

        try:
            # Read with more conservative schema inference
            df = spark.read \
                .format(file_type) \
                .option("inferSchema", "false") \
                .load(file_paths)

            # Apply robust schema inference with fallback
            df = infer_schema_with_fallback(df)

            # Add partition columns from file path information
            for (name, data_type), value in zip(partition_specs, partition_key):
                df = df.withColumn(name, lit(value).cast(data_type))

            # Add unique identifier for deduplication:
            # the source columns joined with "_".
            id_pieces = []
            for source in id_sources:
                if id_pieces:
                    id_pieces.append(lit("_"))
                id_pieces.append(col(source))
            df = df.withColumn("unique_id", concat(*id_pieces))

            # On a type conflict, cast the shared columns to the table's types.
            if not validate_schema_compatibility(df, table_name):
                print(f"Schema incompatibility detected. Trying to harmonize schema...")
                try:
                    for field in spark.table(table_name).schema.fields:
                        if field.name in df.columns:
                            df = df.withColumn(field.name, col(field.name).cast(field.dataType))
                except Exception as e:
                    print(f"Error harmonizing schema: {str(e)}")

            # Use merge to handle deduplication
            df.createOrReplaceTempView("source_df")
            spark.sql(f"""
                MERGE INTO {table_name} AS target
                USING source_df AS source
                ON target.unique_id = source.unique_id
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """)

            # The count covers the whole partition batch and is recorded
            # unchanged on every file of that batch.
            record_count = df.count()
            for file_info in partition_files:
                metadata_records.append(_build_file_metadata(
                    file_info, directory, file_type, table_name, partition_names,
                    record_count=record_count
                ))

        except Exception as e:
            print(f"Error processing partition: {str(e)}")
            for file_info in partition_files:
                metadata_records.append(_build_file_metadata(
                    file_info, directory, file_type, table_name, partition_names,
                    error_message=str(e)
                ))


def _process_unpartitioned_files(directory: str, file_type: str, table_name: str,
                                 new_files: List[Dict[str, Any]],
                                 metadata_records: List[FileMetadata]) -> None:
    """Load all files of an unpartitioned (asset_metadata) directory at once and append them."""
    file_paths = [f"s3a://{S3_BUCKET}/{file_info['key']}" for file_info in new_files]

    try:
        # Read NDJSON files with specific options
        df = spark.read \
            .format("json") \
            .option("multiline", "false") \
            .option("lineSep", "\n") \
            .option("inferSchema", "false") \
            .load(file_paths)

        # Apply robust schema inference with fallback
        df = infer_schema_with_fallback(df)

        # Add unique identifier for deduplication
        df = df.withColumn("unique_id", col("asset_id"))

        df.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable(table_name)

        record_count = df.count()
        for file_info in new_files:
            metadata_records.append(_build_file_metadata(
                file_info, directory, file_type, table_name,
                record_count=record_count
            ))

    except Exception as e:
        print(f"Error processing files: {str(e)}")
        for file_info in new_files:
            metadata_records.append(_build_file_metadata(
                file_info, directory, file_type, table_name,
                error_message=str(e)
            ))


def process_directory_files(directory: str, files: List[Dict[str, Any]]) -> List[FileMetadata]:
    """Load every not-yet-recorded file of one directory into its Delta table.

    Args:
        directory: Key of KNOWN_DIRECTORIES the files were listed under.
        files: File descriptors as produced by list_s3_files().

    Returns:
        One FileMetadata per attempted file, with either ``record_count``
        (success) or ``error_message`` (failure) set. Empty when there is
        nothing new to process.

    Raises:
        KeyError: if *directory* is not a key of KNOWN_DIRECTORIES.

    A file counts as already handled when its key appears in the metadata
    table, whatever outcome was recorded for it.
    """
    if not files:
        print(f"No files to process in directory {directory}")
        return []

    metadata_records = []
    processed_files = set(get_processed_files())
    file_type = KNOWN_DIRECTORIES[directory]
    # Bound before the try so the outer handler can always iterate it; it stays
    # empty if table creation itself fails.
    new_files = []

    try:
        table_name = create_table_for_directory(directory, file_type)

        # Process files not already processed
        new_files = [f for f in files if f['key'] not in processed_files]
        if not new_files:
            print(f"No new files to process in directory {directory}")
            return []

        print(f"Found {len(new_files)} new files to process in {directory}")

        if directory in ("agg", "raw", "index"):
            _process_partitioned_files(directory, file_type, table_name, new_files, metadata_records)
        else:  # asset_metadata
            _process_unpartitioned_files(directory, file_type, table_name, new_files, metadata_records)

    except Exception as e:
        print(f"Error processing directory {directory}: {str(e)}")
        for file_info in new_files:
            metadata_records.append(_build_file_metadata(
                file_info, directory, file_type, "",
                error_message=str(e)
            ))

    return metadata_records

def ensure_partition_column_types(df: DataFrame, directory: str) -> DataFrame:
    """Cast whichever partition columns are present to their canonical types.

    year/month/day become long; series_bucket and series_id become string.
    Directories other than "agg", "raw" and "index" are returned untouched.
    """
    if directory in ("agg", "raw"):
        expected_types = (
            ("year", LongType()),
            ("month", LongType()),
            ("day", LongType()),
            ("series_bucket", StringType()),
        )
    elif directory == "index":
        expected_types = (
            ("series_id", StringType()),
            ("year", LongType()),
            ("month", LongType()),
            ("day", LongType()),
        )
    else:
        return df

    for column_name, data_type in expected_types:
        if column_name in df.columns:
            df = df.withColumn(column_name, col(column_name).cast(data_type))
    return df

def validate_schema_compatibility(df: DataFrame, table_name: str) -> bool:
    """Report whether *df* can be written to *table_name* without a type clash.

    Returns:
        False as soon as a column present on both sides has differing types
        (the conflict is printed); True otherwise. If the table cannot be
        inspected the check is skipped and True is returned.
    """
    try:
        existing_fields = spark.table(table_name).schema.fields
        incoming_fields = df.schema.fields

        # Compare every same-named pair; columns on one side only are ignored.
        for existing in existing_fields:
            for incoming in incoming_fields:
                if existing.name != incoming.name:
                    continue
                if not (existing.dataType != incoming.dataType):
                    continue
                print(f"Schema conflict: Column '{existing.name}' has different types - "
                      f"Table: {existing.dataType}, DataFrame: {incoming.dataType}")
                return False
    except Exception as e:
        print(f"Error validating schema: {str(e)}")
        return True  # Assume compatible if we can't check

    return True


# Section 6: Main Execution

In [0]:
def main():
    """Run the whole load: list S3, process each directory, record metadata.

    Any exception is printed and then re-raised to the caller.
    """
    try:
        # Credentials and client come from the environment
        config = S3Config.from_env()
        client = get_s3_client(config)

        # The metadata table must exist before directories are processed
        create_metadata_table()

        listing = list_s3_files(client, config)

        collected = []
        for directory in listing:
            print(f"Processing directory: {directory}")
            collected += process_directory_files(directory, listing[directory])

        # Persist the outcome of this run
        if not collected:
            print("No new files to process")
        else:
            update_metadata(collected)
            print(f"Processed {len(collected)} new files")

        print("ETL process completed successfully")

    except Exception as e:
        print(f"Error in ETL process: {str(e)}")
        raise

# Run the main process
if __name__ == "__main__":
    main()