## Deafio Open Co

In [None]:
pip install pyspark

## Importações necessárias

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from datetime import datetime, timedelta


## Criando conexão com spark **ou buscando uma já criada com o nome de open_01**

In [None]:
spark = (
    SparkSession.builder
    .master('local')
    .appName('open_01')
    .getOrCreate()
)

## Carregando dataframes

In [None]:
df_2009 = spark.read.json('data-nyctaxi-trips-2009.json')
df_2010 = spark.read.json('data-nyctaxi-trips-2010.json')
df_2011 = spark.read.json('data-nyctaxi-trips-2011.json')
df_2012 = spark.read.json('data-nyctaxi-trips-2012.json')
type_pay = spark.read.csv('data-payment_lookup.csv', header=True, inferSchema=True)
vendedor = spark.read.csv('data-vendor_lookup.csv', header=True, inferSchema=True)




## Trata a coluna de dado para ficar mais facil

In [None]:
def extrair_data_pickup_date(year_df):
    year_df = year_df.withColumn("pickup_date", F.to_date("pickup_datetime"))
    return year_df

df_2009 = extrair_data_pickup_date(df_2009)
df_2010 = extrair_data_pickup_date(df_2010)
df_2011 = extrair_data_pickup_date(df_2011)
df_2012 = extrair_data_pickup_date(df_2012)


## Qual vendedor viajou mais no ano de 2009 á 2012?



In [None]:
def find_max_trips_vendor(year_df, vendedor) -> None:
    """
    Esta função encontra o vendedor que mais viajou em um ano específico.

    Args:
        year_df (DataFrame): Um DataFrame contendo dados de viagens para um único ano.
        vendedor (DataFrame): Um DataFrame contendo dados sobre os vendedores.

    Returns:
        DataFrame: Um DataFrame contendo o vendedor que mais viajou e o número de viagens que ele realizou.
    """

    joined_data = year_df.join(vendedor, on="vendor_id", how="inner")

    vendor_trips = joined_data.groupBy("vendor_id", "name").agg(count("vendor_id").alias("total_trips"))

    max_trips_vendor = vendor_trips.orderBy(desc("total_trips")).limit(1)

    max_trips_vendor.select("name", "total_trips").show()

print("No ano de 2009:")
find_max_trips_vendor(df_2009, vendedor)
print("\n")
print("No ano de 2010:")
find_max_trips_vendor(df_2010, vendedor)
print("\n")
print("No ano de 2011:")
find_max_trips_vendor(df_2011, vendedor)
print("\n")
print("No ano de 2012:")
find_max_trips_vendor(df_2012, vendedor)


No ano de 2009:
+------------+-----------+
|        name|total_trips|
+------------+-----------+
|VeriFone Inc|      15379|
+------------+-----------+



No ano de 2010:
+------------+-----------+
|        name|total_trips|
+------------+-----------+
|VeriFone Inc|      16630|
+------------+-----------+



No ano de 2011:
+------------+-----------+
|        name|total_trips|
+------------+-----------+
|VeriFone Inc|      17472|
+------------+-----------+



No ano de 2012:
+------------+-----------+
|        name|total_trips|
+------------+-----------+
|VeriFone Inc|      17472|
+------------+-----------+



## Qual a semana de cada ano que mais teve viagens de táxi?

---



In [None]:
def find_week_with_max_trips(year_df):
    """
    This function finds the week with the most trips for a specific year.

    Args:
        year_df (DataFrame): A DataFrame containing trip data for a single year.

    Returns:
        DataFrame: A DataFrame containing the week with the most trips.
    """

    year_df = year_df.withColumn("week_number", F.weekofyear("pickup_date"))

    trips_per_week = year_df.groupBy("week_number").agg(F.count("*").alias("total_trips"))

    max_trips_week = trips_per_week.orderBy(F.desc("total_trips")).limit(1)

    return max_trips_week


