## Case Taxis NY
Tecnologias implementadas:
    - Arquitetura medalhao
    - Particionamento dos dados

## Configurando conexao com AWS S3 Bucket - Setup

In [0]:
!pip install fsspec s3fs boto3
%restart_python

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Connecting to S3 Bucket
import pandas as pd
import os

cred = pd.read_csv('databricks_user_accessKeys.csv') # Arquivo gerado automaticamente pelo AWS IAM

access_key = cred['access_key_ID'].iloc[0]
secret_key = cred['secret_access_key'].iloc[0]
bucket = "ny-taxi-case"

os.environ["AWS_ACCESS_KEY_ID"] = access_key
os.environ["AWS_SECRET_ACCESS_KEY"] = secret_key

## Ingestao de dados brutos
Nesta etapa, os dados brutos serao baixados da base oficial da agencia responsavel pelo licenciamento dos taxis em NY enviados para o S3 bucket criado para o projeto. 
Utilizou-se uma funcao de repeticao para automatizar o processo de aquisicao e ingestao dos dados, ja que os link para os arquivos respeitavam o seguinte padrao:
https://d37ci6vzurychx.cloudfront.net/trip-data/[TIPO_DE_TAXI]\_tripdata\_[ANO]-[MES].parquet

Os arquivos serao organizados no S3 bucket com o seguinte padrao:
raw_data/[TIPO_DE_TAXI]/[ANO]/[MES]/arquivo.parquet

In [0]:
# Funcao auxiliar para manipulacao de datas na proxima etapa
def create_month_year_array(start_date, end_date):
    start_month, start_year = start_date
    end_month, end_year = end_date
    
    date_list = []
    current_month = start_month
    current_year = start_year

    while True:
        # Format the month with a leading zero if it's less than 10
        formatted_month = f"{current_month:02d}"
        date_list.append([formatted_month, current_year])

        if current_month == end_month and current_year == end_year:
            break

        current_month += 1
        if current_month > 12:
            current_month = 1
            current_year += 1
            
    return date_list

In [0]:
import urllib.request
import boto3
import os
import shutil

start_date = [1, 2023] # Janeiro de 2023
end_date = [5, 2023] # Maio de 2023
date_list = create_month_year_array(start_date, end_date)
trip_data_list=["yellow", "green"]
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key,  region_name='us-east-2')
bucket = "ny-taxi-case"

os.makedirs("data", exist_ok=True)
remote_path_list = []

for date in date_list:

    month = date[0]
    year = date[1]

    for trip_data in trip_data_list:
        url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{trip_data}_tripdata_{year}-{month}.parquet"
        local_path = f"data/{trip_data}_tripdata_{year}-{month}.parquet"
        remote_path = f"raw_data/{trip_data}/{year}/{month}/{trip_data}-{year}-{month}.parquet"

        urllib.request.urlretrieve(url, local_path)
        print(f"Download de {trip_data}_tripdata_{year}-{month}.parquet concluido")
        s3.upload_file(local_path, bucket, remote_path)
        print(f"Upload de {trip_data}_tripdata_{year}-{month}.parquet para S3 concluido")

        remote_path_list.append(remote_path)
shutil.rmtree('data') # Remove arquivos locais

Download de yellow_tripdata_2023-01.parquet concluido
Upload de yellow_tripdata_2023-01.parquet para S3 concluido
Download de green_tripdata_2023-01.parquet concluido
Upload de green_tripdata_2023-01.parquet para S3 concluido
Download de yellow_tripdata_2023-02.parquet concluido
Upload de yellow_tripdata_2023-02.parquet para S3 concluido
Download de green_tripdata_2023-02.parquet concluido
Upload de green_tripdata_2023-02.parquet para S3 concluido
Download de yellow_tripdata_2023-03.parquet concluido
Upload de yellow_tripdata_2023-03.parquet para S3 concluido
Download de green_tripdata_2023-03.parquet concluido
Upload de green_tripdata_2023-03.parquet para S3 concluido
Download de yellow_tripdata_2023-04.parquet concluido
Upload de yellow_tripdata_2023-04.parquet para S3 concluido
Download de green_tripdata_2023-04.parquet concluido
Upload de green_tripdata_2023-04.parquet para S3 concluido
Download de yellow_tripdata_2023-05.parquet concluido
Upload de yellow_tripdata_2023-05.parquet 

