<h1>Configuração</h1>
<hr>

In [157]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import upper
from pyspark.sql.functions import col
from pyspark.sql.functions import date_format
from pyspark.sql.functions import year, month, dayofmonth, quarter
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col
from pyspark.sql.functions import when, lit
import psycopg2

# Spark session shared by the whole ETL run.
# NOTE(review): postgresql-8.2-506.jdbc3.jar is a very old PostgreSQL JDBC driver;
# servers configured for scram-sha-256 authentication need a newer pgJDBC
# (42.2+) — confirm which driver this environment actually needs.
spark = (
    SparkSession.builder
    .appName("ETL")
    .config("spark.jars", "postgresql-8.2-506.jdbc3.jar")
    .getOrCreate()
)

# Legacy SQLContext handle used by the extraction helper. SQLContext takes the
# SparkContext as its first argument and the session as its second; passing the
# session as the SparkContext only worked by accident and, in PySpark 3.x,
# emits the "Deprecated in 3.0.0" FutureWarning.
sqlContext = SQLContext(spark.sparkContext, spark)




<h1>Extração</h1>
<hr>

In [158]:
# Extraction: one full-table SELECT per operational source table.
(query_products, query_categories, query_suppliers, query_sales_items,
 query_sales, query_sellers, query_customers) = (
    "select * from " + source_table
    for source_table in ("products", "categories", "suppliers", "sales_items",
                         "sales", "sellers", "customers")
)

def read_from_postgresql(query, table_name, url=None, user=None, password=None):
    """Load the result of a SQL query on the source PostgreSQL database.

    The query is sent through Spark's JDBC source as the derived table
    ``(query) as table_name``.

    Args:
        query: SELECT statement to run on the source database.
        table_name: alias given to the derived table; must be a valid SQL
            identifier because it is spliced into the SQL text.
        url: JDBC URL. Defaults to $FATORV_SRC_URL, else
            'jdbc:postgresql://localhost/fatorv'.
        user: database user. Defaults to $FATORV_SRC_USER, else 'fatorv'.
        password: database password. Defaults to $FATORV_SRC_PASSWORD, else the
            previous hard-coded value (kept only for backward compatibility).

    Returns:
        A Spark DataFrame backed by the query.
    """
    import os

    # Let credentials come from the environment instead of the notebook source.
    if url is None:
        url = os.environ.get('FATORV_SRC_URL', 'jdbc:postgresql://localhost/fatorv')
    if user is None:
        user = os.environ.get('FATORV_SRC_USER', 'fatorv')
    if password is None:
        password = os.environ.get('FATORV_SRC_PASSWORD', '123456')

    return sqlContext.read.format('jdbc').options(
        url=url,
        dbtable='({}) as {}'.format(query, table_name),
        user=user,
        password=password,
        driver='org.postgresql.Driver').load()

# Bind one JDBC-backed DataFrame per source table; each derived-table alias is
# simply the table's own name.
df_products, df_suppliers, df_categories, df_sales_items, df_sales, df_sellers, df_customers = [
    read_from_postgresql(sql_text, alias)
    for sql_text, alias in (
        (query_products, "products"),
        (query_suppliers, "suppliers"),
        (query_categories, "categories"),
        (query_sales_items, "sales_items"),
        (query_sales, "sales"),
        (query_sellers, "sellers"),
        (query_customers, "customers"),
    )
]

# Reference data read from local JSON files. The files use Portuguese field
# names; they are translated to the English names used by the dimensions.
regions_schema = StructType([
    StructField("id_regiao", IntegerType(), False),
    StructField("sigla_regiao", StringType(), False),
    StructField("nome_regiao", StringType(), False)
])

states_schema = StructType([
    StructField("id_uf", IntegerType(), False),
    StructField("sigla_uf", StringType(), False),
    StructField("state_code", StringType(), False),
    StructField("nome_uf", StringType(), False),
    StructField("id_regiao", IntegerType(), False)
])

# regions: region_id, region_acronym, region_name
df_regions = (
    spark.read.option("multiline", "true").schema(regions_schema).json("regioes.json")
    .select(
        col("id_regiao").alias("region_id"),
        col("sigla_regiao").alias("region_acronym"),
        col("nome_regiao").alias("region_name"),
    )
)

# states: state_id, state_acronym, state_code, state_name, region_id
df_states = (
    spark.read.option("multiline", "true").schema(states_schema).json("estados.json")
    .select(
        col("id_uf").alias("state_id"),
        col("sigla_uf").alias("state_acronym"),
        col("state_code"),
        col("nome_uf").alias("state_name"),
        col("id_regiao").alias("region_id"),
    )
)


<h1>Transformação</h1>
<hr>

In [159]:
# Transformation: clean the source frames, build the dimensions, then the fact table.

# dim_states: each state joined with its region's acronym and name.
df_dim_states = df_states.join(df_regions, on="region_id")

