## Coleta de dados

Esse projeto em PySpark faz uma análise de dados de preços de combustíveis da ANP.

O trabalho faz uso de dados publicados pela ANP (Agência Nacional de Petróleo)

> Em cumprimento às determinações da Lei do Petróleo (Lei nº 9478/1997, artigo 8º), a ANP acompanha os preços praticados por revendedores de combustíveis automotivos e de gás liquefeito de petróleo envasilhado em botijões de 13 quilos (GLP P13), por meio de uma pesquisa semanal de preços realizada por empresa contratada.

- [Série Histórica de Preços de Combustíveis e de GLP](https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/serie-historica-de-precos-de-combustiveis)

Coletamos a série histórica de "Combustíveis automotivos" que vai de 2004 a 2023. São 39 arquivos CSV totalizando aproximadamente 3.7 GB.

![Amostra dos dados](./assets/amostra_planilha_revenda.png)

- [Metadados em PDF](https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/metadados-serie-historica-precos-combustiveis-1.pdf)

Temos as seguintes colunas nos arquivos CSV, em conformidade com a documentação de metadados:

| Coluna            | Tipo    | Comentário                                                                           |
| ----------------- | ------- | ------------------------------------------------------------------------------------ |
| Regiao            | Texto   | Sigla da região do país (ex: S, N, SE)                                               |
| Estado            | Texto   | Sigla de unidade federativa (ex: RJ, SP, MG)                                         |
| Municipio         | Texto   | Nome de município                                                                    |
| Revenda           | Texto   | Razão social do revendedor de combustível                                            |
| CNPJ da Revenda   | Texto   | CNPJ do revendedor                                                                   |
| Nome da Rua       | Texto   | Logradouro                                                                           |
| Complemento       | Texto   | Logradouro                                                                           |
| Bairro            | Texto   | Logradouro                                                                           |
| CEP               | Texto   | Código de Endereçamento Postal                                                       |
| Produto           | Texto   | Produto combustível (ex: GASOLINA, ETANOL, DIESEL)                                   |
| Data da Coleta    | Data    | Data da pesquisa de preço (formato dd/mm/aaaa)                                       |
| Valor de Venda    | Decimal | Valor de venda da unidade de combustível (4 casas decimais, vírgula como separador)  |
| Valor de Compra   | Decimal | Valor de compra da unidade de combustível (4 casas decimais, virgula como separador) |
| Unidade de Medida | Texto   | Unidade ao qual os valores de compra e venda se referem (ex: R$ / litro)             |
| Bandeira          | Texto   | Nome de marca do posto de revenda (ex: IPIRANGA, BRANCA, COSAN, etc.)                |


## Infraestrutura