def show_week_in_date_format(max_trips_week) -> None:
    """
    This function displays the results in date format.

    Args:
        max_trips_week (DataFrame): A DataFrame containing the week with the most trips.

    Returns:
        None
    """
    max_trips_week_column = str(max_trips_week.select("week_number").first()[0])
    week_start = datetime.strptime("2009-" + max_trips_week_column + "-1", "%Y-%W-%w")
    week_end = week_start + timedelta(days=6)
    print(f"Semana {max_trips_week_column}: de {week_start.strftime('%Y-%m-%d')} até {week_end.strftime('%Y-%m-%d')}")

print("No ano de 2009:")
max_trips_week_2009 = find_week_with_max_trips(df_2009)
show_week_in_date_format(max_trips_week_2009)
print("\n")
print("No ano de 2010:")
max_trips_week_2010 = find_week_with_max_trips(df_2010)
show_week_in_date_format(max_trips_week_2010)
print("\n")
print("No ano de 2011:")
max_trips_week_2011 = find_week_with_max_trips(df_2011)
show_week_in_date_format(max_trips_week_2011)
print("\n")
print("No ano de 2012:")
max_trips_week_2012 = find_week_with_max_trips(df_2012)
show_week_in_date_format(max_trips_week_2012)




No ano de 2009:
Semana 11: de 2009-03-16 até 2009-03-22


No ano de 2010:
Semana 37: de 2009-09-14 até 2009-09-20


No ano de 2011:
Semana 16: de 2009-04-20 até 2009-04-26


No ano de 2012:
Semana 31: de 2009-08-03 até 2009-08-09


## Quantas viagens o vendor com mais viagens naquele ano fez na semana com mais viagens de táxi no ano?

In [None]:
def find_trips_of_top_vendor_in_max_trips_week(year_df)-> None:
    """
    This function finds the week in which the top vendor had the most trips and the number of trips
    they had in that week.

    Args:
        year_df (DataFrame): A DataFrame containing trip data for a single year.

    Returns:
        None
    """
    year_df = year_df.withColumn("week_number", weekofyear(year_df["pickup_date"]))

    trips_per_week = year_df.groupBy("week_number", "vendor_id").agg(
        count("*").alias("total_trips")
    )

    max_trips_week = trips_per_week.orderBy(desc("total_trips")).limit(1)

    top_vendor = max_trips_week.select("vendor_id").first()[0]

    trips_for_top_vendor = year_df.filter((year_df["week_number"] == max_trips_week.select("week_number").first()[0]) & (year_df["vendor_id"] == top_vendor)).groupBy("vendor_id", "week_number").agg(count("*").alias("trips_in_max_week"))

    trips_for_top_vendor.select("vendor_id", "week_number", "trips_in_max_week").show()


print("No ano de 2009:")
find_trips_of_top_vendor_in_max_trips_week(df_2009)
print("\n")
print("No ano de 2010:")
find_trips_of_top_vendor_in_max_trips_week(df_2010)
print("\n")
print("No ano de 2011:")
find_trips_of_top_vendor_in_max_trips_week(df_2011)
print("\n")
print("No ano de 2012:")
find_trips_of_top_vendor_in_max_trips_week(df_2012)





No ano de 2009:
+---------+-----------+-----------------+
|vendor_id|week_number|trips_in_max_week|
+---------+-----------+-----------------+
|      VTS|          7|              364|
+---------+-----------+-----------------+



No ano de 2010:
+---------+-----------+-----------------+
|vendor_id|week_number|trips_in_max_week|
+---------+-----------+-----------------+
|      VTS|          2|              374|
+---------+-----------+-----------------+



No ano de 2011:
+---------+-----------+-----------------+
|vendor_id|week_number|trips_in_max_week|
+---------+-----------+-----------------+
|      VTS|         16|              444|
+---------+-----------+-----------------+



No ano de 2012:
+---------+-----------+-----------------+
|vendor_id|week_number|trips_in_max_week|
+---------+-----------+-----------------+
|      VTS|         29|              445|
+---------+-----------+-----------------+

