# AWS Glue Studio Notebook

Analise atentamente os comandos a seguir e execute-os em sequência.

Caso você não tenha familiaridade com Jupyter notebooks, aqui está um mini tutorial para começar:

**Para executar células**: Um notebook é composto por células. Cada célula pode conter código Python ou texto formatado. Para executar o código em uma célula, clique na célula para selecioná-la, depois pressione `Shift+Enter`. O resultado do código será exibido abaixo da célula.

**Para adicionar novas células**: Você pode adicionar novas células clicando no botão "+" na barra de ferramentas na parte superior. A nova célula será adicionada abaixo da célula atualmente selecionada.

**Salvar o notebook**: Você pode salvar o notebook clicando no botão de **Save** na parte superior.

**Interromper o notebook**: Se quiser sair, não esqueça de interromper o notebook utilizando o botão **Stop notebook**. Isso garante que os recursos alocados para o notebook sejam liberados e você não seja cobrado por isso.

### 1. Iniciando uma sessão interativa.


In [None]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
import boto3
import time
from awsglue.transforms import *
from pyspark.sql.functions import sum, col, desc
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame  

print("Criando o contexto do Glue")
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
print("Contexto do Glue criado com sucesso")

### 2. Definindo a variável de ambiente `s3_bucket`

In [None]:
print("Definindo a variável s3_bucket que vamos utilizar ao longo do código")
s3_bucket = ""
s3_client = boto3.client('s3')
response = s3_client.list_buckets()

for bucket in response['Buckets']:
    if bucket['Name'].startswith('lab-data-eng-'):
        s3_bucket = bucket['Name']

print("O bucket que vamos utilizar é: " + s3_bucket)

### 3. Criando um DynamicFrame a partir de uma tabela do Glue Data Catalog


#### Clientes

In [None]:
dyfClientes = glueContext.create_dynamic_frame.from_catalog(database='ecommerce', table_name='tb_raw_clientes')

In [None]:
dyfClientes.printSchema()

#### Pedidos

Perceba que aqui estamos usando o mesmo banco de dados, mas agora estamos usando a tabela pedidos_part

In [None]:
dyfPedidos = glueContext.create_dynamic_frame.from_catalog(database='ecommerce', table_name='pedidos_part')

In [None]:
dyfPedidos.printSchema()

### 4. Convertendo o DynamicFrame para um Spark DataFrame e apresentando algumas linhas


#### Clientes

In [None]:
dfCli = dyfClientes.toDF()
dfCli.show(5)

#### Pedidos

In [None]:
dfPed = dyfPedidos.toDF()
dfPed.show(5)

### 5. Escrevendo tabelas **Parquet** e atualizando o Glue Data Catalog

#### Clientes

Perceba que agora estamos escrevendo uma tabela PARQUET em um novo path `stage/ecommerce/clientes/`

In [None]:
# Primeiro vamos limpar a pasta destino
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix='stage/ecommerce/clientes/')

for object in response['Contents']:
    out = s3_client.delete_object(Bucket=s3_bucket, Key=object['Key'])

> Caso o comando acima retorne `KeyError: 'Contents'`, significa que a pasta já estava vazia.

In [None]:
s3output = glueContext.getSink(
  path="s3://" + s3_bucket + "/stage/ecommerce/clientes/",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="ecommerce", catalogTableName="clientes_parquet"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(dyfClientes)

#### Pedidos

Perceba que agora estamos escrevendo uma tabela PARQUET em um novo path `stage/ecommerce/pedidos/`

In [None]:
# Primeiro vamos limpar a pasta destino
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix='stage/ecommerce/pedidos/')

for object in response['Contents']:
    out = s3_client.delete_object(Bucket=s3_bucket, Key=object['Key'])

> Caso o comando acima retorne `KeyError: 'Contents'`, significa que a pasta já estava vazia.

