# Primeiros passos com PySpark

Neste Objeto de Aprendizagem daremos nossos primeiros passos com o PySpark e Spark Dataframes. O objetivo aqui √© conhecer os principais objetos do PySpark e introduzir os m√©todos mais b√°sicos para familiarizar com a tecnologia.

Come√ßaremos pela importa√ß√£o do pacote do PySpark que engloba as opera√ß√µes com DataFrames e ent√£o criaremos um pequeno DataFrame que ser√° utilizado nos exemplos.

### Bibliotecas necess√°rias

Por enquanto precisaremos somente do m√≥dulo `pyspark.sql`. O pacote PySpark possui diversos m√≥dulos, mas por enquanto precisaremos somente dos objetos que est√£o em `pyspark.sql`.

In [3]:
# instalar as depend√™ncias
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

# configurar as vari√°veis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "import√°vel"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [5]:
# Uso do Spark Dataframes no PySpark
from pyspark.sql import *

### Conectando com o Spark

O pr√≥ximo passo √© iniciar uma sess√£o do Spark (`SparkSession`), cujo papel √© o de comunica√ß√£o com o Cluster. No exemplo abaixo criaremos uma sess√£o local - ou seja, com um minicluster na sua pr√≥pria m√°quina. Esta sess√£o local √© definida por meio do m√©todo `master`. O m√©todo `master` indica qual o tipo de Cluster onde conectaremos e outros detalhes. 

No nosso caso indicamos que o tipo de Cluster √© local e que este utilizaremos 2 processadores para execu√ß√£o das tarefas do Spark. Percebam que mesmo em modo local temos √† nossa disposi√ß√£o a capacidade de processamento _multicore_.

**IMPORTANTE**: No ambiente da **Databricks** n√£o precisamos criar uma Sess√£o pois o notebook ser√° vinculado (_attached_) a um cluster no momento da execu√ß√£o. Logo, o bloco de c√≥digo abaixo n√£o ser√° necess√°rio **na Databricks**.

In [None]:
# Vamos trabalhar com o Spark localmente, sem o uso de um cluster.
#spark = SparkSession \
#    .builder \
#    .master("local[2]") \
#    .appName("Primeiros passos") \
#    .getOrCreate()

## Cria√ß√£o de um Data Frame via c√≥digo

Nesta sess√£o criaremos um DataFrame diretamente via c√≥digo. Esta n√£o √© uma pr√°tica muito comum, visto que trabalhamos com grandes volumes de dados obtidos atrav√©s de Sistemas de Armazenamento. 

Mas n√£o pensem que a cria√ß√£o de um DataFrame via c√≥digo s√≥ serve para exemplos! Ainda veremos casos onde esta pr√°tica ajudar√° na resolu√ß√£o de problemas!

### Exemplo Pr√°tico

Nosso exemplo pr√°tico utiliza a rela√ß√£o de disciplinas que comp√µe esta especializa√ß√£o e suas cargas hor√°rias!

