
# Pipeline da camada landing para camada bronze

Apenas transforma .csvs em delta tables


## Libs necessárias


In [0]:
import pytz
from datetime import datetime, timedelta

from pyspark.sql.functions import regexp_extract, col, last, lit
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import StructType
from delta.tables import DeltaTable




## Coleta datas do databricks




In [0]:

# Notebook widgets: an optional ISO date to process, and a flag that
# backfills every day from the first landing file up to that date.
dbutils.widgets.text("date_reference", "", "Date Reference")
dbutils.widgets.dropdown("reprocess_everything", "False", ["True", "False"], "Reprocess Everything")

date_reference_str = dbutils.widgets.get("date_reference")
reprocess_everything = dbutils.widgets.get("reprocess_everything") == 'True'

tz = pytz.timezone("America/Sao_Paulo")

# NOTE(review): named "yesterday" but the offset is two days; left unchanged
# because the lag may be intentional -- confirm.
yesterday = datetime.now(tz) - timedelta(days=2)

# Attach the timezone to the wall-clock date with localize(). Calling
# astimezone() on a naive datetime assumes the system's local zone, so on a
# machine that is not in Sao Paulo time midnight can land on the previous
# calendar day.
first_day = tz.localize(datetime(2022, 6, 21))

if date_reference_str:
    parsed_reference = datetime.fromisoformat(date_reference_str)
    if parsed_reference.tzinfo is None:
        # Plain "YYYY-MM-DD" input: interpret it as a Sao Paulo date.
        date_reference = tz.localize(parsed_reference)
    else:
        # Input already carries an offset: convert it.
        date_reference = parsed_reference.astimezone(tz)
else:
    date_reference = yesterday

# Ascending list of days to process. A backfill covers first_day through
# date_reference, each exactly once; a reference date on or before first_day
# yields just the reference date.
dates = [date_reference]
if reprocess_everything:
    span_days = (date_reference.date() - first_day.date()).days
    dates = [first_day + timedelta(days=offset) for offset in range(span_days)] + dates

print('Running for: ', [dt.strftime('%Y-%m-%d') for dt in dates])

## Define catálogo do Unity Catalog

In [0]:

# Unity Catalog coordinates used by every bronze write in this notebook.
CATALOG = "precos_pmc"
SCHEMA = "bronze"

# Create the schema if it is missing, then make the catalog and schema the
# session defaults. Statements run in the order listed.
for setup_statement in (
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA} MANAGED LOCATION 's3://precos-pmc-bronze'",
    f"USE CATALOG {CATALOG}",
    f"USE SCHEMA {SCHEMA}",
):
    spark.sql(setup_statement)


## Base de dados completa

In [0]:

def get_full_db_dataframe_from_s3_landing(
    date_reference: datetime
) -> DataFrame:
    """
    Read the full-database CSV for one day from the S3 landing bucket.

    Per the original note, this file was used before 2023-07-18.

    Args:
        date_reference: Day whose landing file is read; only its year,
            month and day are used to build the object path.

    Returns:
        The CSV contents plus a ``filename`` column holding the base name
        of the source file. An empty, schema-less DataFrame is returned
        when the file does not exist or when any non-analysis error occurs
        (the error is printed).

    Raises:
        AnalysisException: When Spark reports an analysis error other than
            PATH_NOT_FOUND.
    """

    filepattern = 'Clique_Economia_-_Base_de_Dados.csv'

    # Layout: <bucket>/YYYY/MM/DD/YYYY-MM-DD_<file name>
    day_folder = date_reference.strftime('%Y/%m/%d')
    day_stamp = date_reference.strftime('%Y-%m-%d')
    path = f"s3a://precos-pmc-landing/{day_folder}/{day_stamp}_{filepattern}"

    try:
        df = (
            spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(path)
            # Keep only the last path segment (the file name) for lineage.
            # No temporary column is created, so a source column that
            # happens to be called "file_path" is left untouched.
            .withColumn(
                "filename",
                regexp_extract(col("_metadata.file_path"), r"([^/]+$)", 1)
            )
        )

        # Force evaluation here so a missing file is reported inside this
        # try block instead of later at write time. This does not cache
        # the data.
        df.count()

        return df

    except AnalysisException as e:

        if "PATH_NOT_FOUND" in e.getMessage():
            # Days without a landing file are expected; signal "no data".
            print(f"Landing file not found (expected): {path}")
            return spark.createDataFrame([], StructType([]))

        # Anything else is a real failure.
        raise

    except Exception as e:
        # Best effort: report and let the caller skip this day.
        print(f'Error reading {filepattern}: {e}')
        return spark.createDataFrame([], StructType([]))





