In [None]:
import snowflake.snowpark as snowpark
from snowflake.snowpark import functions as F
from snowflake.snowpark.window import Window
from snowflake.snowpark import types as T
from datetime import datetime 

# Fetch the currently active Snowpark session; every `session.table(...)` read
# in this notebook goes through this object.
from snowflake.snowpark.context import get_active_session
session = get_active_session()


In [None]:
# Load the voyage master data, the account roll-up mapping and the Sixthman voyage mapping.

# Columns removed from the master table immediately after loading.
_master_dropped_columns = ("MD5_HUB_VOYAGE", "HASH_DIFF", "LDTS", "RCSR", "LAST_MODIFIED_BY")

_master_raw = session.table("Vessops_D.L20_BDV.SAT_VFM_NCL_MASTER_DATA")
master_file = (
    _master_raw
    .with_column("SAIL_DAT", F.to_date(F.col("SAIL_DAT")))
    .drop(*_master_dropped_columns)
)

# STRADDLE_PAX_PD = PAX_DAYS per sail day; STRADDLE_PAX = that rate times CONV_SAIL_DAY_QTY.
# NOTE(review): the division is unguarded — confirm SAIL_DAY_QTY is never 0 in the master table.
_pax_per_sail_day = F.col('PAX_DAYS') / F.col('SAIL_DAY_QTY')
master_file = master_file.with_column('STRADDLE_PAX_PD', _pax_per_sail_day)
master_file = master_file.with_column(
    'STRADDLE_PAX',
    F.col('STRADDLE_PAX_PD') * F.col('CONV_SAIL_DAY_QTY'),
)

# Account number -> roll-up levels 1..4.
_rollup_columns = ["ACCOUNT_NUMBER", "LEVEL_1", "LEVEL_2", "LEVEL_3", "LEVEL_4"]
mapping = session.table("VESSOPS_D.L10_RDV.VFM_NCLH_ACCOUNT_ROLLUP_MAPPING").select(*_rollup_columns)

# SIXTH MAN MAPPING FILE: VOYAGE_ID / NCL_ID exposed as SIXTHMAN_VOYAGE / TRADITIONAL_VOYAGE_ID.
SIX_MAP = session.table("VESSOPS_D.L10_RDV.VFM_NCL_SIXTHMAN_MAPPING").select(
    F.col("VOYAGE_ID").alias("SIXTHMAN_VOYAGE"),
    F.col("NCL_ID").alias("TRADITIONAL_VOYAGE_ID"),
)

valid_op_units = [
    'NCL Pride of America',
    'NCL Bliss',
    'NCL Breakaway',
    'NCL Dawn',
    'NCL Encore',
    'NCL Epic',
    'NCL Escape',
    'NCL Gem',
    'NCL Getaway',
    'NCL Jade',
    'NCL Joy',
    'NCL Jewel',
    'NCL Prima',
    'NCL Pearl',
    'NCL Sky',
    'NCL Spirit',
    'NCL Star',
    'NCL Sun',
    'NCL Viva',
    'Common Shipside',
    'Common Shoreside',
    'NCL Aqua',
]

# Voyage-id replacements, grouped by the id they are rewritten to:
# {replacement id: (ids that are replaced by it, ...)}.
_voyage_replacements = {
    "SPR240605": (
        "SPR240607", "SPR240610", "SPR240614", "SPR240618",
        "SPR240621", "SPR240623", "SPR240628", "SPR240630",
    ),
    "SPR240701": (
        "SPR240703", "SPR240707", "SPR240710", "SPR240713",
        "SPR240717", "SPR240721", "SPR240722", "SPR240724",
        "SPR240726", "SPR240727", "SPR240730", "SPR240731",
    ),
    "SPR240801": (
        "SPR240804", "SPR240809", "SPR240811", "SPR240814",
        "SPR240818", "SPR240821", "SPR240825", "SPR240830",
    ),
    "SPR240901": (
        "SPR240906", "SPR240908", "SPR240911",
        "SPR240914", "SPR240917", "SPR240920",
    ),
}

# Flatten to the alternating key, value, key, value ... literal sequence that create_map takes.
_replacement_literals = [
    F.lit(code)
    for target_id, source_ids in _voyage_replacements.items()
    for source_id in source_ids
    for code in (source_id, target_id)
]
replacement_map = F.create_map(*_replacement_literals)

In [None]:
# Sanity check: number of rows in the prepared master data.
master_file.count()

In [None]:
# --- Configuration for this specific script ---
PEOPLESOFT_TABLE = "VESSOPS_D.L00_STG.NCL_VFM_NTR"

# Read only the PeopleSoft columns that the steps below use.
df_raw = session.table(PEOPLESOFT_TABLE).select(
    "FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "BUSINESS_UNIT_COMBINED_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION",
    "SHIP_CD", "M0_M1", "ACCOUNT", "JOURNAL_MONETARY_AMOUNT_ADJ", "VOYAGE_ID",
)

# --- Initial cleaning and filtering of PeopleSoft data ---

# Rows booked to 'NCL Pride of America' under business unit 'NCL00-Norwegian Cruise Line' are excluded.
is_poa_under_ncl00 = (
    (F.col("OPERATING_UNIT_DESCRIPTION") == 'NCL Pride of America')
    & (F.col("BUSINESS_UNIT_COMBINED_DESCRIPTION") == 'NCL00-Norwegian Cruise Line')
)

# Look the voyage id up in replacement_map. GET on a map/object returns a VARIANT, so cast
# the hit back to a string; otherwise VOYAGE_ID would become VARIANT-typed for every
# downstream comparison, join and group-by.
remapped_voyage_id = F.get(replacement_map, F.col("VOYAGE_ID")).cast(T.StringType())

df_cleaned = (
    df_raw.filter(~is_poa_under_ncl00)
    # Missing voyage ids get the placeholder 'No Voyage Cd' so they can be grouped and filtered on.
    .with_column("VOYAGE_ID", F.coalesce(F.col("VOYAGE_ID"), F.lit("No Voyage Cd")))
    # Ids present in replacement_map are rewritten; all others are kept unchanged.
    .with_column("VOYAGE_ID", F.coalesce(remapped_voyage_id, F.col("VOYAGE_ID")))
    .rename(F.col("BUSINESS_UNIT_COMBINED_DESCRIPTION"), "BUSINESS_UNIT_DESCRIPTION")
)

# One row per (year, period, account, ship, business unit, operating unit, voyage, M0_M1)
# with the summed journal amount. (The original statement ended in a stray line
# continuation that only worked because a blank line followed it.)
df_grouped = (
    df_cleaned
    .group_by("FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "ACCOUNT", "SHIP_CD",
              "BUSINESS_UNIT_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION", "VOYAGE_ID", "M0_M1")
    .agg(F.sum("JOURNAL_MONETARY_AMOUNT_ADJ").alias("JOURNAL_MONETARY_AMOUNT_ADJ"))
)

# Split the grouped journal lines:
#   df_charter - listed charter/elimination business units, or voyage ids starting with SM / SX
#   df_inter1  - the negation of that same predicate
#   df_com     - df_inter1 rows on the two "Common" operating units
#   df_norm    - df_inter1 rows on any other operating unit
# NOTE(review): filter() keeps only rows where the predicate is TRUE, so a row whose predicate
# evaluates to NULL (e.g. a NULL BUSINESS_UNIT_DESCRIPTION on a non-SM/SX voyage, or a NULL
# OPERATING_UNIT_DESCRIPTION) lands in neither half of a split — confirm such rows cannot occur.
charter_business_units = (
    'SIX00-Sixthman Ltd.',
    'EL013-NCL BERMUDA JURISDICTION ELIM',
    'EL004-Compass & Sweden Elimination',
)
common_operating_units = ('Common Shoreside', 'Common Shipside')

