In [70]:
import pyspark.sql.functions as f
import pyspark.sql.types     as t
from pyspark.sql import SparkSession

In [71]:
spark = (
    SparkSession.builder
                    .config("spark.executor.memory", "4G")
                    .config("spark.sql.repl.eagerEval.enabled", True)
                    .config("spark.sql.repl.eagerEval.maxNumRows", 10)
                    .appName("how to use spark")
                    .getOrCreate()
)

In [72]:
spark

In [73]:
file_path_parquet = "./data-parquet/"
file_path_parquet_user = "./data-user-parquet/"
file_path_csv = "./data-csv/"

## Lendo Arquivos [Parquet]

In [74]:
file_path_parquet

'./data-parquet/'

In [75]:
df_parquet = (
    spark
    .read
    .format("parquet")
    .load(file_path_parquet)
)

In [76]:
df_parquet

Unnamed:0,char,dialog,movie
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...
1,SMEAGOL,Pull it in! Go on...,The Return of the...
2,DEAGOL,Arrghh!,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
7,DEAGOL,Why?,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...


## Lendo Arquivos [CSV]

In [77]:
df_csv = spark.read.format("csv").option("header", True).option("inferSchema", True).load(file_path_csv)

In [78]:
df_csv.printSchema()

root
 |-- Unnamed:0: integer (nullable = true)
 |-- char: string (nullable = true)
 |-- dialog: string (nullable = true)
 |-- movie: string (nullable = true)



### Corrigindo a leitura do Dataframe em CSV

In [79]:
args_dict = {
    'header': True,
    'sep': ',',
    'encoding': 'utf-8',
    'inferSchema': True,
}

df_csv = spark.read.format("csv").options(**args_dict).load(file_path_csv)

In [80]:
df_csv

Unnamed:0,char,dialog,movie
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...
1,SMEAGOL,Pull it in! Go on...,The Return of the...
2,DEAGOL,Arrghh!,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
7,DEAGOL,Why?,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...


## Filtros no Spark

Vamos utilizar a função `col()` para nos referirmos às colunas. Existem outras maneiras de fazermos isso, mas essa é a melhor forma. (justificativa adiante).

Operadores lógicos:
* e: &
* ou: |
* não: ~

Para fazer o filtro, pode ser utilizado tanto a função `filter()` como `where()`.

### Filtros com uma única condição

In [82]:
#Consulta no df_parquet
(
    df_parquet.filter(f.col("char") == "SMEAGOL")
    .select("char",	"movie")
)

char,movie
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...


In [83]:
#Consulta no df_csv
(
    df_csv.filter(f.col("char") == "SMEAGOL")
    .select("Unnamed:0", "char","dialog", "movie")
)

Unnamed:0,char,dialog,movie
1,SMEAGOL,Pull it in! Go on...,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...
10,SMEAGOL,They cursed us,The Return of the...
12,SMEAGOL,'Murderer' they c...,The Return of the...
13,SMEAGOL,Gollum' Gollum' G...,The Return of the...


###  Filtros com duas ou mais condição

In [85]:
#Consulta no df_Parquet
(
    df_parquet.filter(
        (f.col('char') == "SMEAGOL") 
        & 
        (f.col('movie') == "The Return of the King")
    )
)

Unnamed:0,char,dialog,movie


In [None]:
#Consulta no df_csv
(
    df_csv.filter(
        (f.col('char') == "SMEAGOL") 
        & 
        (f.col('movie') == "The Return of the King")
    )   
)

###  Visualizando os Schemas das arquiteturas.

In [86]:
df_parquet.printSchema()
df_parquet.dtypes
df_parquet.schema


root
 |-- Unnamed:0: string (nullable = true)
 |-- char: string (nullable = true)
 |-- dialog: string (nullable = true)
 |-- movie: string (nullable = true)



StructType(List(StructField(Unnamed:0,StringType,true),StructField(char,StringType,true),StructField(dialog,StringType,true),StructField(movie,StringType,true)))

In [87]:
df_csv.printSchema()
df_csv.dtypes
df_csv.schema

root
 |-- Unnamed:0: integer (nullable = true)
 |-- char: string (nullable = true)
 |-- dialog: string (nullable = true)
 |-- movie: string (nullable = true)



