- #  Configuração do Ambiente


In [1]:
!pip install pyspark google-cloud-bigquery matplotlib seaborn -q
!pip install pandas-gbq -q
!pip install pandas-gbq -q
import pandas as pd
import requests
from io import BytesIO
import pandas_gbq
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import seaborn as sns
!pip install pyspark pandas-gbq db-dtypes -q
from google.colab import auth
auth.authenticate_user()
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col


- # Configurações do BigQuery

In [None]:
project_id = 'leanttro-projeto-taxi'
tabela_destino_bq = 'dados_analise.dados_brutos'
colunas_para_usar = [
    'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'passenger_count', 'trip_distance', 'total_amount']


- ##  Verifica o Último Mês Carregado no BigQuery

In [None]:
print("Verificando o último mês carregado no BigQuery...")
start_date = '2023-01-01' # Data de início padrão se a tabela não existir
ultima_data_carregada_str = "Nenhuma"

try:
    sql_ultima_data = f"SELECT MAX(tpep_pickup_datetime) as ultima_data FROM `{project_id}.{tabela_destino_bq}`"
    df_ultima_data = pandas_gbq.read_gbq(sql_ultima_data, project_id=project_id)

    ultima_data_carregada = df_ultima_data['ultima_data'].iloc[0]

    if pd.notna(ultima_data_carregada):
        # Move a data de início para o primeiro dia do MÊS SEGUINTE
        start_date = (ultima_data_carregada + pd.offsets.MonthBegin(1)).strftime('%Y-%m-%d')
        ultima_data_carregada_str = ultima_data_carregada.strftime('%Y-%m-%d')

except Exception as e:
    print(f"AVISO: Não foi possível verificar a última data (tabela pode não existir ainda). Começando do início. Erro: {e}")

print(f"Último dado encontrado: {ultima_data_carregada_str}. Iniciando busca a partir de: {start_date}")


- ## Gera o Intervalo de Datas Dinamicamente

O fim é sempre o primeiro dia do mês atual, para garantir que só peguemos meses completos.

In [None]:
end_date = pd.to_datetime('today').replace(day=1).strftime('%Y-%m-%d')
datas_para_carregar = pd.date_range(start=start_date, end=end_date, freq='MS')

- ## Loop de Carga Incremental: (ADICIONAR NOVOS)

In [None]:
if datas_para_carregar.empty:
    print("\nNão há novos meses completos para carregar. Seus dados já estão atualizados!")
else:
    print(f"\nIniciando o carregamento de {len(datas_para_carregar)} novo(s) mese(s) de dados...\n")

    for data in datas_para_carregar:
        ano = data.strftime('%Y')
        mes = data.strftime('%m')

        url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{ano}-{mes}.parquet"

        print(f"Processando: {ano}-{mes}")
        try:
            response = requests.get(url, timeout=20)
            response.raise_for_status()

            df_mensal = pd.read_parquet(BytesIO(response.content), columns=colunas_para_usar)
            df_mensal = df_mensal.dropna(subset=['tpep_pickup_datetime'])

            print(f"Enviando {len(df_mensal):,} linhas para o BigQuery...")

            pandas_gbq.to_gbq(
                df_mensal,
                destination_table=tabela_destino_bq,
                project_id=project_id,
                if_exists='append'
            )
            print(f"Dados de {ano}-{mes} carregados com sucesso!\n")

        except requests.exceptions.RequestException as e:
            print(f"AVISO: Falha ao baixar dados de {ano}-{mes}. O arquivo pode não existir ainda ou houve um erro de rede. Erro: {e}\n")

print("--- Carga de dados em lote concluída! ---")

- ## Puxando tabela silver (dados_limpos) do GCP:

- # Configurações do BigQuery

In [2]:
spark = SparkSession.builder.appName("PySparkAmostraGrande_5M").getOrCreate()
project_id = 'leanttro-projeto-taxi'
table_id = 'dados_analise.dados_limpos'

- ## Movendo 2 milhões de registros pela rede do BigQuery para o Colab.

In [None]:
sql_query_amostra_grande = f"SELECT * FROM `{project_id}.{table_id}` LIMIT 2000000"
print(f"\nBuscando 2 milhões de registros do BigQuery para o Pandas...")
df_pandas = pandas_gbq.read_gbq(sql_query_amostra_grande, project_id=project_id)
print(f"Amostra de {len(df_pandas):,} linhas carregada com sucesso no Pandas.")
print("\nConvertendo para DataFrame PySpark...")
df_silver_spark = spark.createDataFrame(df_pandas)
df_silver_spark.cache()
print("DataFrame PySpark criado e pronto para uso!")


Buscando 2 milhões de registros do BigQuery para o Pandas...
Downloading: 100%|[32m██████████[0m|
Amostra de 2,000,000 linhas carregada com sucesso no Pandas.

Convertendo para DataFrame PySpark...


- # Criando DataFrame Gold:

In [None]:
from pyspark.sql.functions import count, sum, avg, round, col
df_gold_spark = df_silver_spark.groupBy("pickup_day_of_week", "pickup_hour") \
                            .agg(
                                count("*").alias("total_de_corridas"),
                                round(sum("total_amount"), 2).alias("faturamento_total"),
                                round(avg("total_amount"), 2).alias("faturamento_medio_por_corrida"),
                                round(avg("trip_distance"), 2).alias("distancia_media_percorrida"),
                                round(avg("trip_duration_minutes"), 2).alias("duracao_media_minutos"),
                                round(avg("passenger_count"), 2).alias("media_de_passageiros"),
                                round(avg(col("trip_distance") / (col("trip_duration_minutes") / 60)), 2).alias("velocidade_media_kmh")
                            )\
                            .filter(col("duracao_media_minutos") > 0)

df_gold_spark.cache()

print("DataFrame Gold criado com sucesso!")
df_gold_spark.show(5)

- # Análise Exploratória 2 milhões de Dados:

- TOTAL DE CORRIDAS POR HORA

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

print("Calculando o total de corridas por hora...")
corridas_por_hora = df_gold_spark.groupBy("pickup_hour") \
                               .agg(sum("total_de_corridas").alias("volume_total_corridas")) \
                               .orderBy("pickup_hour")

# Converter o resultado (que é pequeno, só 24 linhas) para Pandas para plotar
corridas_por_hora_pd = corridas_por_hora.toPandas()

# Gerar o gráfico
print("Gerando o gráfico de barras...")
plt.figure(figsize=(14, 7))
sns.barplot(x='pickup_hour', y='volume_total_corridas', data=corridas_por_hora_pd, palette='YlOrRd')
plt.title('Total de Corridas por Hora do Dia (Amostra de 2 Milhões)', fontsize=16)
plt.xlabel('Hora do Dia', fontsize=12)
plt.ylabel('Volume Total de Corridas', fontsize=12)
plt.xticks(rotation=45)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

- FATURAMENTO POR DIA DA SEMANA

In [None]:
from pyspark.sql.functions import sum
import pandas as pd

print("Calculando faturamento por dia da semana...")
faturamento_por_dia = df_gold_spark.groupBy("pickup_day_of_week") \
                                   .agg(sum("faturamento_total").alias("faturamento")) \
                                   .orderBy("pickup_day_of_week")

# Converter para Pandas para plotar
faturamento_por_dia_pd = faturamento_por_dia.toPandas()

# Mapear os números dos dias para nomes para o gráfico ficar mais legível
dias_map = {1: 'Domingo', 2: 'Segunda', 3: 'Terça', 4: 'Quarta', 5: 'Quinta', 6: 'Sexta', 7: 'Sábado'}
faturamento_por_dia_pd['dia_nome'] = faturamento_por_dia_pd['pickup_day_of_week'].map(dias_map)


# Gerar o gráfico
print("Gerando o gráfico de pizza...")
plt.figure(figsize=(10, 10))
plt.pie(faturamento_por_dia_pd['faturamento'], labels=faturamento_por_dia_pd['dia_nome'], autopct='%1.1f%%', startangle=140, colors=sns.color_palette('YlOrRd', 7))
plt.title('Distribuição do Faturamento por Dia da Semana', fontsize=16)
plt.ylabel('') # Remove o label 'faturamento' do eixo y
plt.show()

- GRÁFICO COMBINADO DE TRÂNSITO

In [None]:
print("Calculando métricas de trânsito por hora...")
transito_por_hora = df_gold_spark.groupBy("pickup_hour") \
                               .agg(avg("duracao_media_minutos").alias("duracao_media"),
                                    avg("velocidade_media_kmh").alias("velocidade_media")) \
                               .orderBy("pickup_hour")

transito_por_hora_pd = transito_por_hora.toPandas()
print("Gerando o gráfico combinado...")
fig, ax1 = plt.subplots(figsize=(14, 7))

sns.barplot(x='pickup_hour', y='duracao_media', data=transito_por_hora_pd, color='gold', ax=ax1, label='Duração Média (min)')
ax1.set_xlabel('Hora do Dia', fontsize=12)
ax1.set_ylabel('Duração Média (minutos)', fontsize=12, color='gold')
ax1.tick_params(axis='y', labelcolor='gold')

# Criar um segundo eixo Y para a Velocidade
ax2 = ax1.twinx()
sns.lineplot(x='pickup_hour', y='velocidade_media', data=transito_por_hora_pd, color='red', marker='o', ax=ax2, label='Velocidade Média (km/h)')
ax2.set_ylabel('Velocidade Média (km/h)', fontsize=12, color='red')
ax2.tick_params(axis='y', labelcolor='red')

plt.title('Análise de Trânsito: Duração da Viagem vs. Velocidade Média', fontsize=16)
fig.tight_layout()
plt.show()

## **MACHINE LEARNING (AMOSTA 2 MILHÕES CORRIDAS):**

#### Ensinar um modelo a prever a coluna trip_duration_minutes com base em outras características da corrida com PySpark MLlib

In [None]:
from pyspark.sql import SparkSession
from google.colab import auth
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("Colab-PySpark-ML-Taxi")\
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.2") \
    .getOrCreate()

# 2. Autenticar com sua conta Google
auth.authenticate_user()
print('Usuário autenticado com o Google Cloud!')

print("\nCarregando 2 milhões de registros do BigQuery...")
df_silver_completo = spark.read.format('bigquery') \
  .option('table', f'{project_id}:dados_analise.dados_limpos') \
  .load() \
  .limit(2000000) # Aplicando o mesmo limite de 2M do seu notebook

# Colocar em cache para performance
df_silver_completo.cache()

print(f"Dados carregados! Total de {df_silver_completo.count()} registros.")
df_silver_completo.printSchema()

Usuário autenticado com o Google Cloud!

Carregando 2 milhões de registros do BigQuery...


Py4JJavaError: An error occurred while calling o94.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: bigquery. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


- Carregando os Dados

In [None]:
df_modelo = df_silver_spark.select(
    "pickup_hour",
    "pickup_day_of_week",
    "trip_distance",
    "passenger_count",
    "trip_duration_minutes",
    "total_amount"
).na.drop()

- Dividindo os dados em treino (80%) e teste (20%):

In [None]:
(df_treino, df_teste) = df_modelo.randomSplit([0.8, 0.2], seed=42)


-  Colocando em cache novamente:

In [None]:
df_treino.cache()
df_teste.cache()
print(f"Dados prontos para o modelo. Treino: {df_treino.count()}, Teste: {df_teste.count()}")

.cache(): É uma otimização do Spark para guardar esses dois conjuntos de dados na memória, tornando o acesso a eles mais rápido nas etapas seguintes de treinamento.

## Preparando as Features:

In [None]:
from pyspark.ml.feature import VectorAssembler
features_cols = ["pickup_hour", "pickup_day_of_week", "trip_distance", "passenger_count"]
assembler = VectorAssembler(inputCols=features_cols, outputCol="features")

# Transformando os dataframes de treino e teste:
df_treino_ml = assembler.transform(df_treino)
df_teste_ml = assembler.transform(df_teste)

## **MODELO DE ML:** Regressão Linear

-  Carregar dados:

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
lr = LinearRegression(featuresCol="features", labelCol="trip_duration_minutes")
lr_modelo = lr.fit(df_treino_ml)


- Fazendo previsões:

In [None]:
previsoes_lr = lr_modelo.transform(df_teste_ml)

- Avaliando modelo:

In [None]:
evaluator = RegressionEvaluator(labelCol="trip_duration_minutes", predictionCol="prediction")
rmse_lr = evaluator.evaluate(previsoes_lr, {evaluator.metricName: "rmse"})
r2_lr = evaluator.evaluate(previsoes_lr, {evaluator.metricName: "r2"})

## - Resultado do Modelo Linear:

In [None]:
print("\n--- Resultado da Regressão Linear ---")
print(f"RMSE: {rmse_lr:.2f} minutos")
print(f"R²: {r2_lr:.2%}")

RMSE: Ele pode errar até 14.01 min pra mais ou pra menos do que a duração da viagem!

Um R² de 0.35% não é negativo, como na amostra inferior, mas é considerado muito baixo. Ele está ativamente fazendo previsões ruins. POR ISSO, vamos treinar OUTRO modelo e avaliar novamente:

## **MODELO DE ML:** Random Forest

n_estimators=50 significa que ele usará 50 "árvores de decisão" para votar na melhor previsão.
n_jobs=-1 usa todos os processadores disponíveis para acelerar o treinamento.

- Carregando dados:

In [None]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="trip_duration_minutes",
    seed=42,
    numTrees=50  # Equivalente ao n_estimators
)
rf_modelo = rf.fit(df_treino_ml)

- Fazendo previsões:

In [None]:
previsoes_rf = rf_modelo.transform(df_teste_ml)

- Avaliando o modelo (usando o mesmo 'evaluator' de antes):


In [None]:
rmse_rf = evaluator.evaluate(previsoes_rf, {evaluator.metricName: "rmse"})
r2_rf = evaluator.evaluate(previsoes_rf, {evaluator.metricName: "r2"})


## - Resultado:

In [None]:
print("\n--- Resultado do Random Forest ---")
print(f"RMSE: {rmse_rf:.2f} minutos")
print(f"R²: {r2_rf:.2%}")


%md
RMSE: Ele pode errar até 7.59 min pra mais ou pra menos do que a duração da viagem!

Um R² de 70.75% significa que ele pode acertar mais que 2/3 das previsões! Bem melhor do que o modelo de regressão linear para esse caso!

**OU SEJA, OBTEVE UM RESULTADO MELHOR QUE O PRIMEIRO TESTE!**

#### **Ensinar um modelo a prever a coluna total_amount com base nas características da corrida, incluindo a duração que nosso primeiro modelo previa usando PySpark MLlib**

**MODELO DE ML:** Random Forest Regressor:

- Criando coluna "Duração Prevista":

In [None]:
df_treino_com_previsao_tempo = rf_modelo.transform(df_treino_ml) \
                                        .withColumnRenamed("prediction", "duracao_prevista")
df_teste_com_previsao_tempo = rf_modelo.transform(df_teste_ml) \
                                       .withColumnRenamed("prediction", "duracao_prevista")

print("Coluna 'duracao_prevista' criada com sucesso.")
df_treino_com_previsao_tempo.select("trip_duration_minutes", "duracao_prevista").show(5)

- Criando Features:

In [None]:
from pyspark.ml.feature import VectorAssembler
features_valor_cols = [
    "pickup_hour",
    "pickup_day_of_week",
    "trip_distance",
    "passenger_count",
    "duracao_prevista"  # <<< A previsão do primeiro modelo entra aqui!
]
assembler_valor = VectorAssembler(inputCols=features_valor_cols, outputCol="features_valor")
df_treino_valor = assembler_valor.transform(df_treino_com_previsao_tempo)
df_teste_valor = assembler_valor.transform(df_teste_com_previsao_tempo)
print("Features para o modelo de valor preparadas.")

- Carrendo dados focado no valor (total_amount):

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
rf_valor = RandomForestRegressor(
    featuresCol="features_valor",
    seed=42,
    numTrees=50
)

- Treinando modelo:

In [None]:
# Treinar o modelo
modelo_valor = rf_valor.fit(df_treino_valor)
print("Treinamento concluído!")

# Fazer previsões no conjunto de teste
previsoes_valor = modelo_valor.transform(df_teste_valor)


- Avaliando o modelo:

In [None]:
evaluator_valor = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction")
rmse_valor = evaluator_valor.evaluate(previsoes_valor, {evaluator_valor.metricName: "rmse"})
r2_valor = evaluator_valor.evaluate(previsoes_valor, {evaluator_valor.metricName: "r2"})

In [None]:
print("\n--- Resultado do Modelo de Previsão de Valor ---")
print(f"RMSE: ${rmse_valor:.2f} DÓLARES")
print(f"R²: {r2_valor:.2%}")

RMSE: O valor está R$7,48 DÓLARES de diferença (pra mais ou pra menos) do valor real, considerando todas as variações que podem alterar no valor final


R²: 86.64% significa que o modelo  consegue captar bem a relação entre as variáveis (tempo, distância, hora, etc.) e o preço.

# **SIMULADOR DE PREÇOS PARA CORRIDA (AMOSTRA 2 MILHÕES DE CORRIDAS):**

In [None]:
# =================================================================
# SIMULADOR COMPLETO (TEMPO E VALOR) COM MODELOS PYSPARK
# =================================================================
from pyspark.ml.regression import RandomForestRegressionModel

# Função que usa os dois modelos PySpark salvos
def simular_corrida_spark_completo(km, hora, dia_semana, passageiros=1):

    # --- 1. Carregar os modelos treinados ---
    try:
        modelo_tempo = RandomForestRegressionModel.load("./modelo_tempo_spark")
        modelo_valor = RandomForestRegressionModel.load("./modelo_valor_spark")
    except Exception as e:
        print(f"Erro ao carregar os modelos. Certifique-se de que as pastas './modelo_tempo_spark' e './modelo_valor_spark' existem.")
        print(f"Detalhe do erro: {e}")
        return

    # --- 2. Preparar os dados de ENTRADA do usuário ---
    milhas = km * 0.621371
    schema_inicial = "pickup_hour INT, pickup_day_of_week INT, trip_distance DOUBLE, passenger_count INT"
    dados_nova_corrida = spark.createDataFrame(
        data=[(hora, dia_semana, milhas, passageiros)],
        schema=schema_inicial
    )

    # --- 3. PREVISÃO DO TEMPO (PRIMEIRO MODELO) ---
    # Transforma os dados de entrada usando o primeiro assembler (para o modelo de tempo)
    dados_para_prever_tempo = assembler.transform(dados_nova_corrida)
    # Faz a previsão do tempo e renomeia a coluna de previsão para 'duracao_prevista'
    df_com_duracao_prevista = modelo_tempo.transform(dados_para_prever_tempo).withColumnRenamed("prediction", "duracao_prevista")

    # Extrai o valor da duração prevista
    duracao_prevista = df_com_duracao_prevista.select("duracao_prevista").first()[0]

    # --- 4. PREVISÃO DO VALOR (SEGUNDO MODELO) ---
    # Transforma o DataFrame (que agora tem a 'duracao_prevista') usando o segundo assembler (para o modelo de valor)
    dados_para_prever_valor = assembler_valor.transform(df_com_duracao_prevista)
    # Faz a previsão do valor
    previsao_valor_df = modelo_valor.transform(dados_para_prever_valor)

    # Extrai o resultado da previsão de valor
    valor_previsto = previsao_valor_df.select("prediction").first()[0]

    # --- 5. Exibir o resultado (SAÍDA para o usuário) ---
    print("\n--- Previsão da Corrida (PySpark) ---")
    print(f"Distância: {km:.2f} km")
    print(f"Hora: {hora}:00h")
    print(f"Dia da semana: {dia_semana}")
    print(f">> Tempo estimado: {duracao_prevista:.1f} min")
    print(f">> Valor estimado: ${valor_previsto:.2f}")

# --- Parte Interativa ---
# (Pede os dados para o usuário e chama a função completa)
try:
    print("\n--- Simulador de Corrida Interativo (PySpark) ---")
    km_usuario = float(input("Digite a distância da corrida em KM (ex: 12.5): "))
    hora_usuario = int(input("Digite a hora do dia (0 a 23): "))
    dia_semana_usuario = int(input("Digite o dia da semana (1=Dom, 2=Seg, ..., 7=Sáb): "))

    simular_corrida_spark_completo(km_usuario, hora_usuario, dia_semana_usuario)

except ValueError:
    print("\nErro: Entrada inválida.")
except NameError:
    print("\nErro: Os 'assemblers' não foram definidos. Execute o código de treinamento primeiro.")