In [1]:
%pip install -r ./requirements.txt

Collecting pyspark==3.5.7 (from -r ./requirements.txt (line 2))
  Using cached pyspark-3.5.7-py2.py3-none-any.whl
Collecting boto3==1.41.5 (from -r ./requirements.txt (line 3))
  Obtaining dependency information for boto3==1.41.5 from https://files.pythonhosted.org/packages/3c/56/f47a80254ed4991cce9a2f6d8ae8aafbc8df1c3270e966b2927289e5a12f/boto3-1.41.5-py3-none-any.whl.metadata
  Using cached boto3-1.41.5-py3-none-any.whl.metadata (6.8 kB)
Collecting pandas==2.2.3 (from -r ./requirements.txt (line 4))
  Obtaining dependency information for pandas==2.2.3 from https://files.pythonhosted.org/packages/cd/5f/4dba1d39bb9c38d574a9a22548c540177f78ea47b32f99c0ff2ec499fac5/pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Using cached pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
Collecting google-auth-oauthlib==1.2.3 (from -r ./requirements.txt (line 5))
  Obtaining dependency information for google-auth-oauthlib==1.2.3

In [107]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, count, max, add_months, to_timestamp
from typing import Dict, List
from functools import reduce
import boto3

In [3]:
def create_spark_session():
        """
        Create and return a Spark session configured for AWS S3 access.
        Returns:
            SparkSession: Configured Spark session object.
        """
        aws_profile_name = "default"

        spark = SparkSession.builder \
                            .appName("SplitColumnExample") \
                            .config(
                                "spark.jars.packages",
                                "org.apache.hadoop:hadoop-aws:3.3.4,"
                                "com.amazonaws:aws-java-sdk-bundle:1.12.262"
                            ) \
                            .config(
                                "spark.hadoop.fs.s3a.aws.credentials.provider",
                                "com.amazonaws.auth.profile.ProfileCredentialsProvider"
                            ) \
                            .config("spark.hadoop.fs.s3a.profile", aws_profile_name) \
                            .getOrCreate()
        
        return spark

In [20]:
def get_s3_data(spark: SparkSession, combine_paths_to_df: bool = True, s3_scheme: str = "s3a"):
    """
    Traverses page_iterator (response from list_objects_v2 / boto3 paginator), groups file paths by table name (taking the element at table_index of the Key split by /),and reads the parquet files of each table into a DataFrame.
    Args:
        spark (SparkSession): Configured Spark session object.
        combine_paths_to_df (boolean, optional): If True, returns DataFrames; if False, returns only lists of paths.
        s3_scheme (string, optional): Scheme used to build the S3 path (e.g., "s3a" or "s3n" â€” default "s3a").
    Returns:
        tables_dfs (Dict): A dict where the key is the table name and the value is a DataFrame (or a list of paths if combine_paths_to_df=False).
    """

    s3 = boto3.client('s3')
    bucket_name = "data-challenge-loadsmart"
    bucket_prefix = "athena/loadsmart_analytics/"

    paginator = s3.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=bucket_prefix)

    table_paths: Dict[str, List[str]] = {}

    for page in page_iterator:
        if 'Contents' in page:
            for obj in page['Contents']:
                print(obj['Key'])
                data_path = f"{s3_scheme}://{bucket_name}/{obj['Key']}"
                parts = obj['Key'].split('/')
                table_name = parts[2]
                table_paths.setdefault(table_name, []).append(data_path)
    if not combine_paths_to_df:
        return table_paths
    
    tables_dfs: Dict[str, DataFrame] = {}

    for table, paths in table_paths.items():
        try:
            df = spark.read.parquet(*paths)
            tables_dfs[table] = df
        except Exception as e:
            print(f"Error to read paths combined for the table '{table}': {e}")
            print("Trying to read file a file and unite them by unionByName...")
            dfs = []
            for p in paths:
                try:
                    dfs.append(spark.read.parquet(p))
                except Exception as e:
                    print(f"Error by reading {p}: {e} (skipping this file)")
            try:
                combined = reduce(lambda a,b: a.unionByName(b, allowMissingColumns=True), dfs)
            except TypeError:
                combined = reduce(lambda a,b: a.unionByName(b), dfs)
            tables_dfs[table] = combined

    return tables_dfs

In [21]:
spark = create_spark_session()
dfs_por_tabela = get_s3_data(spark)