Os dados para este exemplo foram obtidos **manualmente** da p√°gina de Estrutura Curricular do curso, dispon√≠vel [aqui](http://www.unisinos.br/especializacao/big-data-data-science-e-data-analytics/ead/sao-leopoldo/estrutura-curricular).

#### Defini√ß√£o da estrutura: Registros e Colunas

Para criar um DataFrame por c√≥digo precisamos inicialmente definir sua estrutura. A linha de c√≥digo abaixo define que nosso DataFrame ser√° formado por disciplinas, onde cada registro (**Row**) ser√° uma disciplina. Os atributos de uma disciplina dispon√≠veis ser√£o o _nome_ e a _carga hor√°ria_.

In [6]:
# Estrutura do nosso DataFrame
Disciplina = Row("nome", "carga_horaria")

#### Cria√ß√£o de inst√¢ncias (registros e atributos)

Nosso pr√≥ximo passo √© a cria√ß√£o de inst√¢ncias para popular o DataFrame. Usaremos a estrutura `Disciplina` rec√©m criada para instanciar cada uma das disciplinas da especializa√ß√£o e sua carga hor√°ria. 

Neste exemplo foi criada uma refer√™ncia (`d01` a `d14`) por disciplina para deixar o c√≥digo mais claro. No passo seguinte criaremos uma lista que agrupar√° todas as disciplinas e servir√° de fonte para envio dos dados ao Spark.

In [7]:
# Cada uma das disciplinas da especializa√ß√£o √© criada como uma inst√¢ncia do registro Disciplina.

d01 = Disciplina("Introdu√ß√£o a BigData e Analytics", 36)
d02 = Disciplina("Estat√≠stica aplicada", 24)
d03 = Disciplina("Visualiza√ß√£o de dados e informa√ß√£o", 24)
d04 = Disciplina("Compartilhamento e seguran√ßa de dados", 24)
d05 = Disciplina("Introdu√ß√£o a Python e linguagem R", 36)
d06 = Disciplina("Machine Learning", 24)
d07 = Disciplina("Processamento de Alto Desempenho e Aplica√ß√µes", 24)
d08 = Disciplina("Lidando com BigData: Apache Spark, Hadoop, MapReduce, Hive", 24)
d09 = Disciplina("Gerenciamento e Processamento de grande volume de dados", 24)
d10 = Disciplina("Internet das Coisas e Aplica√ß√µes Distribu√≠das", 24)
d11 = Disciplina("Deep Learning", 24)
d12 = Disciplina("Business Intelligence e BigData", 24)
d13 = Disciplina("Atividades Integradoras", 12)
d14 = Disciplina("Prepara√ß√£o para Projeto Aplicado", 36)

Por meio da fun√ß√£o `display` temos uma pr√©via do que ser√° nosso DataFrame! Atentem para o fato de que at√© aqui nossos dados est√£o no Python e n√£o no Spark. Ainda n√£o temos um DataFrame!

No ambiente **Databricks** o resultado da fun√ß√£o `display` ser√° apresentado de forma mais amig√°vel pois o mecanismo de notebooks do ambiente est√° preparado para formata√ß√£o dos objetos Row do Spark. L√° a visualiza√ß√£o da lista `Row` e posteriormente do DataFrame ser√£o muito parecidas!

In [8]:
especializacao_bigdata_datascience = [d01, d02, d03, d04, d05, d06, d07, d08, d09, d10, d11, d12, d13, d14]

display(especializacao_bigdata_datascience)

[Row(nome='Introdu√ß√£o a BigData e Analytics', carga_horaria=36),
 Row(nome='Estat√≠stica aplicada', carga_horaria=24),
 Row(nome='Visualiza√ß√£o de dados e informa√ß√£o', carga_horaria=24),
 Row(nome='Compartilhamento e seguran√ßa de dados', carga_horaria=24),
 Row(nome='Introdu√ß√£o a Python e linguagem R', carga_horaria=36),
 Row(nome='Machine Learning', carga_horaria=24),
 Row(nome='Processamento de Alto Desempenho e Aplica√ß√µes', carga_horaria=24),
 Row(nome='Lidando com BigData: Apache Spark, Hadoop, MapReduce, Hive', carga_horaria=24),
 Row(nome='Gerenciamento e Processamento de grande volume de dados', carga_horaria=24),
 Row(nome='Internet das Coisas e Aplica√ß√µes Distribu√≠das', carga_horaria=24),
 Row(nome='Deep Learning', carga_horaria=24),
 Row(nome='Business Intelligence e BigData', carga_horaria=24),
 Row(nome='Atividades Integradoras', carga_horaria=12),
 Row(nome='Prepara√ß√£o para Projeto Aplicado', carga_horaria=36)]

#### Cria√ß√£o do DataFrame por meio da transfer√™ncia dos dados da lista

Lembram que mais acima eu descrevi o `SparkSession` como o canal de comunica√ß√£o com o Cluster? Pois bem, agora veremos na pr√°tica o que isso significa. Nossa sess√£o possibilita a cria√ß√£o de um DataFrame pelo m√©todo `createDataFrame`. Este m√©todo:
- envia a lista de objetos `Row` para o Cluster
- cria uma estrutra de DataFrame no Cluster
- popula o DataFrame com os objetos `Row` recebidos
- retorna a refer√™ncia ao DataFrame para o Python

In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()
df_especializacao = spark.createDataFrame(especializacao_bigdata_datascience)

Voltaremos a usar a fun√ß√£o `display`, desta vez para inspecionar o conte√∫do da refer√™ncia ao DataFrame que o m√©todo `createDataFrame` retornou para n√≥s. Aqui percebemos que se trata de um DataFrame, e que ele possui duas colunas:

- nome: string
- carga_horaria: bigint

**Importante**: Na **Databricks** a fun√ß√£o `display` exibe o conte√∫do do nosso DataFrame de forma bastante similar a quando usamos `display` para visualizar o conte√∫do da lista de objetos `Row`.

In [14]:
display(df_especializacao)

DataFrame[nome: string, carga_horaria: bigint]

### Visualiza√ß√£o de dados de um DataFrame

#### M√©todo `show`

O m√©todo `show` exibe registros do DataFrame formatados em modo texto. Se a chamada ao m√©todo for sem nenhum par√¢metro ele retornar√° uma tabela com os nomes das coluns em cabe√ßalho, registros at√© um m√°ximo de 20 linhas e os valores das colunas de tipo String (texto) ser√£o exibidos at√© um m√°ximo de 20 caracteres.

A documenta√ß√£o do m√©todo `show` ([link](http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show)) detalha os seguintes par√¢metros:

- **n** ‚Äì N√∫mero de registros a exibir. Se quisermos uma quantidade diferente de 20 registros ent√£o devemos informar a quantidade neste par√¢metro.
- **truncate** ‚Äì Se 20 caracteres for pouco (e no nosso exemplo vimos que √© pouco) ent√£o devemos informar quantos caracteres das colunas String devem ser mostrados. Se o DataFrame tiver muitas colunas do tipo String a visualiza√ß√£o pode ficar dif√≠cil.
- **vertical** ‚Äì Se for False (padr√£o), exibe em formato de tabela. Se for True, exibir√° cada coluna em uma linha, em formato de lista de valores.

In [15]:
df_especializacao.show()

+--------------------+-------------+
|                nome|carga_horaria|
+--------------------+-------------+
|Introdu√ß√£o a BigD...|           36|
|Estat√≠stica aplicada|           24|
|Visualiza√ß√£o de d...|           24|
|Compartilhamento ...|           24|
|Introdu√ß√£o a Pyth...|           36|
|    Machine Learning|           24|
|Processamento de ...|           24|
|Lidando com BigDa...|           24|
|Gerenciamento e P...|           24|
|Internet das Cois...|           24|
|       Deep Learning|           24|
|Business Intellig...|           24|
|Atividades Integr...|           12|
|Prepara√ß√£o para P...|           36|
+--------------------+-------------+



In [16]:
# Lista de registros, exibindo os primeiros 60 caracteres de cada nome.
df_especializacao.show(vertical=True, truncate=60)

-RECORD 0-------------------------------------------------------------------
 nome          | Introdu√ß√£o a BigData e Analytics                           
 carga_horaria | 36                                                         
-RECORD 1-------------------------------------------------------------------
 nome          | Estat√≠stica aplicada                                       
 carga_horaria | 24                                                         
-RECORD 2-------------------------------------------------------------------
 nome          | Visualiza√ß√£o de dados e informa√ß√£o                         
 carga_horaria | 24                                                         
-RECORD 3-------------------------------------------------------------------
 nome          | Compartilhamento e seguran√ßa de dados                      
 carga_horaria | 24                                                         
-RECORD 4-----------------------------------------------------------

In [17]:
# Somente 5 registros
df_especializacao.show(n=3, truncate=60)

+----------------------------------+-------------+
|                              nome|carga_horaria|
+----------------------------------+-------------+
|  Introdu√ß√£o a BigData e Analytics|           36|
|              Estat√≠stica aplicada|           24|
|Visualiza√ß√£o de dados e informa√ß√£o|           24|
+----------------------------------+-------------+
only showing top 3 rows



#### M√©todos `describe` e `summary`

O m√©todo `describe` computa estat√≠sticas descritivas b√°sicas nas colunas num√©ricas e textuais. √â utilizado em conjunto com o m√©todo `show` para exibi√ß√£o do resultado.

**Aten√ß√£o**: Esta opera√ß√£o pode ser bastante demorada em um DataFrame de maior volume. O motivo ficar√° claro ao longo da disciplina.

In [18]:
df_especializacao.describe().show(truncate=60)

+-------+----------------------------------+------------------+
|summary|                              nome|     carga_horaria|
+-------+----------------------------------+------------------+
|  count|                                14|                14|
|   mean|                              null|25.714285714285715|
| stddev|                              null|6.4142698058981855|
|    min|           Atividades Integradoras|                12|
|    max|Visualiza√ß√£o de dados e informa√ß√£o|                36|
+-------+----------------------------------+------------------+



J√° o m√©todo `summary` computa algumas estat√≠sticas a mais, os quantis. Sem informar par√¢metros, summary ir√° calcular os quantis 25%, 50% (mediana) e 75%. O par√¢metro de `summary` possiblita escolher quais estat√≠sticas ser√£o calculadas.

As estat√≠sticas dispon√≠veis est√£o descritas na documenta√ß√£o do m√©todo ([link](http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.summary)).

**O mesmo alerta e tempo de processamento segue v√°lido**

In [19]:
df_especializacao.summary().show(truncate=60)

+-------+----------------------------------+------------------+
|summary|                              nome|     carga_horaria|
+-------+----------------------------------+------------------+
|  count|                                14|                14|
|   mean|                              null|25.714285714285715|
| stddev|                              null|6.4142698058981855|
|    min|           Atividades Integradoras|                12|
|    25%|                              null|                24|
|    50%|                              null|                24|
|    75%|                              null|                24|
|    max|Visualiza√ß√£o de dados e informa√ß√£o|                36|
+-------+----------------------------------+------------------+



In [20]:
df_especializacao.summary("count", "mean", "10%", "50%", "90%").show()

+-------+----+------------------+
|summary|nome|     carga_horaria|
+-------+----+------------------+
|  count|  14|                14|
|   mean|null|25.714285714285715|
|    10%|null|                24|
|    50%|null|                24|
|    90%|null|                36|
+-------+----+------------------+



#### M√©todo `columns`

Retorna uma lista com os nomes das colunas do DataFrame.

In [21]:
df_especializacao.columns

['nome', 'carga_horaria']

#### M√©todo count

Retorna a quantidade de registros de um DataFrame.

**Aten√ß√£o**: Por mais que n√£o pare√ßa intuitivo, este opera√ß√£o pode ser bastante demorada em um DataFrame de maior volume, e novamente digo que o motivo ficar√° claro ao longo da disciplina!

In [22]:
df_especializacao.count()

14

## O caminho contr√°rio

Da mesma forma como conseguimos enviar dados do Python para o Spark (üêç‚û°Ô∏èüí•) podemos tamb√©m trazer dados do Spark  para o Python (üêç‚¨ÖÔ∏èüí•).

**Mas antes temos que conversar sobre volumes de dados.**

> Neste Objeto de Aprendizagem estamos trabalhando com pequenos volumes de dados em ambiente local, ent√£o a transfer√™ncia de dados n√£o causar√° dores de cabe√ßa. No entanto, considerem o cen√°rio real de lidar com grandes volumes de dados em um cluster, em ordem de grandeza maior do que sua m√°quina √© capaz de armazenar em mem√≥ria. Pense em Terabytes (TB) de dados. Tentar transferir este volume de dados do cluster para sua m√°quina ser√° um desastre.

Na pr√°tica, a transfer√™ncia de DataFrames do Spark para o Python √© feita ap√≥s algum processamento dos dados no Spark. Este processamento pode ser:
- sumariza√ß√£o de dados (estat√≠sticas descritivas, agrupamentos)
- a sele√ß√£o e filtro de um subconjunto de dados
- amostragem
- etc.

E uma justificativa para transfer√™ncias deste tipo √© a necessidade de uso de recursos que n√£o est√£o dispon√≠veis no Spark. E mesmo assim temos formas de enviar recursos do Python para uso no Spark (faremos isso em outra oportunidade).

#### M√©todos `head`, `first` e `take`

O m√©todo `head` retorna o **n** primeiros registros de um DataFrame, retornando somente 1 registro se o par√¢metro **n** n√£o for especificado.

Uma pegadinha: Se n√£o especificar o par√¢metro, o objeto de retorno √© o primeiro registro, de tipo `Row`. No entanto, se especificar **n=1** o retorno ser√° de tipo `list` com o objeto `Row` dentro da lista. `head` sem par√¢metros √© equivalente ao m√©todo `first`.

`take` √© bastante similar a `head`, por√©m com par√¢metro **num** obrigat√≥rio.

Apesar da aparente confus√£o, pense que `head` √© uma combina√ß√£o de `first` e `take`:

- `head` sem par√¢metro equivale a `first`
- `head` com par√¢metro equivale a `take`

In [23]:
um = df_especializacao.head()
lum = df_especializacao.head(n=5)

print((um, type(um)))
print((lum, type(lum)))

(Row(nome='Introdu√ß√£o a BigData e Analytics', carga_horaria=36), <class 'pyspark.sql.types.Row'>)
([Row(nome='Introdu√ß√£o a BigData e Analytics', carga_horaria=36), Row(nome='Estat√≠stica aplicada', carga_horaria=24), Row(nome='Visualiza√ß√£o de dados e informa√ß√£o', carga_horaria=24), Row(nome='Compartilhamento e seguran√ßa de dados', carga_horaria=24), Row(nome='Introdu√ß√£o a Python e linguagem R', carga_horaria=36)], <class 'list'>)


In [24]:
df_especializacao.first()

Row(nome='Introdu√ß√£o a BigData e Analytics', carga_horaria=36)

In [25]:
df_especializacao.take(num=14)

[Row(nome='Introdu√ß√£o a BigData e Analytics', carga_horaria=36),
 Row(nome='Estat√≠stica aplicada', carga_horaria=24),
 Row(nome='Visualiza√ß√£o de dados e informa√ß√£o', carga_horaria=24),
 Row(nome='Compartilhamento e seguran√ßa de dados', carga_horaria=24),
 Row(nome='Introdu√ß√£o a Python e linguagem R', carga_horaria=36),
 Row(nome='Machine Learning', carga_horaria=24),
 Row(nome='Processamento de Alto Desempenho e Aplica√ß√µes', carga_horaria=24),
 Row(nome='Lidando com BigData: Apache Spark, Hadoop, MapReduce, Hive', carga_horaria=24),
 Row(nome='Gerenciamento e Processamento de grande volume de dados', carga_horaria=24),
 Row(nome='Internet das Coisas e Aplica√ß√µes Distribu√≠das', carga_horaria=24),
 Row(nome='Deep Learning', carga_horaria=24),
 Row(nome='Business Intelligence e BigData', carga_horaria=24),
 Row(nome='Atividades Integradoras', carga_horaria=12),
 Row(nome='Prepara√ß√£o para Projeto Aplicado', carga_horaria=36)]

#### M√©todo `collect`

Este m√©todo retorna **todos** os registros do DataFrame. 

**Cuidado** ao usar este m√©todo com grandes volumes de dados.

In [26]:
df_especializacao.collect()

[Row(nome='Introdu√ß√£o a BigData e Analytics', carga_horaria=36),
 Row(nome='Estat√≠stica aplicada', carga_horaria=24),
 Row(nome='Visualiza√ß√£o de dados e informa√ß√£o', carga_horaria=24),
 Row(nome='Compartilhamento e seguran√ßa de dados', carga_horaria=24),
 Row(nome='Introdu√ß√£o a Python e linguagem R', carga_horaria=36),
 Row(nome='Machine Learning', carga_horaria=24),
 Row(nome='Processamento de Alto Desempenho e Aplica√ß√µes', carga_horaria=24),
 Row(nome='Lidando com BigData: Apache Spark, Hadoop, MapReduce, Hive', carga_horaria=24),
 Row(nome='Gerenciamento e Processamento de grande volume de dados', carga_horaria=24),
 Row(nome='Internet das Coisas e Aplica√ß√µes Distribu√≠das', carga_horaria=24),
 Row(nome='Deep Learning', carga_horaria=24),
 Row(nome='Business Intelligence e BigData', carga_horaria=24),
 Row(nome='Atividades Integradoras', carga_horaria=12),
 Row(nome='Prepara√ß√£o para Projeto Aplicado', carga_horaria=36)]

## Finalizando a sess√£o

Em muitos casos de uso o Cluster √© um ambiente compartilhado e de recursos finitos. Ao concluir o uso de uma sess√£o do Spark sempre √© recomendado finaliz√°-la para liberar os recursos alocados nesta sess√£o.

In [28]:
spark.stop()