In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F



In [2]:
spark = ( 
 SparkSession
 .builder
    .master("local[*]")
 .appName('spark_dataframe_api')
 .getOrCreate()
)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/13 14:05:08 WARN Utils: Your hostname, DESKTOP-BBCLCU0, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/09/13 14:05:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/13 14:05:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/13 14:05:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [28]:

df = (
    spark
    .read
    .option('delimiter', ';')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('enconding', 'ISO-8859-1')
    .csv('./dados/precos-gasolina-etanol-07.csv')
)

df.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



In [78]:
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Compra', 'Valor de Venda', 'Unidade de Medida', 'Regiao - Sigla')
    .withColumn(
        "Valor de Venda",
        F.regexp_replace(F.col("Valor de Venda"), ",", ".")
        .cast("float")
    )
)

In [79]:
df_precos.show(5)

+--------------+------------------+---------------+--------------+-----------------+--------------+
|Estado - Sigla|           Produto|Valor de Compra|Valor de Venda|Unidade de Medida|Regiao - Sigla|
+--------------+------------------+---------------+--------------+-----------------+--------------+
|            AL|          GASOLINA|           NULL|          6.15|       R$ / litro|            NE|
|            AL|            ETANOL|           NULL|          4.79|       R$ / litro|            NE|
|            AL|          GASOLINA|           NULL|          5.89|       R$ / litro|            NE|
|            AL|GASOLINA ADITIVADA|           NULL|          5.99|       R$ / litro|            NE|
|            AL|            ETANOL|           NULL|          4.98|       R$ / litro|            NE|
+--------------+------------------+---------------+--------------+-----------------+--------------+
only showing top 5 rows


In [80]:
### Formatando os dados

In [92]:
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Venda', 'Unidade de Medida', 'Regiao - Sigla')
    .withColumn(
        "Valor de Venda",
        F.regexp_replace(F.col("Valor de Venda"), ",", ".")
        .cast("float")
    )
)

In [93]:
### Coferindo se o valor de compra possui dados

In [94]:
(
    df_precos
    .where(
        F.col('Valor de Compra').isNotNull()
    )
    .show()
)

+--------------+-------+--------------+-----------------+--------------+
|Estado - Sigla|Produto|Valor de Venda|Unidade de Medida|Regiao - Sigla|
+--------------+-------+--------------+-----------------+--------------+
+--------------+-------+--------------+-----------------+--------------+



In [95]:
df_precos.show(5)

+--------------+------------------+--------------+-----------------+--------------+
|Estado - Sigla|           Produto|Valor de Venda|Unidade de Medida|Regiao - Sigla|
+--------------+------------------+--------------+-----------------+--------------+
|            AL|          GASOLINA|          6.15|       R$ / litro|            NE|
|            AL|            ETANOL|          4.79|       R$ / litro|            NE|
|            AL|          GASOLINA|          5.89|       R$ / litro|            NE|
|            AL|GASOLINA ADITIVADA|          5.99|       R$ / litro|            NE|
|            AL|            ETANOL|          4.98|       R$ / litro|            NE|
+--------------+------------------+--------------+-----------------+--------------+
only showing top 5 rows


In [96]:
### Analisando a média, o menor, o maior, a mediana, o desvio padrão de valor de venda por estado, produt e unidade de medida

In [103]:
df_precos_analise = (
    df_precos
    .groupBy(
        F.col('Estado - Sigla'),
        F.col('Regiao - Sigla'),
        F.col('Produto'),
        F.col('Unidade de Medida')
    )
    .agg(
        F.round(F.min("Valor de Venda"), 2).alias('menor_valor'),
        F.round(F.max("Valor de Venda"), 2).alias('maior_valor'),
        F.round(F.expr("percentile_approx(`Valor de Venda`, 0.5)"), 2).alias('mediana'),
        F.round(F.avg("Valor de Venda"), 2).alias('media'),
        F.round(F.stddev("Valor de Venda"), 2).alias('desvio_padrao')
    )
    .withColumn(
        "diferenca",
        F.round(F.col('maior_valor') -  F.col('menor_valor'),2)
    )
    .orderBy('diferenca', ascending=False)
)

In [104]:
df_precos_analise.show(10)

