<a href="https://colab.research.google.com/github/lmatospereira/spark101/blob/main/spark101.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# O que é Apache Spark ?

O Spark é um framework para processamento paralelo de dados em cluster de computadores.

O que está definido em seu site oficial:

>é um mecanismo multi-linguagem para executar engenharia de dados, ciência de dados, e machine learning em máquinas ou clusters de nó-único
Apache Foudation

O Spark tem diversos componentes para diferentes tipos de processamentos, todos construídos sobre o Spark Core, que é o componente que disponibiliza as funções básicas para o processamento como as funções map, reduce, filter e collect.

As que mais se destacam são:

* Spark SQL - é o módulo para trablahar com dados estruturados
* Spark Streamminng - é o modulo para processamento de dados em tempo real;
* GrpahX -  é a API para processamento de grafos em paralelo;
* Mlib - é a blibioteca de aprendizado de máquina escalável

<p align="center">
<img src="https://github.com/lmatospereira/spark101/blob/main/spark101_img/componetes_spark.png?raw=true"> 
</p>
</br>

* sql: execute a Spark SQL query
* catalog: entry point for the Catalog API for managing tables
* read: function to read data from a file or other data source
* conf: object to manage Spark configuration settings
* sparkContext: entry point for core Spark API

# Arquitetura do spark

* **Driver Program**: este é o entry point do Spark. É onde o Spark Context é criado e é onde se define o fluxo de execução, bem como o RDD e o que deve ser executado em paralelo pelos Executores.
* **Spark Context**: Estabelece configurações de memória e processamento dos Workers Nodes. Além disso é capaz de conectar com os diferentes tipos de Cluster Manager (além do próprio Spark Cluster Manager) como Apache Mesos ou Yarn do Hadoop.
* **Cluster Manager**: esse é responsável por agendar e alocar um recurso computacional através do cluster.
Worker Node: é uma máquina que contém executores e recebe as tasks do Driver Program.
* **Executor**: é o responsável por executar as tasks.
* **Task**: é qualquer conjunto de operações (select, joing, filter, algorítimo de machine learning, e.t.c) sobre os dados.
<p align="center">
<img src="https://github.com/lmatospereira/spark101/blob/main/spark101_img/arquitetura_spark.jpg?raw=true"> 
</p>


##Lazy Evaluation

Lazy evaluation significa que o Spark espera até o último momento (action) para executar a sua DAG (grafo acíclico de instruções com o passo a passo). Temos dois tipos de operações que são as Actions e as Transformations.

Transformation: é a operação que o RDD retorna outro RDD. Por exemplo: filtro, criação e drop de coluna, alteração de valores de uma coluna já existente.

Action: é a operação que retorna um objeto que não seja um RDD. Por exemplo: count, show, collect etc.



