## Bibliotecas

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from os.path import abspath

In [2]:
warehouse_location = abspath('../data/spark-warehouse')

In [3]:
import findspark

findspark.init()

## Setup

In [4]:
spark = (
    SparkSession
    .builder
    .config("spark.driver.memory", '8g')
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

In [5]:
df_titles = spark.read.parquet('../data/imdb/title_basics')

## Databases e Catalog

O catálogo de metadados do Spark pode ser acessado pelo objeto

`SparkSession.catalog` 

As principais funcionalidades são:

* `listDatabases()`: lista todas os databases disponíveis;
* `listTables()`: lista todas as tabelas disponíveis em um determinado database;
* `listFucntions()`: lista as funções disponíveis em um determinado database;
* `refreshTable()`: atualiza os metadados de uma determinada tabela
* `uncacheTable()`: remove uma tabela salva em memória
* `clearCache()`: remove todas as tabelas salvas em memória

In [None]:
spark.catalog

In [None]:
spark.catalog.clearCache()

In [None]:
spark.catalog.listDatabases()

In [None]:
spark.catalog.listTables()

Os databases do Spark são uma ferramenta para organizar tabelas. Eles podem e devem ser vistos como algo muito próximo dos databases de servidores de bancos de dados relacionais. O Spark utiliza por padrão um database chamado default, que serve para criar tabelas, views e realizar consultas caso o usuário não tenha definido o seu próprio. Um ponto importante é que essas estruturas persistem em diferentes sessões: se o usuário mudar de database, todas as tabelas permanecerão no database anterior e vão precisar ser consultadas de maneira diferente.

Existem alguns comandos do SQL importantes na hora de se trabalhar com databases. Else são:

* `SHOW DATABASES`: lista todas os databases disponíveis, de forma análoga ao Catalog ;
* `CREATE DATABASE <nome_do_db>`: cria um database
* `USE <nome_do_db>`: define o database como o atual para a realização de queries
    * **Obs**: ao se mudar de database, é possível acessar tabelas de um database anterior usando o prefixo “nome_do_db.” antes do nome da tabela. Exemplo:
        ```
        USE db2
        SELECT * FROM db1.table
        ```
* `SELECT current_database()`: retorna qual o database definido como o atual
* `DROP DATABASE IF EXISTS <nome_do_db>`: deleta determinado database dentre aqueles que foram definidos. Atenção: nunca delete o database default do Spark.


## Tabelas e Views

### Tabelas

* **Managed Tables**: o Spark administra tanto os dados quanto os metadados das tabelas, de forma que operações como DROP TABLE afetam também os dados escritos em disco;
* **Unmanaged Tables**: o Spark administra somente os metadados da tabela, e os dados escritos em disco não são alterados em nenhum momento

In [None]:
df_titles.limit(5).toPandas()

In [None]:
df_titles_sample = df_titles.sample(fraction = 0.1)

#### Criando Managed Tables

In [None]:
df_titles_sample.write.saveAsTable("title_basics_managed")

`CREATE TABLE title_basics_managed (schema)`

#### Criando Unmanaged Tables

In [None]:
df_titles_sample.write.option('path', '../data/imdb/title_basics_unmanaged').saveAsTable("title_basics_unmanaged")

`CREATE EXTERNAL TABLE title_basics_unmanaged (schema) 
 USING parquet OPTIONS (path '../data/imdb/title_basics_unmanaged')`

In [None]:
spark.catalog.listTables()

### Views

#### Criando Views

In [None]:
df_titles_sample.createOrReplaceTempView('title_basics_view')

`CREATE OR REPLACE TEMP VIEW AS title_basics_view
 SELECT * FROM <nome da tabela>`

#### Criando Views Globais

In [None]:
df_titles_sample.createOrReplaceGlobalTempView('title_basics_global_view')

`CREATE OR REPLACE GLOBAL TEMP VIEW AS title_basics_global_view
 SELECT * FROM <nome da tabela>`

In [None]:
spark.catalog.listTables()

#### Deletando Views

In [None]:
spark.catalog.dropTempView("title_basics_view")

In [None]:
spark.catalog.dropGlobalTempView("title_basics_global_view")

## Acessando a interface de queries

In [None]:
spark.sql('SHOW DATABASES').toPandas()

In [None]:
spark.sql("""
CREATE TABLE title_basics_managed 
(tconst STRING, 
 titleType STRING, 
 primaryTitle STRING,
 originalTitle STRING,
 isAdult STRING,
 startYear STRING, 
 endYear STRING,
 runtimeMinutes STRING,
 genres STRING)
""").toPandas()

In [None]:
spark.sql("""
            INSERT INTO title_basics_managed  SELECT * FROM default.title_basics_managed
          """
).toPandas()

In [None]:
spark.sql('DROP TABLE title_basics_managed').toPandas()

In [None]:
spark.sql('USE DEFAULT').toPandas()

In [None]:
spark.sql('DROP TABLE title_basics_unmanaged').toPandas()

In [None]:
spark.sql('SHOW TABLES').toPandas()

In [None]:
spark.sql('SELECT CAST(runTimeMinutes as INT) FROM title_basics_view')\
.withColumn('teste', f.col('runTimeMinutes') + 1).limit(5).toPandas()

In [None]:
df_titles.count()

## Configurando o Spark

SparkSession -> spark-submit -> spark-defaults.conf

In [None]:
import findspark

findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)

