### Start of Incremental Load
Incremental load of every table in the schema. It runs every day, but keeps only the latest available run date plus the month-end snapshots (rows whose `run_date` is the first day of a month).

In [0]:
# Example: Check if there is a difference between the last loaded and current data in a generic incremental ETL pipeline
# This is a common pattern for incremental data loads in data engineering

import sys
from datetime import datetime

# Guard cell: abort the run on a weekday when the latest load shows no change.
# Replace 'mydb.incremental_table' and 'mydb.current_table' with your actual table names.
#
# The query sums `amount` per load_date in the incremental table, subtracts the
# grand total of `amount` in the current table, and keeps only the most recent
# load_date (ORDER BY ... DESC LIMIT 1).
day_delta_df = spark.sql("""
  SELECT 
      i.load_date,
      SUM(i.amount) - b.total_amount AS day_delta
  FROM mydb.incremental_table i
  JOIN (
      SELECT SUM(c.amount) AS total_amount
      FROM mydb.current_table c
  ) b
  GROUP BY i.load_date, b.total_amount
  ORDER BY i.load_date DESC
  LIMIT 1
""")

# first() returns None when the query produced no row, so no IndexError
# handling is needed; day_delta is also None when the SQL difference is NULL.
latest_delta_row = day_delta_df.select("day_delta").first()
day_delta = latest_delta_row[0] if latest_delta_row is not None else None

run_moment = datetime.now()
# Day name kept for display/backward compatibility only; its text depends on the
# process locale, so it is NOT used for the decision below.
current_weekday = run_moment.strftime('%A')
# weekday() is locale-independent: Monday == 0 ... Sunday == 6.
is_weekend = run_moment.weekday() >= 5

# If there is no difference and today is a weekday, stop the script.
if (day_delta is None or day_delta == 0) and not is_weekend:
    sys.exit("No new data to process. Aborting script.")

In [0]:
# Example: snapshot refresh for a dimension table in a generic data warehouse.
# After this cell the target holds today's full extract plus the history rows
# that are retained (rows stamped on the first day of a month).
#
# Swap 'source_db.generic_table' and 'analytics_db.incremental_table' for your own table names.

# Today's extract: every source row, stamped with the current date as run_date.
todays_extract_sql = """
  SELECT *, current_date() AS run_date
  FROM source_db.generic_table
"""
generic_df = spark.sql(todays_extract_sql)

# History to keep: rows not stamped today whose run_date day-of-month is '01'.
retained_history_sql = """
  SELECT *
  FROM analytics_db.incremental_table
  WHERE run_date <> current_date() AND date_format(run_date, 'dd') = '01'
"""
prev_incremental_data = spark.sql(retained_history_sql)

# Stack both sets, matching columns by name rather than by position.
incremental_data = generic_df.unionByName(prev_incremental_data)

# Replace the stored table (schema included) with the combined result.
table_writer = incremental_data.write.mode("overwrite").option("overwriteSchema", "true")
table_writer.saveAsTable("analytics_db.incremental_table")

In [0]:
# Example: snapshot refresh for a fact table in a generic data warehouse.
# Swap 'source_db.fact_table' and 'analytics_db.incremental_table' for your own table names.
#
# NOTE(review): this cell overwrites 'analytics_db.incremental_table', the same
# table the dimension cell before it writes, and the fact transformation later
# joins that table on dealer_id/company_id. Confirm the intended target table.

# Full extract of the source fact table, stamped with today's date.
fact_extract_sql = """
  SELECT *, current_date() AS run_date
  FROM source_db.fact_table
"""
fact_table_df = spark.sql(fact_extract_sql)

# Stored rows to carry forward: not stamped today and dated the 1st of a month.
carried_rows_sql = """
  SELECT *
  FROM analytics_db.incremental_table
  WHERE run_date <> current_date() AND date_format(run_date, 'dd') = '01'
"""
prev_incremental_table = spark.sql(carried_rows_sql)

# Combine by column name, then overwrite the target (schema included).
incremental_table = fact_table_df.unionByName(prev_incremental_table)

