# enriched

In [None]:
from workflows.enriched import process_enriched_drivers, process_enriched_orders, process_enriched_payments, process_enriched_shipments, process_enriched_users

In [None]:
# Run the enriched-layer drivers job through its module-level main().
process_enriched_drivers.main()

In [None]:
# Run the enriched-layer orders job through its module-level main().
process_enriched_orders.main()

In [None]:
# Run the enriched-layer payments job through its module-level main().
process_enriched_payments.main()

In [None]:
# Run the enriched-layer shipments job through its module-level main().
process_enriched_shipments.main()

In [None]:
# Run the enriched-layer users job through its module-level main().
process_enriched_users.main()

# curated

In [None]:
from workflows.curated import process_dim_users, process_dim_drivers, process_dim_date, process_dim_locations

In [None]:
# Run the curated dim_users job through its module-level main().
process_dim_users.main()

In [None]:
# Run the curated dim_date job through its module-level main().
process_dim_date.main()

In [None]:
# Run the curated dim_locations job through its module-level main().
process_dim_locations.main()

In [None]:
# Run the curated dim_drivers job through its module-level main().
process_dim_drivers.main()

In [None]:
from workflows.curated import process_fact_processing_orders

In [None]:
# Run the curated fact job from process_fact_processing_orders through its module-level main().
process_fact_processing_orders.main()

In [None]:
# Curated fact_orders Delta table, loaded for ad-hoc inspection.
fact_orders = spark.read.format('delta').load(
    '/curated/transactional/mysql/logistics/facts/fact_orders'
)

In [None]:
# Print the column names and types of the fact_orders frame loaded above.
fact_orders.printSchema()

In [None]:
from config.config import Config

In [None]:
class Table:
    """Shared setup for the curated table builders defined in this notebook.

    Reads the Spark session, logger, enriched/curated settings and the
    previous-date year/month/day out of ``Config`` and creates the enriched
    extractor and curated loader that subclasses rely on.
    """

    def __init__(self):
        config = Config()
        self.app_config = config

        # Handles shared by every table builder.
        self.spark = config.spark
        self.logger = config.logger

        # Enriched layer: where to read from and in which format
        # (falls back to 'parquet' when no format is configured).
        enriched = config.enriched_configs
        self.enriched_configs = enriched
        self.enriched_base_path = enriched.get('base_path')
        self.enriched_format = enriched.get('format', 'parquet')

        # Year/month/day of the "previous date" exposed by the config.
        previous = config.previous_date_details
        self.previous_date_details = previous
        self.previous_year = previous['year']
        self.previous_month = previous['month']
        self.previous_day = previous['day']

        # Reader for the enriched layer, bound to the values gathered above.
        self.enriched_data_extractor = EnrichedDataExtractor(
            self.spark,
            self.logger,
            self.enriched_base_path,
            self.enriched_format,
            self.previous_year,
            self.previous_month,
            self.previous_day,
        )

        # Curated layer: base paths for dimension and fact tables.
        curated = config.curated_configs
        self.curated_configs = curated
        self.dimension_base_path = curated.get('dimension_base_path')
        self.fact_base_path = curated.get('fact_base_path')

        # Writer for the curated layer.
        self.curated_data_loader = CuratedDataLoader()
        

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from delta import *
from pyspark.sql.window import Window
from extract.enriched_data_extractor import EnrichedDataExtractor
from load.curated_data_loader import CuratedDataLoader 

