In [0]:
base_path = "/Volumes/workspace/dbfs_mvp_sprint1/arquivos"

In [0]:
from pyspark.sql.functions import col, regexp_extract
from functools import reduce
from pyspark.sql import DataFrame

# Lista de arquivos
arquivos = dbutils.fs.ls(base_path)
dfs = []

# Ler cada arquivo e adicionar coluna com nome do arquivo
for arquivo in arquivos:
    if arquivo.path.endswith(".csv"):
        temp_df = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("delimiter", ";") \
            .load(arquivo.path) \
            .withColumn("arquivo", regexp_extract(col("_metadata.file_path"), ".*/(.*)", 1))
        dfs.append(temp_df)

# Unir todos os DataFrames mantendo todas as colunas pelo nome
df = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)

display(df)
print("Colunas consolidadas:", df.columns)

ANO,MÊS,GRANDE REGIÃO,UNIDADE DA FEDERAÇÃO,PRODUTO,LOCALIZAÇÃO,CONSUMO,arquivo,DISPONÍVEL,OPERAÇÃO COMERCIAL,IMPORTADO / EXPORTADO,DISPÊNDIO / RECEITA,PRODUÇÃO,QUEIMADO,REINJETADO,VENDAS
2000,JAN,REGIÃO NORTE,AMAZONAS,GÁS NATURAL,TERRA,7974.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,DEZ,REGIÃO NORDESTE,CEARÁ,GÁS NATURAL,MAR,1322.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,JAN,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,3089647.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,FEV,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,3471029.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,MAR,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,418873.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,ABR,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,3141402.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,MAI,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,3493223.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,JUN,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,48458491.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,JUL,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,51753411.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,
2000,AGO,REGIÃO NORDESTE,RIO GRANDE DO NORTE,GÁS NATURAL,MAR,4344206.0,consumo-proprio-gn1000m3-2000-2025.csv,,,,,,,,


Colunas consolidadas: ['ANO', 'MÊS', 'GRANDE REGIÃO', 'UNIDADE DA FEDERAÇÃO', 'PRODUTO', 'LOCALIZAÇÃO', 'CONSUMO', 'arquivo', 'DISPONÍVEL', 'OPERAÇÃO COMERCIAL', 'IMPORTADO / EXPORTADO', 'DISPÊNDIO / RECEITA', 'PRODUÇÃO', 'QUEIMADO', 'REINJETADO', 'VENDAS']


In [0]:
# Conferir se todos os arquivos foram lidos
# Contagem total de linhas
print("Total de linhas:", df.count())

# Lista de arquivos carregados
df.select("arquivo").distinct().show(truncate=False)

Total de linhas: 138980
+----------------------------------------------+
|arquivo                                       |
+----------------------------------------------+
|consumo-proprio-gn1000m3-2000-2025.csv        |
|gn-disponivel-1000m3-2000-2025.csv            |
|importacoes-exportacoes-petroleo-2000-2025.csv|
|producao-gas-natural-1000m3-1997-2025.csv     |
|producao-lgn-m3-1997-2025.csv                 |
|producao-petroleo-m3-1997-2025.csv            |
|queima-e-perda-gn-1000m3-2000-2025.csv        |
|reinjecao-gn-1000m3-2000-2025.csv             |
|vendas-combustiveis-m3-1990-2025.csv          |
+----------------------------------------------+



In [0]:
arquivos = dbutils.fs.ls(base_path)

# Verificar os arquivos carregados, suas colunas e tipos
for arquivo in arquivos:
    if arquivo.path.endswith(".csv"):
        temp_df = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("delimiter", ";") \
            .load(arquivo.path)
        
        print(f"Arquivo: {arquivo.name}")
        print("Colunas e tipos:")
        for col_name, col_type in temp_df.dtypes:
            print(f"  {col_name}: {col_type}")
        print("-" * 50)

Arquivo: consumo-proprio-gn1000m3-2000-2025.csv
Colunas e tipos:
  ANO: int
  MÊS: string
  GRANDE REGIÃO: string
  UNIDADE DA FEDERAÇÃO: string
  PRODUTO: string
  LOCALIZAÇÃO: string
  CONSUMO: string
--------------------------------------------------
Arquivo: gn-disponivel-1000m3-2000-2025.csv
Colunas e tipos:
  ANO: int
  MÊS: string
  GRANDE REGIÃO: string
  UNIDADE DA FEDERAÇÃO: string
  PRODUTO: string
  LOCALIZAÇÃO: string
  DISPONÍVEL: string
--------------------------------------------------
Arquivo: importacoes-exportacoes-petroleo-2000-2025.csv
Colunas e tipos:
  ANO: int
  MÊS: string
  PRODUTO: string
  OPERAÇÃO COMERCIAL: string
  IMPORTADO / EXPORTADO: string
  DISPÊNDIO / RECEITA: bigint
--------------------------------------------------
Arquivo: producao-gas-natural-1000m3-1997-2025.csv
Colunas e tipos:
  ANO: int
  MÊS: string
  GRANDE REGIÃO: string
  UNIDADE DA FEDERAÇÃO: string
  PRODUTO: string
  LOCALIZAÇÃO: string
  PRODUÇÃO: string
----------------------------

In [0]:
from pyspark.sql.functions import trim
#Fazendo limpeza de espaços com trim nas colunas que forem do tipo string
for col_name, col_type in df.dtypes:
    if col_type == "string":
        df = df.withColumn(col_name, trim(col(col_name)))

