# Análise sobre os Preços dos Combustíveis no Brasil

Aluno: Rafael Pereira Cândido

Matrícula: 2221134

Curso: Ciência da Computação

Esse trabalho tem o propósito de analizar os preços dos combustíveis no Brasil do período de 2003 até 2023.

Chat sobre as queries e modelagem...: https://chatgpt.com/share/973eee34-e214-485c-8001-240b457834bf

In [96]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f
from uuid import uuid4
import matplotlib.pyplot as plt
import seaborn as sns

In [97]:
url = "jdbc:postgresql://db:5432/fuel_analysis"
properties = {
    "user": "root",
    "password": "root",
    "driver": "org.postgresql.Driver"
}

In [98]:
spark = SparkSession.builder \
    .appName("spark") \
    .master("local[*]") \
    .config("spark.jars", "/usr/local/spark/jars/postgresql-42.7.3.jar") \
    .getOrCreate()


In [99]:
def extract_data(path):
  df = spark.read.csv(path, sep=';', inferSchema=True, header=True)
  
  return df

def load_dim_table(table_name):
    return spark.read.jdbc(url=url, table=table_name, properties=properties)
  
def load_data_to_db(df, table_name):
    df.write.jdbc(url=url, table=table_name, mode="append", properties=properties)

In [100]:
base_path = '/home/jovyan/data/'
path = base_path+'ca-2020-02.csv'
df = extract_data(path)

In [None]:
print(df.count())

In [101]:
def transform_data(df):
  df = df.withColumn(
      "Data da Coleta",
      f.to_date(f.col("Data da Coleta").cast(StringType()), 'dd/MM/yyyy')
      )\
      .withColumn('Valor de Venda', f.regexp_replace('Valor de Venda', ',', '.'))\
      .withColumn('Valor de Venda', f.col('Valor de Venda').cast(DoubleType()))\
      .withColumn('Valor de Compra', f.regexp_replace('Valor de Compra', ',', '.'))\
      .withColumn('Valor de Compra', f.col('Valor de Compra').cast(DoubleType()))\
      .dropna(subset=['Valor de Venda', 'Valor de Compra'])\
      .fillna({'Complemento': 'S/C'})
      
  df = df.withColumnRenamed('Regiao - Sigla', 'Regiao_Sigla') \
       .withColumnRenamed('Estado - Sigla', 'Estado_Sigla') \
       .withColumnRenamed('CNPJ da Revenda', 'CNPJ_Revenda') \
       .withColumnRenamed('Data da Coleta', 'Data_Coleta') \
       .withColumnRenamed('Nome da Rua', 'Nome_Rua') \
       .withColumnRenamed('Numero Rua', 'Numero_Rua')\
       .withColumnRenamed('Valor de Venda', 'Valor_Venda') \
       .withColumnRenamed('Valor de Compra', 'Valor_Compra')
      
  return df

In [102]:
def create_dim_tempo(df):
    dim_tempo = df.select(
        f.col("Data_Coleta").alias("data_coleta"),
        f.dayofmonth("Data_Coleta").alias("dia"),
        f.month("Data_Coleta").alias("mes"),
        f.year("Data_Coleta").alias("ano"),
        f.dayofweek("Data_Coleta").alias("dia_semana")
    ).distinct()
    return dim_tempo

def create_dim_localizacao(df):
    dim_localizacao = df.select(
        f.col("Regiao_Sigla").alias("regiao_Sigla"), 
        f.col("Estado_Sigla").alias("estado_Sigla"), 
        f.col("Municipio").alias("municipio"), 
        f.col("Bairro").alias("bairro"), 
        f.col("Cep").alias("cep")
    ).distinct()
    return dim_localizacao

def create_dim_produto(df):
    dim_produto = df.select(f.col("Produto").alias("produto")).distinct()
    return dim_produto

def create_dim_revenda(df):
    dim_revenda = df.select(
        f.col("CNPJ_Revenda").alias("cnpj_revenda"), 
        f.col("Revenda").alias("nome_revenda")
    ).distinct()
    return dim_revenda

def create_dim_endereco(df):
    dim_endereco = df.select(
         f.col("Nome_Rua").alias("nome_rua"), 
         f.col("Numero_Rua").alias("numero_rua"), 
         f.col("Complemento").alias("complemento")
    ).distinct()
    return dim_endereco

In [103]:
df = transform_data(df)

In [None]:
df.show(5, False)

In [104]:
dim_tempo = create_dim_tempo(df)
dim_localizacao = create_dim_localizacao(df)
dim_produto = create_dim_produto(df)
dim_revenda = create_dim_revenda(df)
dim_endereco = create_dim_endereco(df)

In [105]:
load_data_to_db(dim_tempo, "dim_tempo")
load_data_to_db(dim_localizacao, "dim_localizacao")
load_data_to_db(dim_produto, "dim_produto")
load_data_to_db(dim_revenda, "dim_revenda")
load_data_to_db(dim_endereco, "dim_endereco")