StructType(List(StructField(Unnamed:0,IntegerType,true),StructField(char,StringType,true),StructField(dialog,StringType,true),StructField(movie,StringType,true)))

### Definindo um schema
Da maneira trabalhosa:

Da maneira simples e rápida:

In [88]:
cols = ["Unnamed:0", "char","dialog", "movie"]
schema = t.StructType([t.StructField(c, t.StringType()) for c in cols])
schema

StructType(List(StructField(Unnamed:0,StringType,true),StructField(char,StringType,true),StructField(dialog,StringType,true),StructField(movie,StringType,true)))

## Olhando para os dados

In [89]:
# Olhando para os dados no df_csv
df_csv.describe()

summary,Unnamed:0,char,dialog,movie
count,399.0,399,399,399
mean,99.70676691729324,,,
stddev,57.73089302418732,,,
min,0.0,(GOLLUM,", Go back...",The Return of the...
max,199.0,GANDALF,My precious.,The Return of the...


In [90]:
# Olhando para os dados no df_parquet
df_parquet.describe()

summary,Unnamed:0,char,dialog,movie
count,7569.0,7569,7566,7569
mean,1136.78795085216,,,
stddev,714.8498512362687,,,
min,0.0,(GOLLUM,,"""""Yes! That..."
max,999.0,GANDALF,You're a daughte...,The Two Towers


## Contagem de linhas 

In [91]:
# Contagem de linhas df_parquet
print('Número de Linhas: ', df_parquet.count(), '\n',
      'Número de Colunas: ', len(df_parquet.columns), '\n')
df_parquet.printSchema()

Número de Linhas:  7612 
 Número de Colunas:  4 

root
 |-- Unnamed:0: string (nullable = true)
 |-- char: string (nullable = true)
 |-- dialog: string (nullable = true)
 |-- movie: string (nullable = true)



In [92]:
# Contagem de Linhas no df_csv
print('Número de Linhas: ', df_csv.count(), '\n',
      'Número de Colunas: ', len(df_csv.columns), '\n')
df_csv.printSchema()

Número de Linhas:  399 
 Número de Colunas:  4 

root
 |-- Unnamed:0: integer (nullable = true)
 |-- char: string (nullable = true)
 |-- dialog: string (nullable = true)
 |-- movie: string (nullable = true)



# Manipulação dos Dados

O design do pyspark.sql, obviamente, é baseado no SQL para o processamento de dados estruturados. Assim, muitas funções tem nomes parecidos e/ou funcionam da mesma forma que seus análogos em sql puro. 

Inclusive, é possível operar em um DataFrame utilizando somente SQL

### Seleção de Colunas

In [93]:
# Select das Colunas no df_parquet
df_parquet.select("char","dialog")

char,dialog
DEAGOL,Oh Smeagol Ive go...
SMEAGOL,Pull it in! Go on...
DEAGOL,Arrghh!
SMEAGOL,Deagol!
SMEAGOL,Deagol!
SMEAGOL,Deagol!
SMEAGOL,Give us that! Dea...
DEAGOL,Why?
SMEAGOL,"Because' , it's m..."
SMEAGOL,My precious.


In [94]:
# Select das Colunas no df_csv
df_csv.select("Unnamed:0", "char","dialog", "movie")

Unnamed:0,char,dialog,movie
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...
1,SMEAGOL,Pull it in! Go on...,The Return of the...
2,DEAGOL,Arrghh!,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
7,DEAGOL,Why?,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...


In [95]:
cols = ["Unnamed:0", "char","dialog", "movie"]
df_parquet.select(cols)

Unnamed:0,char,dialog,movie
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...
1,SMEAGOL,Pull it in! Go on...,The Return of the...
2,DEAGOL,Arrghh!,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
7,DEAGOL,Why?,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...


In [56]:
cols = ["Unnamed:0", "char","dialog", "movie"]
df_csv.select(cols)

Unnamed:0,char,dialog,movie
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...
1,SMEAGOL,Pull it in! Go on...,The Return of the...
2,DEAGOL,Arrghh!,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
7,DEAGOL,Why?,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...


In [96]:
cols = ["char", "movie"]
df_csv.select("Unnamed:0", *cols)

Unnamed:0,char,movie
0,DEAGOL,The Return of the...
1,SMEAGOL,The Return of the...
2,DEAGOL,The Return of the...
3,SMEAGOL,The Return of the...
4,SMEAGOL,The Return of the...
5,SMEAGOL,The Return of the...
6,SMEAGOL,The Return of the...
7,DEAGOL,The Return of the...
8,SMEAGOL,The Return of the...
9,SMEAGOL,The Return of the...


Observações:
* Podemos realizar operações sobre colunas selecionadas. 
* A ordem em que as colunas são selecionadas é a ordem em que elas vão ser inseridas no DataFrame resultante.

In [62]:
df_csv.select("movie", "char", f.lower('char').alias('char_lower'))

movie,char,char_lower
The Return of the...,DEAGOL,deagol
The Return of the...,SMEAGOL,smeagol
The Return of the...,DEAGOL,deagol
The Return of the...,SMEAGOL,smeagol
The Return of the...,SMEAGOL,smeagol
The Return of the...,SMEAGOL,smeagol
The Return of the...,SMEAGOL,smeagol
The Return of the...,DEAGOL,deagol
The Return of the...,SMEAGOL,smeagol
The Return of the...,SMEAGOL,smeagol


#### Selecionando valores distintos

In [98]:
df_parquet.select("char").distinct().show(n=df_parquet.count(), truncate=False)

+------------------------+
|char                    |
+------------------------+
|SOLDIERS ON GATE        |
|PEOPLE                  |
|CROWD                   |
|FRODO VOICE             |
|LEGOLAS                 |
|URUK HAI                |
|ELROND                  |
|HAMA                    |
|SARUMAN                 |
|SARUMAN VOICE OVER      |
|THEODEN                 |
| GANDALF                |
|SHARKU                  |
|EYE OF SAURON           |
|BOSON                   |
|MEN                     |
|HOBBIT                  |
|WILDMAN                 |
|GATEKEEPR               |
|GIMLI                   |
|GRIMBOLD                |
|BOROMIR                 |
|RING                    |
|ROHAN HORSEMAN          |
|SMEAGOL                 |
|OLD MAN                 |
|GALADRIEL VOICE OVER    |
|GENERAL SHOUT           |
|ROHIRRIM                |
|MRS BRACEGIRDLE         |
|ORC                     |
|EOMER                   |
|MAN                     |
|GALADRIL                |
|

In [67]:
df_csv.select("movie").distinct().show(n=df_csv.count(), truncate=False)

+-----------------------+
|movie                  |
+-----------------------+
|The Return of the King |
|The Return of the King |
+-----------------------+



#### Filtros Utilizando SQL

In [99]:
(
    df_parquet.filter("char = 'SMEAGOL'")
    .select("char", "movie")
)

char,movie
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...
SMEAGOL,The Return of the...


#### Observações
Quando nos referimos às colunas por meio da função `col()`, temos acesso à diversos métodos das colunas que podem ser utilizados para auxliar na filtragem do DataFrame. Alguns deles são:
* `isin()`: checa se a coluna contém os valores listados na função.
* `contains()`: utilizado para verificar se uma coluna de texto contém algum padrão especificado (não aceita regex). Aceita uma outra coluna de texto.
* `like()`: utilizado para verificar se uma coluna de texto contém algum padrão especificado (não aceita regex). Funciona de forma similar ao "LIKE" do SQL.
* `rlike()`: utilizado para verificar se uma coluna de texto contém algum padrão especificado (**aceita regex**). Funciona de forma similar ao "RLIKE" do SQL.
* `startswith()`: utilizado para verificar se uma coluna de texto começa com algum padrão especificado (**aceita regex**).
* `endswith()`: utilizado para verificar se uma coluna de texto termina com algum padrão especificado (**aceita regex**).
* `between()`: checa se os valores da coluna estão dentro do intervalo especificado. Os dois lados do intervalo são inclusivos.
* `isNull()`: retorna True se o valor da coluna é nulo
* `isNotNull()`: retorna True se o valor da coluna não é nulo

Outros métodos úteis:
* `alias()/name()`: usado para renomear as colunas em operações como select() e agg()
* `astype()/cast()`: usado para mudar o tipo das colunas. Aceita tanto um string como um tipo especificado pelo módulo pyspark.sql.types
* `substr()`: utilizado para cortar um string com base em índices dos caracteres 

In [107]:
(
    df_parquet.filter(f.col("char").like('%F%'))
    .select("char").show(truncate=False)
)

+-------+
|char   |
+-------+
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|GANDALF|
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|FRODO  |
|GANDALF|
|GANDALF|
|GANDALF|
|GANDALF|
+-------+
only showing top 20 rows



### Ordenando o DataFrame

A ordenação do DataFrame pode ser feita utilizando as funções `orderBy()` ou `sort()`. Algumas funções auxiliares importante para serem usadas ao ordenar:
* `asc()`: ordena a coluna de forma ascendente (default)
* `desc()`ordena a coluna de forma decrescente
* `asc_nulls_first() / desc_nulls_first()`: ordena a coluna de forma ascendente e decrescente, respectivamente, mantendo os campos nulos primeiro
* `asc_nulls_last() / desc_nulls_last()`: ordena a coluna de forma ascendente e decrescente, respectivamente, mantendo os campos nulos por último

In [108]:
df_parquet.select("char").distinct().orderBy('char').show(truncate=False)

+----------------+
|char            |
+----------------+
|null            |
|(GOLLUM         |
|ARAGORN         |
|ARAGORN         |
|ARGORN          |
|ARMY            |
|ARWEN           |
|ARWEN VOICEOVER |
|BARLIMAN        |
|BILBO           |
|BILBO VOICEOVER |
|BOROMIR         |
|BOROMIR         |
|BOSON           |
|CAPTAIN         |
|CHILDREN HOBBITS|
|CROWD           |
|DAMROD          |
|DEAGOL          |
|DENETHOR        |
+----------------+
only showing top 20 rows



### Renomeando Colunas

Para renomear colunas, é utilizada a função `withColumnRenamed()`, da seguinte forma:

```
df.withColumnRenamed(`nome_antigo`, 'nome_novo')
```

In [110]:
(
    df_parquet.select("Unnamed:0","char","dialog", "movie")
    .withColumnRenamed("Unnamed:0", "codigo")
    .withColumnRenamed("char", "personagem")
    .withColumnRenamed("dialog", "dialogo")
    .withColumnRenamed("movie", "filme")
)

codigo,personagem,dialogo,filme
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...
1,SMEAGOL,Pull it in! Go on...,The Return of the...
2,DEAGOL,Arrghh!,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
7,DEAGOL,Why?,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...


### Usando SQL no Spark

In [111]:
spark.catalog.currentDatabase()

'default'

In [112]:
spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [113]:
spark.sql('create database mbaProjectLotr')

In [114]:
spark.sql('show databases').show()

+--------------+
|     namespace|
+--------------+
|       default|
|mbaprojectlotr|
+--------------+



In [115]:
spark.sql("use mbaProjectLotr")

In [116]:
spark.catalog.currentDatabase()

'mbaprojectlotr'

In [117]:
df_parquet.createOrReplaceTempView("tb_lotr")

In [118]:
spark.catalog.listTables('mbaProjectLotr')

[Table(name='tb_lotr', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [137]:
spark.sql("""
          SELECT 'Unnamed:0', char, dialog
          FROM tb_lotr
          ORDER BY char DESC
          """).show()

+---------+----------+--------------------+
|Unnamed:0|      char|              dialog|
+---------+----------+--------------------+
|Unnamed:0|   GANDALF|      Retreat! Th...|
|Unnamed:0|   GANDALF|      Retreat! Th...|
|Unnamed:0|   GANDALF|      Retreat! Th...|
|Unnamed:0|   GANDALF|      Retreat! Th...|
|Unnamed:0|   GANDALF|      Retreat! Th...|
|Unnamed:0|     WOMAN|They are breaking...|
|Unnamed:0|     WOMAN|They're past the ...|
|Unnamed:0|     WOMAN|They are breaking...|
|Unnamed:0|     WOMAN|        He's alive! |
|Unnamed:0|     WOMAN|        He's alive! |
|Unnamed:0|     WOMAN|They're past the ...|
|Unnamed:0|     WOMAN|They're past the ...|
|Unnamed:0|     WOMAN|        He's alive! |
|Unnamed:0|     WOMAN|They are breaking...|
|Unnamed:0|WITCH KING|  Do not come bet...|
|Unnamed:0|WITCH KING| I will break him.  |
|Unnamed:0|WITCH KING| I will break him.  |
|Unnamed:0|WITCH KING|    Do you not kn...|
|Unnamed:0|WITCH KING|    Do you not kn...|
|Unnamed:0|WITCH KING|Send forth

In [140]:
spark.sql("""
          SELECT count(char) as qtd_personagem, movie as filme
          FROM tb_lotr
          GROUP BY movie
          ORDER BY movie ASC
          """).show()

+--------------+--------------------+
|qtd_personagem|               filme|
+--------------+--------------------+
|             0|                null|
|             3|      ""Yes! That...|
|             3|       Lord of Moria|
|             3|            3'8""! "|
|          1518|The Fellowship of...|
|             2|The Return of the...|
|          3016|The Return of the...|
|          3024|     The Two Towers |
+--------------+--------------------+



### User Defined Functions (UDFs)

Em algumas situações é necessário criar/alterar uma coluna utilizando uma operação não implementada na biblioteca padrão. Para isso, é possível utilzar funções definidas pelo usuário (UDFs) por meio da função `udf()`.

**Importante**: As udfs não são otimizadas para serem executadas em paralelo, e por isso podem representar um gargalo na na aplicação.

In [43]:
def convert_santander_name(string):
    if string == "SANTANDER UK PLC":
        return "SANTANDER MBA UNIESP"
    else:
        return string

convert_santander_name = f.udf(convert_santander_name, returnType=t.StringType())

In [44]:
convert_santander_name

<function __main__.convert_santander_name(string)>

In [45]:
(
    df_parquet.select("bank_name")
    .withColumn('bank_name', convert_santander_name(f.col('bank_name')))\
    .where("bank_name = 'SANTANDER MBA UNIESP'")\
    .limit(1).show(truncate=False)
)

+--------------------+
|bank_name           |
+--------------------+
|SANTANDER MBA UNIESP|
+--------------------+



### Joins

Os joins no pyspark são especificados pela função `join()`, da seguinte forma:

```
df1.join(df2, {key_columns}, {join_type})
```

* `key_columns`: colunas que vão ser utilizadas para fazer a junção das tabelas. Pode ser especificada como
    * Um único string -> só uma coluna é chave, mesmos nomes nas duas tabelas
    * Uma lista de string ou de colunas (`col()`) -> mais de uma coluna é chave, mesmos nomes nas duas tabelas
    * Com nomes diferentes, é necessário fazer uma especificação do tipo: `f.col(column1) == f.col(column2)`, em que a coluna da esquerda está no DataFrame da esquerda e analogamente a coluna da direita está no DataFrame da direita. Caso existam mais de uma coluna como chave, essas especificações devem ser colocadas em uma lista.
* `join_type`: o tipo de join a ser realizado. As opções são:
    * `inner (default)`: INNER JOIN do SQL
    * `outer / full / fullouter / full_outer`: : FULL OUTER JOIN do SQL
    * `left / leftouter / left_outer`: : LEFT JOIN do SQL
    * `right / rightouter / right_outer`: : RIGHT JOIN do SQL
    * `semi / leftsemi / left_semi`: realiza um LEFT JOIN do SQL e retorna somente as colunas do DataFrame da esquerda que também estão no DataFrame da Direita
    * `anti / leftanti / left_anti`: realiza um LEFT JOIN do SQL e retorna somente as colunas do DataFrame da esquerda que não estão no DataFrame da Direita

In [141]:
df_parquet_user = spark.read.format("parquet").load(file_path_parquet_user)

In [144]:
(
    df_parquet_user.select("word", "count")
    .withColumnRenamed("word", "user")
    .withColumnRenamed("count", "quantidade")
)

user,quantidade
FRODO,49
SAM,48
GANDALF,40
PIPPIN,30
SMEAGOL,24
THEODEN,20
ARAGORN,20
SARUMAN,16
GIMLI,16
ORC,14


In [145]:
df_parquet_user

word,count
FRODO,49
SAM,48
GANDALF,40
PIPPIN,30
SMEAGOL,24
THEODEN,20
ARAGORN,20
SARUMAN,16
GIMLI,16
ORC,14


In [143]:
df_parquet

Unnamed:0,char,dialog,movie
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...
1,SMEAGOL,Pull it in! Go on...,The Return of the...
2,DEAGOL,Arrghh!,The Return of the...
3,SMEAGOL,Deagol!,The Return of the...
4,SMEAGOL,Deagol!,The Return of the...
5,SMEAGOL,Deagol!,The Return of the...
6,SMEAGOL,Give us that! Dea...,The Return of the...
7,DEAGOL,Why?,The Return of the...
8,SMEAGOL,"Because' , it's m...",The Return of the...
9,SMEAGOL,My precious.,The Return of the...


In [146]:
df_lotr_user = df_parquet.join(df_parquet_user, df_parquet_user.word == df_parquet.char, how="inner")

In [147]:
df_lotr_user

Unnamed:0,char,dialog,movie,word,count
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...,DEAGOL,6
1,SMEAGOL,Pull it in! Go on...,The Return of the...,SMEAGOL,24
2,DEAGOL,Arrghh!,The Return of the...,DEAGOL,6
3,SMEAGOL,Deagol!,The Return of the...,SMEAGOL,24
4,SMEAGOL,Deagol!,The Return of the...,SMEAGOL,24
5,SMEAGOL,Deagol!,The Return of the...,SMEAGOL,24
6,SMEAGOL,Give us that! Dea...,The Return of the...,SMEAGOL,24
7,DEAGOL,Why?,The Return of the...,DEAGOL,6
8,SMEAGOL,"Because' , it's m...",The Return of the...,SMEAGOL,24
9,SMEAGOL,My precious.,The Return of the...,SMEAGOL,24


### Salvando os Dados

Para a escrita dos dados, importante ressaltar a seguinte opção
* mode: especifica o comportamento da operação de escrita dos dados, caso eles já existam
  * append: Anexa o conteudo do DataFrame aos dados existentes.
  * overwrite: Sobrescreve dados existentes.
  * ignore: Ignora silenciosamente essa oprecao se os dados ja existirem.
  * error or errorifexists (default): Retorna erro se os dados ja existirem.

In [148]:
cols = ["char","dialog","movie","count"]

In [149]:
df = df_lotr_user.coalesce(1)

In [150]:
df.rdd.getNumPartitions()

1

In [151]:
df

Unnamed:0,char,dialog,movie,word,count
0,DEAGOL,Oh Smeagol Ive go...,The Return of the...,DEAGOL,6
1,SMEAGOL,Pull it in! Go on...,The Return of the...,SMEAGOL,24
2,DEAGOL,Arrghh!,The Return of the...,DEAGOL,6
3,SMEAGOL,Deagol!,The Return of the...,SMEAGOL,24
4,SMEAGOL,Deagol!,The Return of the...,SMEAGOL,24
5,SMEAGOL,Deagol!,The Return of the...,SMEAGOL,24
6,SMEAGOL,Give us that! Dea...,The Return of the...,SMEAGOL,24
7,DEAGOL,Why?,The Return of the...,DEAGOL,6
8,SMEAGOL,"Because' , it's m...",The Return of the...,SMEAGOL,24
9,SMEAGOL,My precious.,The Return of the...,SMEAGOL,24


In [152]:
df_lotr_user.rdd.getNumPartitions()

6

In [153]:
(
    df.select(cols).write.mode("overwrite")
        .format("parquet")
        .save("./data-processing-lotr-user/")
)

                                                                                

In [154]:
df = spark.read.format("parquet").load("./data-processing-lotr-user")

In [155]:
df

                                                                                

char,dialog,movie,count
DEAGOL,Oh Smeagol Ive go...,The Return of the...,6
SMEAGOL,Pull it in! Go on...,The Return of the...,24
DEAGOL,Arrghh!,The Return of the...,6
SMEAGOL,Deagol!,The Return of the...,24
SMEAGOL,Deagol!,The Return of the...,24
SMEAGOL,Deagol!,The Return of the...,24
SMEAGOL,Give us that! Dea...,The Return of the...,24
DEAGOL,Why?,The Return of the...,6
SMEAGOL,"Because' , it's m...",The Return of the...,24
SMEAGOL,My precious.,The Return of the...,24