# dim_suppliers: upper-case name and e-mail, then attach the id of the supplier's state.
df_dim_suppliers = (
    df_suppliers
    .withColumn("email", upper(col("email")))
    .withColumn("supplier_name", upper(col("supplier_name")))
)
df_dim_suppliers = (
    df_dim_suppliers
    .join(df_dim_states.select("state_id", "state_acronym"),
          on=df_dim_suppliers["state"] == df_dim_states["state_acronym"])
    .withColumnRenamed("state_id", "supplier_state_id")
)

# dim_date: one row per distinct (non-null) sale date.
# The key is the date itself as the integer yyyyMMdd. It is deterministic, so
# the same date gets the same key every time the plan is recomputed. That
# matters because df_dim_date is evaluated once for its own load and again
# through the sales join. The key also always fits numeric(8,0).
# monotonically_increasing_id() has neither property: it stores the partition
# index in the upper bits, and the row order after distinct() is not fixed.
df_dim_date = (
    df_sales.select("date")
    .where(col("date").isNotNull())
    .distinct()
    .withColumn("date_id", date_format("date", "yyyyMMdd").cast("numeric(8,0)"))
    .withColumn("year", year("date").cast("numeric(4,0)"))
    .withColumn("month", month("date").cast("numeric(2,0)"))
    .withColumn("day", dayofmonth("date").cast("numeric(2,0)"))
    .withColumn("quarter", quarter("date").cast("numeric(1,0)"))
)

# English month names, built as a CASE WHEN chain on the numeric month.
_MONTH_NAMES = ("January", "February", "March", "April", "May", "June", "July",
                "August", "September", "October", "November", "December")
_month_name_col = when(col("month") == 1, lit(_MONTH_NAMES[0]))
for _month_number, _month_label in enumerate(_MONTH_NAMES[1:], start=2):
    _month_name_col = _month_name_col.when(col("month") == _month_number, lit(_month_label))

# The date itself is stored as a yyyyMMdd string, the same format the sales
# join uses to look up date_id.
df_dim_date = (
    df_dim_date
    .withColumn("month_name", _month_name_col.otherwise(None))
    .withColumn("date", date_format("date", "yyyyMMdd"))
)

# Attach date_id to every sale by matching on the yyyyMMdd date string used in
# dim_date, then keep only the columns the fact table needs.
# A new name is used instead of reassigning df_sales. Otherwise re-running this
# cell would apply date_format to an already formatted string rather than to
# the original date.
df_sales_dated = (
    df_sales
    .withColumn("date", date_format("date", "yyyyMMdd"))
    .join(df_dim_date.select("date_id", "date"), "date")
    .drop("date")  # date_id replaces the raw date from here on
)
df_dim_sales = df_sales_dated.select("sales_id", "customer_id", "seller_id", "date_id", "total_price")

# Finish dim_suppliers: the state join has already produced supplier_state_id,
# so the state acronym columns are redundant (same cleanup as sellers/customers).
# The raw df_suppliers frame is deliberately left untouched. Overwriting it and
# dropping "state" made this cell fail when re-run, because the supplier join
# above needs df_suppliers["state"].
df_dim_suppliers = df_dim_suppliers.drop("state", "state_acronym")

# dim_sellers: upper-case seller name and e-mail, then replace the state
# acronym with the state id.
df_dim_sellers = (
    df_sellers
    .withColumn("email", upper(col("email")))
    .withColumn("seller_name", upper(col("seller_name")))
)
df_dim_sellers = (
    df_dim_sellers
    .join(df_dim_states.select("state_id", "state_acronym"),
          on=df_dim_sellers["state"] == df_dim_states["state_acronym"],
          how="inner")
    .withColumnRenamed("state_id", "seller_state_id")
    .drop("state", "state_acronym")
)

# dim_customers: upper-case customer name and e-mail, then replace the state
# acronym with the state id.
df_dim_customers = (
    df_customers
    .withColumn("email", upper(col("email")))
    .withColumn("customer_name", upper(col("customer_name")))
)

# The acronym column is taken from df_dim_states, the frame actually being
# joined, as in the seller and supplier joins (not from df_states).
df_dim_customers = (
    df_dim_customers
    .join(df_dim_states.select("state_id", "state_acronym"),
          df_dim_customers["state"] == df_dim_states["state_acronym"], "inner")
    .withColumnRenamed("state_id", "customer_state_id")
    .drop("state", "state_acronym")
)

# dim_products: upper-cased product name plus the columns used downstream.
# The select must run on the upper-cased frame; selecting from df_products
# directly would silently discard the upper() transformation.
df_dim_products = (
    df_products
    .withColumn("product_name", upper(col("product_name")))
    .select("product_id", "product_name", "category_id", "supplier_id", "price")
)

# dim_categories: upper-cased category name.
df_dim_categories = df_categories.withColumn("category_name", upper(col("category_name")))