In [106]:
dim_tempo_db = load_dim_table("dim_tempo")
dim_localizacao_db = load_dim_table("dim_localizacao")
dim_produto_db = load_dim_table("dim_produto")
dim_revenda_db = load_dim_table("dim_revenda")
dim_endereco_db = load_dim_table("dim_endereco")

In [115]:
def create_fato_venda(df, dim_tempo, dim_localizacao, dim_produto, dim_revenda, dim_endereco):
    fato_venda = df.join(dim_tempo, df.Data_Coleta == dim_tempo.data_coleta, 'left') \
                   .join(dim_localizacao, (df.Regiao_Sigla == dim_localizacao.regiao_sigla) & 
                                         (df.Estado_Sigla == dim_localizacao.estado_sigla) & 
                                         (df.Municipio == dim_localizacao.municipio) & 
                                         (df.Bairro == dim_localizacao.bairro) & 
                                         (df.Cep == dim_localizacao.cep), 'left') \
                   .join(dim_produto, df.Produto == dim_produto.produto, 'left') \
                   .join(dim_revenda, (df.CNPJ_Revenda == dim_revenda.cnpj_revenda) & 
                                      (df.Revenda == dim_revenda.nome_revenda), 'left') \
                   .join(dim_endereco, (df.Nome_Rua == dim_endereco.nome_rua) & 
                                       (df.Numero_Rua == dim_endereco.numero_rua) & 
                                       (df.Complemento == dim_endereco.complemento), 'left') \
                   .select(
                       "tempo_id",
                       "localizacao_id",
                       "produto_id",
                       "revenda_id",
                       "endereco_id",
                       f.col("Valor_Venda").alias('valor_venda'),
                       f.col("Valor_Compra").alias('valor_compra'),
                   )
    return fato_venda

In [116]:
fato_venda = create_fato_venda(df, dim_tempo_db, dim_localizacao_db, dim_produto_db, dim_revenda_db, dim_endereco_db)


In [117]:
fato_venda.show(5, False)

+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+-----------+------------+
|tempo_id                            |localizacao_id                      |produto_id                          |revenda_id                          |endereco_id                         |valor_venda|valor_compra|
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+-----------+------------+
|6f369256-3673-4dd0-9624-8337eeda0a82|ec1f3e5e-a8b2-4230-ae21-0965f9a6edfa|b0df3d52-9e51-44bb-8536-f54297c2b415|9bbfd423-c402-4850-96b9-084a59ea4075|bf176a2c-803b-4fc9-9504-1669423f4697|4.29       |3.7676      |
|6f369256-3673-4dd0-9624-8337eeda0a82|ec1f3e5e-a8b2-4230-ae21-0965f9a6edfa|b0df3d52-9e51-44bb-8536-f54297c2b415|9bbfd423-c402-4850-96b9-084a59ea4075|d4a

In [118]:
load_data_to_db(fato_venda, "fato_venda")

Py4JJavaError: An error occurred while calling o1518.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 220.0 failed 1 times, most recent failure: Lost task 0.0 in stage 220.0 (TID 315) (0de04a9727e8 executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO fato_venda ("tempo_id","localizacao_id","produto_id","revenda_id","endereco_id","valor_venda","valor_compra") VALUES (('6f369256-3673-4dd0-9624-8337eeda0a82'),('ec1f3e5e-a8b2-4230-ae21-0965f9a6edfa'),('b0df3d52-9e51-44bb-8536-f54297c2b415'),('9bbfd423-c402-4850-96b9-084a59ea4075'),('bf176a2c-803b-4fc9-9504-1669423f4697'),('4.29'::double precision),('3.7676'::double precision)) was aborted: ERROR: column "tempo_id" is of type uuid but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 131  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:936)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1733)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.postgresql.util.PSQLException: ERROR: column "tempo_id" is of type uuid but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 131
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:329)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:893)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1036)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1034)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:901)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:756)
	at jdk.internal.reflect.GeneratedMethodAccessor104.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO fato_venda ("tempo_id","localizacao_id","produto_id","revenda_id","endereco_id","valor_venda","valor_compra") VALUES (('6f369256-3673-4dd0-9624-8337eeda0a82'),('ec1f3e5e-a8b2-4230-ae21-0965f9a6edfa'),('b0df3d52-9e51-44bb-8536-f54297c2b415'),('9bbfd423-c402-4850-96b9-084a59ea4075'),('bf176a2c-803b-4fc9-9504-1669423f4697'),('4.29'::double precision),('3.7676'::double precision)) was aborted: ERROR: column "tempo_id" is of type uuid but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 131  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:936)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1733)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:746)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:902)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1036)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1036)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2438)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: column "tempo_id" is of type uuid but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 131
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:329)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:893)
	... 19 more