In [None]:
spark.conf.get('spark.serializer')

In [None]:
spark.conf.get('spark.driver.memory')

In [None]:
spark.conf.get('spark.sql.shuffle.partitions')

In [None]:
spark.conf.set('spark.sql.shuffle.partitions', 100)

In [None]:
spark.conf.get('spark.sql.shuffle.partitions')

* `spark.master`: seleciona o modo de deploy da aplicação Spark. Será tratado em mais detalhes no capítulo seguinte.
* `spark.driver.memory`: quantidade de memória atribuída para o driver da aplicação.
* `spark.executor.memory`: quantidade de memória atribuída para cada um dos executores da aplicação.
* `spark.serializer`: classe utilizada para realizar a serialização de objetos durante a execução. É recomendado utilizar o valor org.apache.spark.serializer.KryoSerializer para ganhar em velocidade de processamento, uma vez que chega a ser até 10x mais rápido que o default.
* `spark.executor.heartbeatInterval`:  intervalo entre notificações dos executores ao driver. Aumentar esse valor evita que a aplicação sofra com timeouts.
* `spark.sql.adaptive.enabled`: habilita o Adaptive Query Execution, programa que atualiza o plano de execução durante a execução, a partir de métricas coletadas durante o processo. Ativar essa configuração pode otimizar processamentos significativamente.
* `spark.sql.shuffle.partitions`: número padrão de partições utilizadas em shuffles de operações de junção (joins) e agregações (agg). 
* `spark.sql.broadcastTimeout`: tempo de timeout em segundos para operações de broadcast join, a serem tratadas no fim do capítulo.

### Tornando o Spark Escalável

* `spark.dynamicAllocation.enabled`: habilita o uso do recurso de dynamic allocation na aplicação.
* `spark.dynamicAllocation.executorIdleTimeout`: configura o tempo máximo de ociosidade de um executor até que o dynamic allocation o derrube.
* `spark.dynamicAllocation.initialExecutors`: quantidade inicial de executores na aplicação ao utilizar o dynamic allocation.
* `spark.dynamicAllocation.maxExecutors`: quantidade mínima de executores na aplicação ao utilizar o dynamic allocation.
* `spark.dynamicAllocation.minExecutors`: quantidade máxima de executores na aplicação ao utilizar o dynamic allocation.

## Persistência de Dados na Memória

In [None]:
df_titles_sample = df_titles.sample(fraction = 0.1)

In [None]:
df_titles_sample.limit(5).toPandas()

In [None]:
df_ratings = spark.read.format('parquet').load('../data/imdb/title_ratings')

In [None]:
df_ratings.limit(5).toPandas()

In [None]:
int_cols = ['startYear', 'endYear', 'runtimeMinutes', 'isAdult']
for c in int_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.col(c).cast('int'))
    )