In [None]:
class FactOrders(Table):
    """Build the ``fact_orders`` source frame from enriched order events.

    Enriched orders are reduced to one row per (order, status), enriched with
    dimension keys, pivoted into a single row per order across the statuses
    'processing', 'accepted', 'in_transit' and 'delivered', combined with the
    milestone keys already stored in the fact table, and handed to the
    curated loader.
    """

    # Placeholder keys written while a milestone has not been reached yet.
    UNKNOWN_DATE_KEY = 99991231
    UNKNOWN_TIME_KEY = '00:00:00'

    def __init__(self):
        super().__init__()
        self.dim_users_path = self.dimension_base_path + '/dim_users'
        self.dim_locations_path = self.dimension_base_path + '/dim_locations'
        self.fact_orders_path = self.fact_base_path + '/fact_orders'

    @property
    def dim_users(self):
        """Current ``dim_users`` rows, reduced to ``user_id`` and ``user_key``."""
        dim_users = (
            self.spark.read.format('delta').option('path', self.dim_users_path).load()
            .where(F.col('is_current'))
            .select(F.col('user_id'), F.col('user_key'))
        )
        return dim_users

    @property
    def dim_locations(self):
        """The full ``dim_locations`` Delta table."""
        dim_locations = self.spark.read.format('delta').option('path', self.dim_locations_path).load()
        return dim_locations

    def join_orders_with_dimensions(self, orders_df):
        """Keep one row per (order_id, status) and attach dimension keys.

        Args:
            orders_df: enriched orders frame. The columns read here are
                order_id, status, event_timestamp, user_id, pickup_address,
                delivery_address, package_description, package_weight,
                delivery_time and created_at.

        Returns:
            DataFrame with the order attributes plus ``user_key``,
            ``pick_up_location_key`` and ``delivery_location_key``. The
            attributes come from the most recent event of each status, while
            ``event_timestamp`` is the earliest time that status was seen.
        """
        status_window = Window.partitionBy(F.col('order_id'), F.col('status'))
        newest_first = status_window.orderBy(F.col('event_timestamp').desc())

        orders_df_with_latest_update = (
            orders_df
            # row_number (not rank) so events sharing a timestamp cannot
            # produce two "latest" rows and fan out the joins below.
            .withColumn('rank', F.row_number().over(newest_first))
            # Overwrite event_timestamp with the first time the status was
            # observed; the original wrote this to a misspelled, unused column.
            .withColumn('event_timestamp', F.min(F.col('event_timestamp')).over(status_window))
            .where(F.col('rank') == 1)
        )
        joined_orders = (
            orders_df_with_latest_update.alias('orders')
            .join(self.dim_users.alias('dim_users'), F.col('orders.user_id') == F.col('dim_users.user_id'), 'left')
            .join(self.dim_locations.alias('pickup_location'), F.col('orders.pickup_address') == F.col('pickup_location.location'), 'left')
            .join(self.dim_locations.alias('delivery_location'), F.col('orders.delivery_address') == F.col('delivery_location.location'), 'left')
            .select(
                F.col('orders.order_id').cast(T.LongType()).alias('order_id'),
                F.col('orders.package_description'),
                F.col('orders.package_weight'),
                F.col('orders.delivery_time'),
                F.col('orders.created_at'),
                F.col('orders.event_timestamp'),
                F.col('orders.status'),
                F.col('dim_users.user_key'),
                F.col('pickup_location.location_key').alias('pick_up_location_key'),
                F.col('delivery_location.location_key').alias('delivery_location_key'),
            )
        )
        return joined_orders

    @staticmethod
    def make_interval(start_date_key, start_time_key, end_date_key, end_time_key):
        """Return the elapsed time between two (date key, time key) pairs.

        Args:
            start_date_key, end_date_key: columns holding ``yyyyMMdd`` keys.
            start_time_key, end_time_key: columns holding ``HH:mm:ss`` keys.

        Returns:
            A struct column with fields ``days``, ``hours``, ``minutes`` and
            ``seconds`` extracted from ``end - start``.
        """
        interval = (
            F.to_timestamp(F.concat_ws(' ', end_date_key, end_time_key), 'yyyyMMdd HH:mm:ss')
            - F.to_timestamp(F.concat_ws(' ', start_date_key, start_time_key), 'yyyyMMdd HH:mm:ss')
        )

        interval_struct = F.named_struct(
            F.lit('days'), F.extract(F.lit('D'), interval)
            , F.lit('hours'), F.extract(F.lit('H'), interval)
            , F.lit('minutes'), F.extract(F.lit('m'), interval)
            , F.lit('seconds'), F.extract(F.lit('s'), interval).cast(T.IntegerType())
        )
        return interval_struct

    def _ensure_fact_orders_table(self):
        """Create the empty ``fact_orders`` Delta table if it does not exist yet."""
        if DeltaTable.isDeltaTable(self.spark, self.fact_orders_path):
            return

        lag_type = "STRUCT<days: INT, hours: INT, minutes: INT, seconds: INT>"
        (
            DeltaTable.create(self.spark)
            .tableName("fact_orders")
            .addColumn("order_id", "LONG", nullable = True)
            .addColumn("user_key", "LONG", nullable = True)
            .addColumn("pick_up_location_key", "LONG", nullable = True)
            .addColumn("delivery_location_key", "LONG", nullable = True)
            .addColumn("dd_package_description", "STRING", nullable = True)
            .addColumn("dd_status", "STRING", nullable = True)
            .addColumn("package_weight", "FLOAT", nullable = True)
            .addColumn("created_order_date_key", "INT", nullable = True)
            .addColumn("created_order_time_key", "STRING", nullable = True)
            .addColumn("accepted_date_key", "INT", nullable = True)
            .addColumn("accepted_time_key", "STRING", nullable = True)
            .addColumn("in_transit_date_key", "INT", nullable = True)
            .addColumn("in_transit_time_key", "STRING", nullable = True)
            .addColumn("delivered_date_key", "INT", nullable = True)
            .addColumn("delivered_time_key", "STRING", nullable = True)
            .addColumn("delivery_date_key", "INT", nullable = True)
            .addColumn("delivery_time_key", "STRING", nullable = True)
            .addColumn("created_to_accepted_lag", lag_type, nullable = True)
            .addColumn("accepted_to_in_transit_lag", lag_type, nullable = True)
            .addColumn("in_transit_to_delivered_lag", lag_type, nullable = True)
            .addColumn("delivered_and_delivery_difference", lag_type, nullable = True)
            .location(self.fact_orders_path)
            .execute()
        )

    @classmethod
    def _milestone_key_columns(cls, source_alias, milestone):
        """Return the ``<milestone>_date_key`` / ``<milestone>_time_key`` columns.

        ``source_alias`` is the alias of the status rows for this milestone in
        the pivoted frame; ``o`` is the alias of the stored fact row. A key
        already stored in the fact table wins unless it is still the
        placeholder; otherwise the key comes from the status row's
        ``event_timestamp``; with neither available the placeholder is used.
        """
        date_key = f'{milestone}_date_key'
        time_key = f'{milestone}_time_key'
        event_date = f"cast(date_format({source_alias}.event_timestamp, 'yyyyMMdd') as int)"
        event_time = f"date_format({source_alias}.event_timestamp, 'HH:mm:ss')"

        date_column = F.expr(f'''
            case
                when isnull(o.order_id) and isnull({source_alias}.order_id) then cast({cls.UNKNOWN_DATE_KEY} as int)
                when isnull(o.order_id) or isnull({source_alias}.order_id) then coalesce({event_date}, o.{date_key})
                when o.{date_key} == {cls.UNKNOWN_DATE_KEY} then {event_date}
                else o.{date_key}
            end
        ''').alias(date_key)

        # "Not set yet" is decided by the date placeholder, never by the time
        # value: '00:00:00' is also a legitimate time for an event at midnight.
        time_column = F.expr(f'''
            case
                when isnull(o.order_id) and isnull({source_alias}.order_id) then '{cls.UNKNOWN_TIME_KEY}'
                when isnull(o.order_id) or isnull({source_alias}.order_id) then coalesce({event_time}, o.{time_key})
                when o.{date_key} == {cls.UNKNOWN_DATE_KEY} then {event_time}
                else o.{time_key}
            end
        ''').alias(time_key)

        return date_column, time_column

    def _lag_or_zero(self, start, end):
        """Interval between the ``start`` and ``end`` milestones, or all zeros.

        ``start`` / ``end`` are key prefixes such as ``'accepted'``; the
        interval is only computed when both date keys hold real dates rather
        than the placeholder.
        """
        both_known = (
            (F.col(f'{start}_date_key') != F.lit(self.UNKNOWN_DATE_KEY))
            & (F.col(f'{end}_date_key') != F.lit(self.UNKNOWN_DATE_KEY))
        )
        zero_lag = F.named_struct(
            F.lit('days'), F.lit(0), F.lit('hours'), F.lit(0), F.lit('minutes'), F.lit(0), F.lit('seconds'), F.lit(0)
        )
        elapsed = self.make_interval(
            F.col(f'{start}_date_key'), F.col(f'{start}_time_key'),
            F.col(f'{end}_date_key'), F.col(f'{end}_time_key'),
        )
        return F.when(both_known, elapsed).otherwise(zero_lag)

    def process_fact_orders(self):
        """Build the per-order fact frame and pass it to the curated loader.

        Returns:
            The DataFrame that was handed to
            ``curated_data_loader.load_fact_orders``.
        """
        enriched_orders_df = self.enriched_data_extractor.extract_enriched_orders()
        joined_orders_df = self.join_orders_with_dimensions(enriched_orders_df)

        self._ensure_fact_orders_table()

        processing_enriched_orders_df = joined_orders_df.where(F.col('status') == 'processing')
        accepted_enriched_orders_df = joined_orders_df.where(F.col('status') == 'accepted')
        in_transit_enriched_orders_df = joined_orders_df.where(F.col('status') == 'in_transit')
        delivered_enriched_orders_df = joined_orders_df.where(F.col('status') == 'delivered')
        fact_orders_df = self.spark.read.format('delta').option('path', self.fact_orders_path).load()

        p_id, a_id = F.col('p.order_id'), F.col('a.order_id')
        it_id, d_id = F.col('it.order_id'), F.col('d.order_id')

        # After a full outer join the previous side's key is NULL whenever that
        # status is missing from the batch, so every later join matches on the
        # first non-null key seen so far. Joining on the previous alias alone
        # would split one order over several rows and would only find the
        # stored fact row for orders that have a 'delivered' event.
        source_orders_df = (
            processing_enriched_orders_df.alias('p')
            .join(accepted_enriched_orders_df.alias('a'), p_id == a_id, 'full')
            .join(in_transit_enriched_orders_df.alias('it'), F.coalesce(p_id, a_id) == it_id, 'full')
            .join(delivered_enriched_orders_df.alias('d'), F.coalesce(p_id, a_id, it_id) == d_id, 'full')
            .join(fact_orders_df.alias('o'), F.coalesce(p_id, a_id, it_id, d_id) == F.col('o.order_id'), 'left')
        )

        def newest(column_name):
            # Take the attribute from the most advanced status row that has it.
            return F.coalesce(*[F.col(f'{alias}.{column_name}') for alias in ('d', 'it', 'a', 'p')])

        # The accepted keys come from the 'accepted' rows (alias a), in line
        # with in_transit -> it and delivered -> d.
        accepted_date_key, accepted_time_key = self._milestone_key_columns('a', 'accepted')
        in_transit_date_key, in_transit_time_key = self._milestone_key_columns('it', 'in_transit')
        delivered_date_key, delivered_time_key = self._milestone_key_columns('d', 'delivered')

        final_source_orders_df = source_orders_df.select(
            newest('order_id').alias('order_id'),
            newest('user_key').alias('user_key'),
            newest('pick_up_location_key').alias('pick_up_location_key'),
            newest('delivery_location_key').alias('delivery_location_key'),
            newest('package_description').alias('dd_package_description'),
            newest('status').alias('dd_status'),
            newest('package_weight').alias('package_weight'),
            F.date_format(newest('created_at'), 'yyyyMMdd').cast(T.IntegerType()).alias('created_order_date_key'),
            F.date_format(newest('created_at'), 'HH:mm:ss').alias('created_order_time_key'),
            accepted_date_key,
            accepted_time_key,
            in_transit_date_key,
            in_transit_time_key,
            delivered_date_key,
            delivered_time_key,
            F.date_format(newest('delivery_time'), 'yyyyMMdd').cast(T.IntegerType()).alias('delivery_date_key'),
            F.date_format(newest('delivery_time'), 'HH:mm:ss').alias('delivery_time_key'),
        )

        final_source_orders_df = final_source_orders_df.select(
            F.col('*'),
            self._lag_or_zero('created_order', 'accepted').alias('created_to_accepted_lag'),
            self._lag_or_zero('accepted', 'in_transit').alias('accepted_to_in_transit_lag'),
            self._lag_or_zero('in_transit', 'delivered').alias('in_transit_to_delivered_lag'),
            self._lag_or_zero('delivered', 'delivery').alias('delivered_and_delivery_difference'),
        )

        # NOTE(review): `self` is passed explicitly, as in the original call;
        # confirm against CuratedDataLoader.load_fact_orders' signature.
        self.curated_data_loader.load_fact_orders(self, final_source_orders_df)

        # Returned so callers (e.g. notebook cells) can inspect what was loaded.
        return final_source_orders_df