# Fact table at sale-item grain: one row per item of a sale. Each row carries
# every dimension key plus quantity, unit price (sell_price) and the line
# subtotal.
fato = (
    df_dim_sales
    .join(df_sales_items.select("sales_id", "product_id", "quantity",
                                col("price").alias("sell_price")),
          "sales_id")
    .join(df_dim_products.select("product_id", "category_id", "supplier_id"), "product_id")
    .join(df_dim_customers.select("customer_id", "customer_state_id"), "customer_id")
    .join(df_dim_sellers.select("seller_id", "seller_state_id"), "seller_id")
    .join(df_dim_suppliers.select("supplier_id", "supplier_state_id"), "supplier_id")
    .drop("total_price")  # recomputed below from the item subtotals
    .withColumn("sub_total", col("sell_price") * col("quantity"))
)

# total_price of the whole sale (sum of its subtotals), repeated on each item row.
fato = fato.join(
    fato.select("sales_id", "sub_total")
        .groupBy("sales_id")
        .sum("sub_total")
        .toDF("sales_id", "total_price"),
    "sales_id",
)




<h1>Carga</h1>
<hr>

In [160]:
import os

# Warehouse connection settings. Environment variables override the previous
# hard-coded defaults so credentials do not have to live in the notebook.
WAREHOUSE_CONN = {
    "dbname": os.environ.get("FATORV_DW_DB", "fatorvgestao"),
    "user": os.environ.get("FATORV_DW_USER", "fatorv"),
    "password": os.environ.get("FATORV_DW_PASSWORD", "123456"),
    "host": os.environ.get("FATORV_DW_HOST", "localhost"),
}

# Tables emptied before the reload, in this order: the fact table first, and
# dim_states only after the dimensions that carry a *_state_id.
# NOTE(review): the order suggests foreign keys between these tables — confirm
# against the warehouse DDL.
TABLES_TO_CLEAR = [
    "fato_sales_items",
    "dim_customers",
    "dim_sellers",
    "dim_suppliers",
    "dim_states",
    "dim_products",
    "dim_categories",
    "dim_date",
]

# (target table, source DataFrame, columns to insert), in insert order:
# dim_states goes in before the dimensions that reference a state id, and the
# fact table is loaded last.
TABLE_LOADS = [
    ("dim_categories", df_dim_categories, ["category_id", "category_name"]),
    ("dim_states", df_dim_states, ["state_id", "state_acronym", "state_code", "state_name",
                                   "region_id", "region_acronym", "region_name"]),
    ("dim_customers", df_dim_customers, ["customer_id", "customer_name", "email",
                                         "customer_state_id"]),
    ("dim_date", df_dim_date, ["date_id", "date", "year", "month", "quarter", "day",
                               "month_name"]),
    ("dim_products", df_dim_products, ["product_id", "product_name", "price"]),
    ("dim_sellers", df_dim_sellers, ["seller_id", "seller_name", "email", "tx_commission",
                                     "seller_state_id"]),
    ("dim_suppliers", df_dim_suppliers, ["supplier_id", "supplier_name", "email",
                                         "supplier_state_id"]),
    ("fato_sales_items", fato, ["sales_id", "product_id", "date_id", "customer_id", "seller_id",
                                "total_price", "supplier_id", "customer_state_id",
                                "seller_state_id", "supplier_state_id", "category_id",
                                "quantity", "sell_price", "sub_total"]),
]


def _insert_rows(cursor, df, table, columns):
    """Collect ``df`` to the driver and bulk-insert its ``columns`` into ``table``.

    ``table`` and ``columns`` are spliced into the SQL text, so they must be
    trusted identifiers (here they are literals from TABLE_LOADS). Row values
    are passed as query parameters.
    """
    rows = [tuple(row[name] for name in columns) for row in df.collect()]
    query = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(columns), ", ".join(["%s"] * len(columns)))
    cursor.executemany(query, rows)


try:
    connection = psycopg2.connect(**WAREHOUSE_CONN)
    print("Conexão bem-sucedida!")
except psycopg2.Error as e:
    print("Erro ao conectar:", e)
    raise  # nothing below can run without a connection

try:
    # One transaction for the whole reload: `with connection` commits when the
    # block completes and rolls back on any exception. A failed delete or
    # insert therefore leaves the previous warehouse contents in place instead
    # of half-emptied tables. The context does not close the connection; that
    # happens in `finally`.
    with connection, connection.cursor() as cursor:
        for table in TABLES_TO_CLEAR:
            cursor.execute("delete from {}".format(table))
        for table, df, columns in TABLE_LOADS:
            _insert_rows(cursor, df, table, columns)
except psycopg2.Error as e:
    print("erro - ", e)
    raise  # surface the failure instead of letting the notebook look successful
finally:
    connection.close()
    spark.stop()

Conexão bem-sucedida!
