<img src="https://github.com/mousastech/dlt_ingestion/blob/1d1acf88ef16711f398e202ea703b6c3336765e9/files/DLT%20Ingest.png?raw=true">

In [0]:
import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(
  table_properties={
    "delta.autoOptimize.optimizeWrite": "true",
    "delta.autoOptimize.autoCompact": "true",
    "dlt.applyChanges": "true"
  }
)
def customers():
  """Raw customers table: customer CSV files streamed in by Auto Loader.

  Reads every CSV file under the customers landing folder through the
  ``cloudFiles`` (Auto Loader) source and returns the rows unchanged. The
  table properties turn on Delta optimized writes and auto compaction.
  """
  # NOTE(review): "dlt.applyChanges" is set only on this table and this
  # table is not the target of an APPLY CHANGES flow in the visible code —
  # confirm the property is intended.
  landing_dir = "/Volumes/ecommerce/bronze/files/bronze/customers/"
  auto_loader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
  )
  return auto_loader.load(landing_dir)

In [0]:
@dlt.table
def products():
  """Raw products table: product CSV files streamed in by Auto Loader.

  Uses the ``cloudFiles`` source in CSV mode over the products landing
  folder and exposes the rows as-is.
  """
  landing_dir = "/Volumes/ecommerce/bronze/files/bronze/products/"
  auto_loader = spark.readStream.format("cloudFiles")
  return auto_loader.option("cloudFiles.format", "csv").load(landing_dir)

In [0]:
@dlt.table
def sellers():
  """Raw sellers table: seller CSV files streamed in by Auto Loader."""
  landing_dir = "/Volumes/ecommerce/bronze/files/bronze/sellers/"
  csv_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
  )
  return csv_stream.load(landing_dir)

In [0]:
@dlt.table
def orders():
  """Raw orders table: order CSV files streamed in by Auto Loader."""
  # NOTE(review): customers/products/sellers load from
  # /Volumes/ecommerce/bronze/files/bronze/..., while this source lives under
  # /Volumes/tutorial/dlt/arquivos/ — confirm both volumes are intended.
  landing_dir = "/Volumes/tutorial/dlt/arquivos/orders/"
  auto_loader = spark.readStream.format("cloudFiles")
  auto_loader = auto_loader.option("cloudFiles.format", "csv")
  return auto_loader.load(landing_dir)

In [0]:
@dlt.table
def orders_items():
  """Raw order-items table: order line-item CSV files streamed by Auto Loader."""
  landing_dir = "/Volumes/tutorial/dlt/arquivos/order_items/"
  csv_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
  )
  return csv_stream.load(landing_dir)

In [0]:
@dlt.table
def order_payments():
  """Raw order-payments table: payment CSV files streamed in by Auto Loader."""
  landing_dir = "/Volumes/tutorial/dlt/arquivos/order_payments/"
  auto_loader = spark.readStream.format("cloudFiles")
  return auto_loader.option("cloudFiles.format", "csv").load(landing_dir)

In [0]:
@dlt.table
def order_reviews():
  """Raw order-reviews table: review CSV files streamed in by Auto Loader."""
  landing_dir = "/Volumes/tutorial/dlt/arquivos/order_reviews/"
  csv_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
  )
  return csv_stream.load(landing_dir)

In [0]:
@dlt.table
def geolocation():
  """Raw geolocation table: geolocation CSV files streamed in by Auto Loader."""
  landing_dir = "/Volumes/tutorial/dlt/arquivos/geolocation/"
  auto_loader = spark.readStream.format("cloudFiles")
  auto_loader = auto_loader.option("cloudFiles.format", "csv")
  return auto_loader.load(landing_dir)

In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.window import Window

@dlt.table
def customer_bronze():
  """Bronze customers: the five customer attributes plus a load timestamp.

  Streams the ``customers`` table of this pipeline, keeps only the listed
  customer columns and appends ``DataRawLoad`` set to ``current_timestamp()``.
  """
  kept_columns = [
    "customer_id",
    "customer_unique_id",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state",
  ]
  raw_customers = dlt.read_stream("customers")
  projected = raw_customers.select(*[col(name) for name in kept_columns])
  return projected.withColumn("DataRawLoad", current_timestamp())

@dlt.table
def products_bronze():
  """Bronze products: every raw product column plus a ``DataRawLoad`` stamp."""
  raw_products = dlt.read_stream("products")
  return raw_products.withColumn("DataRawLoad", current_timestamp())
  
@dlt.table
def sellers_bronze():
  """Bronze sellers: every raw seller column plus a ``DataRawLoad`` stamp."""
  raw_sellers = dlt.read_stream("sellers")
  return raw_sellers.withColumn("DataRawLoad", current_timestamp())

