# Operacoes basicas com Spark DataFrames - Parte 1

* Trabalharemos com os dados do IPCA do Brasil.

In [0]:
import pyspark.sql.functions as F

**Carregando os dados:**

In [0]:
df = spark.table('ipca_csv')
# Comando para verificar apenas as 3 primeiras linhas
df.show(3)

+-------------------+--------------------+-------------+-------------+------------------+-------------+
|           DateTime|meta_para_a_inflacao|IPCA_ocorrido|limite_maximo|focus_mais_recente|limite_minimo|
+-------------------+--------------------+-------------+-------------+------------------+-------------+
|2012-07-01 00:00:00|                 4.5|          5.2|          6.5|              null|          2.5|
|2012-08-01 00:00:00|                 4.5|         5.24|          6.5|              null|          2.5|
|2012-09-01 00:00:00|                 4.5|         5.28|          6.5|              null|          2.5|
+-------------------+--------------------+-------------+-------------+------------------+-------------+
only showing top 3 rows



In [0]:
df.printSchema()

root
 |-- DateTime: timestamp (nullable = true)
 |-- meta_para_a_inflacao: double (nullable = true)
 |-- IPCA_ocorrido: double (nullable = true)
 |-- limite_maximo: double (nullable = true)
 |-- focus_mais_recente: double (nullable = true)
 |-- limite_minimo: double (nullable = true)



### Filtrando dados
`.filter()`: Para filtrar os dados do dataframe que correspondam a um condicional (booleano)

In [0]:
df.filter(df['IPCA_ocorrido'] < 6).show()

+-------------------+--------------------+-------------+-------------+------------------+-------------+
|           DateTime|meta_para_a_inflacao|IPCA_ocorrido|limite_maximo|focus_mais_recente|limite_minimo|
+-------------------+--------------------+-------------+-------------+------------------+-------------+
|2012-07-01 00:00:00|                 4.5|          5.2|          6.5|              null|          2.5|
|2012-08-01 00:00:00|                 4.5|         5.24|          6.5|              null|          2.5|
|2012-09-01 00:00:00|                 4.5|         5.28|          6.5|              null|          2.5|
|2012-10-01 00:00:00|                 4.5|         5.45|          6.5|              null|          2.5|
|2012-11-01 00:00:00|                 4.5|         5.53|          6.5|              null|          2.5|
|2012-12-01 00:00:00|                 4.5|         5.84|          6.5|              null|          2.5|
|2013-09-01 00:00:00|                 4.5|         5.86|        

**Construindo filtros mais complexo**

* `&` -> E

* `|` -> OU

In [0]:
df.filter(df['IPCA_ocorrido'] < df['meta_para_a_inflacao'])\
    .select('DateTime', 'meta_para_a_inflacao','IPCA_ocorrido')\
    .withColumn('dif_meta_ipca_ocorrido', round(df['meta_para_a_inflacao'] - df['IPCA_ocorrido'],2)).show()

+-------------------+--------------------+-------------+----------------------+
|           DateTime|meta_para_a_inflacao|IPCA_ocorrido|dif_meta_ipca_ocorrido|
+-------------------+--------------------+-------------+----------------------+
|2017-04-01 00:00:00|                 4.5|         4.08|                  0.42|
|2017-05-01 00:00:00|                 4.5|          3.6|                   0.9|
|2017-06-01 00:00:00|                 4.5|          3.0|                   1.5|
|2017-07-01 00:00:00|                 4.5|         2.71|                  1.79|
|2017-08-01 00:00:00|                 4.5|         2.46|                  2.04|
|2017-09-01 00:00:00|                 4.5|         2.54|                  1.96|
|2017-10-01 00:00:00|                 4.5|          2.7|                   1.8|
|2017-11-01 00:00:00|                 4.5|          2.8|                   1.7|
|2017-12-01 00:00:00|                 4.5|         2.95|                  1.55|
|2018-01-01 00:00:00|                 4.

Fazendo a leitura do código acima:
*  `.filter()` para buscar nos registros onde o IPCA foi menor que a meta mensal.
*  `.select()` para selecionar quais atributos da serão apresentados no resultado.
* `.withColumn()` para criar um novo atributo (que só existirá no resultado da pesquisa), primeiro parametro para dar nome ao atributo e segundo parametro inserir os valores.

In [0]:
df.filter((df['IPCA_ocorrido'] < 7) | (df['limite_maximo'] <= 6)).show()