athena/loadsmart_analytics/dim_book/52312e26-fd1f-4eaa-b73a-c5742203cb8f/20251203_051343_00016_ydp5s_7e891e4d-aa78-4bcb-979b-1f04fe7938e2
athena/loadsmart_analytics/dim_carriers/6785e746-bb04-4227-b3ef-3cb4f7698f4e/20251203_045540_00052_59x32_6acc931a-6655-4893-a294-3539e6a1d77e
athena/loadsmart_analytics/dim_delivery/3e7d0e7e-1a19-43b0-a7de-b42613b212e4/20251203_045554_00223_hji59_83e25cf3-4ed8-4442-a881-399f0445b8af
athena/loadsmart_analytics/dim_pickup/ec49864a-38aa-4751-938d-ca242517d7dd/20251203_045608_00268_8zxb3_cf76baa5-5127-46d9-91c5-5f5bab5f20e1
athena/loadsmart_analytics/dim_shippers/a70754e1-a02d-4629-99fe-9f3b862503ad/20251203_045616_00061_aazau_54f56ec1-783a-46df-a843-87ed58a10411
athena/loadsmart_analytics/fact_loadsmart/19c70d10-22e8-49de-8d39-1538b7f80296/20251203_052917_00214_hi6mf_43566975-95d1-4db2-a44d-eed7bcea12c9


                                                                                

In [27]:
dfs_por_tabela.keys()

dict_keys(['dim_book', 'dim_carriers', 'dim_delivery', 'dim_pickup', 'dim_shippers', 'fact_loadsmart'])

In [25]:
df_book = dfs_por_tabela["dim_book"]
df_book.show()

[Stage 21:>                                                         (0 + 1) / 1]

+------------+-------------------+-------------------+-------------------+----------+------------+-------------------------+-------------------------+-----------------------+----------------+---------------+------------------------+-------------------------+------------------+
|loadsmart_id|         quote_date|          book_date|        source_date|book_price|source_price|has_mobile_app_tracking23|has_mobile_app_tracking24|has_macropoint_tracking|has_edi_tracking|contracted_load|load_booked_autonomously|load_sourced_autonomously|load_was_cancelled|
+------------+-------------------+-------------------+-------------------+----------+------------+-------------------------+-------------------------+-----------------------+----------------+---------------+------------------------+-------------------------+------------------+
|   206431033|2024-12-15 13:08:00|2024-12-15 13:09:00|2024-12-15 13:44:00|   8922.51|      8500.0|                    false|                    false|                

                                                                                

In [26]:
df_book.count()

                                                                                

5361

In [28]:
df_carriers = dfs_por_tabela["dim_carriers"]
df_carriers.show()

[Stage 25:>                                                         (0 + 1) / 1]

+------------+--------------+-----------+------------------------+---------------+-------------------------+---------------------------+-----------------------+
|loadsmart_id|carrier_rating|vip_carrier|carrier_dropped_us_count|   carrier_name|carrier_on_time_to_pickup|carrier_on_time_to_delivery|carrier_on_time_overall|
+------------+--------------+-----------+------------------------+---------------+-------------------------+---------------------------+-----------------------+
|   206431033|          NULL|      false|                       0| Carrier 605817|                     true|                       true|                   true|
|   206521177|          NULL|      false|                       0|Carrier 1396487|                     true|                       true|                   true|
|   206694049|          NULL|      false|                       0|Carrier 1044585|                     true|                       true|                   true|
|   206553113|          NULL|     

                                                                                

In [29]:
df_carriers.count()

                                                                                

5361

In [30]:
df_delivery = dfs_por_tabela["dim_delivery"]
df_delivery.show()

[Stage 29:>                                                         (0 + 1) / 1]

+------------+--------------+--------------+-------------------+-------------------------+
|loadsmart_id| delivery_city|delivery_state|      delivery_date|delivery_appointment_time|
+------------+--------------+--------------+-------------------+-------------------------+
|   206431033|Upper Marlboro|            MD|2024-12-21 02:00:00|      2024-12-21 00:00:00|
|   206521177|          Reno|            NV|2024-11-27 16:00:00|      2024-11-27 17:00:00|
|   206694049|Upper Marlboro|            MD|2024-06-09 04:10:00|      2024-06-08 23:00:00|
|   206553113|          Reno|            NV|2024-11-28 19:00:00|      2024-11-28 19:00:00|
|   206518817|      Portland|            OR|2024-10-02 11:55:00|      2024-10-02 10:00:00|
|   206427025|      Portland|            OR|2025-01-04 12:00:00|      2024-12-29 14:00:00|
|   206478177|      Portland|            OR|2024-11-03 10:00:00|      2024-11-03 12:00:00|
|   206754337|      Portland|            OR|2025-01-05 03:10:00|      2025-01-05 01:00:00|

                                                                                

In [31]:
df_delivery.count()

                                                                                

5361

In [32]:
df_pickup = dfs_por_tabela["dim_pickup"]
df_pickup.show()

