## Instalando o ambiente

### Opção 1 - Docker

O jeito mais simples de começar a trabalhar com Spark é instalar um container com tudo pronto! No site https://hub.docker.com/r/jupyter/pyspark-notebook vemos uma imagem Docker que já vem com `pyspark` e `jupyter lab`. Instale a imagem com o comando:

```bash
docker pull jupyter/pyspark-notebook
```

 
Vamos iniciar o ambiente de trabalho com o comando `docker run`. Para isso precisamos tomar alguns cuidados:

1) Temos que mapear nosso diretorio local de trabalho para um diretório interno do container, de modo que alterações feitas dentro do container (nesta pasta escolhida) sejam gravadas no nosso diretorio local. No container temos um usuário padrão com *username* `jovyan`. No *homedir* desse usuario temos uma pasta vazia `work`, que vai servir como local de mapeamento do nosso diretorio local de trabalho. Podemos então fazer esse mapeamendo com a opção `-v` do comando `docker run` da seguinte forma:

```bash
-v <diretorio>:/home/jovyan/work
```

onde `<diretorio>` representa seu diretorio local de trabalho.

2) Para acessar o `jupyter notebook` e o *dashboard* do Spark a partir do nosso *browser* favorito temos que abrir algumas portas do container com a opção `-p`. As portas são `8888` (para o próprio `jupyter notebook`) e `4040` (para o *dashboard* do Spark). Ou seja, adicionaremos às opções do `docker run`o seguinte:

```bash
-p 8888:8888 -p 4040:4040
```

Desta forma, ao acessar `localhost:8888` na nossa máquina, estaremos acessando o servidor Jupyter na porta 8888 interna do container.

3) Vamos iniciar o container no modo interativo, e vamos especificar que o container deve ser encerrado ao fechar o servidor Jupyter. Faremos isso com as opções `-it` e `-rm`

Antes de executar, garanta que as portas 4040 e 8888 estão livres (sem jupyter já executando) ou altere o comando. Ainda, esteja na pasta da aula ao executar, assim apenas ela será exposta ao container.

Portanto, o comando completo que eu uso na minha máquina Linux para iniciar o container é:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v "`pwd`":/home/jovyan/work \
    jupyter/pyspark-notebook

```

Se estiver no Windows estes comandos, utilize:

- No Powershell: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v ${PWD}:/home/jovyan/work jupyter/pyspark-notebook`