@dlt.table
def orders_bronze():
  """Bronze orders: every raw order column plus a ``DataRawLoad`` stamp."""
  raw_orders = dlt.read_stream("orders")
  return raw_orders.withColumn("DataRawLoad", current_timestamp())
  
@dlt.table
def orders_items_bronze():
  """Bronze order items: every raw line-item column plus a ``DataRawLoad`` stamp."""
  raw_items = dlt.read_stream("orders_items")
  return raw_items.withColumn("DataRawLoad", current_timestamp())
  
@dlt.table
def orders_payments_bronze():
  """Bronze payments: every raw ``order_payments`` column plus a ``DataRawLoad`` stamp."""
  raw_payments = dlt.read_stream("order_payments")
  return raw_payments.withColumn("DataRawLoad", current_timestamp())
  
@dlt.table
def orders_reviews_bronze():
  """Bronze reviews: every raw ``order_reviews`` column plus a ``DataRawLoad`` stamp."""
  raw_reviews = dlt.read_stream("order_reviews")
  return raw_reviews.withColumn("DataRawLoad", current_timestamp())
  
@dlt.table
def geolocation_bronze():
  """Bronze geolocation: every raw geolocation column plus a ``DataRawLoad`` stamp."""
  raw_geo = dlt.read_stream("geolocation")
  return raw_geo.withColumn("DataRawLoad", current_timestamp())

## Ingest Silver Tables

In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.functions import col


@dlt.table
@dlt.expect_or_drop("customer_id", "customer_id IS NOT NULL")
def customer_silver():
    """Silver customers: customer attributes with NULL ids dropped.

    Streams ``customer_bronze``, keeps the customer columns and the
    ``DataRawLoad`` timestamp, and drops rows whose ``customer_id`` is NULL
    via the ``expect_or_drop`` expectation.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    return dlt.read_stream("customer_bronze").select(
        col("customer_id"),
        col("customer_unique_id"),
        col("customer_zip_code_prefix"),
        col("customer_city"),
        col("customer_state"),
        col("DataRawLoad").cast("timestamp"),
    )

In [0]:
@dlt.table
def geolocation_silver():
    """Silver geolocation: zip prefix, coordinates, city/state and load time.

    Streams ``geolocation_bronze`` and projects the geolocation columns plus
    the ``DataRawLoad`` timestamp.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    return dlt.read_stream("geolocation_bronze").select(
        col("geolocation_zip_code_prefix"),
        col("geolocation_lat"),
        col("geolocation_lng"),
        col("geolocation_city"),
        col("geolocation_state"),
        col("DataRawLoad").cast("timestamp"),
    )

In [0]:
@dlt.table
def orders_items_silver():
    """Silver order items: line-item keys, shipping limit, price and freight.

    Streams ``orders_items_bronze`` and projects the line-item columns plus
    the ``DataRawLoad`` timestamp.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    return dlt.read_stream("orders_items_bronze").select(
        col("order_id"),
        col("order_item_id"),
        col("product_id"),
        col("seller_id"),
        col("shipping_limit_date"),
        col("price"),
        col("freight_value"),
        col("DataRawLoad").cast("timestamp"),
    )

In [0]:
@dlt.table
def orders_payments_silver():
    """Silver payments: order id, payment sequence, type and value.

    Streams ``orders_payments_bronze`` and projects the payment columns plus
    the ``DataRawLoad`` timestamp.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    # NOTE(review): payment_installments is not carried into silver, although
    # the gold ``sales`` table reads it from the raw payments table.
    return dlt.read_stream("orders_payments_bronze").select(
        col("order_id"),
        col("payment_sequential"),
        col("payment_type"),
        col("payment_value"),
        col("DataRawLoad").cast("timestamp"),
    )

In [0]:
@dlt.table
def order_reviews_silver():
    """Silver reviews: review ids, score, comment text and review timestamps.

    Streams ``orders_reviews_bronze`` and projects the review columns plus
    the ``DataRawLoad`` timestamp.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    return dlt.read_stream("orders_reviews_bronze").select(
        col("review_id"),
        col("order_id"),
        col("review_score"),
        col("review_comment_title"),
        col("review_comment_message"),
        col("review_creation_date"),
        col("review_answer_timestamp"),
        col("DataRawLoad").cast("timestamp"),
    )

In [0]:
@dlt.table
@dlt.expect_or_drop("order_id", "order_id IS NOT NULL")
def orders_silver():
    """Silver orders: order status and lifecycle timestamps, NULL ids dropped.

    Streams ``orders_bronze``, keeps the order columns and the ``DataRawLoad``
    timestamp, and drops rows whose ``order_id`` is NULL via the
    ``expect_or_drop`` expectation.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    return dlt.read_stream("orders_bronze").select(
        col("order_id"),
        col("customer_id"),
        col("order_status"),
        col("order_purchase_timestamp"),
        col("order_approved_at"),
        col("order_delivered_carrier_date"),
        col("order_delivered_customer_date"),
        col("order_estimated_delivery_date"),
        col("DataRawLoad").cast("timestamp"),
    )