+--------------+--------------+------------------+-----------------+-----------+-----------+-------+-----+-------------+---------+
|Estado - Sigla|Regiao - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor|mediana|media|desvio_padrao|diferenca|
+--------------+--------------+------------------+-----------------+-----------+-----------+-------+-----+-------------+---------+
|            SP|            SE|GASOLINA ADITIVADA|       R$ / litro|       5.22|       9.39|   6.29| 6.35|         0.54|     4.17|
|            SP|            SE|          GASOLINA|       R$ / litro|       4.99|       8.99|   5.99| 6.06|         0.48|      4.0|
|            SP|            SE|            ETANOL|       R$ / litro|       3.09|       5.99|   3.96| 3.98|         0.36|      2.9|
|            BA|            NE|GASOLINA ADITIVADA|       R$ / litro|       5.49|       7.99|   6.46|  6.4|         0.34|      2.5|
|            RJ|            SE|GASOLINA ADITIVADA|       R$ / litro|       5.49|   

In [105]:
### Analisando os estados com a maior média de preço da gasolina mais cara do país

In [111]:
df_ranking_gasolina = (
    df_precos_analise
    .filter(F.col("Produto") == "GASOLINA")
    .select("Estado - Sigla", "Produto", "media")
    .orderBy(F.desc("media"))
)

df_ranking_gasolina.show()



+--------------+--------+-----+
|Estado - Sigla| Produto|media|
+--------------+--------+-----+
|            AC|GASOLINA| 7.76|
|            AM|GASOLINA| 7.25|
|            RR|GASOLINA| 6.95|
|            RO|GASOLINA| 6.84|
|            SE|GASOLINA| 6.61|
|            TO|GASOLINA| 6.47|
|            DF|GASOLINA| 6.44|
|            CE|GASOLINA| 6.42|
|            PR|GASOLINA|  6.4|
|            SC|GASOLINA| 6.38|
|            PA|GASOLINA| 6.37|
|            BA|GASOLINA| 6.31|
|            AL|GASOLINA|  6.3|
|            PE|GASOLINA| 6.29|
|            RS|GASOLINA|  6.2|
|            GO|GASOLINA| 6.19|
|            RN|GASOLINA| 6.19|
|            MS|GASOLINA| 6.15|
|            MA|GASOLINA| 6.14|
|            ES|GASOLINA| 6.14|
+--------------+--------+-----+
only showing top 20 rows


In [112]:
### Analisando as regiões com a maior média de preço da gasolina mais cara do país

In [114]:
df_ranking_gasolina_regiao = (
    df_precos_analise
    .filter(F.col("Produto") == "GASOLINA")
    .groupBy("Regiao - Sigla")
    .agg(F.round(F.avg("media"), 2).alias("preco_medio"))
    .orderBy(F.desc("preco_medio"))
)

df_ranking_gasolina_regiao.show()


+--------------+-----------+
|Regiao - Sigla|preco_medio|
+--------------+-----------+
|             N|       6.81|
|             S|       6.33|
|            NE|       6.24|
|            CO|       6.22|
|            SE|       6.11|
+--------------+-----------+



In [119]:
pivot = (
    df_precos_analise.groupBy("Estado - Sigla")
    .pivot("Produto") 
    .agg(F.first("media"))
)

pivot.show(27)


+--------------+------+--------+------------------+
|Estado - Sigla|ETANOL|GASOLINA|GASOLINA ADITIVADA|
+--------------+------+--------+------------------+
|            SC|  4.73|    6.38|               6.5|
|            RO|  5.04|    6.84|              6.95|
|            PI|  4.64|     5.9|              6.12|
|            AM|  5.48|    7.25|              7.28|
|            RR|  5.16|    6.95|              7.06|
|            GO|  4.25|    6.19|              6.34|
|            TO|  4.74|    6.47|              6.54|
|            MT|   3.9|    6.11|              6.31|
|            SP|  3.98|    6.06|              6.35|
|            PB|  4.52|    5.99|              6.14|
|            ES|  4.47|    6.14|              6.27|
|            RS|  4.86|     6.2|              6.39|
|            MS|  4.01|    6.15|              6.34|
|            AL|  5.09|     6.3|              6.55|
|            MG|  4.24|    6.13|              6.35|
|            PA|  4.81|    6.37|              6.62|
|           