voyage_id = F.col("VOYAGE_ID")
is_charter = (
    F.col("BUSINESS_UNIT_DESCRIPTION").in_(*charter_business_units)
    | (voyage_id.startswith("SM") | voyage_id.startswith("SX"))
)
is_common_ou = F.col("OPERATING_UNIT_DESCRIPTION").in_(*common_operating_units)

df_charter = df_grouped.filter(is_charter)
df_inter1 = df_grouped.filter(~is_charter)

df_norm = df_inter1.filter(~is_common_ou)
df_com = df_inter1.filter(is_common_ou)



# --- Charter treatment ---
# Translate each charter voyage id to its traditional voyage id via SIX_MAP, then attach the
# master data for that traditional voyage. Both joins are left joins, so charter rows without
# a mapping or without master data are kept with NULLs in the joined columns.
df_charter_mapped = df_charter.join(SIX_MAP, df_charter["VOYAGE_ID"] == SIX_MAP["SIXTHMAN_VOYAGE"], "left")
df_charter_map = df_charter_mapped.join(
    master_file.drop("SHIP_CD"),
    F.col("TRADITIONAL_VOYAGE_ID") == master_file["MXP_VOYAGE_CD"],
    "left",
)

# SHIP_CD is the first three characters of the traditional voyage id when that id is present
# and at most 9 characters long; otherwise it is the literal 'Unknown'.
has_short_traditional_id = (
    F.col("TRADITIONAL_VOYAGE_ID").is_not_null()
    & (F.length(F.col("TRADITIONAL_VOYAGE_ID")) <= 9)
)
df_charter2 = df_charter_map.with_column(
    "SHIP_CD",
    F.when(has_short_traditional_id, F.substring(F.col("TRADITIONAL_VOYAGE_ID"), 1, 3))
     .otherwise(F.lit("Unknown")),
)

# Each row's share of STRADDLE_PAX within its (year, period, business unit, operating unit,
# voyage, account) group; the share is 0 when the group total is exactly 0.
charter_group_window = Window.partition_by(
    "FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "BUSINESS_UNIT_DESCRIPTION",
    "OPERATING_UNIT_DESCRIPTION", "VOYAGE_ID", "ACCOUNT",
)
df_charter2 = df_charter2.with_column("TOTAL_PAX_DAYS", F.sum("STRADDLE_PAX").over(charter_group_window))\
                         .with_column("PAX_WEIGHTAGE", F.iff(F.col("TOTAL_PAX_DAYS") == 0, 0, F.col("STRADDLE_PAX") / F.col("TOTAL_PAX_DAYS")))

# Adjusted journal amount, evaluated as one CASE with branches in this order:
#   1. straddle voyage with SAIL_DAY_QTY > 0  -> amount / SAIL_DAY_QTY * CONV_SAIL_DAY_QTY
#   2. straddle voyage with no master voyage  -> 0
#   3. anything else                          -> amount unchanged
# Fixes relative to the previous expression:
#   * `&` binds tighter than `>` in Python, so `(flag == 'straddle') & col > 0` was parsed as
#     `((flag == 'straddle') & col) > 0`; both comparisons are now parenthesised.
#   * a CASE has a single ELSE, so the default is stated once at the end and no longer also
#     in the middle of the chain.
#   * MXP_VOYAGE_CD is compared with the string '0' (as the commented-out non-charter logic
#     further down does), not the integer 0, which would force a numeric cast of voyage codes.
is_straddle = F.col('STRADDLE_FLAG') == 'straddle'
has_sail_days = F.col('SAIL_DAY_QTY') > 0
lacks_master_voyage = F.col('MXP_VOYAGE_CD').is_null() | (F.col('MXP_VOYAGE_CD') == F.lit('0'))

updated_amount_col = (
    F.when(
        is_straddle & has_sail_days,
        (F.col('JOURNAL_MONETARY_AMOUNT_ADJ') / F.col('SAIL_DAY_QTY')) * F.col('CONV_SAIL_DAY_QTY'),
    )
    .when(is_straddle & lacks_master_voyage, F.lit(0))
    .otherwise(F.col('JOURNAL_MONETARY_AMOUNT_ADJ'))
)

df_charter2 = df_charter2.with_column('JOURNAL_MONETARY_AMOUNT_ADJ', updated_amount_col)


# Common shipside & shoreside treatment

# Separate the common-OU rows by whether they carry the 'No Voyage Cd' placeholder.
df_no_voyage = df_com.filter(F.col("VOYAGE_ID") == 'No Voyage Cd')
df_with_voyage = df_com.filter(F.col("VOYAGE_ID") != 'No Voyage Cd')

# Master data without SHIP_CD, used as the right-hand side of both joins below.
master_without_ship = master_file.drop("SHIP_CD")

# Rows without a voyage code are matched to every master voyage of the same year and month.
no_voyage_match = (
    (df_no_voyage["FISCAL_YEAR_NUMBER"] == master_file["CONV_SAIL_YEAR"])
    & (df_no_voyage["ACCOUNTING_PERIOD"] == master_file["CONV_SAIL_MONTH"])
)
df_no_voyage_merged = df_no_voyage.join(master_without_ship, no_voyage_match, how="left")

# Rows with a voyage code must additionally match on the voyage itself.
with_voyage_match = (
    (df_with_voyage["FISCAL_YEAR_NUMBER"] == master_file["CONV_SAIL_YEAR"])
    & (df_with_voyage["ACCOUNTING_PERIOD"] == master_file["CONV_SAIL_MONTH"])
    & (df_with_voyage["VOYAGE_ID"] == master_file["MXP_VOYAGE_CD"])
)
df_with_voyage_merged = df_with_voyage.join(master_without_ship, with_voyage_match, how="left")

# One partition per fiscal year and accounting period.
window_spec = Window.partition_by("FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD")

# Temporary helper column: per period, how many joined rows have a charter code other than 'SX'
# (a NULL charter code contributes 0 because the comparison is not TRUE).
is_non_sx = F.upper(F.col("BFS_CHARTER_CD")) != 'SX'
non_sx_indicator = F.when(is_non_sx, F.lit(1)).otherwise(F.lit(0))
df_with_count = df_no_voyage_merged.with_column("count_non_sx", F.sum(non_sx_indicator).over(window_spec))

# Keep rows whose period has at least one non-'SX' row, or that are themselves non-'SX';
# then remove the helper column.
# NOTE(review): any row that is itself non-'SX' already makes its period's count >= 1, so the
# second condition never admits a row the first one rejects — confirm this is the intended rule.
keep_row = (F.col("count_non_sx") > 0) | is_non_sx
df_no_voyage_merged = df_with_count.filter(keep_row).drop("count_non_sx")


# df_com1=df_no_voyage_merged.union_all(df_with_voyage_merged)

# # ALLOCATED_AMOUNT temporary column