fact_writer = incremental_table.write.mode("overwrite")
fact_writer = fact_writer.option("overwriteSchema", "true")
fact_writer.saveAsTable("analytics_db.incremental_table")

In [0]:
# Example: snapshot refresh for the Behavior Classification dimension table.
# Swap 'source_db.generic_behavior' and 'analytics_db.incremental_behavior' for your own table names.

# Current source rows, each tagged with today's date in run_date.
behavior_extract_sql = """
  SELECT *, current_date() AS run_date
  FROM source_db.generic_behavior
"""
generic_behavior_df = spark.sql(behavior_extract_sql)

# Previously stored rows to retain: run_date is not today and falls on day '01'.
behavior_history_sql = """
  SELECT *
  FROM analytics_db.incremental_behavior
  WHERE run_date <> current_date() AND date_format(run_date, 'dd') = '01'
"""
prev_incremental_behavior = spark.sql(behavior_history_sql)

# Name-based union of today's rows and the retained history.
incremental_behavior = generic_behavior_df.unionByName(prev_incremental_behavior)

# Overwrite the stored table, allowing the schema to be replaced as well.
behavior_writer = incremental_behavior.write.mode("overwrite").option("overwriteSchema", "true")
behavior_writer.saveAsTable("analytics_db.incremental_behavior")

In [0]:
# Example: snapshot refresh for the Equipment dimension table.
# Swap 'source_db.equipment' and 'analytics_db.incremental_equipment' for your own table names.

# Step 1 - today's extract, stamped with the current date.
equipment_extract_sql = """
  SELECT *, current_date() AS run_date
  FROM source_db.equipment
"""
equipment_df = spark.sql(equipment_extract_sql)

# Step 2 - stored rows to keep: dated the 1st of a month and not stamped today.
equipment_history_sql = """
  SELECT *
  FROM analytics_db.incremental_equipment
  WHERE run_date <> current_date() AND date_format(run_date, 'dd') = '01'
"""
prev_incremental_equipment = spark.sql(equipment_history_sql)

# Step 3 - combine by column name.
incremental_equipment = equipment_df.unionByName(prev_incremental_equipment)

# Step 4 - replace the stored table and its schema.
equipment_writer = incremental_equipment.write.mode("overwrite")
equipment_writer = equipment_writer.option("overwriteSchema", "true")
equipment_writer.saveAsTable("analytics_db.incremental_equipment")

In [0]:
# Snapshot refresh for the Product dimension table.
# Swap 'source_db.product' and 'analytics_db.incremental_product' for your own table names.

# All source products, tagged with today's date as run_date.
product_extract_sql = """
  SELECT *, current_date() AS run_date
  FROM source_db.product
"""
product_df = spark.sql(product_extract_sql)

# Earlier snapshots that survive the refresh: run_date on day '01', excluding today.
product_history_sql = """
  SELECT *
  FROM analytics_db.incremental_product
  WHERE run_date <> current_date() AND date_format(run_date, 'dd') = '01'
"""
prev_incremental_product = spark.sql(product_history_sql)

# Union by column name, then overwrite the target table together with its schema.
incremental_product = product_df.unionByName(prev_incremental_product)

product_writer = incremental_product.write.mode("overwrite").option("overwriteSchema", "true")
product_writer.saveAsTable("analytics_db.incremental_product")

In [0]:
# Example: snapshot refresh for the Equipment Dates dimension table.
# Swap 'source_db.equipment_dates' and 'analytics_db.incremental_equipment_dates' for your own table names.

# Fresh extract of the source, with run_date set to the current date.
equipment_dates_extract_sql = """
  SELECT *, current_date() AS run_date
  FROM source_db.equipment_dates
"""
equipment_dates_df = spark.sql(equipment_dates_extract_sql)

# Rows already stored that are kept: not today's, and dated the 1st of a month.
equipment_dates_history_sql = """
  SELECT *
  FROM analytics_db.incremental_equipment_dates
  WHERE run_date <> current_date() AND date_format(run_date, 'dd') = '01'
"""
prev_incremental_equipment_dates = spark.sql(equipment_dates_history_sql)

