## 1. Inicialização do Ambiente

Importação de pacotes, configuração da SparkSession e definiçao de variáveis.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_timestamp, date_format, when, count, sum as spark_sum,
    max as spark_max, min as spark_min, avg as spark_avg, col, expr
)

In [0]:
spark = SparkSession.builder.appName("PipelineTransporte").getOrCreate()

In [0]:
PATH_BRONZE = "/Volumes/workspace/default/assignment-1/bronze"
PATH_SILVER = "/Volumes/workspace/default/assignment-1/silver"
PATH_GOLD = "/Volumes/workspace/default/assignment-1/gold"
PATH_LOGS = "/Volumes/workspace/default/assignment-1/logs_datas_invalidas"

## 2. Ingestão dos Dados (Bronze) e Análise Exploratória

Leitura do arquivo CSV original e exploração inicial dos dados.

In [0]:
# Dados de entrada
raw = "/Volumes/workspace/default/assignment-1/info_transportes.csv"

# Leitura dos dados brutos
bronze = spark.read.csv(raw, header=True, inferSchema=True, sep=";")

# Salva os dados brutos no formato parquet
bronze.write.mode("overwrite").parquet(PATH_BRONZE)

In [0]:

print("Esquema do DataFrame:")
bronze.printSchema()

print("Exemplo de linhas:")
bronze.show(5)

print("Quantidade de linhas e colunas:")
print(f"Linhas: {bronze.count()}, Colunas: {len(bronze.columns)}")

print("Valores nulos por coluna:")
from pyspark.sql.functions import isnan
bronze.select([count(when(col(c).isNull(), c)).alias(c) for c in bronze.columns]).show()

print("Valores unicos em colunas categóricas:")
bronze.select("CATEGORIA").distinct().show()
bronze.select("PROPOSITO").distinct().show()

print("Estatísticas básicas da coluna DISTANCIA:")
bronze.select("DISTANCIA").describe().show()


Esquema do DataFrame:
root
 |-- DATA_INICIO: string (nullable = true)
 |-- DATA_FIM: string (nullable = true)
 |-- CATEGORIA: string (nullable = true)
 |-- LOCAL_INICIO: string (nullable = true)
 |-- LOCAL_FIM: string (nullable = true)
 |-- DISTANCIA: integer (nullable = true)
 |-- PROPOSITO: string (nullable = true)

Exemplo de linhas:
+----------------+----------------+---------+------------+---------------+---------+-----------------+
|     DATA_INICIO|        DATA_FIM|CATEGORIA|LOCAL_INICIO|      LOCAL_FIM|DISTANCIA|        PROPOSITO|
+----------------+----------------+---------+------------+---------------+---------+-----------------+
|01-01-2016 21:11|01-01-2016 21:17|  Negocio| Fort Pierce|    Fort Pierce|       51|      Alimentação|
|01-02-2016 01:25|01-02-2016 01:37|  Negocio| Fort Pierce|    Fort Pierce|        5|             NULL|
|01-02-2016 20:25|01-02-2016 20:38|  Negocio| Fort Pierce|    Fort Pierce|       48|         Entregas|
|01-05-2016 17:31|01-05-2016 17:45|  Negoci

## 3. Tratamento de Datas (Silver) e Log de Erros

Conversão de datas para timestamp, log dos registros inválidos e criação da coluna de referência.

In [0]:
silver = bronze.withColumn(
    "DATA_INICIO_TS",
    expr("try_to_timestamp(DATA_INICIO, 'MM-dd-yyyy H:mm')")
)

silver = silver.withColumn("DT_REFE", date_format(col("DATA_INICIO_TS"), "yyyy-MM-dd"))

# Log para linhas com datas inválidas
silver.filter(col("DATA_INICIO_TS").isNull()).write.mode("overwrite").parquet(PATH_LOGS)

silver = silver.filter(col("DATA_INICIO_TS").isNotNull())

## 4. Criação de Colunas Auxiliares

Criação de colunas flags para agregações condicionais.

In [0]:
silver = silver.withColumn(
    "IS_NEGOCIO", when(col("CATEGORIA") == "Negocio", 1).otherwise(0)
).withColumn(
    "IS_PESSOAL", when(col("CATEGORIA") == "Pessoal", 1).otherwise(0)
).withColumn(
    "IS_REUNIAO", when(col("PROPOSITO") == "Reunião", 1).otherwise(0)
).withColumn(
    "IS_NAO_REUNIAO", when(
        (col("PROPOSITO").isNotNull()) & (col("PROPOSITO") != "Reunião"),
        1
    ).otherwise(0)
)

# Salva os dados da camada Silver em formato parquet
silver.write.mode("overwrite").parquet(PATH_SILVER)

In [0]:
display(silver.limit(10))