In [None]:
# Instantiate the fact_orders builder defined above.
temp_fact_order = FactOrders()

In [None]:
# Current contents of the curated fact_orders Delta table.
fact_orders_target = (
    spark.read.format('delta')
    .load('/curated/transactional/mysql/logistics/facts/fact_orders')
)

In [None]:
# Preview the rows of the fact_orders table loaded above.
fact_orders_target.show()

In [None]:
# Run the fact_orders build and keep whatever it returns for inspection.
t = temp_fact_order.process_fact_orders()

In [None]:
# Print the schema of `t`; this only works if process_fact_orders() returned a DataFrame.
t.orderBy(F.col('order_id')).printSchema()

# fact_payments

In [None]:
from config.config import Config

In [None]:
class FactPayments(Table):
    """Build the ``fact_payments`` source frame from enriched payments.

    Each payment is joined to its order in ``fact_orders`` to pick up the
    order's dimension keys and attributes, then handed to the curated loader.
    """

    def __init__(self):
        super().__init__()
        self.fact_payments_path = self.fact_base_path + '/fact_payments'
        self.fact_orders_path = self.fact_base_path + '/fact_orders'

    @property
    def fact_orders(self):
        """The curated ``fact_orders`` Delta table."""
        fact_orders = self.spark.read.format('delta').option('path', self.fact_orders_path).load()
        return fact_orders

    def process_fact_payments(self):
        """Create the payments table if needed, build the frame and load it."""
        enriched_payments_df = self.enriched_data_extractor.extract_enriched_payments()

        is_exists = DeltaTable.isDeltaTable(self.spark, self.fact_payments_path)
        if not is_exists:
            (
                # Use the session owned by this instance instead of relying on
                # a notebook-global `spark` being defined.
                DeltaTable.create(self.spark)
                .tableName("payments")
                .addColumn("payment_id", "LONG", nullable = True)
                .addColumn("order_id", "LONG", nullable = True)
                .addColumn("user_key", "LONG", nullable = True)
                .addColumn("pick_up_location_key", "LONG", nullable = True)
                .addColumn("delivery_location_key", "LONG", nullable = True)
                .addColumn("dd_package_description", "STRING", nullable = True)
                .addColumn("created_order_date_key", "INT", nullable = True)
                .addColumn("created_order_time_key", "STRING", nullable = True)
                .addColumn("payment_date_key", "INT", nullable = True)
                .addColumn("payment_time_key", "STRING", nullable = True)
                .addColumn("dd_payment_method", "STRING", nullable = True)
                .addColumn("dd_payment_status", "STRING", nullable = True)
                .addColumn("package_weight", "FLOAT", nullable = True)
                .addColumn("amount", "DECIMAL(10,2)", nullable = True)
                .location(self.fact_payments_path)
                .execute()
            )

        joined_payments_df = (
            enriched_payments_df.alias('p')
            # Left join: payments without a matching order are kept, with
            # NULL order attributes.
            .join(self.fact_orders.alias('o'), F.col('o.order_id') == F.col('p.order_id'), 'left')
            .select(
                F.col('p.payment_id').cast(T.LongType()).alias('payment_id'),
                F.col('p.order_id').cast(T.LongType()).alias('order_id'),
                F.col('o.user_key').cast(T.LongType()).alias('user_key'),
                F.col('o.pick_up_location_key').cast(T.LongType()).alias('pick_up_location_key'),
                F.col('o.delivery_location_key').cast(T.LongType()).alias('delivery_location_key'),
                # The description is a STRING column in both tables; casting
                # it to LONG would turn every description into NULL.
                F.col('o.dd_package_description').alias('dd_package_description'),
                F.col('o.created_order_date_key').alias('created_order_date_key'),
                F.col('o.created_order_time_key').alias('created_order_time_key'),
                F.date_format(F.col('p.payment_date'), 'yyyyMMdd').cast(T.IntegerType()).alias('payment_date_key'),
                F.date_format(F.col('p.payment_date'), 'HH:mm:ss').alias('payment_time_key'),
                F.col('p.payment_method').alias('dd_payment_method'),
                F.col('p.payment_status').alias('dd_payment_status'),
                F.col('o.package_weight'),
                F.col('p.amount'),
            )
        )

        # NOTE(review): `self` is passed explicitly, as in the original call;
        # confirm against CuratedDataLoader.load_fact_payments' signature.
        self.curated_data_loader.load_fact_payments(self, joined_payments_df)

