In [0]:
%sql
-- Explore the bronze schema: one row per column, in table and column-position order.
-- Single quotes make 'bronze' a string literal in standard SQL; double quotes may be
-- parsed as an identifier when double-quoted identifiers are enabled.
select
  *
from
  information_schema.columns
where
  table_schema = 'bronze'
order by
  table_name,
  ordinal_position

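## Generate a mapping file template

Prints a YAML skeleton that lists every bronze column under a single `ignored` entry; copy it into the mappings file read below and move columns out of `ignored` into `target`/`source` entries.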

In [0]:
def generate_mappings_template(columns=None):
    """Return a YAML mappings-file template listing every bronze column as ignored.

    The template starts with a commented example of a real entry
    (``target: silver.<table>.<column>`` plus a ``source`` list of
    ``bronze.<table>.<column>`` names). It is followed by a single
    ``- ignored:`` entry holding one ``bronze.<table>.<column>`` line per
    column. The mapping loader skips entries that carry an ``ignored`` key,
    so the file loads as-is; columns are moved out of the ignored list by hand.

    Args:
        columns: Optional iterable of ``(table_name, column_name)`` pairs to
            list. When omitted, the bronze schema's columns are read from
            ``information_schema.columns`` through the notebook's ``spark``
            session, ordered by table name and ordinal position.

    Returns:
        The template text, ending with a newline.
    """
    if columns is None:
        query = """
        select
            table_name,
            column_name
        from
            information_schema.columns
        where
            table_schema = 'bronze'
        order by
            table_name,
            ordinal_position
        """
        columns = [
            (row["table_name"], row["column_name"])
            for row in spark.sql(query).collect()
        ]

    # Build the text from explicit lines: an indented triple-quoted literal would
    # carry the function body's indentation into the YAML and misalign the
    # list items under "- ignored:".
    lines = [
        "# mappings:",
        "#   - target: silver.table_name.column_name",
        "#     source:",
        "#       - bronze.table_name.column_name",
        "mappings:",
        "- ignored:",
    ]
    lines.extend(
        f"      - bronze.{table_name}.{column_name}"
        for table_name, column_name in columns
    )
    return "\n".join(lines) + "\n"


# Print (rather than leave as the cell's last expression) so the template's
# newlines render as real lines instead of escaped characters.
mappings_template_text = generate_mappings_template()
print(mappings_template_text)

## Create views from the mapping file

In [0]:
import yaml
from collections import defaultdict

# Mappings file read by this cell.
mappings_yaml_file_path = "/Volumes/rk_workspace/bronze/disk/mappings.yaml"

# Name the encoding explicitly so parsing does not depend on the cluster's
# locale default. safe_load only builds plain Python types.
with open(mappings_yaml_file_path, "r", encoding="utf-8") as file:
    mappings_content = file.readlines()
mappings = yaml.safe_load("".join(mappings_content))

# Fail early with a clear message instead of an opaque TypeError further down
# when the file is empty or lacks the top-level list.
if not isinstance(mappings, dict) or not isinstance(mappings.get("mappings"), list):
    raise ValueError(
        f"{mappings_yaml_file_path} must contain a top-level 'mappings' list"
    )

# One record per (source column -> target column) pair.
flat_mappings = []
# "schema.table" of each target -> set of "schema.table" sources feeding it.
table_mappings = defaultdict(set)


def parse_name(name):
    """Split a fully qualified ``schema.table.column`` name into its parts.

    Args:
        name: Dotted name as written in the mappings file, e.g.
            ``"bronze.orders.order_id"``.

    Returns:
        A ``(schema, table, column)`` tuple of strings.

    Raises:
        ValueError: if ``name`` is not a string made of exactly three
            non-empty dot-separated parts. Extra parts are rejected rather
            than silently dropped, so typos surface immediately.
    """
    parts = name.split(".") if isinstance(name, str) else []
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected 'schema.table.column', got {name!r}")
    schema, table, column = parts
    return (schema, table, column)


for mapping in mappings["mappings"]:
    # Entries with an `ignored` key list columns deliberately left unmapped.
    if "ignored" in mapping:
        continue

    target_schema, target_table, target_column = parse_name(mapping["target"])

    # Accept a single scalar source as well as a list; iterating a bare string
    # would otherwise walk its characters and fail inside parse_name.
    sources = mapping["source"]
    if isinstance(sources, str):
        sources = [sources]

    for source in sources:
        source_schema, source_table, source_column = parse_name(source)
        flat_mappings.append(
            {
                "source_schema": source_schema,
                "source_table": source_table,
                "source_column": source_column,
                "target_schema": target_schema,
                "target_table": target_table,
                "target_column": target_column,
            }
        )
        # Track which source tables feed each target table; the view builder
        # emits one UNION ALL branch per source table.
        table_mappings[f"{target_schema}.{target_table}"].add(
            f"{source_schema}.{source_table}"
        )

def build_view_sql(target_table, flat_mappings):
    """Return the CREATE OR REPLACE VIEW statement for one target table.

    One SELECT is emitted per source table that feeds ``target_table``, and
    the SELECTs are combined with UNION ALL. Every SELECT lists the same
    target columns in the same order. A source table with no mapping for a
    column contributes ``NULL AS <column>`` instead.

    Args:
        target_table: ``"schema.table"`` name of the view to create.
        flat_mappings: List of dicts with the keys ``source_schema``,
            ``source_table``, ``source_column``, ``target_schema``,
            ``target_table`` and ``target_column``.

    Returns:
        The SQL text, ending with a closing parenthesis and semicolon.
    """
    relevant = [
        entry
        for entry in flat_mappings
        if f'{entry["target_schema"]}.{entry["target_table"]}' == target_table
    ]

    # dict.fromkeys de-duplicates while keeping first-seen (YAML) order. This
    # keeps column order and UNION ALL branch order stable between runs;
    # iterating a set of strings does not, because string hashing is randomized.
    target_columns = list(dict.fromkeys(entry["target_column"] for entry in relevant))
    source_tables = list(
        dict.fromkeys(
            f'{entry["source_schema"]}.{entry["source_table"]}' for entry in relevant
        )
    )

    # Look up source columns among THIS target table's mappings only, so a
    # source table feeding several targets with a shared column name resolves
    # correctly. The first mapping listed wins.
    source_column_by_key = {}
    for entry in relevant:
        key = (
            f'{entry["source_schema"]}.{entry["source_table"]}',
            entry["target_column"],
        )
        source_column_by_key.setdefault(key, entry["source_column"])

    selects = []
    for source_table in source_tables:
        column_lines = [
            f"\n    {source_column_by_key.get((source_table, target_column), 'NULL')}"
            f" AS {target_column}"
            for target_column in target_columns
        ]
        selects.append(f"  SELECT{','.join(column_lines)}\n  FROM\n    {source_table}")

    # Join the branches instead of appending a separator and stripping it:
    # str.rstrip removes a character set, not a suffix, and would eat trailing
    # letters of the last table name.
    body = "\n\n  UNION ALL\n\n".join(selects)
    return f"CREATE OR REPLACE VIEW {target_table} AS (\n{body}\n);"


for target_table in table_mappings:
    print(f"Table: {target_table}")
    sql = build_view_sql(target_table, flat_mappings)
    print(sql)
    spark.sql(sql)