# Merge the two sets by column name.
incremental_equipment_dates = equipment_dates_df.unionByName(prev_incremental_equipment_dates)

# Persist: overwrite both data and schema of the target table.
equipment_dates_writer = incremental_equipment_dates.write.mode("overwrite")
equipment_dates_writer = equipment_dates_writer.option("overwriteSchema", "true")
equipment_dates_writer.saveAsTable("analytics_db.incremental_equipment_dates")

In [0]:
# Snapshot refresh for a fact table enriched through joins (generic data warehouse).
# The fact rows are joined to a calculation table (inner join) and to an
# inventory link table (left outer join) before the usual refresh pattern is applied.
# Adapt every table and column name to your own data model.

# Step 1: today's fact rows plus the calculation / inventory attributes,
#         stamped with the current date as run_date.
fact_today_sql = """
  SELECT
    f.*, 
    current_date() AS run_date,
    inv.inventory_id,
    inv.inventory_flag,
    calc.product_type,
    calc.interest_code
  FROM source_db.fact_table f
  INNER JOIN source_db.fact_table_calc calc ON f.fact_key = calc.fact_key
  LEFT OUTER JOIN source_db.fact_inventory_link inv ON f.fact_key = inv.fact_key
"""
fact_today_df = spark.sql(fact_today_sql)

# Step 2: stored rows to retain - not stamped today and dated the 1st of a month.
fact_history_sql = """
  SELECT *
  FROM analytics_db.incremental_fact_table
  WHERE run_date <> current_date() AND date_format(run_date, 'dd') = '01'
"""
prev_incremental_fact = spark.sql(fact_history_sql)

# Step 3: stack today's rows on the retained history, matching columns by name.
incremental_fact = fact_today_df.unionByName(prev_incremental_fact)

# Step 4: replace the stored fact table, schema included.
incremental_fact_writer = incremental_fact.write.mode("overwrite").option("overwriteSchema", "true")
incremental_fact_writer.saveAsTable("analytics_db.incremental_fact_table")

# Reminder: 'source_db' and 'analytics_db' are placeholders for your own
#           database/schema names; column names should match your data model.

End of incremental loads and start of Fact Table transformation

In [0]:
# First-stage fact view: filter the incremental fact rows and derive key fields.
# The incremental fact table is inner-joined to the dealer snapshot (dealer_id,
# company_id, run_date) and left-joined to the equipment snapshot (equipment_id,
# run_date). run_date is shifted back one day, and aging_comparison is set to 1
# or 0 by the business rule in the query. The result is exposed as the temp
# view "T1_Fact".

t1_fact_sql = '''
  select 
    date_add(f.run_date, -1)                                      as run_date,
    f.fact_key                                                    as fact_id,
    f.credit_line_id                                              as credit_line_id,
    f.invoice_id                                                  as invoice_id,
    d.dealer_id                                                   as dealer_id,
    f.company_id                                                  as company_id,
    f.receivable_date                                             as receivable_date,
    f.unpaid_balance                                              as unpaid_balance,
    f.past_due_amount                                             as past_due_amount,
    f.owner                                                       as owner,
    f.ag_percentage                                               as ag_percentage,
    f.ce_percentage                                               as ce_percentage,
    f.last_interest_date                                          as last_interest_date,
    f.current_interest_date                                       as current_interest_date,
    f.current_maturity_date                                       as current_maturity_date,
    f.maturity_date                                               as maturity_date,
    f.interest_bearing_code                                       as interest_bearing_code,
    f.major_flag                                                  as major_flag,
    f.equipment_id                                                as equipment_id,
    f.suffix                                                      as suffix,
    f.product_brand                                               as product_brand,
    d.country                                                     as country,
    f.bearing_type                                                as bearing_type,
    e.new_or_used_indicator                                       as new_or_used_indicator,
    -- Example business logic for aging comparison
    if(
      (f.unpaid_balance > 0 or (e.current_status_code = '4990' and f.credit_line_id = '011') or (e.current_status_code = '3000'))
      and d.termination_code not in ('8', '9')
      and f.credit_line_id not in ('13', '15')
      and e.current_status_code not in ('5000','5500','6000','6100','7000','9000'),
      1, 0
    ) as aging_comparison
  from analytics_db.incremental_fact_table f
    inner join analytics_db.incremental_table d 
      on f.dealer_id = d.dealer_id and f.company_id = d.company_id and f.run_date = d.run_date
    left join analytics_db.incremental_equipment e 
      on e.equipment_id = f.equipment_id and f.run_date = e.run_date
  where 
    f.unpaid_balance <> 0
    and f.owner = 'C'
    and (f.major_flag is null or f.major_flag = 'Y')
    and d.brand in ('CASE', 'FPT', 'MFS', 'n-CAP', 'n-CASE', 'n-NH', 'NH')
'''
T1_Fact = spark.sql(t1_fact_sql)

