## Shared variables e accumulators
Neste notebook iremos aprender a trabalhar com outras APIs low level do Spark.

<p>Você pode usar accumulators para contabilizar dados de todas as tarefas em uma única variável compartilhada.
Por exemplo, contar quantos registros foram processados em cada tarefa.

<p>Você também pode usar broadcast variables para armazenar variáveis em todos os worker nodes e evitar
que os mesmos tenham que trocar informações entre eles para ter acesso a este tipo de dado.<br>
Se for necessário unir dados de um Dataframe grande e um Dataframe pequeno, uma boa forma de otimização é fazer o broadcast do Dataframe pequeno.<br>
  Veremos este exemplo a seguir

### Importando as bibliotecas
Nesta etapa iremos apenas importar todas as bibliotecas e funções necessárias para rodar o programa

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
from pyspark.sql.types import StructField, StructType, StringType, LongType, DoubleType, IntegerType

### Criando uma SparkSession
Por meio de uma SparkSession terei acesso ao SparkContext da minha aplicação.

In [5]:
spark = SparkSession.builder.appName('Select').getOrCreate()

### Criando um Dataframe manualmente
Nesta etapa estamos criando os dados que utilizaremos neste notebook

In [7]:
# Carrega dados usando schema
schema = StructType([
    StructField("job_id", IntegerType(), True),
    StructField("desc", StringType(), True)
])

# Cria as linhas do nosso futuro dataframe
newRows = [
    [30, "Cientista de dados"],
    [20, "Dev Java"],
    [10, None]
]

# Cria um RDD de Rows
parallelizedRows = spark.sparkContext.parallelize(newRows)

# Cria um dataframe a partir do RDD que criamos anteriormente
dados_manual = spark.createDataFrame(parallelizedRows, schema)

# Mostra as informações do dataframe
dados_manual.show()

### Criando um segundo Dataframe manualmente

In [9]:
# Carrega dados usando schema
schema = StructType([
    StructField("job_id", IntegerType(), True),
    StructField("salary", DoubleType(), True)
])

# Cria as linhas do nosso futuro dataframe
newRows = [
    [30, 10000.0],
    [20, 9000.0],
    [1, 15000.0],
    [2, 2000.0],
    [3, 3000.0]
]

# Cria um RDD de Rows
parallelizedRows = spark.sparkContext.parallelize(newRows)

# Cria um dataframe a partir do RDD que criamos anteriormente
salarios = spark.createDataFrame(parallelizedRows, schema)

# Mostra as informações do dataframe
salarios.show()

In [10]:
dados_join = dados_manual.join(salarios, dados_manual.job_id == salarios.job_id, "inner")

In [11]:
dados_join.explain()

In [12]:
salarios.persist()

In [13]:
broadcast(salarios)
salarios.count()

In [14]:
dados_join = dados_manual.join(salarios, dados_manual.job_id == salarios.job_id, "inner")
dados_join.explain()

### Broacast de outros objetos

In [16]:
filiais = ["SP", "BH", "RJ"]
filiais_bc = spark.sparkContext.broadcast(filiais)
filiais_bc.value

### Accumulators

In [18]:
# Definição do acumulador
contador = spark.sparkContext.accumulator(0)

def contadorFunc(salario):
    if (salario >= 10000):
        contador.add(1)

# Chamando sua função diretamente, apenas para testar o acumulador
contador.value
contadorFunc(10000)
contador.value

# Chamando sua função dentro de um dataframe
salarios.foreach(lambda row : contadorFunc(row.salary))
contador.value

### Obrigado!
Quer construir uma carreira em Data Science? Acesse meu blog pessoal em https://www.hackinganalytics.com/