In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os
from pyspark.sql.functions import col, to_timestamp, lower, to_number, regexp_replace, trim

In [None]:
# Absolute paths to the SQL Server JDBC driver jar and to its native authentication
# library, both resolved relative to the current working directory.
jar_path = os.path.abspath("../conector_dll/mssql-jdbc-13.2.1.jre11.jar")
dll_path = os.path.abspath("../conector_dll/mssql-jdbc_auth-13.2.1.x64.dll")

# Put the DLL's folder at the front of PATH.
# NOTE(review): presumably this lets the JVM started by Spark locate the native library
# for the integratedSecurity=true JDBC connection used further down — confirm.
# The membership check keeps the cell idempotent (re-running it does not prepend the
# same folder again), and .get() tolerates an environment where PATH is not set.
dll_dir = os.path.dirname(dll_path)
current_path = os.environ.get("PATH", "")
if dll_dir not in current_path.split(os.pathsep):
    os.environ["PATH"] = (dll_dir + os.pathsep + current_path) if current_path else dll_dir

In [None]:
# Create (or reuse) the Spark session for this notebook. The JDBC driver jar is
# registered through spark.jars and the SQL session time zone is pinned explicitly.
spark = (
    SparkSession.builder
    .appName("Sprint 02 - Bronze para Silver")
    .config("spark.jars", jar_path)
    .config("spark.sql.session.timeZone", "America/Sao_Paulo")
    .getOrCreate()
)

### olist_orders_dataset.csv

In [None]:
# Path to the raw orders CSV, relative to the notebook's working directory.
# NOTE(review): `csv_path` is reassigned by later cells for each dataset, so the read
# cell that follows must be run right after this one to load the intended file.
csv_path = "./dataset_files/olist_orders_dataset.csv"

In [None]:
# Load the raw orders file, taking column names from the first row. No schema is
# supplied or inferred here; the printed schema shows what Spark assigned.
df_orders = spark.read.option("header", True).csv(csv_path)
df_orders.printSchema()

In [None]:
# Order columns that are converted to timestamps for the silver layer.
cols_timestamp = [
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]

# Convert all timestamp columns in one projection, then replace a missing
# order status with the "N/A" placeholder.
df_orders_silver = (
    df_orders
    .withColumns({name: to_timestamp(col(name)) for name in cols_timestamp})
    .fillna("N/A", subset=["order_status"])
)

df_orders_silver.printSchema()

### olist_products_dataset.csv

In [None]:
# Path to the raw products CSV, relative to the notebook's working directory.
# NOTE(review): `csv_path` is reused for every dataset in this notebook, so the read
# cell that follows must be run right after this one to load the intended file.
csv_path = "./dataset_files/olist_products_dataset.csv"

In [None]:
# Load the raw products file, taking column names from the first row. No schema is
# supplied or inferred here; the printed schema shows what Spark assigned.
df_products = spark.read.option("header", True).csv(csv_path)
df_products.printSchema()

In [None]:
# Product columns that are cast to integers for the silver layer.
# (The "lenght" spelling is the column name as it appears in the dataset.)
cols_number = [
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
]

df_products_silver = (
    df_products
    # Normalise the category label: lower-case, underscores -> spaces, trimmed.
    .withColumn(
        "product_category_name",
        trim(regexp_replace(lower(col("product_category_name")), "_", " ")),
    )
    # Cast every measurement/count column to integer in a single projection.
    .withColumns({name: col(name).cast("int") for name in cols_number})
    # Null numeric values become 0; a missing category becomes "N/A".
    .fillna(0)
    .fillna("N/A", subset=["product_category_name"])
)

df_products_silver.printSchema()

### olist_order_payments_dataset.csv

In [None]:
# Path to the raw order-payments CSV, relative to the notebook's working directory.
# NOTE(review): `csv_path` is reused for every dataset in this notebook, so the read
# cell that follows must be run right after this one to load the intended file.
csv_path = "./dataset_files/olist_order_payments_dataset.csv"

In [None]:
# Load the raw payments file, taking column names from the first row. No schema is
# supplied or inferred here; the printed schema shows what Spark assigned.
df_payments = spark.read.option("header", True).csv(csv_path)
df_payments.printSchema()

In [None]:
# Payment columns that are cast to integers for the silver layer.
cols_number = [
    "payment_sequential",
    "payment_installments",
]