# Make the result addressable from SQL in the following cells.
T1_Fact.createOrReplaceTempView("T1_Fact")

In [0]:
from pyspark.sql.functions import lit

# Build the grid of every (run_date, product_brand, country, bearing_type,
# new_or_used_indicator) combination present in the first-stage fact data.
#
# The distinct values are read from T1_Fact (built in the previous cell): it is
# the DataFrame that carries exactly these five columns and the one the next
# cell joins this grid against. The former source name `Generic_Fact_Table` is
# not defined anywhere in this notebook and raised NameError on a fresh run.
distinct_run_date = T1_Fact.select("run_date").distinct()  # Distinct run dates
distinct_product_brand = T1_Fact.select("product_brand").distinct()  # Distinct product brands
distinct_country = T1_Fact.select("country").distinct()  # Distinct countries
distinct_bearing_type = T1_Fact.select("bearing_type").distinct()  # Distinct bearing types
distinct_new_or_used = T1_Fact.select("new_or_used_indicator").distinct()  # Distinct new or used indicators

# Cross join the distinct values to get all possible combinations.
# The grid size is the product of the five distinct counts.
AllCombinations = (
    distinct_run_date
    .crossJoin(distinct_product_brand)
    .crossJoin(distinct_country)
    .crossJoin(distinct_bearing_type)
    .crossJoin(distinct_new_or_used)
)

# Register as temporary view for further processing
AllCombinations.createOrReplaceTempView("AllCombinations")

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, when, col