In [None]:
s3output = glueContext.getSink(
  path="s3://" + s3_bucket + "/stage/ecommerce/pedidos/",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=["data_pedido"],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="ecommerce", catalogTableName="pedidos_parquet"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(dyfPedidos)

### 6. Verificando os arquivos Parquet gerados

In [None]:
session = boto3.Session()
s3 = session.resource('s3')
lab_eng_bucket = s3.Bucket(s3_bucket)

print("Arquivos de clientes em stage")
for clientes_object in lab_eng_bucket.objects.filter(Prefix="stage/ecommerce/clientes/"):
    print(clientes_object.key)

print("Arquivos de pedidos em stage")
for pedidos_object in lab_eng_bucket.objects.filter(Prefix="stage/ecommerce/pedidos/"):
    print(pedidos_object.key)

### 7. Verificando a criação das tabelas `clientes_parquet` e `pedidos_parquet`

> ##### Atenção!
> Normalmente esse tipo de verificação visual não fica dentro do job. Neste laboratório só incluí por comodidade.

In [None]:
!aws glue get-tables --database-name 'ecommerce'  --query "TableList[].Name"

### 8. E-mail TOP10 do dia

##### Criando os dynamicframes

In [None]:
dyfClientes = glueContext.create_dynamic_frame.from_catalog(database='ecommerce', table_name='clientes_parquet')
dyfClientes.printSchema()

In [None]:
dyfClientes.toDF().show(5)

In [None]:
data_pedido="2024-01-01"
dyfPedidos = glueContext.create_dynamic_frame.from_catalog(database='ecommerce', table_name='pedidos_parquet', push_down_predicate = "data_pedido='"+ data_pedido +"'")
dyfPedidos.printSchema()

In [None]:
dyfPedidos.toDF().show(5)

##### Tratamento do dataframe dyfClientes

In [None]:
# Renomeando o atributo ID para evitar colisão com o outro dataset
# Eliminando atributos desnecessários nesta operação
dyfClientes = (
    dyfClientes.drop_fields(["cpf","data_nasc"])
    .rename_field("id", "id_cli")
)

dyfClientes.toDF().show(5)

##### Tratamento do dataframe dyfPedidos

In [None]:
#Eliminando atributo produto, desnecessário nesta operação
dyfPedidos = dyfPedidos.drop_fields(["produto"])

# Criando um novo atributo VALOR_TOTAL
dfPedidos = dyfPedidos.toDF()
dfPedidos = dfPedidos.withColumn("valor_total", col("quantidade") * col("valor_unitario"))
dyfPedidos = DynamicFrame.fromDF(dfPedidos, glueContext, "dyfPedidos")

# Agora podemos eliminar os atributos quantidade e valor_unitario
dyfPedidos = dyfPedidos.drop_fields(["quantidade", "valor_unitario"])

##### Join Clientes + Pedidos

In [None]:
# Vamos fazer o join com dyfClientes para obter o nome e email dos clientes
dyfClientesPedidos = dyfClientes.join(paths1=["id_cli"], paths2=["id_cliente"], frame2=dyfPedidos)

In [None]:
dyfClientesPedidos.toDF().show(5)

##### Calculando os top 10 clientes em valor total de pedidos

In [None]:
# Converte para dataframe
dfClientesPedidos = dyfClientesPedidos.toDF()

In [None]:

dfTop10 = dfClientesPedidos.groupBy("id_cliente", "nome", "email") \
        .agg(sum("valor_total").alias("tot")) \
        .sort(desc("tot")) \
        .limit(10)

##### Salvando o resultado em uma tabela `top_10_pedidos_dia` para reuso

In [None]:
# Primeiro vamos limpar a pasta destino
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix='analytics/ecommerce/relatorios/top10pedidosdia/')

for object in response['Contents']:
    out = s3_client.delete_object(Bucket=s3_bucket, Key=object['Key'])

In [None]:
dyfTop10 = DynamicFrame.fromDF(dfTop10, glueContext, "dyfTop10")

s3output = glueContext.getSink(
  path="s3://" + s3_bucket + "/analytics/ecommerce/relatorios/top10pedidosdia/",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="ecommerce", catalogTableName="top_10_pedidos_dia"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(dyfTop10)

Verifique no console **AWS Glue > Tables** que a tabela `top_10_pedidos_dia` foi criada.

# Parabéns!
Você concluiu a execução de um job Glue que pode ser empacotado e automatizado para execução periódica.<br>

> Não esqueça de clicar no botão **Stop notebook** para evitar cobranças extras.