## 1. Importando Bibliotecas

In [0]:
# Bibliotecas PySpark

# Sessão Spark - ponto de entrada para usar o PySpark
from pyspark.sql import SparkSession

# Funções do PySpark para transformar e manipular colunas
from pyspark.sql.functions import (
    col,           # Seleção e transformação de colunas
    when,          # Condicionais (if-else)
    to_timestamp,  # Conversão de string para timestamp
    avg,           # Média
    sum,           # Soma
    count,         # Contagem
    countDistinct, # Contagem de valores distintos
    month          # Extração de mês de data
)

# Tipos de dados usados na definição de schema
from pyspark.sql.types import (
    StringType, 
    BooleanType, 
    IntegerType, 
    FloatType, 
    DoubleType,
    ArrayType, 
    StructType, 
    StructField
)


In [0]:
# Bibliotecas de download e manipulação de arquivos

# Biblioteca padrão para requisições HTTP
import requests

# Biblioteca padrão para lidar com arquivos .tar.gz
import tarfile

# Biblioteca padrão para operações com caminhos e arquivos no sistema
import os
import urllib.request

## 2. Leitura dos arquivos

In [0]:

# Leitura dos Arquivos JSON e CSV no S3

# Caminhos dos arquivos no S3
order_path = "s3a://data-architect-test-source/order.json.gz"
consumer_path = "s3a://data-architect-test-source/consumer.csv.gz"
restaurant_path = "s3a://data-architect-test-source/restaurant.csv.gz"

# Leitura do JSON (Pedidos)
df_order = spark.read.json(order_path)

# Leitura dos CSVs (Usuários e Restaurantes)
df_consumer = spark.read.option("header", "true").csv(consumer_path)
df_restaurant = spark.read.option("header", "true").csv(restaurant_path)



In [0]:
df_orders_raw = spark.read.option("multiline", "true").json(order_path)
df_consumers_raw = spark.read.option("header", "true").csv(consumer_path)
df_restaurant_raw = spark.read.option("header", "true").csv(restaurant_path)

## 3. Ajuste dos tipos de dados

In [0]:
df_orders = df_orders_raw \
    .withColumn("cpf", col("cpf").cast(StringType())) \
    .withColumn("customer_id", col("customer_id").cast(StringType())) \
    .withColumn("customer_name", col("customer_name").cast(StringType())) \
    .withColumn("delivery_address_city", col("delivery_address_city").cast(StringType())) \
    .withColumn("delivery_address_country", col("delivery_address_country").cast(StringType())) \
    .withColumn("delivery_address_district", col("delivery_address_district").cast(StringType())) \
    .withColumn("delivery_address_external_id", col("delivery_address_external_id").cast(StringType())) \
    .withColumn("delivery_address_latitude", col("delivery_address_latitude").cast(FloatType())) \
    .withColumn("delivery_address_longitude", col("delivery_address_longitude").cast(FloatType())) \
    .withColumn("delivery_address_state", col("delivery_address_state").cast(StringType())) \
    .withColumn("delivery_address_zip_code", col("delivery_address_zip_code").cast(StringType())) \
    .withColumn("merchant_id", col("merchant_id").cast(StringType())) \
    .withColumn("merchant_latitude", col("merchant_latitude").cast(FloatType())) \
    .withColumn("merchant_longitude", col("merchant_longitude").cast(FloatType())) \
    .withColumn("merchant_timezone", col("merchant_timezone").cast(StringType())) \
    .withColumn("order_created_at", to_timestamp("order_created_at")) \
    .withColumn("order_id", col("order_id").cast(StringType())) \
    .withColumn("order_scheduled", col("order_scheduled").cast(BooleanType())) \
    .withColumn("order_total_amount", col("order_total_amount").cast(FloatType())) \
    .withColumn("origin_platform", col("origin_platform").cast(StringType()))