[Stage 33:>                                                         (0 + 1) / 1]

+------------+-----------+------------+-------------------+-----------------------+
|loadsmart_id|pickup_city|pickup_state|        pickup_date|pickup_appointment_time|
+------------+-----------+------------+-------------------+-----------------------+
|   206431033| Hood River|          OR|2024-12-15 11:00:00|    2024-12-15 20:00:00|
|   206521177|     Etowah|          TN|2024-11-21 14:00:00|    2024-11-21 14:00:00|
|   206694049|    Salinas|          CA|2024-06-03 02:00:00|    2024-06-03 02:00:00|
|   206553113| Montpelier|          OH|2024-11-22 17:00:00|    2024-11-22 17:00:00|
|   206518817|     Newark|          DE|2024-09-26 17:00:00|    2024-09-26 17:00:00|
|   206427025|     Maxton|          NC|2024-12-23 15:00:00|    2024-12-23 15:00:00|
|   206478177|     Maxton|          NC|2024-10-31 14:00:00|    2024-10-31 14:00:00|
|   206754337|    Manning|          SC|2025-01-03 08:00:00|    2025-01-03 09:00:00|
|   206424601| Montpelier|          OH|2024-11-21 13:00:00|    2024-11-21 13

                                                                                

In [33]:
df_pickup.count()

                                                                                

5361

In [34]:
df_shippers = dfs_por_tabela["dim_shippers"]
df_shippers.show()

[Stage 37:>                                                         (0 + 1) / 1]

+------------+--------+-------+--------------+----------------+------------+
|loadsmart_id|     pnl|mileage|equipment_type|sourcing_channel|shipper_name|
+------------+--------+-------+--------------+----------------+------------+
|   206431033|  422.51| 2753.5|           RFR|          dat_in| Shipper 758|
|   206426897|  -24.37| 2948.9|           RFR|            NULL| Shipper 758|
|   206512777| 2409.97| 2591.7|           DRV|            NULL|Shipper 1644|
|   206437137|  -97.62| 2847.9|           RFR|carrier_capacity|Shipper 1249|
|   206513177| -128.58| 2702.1|           RFR|            NULL| Shipper 758|
|   206709073|   250.0| 2509.6|           RFR|            NULL| Shipper 585|
|   206554409|     0.0| 2777.3|           DRV|            NULL|Shipper 1644|
|   206506609|-2355.04| 3069.0|           RFR|            NULL| Shipper 832|
|   206516521|  -89.63| 2733.2|           RFR|            NULL|Shipper 1249|
|   206428753|  379.49| 2733.2|           RFR|     source_list|Shipper 1249|

                                                                                

In [35]:
df_shippers.count()

                                                                                

5361

In [36]:
df_loadsmart = dfs_por_tabela["fact_loadsmart"]
df_loadsmart.show()

[Stage 41:>                                                         (0 + 1) / 1]

+------------+-------------------+-------------------+-------------------+----------+------------+--------------+------------------------+-------------------+-------------------------+-------------------+-----------------------+--------+-------+
|loadsmart_id|         quote_date|          book_date|        source_date|book_price|source_price|carrier_rating|carrier_dropped_us_count|      delivery_date|delivery_appointment_time|        pickup_date|pickup_appointment_time|     pnl|mileage|
+------------+-------------------+-------------------+-------------------+----------+------------+--------------+------------------------+-------------------+-------------------------+-------------------+-----------------------+--------+-------+
|   206431033|2024-12-15 13:08:00|2024-12-15 13:09:00|2024-12-15 13:44:00|   8922.51|      8500.0|          NULL|                       0|2024-12-21 02:00:00|      2024-12-21 00:00:00|2024-12-15 11:00:00|    2024-12-15 20:00:00|  422.51| 2753.5|
|   206521177|20

                                                                                

In [37]:
df_loadsmart.show()

[Stage 42:>                                                         (0 + 1) / 1]

+------------+-------------------+-------------------+-------------------+----------+------------+--------------+------------------------+-------------------+-------------------------+-------------------+-----------------------+--------+-------+
|loadsmart_id|         quote_date|          book_date|        source_date|book_price|source_price|carrier_rating|carrier_dropped_us_count|      delivery_date|delivery_appointment_time|        pickup_date|pickup_appointment_time|     pnl|mileage|
+------------+-------------------+-------------------+-------------------+----------+------------+--------------+------------------------+-------------------+-------------------------+-------------------+-----------------------+--------+-------+
|   206431033|2024-12-15 13:08:00|2024-12-15 13:09:00|2024-12-15 13:44:00|   8922.51|      8500.0|          NULL|                       0|2024-12-21 02:00:00|      2024-12-21 00:00:00|2024-12-15 11:00:00|    2024-12-15 20:00:00|  422.51| 2753.5|
|   206521177|20

                                                                                

