In [None]:
%%pyspark

# Join Orders com Customers
orders_df = spark.read.load('abfss://bronze@datalakeengdados.dfs.core.windows.net/orders', format='delta')
customers_df = spark.read.load('abfss://bronze@datalakeengdados.dfs.core.windows.net/customers', format='delta')

orders_customers_df = orders_df.join(customers_df, orders_df.customerId == customers_df.customerId, "left").drop(customers_df.customerId)

In [None]:
%%pyspark

# Join OrdersItems com Orders e Products
orders_items_df = spark.read.load('abfss://bronze@datalakeengdados.dfs.core.windows.net/orderItems', format='delta')
products_df = spark.read.load('abfss://bronze@datalakeengdados.dfs.core.windows.net/products', format='delta')

orders_items_orders_df = orders_items_df.join(orders_df, orders_items_df.orderId == orders_df.orderId, "left").drop(orders_df.orderId)
orders_items_orders_products_df = orders_items_orders_df.join(products_df, orders_items_orders_df.productId == products_df.productId, "left").drop(products_df.productId)

In [None]:
%%pyspark

# Join Employees com Departments
employees_df = spark.read.load('abfss://bronze@datalakeengdados.dfs.core.windows.net/employees', format='delta')
departments_df = spark.read.load('abfss://bronze@datalakeengdados.dfs.core.windows.net/departments', format='delta')

employees_departments_df = employees_df.join(departments_df, employees_df.departmentId == departments_df.departmentId, "left").drop(departments_df.departmentId)

In [None]:
%%pyspark

# Combinar todas as tabelas em uma única tabela
from pyspark.sql.functions import lit

# Adicionar um identificador de tipo para cada DataFrame
orders_customers_df = orders_customers_df.withColumn("type", lit("order_customer"))
orders_items_orders_products_df = orders_items_orders_products_df.withColumn("type", lit("order_item"))
employees_departments_df = employees_departments_df.withColumn("type", lit("employee"))

# Selecionar colunas comuns para união
orders_customers_common_df = orders_customers_df.select("type", "orderId", "customerId", "createdAt", "total", "name", "address", "email")
orders_items_orders_products_common_df = orders_items_orders_products_df.select("type", "itemId", "orderId", "productId", "productName", "quantity", "unitPrice", "totalPrice")
employees_departments_common_df = employees_departments_df.select("type", "employeeId", "name", "position", "createdAt", "salary", "departmentId", "location")

# Realizar a união
final_df = orders_customers_common_df.unionByName(orders_items_orders_products_common_df, allowMissingColumns=True)
final_df = final_df.unionByName(employees_departments_common_df, allowMissingColumns=True)

# Salvar o resultado em um arquivo Parquet
output_path = 'abfss://gold@datalakeengdados.dfs.core.windows.net/OneBigTable.parquet'
final_df.write.mode('overwrite').delta(output_path)