In [0]:
@dlt.table
def products_silver():
    """Silver products: category, description metrics and package dimensions.

    Streams ``products_bronze`` and projects the product columns plus the
    ``DataRawLoad`` timestamp.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    return dlt.read_stream("products_bronze").select(
        col("product_id"),
        col("product_category_name"),
        # "lenght" spellings are the incoming column names (presumably from
        # the CSV headers) — do not correct them here without renaming upstream.
        col("product_name_lenght"),
        col("product_description_lenght"),
        col("product_photos_qty"),
        col("product_weight_g"),
        col("product_length_cm"),
        col("product_height_cm"),
        col("product_width_cm"),
        col("DataRawLoad").cast("timestamp"),
    )

In [0]:
@dlt.table
@dlt.expect_or_drop("seller_id", "seller_id IS NOT NULL")
def sellers_silver():
    """Silver sellers: seller location attributes with NULL ids dropped.

    Streams ``sellers_bronze``, keeps the seller columns and the
    ``DataRawLoad`` timestamp, and drops rows whose ``seller_id`` is NULL via
    the ``expect_or_drop`` expectation.
    """
    # Use the documented streaming reader ``dlt.read_stream`` (as the bronze
    # cell does); ``readStream`` is not a documented function of ``dlt``.
    return dlt.read_stream("sellers_bronze").select(
        col("seller_id"),
        col("seller_zip_code_prefix"),
        col("seller_city"),
        col("seller_state"),
        col("DataRawLoad").cast("timestamp"),
    )

## Gold - Aggregated

In [0]:
import dlt
from pyspark.sql.functions import when, datediff

@dlt.table(
  name="sales",
  comment="Criando a camada Gold, gerando uma tabela juntando todos os dados",
  partition_cols=["estadoCliente"]
)
def sales():
  """Gold table: orders enriched with payments, reviews and customer location.

  Batch-reads ``orders``, ``order_payments``, ``order_reviews`` and
  ``customers``, left-joins the last three onto orders, translates order
  status and payment type into Portuguese labels, and exposes the result
  with Portuguese column names, partitioned by ``estadoCliente``.
  ``dataEntregaEmDias`` is ``datediff(order_estimated_delivery_date,
  order_approved_at)``. Status/payment values missing from the label maps
  below become NULL (no ``otherwise`` branch).
  """
  # NOTE(review): this reads the raw tables, not the *_silver tables, so the
  # silver expect_or_drop rules do not filter rows here. Also, if an order
  # has several payments and several reviews, the two left joins yield one
  # row per payment x review pair — confirm that grain is intended.
  ord_df = dlt.read("orders")
  pay_df = dlt.read("order_payments")
  rev_df = dlt.read("order_reviews")
  cust_df = dlt.read("customers")

  status_labels = {
    "shipped": "enviado",
    "canceled": "cancelado",
    "invoiced": "faturado",
    "created": "criado",
    "delivered": "entregue",
    "unavailable": "indisponível",
    "processing": "em processamento",
    "approved": "aprovado",
  }
  payment_labels = {
    "credit_card": "cartao_de_credito",
    "boleto": "boleto",
    "not_defined": "não_definido",
    "voucher": "voucher",
    "debit_card": "cartao_de_debito",
  }

  def translate(column, labels):
    # Chain when(...).when(...) in dict order; unmatched values stay NULL.
    expr = None
    for raw_value, label in labels.items():
      matches = column == raw_value
      expr = when(matches, label) if expr is None else expr.when(matches, label)
    return expr

  joined = (
    ord_df
    .join(pay_df, ord_df.order_id == pay_df.order_id, "left")
    .join(rev_df, ord_df.order_id == rev_df.order_id, "left")
    .join(cust_df, ord_df.customer_id == cust_df.customer_id, "left")
  )

  return joined.select(
    translate(ord_df.order_status, status_labels).alias("statusDoPedido"),
    ord_df.order_purchase_timestamp.alias("horaCompraPedido"),
    ord_df.order_approved_at.alias("horaPedidoAprovado"),
    ord_df.order_estimated_delivery_date.alias("dataEstimadaEntrega"),
    datediff(ord_df.order_estimated_delivery_date, ord_df.order_approved_at).alias("dataEntregaEmDias"),
    rev_df.review_score.alias("notaProduto"),
    rev_df.review_answer_timestamp.alias("dataComentarioSobreProduto"),
    translate(pay_df.payment_type, payment_labels).alias("meioDePagamento"),
    pay_df.payment_installments.alias("parcelamento"),
    pay_df.payment_value.alias("valorPago"),
    cust_df.customer_city.alias("cidadeCliente"),
    cust_df.customer_state.alias("estadoCliente"),
  )