# Contando MMs

Uma cientista de dados que adora assar biscoitos com M&Ms e recompensar seus alunos nos estados dos EUA, onde frequentemente ministra cursos de aprendizado de máquina e ciência de dados com lotes desses biscoitos. Mas ela quer garantir que ela obtenha as cores certas de M&Ms nos biscoitos para estudantes em diferentes estados.

Neste exemplo vamos apresentar um programa Spark que lê um arquivo com mais de 100.000 entradas (onde cada linha ou linha tem um `<state, mnm_color, count>`) e calcula as contagens para cada cor e estado. 


<img src="https://st2.depositphotos.com/2035563/6378/i/600/depositphotos_63782833-stock-photo-candy-dots-background.jpg" class="bg-primary mb-1" width="300px">

## Conectando o Drive ao Colab

A primeira coisa que você quer fazer quando está trabalhando no Colab é montar seu Google Drive. Isso permitirá que você acesse qualquer diretório em seu Drive dentro do notebook Colab.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## PySpark no Google Colab

PySpark é a interface alto nível que permite você conseguir acessar e usar o Spark por meio da linguagem Python. Usando o PySpark, você consegue escrever todo o seu código usando apenas o estilo Python de escrever código.

Já o Google Colab é uma ferramenta incrível, poderosa e gratuita – com suporte de GPU inclusive. Uma vez que roda 100% na nuvem, você não tem a necessidade de instalar qualquer coisa na sua própria máquina.

No entanto, apesar da maioria das bibliotecas de Data Science estarem previamente instaladas no Colab, o mesmo não acontece com o PySpark. Para conseguir usar o PySpark é necessário alguns passos intermediários, que não são triviais para aqueles que estão começando.

Dessa maneira, preparei um tutorial simples e direto ensinando a instalar as dependências e a biblioteca.

## Instalando o PySpark no Google Colab

Instalar o PySpark não é um processo direto como de praxe em Python. Não basta usar um pip install apenas. Na verdade, antes de tudo é necessário instalar dependências como o Java 8, Apache Spark 2.3.2 junto com o Hadoop 2.7.

In [4]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

A próxima etapa é configurar as variáveis de ambiente, pois isso habilita o ambiente do Colab a identificar corretamente onde as dependências estão rodando.

Para conseguir “manipular” o terminal e interagir como ele, você pode usar a biblioteca os.

In [5]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

Com tudo pronto, vamos rodar uma sessão local para testar se a instalação funcionou corretamente.

In [None]:
# iniciar uma sessão local e importar dados do Airbnb
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

# carregar dados do Airbnb
df_spark = sc.read.csv("/content/drive/MyDrive/Ensino/IFG/2022/01/Big_Data/pos-ia-bd/04-spark/data/mnm_dataset.csv", inferSchema=True, header=True)

# ver algumas informações sobre os tipos de dados de cada coluna
df_spark.printSchema()

## Implementando a aplicação

A primeira coisa a fazer é criar a aplicação e a sessão do Spark.

In [13]:
spark = (SparkSession
         .builder
         .appName("PythonMnMCount")
         .getOrCreate())

Obter o nome do arquivo do conjunto de dados M&M

In [14]:
mnm_file = "/content/drive/MyDrive/Ensino/IFG/2022/01/Big_Data/pos-ia-bd/04-spark/data/mnm_dataset.csv"

Ler o arquivo em um Spark DataFrame usando o formato CSV inferindo o esquema e especificando que o arquivo contém um cabeçalho, que fornece nomes dos campos.

In [None]:
mnm_df = (spark.read.format("csv")
          .option("header", "true")
          .option("inferSchema", "true")
          .load(mnm_file))
mnm_df.show(n=5, truncate=False)

Usamos as APIs de alto nível do DataFrame. Observe que não usamos RDDs. Como algumas das funções do Spark retornam o mesmo objeto, podemos encadear chamadas de função.
1. Selecione no DataFrame os campos "Estado", "Cor" e "Contagem"
2. Como queremos agrupar cada estado e sua contagem de cores M&M, usamos `groupBy()`
3. Contagens agregadas de todas as cores e estado e cor `groupBy()`
4. `orderBy()` em ordem decrescente

In [16]:
count_mnm_df = (mnm_df.select("State", "Color", "Count")
                    .groupBy("State", "Color")
                    .sum("Count")
                    .orderBy("sum(Count)", ascending=False))

Mostre as agregações resultantes para todos os estados e cores; uma contagem total de cada cor por estado.

Observe que `show()` é uma ação que acionará a execução da consulta acima.

In [None]:
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))

Enquanto o código acima é agregado e contado para todos os estados, e se quisermos apenas ver os dados de um único estado, por exemplo, CA?
1. Selecione de todas as linhas no DataFrame
2. Filtre apenas o estado da CA
3. `groupBy()` Estado e Cor como fizemos acima
4. Agregue as contagens para cada cor
5. `orderBy()` em ordem decrescente

In [18]:
ca_count_mnm_df = (mnm_df.select("*")
                       .where(mnm_df.State == 'CA')
                       .groupBy("State", "Color")
                       .sum("Count")
                       .orderBy("sum(Count)", ascending=False))

Mostre a agregação resultante para a Califórnia.

Como acima, `show()` é uma ação que acionará a execução de toda a computação.

In [None]:
ca_count_mnm_df.show(n=10, truncate=False)