# Instantiate the fact_payments builder defined above.
temp = FactPayments()


In [None]:
# Build the payments frame and pass it to the curated loader.
temp.process_fact_payments()

# fact_shipments

In [None]:
# Show fact_orders rows that have no package description.
(
    spark.read.format('delta')
    .option('path', '/curated/transactional/mysql/logistics/facts/fact_orders')
    .load()
    .where(F.col('dd_package_description').isNull())
    .show()
)

In [None]:
# Scratch note kept from the original cell (a bare `14 17` is a SyntaxError): 14 17

In [14]:
# Enriched shipments (parquet) and the curated driver dimension (Delta),
# loaded for the ad-hoc join further down.
enriched_shipments = (
    spark.read.format('parquet')
    .load('/enriched/transactional/mysql/logistics/shipments')
)
dim_drivers = (
    spark.read.format('delta')
    .load('/curated/transactional/mysql/logistics/dimensions/dim_drivers')
)


In [22]:
# Preview the raw Drivers topic (avro), flattened to the fields nested under `after`.
(
    spark.read.format('avro')
    .option('path', '/raw/transactional/mysql/logistics/topics/logistics_src.logistics.Drivers')
    .load()
    # `after.*` expands nested fields, so it belongs in select(); it is not a
    # boolean predicate and cannot be used in where().
    .select(F.col('after.*'))
    .show()
)