In [110]:
df_join = df_loadsmart.alias("loadsmart") \
            .join(
                df_book.alias("book"),
                col('loadsmart.loadsmart_id') == col('book.loadsmart_id'),
                how='inner'
            ) \
            .join(
                df_carriers.alias("carriers"),
                col('loadsmart.loadsmart_id') == col('carriers.loadsmart_id'),
                how='inner'
            ) \
            .join(
                df_delivery.alias("delivery"),
                col('loadsmart.loadsmart_id') == col('delivery.loadsmart_id'),
                how='inner'
            ) \
            .join(
                df_pickup.alias("pickup"),
                col('loadsmart.loadsmart_id') == col('pickup.loadsmart_id'),
                how='inner'
            ) \
            .join(
                df_shippers.alias("shippers"),
                col('loadsmart.loadsmart_id') == col('shippers.loadsmart_id'),
                how='inner'
            ) \
            .select(
                "loadsmart.loadsmart_id",
                "shippers.shipper_name",
                "delivery.delivery_date",
                "pickup.pickup_city",
                "pickup.pickup_state",
                "delivery.delivery_city",
                "delivery.delivery_state",
                "book.book_price",
                "carriers.carrier_name"
            )

In [112]:
df_join.select('delivery_date').distinct().sort(col('delivery_date').desc()).show()

[Stage 363:>                                                        (0 + 1) / 1]

+-------------------+
|      delivery_date|
+-------------------+
|2025-03-15 13:50:00|
|2025-01-16 08:00:00|
|2025-01-11 23:00:00|
|2025-01-11 21:35:00|
|2025-01-10 16:00:00|
|2025-01-10 11:30:00|
|2025-01-10 11:00:00|
|2025-01-09 20:00:00|
|2025-01-09 15:30:00|
|2025-01-09 15:05:00|
|2025-01-08 23:38:00|
|2025-01-08 12:26:00|
|2025-01-08 11:45:00|
|2025-01-08 10:45:00|
|2025-01-08 10:44:00|
|2025-01-08 10:40:00|
|2025-01-08 10:27:00|
|2025-01-08 09:21:00|
|2025-01-08 09:15:00|
|2025-01-08 08:03:00|
+-------------------+
only showing top 20 rows



                                                                                

In [113]:
max_delivery_date = df_join.agg(max(col('delivery_date')).alias('max_delivery_date'))
get_last_month = max_delivery_date.select(to_timestamp(add_months(col('max_delivery_date'), -1)).alias('last_month_delivery')).first()[0]

max_delivery_date.show()

print(get_last_month)
print(type(get_last_month))

[Stage 379:>                                                        (0 + 1) / 1]

+-------------------+
|  max_delivery_date|
+-------------------+
|2025-03-15 13:50:00|
+-------------------+

2025-02-15 00:00:00
<class 'datetime.datetime'>


                                                                                

In [114]:
df_join = df_join.filter(col("delivery_date") == get_last_month)
df_join.show()

                                                                                

+------------+------------+-------------+-----------+------------+-------------+--------------+----------+------------+
|loadsmart_id|shipper_name|delivery_date|pickup_city|pickup_state|delivery_city|delivery_state|book_price|carrier_name|
+------------+------------+-------------+-----------+------------+-------------+--------------+----------+------------+
+------------+------------+-------------+-----------+------------+-------------+--------------+----------+------------+



In [None]:
df_final = df_join.drop_duplicates()
df_final.show()

[Stage 387:>  (0 + 1) / 1][Stage 388:>  (0 + 1) / 1][Stage 389:>  (0 + 1) / 1]

In [None]:
df_final.count()

                                                                                

5357

In [None]:
df_final.groupBy("loadsmart_id").agg(count('*').alias('total')).sort(col('total').desc()).show()

[Stage 149:>                                                        (0 + 1) / 1]

+------------+-----+
|loadsmart_id|total|
+------------+-----+
|   206429385|    1|
|   206701857|    1|
|   206628913|    1|
|   206549705|    1|
|   206690257|    1|
|   206601449|    1|
|   206535497|    1|
|   206514105|    1|
|   206550521|    1|
|   206437929|    1|
|   206438505|    1|
|   206656609|    1|
|   206569145|    1|
|   206501081|    1|
|   206628961|    1|
|   206523817|    1|
|   206647441|    1|
|   206531585|    1|
|   206608633|    1|
|   206435417|    1|
+------------+-----+
only showing top 20 rows



                                                                                

In [None]:
df_final.write.csv("last_month_delivery.csv")