# Limpa os Strings
str_cols = ['primaryTitle', 'originalTitle', 'titleType']
for c in str_cols:
    df_titles_sample = (
        df_titles_sample
        .withColumn(c, f.initcap(f.trim(f.col(c))))
    )

In [None]:
df_join = (
    df_titles_sample
    .replace('\\N', None)
    .withColumn('genres', f.split(f.col('genres'), ','))
    .join(df_ratings, 'tconst', 'left')
)

In [None]:
df_join.limit(5).toPandas()

In [None]:
df_final = (
    df_join
    .withColumn('genres', f.explode(f.col('genres')))
    .groupBy('titleType')
    .pivot('genres')
    .agg(f.round(f.mean('averageRating'), 2))
    .fillna(0)
)

In [None]:
df_final.limit(5).toPandas()

In [None]:
df_final.explain('formatted')

In [None]:
%%time
df_final.limit(5).toPandas()

In [None]:
%%time
df_join.cache()
df_join.count()

In [None]:
df_final.cache()
df_final.count()

In [None]:
%%time
df_final.limit(5).toPandas()

#### Retirando dados da persistência em memória

In [None]:
df_final.unpersist()

In [None]:
spark.catalog.clearCache()

## Estratégias de Particionamento de Dados

### Bucketing

In [7]:
df_ratings = spark.read.format('parquet').load('../data/imdb/title_ratings')

In [9]:
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [8]:
df_ratings.limit(5).toPandas()

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1759
1,tt0000002,6.0,223
2,tt0000003,6.5,1516
3,tt0000004,6.1,144
4,tt0000005,6.2,2330


In [10]:
df_titles.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_basics')

In [11]:
df_ratings.write.format('parquet').bucketBy(5, 'tconst').saveAsTable('title_ratings')

In [12]:
df_titles_bucket = spark.sql('SELECT * FROM title_basics')

In [13]:
df_ratings_bucket = spark.sql('SELECT * FROM title_ratings')

In [26]:
%%time
df_titles.join(df_ratings, 'tconst').count()

Wall time: 1 s


1174186

In [27]:
%%time
df_titles_bucket.join(df_ratings_bucket, 'tconst').count()

Wall time: 982 ms


1174186

### Partiticionando por Colunas

In [28]:
df_titles.filter('titleType = "short"').explain("formatted")

== Physical Plan ==
* Filter (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/title_basics]
PushedFilters: [IsNotNull(titleType), EqualTo(titleType,short)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]