<!-- Referencias -->
[Apache Spark](https://spark.apache.org/)

[DevMedia](https://www.devmedia.com.br/introducao-ao-apache-spark/34178)

[Apache Spark](https://spark.apache.org/)

[Apache Spark](https://spark.apache.org/)

[Apache Spark](https://spark.apache.org/)


1. Primeiro vamos precisar instalar uma JDK.
2. Baixar e descompactar os arquivos do spark.
3. Instalar o pacote findspark.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

Realizando o import e criando e preparando as variáveis de ambiente necessárias.

Com as variáveis definidas, podemos utilizar o findspark que vai permitir a importação dos pacotes necessários para utilizar o PySpark.

In [None]:
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

findspark.init()

Criando a sessão do spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, Row

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark 101') \
    .config('spark.ui.port', '4050') \
    .getOrCreate()
spark

Baixando o arquivo de exemplo

In [None]:
!wget -nc -q https://raw.githubusercontent.com/lmatospereira/spark101/main/beneficio.txt
!ls -1

beneficio.txt
sample_data
spark-3.1.2-bin-hadoop2.7
spark-3.1.2-bin-hadoop2.7.tgz


In [None]:
%%writefile users.json
{"name":"Alice", "pcode":"94304"}
{"name":"Brayden", "age":30, "pcode":"94304"}
{"name":"Carla", "age":19, "pcode":"10036"}
{"name":"Diana", "age":46}
{"name":"Étienne", "pcode":"94104"}

Overwriting users.json


In [None]:
%%writefile cod_sexo.json
{"name":"Masculino", "code":"1"}
{"name":"Feminimo", "code":"2"}

Writing cod_sexo.json


#Criando um data Frame

##DataFrames (Conjuntos de dados de Row objects)

* O modelo dos DataFrames é semelhante a uma tabela de um RDBMS eles são representado de forma tabular.
* Transformações são não tipadas
  * Linhas podem conter elementos de qualquer tipo
  * Schemas definidos não são aplicados os tipos de coluna até o tempo de execução

In [None]:
df_user = spark.read.json("users.json")

df_cod_sexo = spark.read.json("cod_sexo.json")

#file = 'CupomJurosTesouroDireto.csv'
file = 'beneficio.txt'
df_beneficio = spark.read.csv(
    file,
    header=True,
    sep = ","
)

##Especificando a estrura do squema
Alguns tipos de dados facilitam a inferência do esquema.

Muitas vezes tem que definir o esquema você mesmo.

Spark tem ferramentas para ajudar a especificar a estrutura
em seguida, precisamos criar a lista de campos de estrutura

|Exemplo|
|---|
|:param name: string, nome do campo|
|:param dataType: :class:DataType do campo|
|:param nullable: boolean, se o campo pode ser nulo (Nenhum)|

[sql-ref-datatypes](https://spark.apache.org/docs/latest/sql-ref-datatypes.html)



In [None]:
columns_list = [
    StructField("ano", StringType()),
    StructField("tipo_beneficio", StringType()),
    StructField("grupo_idade", StringType()),
    StructField("sexo", StringType()),
    StructField("tipo_cliente", StringType()),
    StructField("qtd_beneficio", IntegerType()),
    StructField("grupo_especie", StringType())
]

new_schema = StructType(columns_list)

df2_beneficio = spark.read.csv(
    file,
    schema=new_schema,
    header=True,
    sep = ","
)

Schema = Row("id","setor")
dados_teste = [
    Schema(1, "vendas"),
    Schema(2,"TI"),
    Schema(3,'RH')
]

df_teste = spark.createDataFrame(
    data=dados_teste
)


print(f'Schema do dataframe: df_beneficio')
df_beneficio.printSchema()

print(f'Schema do dataframe: df2_beneficio')
df2_beneficio.printSchema()

print(f'Schema do dataframe: df_teste')
df_teste.printSchema()

Schema do dataframe: df_beneficio
root
 |-- Ano: string (nullable = true)
 |-- Especie de Beneficio: string (nullable = true)
 |-- Grupos de Idade Cessacao: string (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Clientela: string (nullable = true)
 |-- Quantidade Beneficios Cessados: string (nullable = true)
 |-- Grupo/Principais Especies: string (nullable = true)

Schema do dataframe: df2_beneficio
root
 |-- ano: string (nullable = true)
 |-- tipo_beneficio: string (nullable = true)
 |-- grupo_idade: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- tipo_cliente: string (nullable = true)
 |-- qtd_beneficio: integer (nullable = true)
 |-- grupo_especie: string (nullable = true)

Schema do dataframe: df_teste
root
 |-- id: long (nullable = true)
 |-- setor: string (nullable = true)



#Existem dois tipos principais de operações DataFrame

##Ações com Data Frame (action)
* Ações de saída de valores de dados do DataFrame
  * A saída é normalmente retornada dos executores para o Spark principal
programa (chamado de driver) ou salvo em um arquivo

In [None]:
print(f"count: retorna o número de linhas​")
df_beneficio.count()

count: retorna o número de linhas​


42031

In [None]:
print(f"first: retorna a primeira linha (sinônimo de head())")
df_beneficio.first()

In [None]:
print(f"take(n): retorna as primeiras n linhas como um array (sinônimo de head(n))")
df_beneficio.take(1)

In [None]:
print(f"collect: retorna todas as linhas do DataFrame como um array​")
df_beneficio.collect()[0]

collect: retorna todas as linhas do DataFrame como um array​


Row(Ano='1997', Especie de Beneficio='Ap Tempo Contribuição Det Ignorado', Grupos de Idade Cessacao='25 a 29 Anos', Sexo='Masculino', Clientela='Urbana', Quantidade Beneficios Cessados='0', Grupo/Principais Especies='Ap Tempo Contrib Det Ignorado')

In [None]:
print(f"show(n): exibe as primeiras n linhas em forma de tabela (o padrão é 20 linhas)​")
df_beneficio.show(3)

show(n): exibe as primeiras n linhas em forma de tabela (o padrão é 20 linhas)​
+----+--------------------+------------------------+---------+---------+------------------------------+-------------------------+
| Ano|Especie de Beneficio|Grupos de Idade Cessacao|     Sexo|Clientela|Quantidade Beneficios Cessados|Grupo/Principais Especies|
+----+--------------------+------------------------+---------+---------+------------------------------+-------------------------+
|1997|Ap Tempo Contribu...|            25 a 29 Anos|Masculino|   Urbana|                             0|     Ap Tempo Contrib ...|
|1997|Ap Tempo Contribu...|            25 a 29 Anos| Feminino|   Urbana|                             0|     Ap Tempo Contrib ...|
|1997|Ap Tempo Contribu...|            25 a 29 Anos| Ignorado|   Urbana|                             0|     Ap Tempo Contrib ...|
+----+--------------------+------------------------+---------+---------+------------------------------+-------------------------+
only showi

#Transformations

* As transformações criam um novo DataFrame baseado em um existente
  * O novo DataFrame pode ter o mesmo esquema ou um diferente
* As transformações não retornam nenhum valor ou dado ao driver
  * Os dados permanecem distribuídos entre os executores do aplicativo  
* DataFrames são imutáveis
  * Os dados em um DataFrame nunca são modificados
  * Use transformações para criar um novo DataFrame com os dados que você precisa

In [None]:
print(f"select: seleciona apenas as colunas especificadas são incluídas")
df = df_beneficio.select('Ano')

df.show(1)

df = df_beneficio.select('Sexo')
df.show(1)


select: seleciona apenas as colunas especificadas são incluídas
+----+
| Ano|
+----+
|1997|
+----+
only showing top 1 row

+---------+
|     Sexo|
+---------+
|Masculino|
+---------+
only showing top 1 row



In [None]:
print(f"where: Filtra apenas as linhas em que a expressão especificada é verdadeira são incluídas (sinônimo de filter)")
df_beneficio.where('Ano > 1997').show(3)
df_beneficio.where("Sexo == 'Masculino'").show(3)


where: Filtra apenas as linhas em que a expressão especificada é verdadeira são incluídas (sinônimo de filter)
+----+--------------------+------------------------+---------+---------+------------------------------+-------------------------+
| Ano|Especie de Beneficio|Grupos de Idade Cessacao|     Sexo|Clientela|Quantidade Beneficios Cessados|Grupo/Principais Especies|
+----+--------------------+------------------------+---------+---------+------------------------------+-------------------------+
|1998|Ap Tempo Contribu...|            25 a 29 Anos|Masculino|   Urbana|                             0|     Ap Tempo Contrib ...|
|1998|Ap Tempo Contribu...|            25 a 29 Anos| Feminino|   Urbana|                             0|     Ap Tempo Contrib ...|
|1998|Ap Tempo Contribu...|            25 a 29 Anos| Ignorado|   Urbana|                             0|     Ap Tempo Contrib ...|
+----+--------------------+------------------------+---------+---------+------------------------------+------

In [None]:
print(f"orderBy: Ordena as  as linhas são classificadas pela(s) coluna(s) especificada(s) (sinônimo de sort)")
df_beneficio.orderBy('Ano').show(3)

orderBy: Ordena as  as linhas são classificadas pela(s) coluna(s) especificada(s) (sinônimo de sort)
+----+--------------------+------------------------+---------+---------+------------------------------+-------------------------+
| Ano|Especie de Beneficio|Grupos de Idade Cessacao|     Sexo|Clientela|Quantidade Beneficios Cessados|Grupo/Principais Especies|
+----+--------------------+------------------------+---------+---------+------------------------------+-------------------------+
|   -|                   -|                       -|        -|        -|                             -|                     null|
|1991|Acidentários Det ...|             Até 19 Anos|Masculino|    Rural|                            59|     Benefícios Aciden...|
|1991|Acidentários Det ...|             Até 19 Anos| Feminino|    Rural|                             4|     Benefícios Aciden...|
+----+--------------------+------------------------+---------+---------+------------------------------+----------------

In [40]:
print(f"join: junta dois DataFrames na(s) coluna(s) especificada(s)")

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

deptColumns = ["dept_name","dept_id"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)

df = empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner")

df.show(10)

join: junta dois DataFrames na(s) coluna(s) especificada(s)
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [41]:
print(f"limit(n): cria um novo DataFrame com apenas as primeiras n linhas")


df = df_beneficio.limit(10)

df.count()

limit(n): cria um novo DataFrame com apenas as primeiras n linhas


10