df_payments_silver = (
    df_payments
    # Normalise the payment type: lower-case, underscores -> spaces, trimmed.
    .withColumn(
        "payment_type",
        trim(regexp_replace(lower(col("payment_type")), "_", " ")),
    )
    # Integer counters.
    .withColumns({name: col(name).cast("int") for name in cols_number})
    # Monetary amount with two decimal places.
    .withColumn("payment_value", col("payment_value").cast("decimal(10,2)"))
    # Null numeric values become 0; a missing payment type becomes "N/A".
    .fillna(0)
    .fillna("N/A", subset=["payment_type"])
)

df_payments_silver.printSchema()

### olist_order_items_dataset.csv

In [None]:
# Path to the raw order-items CSV, relative to the notebook's working directory.
# NOTE(review): `csv_path` is reused for every dataset in this notebook, so the read
# cell that follows must be run right after this one to load the intended file.
csv_path = "./dataset_files/olist_order_items_dataset.csv"

In [None]:
# Load the raw order-items file, taking column names from the first row. No schema is
# supplied or inferred here; the printed schema shows what Spark assigned.
df_items = spark.read.option("header", True).csv(csv_path)
df_items.printSchema()

In [None]:
# Monetary columns that are cast to decimal(10,2) for the silver layer.
cols_decimal = [
    "price",
    "freight_value",
]

df_items_silver = (
    df_items
    # Monetary amounts with two decimal places.
    .withColumns({name: col(name).cast("decimal(10,2)") for name in cols_decimal})
    # Shipping deadline as a timestamp, item sequence number as an integer.
    .withColumn("shipping_limit_date", col("shipping_limit_date").cast("timestamp"))
    .withColumn("order_item_id", col("order_item_id").cast("int"))
    # Null numeric values become 0.
    .fillna(0)
)

df_items_silver.printSchema()

### olist_customers_dataset.csv

In [None]:
# Path to the raw customers CSV, relative to the notebook's working directory.
# NOTE(review): `csv_path` is reused for every dataset in this notebook, so the read
# cell that follows must be run right after this one to load the intended file.
csv_path = "./dataset_files/olist_customers_dataset.csv"

In [None]:
# Load the raw customers file, taking column names from the first row. No schema is
# supplied or inferred here; the printed schema shows what Spark assigned.
df_customers = spark.read.option("header", True).csv(csv_path)
df_customers.printSchema()

In [None]:
# Customer location columns that are lower-cased and defaulted when missing.
cols_location = ["customer_city", "customer_state"]

df_customers_silver = (
    df_customers
    # Lower-case both location columns in a single projection.
    .withColumns({name: lower(col(name)) for name in cols_location})
    # A missing city or state becomes the "N/A" placeholder.
    .fillna("N/A", subset=cols_location)
)

df_customers_silver.printSchema()

### salvar dataframes

In [None]:
# JDBC connection string for the SQL Server instance on localhost, written one
# setting per line; adjacent literals concatenate into a single string.
jdbc_url = (
    "jdbc:sqlserver://localhost:1433;"
    "databaseName=olist_db;"
    "integratedSecurity=true;"       # no user/password in the URL
    "encrypt=true;"
    "trustServerCertificate=true;"   # server certificate is accepted without validation
)

# Extra connection properties handed to Spark's JDBC writer: the driver class to load.
jdbc_properties = dict(driver="com.microsoft.sqlserver.jdbc.SQLServerDriver")

### salvar na camada bronze

In [None]:
# Bronze layer: the DataFrames exactly as read from the CSV files, keyed by the
# target table. Dict insertion order keeps the writes in the same sequence as before.
bronze_tables = {
    "bronze.orders": df_orders,
    "bronze.products": df_products,
    "bronze.customers": df_customers,
    "bronze.order_items": df_items,
    "bronze.order_payments": df_payments,
}

# Write each raw DataFrame over JDBC, replacing any existing table contents.
for table_name, frame in bronze_tables.items():
    frame.write.jdbc(
        url=jdbc_url,
        table=table_name,
        mode="overwrite",
        properties=jdbc_properties,
    )

### salvar na camada silver

In [None]:
# Silver layer: the cleaned/typed DataFrames, keyed by the target table.
# Dict insertion order keeps the writes in the same sequence as before.
silver_tables = {
    "silver.orders": df_orders_silver,
    "silver.products": df_products_silver,
    "silver.customers": df_customers_silver,
    "silver.order_items": df_items_silver,
    "silver.order_payments": df_payments_silver,
}

# Write each transformed DataFrame over JDBC, replacing any existing table contents.
for table_name, frame in silver_tables.items():
    frame.write.jdbc(
        url=jdbc_url,
        table=table_name,
        mode="overwrite",
        properties=jdbc_properties,
    )