In [0]:
%run ../../../bootstrap

In [0]:
import sys
import os
import importlib.util
from src.common.init_tables import to_struct_type

# Set the configuration to preserve char/varchar type information
spark.conf.set("spark.sql.preserveCharVarcharTypeInfo", "true")

# Directory (relative to the current working directory) that holds the
# table-definition modules, one Python file per table.
tables_path = os.path.abspath(os.path.join(os.getcwd(), "../tables"))
tables_infos = []

# Phase 1 - discovery: load every non-dunder ``*.py`` file in ``tables_path``
# as a module and collect the dict returned by its ``get_table_info()``.
for fname in os.listdir(tables_path):
    if not fname.endswith(".py") or fname.startswith("__"):
        continue

    fpath = os.path.join(tables_path, fname)
    module_name = fname[:-3]  # strip the ".py" suffix
    spec = importlib.util.spec_from_file_location(module_name, fpath)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Modules without a ``get_table_info`` attribute are ignored.
    if hasattr(module, "get_table_info"):
        info = module.get_table_info()
        tables_infos.append(info)
        print(f"Procesando tabla: {info['table']}")

# Phase 2 - creation: this loop is deliberately OUTSIDE the discovery loop so
# that each table is created exactly once. (When nested inside it, every
# already-collected table was rewritten again for each directory entry.)
for info in tables_infos:
    schema = to_struct_type(info["schema"])

    # 1. Build an empty DataFrame carrying the declared schema.
    df = spark.createDataFrame([], schema)

    # 2. Write the empty Delta dataset; this materialises the table structure
    #    at the target path.
    # NOTE(review): mode("overwrite") replaces whatever is stored at
    # info["path"], so re-running this cell empties an existing table -
    # confirm that this is intended.
    df.write.format("delta").mode("overwrite").save(info["path"])

    # 3. Register the Delta location as a table in the metastore.
    spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {info['table']}
            USING DELTA
            LOCATION '{info['path']}'
        """)

    # 4. Attach the declared column comments, skipping columns that have no
    #    (or an empty) "comment" entry.
    for col in info["schema"]:
        if "comment" in col and col["comment"]:
            # Escape backslashes and single quotes so the comment text cannot
            # terminate the SQL string literal early.
            comment_sql = str(col["comment"]).replace("\\", "\\\\").replace("'", "\\'")
            spark.sql(f"""
                    ALTER TABLE {info['table']} ALTER COLUMN {col['name']} COMMENT '{comment_sql}'
                """)
    print(f"Tabla {info['table']} creada y registrada.")