# df_com1=df_com1.with_column("TOTAL_PAX_DAYS", F.sum("STRADDLE_PAX").over(Window.partition_by("FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "BUSINESS_UNIT_DESCRIPTION","OPERATING_UNIT_DESCRIPTION", "VOYAGE_ID", "ACCOUNT")))\
#                .with_column("PAX_WEIGHTAGE", F.iff(F.col("TOTAL_PAX_DAYS") == 0, 0, F.col("STRADDLE_PAX") / F.col("TOTAL_PAX_DAYS")))\
#                .with_column("ALLOCATED_AMOUNT", F.col("JOURNAL_MONETARY_AMOUNT_ADJ") * F.col("PAX_WEIGHTAGE"))\
#                .with_column("ALLOCATED_AMOUNT",
#                             F.when((F.col("STRADDLE_FLAG") == 'straddle') & F.col("ALLOCATED_AMOUNT").is_null(),
#                                     F.col("JOURNAL_MONETARY_AMOUNT_ADJ") / 2
#                             ).otherwise(F.coalesce(F.col("ALLOCATED_AMOUNT"), F.col("JOURNAL_MONETARY_AMOUNT_ADJ"))))\
#                .with_column("JOURNAL_MONETARY_AMOUNT_ADJ", F.col("ALLOCATED_AMOUNT"))\
#                .drop("ALLOCATED_AMOUNT")


# # 2. Chain all operations together for efficiency
# df_com1 = df_com1.with_column("VOYAGE_ID",F.when(F.col("VOYAGE_ID") == 'No Voyage Cd', F.col("MXP_VOYAGE_CD"))
#                  .otherwise(F.col("VOYAGE_ID")))\
#                 .with_column("SHIP_CD",F.substring(F.col("VOYAGE_ID"), 1, 3))\
#                 .with_column("STRADDLE_OCCURRENCE",F.row_number()
#                                                     .over(Window.partition_by("SHIP_CD", "BUSINESS_UNIT_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION",
#                                                                                 "FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "VOYAGE_ID", "ACCOUNT",
#                                                                                 "STRADDLE_FLAG", "VOYAGE_CD"
#                                                                             ).order_by(F.col("SAIL_DAT"))))




# # # Define the conditions for the initial buckets ---
# # condition1 = (F.col("ACCOUNT") == '40013-Cruise Revenue Promo Packages') & (~F.col("BFS_CHARTER_CD").isin('DC', 'SX')) &\
# #                     (~F.col("VOYAGE_ID").startswith('SM')) & (~F.col("VOYAGE_ID").startswith('SX'))

# # condition2 = (F.col("VOYAGE_ID") == 'No Voyage Cd') & (F.col("MXP_VOYAGE_CD").is_null()) & (F.col("ACCOUNT") != '40013-Cruise Revenue Promo Packages') &\
# #                     (~F.col("BFS_CHARTER_CD").isin('DC', 'SX')) & (~F.col("VOYAGE_ID").startswith('SM')) & (~F.col("VOYAGE_ID").startswith('SX'))

# # condition3 = (F.col("STRADDLE_FLAG") == 'Non-straddle') & (F.col("ACCOUNTING_PERIOD") == F.col("CONV_SAIL_MONTH")) & (F.col("FISCAL_YEAR_NUMBER") == F.col("CONV_SAIL_YEAR")) &\
# #                     (F.col("ACCOUNT") != '40013-Cruise Revenue Promo Packages') & (~F.col("BFS_CHARTER_CD").isin('DC', 'SX')) & (~F.col("VOYAGE_ID").startswith('SM')) &\
# #                     (~F.col("VOYAGE_ID").startswith('SX'))

# # condition4 = (F.col("STRADDLE_FLAG") == 'straddle') & (F.col("ACCOUNTING_PERIOD") == F.col("CONV_SAIL_MONTH")) &\
# #                     (F.col("FISCAL_YEAR_NUMBER") == F.col("CONV_SAIL_YEAR")) & (F.col("ACCOUNT") != '40013-Cruise Revenue Promo Packages') & \
# #                     (~F.col("BFS_CHARTER_CD").isin(['DC', 'SX'])) & (~F.col("VOYAGE_ID").startswith('SM')) & (~F.col("VOYAGE_ID").startswith('SX'))


# # # Apply the initial bucketing ---
# # df_com2 = df_com1.with_column(
# #                     "VOYAGEBUCKET_TEMP",
# #                     F.when(condition1, 'Bucket 1')
# #                      .when(condition2, 'Bucket 2')
# #                      .when(condition3, 'Bucket 3')
# #                      .when(condition4, 'Bucket 4')
# #                      .otherwise(F.lit(None)))


# # df_com2 = df_com2.with_column("HAS_BUCKET_IN_GROUP",
# #                           F.max(F.when(F.col("VOYAGEBUCKET_TEMP").is_not_null(), 1)
# #                           .otherwise(0)).over(Window.partition_by('VOYAGE_ID', 'SHIP_CD', 'ACCOUNTING_PERIOD', 'FISCAL_YEAR_NUMBER')))

# # # Apply the final "Bucket 5" logic
# # df_com2 = df_com2.with_column("VOYAGEBUCKET",
# #                         F.when(F.col("VOYAGEBUCKET_TEMP").is_not_null(), F.col("VOYAGEBUCKET_TEMP"))
# #                          .when(
# #                             (F.col("VOYAGEBUCKET_TEMP").is_null()) &
# #                             (F.col("HAS_BUCKET_IN_GROUP") == 0) &
# #                             (F.col("STRADDLE_OCCURRENCE") != 2),
# #                             'Bucket 5'
# #                          ).otherwise(F.lit(None)))\
# #                 .drop("VOYAGEBUCKET_TEMP", "HAS_BUCKET_IN_GROUP") \
# #                 .with_column("VOYAGEBUCKET",F.coalesce(F.col("VOYAGEBUCKET"), F.lit("Bucket 5")))
                

# # # Step 2: Aggregate for BUCKET_5
# # group_keys = ["SHIP_CD", "FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD",
# #     "BUSINESS_UNIT_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION"]

# # buckets_to_distribute = df_com2.filter(F.col("VOYAGEBUCKET") == "Bucket 5")
# # sums = (buckets_to_distribute.group_by(*group_keys).agg(F.sum("JOURNAL_MONETARY_AMOUNT_ADJ").alias("NONPERIODAMOUNT_TO_DISTRIBUTE")))


# # # Step 3: Compute presence flags 
# # pax_group = (df_com2    .group_by(*group_keys, "VOYAGEBUCKET")
# #     .agg(F.sum("PAX_DAYS").alias("PAX_SUM"))
# # )
# # bucket3_flag = (
# #     pax_group
# #     .filter(F.col("VOYAGEBUCKET") == "Bucket 3")
# #     .with_column("BUCKET_3_EXISTS", F.lit(True))
# #     .select(*group_keys, "BUCKET_3_EXISTS")
# # )
# # bucket4_flag = (pax_group
# #     .filter(F.col("VOYAGEBUCKET") == "Bucket 4")
# #     .with_column("BUCKET_4_EXISTS", F.lit(True))
# #     .select(*group_keys, "BUCKET_4_EXISTS")
# # )
# # flags_base = pax_group.select(*group_keys).distinct()
# # bucket_presence = (
# #     flags_base
# #     .join(bucket3_flag, group_keys, "left")
# #     .join(bucket4_flag, group_keys, "left")
# #     .with_column("BUCKET_3_EXISTS", F.coalesce(F.col("BUCKET_3_EXISTS"), F.lit(False)))
# #     .with_column("BUCKET_4_EXISTS", F.coalesce(F.col("BUCKET_4_EXISTS"), F.lit(False)))
# # )

