# Aula 2 - Uso de RDD e DataFrame

Os RDDs foram a abstração de dados original do Spark, embora versões mais recentes do Spark tenham introduzido abstrações mais avançadas, como DataFrames e Datasets, que são mais eficientes e fáceis de usar em muitos casos. No entanto, os RDDs ainda são importantes para entender a base do processamento de dados no Spark e são úteis para operações de baixo nível e personalizadas que não são facilmente realizáveis com DataFrames ou Datasets.

In [37]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.functions import col, lit, when

Para usar RDDs além do Spark Session precisamos também do Spark Context

In [38]:
sc = SparkContext.getOrCreate()
spark = SparkSession.builder \
      .master("local[*]") \
      .appName("postech") \
      .getOrCreate()

## Criando Dataframe a partir de um RDD

In [39]:
rdd = sc.parallelize(
    [
        ('C', 85, 76, 87, 91),
        ('A', 56, 78, 88, 89),
        ('B', 85, 76, 98, 87),
        ('A', 56, 78, 78, 89)
    ], 4
)

In [40]:
print(type(rdd))

<class 'pyspark.rdd.RDD'>


Agora vamos criar o dataframe a partir do RDD existente

In [41]:
sub = ['id_person','value_1','value_2','value_3', 'value_4']
df = spark.createDataFrame(rdd, schema=sub)

In [42]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [43]:
df.printSchema()

root
 |-- id_person: string (nullable = true)
 |-- value_1: long (nullable = true)
 |-- value_2: long (nullable = true)
 |-- value_3: long (nullable = true)
 |-- value_4: long (nullable = true)



In [44]:
df.show()

+---------+-------+-------+-------+-------+
|id_person|value_1|value_2|value_3|value_4|
+---------+-------+-------+-------+-------+
|        C|     85|     76|     87|     91|
|        A|     56|     78|     88|     89|
|        B|     85|     76|     98|     87|
|        A|     56|     78|     78|     89|
+---------+-------+-------+-------+-------+



## Operações básicas com Dataframe

Vamos importa um dataframe existente

In [45]:
df = spark.read.csv('data/cereal.csv', sep = ',', inferSchema=True, header=True)
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- mfr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- calories: integer (nullable = true)
 |-- protein: integer (nullable = true)
 |-- fat: integer (nullable = true)
 |-- sodium: integer (nullable = true)
 |-- fiber: double (nullable = true)
 |-- carbo: double (nullable = true)
 |-- sugars: integer (nullable = true)
 |-- potass: integer (nullable = true)
 |-- vitamins: integer (nullable = true)
 |-- shelf: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- cups: double (nullable = true)
 |-- rating: double (nullable = true)



### Select()

Comando utilizado para selecionar colunas do Dataframe

In [46]:
df.select('name', 'mfr', 'type').show()

+--------------------+---+----+
|                name|mfr|type|
+--------------------+---+----+
|           100% Bran|  N|   C|
|   100% Natural Bran|  Q|   C|
|            All-Bran|  K|   C|
|All-Bran with Ext...|  K|   C|
|      Almond Delight|  R|   C|
|Apple Cinnamon Ch...|  G|   C|
|         Apple Jacks|  K|   C|
|             Basic 4|  G|   C|
|           Bran Chex|  R|   C|
|         Bran Flakes|  P|   C|
|        Cap'n'Crunch|  Q|   C|
|            Cheerios|  G|   C|
|Cinnamon Toast Cr...|  G|   C|
|            Clusters|  G|   C|
|         Cocoa Puffs|  G|   C|
|           Corn Chex|  R|   C|
|         Corn Flakes|  K|   C|
|           Corn Pops|  K|   C|
|       Count Chocula|  G|   C|
|  Cracklin' Oat Bran|  K|   C|
+--------------------+---+----+
only showing top 20 rows



### withColumn()

Comando utilizado para criar/alterar colunas no Dataframe