In [0]:
from pyspark.sql.functions import regexp_replace, trim

numeric_cols = [
    "CONSUMO", "DISPONÍVEL", "IMPORTADO / EXPORTADO",
    "DISPÊNDIO / RECEITA", "QUEIMADO", "PRODUÇÃO",
    "REINJETADO", "VENDAS"
]

for col_name in numeric_cols:
    if col_name in df.columns:
        # Substituir vírgula por ponto
        df = df.withColumn(col_name, regexp_replace(col(col_name), ",", "."))
        # Remover espaços extras
        df = df.withColumn(col_name, trim(col(col_name)))
        # Remover qualquer caractere que não seja número ou ponto
        df = df.withColumn(col_name, regexp_replace(col(col_name), "[^0-9.]", ""))
        # Converter para double
        df = df.withColumn(col_name, col(col_name).cast("double"))
        # Preencher valores nulos com 0
        df = df.fillna({col_name: 0})

In [0]:
#Tabela de dimensão com as informações de tempo, no caso MÊS e ANO
from pyspark.sql.functions import concat_ws

dim_tempo = df.select("ANO", "MÊS").distinct() \
    .withColumn("mes_ano", concat_ws("/", col("MÊS"), col("ANO")))

In [0]:
#Tabela de dimensão com as UF
dim_UF = df.select("GRANDE REGIÃO", "UNIDADE DA FEDERAÇÃO").distinct()

In [0]:
#Tabela de dimensão com localizacao
dim_local = df.select("LOCALIZAÇÃO").distinct()

In [0]:
#Tabela de dimensão com os tipos de produtos produzidos ou comercializados
dim_produto = df.select("PRODUTO").distinct()

In [0]:
fato_vendas = df.filter(col("arquivo") == "vendas-combustiveis-m3-1990-2025.csv") \
    .select("ANO", "MÊS", "GRANDE REGIÃO", "UNIDADE DA FEDERAÇÃO", "PRODUTO", "VENDAS")

In [0]:
cols_producao = [c for c in ["ANO", "MÊS", "GRANDE REGIÃO", "UNIDADE DA FEDERAÇÃO", "LOCALIZAÇÃO", "PRODUTO",
                              "PRODUÇÃO", "QUEIMADO", "REINJETADO", "CONSUMO", "DISPONÍVEL"] if c in df.columns]

fato_producao = df.filter(col("arquivo").isin(
    "producao-petroleo-m3-1997-2025.csv",
    "producao-gas-natural-1000m3-1997-2025.csv",
    "producao-lgn-m3-1997-2025.csv",
    "queima-e-perda-gn-1000m3-2000-2025.csv",
    "reinjecao-gn-1000m3-2000-2025.csv",
    "consumo-proprio-gn1000m3-2000-2025.csv",
    "gn-disponivel-1000m3-2000-2025.csv"
)).select(*cols_producao)

In [0]:
fato_import_export = df.filter(col("arquivo") == "importacoes-exportacoes-petroleo-2000-2025.csv") \
    .select("ANO", "MÊS", "PRODUTO", "OPERAÇÃO COMERCIAL", "IMPORTADO / EXPORTADO", "DISPÊNDIO / RECEITA")

In [0]:
# Função para normalizar nomes de colunas
def normalizar_colunas(df):
    for col_name in df.columns:
        novo_nome = col_name.upper() \
            .replace(" ", "_") \
            .replace("/", "_") \
            .replace("(", "") \
            .replace(")", "") \
            .replace(";", "") \
            .replace("-", "_") \
            .replace("Á", "A").replace("Ã", "A").replace("Ê", "E").replace("É", "E").replace("Í", "I").replace("Ó", "O").replace("Ú", "U")
        df = df.withColumnRenamed(col_name, novo_nome)
    return df

# Aplicar em todas as tabelas
dim_tempo = normalizar_colunas(dim_tempo)
dim_UF = normalizar_colunas(dim_UF)
dim_local = normalizar_colunas(dim_local)
dim_produto = normalizar_colunas(dim_produto)
fato_vendas = normalizar_colunas(fato_vendas)
fato_producao = normalizar_colunas(fato_producao)
fato_import_export = normalizar_colunas(fato_import_export)


In [0]:
# gravação das tabelas
# Dimensões
dim_tempo.write.format("delta").mode("overwrite").saveAsTable("workspace.dbfs_mvp_sprint1.dim_tempo")
dim_UF.write.format("delta").mode("overwrite").saveAsTable("workspace.dbfs_mvp_sprint1.dim_UF")
dim_local.write.format("delta").mode("overwrite").saveAsTable("workspace.dbfs_mvp_sprint1.dim_local")
dim_produto.write.format("delta").mode("overwrite").saveAsTable("workspace.dbfs_mvp_sprint1.dim_produto")


In [0]:
# gravação das tabelas
# Fatos
fato_vendas.write.format("delta").mode("overwrite").saveAsTable("workspace.dbfs_mvp_sprint1.fato_vendas")
fato_producao.write.format("delta").mode("overwrite").saveAsTable("workspace.dbfs_mvp_sprint1.fato_producao")
fato_import_export.write.format("delta").mode("overwrite").saveAsTable("workspace.dbfs_mvp_sprint1.fato_import_export")