# # # Step 4: Merge sums with flags
# # sumscount = sums.join(bucket_presence, group_keys)

# # # Step 5: Conditional allocation
# # alloc_base = sumscount.join(df_com2, group_keys)
# # alloc_base = alloc_base.with_column(
# #     "ELIGIBLE_BUCKET",
# #     F.when(
# #         (F.col("BUCKET_3_EXISTS") & F.col("BUCKET_4_EXISTS")) &
# #         ((F.col("VOYAGEBUCKET") == "Bucket 3") | (F.col("VOYAGEBUCKET") == "Bucket 4")), F.lit(True)
# #     ).when(
# #         F.col("BUCKET_3_EXISTS") & (F.col("VOYAGEBUCKET") == "Bucket 3"), F.lit(True)
# #     ).when(
# #         F.col("BUCKET_4_EXISTS") & (F.col("VOYAGEBUCKET") == "Bucket 4"), F.lit(True)
# #     ).when(
# #         ~F.col("BUCKET_3_EXISTS") & ~F.col("BUCKET_4_EXISTS") & (F.col("VOYAGEBUCKET") == "Bucket Charter"), F.lit(True)
# #     ).otherwise(F.lit(False))
# # )
# # filtered_alloc = alloc_base.filter(F.col("ELIGIBLE_BUCKET"))
# # window_spec = Window.partition_by(*group_keys)

# # nonperiodresults =filtered_alloc.with_column("TOTAL_STRADDLE_PAX", F.sum("STRADDLE_PAX").over(window_spec))\
# #                                 .with_column("NONPERIODDISTRIBUTED_AMOUNT",F.when(F.col("TOTAL_STRADDLE_PAX") > 0,
# #                                                  F.col("STRADDLE_PAX") / F.col("TOTAL_STRADDLE_PAX") * F.col("NONPERIODAMOUNT_TO_DISTRIBUTE")
# #                                                 ).otherwise(F.lit(0))).filter(F.col("NONPERIODDISTRIBUTED_AMOUNT") != 0)

# # join_key_columns = ["FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "BUSINESS_UNIT_DESCRIPTION",
# #     "OPERATING_UNIT_DESCRIPTION", "SHIP_CD", "M0_M1", "VOYAGE_ID","ACCOUNT","VOYAGE_CD",
# #     "STRADDLE_FLAG", "CONV_SAIL_DAT", "CONV_RETURN_DAT","CONV_SAIL_DAY_QTY", "CONV_SAIL_MONTH", "CONV_SAIL_YEAR", "STRADDLE_PAX_PD",
# #     "STRADDLE_OCCURRENCE", "VOYAGEBUCKET"]

# # # 2. Select the required columns from the 'nonperiodresults' DataFrame.
# # nonperiodresults1 = nonperiodresults.select(*join_key_columns,"NONPERIODDISTRIBUTED_AMOUNT")


# # df_com3 = df_com2.join(nonperiodresults1,on=join_key_columns,how='left')


# # df_com3 = df_com3.with_column("FINAL_AMOUNT",
# #                         F.when(F.col("VOYAGEBUCKET") == 'Bucket 1',F.col("JOURNAL_MONETARY_AMOUNT_ADJ"))
# #                          .when(F.col("VOYAGEBUCKET").in_('Bucket 3', 'Bucket 4', 'Bucket Charter'),
# #                                F.col("JOURNAL_MONETARY_AMOUNT_ADJ") + F.col("NONPERIODDISTRIBUTED_AMOUNT"))
# #                          .otherwise(F.lit(None)))



# # # NON Charter and Non Common OU data treatment


# # # 1. Define the window (the group)
# # window_spec = Window.partition_by("SHIP_CD", "BUSINESS_UNIT_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION",
# #                                     "FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "VOYAGE_ID", "ACCOUNT",
# #                                     "STRADDLE_FLAG", "VOYAGE_CD").order_by(F.col("CONV_SAIL_DAT"))

# # master=master_file.with_column_renamed("SHIP_CD" , "SHIP_CD_1")

# # #2. Chain all operations together
# # df_norm=df_norm.filter(F.col('SHIP_CD')!='Unknown')

# # df_norm1 = df_norm.with_column("M0_M1",F.when(F.col("ACCOUNT") == '40013-Cruise Revenue Promo Packages', F.lit('NTR-CRpromo'))
# #                                         .otherwise(F.col("M0_M1")))\
# #                   .join(master,((df_norm["VOYAGE_ID"] == master["MXP_VOYAGE_CD"]) &
# #                                 (df_norm["SHIP_CD"] == master["SHIP_CD_1"])),"left").distinct()\
# #                   .with_column("STRADDLE_OCCURRENCE",F.row_number().over(window_spec))\
# #                   .with_column("VOYAGE_ID",F.coalesce(F.col("VOYAGE_ID"), F.lit("No Voyage Cd")))\
# #                   .with_column("FISCAL_YEAR_NUMBER",F.col("FISCAL_YEAR_NUMBER").cast(T.FloatType()))\
# #                   .with_column("ACCOUNTING_PERIOD",F.col("ACCOUNTING_PERIOD").cast(T.FloatType()))
                

# # #  Define the conditions for the initial buckets ---

# # condition_promo = (df_norm1['ACCOUNT'] == '40013-Cruise Revenue Promo Packages') & (df_norm1['VOYAGE_ID'] == 'No Voyage Cd')
# # condition1 = (df_norm1['ACCOUNT'] == '40013-Cruise Revenue Promo Packages') & (df_norm1['VOYAGE_ID'] != 'No Voyage Cd')
# # condition2 = (df_norm1['ACCOUNT'] != '40013-Cruise Revenue Promo Packages') & (df_norm1['VOYAGE_ID'] == 'No Voyage Cd')
# # condition3 = (df_norm1['ACCOUNT'] != '40013-Cruise Revenue Promo Packages') & (df_norm1['STRADDLE_FLAG'] == 'Non-straddle') & (df_norm1['ACCOUNTING_PERIOD'] == df_norm1['CONV_SAIL_MONTH']) & (df_norm1['FISCAL_YEAR_NUMBER'] == df_norm1['CONV_SAIL_YEAR']) & (~df_norm1['BFS_CHARTER_CD'].isin(['SX']))
# # condition4 = (df_norm1['ACCOUNT'] != '40013-Cruise Revenue Promo Packages') & (df_norm1['STRADDLE_FLAG'] == 'straddle') & (df_norm1['ACCOUNTING_PERIOD'] == df_norm1['CONV_SAIL_MONTH']) & (df_norm1['FISCAL_YEAR_NUMBER'] == df_norm1['CONV_SAIL_YEAR']) & (~df_norm1['BFS_CHARTER_CD'].isin(['SX']))
# # condition_charter = (df_norm1['ACCOUNT'] != '40013-Cruise Revenue Promo Packages') & (df_norm1['BFS_CHARTER_CD'].isin(['SX']) | df_norm1['VOYAGE_ID'].startswith('SM') | df_norm1['VOYAGE_ID'].startswith('SX'))



# # #Apply the initial bucketing
# # dfnorm2 = df_norm1.with_column("VOYAGEBUCKET",
# #                                     F.when(condition_promo, 'Bucket Promo')
# #                                      .when(condition1, 'Bucket 1')
# #                                      .when(condition2, 'Bucket 2')
# #                                      .when(condition3, 'Bucket 3')
# #                                      .when(condition4, 'Bucket 4')
# #                                      .when(condition_charter, 'Bucket Charter')
# #                                      .otherwise('Bucket 5')) # Default to Bucket 5