# Fact table transformations: left-join the combination grid to the first-stage
# fact rows, derive calendar fields, aging buckets and classification columns,
# then build the dealer / equipment / plan / fx keys by concatenation.
#
# The grid is matched to T1_Fact column by column with the null-safe operator
# `<=>`. The previous join compared `||`-concatenated strings, which evaluates
# to NULL as soon as one component is NULL (e.g. new_or_used_indicator, which
# comes from a left join), silently dropping those fact rows; undelimited
# concatenation could also match different value combinations.
Complete_Fact = spark.sql('''
  select *,
    run_date||dealer_id as dealer_key,
    run_date||equipment_id as equipment_key,
    year_month||product_brand||country||bearing_type||new_or_used_indicator as plan_id,
    year_month||currency as fx_id  
  from(
    select
      a.bearing_type,
      a.run_date,
      a.product_brand,
      a.country,
      a.new_or_used_indicator,
      date_format(a.run_date, 'MMM') as run_month,    
      to_number(date_format(a.run_date, 'M'), '99') as run_month_id,  
      date_format(a.run_date, 'yyyyMM') as year_month,
      date_format(a.run_date, 'yyyy') as run_year,      
      fact_id,
      cast(credit_line_id as int) as credit_line_id,
      invoice_id,
      dealer_id,
      company_id,
      receivable_date,
      unpaid_balance,
      past_due_amount,
      owner,
      ag_percentage,
      ce_percentage,
      last_interest_date,
      current_interest_date,
      current_maturity_date,
      maturity_date,
      interest_bearing_code,
      major_flag,
      equipment_id,
      suffix,
      aging_comparison,
      datediff(a.run_date, maturity_date) as past_due_days,
      datediff(last_day(a.run_date), maturity_date) as rounded_past_due_days,
      CASE WHEN suffix in ('A8', 'R8') THEN 'Yes' ELSE 'No' END as dealer_credits,
      CASE
        WHEN datediff(a.run_date, receivable_date) between 0 and 30 THEN '0-30 Days'
        WHEN datediff(a.run_date, receivable_date) between 31 and 60 THEN '31-60 Days'
        WHEN datediff(a.run_date, receivable_date) between 61 and 90 THEN '61-90 Days'
        WHEN datediff(a.run_date, receivable_date) between 91 and 180 THEN '91-180 Days'
        WHEN datediff(a.run_date, receivable_date) between 181 and 365 THEN '181-365 Days'
        WHEN datediff(a.run_date, receivable_date) between 366 and 730 THEN '366-730 Days'
        WHEN datediff(a.run_date, receivable_date) > 730 THEN '731+ Days'
      END as aging_buckets,
      CASE
        WHEN datediff(a.run_date, receivable_date) between 0 and 365 THEN '0-365 Days'
        WHEN datediff(a.run_date, receivable_date) between 366 and 730 THEN '366-730 Days'
        WHEN datediff(a.run_date, receivable_date) > 730 THEN '731+ Days'
      END as short_aging_buckets,
      CASE 
        WHEN a.product_brand = 'Case IH' THEN 'AG'
        WHEN a.product_brand = 'Case CE' THEN 'CE'
        WHEN a.product_brand = 'NH AG' THEN 'AG'
        WHEN a.product_brand = 'NH CE' THEN 'CE' 
        WHEN a.product_brand = 'ALLIED' THEN 
          CASE 
            WHEN ag_percentage = 1 THEN 'AG'
            WHEN ce_percentage = 1 THEN 'CE'
            ELSE 'FPT'
          END
        WHEN a.product_brand = 'FPT' THEN 'FPT'
      END as product_ind,
      CASE WHEN a.product_brand = 'ALLIED' THEN 'ALLIED' ELSE 'CAPTIVE' END as captive_allied, 
      if(a.new_or_used_indicator = 'N', 'New Units', if(a.new_or_used_indicator = 'U', 'Used Units', 'Other')) as new_or_used_desc,
      if(a.country = 'CA', 'CAD', 'USD') as currency  
    from AllCombinations a
    left join T1_Fact t
      -- null-safe, per-column match (NULL <=> NULL is true)
      on  a.run_date              <=> t.run_date
      and a.product_brand         <=> t.product_brand
      and a.country               <=> t.country
      and a.bearing_type          <=> t.bearing_type
      and a.new_or_used_indicator <=> t.new_or_used_indicator
  )
''')

# Grid rows without a matching fact have a NULL fact_id; give them a generated
# ID so every row carries one. monotonically_increasing_id() yields unique,
# increasing (not consecutive) 64-bit values.
# NOTE(review): nothing here guarantees the generated IDs cannot coincide with
# real fact_id values - confirm the value ranges do not overlap.
Complete_Fact = Complete_Fact.withColumn(
    "fact_id",
    when(col("fact_id").isNull(), monotonically_increasing_id()).otherwise(col("fact_id"))
)

# Save the transformed fact table to the analytics database
Complete_Fact.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("analytics_db.complete_fact")

End of transformations. Start of loading to final tables.

In [0]:
from pyspark.sql.functions import col

# Create the final fact table, ensuring all dealers with credit limits are included.
#
# Fact rows are read from analytics_db.complete_fact, the table written by the
# transformation cell, which provides every column selected below. The previous
# version read from generic_db.generic_fact_table - the very table this cell
# overwrites with a schema that contains key_fact but no fact_id - so the cell
# consumed its own output and could not resolve fact_id on a re-run.
FACT_Fact = spark.sql('''
  select
    fact_id||run_date as key_fact,  -- Unique key for the fact record
    dealer_key,                     -- Key for the dealer
    equipment_key,                  -- Key for the equipment
    plan_id,                        -- Key for the plan
    fx_id,                          -- Key for the foreign exchange
    run_date,                       -- Date of the run
    year_month,                     -- Year and month of the run
    credit_line_id,                 -- Key for the credit line
    invoice_id,                     -- Key for the invoice
    dealer_id,                      -- ID of the dealer
    company_id,                     -- ID of the company
    unpaid_balance,                 -- Amount of unpaid balance
    past_due_amount,                -- Amount past due
    major_flag,                     -- Indicator for major status
    equipment_id                    -- ID of the equipment
  from analytics_db.complete_fact
''')