+-------------------+--------------------+-------------+-------------+------------------+-------------+
|           DateTime|meta_para_a_inflacao|IPCA_ocorrido|limite_maximo|focus_mais_recente|limite_minimo|
+-------------------+--------------------+-------------+-------------+------------------+-------------+
|2012-07-01 00:00:00|                 4.5|          5.2|          6.5|              null|          2.5|
|2012-08-01 00:00:00|                 4.5|         5.24|          6.5|              null|          2.5|
|2012-09-01 00:00:00|                 4.5|         5.28|          6.5|              null|          2.5|
|2012-10-01 00:00:00|                 4.5|         5.45|          6.5|              null|          2.5|
|2012-11-01 00:00:00|                 4.5|         5.53|          6.5|              null|          2.5|
|2012-12-01 00:00:00|                 4.5|         5.84|          6.5|              null|          2.5|
|2013-01-01 00:00:00|                 4.5|         6.15|        

### GroupBy e Operacoes de agregacao
* Os metodos de agrupamento sao muito similares aos ja estudados no Pandas e SQL
* Vamos utilizar o dataset vgsales sobre venda de video games

In [0]:
df_vg = spark.table("vgsales")
df_vg.show(5)

+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing| Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports| Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing| Nintendo|   11.27|    8.89|   10.22|        1.0|       31.37|
+----+--------------------+--------+----+------------+---------+

* Algumas perguntas que podemos fazer ao dataframe:
  * Qual a media de vendas por Genero?
  * Qual o valor maximo de vendas por Plataforma?
  * Quantos jogos foram lancados para diferentes generos?

In [0]:
# Calculando a media de vendas por Genero

# Sem usar as funcoes
df_vg.groupBy("Genre").mean().select("Genre", "avg(Global_Sales)").show()

# Usando as funcoes
df_vg.groupBy("Genre").agg(F.mean("Global_Sales").alias("avg_global_sales")).show()

+------------+-------------------+
|       Genre|  avg(Global_Sales)|
+------------+-------------------+
|   Adventure|0.18587869362364026|
|      Sports|  0.567318840579705|
|      Racing| 0.5861008807045601|
|Role-Playing| 0.6232325268817165|
|     Shooter| 0.7918854961831986|
|        Misc| 0.4657619321449072|
|    Platform| 0.9383408577878074|
|      Puzzle|0.42087628865979465|
|    Fighting|  0.529374999999999|
|      Action| 0.5281001206272524|
|    Strategy| 0.2571512481644646|
|  Simulation| 0.4523644752018429|
+------------+-------------------+



In [0]:
# Calculando o valor maximo de vendas por plataforma
# Utilizando o orderBy para ordernar em modo decrescente.
df_vg.groupBy("Platform").max().select("Platform", "max(Global_Sales)").orderBy(F.col("max(Global_Sales)").desc()).show()

+--------+-----------------+
|Platform|max(Global_Sales)|
+--------+-----------------+
|     Wii|            82.74|
|     NES|            40.24|
|      GB|            31.37|
|      DS|            30.01|
|    X360|            21.82|
|     PS3|             21.4|
|     PS2|            20.81|
|    SNES|            20.61|
|     GBA|            15.85|
|     3DS|            14.35|
|     PS4|            14.24|
|     N64|            11.89|
|      PS|            10.95|
|      XB|             8.49|
|      PC|             8.11|
|    2600|             7.81|
|     PSP|             7.72|
|    XOne|              7.3|
|      GC|             7.07|
|    WiiU|             6.96|
+--------+-----------------+
only showing top 20 rows



In [0]:
# Fazendo verificoes com SQL
df_vg.createOrReplaceTempView('tabela_vg')

spark.sql("SELECT Platform, max(Global_Sales) AS max_global_sales\
           FROM tabela_vg\
           GROUP BY Platform\
           ORDER BY max_global_sales DESC;").show()

+--------+----------------+
|Platform|max_global_sales|
+--------+----------------+
|     Wii|           82.74|
|     NES|           40.24|
|      GB|           31.37|
|      DS|           30.01|
|    X360|           21.82|
|     PS3|            21.4|
|     PS2|           20.81|
|    SNES|           20.61|
|     GBA|           15.85|
|     3DS|           14.35|
|     PS4|           14.24|
|     N64|           11.89|
|      PS|           10.95|
|      XB|            8.49|
|      PC|            8.11|
|    2600|            7.81|
|     PSP|            7.72|
|    XOne|             7.3|
|      GC|            7.07|
|    WiiU|            6.96|
+--------+----------------+
only showing top 20 rows



In [0]:
# Verificando quais generos possuem mais jogos lancados.
df_vg.groupBy("Genre").count().orderBy(F.col("count").desc()).show()

+------------+-----+
|       Genre|count|
+------------+-----+
|      Action| 3316|
|      Sports| 2346|
|        Misc| 1739|
|Role-Playing| 1488|
|     Shooter| 1310|
|   Adventure| 1286|
|      Racing| 1249|
|    Platform|  886|
|  Simulation|  867|
|    Fighting|  848|
|    Strategy|  681|
|      Puzzle|  582|
+------------+-----+