# # condition = ((F.col("FISCAL_YEAR_NUMBER") != F.col("CONV_SAIL_YEAR")) |
# #              (F.col("ACCOUNTING_PERIOD") != F.col("CONV_SAIL_MONTH")) |
# #              (F.col("CONV_SAIL_MONTH").is_null() & (F.col("SHIP_CD") != F.col("SHIP_CD_1"))))

# # dfnorm2 = dfnorm2.with_column("NEW_VOYAGE_ID",F.when(condition, F.lit('Other voyages'))
# #                                                .otherwise(F.col("VOYAGE_ID"))).with_column("NEW_VOYAGE_ID", F.col("NEW_VOYAGE_ID").cast(T.StringType()))


# # dfnorm2 = dfnorm2.with_column("JOURNAL_MONETARY_AMOUNT_ADJ",F.when(F.col("STRADDLE_FLAG") == "straddle",
# #         F.when(
# #             (F.col("MXP_VOYAGE_CD").is_null()) | (F.col("MXP_VOYAGE_CD") ==F.lit('0')),
# #             F.when(F.col("VOYAGEBUCKET").is_null(), F.lit(0))
# #              .otherwise(F.col("JOURNAL_MONETARY_AMOUNT_ADJ"))
# #         )
# #         .when(
# #             F.col("SAIL_DAY_QTY") > 0,
# #             (F.col("JOURNAL_MONETARY_AMOUNT_ADJ") / F.col("SAIL_DAY_QTY")) * F.col("CONV_SAIL_DAY_QTY")
# #         )
# #         .otherwise(F.col("JOURNAL_MONETARY_AMOUNT_ADJ"))
# #     ).otherwise(F.col("JOURNAL_MONETARY_AMOUNT_ADJ"))
# # )
# # dfnorm2 = dfnorm2.with_column("GSS", F.coalesce(F.col("GSS"), F.lit(0)))\
# #     .with_column("PORTCD_ACTIVITY", F.coalesce(F.col("PORTCD_ACTIVITY"), F.lit('NA')))\
# #     .with_column("PRODUCT_CD", F.coalesce(F.col("PRODUCT_CD"), F.lit('NA')))\
# #     .with_column("PRODUCT_DESC", F.coalesce(F.col("PRODUCT_DESC"), F.lit('NA')))\
# #     .with_column("RM_ROLLUP_PRODUCT_CD", F.coalesce(F.col("RM_ROLLUP_PRODUCT_CD"), F.lit('NA')))\
# #     .with_column("RM_ROLLUP_PRODUCT_DESC", F.coalesce(F.col("RM_ROLLUP_PRODUCT_DESC"), F.lit('NA')))


# # def distribute_amounts(bucket_name, distributed_col_name):
    
# #     # Define the main window (group) for all calculations ---
# #     window_spec = Window.partition_by("SHIP_CD", "FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD", "BUSINESS_UNIT_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION")
    
# #     # 1. Calculate group-level metrics, one column at a time
# #     group_calcs = dfnorm2.with_column('BUCKET_3_EXISTS', F.max(F.when(F.col('VOYAGEBUCKET') == 'Bucket 3', 1).otherwise(0)).over(window_spec))\
# #                          .with_column('BUCKET_4_EXISTS', F.max(F.when(F.col('VOYAGEBUCKET') == 'Bucket 4', 1).otherwise(0)).over(window_spec))\
# #                          .with_column('AMOUNT_TO_DISTRIBUTE', F.sum(F.when(F.col('VOYAGEBUCKET') == bucket_name, F.col('JOURNAL_MONETARY_AMOUNT_ADJ')).otherwise(0)).over(window_spec))

# #     # 2. Determine which rows are "eligible" for allocation
# #     eligible_rows = group_calcs.with_column(
# #         'IS_ELIGIBLE',
# #         F.when( (F.col('BUCKET_3_EXISTS') == 1) & (F.col('BUCKET_4_EXISTS') == 1), F.col('VOYAGEBUCKET').isin('Bucket 3', 'Bucket 4') )
# #          .when( (F.col('BUCKET_3_EXISTS') == 1), F.col('VOYAGEBUCKET') == 'Bucket 3' )
# #          .when( (F.col('BUCKET_4_EXISTS') == 1), F.col('VOYAGEBUCKET') == 'Bucket 4' )
# #          .otherwise(F.col('VOYAGEBUCKET') == 'Bucket Charter')
# #     )

# #     # 3. Calculate the total 'Straddle Pax' for ONLY the eligible rows
# #     pax_days_calcs = eligible_rows.with_column('TOTAL_ELIGIBLE_PAX_DAYS', F.sum(F.when(F.col('IS_ELIGIBLE'), F.col('STRADDLE_PAX')).otherwise(0)).over(window_spec))

# #     # 4. Calculate the final distributed amount for each eligible row
# #     final_calcs = pax_days_calcs.with_column( distributed_col_name, F.when(
# #             (F.col('IS_ELIGIBLE')) & (F.col('TOTAL_ELIGIBLE_PAX_DAYS') > 0),
# #             (F.col('STRADDLE_PAX') / F.col('TOTAL_ELIGIBLE_PAX_DAYS')) * F.col('AMOUNT_TO_DISTRIBUTE')
# #         ).otherwise(0))
    
# #     # Return a DataFrame with only the original columns plus the new distributed amount
# #     return final_calcs.filter(F.col('IS_ELIGIBLE')).select(dfnorm2.columns + [distributed_col_name])


# # null_period_results = distribute_amounts( 'Bucket 2', 'NULLPERIODDISTRIBUTED_AMOUNT')
# # non_period_results = distribute_amounts( 'Bucket 5', 'NONPERIODDISTRIBUTED_AMOUNT')

# # null_cols=['FISCAL_YEAR_NUMBER','ACCOUNTING_PERIOD','BUSINESS_UNIT_DESCRIPTION','OPERATING_UNIT_DESCRIPTION','SHIP_CD','M0_M1','VOYAGE_ID','ACCOUNT','JOURNAL_MONETARY_AMOUNT_ADJ','VOYAGE_CD','MXP_VOYAGE_CD',
# #     'VOY_GRAB','ALBS','LOAD_FACTOR','PAX_DAYS','DO_CAPS_DAYS','PORTCD_ACTIVITY','GSS','BFS_CHARTER_CD','SAIL_DAT','RETURN_DAT','SAIL_DAY_QTY','EMBARK_PORT_CD',
# #     'DISEMBARK_PORT_CD','SAIL_STATUS_CD','PRODUCT_CD','PRODUCT_DESC','RM_ROLLUP_PRODUCT_CD','RM_ROLLUP_PRODUCT_DESC','MAIN_VOYAGE_CD','END_OF_MONTH','STRADDLE_FLAG',
# #     'CONV_SAIL_DAT','CONV_RETURN_DAT','CONV_SAIL_DAY_QTY','CONV_SAIL_MONTH','CONV_SAIL_YEAR','STRADDLE_PAX_PD','STRADDLE_PAX',
# #     'STRADDLE_OCCURRENCE','VOYAGEBUCKET','NEW_VOYAGE_ID']
# # join_keys=['FISCAL_YEAR_NUMBER','ACCOUNTING_PERIOD','BUSINESS_UNIT_DESCRIPTION','OPERATING_UNIT_DESCRIPTION','SHIP_CD','M0_M1','VOYAGE_ID','ACCOUNT','VOYAGE_CD','MXP_VOYAGE_CD',
# #     'VOY_GRAB','ALBS','LOAD_FACTOR','PAX_DAYS','DO_CAPS_DAYS','GSS','PORTCD_ACTIVITY','BFS_CHARTER_CD','SAIL_DAT','RETURN_DAT','SAIL_DAY_QTY','EMBARK_PORT_CD',
# #     'DISEMBARK_PORT_CD','SAIL_STATUS_CD','PRODUCT_CD','PRODUCT_DESC','RM_ROLLUP_PRODUCT_CD','RM_ROLLUP_PRODUCT_DESC','END_OF_MONTH','STRADDLE_FLAG',
# #     'CONV_SAIL_DAT','CONV_RETURN_DAT','CONV_SAIL_DAY_QTY','CONV_SAIL_MONTH','CONV_SAIL_YEAR','STRADDLE_PAX_PD','STRADDLE_PAX',
# #     'STRADDLE_OCCURRENCE','VOYAGEBUCKET','NEW_VOYAGE_ID']