# One row per dealer and (shifted) run date whose summed credit limit is
# non-null and non-zero. The WHERE clause on cr.credit_limit_amt removes the
# unmatched rows of the left join.
# NOTE(review): the DIM Dealer cell applies the same credit-limit subquery to
# analytics_db.incremental_table / analytics_db.incremental_creditlimit, while
# this query reads generic_db.* tables - confirm which names are intended.
all_dealers = spark.sql('''
  select distinct
    date_add(d.run_date, -1) as run_date,  -- Adjusted run date for consistency
    d.dealer_id as dealer_id,               -- ID of the dealer
    date_add(d.run_date, -1)||d.dealer_id as dealer_key  -- Unique key for the dealer
  from generic_db.generic_incremental_dealer d
  left join 
      (select 
        dealer_key, 
        date_add(run_date, -1) as run_date,
        sum(credit_limit_amt) as credit_limit_amt  -- Total credit limit amount
      from generic_db.generic_incremental_creditlimit
      group by dealer_key, run_date) cr 
    on d.dealer_key = cr.dealer_key and date_add(d.run_date, -1) = cr.run_date
  where d.dealer_id is not null and cr.credit_limit_amt is not null and cr.credit_limit_amt <> 0
''')

# Append the dealer rows to the fact rows; columns the dealer rows lack are
# filled with NULL (allowMissingColumns). run_date is then cast to date.
FACT_Fact = FACT_Fact.unionByName(all_dealers, allowMissingColumns=True).withColumn("run_date", col("run_date").cast("date"))

# Write the final fact table to the specified location
FACT_Fact.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("generic_db.generic_fact_table")

Transformation and storage of the DIM tables

In [0]:
# DIM Dealer: one distinct row per dealer and run date, enriched with the summed
# credit limit from the credit-limit snapshot, stored as analytics_db.dim_dealer.
#
# NOTE(review): dealer_key and run_date use run_date shifted back one day, but
# fx_id is formatted from the unshifted d.run_date - confirm this is intended.
dim_dealer_sql = '''
  select distinct
    date_add(d.run_date, -1)||d.dealer_id   as dealer_key,  -- Unique key for the dealer
    date_add(d.run_date, -1)                as run_date,      -- Date of the run, adjusted by one day
    d.country                               as country,        -- Country of the dealer
    d.dealer_id                             as dealer_number,  -- Dealer identification number
    d.pm_initials                           as pm,            -- Initials of the primary manager
    d.wsa_initials                          as wsa,           -- Initials of the warranty service advisor
    d.gm                                    as rsd,           -- General manager
    d.dealer_full_name                      as dealer_name,    -- Full name of the dealer
    d.state                                 as state,         -- State where the dealer is located
    d.brand                                 as dealer_brand,   -- Brand associated with the dealer
    d.termination_code                      as termination_code,-- Code indicating termination status
    d.machine_terms_code                    as machine_terms_code, -- Code for machine credit terms
    d.machine_terms_date                    as machine_terms_date, -- Date for machine credit terms
    d.machine_terms_name                    as machine_terms_name, -- Name for machine credit terms
    d.parts_terms_code                      as parts_terms_code,   -- Code for parts credit terms
    d.parts_terms_date                      as parts_terms_date,   -- Date for parts credit terms
    d.parts_terms_name                      as parts_terms_name,   -- Name for parts credit terms
    d.entity_dealer_number                  as entity_dealer_number,-- Entity dealer number
    d.credit_rating                         as credit_rating,      -- Credit rating of the dealer
    cr.credit_limit_amt                     as credit_limit_amt,   -- Credit limit amount from the credit limit table
    date_format(d.run_date, 'yyyyMM')       as fx_id,            -- Formatted date for currency exchange
    d.brand                                 as dealer_brand2      -- Brand for dealer, potentially for further categorization
  from analytics_db.incremental_table d
    left join 
      (select 
        dealer_key, 
        date_add(run_date, -1) as run_date,  -- Adjusted run date for credit limit aggregation
        sum(credit_limit_amt) as credit_limit_amt -- Total credit limit amount for the dealer
      from analytics_db.incremental_creditlimit
      group by dealer_key, run_date) cr 
    on d.dealer_key = cr.dealer_key and date_add(d.run_date, -1) = cr.run_date
  where d.dealer_id is not null  -- Ensure only valid dealers are included
'''
DIM_Dealer = spark.sql(dim_dealer_sql)

