## -- Notebook: 03_gold_layer.ipynb
-- Camada: Gold
-- Objetivo: Criar tabelas agregadas e prontas para consumo de BI/ML.

## -- 1. Criar a Tabela Delta na Camada Gold
-- Esta tabela conterá métricas diárias agregadas por par de cidades.

In [0]:
%sql
CREATE TABLE IF NOT EXISTS mc_labs.daily_city_pair_metrics
USING DELTA
AS
SELECT
 pickup_date,
 pickup_city_name,
 dropoff_city_name,
 COUNT(*) AS total_trips,
 AVG(fare_amount) AS avg_fare_amount,
 SUM(trip_distance) AS total_trip_distance
FROM
 mc_labs.trips_silver
GROUP BY
 pickup_date,
 pickup_city_name,
 dropoff_city_name
ORDER BY
 pickup_date, pickup_city_name, dropoff_city_name;

In [0]:
%sql
-- 2. Verificar o Esquema e os Dados da Tabela Gold
DESCRIBE DETAIL mc_labs.daily_city_pair_metrics;

In [0]:
%sql
DESCRIBE HISTORY mc_labs.daily_city_pair_metrics;

In [0]:
%sql
SELECT * FROM mc_labs.daily_city_pair_metrics LIMIT 10;

## -- 3. Simular uma Atualização Incremental na Camada Gold (Exemplo com MERGE INTO)
-- Para tabelas Gold, muitas vezes você pode re-processar o dia mais recente ou um período.
-- No entanto, se as métricas forem mais complexas ou se você quiser evitar re-agregações completas,
-- MERGE INTO pode ser usado para atualizar apenas os dias/registros afetados.


## -- Vamos simular que novos dados chegaram na Silver (e, portanto, na Bronze) para uma data específica.
-- Para este exemplo, vamos re-calcular as métricas para a data mais recente presente na Silver
-- e usar MERGE INTO para atualizar/inserir na Gold.

-- Encontre a data mais recente na Silver
-- (Em um pipeline real, você teria um mecanismo para saber quais datas foram atualizadas na Silver)
-- SET spark.sql.variable.latest_silver_date = (SELECT MAX(pickup_date) FROM mc_labs.trips_silver);
-- SELECT '${spark.sql.variable.latest_silver_date}'; -- Para verificar a variável


In [0]:
%sql
WITH
 -- CTE para recalcular as métricas para a data mais recente na Silver
 -- (ou para qualquer data que tenha sido atualizada na Silver)
 latest_daily_metrics AS (
     SELECT
         pickup_date,
         pickup_city_name,
         dropoff_city_name,
         COUNT(*) AS total_trips,
         AVG(fare_amount) AS avg_fare_amount,
         SUM(trip_distance) AS total_trip_distance
     FROM
         mc_labs.trips_silver
     WHERE
         -- Filtrar pela data mais recente (ou datas que foram atualizadas)
         -- Para este exemplo, vamos pegar uma data específica que sabemos que tem dados
         pickup_date = (SELECT MAX(pickup_date) FROM mc_labs.trips_silver)
     GROUP BY
         pickup_date,
         pickup_city_name,
         dropoff_city_name
 )
MERGE INTO mc_labs.daily_city_pair_metrics AS target
USING latest_daily_metrics AS source
ON target.pickup_date = source.pickup_date
AND target.pickup_city_name = source.pickup_city_name
AND target.dropoff_city_name = source.dropoff_city_name
WHEN MATCHED THEN
 UPDATE SET
     target.total_trips = source.total_trips,
     target.avg_fare_amount = source.avg_fare_amount,
     target.total_trip_distance = source.total_trip_distance
WHEN NOT MATCHED THEN
 INSERT (pickup_date, pickup_city_name, dropoff_city_name, total_trips, avg_fare_amount, total_trip_distance)
 VALUES (source.pickup_date, source.pickup_city_name, source.dropoff_city_name, source.total_trips, source.avg_fare_amount, source.total_trip_distance);


In [0]:
%sql
-- 4. Verificar os resultados do MERGE e o histórico da Gold
SELECT * FROM mc_labs.daily_city_pair_metrics WHERE pickup_date = (SELECT MAX(pickup_date) FROM mc_labs.trips_silver) LIMIT 10;

In [0]:
%sql
DESCRIBE HISTORY mc_labs.daily_city_pair_metrics;