# # # Select the necessary columns from each result set before joining
# # null_to_join = null_period_results.select(null_cols + ['NULLPERIODDISTRIBUTED_AMOUNT'])
# # non_to_join = non_period_results.select(join_keys + ['NONPERIODDISTRIBUTED_AMOUNT'])

# # # Left-join the Bucket 5 distribution onto the Bucket 2 distribution (how='left', not a full outer join)
# # dfnorm3 = null_to_join.join(
# #     non_to_join,
# #     on=join_keys,
# #     how='left'
# # )


# # dfnorm4 = dfnorm3.select("FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD","BUSINESS_UNIT_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION",
# #                         "SHIP_CD", "M0_M1", "VOYAGE_ID", "ACCOUNT", "VOYAGE_CD","MXP_VOYAGE_CD", "VOY_GRAB", "LOAD_FACTOR", "PAX_DAYS", "DO_CAPS_DAYS",
# #                         "BFS_CHARTER_CD", "ALBS", "PORTCD_ACTIVITY", "GSS", "SAIL_DAT","RETURN_DAT", "SAIL_DAY_QTY", "EMBARK_PORT_CD", "DISEMBARK_PORT_CD",
# #                         "SAIL_STATUS_CD", "PRODUCT_CD", "PRODUCT_DESC","RM_ROLLUP_PRODUCT_CD", "RM_ROLLUP_PRODUCT_DESC", "MAIN_VOYAGE_CD",
# #                         "END_OF_MONTH", "STRADDLE_FLAG", "CONV_SAIL_DAT","CONV_RETURN_DAT", "CONV_SAIL_DAY_QTY", "CONV_SAIL_MONTH",
# #                         "CONV_SAIL_YEAR", "VOYAGEBUCKET", "NEW_VOYAGE_ID","STRADDLE_OCCURRENCE", "NULLPERIODDISTRIBUTED_AMOUNT","NONPERIODDISTRIBUTED_AMOUNT")


# # join_keys = ["FISCAL_YEAR_NUMBER", "ACCOUNTING_PERIOD",
# #     "BUSINESS_UNIT_DESCRIPTION", "OPERATING_UNIT_DESCRIPTION",
# #     "SHIP_CD", "M0_M1", "VOYAGE_ID", "ACCOUNT",
# #     "MXP_VOYAGE_CD", "VOY_GRAB", "LOAD_FACTOR", "PAX_DAYS", "DO_CAPS_DAYS",
# #     "BFS_CHARTER_CD", "ALBS", "PORTCD_ACTIVITY", "GSS", "SAIL_DAT",
# #     "RETURN_DAT", "SAIL_DAY_QTY", "EMBARK_PORT_CD", "DISEMBARK_PORT_CD",
# #     "VOYAGE_CD", "SAIL_STATUS_CD", "PRODUCT_CD", "PRODUCT_DESC",
# #     "RM_ROLLUP_PRODUCT_CD", "RM_ROLLUP_PRODUCT_DESC", "MAIN_VOYAGE_CD",
# #     "END_OF_MONTH", "STRADDLE_FLAG", "CONV_SAIL_DAT",
# #     "CONV_RETURN_DAT", "CONV_SAIL_DAY_QTY", "CONV_SAIL_MONTH",
# #     "CONV_SAIL_YEAR", "STRADDLE_OCCURRENCE", "VOYAGEBUCKET", "NEW_VOYAGE_ID"]

# # dfnorm5 = dfnorm2.join(dfnorm4, join_keys, "left")

# # dfnorm5 = dfnorm5.with_column("NONPERIODDISTRIBUTED_AMOUNT", F.coalesce(F.col("NONPERIODDISTRIBUTED_AMOUNT"), F.lit(0)))\
# #                  .with_column("NULLPERIODDISTRIBUTED_AMOUNT", F.coalesce(F.col("NULLPERIODDISTRIBUTED_AMOUNT"), F.lit(0)))\
# #                  .with_column("JOURNAL_MONETARY_AMOUNT_ADJ", F.coalesce(F.col("JOURNAL_MONETARY_AMOUNT_ADJ"), F.lit(0)))



# # dfnorm5 = dfnorm5.with_column("FINAL_AMOUNT",
# #                             F.when(
# #                                 F.col("VOYAGEBUCKET").isin(["Bucket 1", "Bucket Promo"]),
# #                                 F.col("JOURNAL_MONETARY_AMOUNT_ADJ")
# #                             ).when(
# #                                 F.col("VOYAGEBUCKET").isin(["Bucket 3", "Bucket 4", "Bucket Charter"]),
# #                                 F.col("JOURNAL_MONETARY_AMOUNT_ADJ") + F.col("NONPERIODDISTRIBUTED_AMOUNT") + F.col("NULLPERIODDISTRIBUTED_AMOUNT")
# #                             ).otherwise(F.lit(None))
# #                         ).drop('SHIP_CD_1')


# # dfnorm5_processed = dfnorm5.with_column('CAPS_PER_DAY', F.col('DO_CAPS_DAYS') / F.col('SAIL_DAY_QTY'))\
# #                             .with_column('PAX_PER_DAY', F.col('PAX_DAYS') / F.col('SAIL_DAY_QTY'))\
# #                             .with_column('PRTD_CAPS_DAYS', F.col('CAPS_PER_DAY') * F.col('CONV_SAIL_DAY_QTY'))\
# #                             .with_column('PRTD_PAX_DAYS', F.col('PAX_PER_DAY') * F.col('CONV_SAIL_DAY_QTY'))\
# #                             .with_column('PCD',  F.col('FINAL_AMOUNT') / F.col('PRTD_CAPS_DAYS'))\
# #                             .with_column('PPD',  F.col('FINAL_AMOUNT') / F.col('PRTD_PAX_DAYS'))\
# #                             .with_columns(
# #                             # Add columns to match schema for future union
# #                             ['AMOUNT_TO_DISTRIBUTE', 'TOTALPAXDAYS', 'PAXPERCENT'],
# #                             [F.lit(0), F.lit(0), F.lit(0)]
# #                         )

# # df_comv1_processed = df_com1.with_column_renamed('TOTAL_PAX_DAYS', 'TOTALPAXDAYS') \
# #                              .with_column_renamed('PAX_WEIGHTAGE', 'PAXPERCENT')