DATA_INICIO,DATA_FIM,CATEGORIA,LOCAL_INICIO,LOCAL_FIM,DISTANCIA,PROPOSITO,DATA_INICIO_TS,DT_REFE,IS_NEGOCIO,IS_PESSOAL,IS_REUNIAO,IS_NAO_REUNIAO
01-01-2016 21:11,01-01-2016 21:17,Negocio,Fort Pierce,Fort Pierce,51,Alimentação,2016-01-01T21:11:00.000Z,2016-01-01,1,0,0,1
01-02-2016 01:25,01-02-2016 01:37,Negocio,Fort Pierce,Fort Pierce,5,,2016-01-02T01:25:00.000Z,2016-01-02,1,0,0,0
01-02-2016 20:25,01-02-2016 20:38,Negocio,Fort Pierce,Fort Pierce,48,Entregas,2016-01-02T20:25:00.000Z,2016-01-02,1,0,0,1
01-05-2016 17:31,01-05-2016 17:45,Negocio,Fort Pierce,Fort Pierce,47,Reunião,2016-01-05T17:31:00.000Z,2016-01-05,1,0,1,0
01-06-2016 14:42,01-06-2016 15:49,Negocio,Fort Pierce,West Palm Beach,637,Visita ao cliente,2016-01-06T14:42:00.000Z,2016-01-06,1,0,0,1
01-06-2016 17:15,01-06-2016 17:19,Negocio,West Palm Beach,West Palm Beach,43,Alimentação,2016-01-06T17:15:00.000Z,2016-01-06,1,0,0,1
01-06-2016 17:30,01-06-2016 17:35,Negocio,West Palm Beach,Palm Beach,71,Reunião,2016-01-06T17:30:00.000Z,2016-01-06,1,0,1,0
01-07-2016 13:27,01-07-2016 13:33,Negocio,Cary,Cary,8,Reunião,2016-01-07T13:27:00.000Z,2016-01-07,1,0,1,0
01-10-2016 08:05,01-10-2016 08:25,Negocio,Cary,Morrisville,83,Reunião,2016-01-10T08:05:00.000Z,2016-01-10,1,0,1,0
01-10-2016 12:17,01-10-2016 12:44,Negocio,Jamaica,New York,165,Visita ao cliente,2016-01-10T12:17:00.000Z,2016-01-10,1,0,0,1


## 5. Agregação das Métricas Diárias (Gold)

Geração da tabela final agregada por data de referência.

In [0]:
gold = silver.groupBy("DT_REFE").agg(
    count("*").alias("QT_CORR"),
    spark_sum("IS_NEGOCIO").alias("QT_CORR_NEG"),
    spark_sum("IS_PESSOAL").alias("QT_CORR_PESS"),
    spark_max("DISTANCIA").alias("VL_MAX_DIST"),
    spark_min("DISTANCIA").alias("VL_MIN_DIST"),
    spark_avg("DISTANCIA").alias("VL_AVG_DIST"),
    spark_sum("IS_REUNIAO").alias("QT_CORR_REUNI"),
    spark_sum("IS_NAO_REUNIAO").alias("QT_CORR_NAO_REUNI"),
)

In [0]:
# Salva os dados da camada Gold em formato em uma tabela Hive
gold.write.mode("overwrite").saveAsTable("info_corridas_do_dia")

### Tabela final do pipeline, com métricas diárias resumidas das corridas, separadas por categoria, propósito e estatísticas de distância.

| Nome da Coluna         | Descrição                                                                              |
|------------------------|----------------------------------------------------------------------------------------|
| DT_REFE                | Data de referência.                                                                    |
| QT_CORR                | Quantidade de corridas.                                                                |
| QT_CORR_NEG            | Quantidade de corridas com a categoria “Negócio”.                                      |
| QT_CORR_PESS           | Quantidade de corridas com a categoria “Pessoal”.                                      |
| VL_MAX_DIST            | Maior distância percorrida por uma corrida.                                            |
| VL_MIN_DIST            | Menor distância percorrida por uma corrida.                                            |
| VL_AVG_DIST            | Média das distâncias percorridas.                                                      |
| QT_CORR_REUNI          | Quantidade de corridas com o propósito de "Reunião".                                   |
| QT_CORR_NAO_REUNI      | Quantidade de corridas com o propósito declarado e diferente de "Reunião".             |


In [0]:
display(spark.sql("SELECT * FROM info_corridas_do_dia LIMIT 20"))

DT_REFE,QT_CORR,QT_CORR_NEG,QT_CORR_PESS,VL_MAX_DIST,VL_MIN_DIST,VL_AVG_DIST,QT_CORR_REUNI,QT_CORR_NAO_REUNI
2016-09-30,2,2,0,377,167,272.0,0,0
2016-11-25,2,2,0,111,103,107.0,2,0
2016-08-23,8,8,0,177,5,70.375,0,0
2016-12-19,11,11,0,102,7,45.0,1,5
2016-11-17,1,1,0,163,163,163.0,0,1
2016-02-01,3,3,0,233,39,155.33333333333334,0,3
2016-07-02,2,2,0,101,99,100.0,2,0
2016-11-16,2,2,0,31,23,27.0,0,2
2016-08-07,4,4,0,27,25,26.0,0,2
2016-07-14,3,2,1,1953,33,701.3333333333334,0,1