Aproveitando a configuração de cluster da Google Cloud Platform criada para o [projeto Hadoop](https://github.com/pinei/hadoop-precos-combustiveis), incluímos o JupyterLab para trabalhar com o PySpark.

![cluster-info](./assets/cluster_info.png)

Em paralelo usamos uma instalação em Raspberry Pi 4 para verificar a viabilidade e desempenho nesse tipo de hardware de baixo custo.

- [Running PySpark in JupyterLab on a Raspberry Pi](https://dev.to/pinei/running-pyspark-in-jupyterlab-on-a-raspberry-pi-1293)

## Download

Os arquivos CSV já estão organizados em um bucket do Google Cloud Storage.

> `https://console.cloud.google.com/storage/browser/hadoop-dados-brutos`

Optamos por usar uma API Python para trazer os arquivos do storage e trabalhar com eles no cluster ou na máquina local (a depender de como estamos executando este notebook).

!pip install google-cloud-storage

A API precisa de uma chave para autenticação no serviço.

Passo a passo para gerar a chave:
- Console do Google Cloud Platform
- APIs e Serviços > Credenciais
- Criar Conta de Serviço
- Criar Chave do tipo JSON

> "É feito o download de um arquivo contendo a chave privada. Armazene o arquivo com segurança porque essa chave não pode ser recuperada em caso de perda."

Subimos o arquivo JSON para uma pasta de trabalho no cluster:

> `/home/aldinei_bastos/work/seventh-abacus-395221-52fc140a5609.json`

Fizemos uso da API para acessar o bucket, listar o conteúdo e fazer o download de cada arquivo com extensão CSV.

In [7]:
import os

home_dir = os.environ["HOME"]
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = f'{home_dir}/work/seventh-abacus-395221-52fc140a5609.json'

In [8]:
from google.cloud import storage

# Inicializa o cliente
storage_client = storage.Client()

# Lista todos os buckets
buckets = list(storage_client.list_buckets())

# Obtem um bucket específico
bucket = storage_client.get_bucket('hadoop-dados-brutos')
blobs = list(bucket.list_blobs())

display(blobs)

[<Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/, 1693781005419814>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2004-01.csv, 1693781239357851>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2004-02.csv, 1693781397853813>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2005-01.csv, 1693781399574637>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2005-02.csv, 1693781397708461>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2006-01.csv, 1693781398243286>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2006-02.csv, 1693781398142808>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2007-01.csv, 1693781395022902>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2007-02.csv, 1693781398311389>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2008-01.csv, 1693781397510159>,
 <Blob: hadoop-dados-brutos, anp-combustiveis-automotivos/ca-2008-02.csv, 1693781398

In [5]:
import re
import os

def download_blob(blob):
    file_path = f'../work/{blob.name}'
        
    # Cria a pasta se não existir
    dir_name = os.path.dirname(file_path)
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)
        
    # Download
    blob.download_to_filename(file_path)    

for blob in blobs:
    # Se arquivo CSV
    if re.search(r'.csv$', blob.name):
        print(f'Downloading {blob.name}')
        download_blob(blob)


Downloading anp-combustiveis-automotivos/ca-2004-01.csv
Downloading anp-combustiveis-automotivos/ca-2004-02.csv
Downloading anp-combustiveis-automotivos/ca-2005-01.csv
Downloading anp-combustiveis-automotivos/ca-2005-02.csv
Downloading anp-combustiveis-automotivos/ca-2006-01.csv
Downloading anp-combustiveis-automotivos/ca-2006-02.csv
Downloading anp-combustiveis-automotivos/ca-2007-01.csv
Downloading anp-combustiveis-automotivos/ca-2007-02.csv
Downloading anp-combustiveis-automotivos/ca-2008-01.csv
Downloading anp-combustiveis-automotivos/ca-2008-02.csv
Downloading anp-combustiveis-automotivos/ca-2009-01.csv
Downloading anp-combustiveis-automotivos/ca-2009-02.csv
Downloading anp-combustiveis-automotivos/ca-2010-01.csv
Downloading anp-combustiveis-automotivos/ca-2010-02.csv
Downloading anp-combustiveis-automotivos/ca-2011-01.csv
Downloading anp-combustiveis-automotivos/ca-2011-02.csv
Downloading anp-combustiveis-automotivos/ca-2012-01.csv
Downloading anp-combustiveis-automotivos/ca-2012

## Iniciando a base de dados com PySpark

Inicializamos uma sessão do Spark com suporte ao Hive para gerenciamento de metadados das bases de dados.

In [14]:
from pyspark.sql import SparkSession

spark = ( SparkSession
    .builder
    .appName("analise-dados")
    .enableHiveSupport()
    .getOrCreate() )

23/10/12 21:52:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [16]:
spark.catalog.listDatabases()

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
                                                                                

[Database(name='default', description='Default Hive database', locationUri='hdfs://cluster-anp-spark-m/user/hive/warehouse')]

In [24]:
def create_database(name, comment):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {name} COMMENT '{comment}'")
    return spark.sql(f"DESCRIBE DATABASE {name}")

metadata = create_database("precos_anp", "Base de dados de preços de combustíveis fornecidos pela ANP e alguns indicadores de mercado")
metadata.show()

+--------------+--------------------+
|     info_name|          info_value|
+--------------+--------------------+
|Namespace Name|          precos_anp|
|       Comment|Base de dados de ...|
|      Location|file:/spark-wareh...|
|         Owner|                root|
+--------------+--------------------+



## Ingestão

Definimos o schema para leitura dos arquivos CSV.

In [1]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, DecimalType

schema = StructType([
    StructField("Regiao", StringType(), True),
    StructField("Estado", StringType(), True),
    StructField("Municipio", StringType(), True),
    StructField("Revenda", StringType(), True),
    StructField("CNPJ da Revenda", StringType(), True),
    StructField("Nome da Rua", StringType(), True),
    StructField("Complemento", StringType(), True),
    StructField("Bairro", StringType(), True),
    StructField("CEP", StringType(), True),
    StructField("Produto", StringType(), True),
    StructField("Data da Coleta", DateType(), True),
    StructField("Valor de Venda", DecimalType(10, 4), True),
    StructField("Valor de Compra", DecimalType(10, 4), True),
    StructField("Unidade de Medida", StringType(), True),
    StructField("Bandeira", StringType(), True)
])

df = spark.read.csv("file:///work/anp-combustiveis-automotivos/ca-2004-01.csv", header=True, schema=schema, sep=';', multiLine=True, quote='"', locale='pt-BR')
df.count()

23/10/12 22:41:12 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (cluster-anp-spark-w-0.c.seventh-abacus-395221.internal executor 2): java.io.FileNotFoundException: 
File file:/work/anp-combustiveis-automotivos/ca-2004-01.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:660)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Itera

Py4JJavaError: An error occurred while calling o89.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (cluster-anp-spark-w-1.c.seventh-abacus-395221.internal executor 1): java.io.FileNotFoundException: 
File file:/work/anp-combustiveis-automotivos/ca-2004-01.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:660)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2717)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2653)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2652)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2652)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2913)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2855)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2844)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: 
File file:/work/anp-combustiveis-automotivos/ca-2004-01.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:660)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [11]:
df.show()

+------+------+---------+-------+---------------+-----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+
|Regiao|Estado|Municipio|Revenda|CNPJ da Revenda|Nome da Rua|Complemento|Bairro|CEP|Produto|Data da Coleta|Valor de Venda|Valor de Compra|Unidade de Medida|Bandeira|
+------+------+---------+-------+---------------+-----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+
+------+------+---------+-------+---------------+-----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+