# # # Add and calculate new columns to align with dfnorm5 schema
# # df_comv1_processed = df_comv1_processed.with_columns(
# #     [   'NEW_VOYAGE_ID', 'AMOUNT_TO_DISTRIBUTE', 'CAPS_PER_DAY', 'PRTD_CAPS_DAYS',
# #         'PCD', 'PAX_PER_DAY', 'PRTD_PAX_DAYS', 'PPD', 'NONPERIODDISTRIBUTED_AMOUNT',
# #         'NULLPERIODDISTRIBUTED_AMOUNT', 'STRADDLE_PAX', 'STRADDLE_PAX_PD'
# #     ],
# #     [   F.lit('0'), F.lit(0), F.lit(0), F.lit(0), F.lit(0), F.lit(0), F.lit(0),
# #         F.lit(0), F.lit(0), F.lit(0), F.lit(0), F.lit(0)
# #     ]).with_column('FINAL_AMOUNT', F.col('JOURNAL_MONETARY_AMOUNT_ADJ'))\
# #       .with_column('VOYAGEBUCKET', F.lit('Common -OU'))


# # final_cols_med = sorted(list(set(dfnorm5_processed.columns) | set(df_comv1_processed.columns)))
# # df_med = df_comv1_processed.select(final_cols_med).union_all(dfnorm5_processed.select(final_cols_med))

# # df_med = df_med.with_columns(['SIXTHMAN_VOYAGE', 'TRADITIONAL_VOYAGE_ID'],[F.lit('NA'), F.lit('NA')])

# # df_charter2_processed = df_charter2.with_column_renamed('TOTAL_PAX_DAYS', 'TOTALPAXDAYS') \
# #                                    .with_column_renamed('PAX_WEIGHTAGE', 'PAXPERCENT')\
# #                                    .with_column('VOYAGEBUCKET', F.lit('Bucket Charter'))

# # # Add and calculate columns to align with df_med schema
# # df_charter2_processed = df_charter2_processed.with_columns(
# #     [   'VOYAGEBUCKET', 'STRADDLE_OCCURRENCE', 'NEW_VOYAGE_ID', 'AMOUNT_TO_DISTRIBUTE',
# #         'CAPS_PER_DAY', 'PRTD_CAPS_DAYS', 'PCD', 'PAX_PER_DAY', 'PRTD_PAX_DAYS', 'PPD',
# #         'STRADDLE_PAX', 'STRADDLE_PAX_PD', 'NULLPERIODDISTRIBUTED_AMOUNT', 'NONPERIODDISTRIBUTED_AMOUNT'
# #     ],
# #     [   F.lit('Bucket Charter'), F.lit(0), F.lit('NA'), F.lit(0), F.lit(0), F.lit(0),
# #         F.lit(0), F.lit(0), F.lit(0), F.lit(0), F.lit(0), F.lit(0), F.lit(0), F.lit(0)
# #     ]).with_column('FINAL_AMOUNT', F.col('JOURNAL_MONETARY_AMOUNT_ADJ'))


# # final_cols_all = sorted(list(set(df_med.columns) | set(df_charter2_processed.columns)))
# # df_med1 = df_med.select(final_cols_all).union_all(df_charter2_processed.select(final_cols_all))


# # df_final = df_med1.select('FISCAL_YEAR_NUMBER','ACCOUNTING_PERIOD','BUSINESS_UNIT_DESCRIPTION','OPERATING_UNIT_DESCRIPTION','SHIP_CD','M0_M1',
# #                                     'VOYAGE_ID','ACCOUNT','JOURNAL_MONETARY_AMOUNT_ADJ','VOYAGE_CD','MXP_VOYAGE_CD','VOY_GRAB','ALBS','LOAD_FACTOR','PAX_DAYS',
# #                                     'DO_CAPS_DAYS','PORTCD_ACTIVITY','GSS','SAIL_DAT','RETURN_DAT','SAIL_DAY_QTY','EMBARK_PORT_CD','DISEMBARK_PORT_CD','BFS_CHARTER_CD',
# #                                     'RM_ROLLUP_PRODUCT_DESC','MAIN_VOYAGE_CD','END_OF_MONTH','STRADDLE_FLAG','CONV_SAIL_DAT','CONV_RETURN_DAT','CONV_SAIL_DAY_QTY',
# #                                     'CONV_SAIL_MONTH','CONV_SAIL_YEAR','STRADDLE_OCCURRENCE','VOYAGEBUCKET','NEW_VOYAGE_ID','AMOUNT_TO_DISTRIBUTE','TOTALPAXDAYS','PAXPERCENT',
# #                                     'FINAL_AMOUNT','CAPS_PER_DAY','PRTD_CAPS_DAYS','PCD','PAX_PER_DAY','PRTD_PAX_DAYS','PPD','SAIL_STATUS_CD','PRODUCT_CD','RM_ROLLUP_PRODUCT_CD',
# #                                     'SIXTHMAN_VOYAGE','TRADITIONAL_VOYAGE_ID','SHIP_NAME','SHIP_CLASS')


In [None]:
# Row count of `df_with_count`. count() runs the query in Snowflake and the
# resulting number is displayed as the cell output.
# NOTE(review): looks like an ad-hoc sanity check; nothing visible here consumes the result.
df_with_count.count()

In [None]:
# Total of JOURNAL_MONETARY_AMOUNT_ADJ across all rows of `df_with_voyage_merged`.
# agg() only builds a one-row DataFrame; it is displayed because it is the last
# expression of the cell.
# NOTE(review): presumably used to reconcile the amount against the source — confirm.
df_with_voyage_merged.agg(F.sum("JOURNAL_MONETARY_AMOUNT_ADJ"))

# df_charter_map.group_by("mxp_voyage_cd").count()

In [None]:
# Build the reporting frame `df` from `df_final`:
#   1. load the product mapping
#   2. attach the account rollup hierarchy (LEVEL_1..LEVEL_4)
#   3. derive SUB_STRING / NEW_VOYAGE_ID
#   4. add zero-valued fuel placeholder columns
#   5. fix the output column set and order
#   6. join the product mapping
#   7. add adjusted amounts and per-day amounts
# Everything here is lazy; nothing runs in Snowflake until a later cell writes `df`.
# NOTE(review): `df_final` must be defined by an earlier cell. The only definition
# visible near this cell is commented out — confirm the notebook survives
# "Restart & Run All".

# 1. Load the product mapping (PRODUCT_DESC is exposed as PRODUCT_DESCRIPTION).
product_map = session.table("L00_STG.NCLH_PRODUCT_MAPPING").with_column_renamed("PRODUCT_DESC", "PRODUCT_DESCRIPTION")

# 2. Schema cleaning & hierarchy join.
# The first five characters of ACCOUNT are used as the key into the rollup mapping.
df = df_final.with_column("ACCOUNT_NUMBER", F.substring(F.col("ACCOUNT"), 1, 5))
df = df.join(mapping, "ACCOUNT_NUMBER", "left")

# 3. Voyage ID derivation.
# Fiscal period rendered as YYMM: last two digits of the year followed by the
# zero-padded accounting period (e.g. 2025 / 8 -> '2508').
fiscal_match_val = F.concat(
    F.substring(F.col("FISCAL_YEAR_NUMBER").cast('string'), 3, 2),
    F.lpad(F.col("ACCOUNTING_PERIOD").cast('string'), 2, F.lit('0')))

# Characters 4-7 of VOYAGE_ID (e.g. '2406' in 'SPR240605').
voyage_yymm = F.substring(F.col("VOYAGE_ID"), 4, 4)