In [0]:
# df_consumers
df_consumers = df_consumers_raw \
    .withColumn("customer_id", col("customer_id").cast(StringType())) \
    .withColumn("language", col("language").cast(StringType())) \
    .withColumn("created_at", to_timestamp("created_at")) \
    .withColumn("active", col("active").cast(BooleanType())) \
    .withColumn("customer_name", col("customer_name").cast(StringType())) \
    .withColumn("customer_phone_area", col("customer_phone_area").cast(StringType())) \
    .withColumn("customer_phone_number", col("customer_phone_number").cast(StringType()))


In [0]:
df_restaurant = df_restaurant_raw \
    .withColumn("id", col("id").cast(StringType())) \
    .withColumn("created_at", to_timestamp("created_at")) \
    .withColumn("enabled", col("enabled").cast(BooleanType())) \
    .withColumn("price_range", col("price_range").cast(IntegerType())) \
    .withColumn("average_ticket", col("average_ticket").cast(FloatType())) \
    .withColumn("delivery_time", col("delivery_time").cast(FloatType())) \
    .withColumn("minimum_order_value", col("minimum_order_value").cast(FloatType())) \
    .withColumn("merchant_zip_code", col("merchant_zip_code").cast(StringType())) \
    .withColumn("merchant_city", col("merchant_city").cast(StringType())) \
    .withColumn("merchant_state", col("merchant_state").cast(StringType())) \
    .withColumn("merchant_country", col("merchant_country").cast(StringType()))


## 4. Arquivo ab_test_ref

In [0]:
# Baixar o arquivo
local_tar_path = "/Workspace/Users/kleyton.kenji@gmail.com/ifood/ab_test_ref.tar.gz"
urllib.request.urlretrieve(ab_test_url, local_tar_path)

# Extrair
extract_path = "/Workspace/Users/kleyton.kenji@gmail.com/ifood/ab_test_ref_extracted"
with tarfile.open(local_tar_path, "r:gz") as tar:
    tar.extractall(path=extract_path)

# Leitura do CSV extraído
ab_test_csv_path = f"{extract_path}/ab_test_ref.csv"
df_ab_test = spark.read.option("header", "true").csv(ab_test_csv_path)

# Cast de colunas (ajuste conforme necessário)
df_ab_test = df_ab_test \
    .withColumn("customer_id", col("customer_id").cast(StringType())) \
    .withColumn("group", col("group").cast(StringType()))

display(df_ab_test)

[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-6320009551982280>, line 19[0m
[1;32m     14[0m [38;5;66;03m# Cast de colunas (ajuste conforme necessário)[39;00m
[1;32m     15[0m df_ab_test [38;5;241m=[39m df_ab_test \
[1;32m     16[0m     [38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mcustomer_id[39m[38;5;124m"[39m, col([38;5;124m"[39m[38;5;124mcustomer_id[39m[38;5;124m"[39m)[38;5;241m.[39mcast(StringType())) \
[1;32m     17[0m     [38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mgroup[39m[38;5;124m"[39m, col([38;5;124m"[39m[38;5;124mgroup[39m[38;5;124m"[39m)[38;5;241m.[39mcast(StringType()))
[0;32m---> 19[0m display(df_ab_test)

File [0;32m/databricks/python_shell/lib/dbruntime/display.py:142[0m, in [0;36mDisplay.display[0;34m(self, input, *args, **kwargs)[0m
[1;32m    140[0m [38;

## 5. Ingestão dos dados

In [0]:
# Criar o banco lógico se não existir
spark.sql("CREATE DATABASE IF NOT EXISTS ifood_bronze")

# Salvar as tabelas
df_orders.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.orders")
df_consumers.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.consumers")
df_restaurant.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.restaurants")
df_ab_test.write.format("delta").mode("overwrite").saveAsTable("ifood_bronze.ab_test_ref")

print("Tabelas salvas com sucesso.")
spark.sql("SHOW TABLES IN ifood_bronze").show()

