# Setup Geral

Se estiver executando este exercício no Google Colab, execute as próximas duas células. 

Caso esteja executando localmente, não é necessário executar mas certifique-se de que o **pyspark** está instalado e configurado em sua máquina.

In [None]:
%%bash

# Instal Java
apt-get update && apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

In [None]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Teste

O teste a ser realizado é composto de 3 partes:

- um exercício de programação em Python
- alguns exercícios de programação em SQL
- alguns exercícios de programação com PySpark

Você pode escolher qual das partes do exercício vai fazer primeiro. Todo o exercício deve ser completado no período de 48 horas.

# Python

In [7]:
# SETUP
nomes_alunos = [
    ('Maria', 1),
    ('João', 2),
    ('Pedro', 3),
    ('Gabriella', 4),
    ('Giovana', 5),
    ('Arthur', 6)
]

notas_alunos = {
    1: 9.5,
    2: 5.1,
    3: 8.7,
    4: 7.1,
    5: 4.8,
    6: 6.3
}

Implemente uma função que recebe uma lista de nomes de alunos, um dicionário de notas dos mesmo, sendo que essas estruturas se relacionam por um ID.

A função deve retornar em ordem alfabética, o nome dos alunos que obtiveram notas maior ou igual de uma nota de corte informada.

In [8]:
def filtra_alunos_acima_corte(alunos, notas, nota_corte):

    import logging

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    
    try:
        if not __valida_variaveis_alunos_acima_corte(alunos, notas, nota_corte):
            return []

    except NameError:
        logging.warning("Adicionei uma checagem das variáveis ao teste, se for válido no teste, por favor executar a célula abaixo para sua criação")

                        
    dict_notas_aprovadas = dict(filter(lambda aluno: aluno[1] >= nota_corte, notas_alunos.items()))

    if not dict_notas_aprovadas:
        logging.warning("Nao temos alunos aprovados.")
        return []

    
    id_notas_aprovadas = list(dict_notas_aprovadas.keys())
    alunos_aprovados = list(filter(lambda x: x[1] in id_notas_aprovadas, alunos))

    if len(alunos_aprovados) < len(id_notas_aprovadas):
        logging.warning("Algum id de aluno aprovado não foi encontrado.")
    
    nome_alunos_aprovados = list(map(lambda x: x[0], alunos_aprovados))
    
    nome_alunos_aprovados.sort()
    
    return nome_alunos_aprovados

    
filtra_alunos_acima_corte(nomes_alunos, notas_alunos, 6)

['Arthur', 'Gabriella', 'Maria', 'Pedro']

In [3]:
def __valida_variaveis_alunos_acima_corte(alunos, notas, nota_corte):

    import logging

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    if not isinstance(alunos, (list)):
        logging.error("Não recebemos uma lista de alunos.")
        return False


    elif not all(map(lambda x: 
                     isinstance(x, tuple) and 
                     len(x) == 2 and 
                     isinstance(x[0], str) and
                     isinstance(x[1], int), alunos)
                ):
        logging.error("O formato dos alunos enviados não está no padrão esperado. O padrão correto é ('nome_aluno',10)")
        return False


    if not isinstance(notas, (dict)):
        logging.error("Não recebemos um dicionário de notas.")
        return False


    if not all(map(lambda x: isinstance(x, (int)), notas.keys())) \
    or not all(map(lambda x: isinstance(x, (int, float)), notas.values())):
        logging.error("As notas enviadas não estão no padrão esperado. O padrão correto é {1:10} sendo o id do aluno e sua nota, respectivamente.")
        return False

    
    if not isinstance(nota_corte, (int, float)):
        logging.error("Nota de corte inválida. É esperado um valor numérico (inteiro ou decimal).")
        return False
    
    return True


# SQL

**Setup**


In [None]:
%%bash
mkdir bases_teste
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/produtos.csv -o bases_teste/produtos.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/vendas.csv -o bases_teste/vendas.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/usuarios.csv -o bases_teste/usuarios.csv

In [None]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AtividadeSQL").getOrCreate()

In [None]:
def cria_tabela(path, nome_tabela):
    df = spark.read.csv(path, inferSchema=True, header=True)
    df.createOrReplaceTempView(nome_tabela)
    return df

usuarios = cria_tabela("bases_teste/usuarios.csv", "usuarios")
produtos = cria_tabela("bases_teste/produtos.csv", "produtos")
vendas = cria_tabela("bases_teste/vendas.csv", "vendas")

**Função para execução de queries**

In [None]:
def q(query, n=30):
    return spark.sql(query).show(n=n, truncate=False)

Para executar alguma consulta, basta colocar seu código sql dentro da função q como no exemplo abaixo:

```python
q("""
    SELECT *
    FROM usuarios
""")
```

e o resultado será exibido na tela.

---

Nesta parte da atividade, você vai trabalhar com três tabelas:

- produtos
- usuarios
- vendas

Use-as para responder às perguntas a seguir.

1) Qual foi a quantidade de vendas nos estados de Minas Gerais e São Paulo para cada ano e mês?

2) Quais são os usuários por Estado que mais compraram em todo o período analisado e qual foi o número de compras realizadas, a quantidade total de itens comprados e valor total pago por usuário?

3) Quais são os usuários que não fizeram nenhuma compra?

4) Qual é o ticket médio (média de valor gasto) e o número total de usuários que fizeram pelo menos uma compra por faixa etária?

In [None]:
spark.stop()

# PySpark

**setup**:

In [None]:
%%bash
mkdir bases_teste
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/produtos.csv -o bases_teste/produtos.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/vendas.csv -o bases_teste/vendas.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/usuarios.csv -o bases_teste/usuarios.csv

In [None]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AtividadeSQL").getOrCreate()

In [None]:
def cria_tabela(path, nome_tabela):
    df = spark.read.csv(path, inferSchema=True, header=True)
    df.createOrReplaceTempView(nome_tabela)
    return df

usuarios = cria_tabela("bases_teste/usuarios.csv", "usuarios")
produtos = cria_tabela("bases_teste/produtos.csv", "produtos")
vendas = cria_tabela("bases_teste/vendas.csv", "vendas")

Responda às perguntas a seguir utilizando **Spark DATAFRAMES**.

1) Qual foi o total de compras realizadas, o total de itens comprados e a receita total obtida em todo o período analisado?

2) Quais são os 3 produtos mais comprados dos estados da região Sul e Sudeste, a quantidade de itens comprados, o valor total pago e a média de preço paga?

3) Para cada produto, quantos usuários fizeram pelo menos uma compra desse produto e qual é o valor mínimo e máximo pago por eles?

4) Aplique um desconto de 10% em todas as vendas dos usuários que fizeram pelo menos 3 compras de produtos na mesma categoria, a partir da 4ª compra realizada. Exiba apenas os usuários que terão o desconto aplicado, mantendo todas as compras, o valor original e o valor com o desconto aplicado.

# FIM!