In [47]:
(df
 .withColumn('CALORIES', col('calories').cast('Integer'))
 .withColumn('Nova_Coluna', lit('Valor Fixo'))
).printSchema()

root
 |-- name: string (nullable = true)
 |-- mfr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- CALORIES: integer (nullable = true)
 |-- protein: integer (nullable = true)
 |-- fat: integer (nullable = true)
 |-- sodium: integer (nullable = true)
 |-- fiber: double (nullable = true)
 |-- carbo: double (nullable = true)
 |-- sugars: integer (nullable = true)
 |-- potass: integer (nullable = true)
 |-- vitamins: integer (nullable = true)
 |-- shelf: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- cups: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- Nova_Coluna: string (nullable = false)



### GroupBy()

Comando utilizado para agrupar valores

In [48]:
df.groupBy('mfr').count().show()

+---+-----+
|mfr|count|
+---+-----+
|  K|   23|
|  Q|    8|
|  A|    1|
|  N|    6|
|  R|    8|
|  G|   22|
|  P|    9|
+---+-----+



### orderBy()

Comando para ordenação dos dados

In [49]:
df.orderBy('calories', ascending=False).show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|Mueslix Crispy Blend|  K|   C|     160|      3|  2|   150|  3.0| 17.0|    13|   160|      25|    3|   1.5|0.67|30.313351|
|Muesli Raisins; D...|  R|   C|     150|      4|  3|    95|  3.0| 16.0|    11|   170|      25|    3|   1.0| 1.0|37.136863|
|Muesli Raisins; P...|  R|   C|     150|      4|  3|   150|  3.0| 16.0|    11|   170|      25|    3|   1.0| 1.0|34.139765|
|Just Right Fruit ...|  K|   C|     140|      3|  1|   170|  2.0| 20.0|     9|    95|     100|    3|   1.3|0.75|36.471512|
|Nutri-Grain Almon...|  K|   C|     140|      3|  2|   220|  3.0| 21.0|     7|   130|      25|    3|  1.33|0.67| 40.69232|
|   Total Raisin

### Case When

O comando when pode ser usado para definir condições dentro de alguns comandos como Select

In [50]:
df.select(
    col('*'),
    when(col('vitamins') >= 25, 'Rico em Vitamínas')
        .otherwise('Baixo em Vitamínas')
    .alias('calories_rating')
).show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+------------------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|   calories_rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+------------------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973| Rico em Vitamínas|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|Baixo em Vitamínas|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505| Rico em Vitamínas|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912| Rico em Vitamínas|
|     

Podemos usar o groupBy para contar quantos resultados tivemos

In [51]:
df.select(
    col('*'),
    when(col('vitamins') >= 25, 'Rico em Vitamínas')
        .otherwise('Baixo em Vitamínas')
    .alias('calories_rating')
).groupBy('calories_rating').count().show()

+------------------+-----+
|   calories_rating|count|
+------------------+-----+
| Rico em Vitamínas|   69|
|Baixo em Vitamínas|    8|
+------------------+-----+



### filter()

Comando usado para filtrar linhas

In [52]:
df.filter(col('calories') < 100).show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|           Bran Chex|  R|   C|      90|      2|  1|   200|  4.0| 15.0|     6|   125|      25|    1|   1.0|0.67|49.120253|
|         Bran Flakes|  P|   C|      90|      3|  0|   210|  5.0| 13.0|     5|   190|      25|    3|   1.0|0.67|53.313813|
|   Nutri-grain 

### isnull() / isnotnull()

Comandos para verificar se as linhas são nulas ou não

In [53]:
df.filter(col('name').isNotNull()).show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

In [54]:
df.filter(col('name').isNull()).show()

+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+
|name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|rating|
+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+
+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+



Podemos contar quantas linhas retornaram valores Nulos (ou não)

In [55]:
df.filter(col('name').isNull()).count()

0

In [56]:
df.filter(col('name').isNotNull()).count()

77