## Base de dados diária

Após 18/07/2023, os arquivos passaram a conter apenas a cotação diária, em vez de toda a base de dados.



In [0]:

def get_incremental_db_dataframe_from_s3_landing(
    date_reference: datetime
) -> DataFrame:
    """
    Read the daily (incremental) quotes CSV for one day from the S3 landing bucket.

    Per the original note, this file is used after 2023-07-18. It is read
    as a semicolon-delimited, ISO-8859-1 encoded CSV.

    Args:
        date_reference: Day whose landing file is read; only its year,
            month and day are used to build the object path.

    Returns:
        The CSV contents plus a ``filename`` column holding the base name
        of the source file. An empty, schema-less DataFrame is returned
        when the file does not exist or when any non-analysis error occurs
        (the error is printed).

    Raises:
        AnalysisException: When Spark reports an analysis error other than
            PATH_NOT_FOUND.
    """

    filepattern = 'Clique_Economia_-_Cotacoes_-_Base_de_Dados.csv'

    # Layout: <bucket>/YYYY/MM/DD/YYYY-MM-DD_<file name>
    day_folder = date_reference.strftime('%Y/%m/%d')
    day_stamp = date_reference.strftime('%Y-%m-%d')
    path = f"s3a://precos-pmc-landing/{day_folder}/{day_stamp}_{filepattern}"

    try:
        df = (
            spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ";")
            .option("encoding", "ISO-8859-1")
            .csv(path)
            # Keep only the last path segment (the file name) for lineage.
            # No temporary column is created, so a source column that
            # happens to be called "file_path" is left untouched.
            .withColumn(
                "filename",
                regexp_extract(col("_metadata.file_path"), r"([^/]+$)", 1)
            )
        )

        # Force evaluation here so a missing file is reported inside this
        # try block instead of later at write time. This does not cache
        # the data.
        df.count()

        return df

    except AnalysisException as e:

        if "PATH_NOT_FOUND" in e.getMessage():
            # Days without a landing file are expected; signal "no data".
            print(f"Landing file not found (expected): {path}")
            return spark.createDataFrame([], StructType([]))

        # Anything else is a real failure.
        raise

    except Exception as e:
        # Best effort: report and let the caller skip this day.
        print(f'Error reading {filepattern}: {e}')
        return spark.createDataFrame([], StructType([]))



## Base de dados de produtos

In [0]:

def get_product_db_dataframe_from_s3_landing(
    date_reference: datetime
) -> DataFrame:
    """
    Read the product-database CSV for one day from the S3 landing bucket.

    Per the original note, this file is used before 2023-07-18.

    Args:
        date_reference: Day whose landing file is read; only its year,
            month and day are used to build the object path.

    Returns:
        The CSV contents plus a ``filename`` column holding the base name
        of the source file. An empty, schema-less DataFrame is returned
        when the file does not exist or when any non-analysis error occurs
        (the error is printed).

    Raises:
        AnalysisException: When Spark reports an analysis error other than
            PATH_NOT_FOUND.
    """

    filepattern = 'Clique_Economia_-_Produto_-_Base_de_Dados.csv'

    # Layout: <bucket>/YYYY/MM/DD/YYYY-MM-DD_<file name>
    day_folder = date_reference.strftime('%Y/%m/%d')
    day_stamp = date_reference.strftime('%Y-%m-%d')
    path = f"s3a://precos-pmc-landing/{day_folder}/{day_stamp}_{filepattern}"

    try:
        df = (
            spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(path)
            # Keep only the last path segment (the file name) for lineage.
            # No temporary column is created, so a source column that
            # happens to be called "file_path" is left untouched.
            .withColumn(
                "filename",
                regexp_extract(col("_metadata.file_path"), r"([^/]+$)", 1)
            )
        )

        # Force evaluation here so a missing file is reported inside this
        # try block instead of later at write time. This does not cache
        # the data.
        df.count()

        return df

    except AnalysisException as e:

        if "PATH_NOT_FOUND" in e.getMessage():
            # Days without a landing file are expected; signal "no data".
            print(f"Landing file not found (expected): {path}")
            return spark.createDataFrame([], StructType([]))

        # Anything else is a real failure.
        raise

    except Exception as e:
        # Best effort: report and let the caller skip this day.
        print(f'Error reading {filepattern}: {e}')
        return spark.createDataFrame([], StructType([]))