# Persist the dimension, replacing existing data and schema.
dim_dealer_writer = DIM_Dealer.write.mode("overwrite").option("overwriteSchema", "true")
dim_dealer_writer.saveAsTable("analytics_db.dim_dealer")

In [0]:
# Create and store the DIM Dealer Behaviour table with generic names.
# Each distinct row maps a dealer key (run_date minus one day, concatenated with
# dealer_id) to a readable behavior description and the loss figures cast to
# decimal(18,2). Behavior codes other than PF/PD/BD/CW/LG get a NULL description
# because the CASE has no ELSE branch.
DIM_DealerBehaviour = spark.sql('''
  select distinct
    date_add(b.run_date, -1)||b.dealer_id as dealer_key,  -- Unique dealer key generated from run date and dealer ID
    CASE 
      WHEN b.behavior_code = 'PF' THEN 'Performing'        -- Classify as Performing
      WHEN b.behavior_code = 'PD' THEN 'Problem Dealer'    -- Classify as Problem Dealer
      WHEN b.behavior_code = 'BD' THEN 'Bad Debt'          -- Classify as Bad Debt
      WHEN b.behavior_code = 'CW' THEN 'Credit Watch'      -- Classify as Credit Watch
      WHEN b.behavior_code = 'LG' THEN 'Litigation'        -- Classify as Litigation
    END as behavior_desc,                                   -- Description of dealer behavior
    b.problem_dealer,                                      -- Indicator for problem dealer
    b.bad_debt,                                           -- Indicator for bad debt
    cast(b.loss_estimate as decimal(18,2)) as loss_estimate,  -- Loss estimate cast to decimal
    cast(b.loss_reserve as decimal(18,2)) as loss_reserve   -- Loss reserve cast to decimal
  from analytics_db.incremental_behavior b                 -- Source table for incremental behavior data
''')

# Write the results to a table in the analytics database.
# (The trailing remark is a Python `#` comment: a SQL-style `--` comment on a
# Python line is a syntax error.)
DIM_DealerBehaviour.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("analytics_db.dim_dealerbehavior")  # Save as a new table

In [0]:
# Create and store the DIM Equipment table with generic names.
# The EQUIP CTE enriches each equipment snapshot row with product attributes,
# looked up first by tech_type (alias p) and, as a fallback via coalesce, by the
# trimmed price_book||series||model code (alias p2), both on the same run_date.
# The outer query keeps only equipment that appears in analytics_db.complete_fact
# for the shifted run date (inner join) and defaults blank or NULL product
# description / category to 'MISCELLANEOUS' / 'Miscellaneous'.
DIM_Equipment = spark.sql('''
  WITH EQUIP AS
    (SELECT distinct 
        e.*,
        coalesce(p.gamma_desc, p2.gamma_desc) as gamma_desc,
        coalesce(p.industry_class_nm, p2.industry_class_nm) as industry_class_nm,
        coalesce(p.product_desc, p2.product_desc) as product_desc,
        coalesce(p.category_nm, p2.category_nm) as category_nm
    from analytics_db.incremental_equipment e
    left join analytics_db.incremental_product p
        on e.tech_type = p.tech_type 
        and e.run_date = p.run_date
    left join analytics_db.incremental_product as p2 
        on trim(e.price_book||e.series||e.model) = trim(p2.pb_code||p2.series||p2.model) 
        and e.run_date = p2.run_date
    ) 
  select distinct
    date_add(e.run_date, -1)||e.equipment_id    as equipment_key,  -- Unique key for equipment
    date_add(e.run_date, -1)                    as run_date,         -- Run date for the equipment
    trim(e.price_book||e.series||e.model)       as pbsm_id,         -- Price book, series, and model ID
    e.serial_number                            as serial_number,     -- Serial number of the equipment
    e.manufacturer_code                        as manufacturer_code,  -- Manufacturer code
    e.current_status_code                      as current_status_code, -- Current status of the equipment
    e.used_equipment_product_code              as used_equipment_product_code, -- Used equipment product code
    e.gamma_desc                               as gamma_desc,        -- Gamma description
    e.industry_class_nm                        as industry_class,     -- Industry class name
    CASE WHEN isnull(e.product_desc) or trim(e.product_desc) = '' THEN 'MISCELLANEOUS' ELSE e.product_desc END as product_desc, -- Product description
    CASE WHEN isnull(e.category_nm) or trim(e.category_nm) = '' THEN 'Miscellaneous' ELSE e.category_nm END as category_name -- Category name
  from EQUIP e
    inner join analytics_db.complete_fact r 
      on e.equipment_id = r.equipment_id and date_add(e.run_date, -1) = r.run_date
''')

