In [0]:
import re
from pathlib import Path
from pyspark.sql import DataFrame


class DataProductConfig:
    """
    Resolve the catalog and standardized table name for a data layer and
    write Spark DataFrames to ``<catalog>.<schema>.<table>``.

    The layer (bronze/silver/gold) selects the catalog via
    ``ALLOWED_CATALOGS``; the catalog selects the table-name prefix via
    ``PREFIXES``.
    """

    # Maps the user-facing layer name to the catalog it is stored in.
    ALLOWED_CATALOGS = {
        "bronze": "1_data_sources",
        "silver": "2_data_products",
        "gold": "3_analytics_products",
    }

    # Maps each catalog to the prefix every table name in it must carry.
    PREFIXES = {
        "1_data_sources": "ds",
        "2_data_products": "dp",
        "3_analytics_products": "ap",
    }

    # Any run of characters outside lowercase ASCII letters and digits.
    _NON_ALNUM = re.compile(r"[^a-z0-9]+")

    def __init__(self, table_name: str, schema: str, layer: str):
        """
        Initialize DataProductConfig.

        Args:
            table_name (str): Original table name (may contain spaces or special characters).
            schema (str): Schema name where the table will be stored.
            layer (str): Data layer name (bronze, silver, gold).

        Raises:
            ValueError: If ``layer`` is not a key of ``ALLOWED_CATALOGS``.
        """
        if layer not in self.ALLOWED_CATALOGS:
            raise ValueError(f"Invalid layer '{layer}'. Must be one of: {list(self.ALLOWED_CATALOGS)}")

        self.table_name = table_name
        self.schema = schema
        self.catalog = self.ALLOWED_CATALOGS[layer]

    @staticmethod
    def _clean_table_name(table_name: str) -> str:
        """
        Clean a raw table name (or file name/path) into an identifier.

        The name is treated as a path: any directory part and the final
        extension (text after the last dot) are dropped via ``Path.stem``.
        The remainder is lowercased, every run of characters other than
        ASCII ``a-z`` / ``0-9`` is collapsed into a single underscore, and
        leading/trailing underscores are stripped.

        Args:
            table_name (str): Raw table name.

        Returns:
            str: Cleaned table name (may be empty if nothing usable remains).
        """
        stem = Path(table_name).stem
        # Lowercase first so the pattern only has to know about a-z and 0-9.
        return DataProductConfig._NON_ALNUM.sub("_", stem.lower()).strip("_")

    def _table_name(self) -> str:
        """
        Build the standardized table name for this configuration.

        The raw name is cleaned first and the catalog prefix is applied to
        the cleaned result. Doing it in this order keeps the prefix intact
        when the raw name is a path (cleaning keeps only the final path
        component) and makes the "already prefixed" check case-insensitive.

        Returns:
            str: Cleaned table name starting with ``<prefix>_``.

        Raises:
            ValueError: If the table name has no usable characters left
                after cleaning.
        """
        prefix = self.PREFIXES[self.catalog]
        cleaned = self._clean_table_name(self.table_name)
        if not cleaned:
            raise ValueError(
                f"Table name {self.table_name!r} contains no alphanumeric characters after cleaning."
            )
        if cleaned.startswith(f"{prefix}_"):
            return cleaned
        return f"{prefix}_{cleaned}"

    def write_table(self, df: "DataFrame") -> None:
        """
        Write a Spark DataFrame to ``<catalog>.<schema>.<table>``, overwriting
        any existing table. The schema is created first if it does not exist.

        Relies on a ``spark`` session being available as a global name; it is
        not defined in this class (presumably supplied by the notebook
        runtime - confirm when using outside a notebook).

        NOTE(review): catalog and schema are interpolated into the SQL
        statement unquoted, so ``schema`` is assumed to be a trusted, plain
        identifier.

        Args:
            df (DataFrame): Spark DataFrame to write.

        Returns:
            None

        Raises:
            ValueError: If the table name is empty after cleaning.
        """
        # Resolve the name first so an invalid table name fails before any
        # schema is created.
        final_name = self._table_name()
        full_path = f"{self.catalog}.{self.schema}.{final_name}"

        # Create new schema if it doesn't already exist
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {self.catalog}.{self.schema}")

        print(f"Writing DataFrame to table: {full_path}")
        df.write.mode("overwrite").saveAsTable(full_path)