## Registra dados no catálogo (bronze)

In [0]:

def table_exists(catalog: str, schema: str, table: str) -> bool:
    """
    Tell whether ``table`` is listed in ``catalog.schema``.

    Runs ``SHOW TABLES`` for the schema and looks for a row whose
    ``tableName`` equals ``table``.

    Args:
        catalog: Catalog name.
        schema: Schema name inside the catalog.
        table: Unqualified table name to look for.

    Returns:
        True when at least one matching row is found, False otherwise.
    """
    listing = spark.sql(f"SHOW TABLES IN {catalog}.{schema}")
    matching_rows = listing.filter(f"tableName = '{table}'")
    return matching_rows.count() > 0

def write_bronze_delta_table_on_s3(
    df: DataFrame, 
    date_reference: datetime, 
    table_name: str
) -> bool:
    """
    Write one day of landing data into a bronze Delta table.

    The rows are tagged with ``year``/``month``/``day`` columns taken from
    ``date_reference``. If the table does not exist it is created,
    partitioned by those three columns. Otherwise the day's partition is
    replaced, so re-running the same day does not duplicate rows.

    Args:
        df: Data read from the landing bucket for ``date_reference``.
        date_reference: Day the data belongs to.
        table_name: Unqualified bronze table name; it is qualified with the
            notebook-level ``CATALOG`` and ``SCHEMA``.

    Returns:
        True when the data was written. False when ``df`` is empty or when
        the write failed (the error is printed, not raised).
    """

    if df.isEmpty():
        print(f"Nothing to insert in {table_name}")
        return False

    y, m, d = date_reference.year, date_reference.month, date_reference.day

    full_table_name = f"{CATALOG}.{SCHEMA}.{table_name}"

    df_with_partitions = (
        df
        .withColumn("year", lit(y))
        .withColumn("month", lit(m))
        .withColumn("day", lit(d))
    )

    try:

        if not table_exists(CATALOG, SCHEMA, table_name):
            print(f"Creating table {full_table_name}")
            (
                df_with_partitions
                .writeTo(full_table_name)
                .using("delta")
                .partitionedBy("year", "month", "day")
                .create()
            )

        else:
            print(f"Replacing partition {y}-{m}-{d} in {full_table_name}")

            # Swap the day's rows in a single Delta commit. A separate
            # DELETE followed by an append would leave the partition empty
            # if the append failed after the delete had committed.
            (
                df_with_partitions
                .write
                .format("delta")
                .mode("overwrite")
                .option("replaceWhere", f"year = {y} AND month = {m} AND day = {d}")
                .saveAsTable(full_table_name)
            )

        print(f'Sucessful inserted into {full_table_name}')
        return True

    except Exception as e:
        # Best effort: report the failure and let the caller move on.
        print(f'Error while writing bronze delta ({table_name}): {e}')
        return False



## Fluxo de execução

In [0]:

# Landing reader -> bronze table, processed in this order for every day.
# NOTE(review): the "full" reader feeds 'base_incremental' while the
# "incremental" reader feeds 'cotacoes' -- confirm the pairing is intended.
landing_to_bronze = (
    (get_product_db_dataframe_from_s3_landing, 'produtos'),
    (get_full_db_dataframe_from_s3_landing, 'base_incremental'),
    (get_incremental_db_dataframe_from_s3_landing, 'cotacoes'),
)

for run_date in dates:
    for read_landing, bronze_table in landing_to_bronze:
        landing_df = read_landing(date_reference=run_date)
        # The writer reports success as a bool; it is not acted upon here.
        write_bronze_delta_table_on_s3(
            df=landing_df,
            date_reference=run_date,
            table_name=bronze_table,
        )