# Persist the dimension, replacing existing data and schema.
# (The trailing remark is a Python `#` comment: a SQL-style `--` comment on a
# Python line is a syntax error.)
DIM_Equipment.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("analytics_db.dim_equipment")  # Save the DIM Equipment table

In [0]:
# Create and store the DIM Dates table with generic names.
# The dimension is the distinct set of calendar columns already derived in
# analytics_db.complete_fact; no new values are computed here.
DIM_Dates = spark.sql('''
  select distinct
    run_date,          -- The date of the run
    run_month,         -- The month of the run
    run_month_id,      -- The identifier for the run month
    year_month,        -- The year and month combined
    run_year           -- The year of the run
  from analytics_db.complete_fact   -- Source table for the data
''')

# Write the DIM_Dates DataFrame to a table with overwrite mode.
# (The trailing remark is a Python `#` comment: a SQL-style `--` comment on a
# Python line is a syntax error.)
DIM_Dates.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("analytics_db.dim_dates")  # Target table for the data

In [0]:
# DIM Receivables: the distinct descriptive attributes of each fact record,
# keyed by key_fact (fact_id concatenated with run_date), read from
# analytics_db.complete_fact and stored as analytics_db.dim_receivables.
dim_receivables_sql = '''
  select distinct
    fact_id||run_date as key_fact,          -- Unique identifier for the fact
    bearing_type,                           -- Type of bearing
    product_brand,                          -- Brand of the product
    country,                                -- Country of the transaction
    new_or_used_indicator,                  -- Indicator for new or used item
    receivable_date,                        -- Date of the receivable
    owner,                                  -- Owner of the receivable
    ag_percentage,                          -- AG percentage
    ce_percentage,                          -- CE percentage
    last_interest_date,                     -- Last interest date
    current_interest_date,                  -- Current interest date
    current_maturity_date,                  -- Current maturity date
    maturity_date,                          -- Maturity date of the receivable
    interest_bearing_code,                  -- Code for interest bearing
    suffix,                                 -- Suffix for the receivable
    aging_comparison,                       -- Aging comparison flag
    past_due_days,                         -- Number of past due days
    rounded_past_due_days,                 -- Rounded number of past due days
    dealer_credits,                         -- Dealer credits flag
    aging_buckets,                          -- Aging buckets
    short_aging_buckets,                    -- Short aging buckets
    product_ind,                           -- Product indicator
    captive_allied,                         -- Captive allied flag
    new_or_used_desc,                       -- Description for new or used
    currency                                -- Currency of the transaction  
  from analytics_db.complete_fact           -- Source table for the data
'''
DIM_Receivables = spark.sql(dim_receivables_sql)

# Persist the dimension, replacing existing data and schema.
dim_receivables_writer = DIM_Receivables.write.mode("overwrite").option("overwriteSchema", "true")
dim_receivables_writer.saveAsTable("analytics_db.dim_receivables")