- No Prompt de comando: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v %cd%:/home/jovyan/work jupyter/pyspark-notebook`

Para facilitar a vida eu coloco esse comando em um arquivo `inicia.sh`. Engenheiros, façam do jeito que preferirem!

Agora abra esse notebook lá no container utilizando o link **com o token** que é exibido ao executar o comando!

**Importante:** Se você já estiver visualizando esse notebook via Jupyter, pode ser necessário encerrar o processo para que o comando acima funcione.

Vamos agora configurar o insperautograder para que funcione dentro do container do Docker. Primeiro, instale e importe a biblioteca:

In [23]:
#  !pip install git+https://github.com/macielcalebe/insperautograding.git

import insperautograder.jupyter as ia
from dotenv import load_dotenv

Copie o arquivo .env para dentro da pasta que está vinculada ao container e veja se está funcionando:

In [24]:
load_dotenv()
ia.tasks()

|    | Atividade            | De                  | Até                 | Conta como ATV?   | % Nota Atraso   |
|---:|:---------------------|:--------------------|:--------------------|:------------------|:----------------|
|  0 | newborn              | 2025-02-01 00:00:00 | 2025-05-30 00:00:00 | Não               | 0%              |
|  1 | select01             | 2025-02-06 16:00:00 | 2025-02-15 23:59:59 | Sim               | 25%             |
|  2 | ddl                  | 2025-02-20 00:00:00 | 2025-02-27 23:59:59 | Sim               | 25%             |
|  3 | dml                  | 2025-02-24 00:00:00 | 2025-03-06 15:45:59 | Sim               | 25%             |
|  4 | agg_join             | 2025-02-26 00:00:00 | 2025-03-09 23:59:59 | Sim               | 25%             |
|  5 | group_having         | 2025-03-06 00:00:00 | 2025-03-13 23:59:59 | Sim               | 25%             |
|  6 | views                | 2025-03-10 00:00:00 | 2025-03-16 23:59:59 | Sim               | 25%             |
|  7 | sql_review1          | 2025-03-13 00:00:00 | 2025-03-20 23:59:59 | Sim               | 25%             |
|  8 | permissions          | 2025-03-20 00:00:00 | 2025-04-07 23:59:59 | Sim               | 25%             |
|  9 | ai_md_23_2           | 2025-03-23 00:00:00 | 2025-03-30 23:59:59 | Sim               | 25%             |
| 10 | ai_md_23_1           | 2025-03-23 00:00:00 | 2025-03-30 23:59:59 | Sim               | 25%             |
| 11 | ai_md_25_1           | 2025-03-31 00:00:00 | 2025-03-31 10:05:00 | Não               | 0%              |
| 12 | desafio_normalizacao | 2025-04-07 00:00:00 | 2025-05-30 00:00:00 | Não               | 0%              |
| 13 | triggers             | 2025-04-24 00:00:00 | 2025-05-04 23:59:59 | Sim               | 25%             |
| 14 | functional           | 2025-05-05 00:00:00 | 2025-05-11 23:59:59 | Sim               | 25%             |
| 15 | spark                | 2025-05-08 00:00:00 | 2025-05-15 23:59:59 | Sim               | 25%             |
| 16 | exercicios_spark     | 2025-05-12 00:00:00 | 2025-05-18 23:59:59 | Sim               | 25%             |

In [25]:
ia.grades(by="task")

|    | Tarefa               |   Nota | Conta como ATV?   |
|---:|:---------------------|-------:|:------------------|
|  0 | newborn              |  10    | Não               |
|  1 | select01             |  10    | Sim               |
|  2 | ddl                  |  10    | Sim               |
|  3 | dml                  |  10    | Sim               |
|  4 | agg_join             |  10    | Sim               |
|  5 | group_having         |  10    | Sim               |
|  6 | views                |  10    | Sim               |
|  7 | sql_review1          |  10    | Sim               |
|  8 | permissions          |  10    | Sim               |
|  9 | ai_md_23_1           |  10    | Sim               |
| 10 | ai_md_23_2           |  10    | Sim               |
| 11 | ai_md_25_1           |   7.6  | Não               |
| 12 | desafio_normalizacao |   6    | Não               |
| 13 | triggers             |   0    | Sim               |
| 14 | functional           |  10    | Sim               |
| 15 | spark                |   5.71 | Sim               |
| 16 | exercicios_spark     |   0    | Sim               |

### Opção 2 - Instalar `pyspark` diretamente

Caso já tenha as dependências em sua máquina, faça:

In [26]:
# !pip install pyspark

## Iniciando o Spark

Vamos iniciar o ambiente Spark. Para isso vamos:

1) Criar um objeto de configuração do ambiente Spark. Nossa configuração será simples: vamos especificar que o nome da nossa aplicação Spark é "Minha aplicação", e que o *master node* é a máquina local, usando todos os *cores* disponíveis. Aplicações reais de Spark são configuradas de modo ligeiramente diferente: ao especificar o *master node* passamos uma URL real, com o endereço do nó gerente do *cluster* Spark.

2) Vamos criar um objeto do tipo `SparkContext` com essa configuração

In [27]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MinhaAplicacao").getOrCreate()
sc = spark.sparkContext

O `SparkContext` é a nossa porta de entrada para o cluster Spark, ele será a raiz de todas as nossas operações com o Spark.

In [28]:
sc

O link acima para a Spark UI provavelmente não funcionará porque ele se refere à porta 4040 interna do container (portanto a URL está com endereço interno). Porém fizemos o mapeamento da porta 4040 interna para a porta 4040 externa, logo você pode acessar o *dashboard* do Spark para monitorar seus *jobs* no endereço http://localhost:4040

<center><img src="./img/spark_dashboard.png" width=800/></center>

Pronto, assim você vai conseguir testar seus programas com facilidade!

## Trabalhando com RDDs

Esse é o jeito "Apache raiz": Resilient Distributed Datasets (RDDs). Este é o principal objeto de processamento do Spark.

Um RDD é criado à partir do objeto `SparkContext`. Por exemplo:

In [29]:
rdd = sc.parallelize([1, 2, 3, 4, 5])

Vamos recuperar dois elementos:

In [30]:
rdd.max(2)

TypeError: 'int' object is not callable

Um RDD também pode ser criado a partir de um dataset (claro!):

In [31]:
rdd = sc.textFile("data/memorias.txt")

Veja que o rdd pode ser particionado

In [32]:
print("Quantidade de partições: ", rdd.getNumPartitions())

Quantidade de partições:  2


Podemos imaginar um RDD como uma coleção de itens, similar a uma grande lista. O que vem em cada item depende do arquivo original de dados. Para arquivos de texto, cada linha é um item.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html e sobre os RDDs em https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html

### Açoes e Transformações

O objeto `rdd` é repleto de métodos para definir pipelines computacionais. Estes métodos se dividem em *actions* e *transformations*.

*Transformations* são métodos que atuam no RDD e devolvem um "novo" RDD que está conectado ao RDD antigo. Por exemplo, vamos usar a *transformation* `map` para inverter a sequência de letras em cada linha: 

In [33]:
def inverte_linha(linha):
    return linha[::-1]

rdd2 = rdd.map(inverte_linha)
rdd2

PythonRDD[17] at RDD at PythonRDD.scala:53

Se você consultar a Web UI do Spark (http://localhost:4040) vai ver que nada ainda aconteceu. Isto é assim porque o Spark é *lazy*: a computação só acontece quando um resultado de uma sequência de *transformations* é demandado por uma *action*.

As *actions* são métodos que retornam dados para o seu programa. Por exemplo, a *action* `count` retorna o número de itens no RDD, e a *action* `take` permite coletar alguns itens no inicio do RDD, bem útil para debugar:

In [34]:
rdd.count()

8843

Confira a Web UI e veja que agora temos um novo job completo!

Vamos espiar as primeiras 20 linhas do documento original:

In [35]:
rdd.take(20)

["Project Gutenberg's Memorias Postumas de Braz Cubas, by Machado de Assis",
 '',
 'This eBook is for the use of anyone anywhere in the United States and most',
 'other parts of the world at no cost and with almost no restrictions',
 'whatsoever.  You may copy it, give it away or re-use it under the terms of',
 'the Project Gutenberg License included with this eBook or online at',
 "www.gutenberg.org.  If you are not located in the United States, you'll have",
 'to check the laws of the country where you are located before using this ebook.',
 '',
 'Title: Memorias Postumas de Braz Cubas',
 '',
 'Author: Machado de Assis',
 '',
 'Release Date: June 2, 2017 [EBook #54829]',
 '',
 'Language: Portuguese',
 '',
 '',
 '*** START OF THIS PROJECT GUTENBERG EBOOK MEMORIAS POSTUMAS DE BRAZ CUBAS ***',
 '']

E agora as 20 primeiras linhas após a inversão de linha:

In [36]:
rdd2.take(20)

["sissA ed odahcaM yb ,sabuC zarB ed samutsoP sairomeM s'grebnetuG tcejorP",
 '',
 'tsom dna setatS detinU eht ni erehwyna enoyna fo esu eht rof si kooBe sihT',
 'snoitcirtser on tsomla htiw dna tsoc on ta dlrow eht fo strap rehto',
 'fo smret eht rednu ti esu-er ro yawa ti evig ,ti ypoc yam uoY  .reveostahw',
 'ta enilno ro kooBe siht htiw dedulcni esneciL grebnetuG tcejorP eht',
 "evah ll'uoy ,setatS detinU eht ni detacol ton era uoy fI  .gro.grebnetug.www",
 '.koobe siht gnisu erofeb detacol era uoy erehw yrtnuoc eht fo swal eht kcehc ot',
 '',
 'sabuC zarB ed samutsoP sairomeM :eltiT',
 '',
 'sissA ed odahcaM :rohtuA',
 '',
 ']92845# kooBE[ 7102 ,2 enuJ :etaD esaeleR',
 '',
 'eseugutroP :egaugnaL',
 '',
 '',
 '*** SABUC ZARB ED SAMUTSOP SAIROMEM KOOBE GREBNETUG TCEJORP SIHT FO TRATS ***',
 '']

Vamos discutir algumas açoes e transformações uteis.

#### `map`

A transformação `map` recebe uma função e aplica esta função a cada item do RDD. A função deve receber um item e retornar um unico item. Como exemplo temos o uso de `map` acima para inverter a ordem das letras de cada linha.

Note que no `map`, o RDD resultante tem o mesmo numero de elementos do RDD original.

**Exercicio 1**: Crie uma função chamada conta_letras que recebe um rdd e retorna o rdd transformado onde cada linha tenha a contagem de letras da linha recebida. Por exemplo, a linha "abacaxi" deve ser transformada em 7.

In [37]:
# resultado_esperado: [72, 0, 74, 67, 74, 67, 76, 79, 0, 38, 0, 24, 0, 41, 0, 20, 0, 0, 77, 0 ...

def conta_letras(rdd):
    return rdd.map(lambda x: len(x))

conta_letras(rdd).take(20)

[72, 0, 74, 67, 74, 67, 76, 79, 0, 38, 0, 24, 0, 41, 0, 20, 0, 0, 77, 0]

**Atenção:** Caso tenha problemas em rederizar o botão de envio, **teste em outros navegadorres**.

In [38]:
ia.sender(answer="conta_letras", task="spark", question="ex01", answer_type="pycode")

interactive(children=(Button(description='Enviar ex01', style=ButtonStyle()), Output()), _dom_classes=('widget…

**Exercício 2**: Crie uma função chamada transforma_minusculas que recebe um rdd e retorna o rdd transformado onde cada linha tenha todas as letras em minúsculas. Por exemplo, a linha "AbaCaxi HOJE" deve ser transformada em "abacaxi hoje".

In [39]:
def transforma_minusculas(rdd):
    return rdd.map(lambda x: x.lower())
    
transforma_minusculas(rdd).take(20)

["project gutenberg's memorias postumas de braz cubas, by machado de assis",
 '',
 'this ebook is for the use of anyone anywhere in the united states and most',
 'other parts of the world at no cost and with almost no restrictions',
 'whatsoever.  you may copy it, give it away or re-use it under the terms of',
 'the project gutenberg license included with this ebook or online at',
 "www.gutenberg.org.  if you are not located in the united states, you'll have",
 'to check the laws of the country where you are located before using this ebook.',
 '',
 'title: memorias postumas de braz cubas',
 '',
 'author: machado de assis',
 '',
 'release date: june 2, 2017 [ebook #54829]',
 '',
 'language: portuguese',
 '',
 '',
 '*** start of this project gutenberg ebook memorias postumas de braz cubas ***',
 '']

In [40]:
ia.sender(answer="transforma_minusculas", task="spark", question="ex02", answer_type="pycode")

interactive(children=(Button(description='Enviar ex02', style=ButtonStyle()), Output()), _dom_classes=('widget…

#### `flatMap`

Esta transformação é similar ao `map`, porém a função passada para o `flatMap` pode retornar zero ou mais itens. Estes itens então são concatenados e o RDD resultante acaba tendo um número de itens diferente do RDD original.

Por exemplo: suponha que queremos gerar uma lista de palavras do documento. Podemos fazê-lo da seguinte forma:

In [41]:
def separa_palavras(linha):
    return linha.strip().split()

Observe o resultado da aplicação da função `separa_palavras` com `map`:

In [42]:
rdd_map = rdd.map(lambda x: x.lower()).map(separa_palavras)
rdd_map.take(30)

[['project',
  "gutenberg's",
  'memorias',
  'postumas',
  'de',
  'braz',
  'cubas,',
  'by',
  'machado',
  'de',
  'assis'],
 [],
 ['this',
  'ebook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'in',
  'the',
  'united',
  'states',
  'and',
  'most'],
 ['other',
  'parts',
  'of',
  'the',
  'world',
  'at',
  'no',
  'cost',
  'and',
  'with',
  'almost',
  'no',
  'restrictions'],
 ['whatsoever.',
  'you',
  'may',
  'copy',
  'it,',
  'give',
  'it',
  'away',
  'or',
  're-use',
  'it',
  'under',
  'the',
  'terms',
  'of'],
 ['the',
  'project',
  'gutenberg',
  'license',
  'included',
  'with',
  'this',
  'ebook',
  'or',
  'online',
  'at'],
 ['www.gutenberg.org.',
  'if',
  'you',
  'are',
  'not',
  'located',
  'in',
  'the',
  'united',
  'states,',
  "you'll",
  'have'],
 ['to',
  'check',
  'the',
  'laws',
  'of',
  'the',
  'country',
  'where',
  'you',
  'are',
  'located',
  'before',
  'using',
  'this',
  'ebook.'],
 [],
 ['title:

E o resultado com `flatMap`, que é o resultado desejado:

In [43]:
rdd_flatMap = rdd.map(lambda x: x.lower()).flatMap(separa_palavras)
rdd_flatMap.take(30)

['project',
 "gutenberg's",
 'memorias',
 'postumas',
 'de',
 'braz',
 'cubas,',
 'by',
 'machado',
 'de',
 'assis',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'in',
 'the',
 'united',
 'states',
 'and',
 'most',
 'other',
 'parts',
 'of',
 'the']

Veja a contagem de palavras no `rdd_flatMap`:

In [44]:
rdd_flatMap.count()

64713

Compare com o numero de itens no RDD original:

In [45]:
rdd.count()

8843

**Exercício 3**: Crie uma função chamada gera_bigramas que recebe um rdd e retorna o rdd transformado onde cada linha contém uma **tupla** com um *bigrama*: sequências de duas palavras consecutivas.

Gere cada bigrama como uma **tupla** (p[i], p[i+1]). 

Sua função deve utilizar o comando `flatMap`. Para esse exercício é permitido utilizar um comando for dentro do flatmap. Por que isso não prejudica a performance?

Além disso, teremos um pequeno problema nos bigramas - que problema é esse?

In [46]:
def gera_bigramas(rdd):
    return rdd.map(lambda x: x.lower()) \
        .map(lambda x: x.split()) \
        .flatMap(lambda x: [(x[i], x[i+1]) for i in range(len(x)-1)])

rdd_bigramas = gera_bigramas(rdd)
rdd_bigramas.take(20)

[('project', "gutenberg's"),
 ("gutenberg's", 'memorias'),
 ('memorias', 'postumas'),
 ('postumas', 'de'),
 ('de', 'braz'),
 ('braz', 'cubas,'),
 ('cubas,', 'by'),
 ('by', 'machado'),
 ('machado', 'de'),
 ('de', 'assis'),
 ('this', 'ebook'),
 ('ebook', 'is'),
 ('is', 'for'),
 ('for', 'the'),
 ('the', 'use'),
 ('use', 'of'),
 ('of', 'anyone'),
 ('anyone', 'anywhere'),
 ('anywhere', 'in'),
 ('in', 'the')]

In [47]:
ia.sender(answer="gera_bigramas", task="spark", question="ex03", answer_type="pycode")

interactive(children=(Button(description='Enviar ex03', style=ButtonStyle()), Output()), _dom_classes=('widget…

O problema é que bigramas que cruzam linhas não serão contabilizados.

#### `filter`

A transformação `filter` recebe uma função que recebe um item e retorna True se o item deve ser mantido, e False se deve ser ignorado. Por exemplo, suponha que temos uma lista de palavras proibidas e queremos manter apenas as palavras permitidas da nossa lista de palavras em `rdd_flatMap`:

In [48]:
rdd_flatMap.take(30)

['project',
 "gutenberg's",
 'memorias',
 'postumas',
 'de',
 'braz',
 'cubas,',
 'by',
 'machado',
 'de',
 'assis',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'in',
 'the',
 'united',
 'states',
 'and',
 'most',
 'other',
 'parts',
 'of',
 'the']

In [49]:
def eh_palavra_permitida(palavra):
    palavras_proibidas = set(["Braz", "Assis"])
    return palavra not in palavras_proibidas

rdd_limpo = rdd_flatMap.filter(eh_palavra_permitida)
rdd_limpo.take(30)

['project',
 "gutenberg's",
 'memorias',
 'postumas',
 'de',
 'braz',
 'cubas,',
 'by',
 'machado',
 'de',
 'assis',
 'this',
 'ebook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'in',
 'the',
 'united',
 'states',
 'and',
 'most',
 'other',
 'parts',
 'of',
 'the']

#### `reduce`

A *action* `reduce` apresenta o mesmo comportamento da operação `reduce` da biblioteca `functools` do Python. Ela recebe uma função de dois argumentos que retorna apenas um valor. O comportamento de `reduce` é aplicar esta função aos vários elementos do RDD de modo sucessivo, visando "reduzir" o RDD inteiro a um único valor, que é então retornado para o programa principal.

Por exemplo, suponha que desejamos calcular a soma dos comprimentos de palavras. Podemos fazer o seguinte:

In [50]:
rdd_flatMap.map(lambda x: len(x)).reduce(lambda x, y: x + y)

317610

Note que como `reduce` é uma *action*, temos um resultado concreto.

**Exercício 4**: Crie uma função chamada maior_palindromo que recebe um rdd com o texto e retorna a maior palavra palíndroma do texto.

In [51]:
def maior_palindromo(rdd): 
    def eh_palindromo(string):
        return string == string[::-1]
    return rdd.filter(eh_palindromo).reduce(lambda x, y: x if len(x.replace(" ","")) > len(y.replace(" ","")) else y)

maior_palindromo(rdd)

'..........'

In [52]:
ia.sender(answer="maior_palindromo", task="spark", question="ex04", answer_type="pycode")

interactive(children=(Button(description='Enviar ex04', style=ButtonStyle()), Output()), _dom_classes=('widget…

### Pares chave-valor

Sempre que o RDD consistir em itens que sejam tuplas de dois elementos, Spark vai considerar que temos um par chave-valor por item. Isso nos permite realizar agregações por chave, abrindo a possibilidade de coletar dados agregados mais interessantes. 

#### `reduceByKey`
Para essa tarefa, a transformação mais importante é a `reduceByKey`. Esta operação realiza uma agregação por chave (gerando uma lista de valores para aquela chave), e realiza uma redução da lista dos valores associados à chave.

Por exemplo, o *hello world* do Spark: o conta-palavras:

In [53]:
rdd_flatMap \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .takeOrdered(10, lambda x: -x[1])

[('a', 2593),
 ('de', 2139),
 ('que', 2097),
 ('e', 1944),
 ('o', 1831),
 ('não', 1063),
 ('um', 987),
 ('do', 744),
 ('da', 656),
 ('uma', 652)]

**Exercício 5**: Crie uma função chamada conta_bigramas que recebe um rdd com as tuplas de bigramas e retorna pares chave e valor dos 10 bigramas mais populares.

**Atenção:** Apesar da resposta aparecer como se fossem listas, os elementos apresentados são tuplas em python.

In [54]:
def conta_bigramas(rdd_bigramas):
    rdd_conta_bigramas = rdd_bigramas.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    return rdd_conta_bigramas.takeOrdered(10, lambda x: -x[1])

conta_bigramas(gera_bigramas(rdd))

[(('.', '.'), 159),
 (('de', 'um'), 148),
 (('que', 'a'), 133),
 (('que', 'o'), 125),
 (('que', 'não'), 117),
 (('que', 'me'), 116),
 (('o', 'que'), 111),
 (('e', 'a'), 107),
 (('que', 'eu'), 107),
 (('de', 'uma'), 100)]

In [55]:
ia.sender(answer="conta_bigramas", task="spark", question="ex05", answer_type="pycode")

interactive(children=(Button(description='Enviar ex05', style=ButtonStyle()), Output()), _dom_classes=('widget…

#### `join`

A transformação `join` realiza o *inner join* de dois RDDs. Por exemplo:

In [56]:
rdd_a = sc.parallelize([("a", 10), ("b", 20), ("b", 30)])
rdd_b = sc.parallelize([("a", "banana"), ("a", "abacate"), ("b", "chuchu"), ("b", "tomate")])
rdd_c = rdd_a.join(rdd_b)

rdd_c.collect()

[('a', (10, 'banana')),
 ('a', (10, 'abacate')),
 ('b', (20, 'chuchu')),
 ('b', (20, 'tomate')),
 ('b', (30, 'chuchu')),
 ('b', (30, 'tomate'))]

**Desafio 1**: Crie uma função chamada desafio1 que recebe um rdd e o transforma de modo que para cada bigrama, temos a seguinte informação:

(bigrama, contagem_bigrama, contagem_palavra_1, contagem_palavra_2)

**Atenção:** Apesar da resposta aparecer como se fossem listas, os elementos apresentados são tuplas em python.

In [108]:
def desafio1(rdd):
    # Normalizar texto para minúsculas e dividir em palavras
    rdd_bigramas = rdd.map(lambda x: x.lower()).map(lambda x: x.split()).flatMap(lambda x: [(x[i], x[i+1]) for i in range(len(x)-1)])

    # Contar bigramas
    rdd_conta_bigramas = rdd_bigramas.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    # Contar palavras individuais
    rdd_palavras = rdd.map(lambda x: x.lower()).flatMap(lambda x: x.split())
    rdd_conta_palavras = rdd_palavras.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    # Usar join para combinar os resultados
    rdd_joined = rdd_conta_bigramas.join(rdd_conta_palavras)

    rdd_desorganizado = rdd_conta_bigramas.map(lambda x : (x[0][0], x)).join(rdd_conta_palavras).map(lambda x: (x[1][0][0][1],x)).join(rdd_conta_palavras)
    return rdd_desorganizado.map(lambda x: (x[1][0][1][0][0],x[1][0][1][0][1],x[1][0][1][1],x[1][1]))


rdd_contagem = desafio1(rdd)
rdd_contagem.take(5)

[(('by', 'e-mail)'), 1, 28, 1),
 (('use', 'part'), 1, 11, 4),
 (('any', 'part'), 2, 35, 4),
 (('a', 'part'), 1, 2593, 4),
 (('use', 'and'), 2, 11, 69)]

In [109]:
ia.sender(answer="desafio1", task="spark", question="desafio1", answer_type="pycode")

interactive(children=(Button(description='Enviar desafio1', style=ButtonStyle()), Output()), _dom_classes=('wi…

**Desafio 2**: Faça uma função chamada desafio2 que recebe um RDD no formato retornado pela função desafio1 e constrói um RDD com a seguinte informação:

`(bigrama, (contagem_bigrama) /((contagem_palavra_1 + K) * (contagem_palavra_2 + K)))`

para K = 10

Por fim, retorne os 15 bigramas com maior valor associado.

In [115]:
def desafio2(rdd_d1):
    K = 10
    return rdd_d1.map(lambda x: (x[0], (x[1]) /((x[2] + K) * (x[3] + K))))
contagem = desafio2(desafio1(rdd))
contagem.take(5)

[(('by', 'e-mail)'), 0.0023923444976076554),
 (('use', 'part'), 0.003401360544217687),
 (('any', 'part'), 0.0031746031746031746),
 (('a', 'part'), 2.7440864936062786e-05),
 (('use', 'and'), 0.0012055455093429777)]

In [112]:
ia.sender(answer="desafio2", task="spark", question="desafio2", answer_type="pycode")

interactive(children=(Button(description='Enviar desafio2', style=ButtonStyle()), Output()), _dom_classes=('wi…

## Gabarito

**<div id="gab_ex3">**Exercício 3**</div>**
<div class="alert alert-warning">

```python
def gera_bigramas(rdd):
    return rdd.map(lambda x: x.lower()) \
        .map(lambda x: x.split()) \
        .flatMap(lambda x: [(x[i], x[i+1]) for i in range(len(x)-1)])
```
    
</div>

**<div id="gab_ex4">**Exercício 5**</div>**
<div class="alert alert-warning">

```python
def conta_bigramas(rdd_bigramas):
    rdd_conta_bigramas = rdd_bigramas.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    return rdd_conta_bigramas.takeOrdered(10, lambda x: -x[1])
```
    
</div>