In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import *
import re

In [0]:
%run ./schemas

In [0]:

# Resolve the source and destination table coordinates for this notebook.
# The src_* names are read from the dest_* entries of the schema parameters.
schema_config = _schemas["global-super-store-dataset"]
params = schema_config["parameters"]

src_catalog_name = params["dest_catalog_name"]
src_schema_name = params["dest_schema_name"]
src_table_name = params["dest_table_name"]

# Destination names are derived from the source names by substring replacement;
# the table name is kept as-is.
dest_catalog_name = src_catalog_name.replace("bronze", "silver")
dest_schema_name = src_schema_name.replace("ingestion", "refined")
dest_table_name = src_table_name

# Echo the resolved names (one per line, with a blank line before each group).
print(
    "",
    f"src_catalog_name:         '{src_catalog_name}'",
    f"src_schema_name:          '{src_schema_name}'",
    f"src_table_name:           '{src_table_name}'",
    "",
    f"dest_catalog_name:        '{dest_catalog_name}'",
    f"dest_schema_name:         '{dest_schema_name}'",
    f"dest_table_name:          '{dest_table_name}'",
    sep="\n",
)


In [0]:
# Make sure the destination catalog exists before anything is written to it.
spark.sql(
    f"CREATE CATALOG if NOT EXISTS {dest_catalog_name}"
)

In [0]:
# Make sure the destination schema exists inside the destination catalog.
spark.sql(
    f"CREATE SCHEMA if NOT EXISTS {dest_catalog_name}.{dest_schema_name}"
)

In [0]:
# Largest `ingestion_timestamp` present in the source table.
# An ungrouped aggregate always yields exactly one row, so first() is safe here;
# the value is None when the table has no rows.
filter_max = (
    spark.table(f"{src_catalog_name}.{src_schema_name}.{src_table_name}")
    .agg(f.max("ingestion_timestamp"))
    .first()[0]
)
print(f" > filter_max: {filter_max}")


In [0]:
# Load only the rows that belong to the most recent ingestion batch.
#
# The predicate is built as a Column expression with a typed literal instead of
# interpolating `filter_max` into a SQL string: an unquoted timestamp/date/string
# value inside SQL text either fails to parse or is evaluated as arithmetic
# (e.g. 2024-05-01 -> 2018). `f.lit` keeps the Python value's type, so this works
# for numeric, string and timestamp columns alike. If `filter_max` is None
# (empty source table) the comparison is null and no rows are selected.
df = (
    spark
    .table(f"{src_catalog_name}.{src_schema_name}.{src_table_name}")
    .filter(f.col("ingestion_timestamp") == f.lit(filter_max))
)

In [0]:
# Parse the date columns from 'dd-MM-yyyy' strings into real DATE values and
# replace values that end up null with the sentinel date 1900-01-01.
#
# The raw string is trimmed BEFORE parsing. Wrapping `to_date` in `trim` (the
# previous order) converts the parsed date back into a string, so the column
# never became a DATE and surrounding whitespace was still fed to the parser.
#
# NOTE(review): if the destination table already exists with these columns
# stored as strings, the overwrite at the end of the notebook may need a schema
# overwrite to accept the DATE type — confirm before running.
cols_date = ['order_date', 'ship_date']
for col in cols_date:
    df = df.withColumn(col, f.to_date(f.trim(f.col(col)), 'dd-MM-yyyy'))
for col in cols_date:
    df = (
        df
        .withColumn(
            col,
            f.when(
                f.col(col).isNull(), f.lit('1900-01-01').cast('date')
            )
            .otherwise(f.col(col))
        )
    )

In [0]:
# Cast the integer columns to int and default values that are null after the
# cast to 0.
cols_int = ['row_id']
for int_col in cols_int:
    as_int = f.col(int_col).cast('int')
    df = df.withColumn(
        int_col,
        f.when(as_int.isNull(), f.lit(0).cast('int')).otherwise(as_int),
    )

In [0]:
# Cast the numeric measure columns to decimal(12,2) and default values that are
# null after the cast to 0.00.
cols_decimal = ['quantity', 'discount', 'profit', 'shipping_cost']
for dec_col in cols_decimal:
    as_decimal = f.col(dec_col).cast('decimal(12,2)')
    df = df.withColumn(
        dec_col,
        f.when(as_decimal.isNull(), f.lit(0.00).cast('decimal(12,2)'))
        .otherwise(as_decimal),
    )

In [0]:
# Drop the rows whose `sales` value is not numeric; the source data contains a
# large number of records with non-numeric strings in this column.
# This needs to be corrected in the source system.
#
# The pattern accepts an optional leading minus, digits, and an optional
# fractional part. Rows with a null `sales` are dropped as well, because the
# predicate evaluates to null for them.
# NOTE(review): the pattern is anchored and this cell runs before the string
# trimming cell, so values padded with whitespace are also dropped — confirm
# that this is intended.
sales_regex_clean = r'^-?\d+(\.\d+)?$'
df = df.where(f.col("sales").rlike(sales_regex_clean))
df = df.withColumn('sales', f.col('sales').cast('decimal(12,2)'))

In [0]:
# Clean every column that is still of string type:
#   - null values are replaced with 'N/A'
#   - leading and trailing blank spaces are stripped from non-null values
for col_name, col_type in df.dtypes:
    if col_type != 'string':
        continue
    df = df.withColumn(
        col_name,
        f.when(f.col(col_name).isNull(), f.lit('N/A'))
        .otherwise(f.trim(f.col(col_name))),
    )

# display(df)

In [0]:
# Persist the refined DataFrame as a Delta table, replacing any existing contents
# of the destination table.
df.write.saveAsTable(
    f"{dest_catalog_name}.{dest_schema_name}.{dest_table_name}",
    format="delta",
    mode="overwrite",
)


In [0]:
# display(
#   spark
#   .table(f"{dest_catalog_name}.{dest_schema_name}.{dest_table_name}")
#   .limit(10)
# )