+---------+-------+---------------------+------------+------------+
|driver_id|user_id|vehicle_license_plate|vehicle_type|vehicle_year|
+---------+-------+---------------------+------------+------------+
|1        |5      |XYZ-7598             |car         |2025        |
|2        |6      |XYZ-8290             |car         |2025        |
|3        |7      |XYZ-6799             |bike        |2025        |
|4        |11     |XYZ-6274             |car         |2025        |
|5        |14     |XYZ-4392             |truck       |2025        |
|6        |16     |XYZ-9649             |car         |2025        |
|7        |17     |XYZ-2844             |truck       |2025        |
|8        |20     |XYZ-1473             |bike        |2025        |
|9        |22     |XYZ-5097             |bike        |2025        |
|10       |24     |XYZ-1781             |truck       |2025        |
|11       |25     |XYZ-4491             |truck       |2025        |
|12       |26     |XYZ-6596             |car    

In [None]:
# Inner-join shipments to drivers on driver_id and preview the result.
(
    enriched_shipments.alias('s')
    .join(
        dim_drivers.alias('d'),
        on=F.col('s.driver_id') == F.col('d.driver_id'),
        how='inner',
    )
    .show()
)

# test

In [None]:
import sys

In [None]:
# Remove the fact_orders catalog entry; IF EXISTS keeps the cell re-runnable
# when the table has already been dropped or was never created.
spark.sql('DROP TABLE IF EXISTS fact_orders')

In [None]:
# Display the installation prefix of the Python interpreter running this kernel.
sys.prefix

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [7]:
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [1]:
from workflows.curated import process_fact_payments, process_fact_shipments, process_fact_orders

In [2]:
# Run the curated fact_orders job through its module-level main().
process_fact_orders.main()

2025-02-21 08:10:12,305 - logistics - INFO - HDFS Path: /enriched/transactional/mysql/logistics/orders