## Refinamento dos dados
Nesta etapa sera feita a limpeza e organizacao dos dados. Serao selecionados apenas colunas de interesse, remocao de dados nulos, exclusao de registros duplicados ou incoerentes: (corrida fora do intervalo de interesse, com valor recebido menor ou igual 0...), alem da padronizacao do tipo de cada coluna dos parquets. 

Os arquivos serao organizados no S3 bucket com o seguinte padrao:
cleaned_data/[TIPO_DE_TAXI]/[ANO]/[MES]/arquivo.parquet

In [0]:
from pyspark.sql import functions as F

for date in date_list:

  month = date[0]
  year = date[1]

  for trip_data in trip_data_list:
    relative_path = f"raw_data/{trip_data}/{year}/{month}/{trip_data}-{year}-{month}.parquet"
    df = spark.read.format("parquet") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(f"s3://{bucket}/{relative_path}")

    # Renomeia coluna para caso dos Green Taxis
    df = (df.withColumnRenamed("lpep_pickup_datetime", "tpep_pickup_datetime")
            .withColumnRenamed("lpep_dropoff_datetime", "tpep_dropoff_datetime")
        )    
    
    # Seleciona colunas de interesse, remove nulos, registros duplicados e filtra incoerencias
    df = (df.select(["VendorID","passenger_count", "total_amount","tpep_pickup_datetime","tpep_dropoff_datetime"])
            .dropna()
            .dropDuplicates()
            .filter(df['passenger_count'] > 0) # Corrida com zero passagerios
            .filter(df['total_amount'] > 0) # Corrida com valor total negativo
            .filter(df['tpep_pickup_datetime'] < df['tpep_dropoff_datetime']) # Tempo de chegada < tempo de partida
            .filter(df['tpep_pickup_datetime'] > F.to_timestamp(F.lit(f"{year}-{month}-01"))) # Corrida fora do mes
            .filter(df['tpep_pickup_datetime'] < F.add_months(F.to_timestamp(F.lit(f"{year}-{month}-01")), 1)) # Corrida fora do mes
        )
    
    # Garante que a mesma coluna tenha o mesmo tipo nos diferentes parquets
    df = (df.withColumn("VendorID", F.col("VendorID").cast("long"))
          .withColumn("passenger_count", F.col("passenger_count").cast("int"))
          .withColumn("total_amount", F.col("total_amount").cast("double"))
          .withColumn("tpep_pickup_datetime", F.col("tpep_pickup_datetime").cast("timestamp"))
          .withColumn("tpep_dropoff_datetime", F.col("tpep_dropoff_datetime").cast("timestamp"))
      )
    
    # Upload de dado tratado para Bucket S3
    s3_path = f"s3://{bucket}/{relative_path.replace("raw_data", "cleaned_data")}"
    df.write.parquet(s3_path, mode="overwrite")


## Dados de metricas
Por fim, para agilizar e otimizar consultas para geracao de BIs, dashes, ou reports, foi tambem gerado um arquivo de metricas. Para este caso, os dados foram separados e estruturados trazendo um resumo dos registros por hora, auziliando assim uma analise por tempo dos registros. 
O arquivo de metricas foi estruturado com as seguintes colunuas: date, hour, sum_total_amount, sum_passenger_count, total_rides, avg_total_amounts e avg_passenger_count.

Utilizou-se funcoes PySpark para manipulacao dos dados. 

Os arquivos serao organizados no S3 bucket com o seguinte padrao: metrics_data/[TIPO_DE_TAXI]/arquivo.parquet

In [0]:
from pyspark.sql import functions as F

for trip_type in ["yellow", "green"]:
      relative_path = f"cleaned_data/{trip_type}/*/*/*"
      df = spark.read.format("parquet") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(f"s3://{bucket}/{relative_path}")

      df = (
      df.withColumn("date", F.to_date(F.col("tpep_pickup_datetime")))
            .withColumn("hour", F.hour(F.col("tpep_pickup_datetime")))
      )

      df = df.groupBy("date", "hour").agg(
      F.sum("total_amount").alias("sum_total_amount").cast("double"),
      F.sum("passenger_count").alias("sum_passenger_count").cast("int"),
      F.count("*").alias("total_rides").cast("int")
      )

      df = (df.withColumn("avg_total_amounts", F.col("sum_total_amount") / F.col("total_rides"))
              .withColumn("avg_passenger_count", F.col("sum_passenger_count") / F.col("total_rides"))
      )

      s3_path = f"s3://{bucket}/metrics_data/{trip_type}/{trip_type}_hour_analysis.parquet"
      df.write.parquet(s3_path, mode="overwrite")
