In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [2]:
# Obtain (or reuse) the Hive-enabled Spark session for this notebook.
spark = SparkSession.builder.appName("combustiveis_app").enableHiveSupport().getOrCreate()

# Keep driver logging at WARN so cell outputs are not flooded with INFO lines.
spark.sparkContext.setLogLevel("WARN")

In [3]:
# Hadoop configuration of the running session; the gs:// connector is set up
# here so later reads can address Google Cloud Storage buckets.
conf = spark.sparkContext._jsc.hadoopConfiguration()

# Service-account authentication (key file mounted at /mnt/secrets) followed by
# the filesystem implementations registered for the gs:// scheme.
for gcs_option, gcs_value in (
    ("fs.gs.auth.service.account.enable", "true"),
    ("fs.gs.auth.service.account.json.keyfile", "/mnt/secrets/key.json"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(gcs_option, gcs_value)

In [4]:
# Raw landing-zone file: semicolon-delimited CSV with a header row.
# Schema inference is requested, so Spark samples the data to pick column types.
df_landing = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ";")
    .csv("gs://lading-zone-299792458/combustiveis/ca-2016-01.csv")
)

In [5]:
# Inspect the column names and types Spark inferred for the landing CSV.
df_landing.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



In [6]:
# Number of rows in the single landing CSV file (baseline for comparing with silver).
df_landing.count()

486897

In [7]:
# Silver-zone dataset: every parquet file under the combustiveis/ prefix.
df_silver = spark.read.parquet("gs://silver-zone-299792458/combustiveis/")

In [8]:
# Inspect the silver schema (column names and types as stored in parquet).
df_silver.printSchema()

root
 |-- regiao_sigla: string (nullable = true)
 |-- estado_sigla: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- revenda: string (nullable = true)
 |-- cnpj_da_revenda: string (nullable = true)
 |-- nome_da_rua: string (nullable = true)
 |-- numero_rua: string (nullable = true)
 |-- complemento: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string (nullable = true)
 |-- produto: string (nullable = true)
 |-- data_da_coleta: string (nullable = true)
 |-- valor_de_venda: double (nullable = true)
 |-- valor_de_compra: double (nullable = true)
 |-- unidade_de_medida: string (nullable = true)
 |-- bandeira: string (nullable = true)
 |-- file_name: string (nullable = true)



In [9]:
# Total number of rows across the whole silver dataset.
df_silver.count()

7773224

In [13]:
# Restrict the silver data to the rows whose file_name matches the
# January 2016 landing file.
df_silver_2016_01 = df_silver.filter(
    f.col("file_name") == "gs://lading-zone-299792458/combustiveis/ca-2016-01.csv"
)

In [14]:
# Average sale and purchase price per state for the January 2016 slice.
# show(30) prints up to 30 result rows; row order is whatever Spark returns.
(
    df_silver_2016_01
    .select("estado_sigla", "valor_de_venda", "valor_de_compra")
    .groupBy("estado_sigla")
    .avg("valor_de_venda", "valor_de_compra")
    .show(n=30)
)

+------------+-------------------+--------------------+
|estado_sigla|avg(valor_de_venda)|avg(valor_de_compra)|
+------------+-------------------+--------------------+
|          SC| 3.1772432326786446|  2.8415160185967934|
|          RO| 3.5485229295003644|  3.0588587769784157|
|          PI| 3.3867126962051417|  3.0005756143667313|
|          AM|  3.577786058746206|  3.0805091746248303|
|          RR|  3.530304709141284|   3.081691703056768|
|          GO| 3.2157576949657165|   2.844709628832706|
|          TO|  3.351466737910748|  2.9310575757575763|
|          MT| 3.2795786468247186|  2.8502949095022547|
|          SP|  3.013359033221733|  2.6216841530054666|
|          ES|   3.28502738192192|   2.939781697546312|
|          PB| 3.2211107182732133|  2.8783362050924355|
|          RS| 3.4500900888992065|   2.987821485010469|
|          MS|  3.307321733333332|   2.870162062566282|
|          AL|  3.339623248293248|   2.949637021276589|
|          MG| 3.1952225643738212|  2.8136022768