(3) Filter [codegen id : 1]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Condition : (isnotnull(titleType#1) AND (titleType#1 = short))




In [29]:
(
    df_titles
    .write
    .format('parquet')
    .partitionBy('titleType')
    .save('../data/imdb/df_titles_partitioned')
)

In [34]:
df_titles_partitions = spark.read.parquet('../data/imdb/df_titles_partitioned')

In [35]:
df_titles_partitions.filter('titleType = "short"').explain("formatted")

== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#482, primaryTitle#483, originalTitle#484, isAdult#485, startYear#486, endYear#487, runtimeMinutes#488, genres#489, titleType#490]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/df_titles_partitioned]
PartitionFilters: [isnotnull(titleType#490), (titleType#490 = short)]
ReadSchema: struct<tconst:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#482, primaryTitle#483, originalTitle#484, isAdult#485, startYear#486, endYear#487, runtimeMinutes#488, genres#489, titleType#490]




In [38]:
df_further_partitions = spark.read.parquet('../data/imdb/df_titles_partitioned_further')

In [41]:
(
    df_further_partitions
    .filter('titleType = "short"')
    .filter('genres = "Action"')
    .explain("formatted")
)

== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#528, primaryTitle#529, originalTitle#530, isAdult#531, startYear#532, endYear#533, runtimeMinutes#534, titleType#535, genres#536]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/df_titles_partitioned_further]
PartitionFilters: [isnotnull(titleType#535), isnotnull(genres#536), (titleType#535 = short), (genres#536 = Action)]
ReadSchema: struct<tconst:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#528, primaryTitle#529, originalTitle#530, isAdult#531, startYear#532, endYear#533, runtimeMinutes#534, titleType#535, genres#536]




## Reparticionando DataFrames

In [44]:
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [46]:
df_titles.rdd.getNumPartitions()

12

In [49]:
df_titles.repartition(5).rdd.getNumPartitions()

5

In [50]:
df_titles.coalesce(5).rdd.getNumPartitions()

5

In [52]:
df_titles.repartition(5).explain('formatted')

== Physical Plan ==
Exchange (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/title_basics]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]

(3) Exchange
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Arguments: RoundRobinPartitioning(5), REPARTITION_WITH_NUM, [id=#1269]




1000000 MB / 50 = 20GB/partição

1000000 MB / 5 = 200GB/partição

In [53]:
df_titles.coalesce(5).explain('formatted')

== Physical Plan ==
Coalesce (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/title_basics]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]

(3) Coalesce
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Arguments: 5




In [58]:
df_titles.repartition(50).write.parquet('../data/imdb/df_titles_repartioned')

In [59]:
df_titles.coalesce(1).write.parquet('../data/imdb/df_titles_coalesced')

## Escolhendo o melhor tipo de join

* **Broadcast Hash Join (BHJ)**: a estratégia consiste em enviar os dados completos para cada um dos executores, de forma que só há necessidade de realizar o shuffle uma vez. O Spark costuma utilizar esse join automaticamente com base em algumas configurações, como o `spark.sql.autoBroadcastJoinThreshold`, que define o tamanho máximo do menor DataFrame para que esse método seja escolhido, mas é sempre interessante analisar cada situação e ter autonomia para indicar o seu uso;
* **Sort Merge Join (SMJ)**: é o algoritmo padrão do Spark, uma vez que o tamanho dos DataFrames não impacta na viabilidade do algoritmo. Nesse caso, os dados são enviados entre os executores via shuffle e os posteriormente ordenados, para que os dados estejam particionados corretamente e na mesma ordem;
* **Shuffle Hash Join (SHJ)**: é um algoritmo que também usa shuffles, mas compensa essa operação com o uso de um mapa de hash que exime a necessidade de ordenação dos dados. A única condição é que um dos DataFrames seja significativamente menor do que o outro, mas não tanto quanto o BHJ.


In [61]:
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [60]:
df_ratings.limit(5).toPandas()

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1759
1,tt0000002,6.0,223
2,tt0000003,6.5,1516
3,tt0000004,6.1,144
4,tt0000005,6.2,2330


In [74]:
df_titles.join(df_ratings.hint('broadcast'), 'tconst').explain('formatted')

== Physical Plan ==
* Project (9)
+- * BroadcastHashJoin Inner BuildRight (8)
   :- * Filter (3)
   :  +- * ColumnarToRow (2)
   :     +- Scan parquet  (1)
   +- BroadcastExchange (7)
      +- * Filter (6)
         +- * ColumnarToRow (5)
            +- Scan parquet  (4)


(1) Scan parquet 
Output [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]
Batched: true
Location: InMemoryFileIndex [file:/C:/Users/Pedro Toledo/Documents/igti/edc-mod3-igti/data/imdb/title_basics]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 2]
Input [9]: [tconst#0, titleType#1, primaryTitle#2, originalTitle#3, isAdult#4, startYear#5, endYear#6, runtimeMinutes#7, genres#8]

(3) Filter [codegen id : 2]
Input [9]: [tconst#0, titleType#1, primaryTitle

In [67]:
import time
import numpy as np

#### Sort Merge Join

In [68]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('merge'), 'tconst').count()
    end = time.time()
    times.append(end - start)

print('Média: ', np.mean(times), '\n',
      'DP: ', np.std(times))

Média:  1.880633978843689 
 DP:  0.2220382350619025


#### Shuffled Hash Join

In [69]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('shuffle_hash'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Média: ', np.mean(times), '\n',
      'DP: ', np.std(times))

Média:  1.9259305787086487 
 DP:  0.447783649974506


#### Broadcast Join

In [70]:
times = []
for i in range(100):
    start = time.time()
    df_titles.join(df_ratings.hint('broadcast'), 'tconst').count()
    end = time.time()
    times.append(end - start)
print('Média: ', np.mean(times), '\n',
      'DP: ', np.std(times))

Média:  0.961837158203125 
 DP:  0.10518858795581905
