## Bibliotecas

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

## Setup

In [2]:
import findspark

findspark.init()

In [3]:
spark = (
    SparkSession
    .builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

In [4]:
(
    spark.read.format("csv").load('data/imdb/title_basics.tsv', header=True, sep = '\t')
    .write.format('parquet').save('data/imdb/title_basics', mode='overwrite')
)

In [None]:
(
    spark.read.format("csv").load('data/imdb/title_ratings.tsv', header=True, sep = '\t')
    .write.format('parquet').save('data/imdb/title_ratings', mode='overwrite')
)

In [5]:
df_titles = spark.read.format("parquet").load('data/imdb/title_basics')
df_ratings = spark.read.format("parquet").load('data/imdb/title_ratings')

In [13]:
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


## Leitura e Escrita de Dados

### DataFrameReader

```
spark.read.format(format).option(args).load(file/path)
```

### DataFrameWriter

```
df.write.format(format).option(args).save(file/path)
```

### Lendo e Escrevendo CSV

Opções mais comuns:
* header
* inferSchema
* sep
* encoding

In [14]:
file_path = 'data/df_cnae.csv'
df = spark.read.format('csv').load(file_path)

In [19]:
df.limit(15).limit(15).toPandas()

Unnamed: 0,_c0
0,"""0111301"";""Cultivo de arroz"""
1,"""0111302"";""Cultivo de milho"""
2,"""0111303"";""Cultivo de trigo"""
3,"""0111399"";""Cultivo de outros cereais n�o espec..."
4,"""0112101"";""Cultivo de algod�o herb�ceo"""
5,"""0112102"";""Cultivo de juta"""
6,"""0112199"";""Cultivo de outras fibras de lavoura..."
7,"""0113000"";""Cultivo de cana-de-a��car"""
8,"""0114800"";""Cultivo de fumo"""
9,"""0115600"";""Cultivo de soja"""


### Corrigindo

In [20]:
schema = 'cod_cnae STRING, descricao_cnae STRING'

In [22]:
df = spark.read.format('csv').load(file_path, sep=';', encoding='ISO-8859-1', schema=schema)
df.limit(5).toPandas()

Unnamed: 0,cod_cnae,descricao_cnae
0,111301,Cultivo de arroz
1,111302,Cultivo de milho
2,111303,Cultivo de trigo
3,111399,Cultivo de outros cereais não especificados an...
4,112101,Cultivo de algodão herbáceo


In [None]:
df.printSchema()

### Lendo e Escrevendo Parquet

* Armazenamento colunar, em contraste com o CSV, que armazena baseado nas linhas. Assim, quando uma query é realizada é possível ignorar os dados não relevantes de maneira rápida e fácil, resultando em operações bem mais eficientes;
* Preservação de metadados, incluindo os tipos das colunas, o que garante eficiência e praticidade na escrita e leitura (não é necessário especificar schemas para arquivos parquet);
* Suporte a dados estruturados de forma aninhada, como listas;
* Otimizado para processar dados particionados com volume na casa dos gigabytes para cada arquivo;
* Compressão de dados na escrita, de forma a ocupar menos espaço;
* Integração com ferramentas como AWS Athena, Amazon Redshift Spectrum, Google BigQuery e Google Dataproc.

In [None]:
df.write.format('parquet').save('data/df_cnae')

In [None]:
df = spark.read.format('parquet').load('data/df_cnae')

# Manipulação dos Dados

In [23]:
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [24]:
df_ratings.limit(5).toPandas()

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1809
1,tt0000002,6.0,233
2,tt0000003,6.5,1560
3,tt0000004,6.1,152
4,tt0000005,6.2,2383


In [25]:
df_titles.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



### Colunas e Expressões

As colunas são a principal unidade de manipulação de dados do Spark. 

In [30]:
(
    df_titles
    .select('tconst', 'primaryTitle', 'runtimeMinutes', )
    .withColumn("runTimeMinutes", f.col('runTimeMinutes').cast('int'))
    .withColumn("runtimeHours", f.round(f.col('runTimeMinutes') / 60, 3))
    .limit(5)
    .toPandas()
)

Unnamed: 0,tconst,primaryTitle,runTimeMinutes,runtimeHours
0,tt0000001,Carmencita,1,0.017
1,tt0000002,Le clown et ses chiens,5,0.083
2,tt0000003,Pauvre Pierrot,4,0.067
3,tt0000004,Un bon bock,12,0.2
4,tt0000005,Blacksmith Scene,1,0.017


Forma "pandas" de selecionar:

1. `df.coluna`
2. `df['coluna']`

In [None]:
(
    df_titles.select('tconst', 'primaryTitle', 'runtimeMinutes', )
    .withColumn("runtimeHours", df_titles['runtimeMinutes'].cast('int') / 60 )
    .show(5)
)

In [None]:
(
    df_titles.select('tconst', 'primaryTitle', 'runtimeMinutes', )
    .withColumn("runtimeHours", df_titles['runtimeMinutes'].cast('int') / 60 )
    .withColumn("hours_plus2", df_titles['runtimeHours'] + 2 )
    .show(5)
)

In [None]:
(
    df_titles.select('tconst', 'primaryTitle', 'runtimeMinutes', )
    .withColumn("runtimeHours", col('runTimeMinutes').cast('int') / 60 )
    .withColumn("hours_plus2", col('runtimeHours') + 2 )
    .show(5)
)

### Seleção de Colunas

In [31]:
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [33]:
df_titles.columns

['tconst',
 'titleType',
 'primaryTitle',
 'originalTitle',
 'isAdult',
 'startYear',
 'endYear',
 'runtimeMinutes',
 'genres']

In [32]:
df_titles.select('tconst', 'primaryTitle', 'genres').limit(5).toPandas()

Unnamed: 0,tconst,primaryTitle,genres
0,tt0000001,Carmencita,"Documentary,Short"
1,tt0000002,Le clown et ses chiens,"Animation,Short"
2,tt0000003,Pauvre Pierrot,"Animation,Comedy,Romance"
3,tt0000004,Un bon bock,"Animation,Short"
4,tt0000005,Blacksmith Scene,"Comedy,Short"


In [34]:
cols = ['tconst', 'primaryTitle', 'genres']
df_titles.select(cols).limit(5).toPandas()

Unnamed: 0,tconst,primaryTitle,genres
0,tt0000001,Carmencita,"Documentary,Short"
1,tt0000002,Le clown et ses chiens,"Animation,Short"
2,tt0000003,Pauvre Pierrot,"Animation,Comedy,Romance"
3,tt0000004,Un bon bock,"Animation,Short"
4,tt0000005,Blacksmith Scene,"Comedy,Short"


In [35]:
cols = ['primaryTitle', 'genres']
df_titles.select('tconst', *cols).limit(5).toPandas()

Unnamed: 0,tconst,primaryTitle,genres
0,tt0000001,Carmencita,"Documentary,Short"
1,tt0000002,Le clown et ses chiens,"Animation,Short"
2,tt0000003,Pauvre Pierrot,"Animation,Comedy,Romance"
3,tt0000004,Un bon bock,"Animation,Short"
4,tt0000005,Blacksmith Scene,"Comedy,Short"


In [39]:
df_titles.select((f.col('runtimeMinutes') / 60).alias('runtimeHours') , '*').limit(5).toPandas()

Unnamed: 0,runtimeHours,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,0.016667,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,0.083333,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,0.066667,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,0.2,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,0.016667,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


Observações:
* Podemos realizar operações sobre colunas selecionadas. 
* A ordem em que as colunas são selecionadas é a ordem em que elas vão ser inseridas no DataFrame resultante.

In [43]:
df_titles.select('tconst', 'genres', 'primaryTitle', f.upper('primaryTitle').alias('primaryTitle')).limit(5).toPandas()

Unnamed: 0,tconst,genres,primaryTitle,primaryTitle.1
0,tt0000001,"Documentary,Short",Carmencita,CARMENCITA
1,tt0000002,"Animation,Short",Le clown et ses chiens,LE CLOWN ET SES CHIENS
2,tt0000003,"Animation,Comedy,Romance",Pauvre Pierrot,PAUVRE PIERROT
3,tt0000004,"Animation,Short",Un bon bock,UN BON BOCK
4,tt0000005,"Comedy,Short",Blacksmith Scene,BLACKSMITH SCENE


#### Selecionando valores distintos

In [46]:
df_titles.select('startYear').distinct().toPandas()

Unnamed: 0,startYear
0,1903
1,1953
2,1897
3,1957
4,1987
...,...
145,1975
146,1970
147,1892
148,1933


### Filtros

Operadores lógicos:
* e: &
* ou: |
* não: ~

Para fazer o filtro, pode ser utilizado tanto a função `filter()` como `where()`.

#### Filtros com uma condição

In [None]:
(
    df_titles.filter(~(col('titleType') == 'movie'))
    .count()
)

#### Filtros com duas ou mais condições
Cada uma das condições deve estar entre parênteses e separada por um operador lógico. Naturalmente, é possível também "aninhar" condições, seguindo essa mesma lógica

In [None]:
(
    df_titles.filter((col('titleType') == 'movie') & (col('runtimeMinutes') <= 90))
    .show(5)
)

In [None]:
(
    df_titles.filter((col('titleType') == 'movie') & (col('runtimeMinutes') <= 90))
    .count()
)

In [None]:
(
    df_titles.filter(((col('titleType') == 'movie') | (col('titleType') == 'tvSeries')) & (col('runtimeMinutes') <= 90))
    .count()
)

In [None]:
(
    df_titles.filter((col('titleType').isin('movie', 'tvSeries')) & (col('runtimeMinutes') <= 90))
    .count()
)

In [None]:
(
    df_titles
    .filter(col('titleType').isin('movie','tvSeries'))
    .filter(col('runtimeMinutes') <= 90)
    .count()
)

#### Filtros Utilizando Expressões

In [None]:
(
    df_titles
    .filter('titleType = "movie"')
    .show(5)
)

In [None]:
(
    df_titles
    .filter('titleType in ("movie", "tvSeries") and runtimeMinutes <= 90')
    .show(5)
)

#### Observações
Quando nos referimos às colunas por meio da função `col()`, temos acesso à diversos métodos das colunas que podem ser utilizados para auxliar na filtragem do DataFrame. Alguns deles são:
* `isin()`: checa se a coluna contém os valores listados na função.
* `contains()`: utilizado para verificar se uma coluna de texto contém algum padrão especificado (não aceita regex). Aceita uma outra coluna de texto.
* `like()`: utilizado para verificar se uma coluna de texto contém algum padrão especificado (não aceita regex). Funciona de forma similar ao "LIKE" do SQL.
* `rlike()`: utilizado para verificar se uma coluna de texto contém algum padrão especificado (**aceita regex**). Funciona de forma similar ao "RLIKE" do SQL.
* `startswith()`: utilizado para verificar se uma coluna de texto começa com algum padrão especificado (**aceita regex**).
* `endswith()`: utilizado para verificar se uma coluna de texto termina com algum padrão especificado (**aceita regex**).
* `between()`: checa se os valores da coluna estão dentro do intervalo especificado. Os dois lados do intervalo são inclusivos.
* `isNull()`: retorna True se o valor da coluna é nulo
* `isNotNull()`: retorna True se o valor da coluna não é nulo

Outros métodos úteis:
* `alias()/name()`: usado para renomear as colunas em operações como select() e agg()
* `astype()/cast()`: usado para mudar o tipo das colunas. Aceita tanto um string como um tipo especificado pelo módulo pyspark.sql.types
* `substr()`: utilizado para cortar um string com base em índices dos caracteres 

In [None]:
(
    df_titles
    .filter(col('primaryTitle').like('Avengers%'))
    .filter(col('titleType') == 'movie')
    .show()
)

In [None]:
(
    df_titles
    .withColumn('startYear', col("startYear").cast('int'))
    .filter('startYear is not null')
    .show()
)

### Ordenando o DataFrame

A ordenação do DataFrame pode ser feita utilizando as funções `orderBy()` ou `sort()`. Algumas funções auxiliares importante para serem usadas ao ordenar:
* `asc()`: ordena a coluna de forma ascendente (default)
* `desc()`ordena a coluna de forma decrescente
* `asc_nulls_first() / desc_nulls_first()`: ordena a coluna de forma ascendente e decrescente, respectivamente, mantendo os campos nulos primeiro
* `asc_nulls_last() / desc_nulls_last()`: ordena a coluna de forma ascendente e decrescente, respectivamente, mantendo os campos nulos por último

In [None]:
df_titles.show(5)

In [None]:
from pyspark.sql.functions import desc

(
    df_titles
    .withColumn('startYear', col('startYear').cast('int'))
    .orderBy('startYear')
    .filter('titleType = "movie"')
    .show()
)

In [None]:
from pyspark.sql.functions import desc_nulls_first

(
    df_titles
    .withColumn('startYear', col('startYear').cast('int'))
    .orderBy(desc_nulls_first('startYear'))
    .show()
)

### Renomeando Colunas

Para renomear colunas, é utilizada a função `withColumnRenamed()`, da seguinte forma:

```
df.withColumnRenamed("nome_antigo", "nome_novo")
```

In [None]:
(
    df_titles
    .withColumnRenamed('primaryTitle', 'nome_filme')
    .show(5)
)

In [None]:
(
    df_titles
    .select('*', f.col('primaryTitle').alias('nome_filme'))   
    .limit(5)
    .toPandas()
)

In [None]:
df_renamed = df_titles
for c in df_titles.columns:
    df_renamed = df_renamed.withColumnRenamed(c, c + '_suffix')

df_renamed.limit(5).toPandas()

### Criando e Alterando Colunas

Para criar ou alterar colunas, é utilizada a função `withColumn()`, da seguinte forma:

```
df.withColumn("nome_da_coluna", {expressão geradora de coluna})
```

In [None]:
from pyspark.sql.functions import upper

(
    df_titles
    .select('tconst', 'primaryTitle', 'runtimeMinutes', )
    .withColumn("primaryTitle_2", upper('primaryTitle'))
    .show(5)
)

#### Criando colunas a partir de constantes

In [None]:
from pyspark.sql.functions import lit

(
    df_titles
    .select('tconst', 'primaryTitle', 'runtimeMinutes', )
    .withColumn("pais", lit('Brasil'))
    .show(5)
)

#### Criando colunas condicionais

In [None]:
from pyspark.sql.functions import when, expr

predicado = """

CASE WHEN runTimeMinutes <= 60 THEN 'curto'
     WHEN runTimeMinutes > 60 AND runTimeMinutes < 120 THEN 'normal'
     WHEN runTimeMinutes >= 120 THEN 'longo'
     WHEN runTimeMinutes IS NULL THEN 'nulo'
     ELSE 'Erro'
END

"""

(
    df_titles
    .select('tconst', 'primaryTitle', 'runtimeMinutes', )
    .withColumn("runtimeMinutes", col('runTimeMinutes').cast('int'))
    .withColumn("categoria_runtime", expr(predicado))
    .filter('runTimeMinutes > 60')
    .show(25)
)

### Agregação e Agrupamento

O agrupamento dos DataFrames é feito por meio da função **`groupby()`**. Essa função deve ser sucedida pela função de agregação `agg()`, de pivotação `pivot()` ou `count()`. 

---

A função **`agg()`** aplica uma função de agregação no DataFrame. Se precedida por `groupby()`, realiza a agregação dentro dos grupos esabelecidos.
Algumas das funções de agregação mais comuns:
* `sum()`: retorna a soma os valores da coluna
* `sumDistinct()`: retorna a soma os valores distintos da coluna
* `max() / min()`: retorna o mínimo e o máximo da coluna, respectivamente
* `avg() / mean()`: retorna a média dos valores da coluna
* `percentile_approx()`: retorna o percentil da coluna, comaproximação. Para trazer a mediana exata, usar: `percentile_approx(f.col('column'), 0.5, lit(1000000))`
* `stddev()`: retorna o desvio padrão dos valores da coluna
* `count()`: retorna a contagem de linhas
* `countDistinct()`: retorna a contagem de valores distintos da coluna
* `first() / last()`: retorna o primeiro e o último valor da coluna no agrupamento, respectivamente. Interessante de ser utilizada em conjunto com o argumento `ignoreNulls=True`.
* `collect_list()`: retorna os valores do agrupamento em uma lista, com duplicações
* `collect_set()`: retorna os valores do agrupamento em uma lista, sem duplicações (desordenado)

**Obs**: O spark ignora os valores nulos para calcular as agregações, com exceção da função `count()`.

---

A função **`pivot`** é utilizada para passar valores de uma linha para as colunas, realizando uma agregação. Deve ser sucedido por uma função de agregação utilizando `agg()`. Pode utilizar qualquer uma das funções de agregação anteriores.



In [None]:
df_titles_subset = (
    df_titles
    .filter("cast(startYear as int) >= 2000")
    .withColumn('genre', f.split('genres', ',').getItem(0))
)

In [None]:
df_titles_subset.limit(5).toPandas()

In [None]:
(
    df_titles_subset
    .agg(f.countDistinct('genre').alias('distinct_genres'),)
    .toPandas()
)

In [None]:
(
    df_titles_subset
    .withColumn('runtimeMinutes', f.col('runtimeMinutes').cast('int'))
    .agg(f.sum('runtimeMinutes').alias('total_runtimeMinutes'),
         f.mean('runtimeMinutes').alias('mean_runtimeMinutes'),
         f.min('runtimeMinutes').alias('min_runtimeMinutes'),
         f.max('runtimeMinutes').alias('max_runtimeMinutes'),
         f.percentile_approx('runtimeMinutes', 0.5, f.lit(10000000)).alias('median_runtimeMinutes'),
         f.stddev('runtimeMinutes').alias('std_runtimeMinutes'),
        )
    .toPandas()
)

In [None]:
(
    df_titles_subset
    .withColumn('runtimeMinutes', f.col('runtimeMinutes').cast('int'))
    .select('runtimeMinutes')
    .describe()
    .toPandas()
)

#### Agrupamento

In [None]:
df_titles_subset.limit(5).toPandas()

In [None]:
(
    df_titles_subset
    .groupby('genre', 'startYear')
    .agg(f.mean('runtimeMinutes').alias('mean_runtimeMinutes'),)
    .orderBy('startYear', f.col('mean_runtimeMinutes').desc())
    .filter('startYear = 2021')
    .toPandas()
)

In [None]:
(
    df_titles_subset
    .groupby('genre')
    .agg(f.collect_set(f.col('titleType')).alias('lista_tipos_titulo'),
         f.countDistinct(f.col('titleType')).alias('n_distinct')
        )
    .withColumn('tipos_filmes', f.explode(f.col('lista_tipos_titulo')))
    .select('genre', 'tipos_filmes')
    .toPandas()
)

#### Pivotação

In [None]:
df_titles_subset.limit(5).toPandas()

In [None]:
(
    df_titles_subset
    .drop('genre')
    .withColumn('genres', f.explode(f.split(f.col('genres'), ',')))
    .groupby('startYear')
    .pivot('genres')
    .agg(f.mean('runtimeMinutes'))
    .na.fill(0)
    .orderBy('startYear')
    .limit(5)
    .toPandas()
)

### Window Functions

Window functions são funções que realizam cálculos similares à uma agregação, mas que não resultam em um DataFrame agregado. Ao invés disso, os resultados são colocados em uma nova coluna, segundo a partição (ou agrupamento) especificado. 
Exemplos mais comuns:
* `row_number()`
* `rank() / dense_rank() / percent_rank()`
* `lag()`
* `cume_dist()`
* `collect_list() / collect_set()`
* Demais funções de agregação, com exceção de `countDistinct()`

Para usar as funções dessa forma, devemos criar uma janela (window) da seguinte forma:

```{python}
from pyspark.sql.window import Window
w = Window.partitionBy({columns}).orderBy({columns}).rowsBetween({lower}, {upper})
```

* **`partitionBy()`**: agrupamento em que os cálculos serão realizados. É análogo ao `groupBy()`.
* **`orderBy`**: algumas funções como `row_number()` e `lag()` dependem da ordenação das linhas do agrupamento. Essa função é usada para especificar essa ordem.
* **`rowsBetween()`**: esse método é usado para especificar janelas deslizantes. A partir dele é possível definir um intervalo de linhas, relativas à linha atual, em que a função vai ser aplicada. Caso isso não seja especificado, as operações são realizadas em todo o grupo. Muito útil para construir **médias móveis**. Os seguintes objetos ajudam na constrção desse intervalo:
  * `Window.currentRow`: define a linha para qual o valor está sendo calculado como um dos limites de cálculo
  * `Window.unboundedPreceding`: define que não há limites para as linhas anteriores à linha para qual o valor está sendo calculado, isto é, a função irá considerar todas as linhas do grupo que já passaram. Deve ser usado no primeiro argumento (start).
  * `Window.unboundedFollowing`: define que não há limites para as linhas posteriores à linha para qual o valor está sendo calculado, isto é, a função irá considerar todas as linhas do grupo que ainda não passaram. Deve ser usado no segundo argumento (end).

Depois disso, basta utilizar a função `over()` para indicar que aquela função deve ser realizada na janela.  Exemplo:
```
df.withColumn('rn', f.row_number().over(w))
```


In [None]:
from pyspark.sql.window import Window

In [None]:
df_titles_subset.count()

In [None]:
w = Window.partitionBy('genre').orderBy('startYear')
(
    df_titles_subset
    .withColumn('genre', f.split('genres', ',').getItem(0))
    .withColumn('startYear', f.col('startYear').cast('int'))
    .filter('startYear >= 2021')
    .withColumn('rn', f.row_number().over(w))
    .limit(25)
    .toPandas()
)

In [None]:
w = Window.partitionBy('titleType', 'startYear')
(
    df_titles_subset
    .withColumn('genre', f.split('genres', ',').getItem(0))
    .withColumn('runtimeMinutes', f.col('runtimeMinutes').cast('int'))
    .withColumn('total_minutes', f.sum(f.col('runtimeMinutes')).over(w))
    .withColumn('mean_minutes', f.mean(f.col('runtimeMinutes')).over(w))
    .withColumn('relative_minutes', f.col('runtimeMinutes') / f.col('total_minutes'))
    .filter('runtimeMinutes is not null')
    .limit(5)
    .toPandas()
)

In [None]:
w = Window.partitionBy('titleType').orderBy('startYear').rowsBetween(-2, Window.currentRow)
(
    df_titles_subset
    .withColumn('runtimeMinutes', f.col('runtimeMinutes').cast('int'))
    .groupby('titleType', 'startYear')
    .agg(f.mean('runtimeMinutes').alias('media_minutos'))
#     .orderBy('titleType', 'startYear')
#     .withColumn('meadia_movel_3anos', f.round(f.mean('media_minutos').over(w), 3))
    .limit(15)
    .toPandas()
)

In [None]:
round((49.007634 + 50.358881 + 52.182771)/3, 3) == 50.516

In [None]:
round((49.186983 + 50.358881 + 52.182771 + 49.007634 + 55.584795)/5, 3) == 51.264

### Joins

Os joins no pyspark são especificados pela função `join()`, da seguinte forma:

```
df1.join(df2, {key_columns}, {join_type})
```

* `key_columns`: colunas que vão ser utilizadas para fazer a junção das tabelas. Pode ser especificada como
    * Um único string -> só uma coluna é chave, mesmos nomes nas duas tabelas
    * Uma lista de string ou de colunas (`col()`) -> mais de uma coluna é chave, mesmos nomes nas duas tabelas
    * Com nomes diferentes, é necessário fazer uma especificação do tipo: `f.col(column1) == f.col(column2)`. Caso existam mais de uma coluna como chave, essas especificações devem ser colocadas em uma lista.
* `join_type`: o tipo de join a ser realizado. As opções são:
    * `inner (default)`: INNER JOIN do SQL
    * `outer / full / fullouter / full_outer`: : FULL OUTER JOIN do SQL
    * `left / leftouter / left_outer`: : LEFT JOIN do SQL
    * `right / rightouter / right_outer`: : RIGHT JOIN do SQL
    * `semi / leftsemi / left_semi`: realiza um LEFT JOIN do SQL e retorna somente as colunas do DataFrame da esquerda que também estão no DataFrame da Direita
    * `anti / leftanti / left_anti`: realiza um LEFT JOIN do SQL e retorna somente as colunas do DataFrame da esquerda que não estão no DataFrame da Direita

In [None]:
df_ratings.limit(5).toPandas()

In [None]:
df_titles.limit(5).toPandas()

In [None]:
df_ratings.count()

In [None]:
df_titles.count()

In [None]:
(
    df_titles
    .join(df_ratings, 'tconst')
    .limit(5)
    .toPandas()
)

In [None]:
(
    df_titles
    .withColumnRenamed('tconst', 'id_title')
#     .withColumnRenamed('genres', 'averageRating')
    .join(df_ratings, f.col('tconst') == f.col('id_title'))
#     .withColumn('averageRating', f.expr('averageRating + 1'))
    .limit(5)
    .toPandas()
)

#### Utilizando semi e anti join

In [None]:
(
    df_titles
    .join(df_ratings, 'tconst', 'semi')
    .count()
)

In [None]:
(
    df_titles
    .join(df_ratings, 'tconst', 'anti')
    .count()
)

In [None]:
6961705 + 1174232 == df_titles.count()

### Union

Existem três formas de unir DataFrames no pyspark:
* `union() / unionAll()`: empilha os DataFrames, preservando linhas duplicadas. As colunas são unidas por posição, e por isso a ordem delas deve ser a mesma entre os dois DFs.
* `unionByName()`: empilha os DataFrames, preservando linhas duplicadas. As colunas são unidas por nome, e por tanto não precisam estar ordenadas da mesma forma

In [None]:
df_titles.count()

In [None]:
df1 = df_titles.sample(fraction = 0.5)
df2 = df_titles.join(df1, ['tconst'], 'anti')

In [None]:
df1.count()

In [None]:
df2.count()

In [None]:
df1.union(df2).count()

In [None]:
df3 = df_titles.sample(fraction = 0.05)

In [None]:
df3.count()

In [None]:
df3.union(df3).count()

In [None]:
df3.union(df3).distinct().count()

In [None]:
df2 = df2.select(df2.columns[::-1])

In [None]:
df1.limit(5).toPandas()

In [None]:
df2.limit(5).toPandas()

In [None]:
df1.union(df2).filter('genres rlike "[0-9]"').limit(5).toPandas()

In [None]:
df1.unionByName(df2).filter('genres rlike "[0-9]"').limit(5).toPandas()

### User Defined Functions (UDFs)

Em algumas situações é necessário criar/alterar uma coluna utilizando uma operação não implementada na biblioteca padrão. Para isso, é possível utilzar funções definidas pelo usuário (UDFs) por meio da função `udf()`.

**Importante**: As udfs não são otimizadas para serem executadas em paralelo, e por isso podem representar um gargalo na na aplicação.

In [None]:
from unidecode import unidecode
from pyspark.sql.types import StringType

In [None]:
unidecode('àáâçéõü')

In [None]:
def unidecode_function(string):
    if not string:
        return None
    else:
        return unidecode(string)

unidecode_udf = f.udf(unidecode_function, returnType=StringType())

In [None]:
(
    df_titles
    .filter(f.col('primaryTitle').rlike('à|á|â|ç|é|õ|ü'))
    .withColumn('cleaned_string', unidecode_udf(f.col('primaryTitle')))
    .select('primaryTitle', 'cleaned_string')
    .limit(5)
    .toPandas()
)

In [None]:
del unidecode_udf

In [None]:
@f.udf(returnType=t.StringType())
def unidecode_udf(string):
    if not string:
        return None
    else:
        return unidecode(string)

In [None]:
(
    df_titles
    .filter(f.col('primaryTitle').rlike('à|á|â|ç|é|õ|ü'))
    .withColumn('cleaned_string', unidecode_udf(f.expr('primaryTitle')))
    .select('primaryTitle', 'cleaned_string')
    .limit(5)
    .toPandas()
)