# NEW_VOYAGE_ID keeps VOYAGE_ID when its YYMM part equals the fiscal period,
# otherwise it is the literal 'Other Voyage Code'.
# NOTE(review): an earlier assignment used to set NEW_VOYAGE_ID to 'Other voyages'
# when FISCAL_YEAR_NUMBER/ACCOUNTING_PERIOD differed from CONV_SAIL_YEAR/CONV_SAIL_MONTH
# (or CONV_SAIL_MONTH was NULL). It was unconditionally overwritten by the rule below,
# so it never influenced the output and has been removed. If both rules were meant
# to apply, the logic needs to be combined — confirm with the business owner.
df = df.with_columns(
    ["SUB_STRING", "NEW_VOYAGE_ID"],
    [
        voyage_yymm,
        F.when(voyage_yymm == fiscal_match_val, F.col("VOYAGE_ID"))
         .otherwise(F.lit("Other Voyage Code")),
    ])

# 4. Fuel placeholder metrics: every column is added with the constant 0.
fuel_metrics = ['HFO$','HFOMT','MGO$','MGOMT','BIO$','BIOMT','HFO$/MT','MGO$/MT','BIO$/MT','MGO_TOTAL_MT','HFO_TOTAL_MT','BIO_TOTAL_MT','MGO_TOTAL_MT/SAIL_DAY_QTY',
                'HFO_TOTAL_MT/SAIL_DAY_QTY','BIO_TOTAL_MT/SAIL_DAY_QTY','MGO_CONSUMPTION','HFO_CONSUMPTION','BIO_CONSUMPTION','MGO_MTCOST','HFO_MTCOST','BIO_MTCOST']
# One with_columns call instead of one with_column call per metric.
df = df.with_columns(fuel_metrics, [F.lit(0) for _ in fuel_metrics])

# 5. Fix the output column set and order. The fuel columns are taken from
# `fuel_metrics` so the two lists cannot drift apart.
df = df.select(
    'FISCAL_YEAR_NUMBER','ACCOUNTING_PERIOD','SHIP_CD','M0_M1','VOYAGE_ID','ACCOUNT','LEVEL_1','LEVEL_2','LEVEL_3','LEVEL_4',
    'VOYAGE_CD','MXP_VOYAGE_CD','VOY_GRAB','ALBS','LOAD_FACTOR','PAX_DAYS','DO_CAPS_DAYS','PORTCD_ACTIVITY','GSS',
    'SAIL_DAT','RETURN_DAT','SAIL_DAY_QTY','EMBARK_PORT_CD','DISEMBARK_PORT_CD','SAIL_STATUS_CD','PRODUCT_CD',
    'RM_ROLLUP_PRODUCT_CD','RM_ROLLUP_PRODUCT_DESC','MAIN_VOYAGE_CD','END_OF_MONTH','STRADDLE_FLAG',
    'CONV_SAIL_DAT','CONV_RETURN_DAT','CONV_SAIL_DAY_QTY','CONV_SAIL_MONTH','CONV_SAIL_YEAR','CAPS_PER_DAY',
    'PRTD_CAPS_DAYS','PAX_PER_DAY','PRTD_PAX_DAYS','VOYAGEBUCKET','JOURNAL_MONETARY_AMOUNT_ADJ','FINAL_AMOUNT',
    *fuel_metrics,
    'SUB_STRING','NEW_VOYAGE_ID','PCD','PPD','STRADDLE_OCCURRENCE','BUSINESS_UNIT_DESCRIPTION','OPERATING_UNIT_DESCRIPTION',
    'SIXTHMAN_VOYAGE','TRADITIONAL_VOYAGE_ID','BFS_CHARTER_CD','SHIP_NAME','SHIP_CLASS')

# 6. Product join (left join: rows without a mapping entry are kept).
df = df.join(product_map, ["VOYAGE_CD", "RM_ROLLUP_PRODUCT_DESC"], "left")

# 7. Adjusted amounts and per-day amounts.
# Both ADJUSTED_* columns are plain copies of JOURNAL_MONETARY_AMOUNT_ADJ.
# The per-day ratios are rounded to 4 decimals and fall back to 0 whenever the
# divisor is 0 or NULL (a NULL comparison is not true, so otherwise() applies).
df = df.with_column('ADJUSTED_FINAL_AMOUNT_NEW', F.col('JOURNAL_MONETARY_AMOUNT_ADJ')) \
       .with_column('ADJUSTED_FINAL_AMOUNT', F.col('JOURNAL_MONETARY_AMOUNT_ADJ')) \
       .with_column("FIN_AMT_PCD", F.round(F.when(F.col("PRTD_CAPS_DAYS") != 0,
                         F.col("JOURNAL_MONETARY_AMOUNT_ADJ") / F.col("PRTD_CAPS_DAYS")).otherwise(0), 4)) \
       .with_column("FIN_AMT_PPD", F.round(F.when(F.col("PRTD_PAX_DAYS") != 0,
                         F.col("JOURNAL_MONETARY_AMOUNT_ADJ") / F.col("PRTD_PAX_DAYS")).otherwise(0), 4))


In [None]:

# Add metadata columns and persist the result.
#   MD5_HUB_VOYAGE   - MD5 of VOYAGE_ID
#   HASH_DIFF        - MD5 over all remaining (non-key, non-metadata) columns
#   LDTS             - current timestamp
#   RCSR             - constant source label
#   LAST_MODIFIED_BY - current Snowflake user
df = df.with_column("MD5_HUB_VOYAGE", F.md5(F.col("VOYAGE_ID")))

# HASH_DIFF is excluded as well: this cell reassigns `df`, so on a second run `df`
# already carries HASH_DIFF and would otherwise feed the old hash into the new one.
hash_excluded = ("VOYAGE_ID", "HASH_DIFF", "LDTS", "RCSR", "LAST_MODIFIED_BY", "MD5_HUB_VOYAGE")
columns_to_hash = [c for c in df.columns if c not in hash_excluded]

# Each column is cast to string, NULL becomes '', and the pieces are concatenated.
# NOTE(review): there is no delimiter between values, so different rows can produce
# the same input string (e.g. 'ab' + 'c' vs 'a' + 'bc'). Adding a separator would
# change every HASH_DIFF value — confirm with downstream consumers before changing.
hash_input = F.concat(*[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in columns_to_hash])
df = df.with_column("HASH_DIFF", F.md5(hash_input))

Final_NTR_Output = df.with_column("LDTS", F.current_timestamp()) \
                     .with_column("RCSR", F.lit('Master_data,NTR_PSFT')) \
                     .with_column("LAST_MODIFIED_BY", F.current_user())


# Materialize the result in Snowflake (this triggers the actual execution).
# mode("overwrite") replaces the existing table contents.
Final_NTR_Output.write.mode("overwrite").save_as_table("L00_STG.SAT_VFM_NCL_M0_NTR")


# Alternative target, currently disabled:
# Final_NTR_Output.write.mode("overwrite").save_as_table("VESSOPS_D.L20_BDV.SAT_VFM_NCL_M0_NTR")

In [None]:
# Write `df_com1` to VESSOPS_D.L00_STG.VFM_NCL_M0_NTR_TEST, replacing any existing
# table contents (mode "overwrite").
# NOTE(review): the _TEST suffix suggests a debugging snapshot; confirm whether this
# cell should remain in the finished notebook.
df_com1.write.mode("overwrite").save_as_table("VESSOPS_D.L00_STG.VFM_NCL_M0_NTR_TEST")