In [1]:
import pandas as pd
import json
import os

# Create source data (simulating a production database table)
data = {
    "order_id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "order_date": [
        "2024-06-01", "2024-06-01", "2024-06-02", "2024-06-02", "2024-06-03",
        "2024-06-03", "2024-06-04", "2024-06-04", "2024-06-05", "2024-06-05"
    ],
    "customer": ["Alice", "Bob", "Charlie", "Diana", "Alice",
                 "Bob", "Eve", "Charlie", "Diana", "Alice"],
    "product": ["Widget A", "Widget B", "Widget A", "Widget C", "Widget B",
                "Widget A", "Widget C", "Widget B", "Widget A", "Widget C"],
    "amount": [75.00, 50.00, 150.00, 30.00, 100.00, 
               75.00, 60.00, 50.00, 150.00, 90.00]
}

source_df = pd.DataFrame(data)
source_df.to_csv("source_orders.csv", index=False)

print("Source orders created:")
print(source_df)
print(f"\nTotal rows: {len(source_df)}")
print(f"Order ID range: {source_df['order_id'].min()} to {source_df['order_id'].max()}")


Source orders created:
   order_id  order_date customer   product  amount
0         1  2024-06-01    Alice  Widget A    75.0
1         2  2024-06-01      Bob  Widget B    50.0
2         3  2024-06-02  Charlie  Widget A   150.0
3         4  2024-06-02    Diana  Widget C    30.0
4         5  2024-06-03    Alice  Widget B   100.0
5         6  2024-06-03      Bob  Widget A    75.0
6         7  2024-06-04      Eve  Widget C    60.0
7         8  2024-06-04  Charlie  Widget B    50.0
8         9  2024-06-05    Diana  Widget A   150.0
9        10  2024-06-05    Alice  Widget C    90.0

Total rows: 10
Order ID range: 1 to 10


In [2]:
pd.DataFrame(columns=source_df.columns).to_csv("warehouse_orders.csv", index=False)
print("Empty warehouse file created.")


Empty warehouse file created.


In [5]:
CHECKPOINT_FILE = "checkpoint.json"

def read_checkpoint():
    """
    Read the last processed order_id from the checkpoint file.
    Returns 0 if no checkpoint exists (first run).
    """
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r") as f:
            checkpoint = json.load(f)
        print(f"Checkpoint found: last_processed_id = {checkpoint['last_processed_id']}")
        return checkpoint["last_processed_id"]
    else:
        print("No checkpoint found — this is the FIRST RUN")
        return 0  # No checkpoint = start from beginning

def write_checkpoint(last_processed_id):
    """
    Save the last processed order_id to the checkpoint file.
    """
    checkpoint = {
        "last_processed_id": last_processed_id,
        "updated_at": pd.Timestamp.now().isoformat()
    }
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump(checkpoint, f, indent=2)
    print(f"Checkpoint updated: last_processed_id = {last_processed_id}")


In [6]:
last_id = read_checkpoint()
print(f"Starting from order_id > {last_id}")

write_checkpoint(5)
last_id = read_checkpoint()
print(f"After writing 5, checkpoint returns: {last_id}")

if os.path.exists(CHECKPOINT_FILE):
    os.remove(CHECKPOINT_FILE)
    print("Checkpoint removed for fresh start")


No checkpoint found — this is the FIRST RUN
Starting from order_id > 0
Checkpoint updated: last_processed_id = 5
Checkpoint found: last_processed_id = 5
After writing 5, checkpoint returns: 5
Checkpoint removed for fresh start


In [7]:
# Incremental Load Function
def incremental_load(source_file, warehouse_file):
    """
    Load only NEW rows from source since the last checkpoint.
    """
    print(f"\n{'=' * 55}")
    print(f"  INCREMENTAL LOAD")
    print(f"{'=' * 55}")
    
    # Step 1: Read checkpoint
    last_processed_id = read_checkpoint()
    
    # Step 2: Read source and filter new rows
    source_df = pd.read_csv(source_file)
    new_rows = source_df[source_df["order_id"] > last_processed_id]
    
    print(f"\nSource total rows: {len(source_df)}")
    print(f"New rows (order_id > {last_processed_id}): {len(new_rows)}")
    
    # Step 3: Handle no new data
    if len(new_rows) == 0:
        print("No new data to load. Skipping.")
        return pd.read_csv(warehouse_file) if os.path.exists(warehouse_file) else pd.DataFrame()
    
    # Step 4: Append to warehouse
    if os.path.exists(warehouse_file) and os.path.getsize(warehouse_file) > 0:
        warehouse_df = pd.read_csv(warehouse_file)
    else:
        warehouse_df = pd.DataFrame(columns=source_df.columns)
    
    print(f"Warehouse rows BEFORE: {len(warehouse_df)}")
    
    warehouse_df = pd.concat([warehouse_df, new_rows], ignore_index=True)
    warehouse_df.to_csv(warehouse_file, index=False)
    
    print(f"Warehouse rows AFTER: {len(warehouse_df)}")
    
    # Step 5: Update checkpoint
    new_max_id = int(new_rows["order_id"].max())
    write_checkpoint(new_max_id)
    
    print(f"\n--- Incremental Load Summary ---")
    print(f"New rows loaded: {len(new_rows)}")
    print(f"New checkpoint: {new_max_id}")
    
    return warehouse_df


In [8]:
if os.path.exists(CHECKPOINT_FILE):
    os.remove(CHECKPOINT_FILE)

pd.DataFrame(columns=["order_id","order_date","customer","product","amount"]).to_csv("warehouse_orders.csv", index=False)

print("Clean state: no checkpoint, empty warehouse\n")


Clean state: no checkpoint, empty warehouse



In [9]:
print("===== FIRST RUN (no checkpoint) =====")
result = incremental_load("source_orders.csv", "warehouse_orders.csv")
print("\nWarehouse contents:")
print(result)


===== FIRST RUN (no checkpoint) =====

  INCREMENTAL LOAD
No checkpoint found — this is the FIRST RUN

Source total rows: 10
New rows (order_id > 0): 10
Warehouse rows BEFORE: 0
Warehouse rows AFTER: 10
Checkpoint updated: last_processed_id = 10

--- Incremental Load Summary ---
New rows loaded: 10
New checkpoint: 10

Warehouse contents:
  order_id  order_date customer   product  amount
0        1  2024-06-01    Alice  Widget A    75.0
1        2  2024-06-01      Bob  Widget B    50.0
2        3  2024-06-02  Charlie  Widget A   150.0
3        4  2024-06-02    Diana  Widget C    30.0
4        5  2024-06-03    Alice  Widget B   100.0
5        6  2024-06-03      Bob  Widget A    75.0
6        7  2024-06-04      Eve  Widget C    60.0
7        8  2024-06-04  Charlie  Widget B    50.0
8        9  2024-06-05    Diana  Widget A   150.0
9       10  2024-06-05    Alice  Widget C    90.0


  warehouse_df = pd.concat([warehouse_df, new_rows], ignore_index=True)


In [10]:
# Simulate New Data
new_orders = pd.DataFrame({
    "order_id": [11, 12, 13],
    "order_date": ["2024-06-06", "2024-06-06", "2024-06-07"],
    "customer": ["Frank", "Alice", "Bob"],
    "product": ["Widget A", "Widget B", "Widget C"],
    "amount": [200.00, 75.00, 45.00]
})

source_df = pd.read_csv("source_orders.csv")
source_df = pd.concat([source_df, new_orders], ignore_index=True)
source_df.to_csv("source_orders.csv", index=False)

print(f"Source now has {len(source_df)} rows")


Source now has 13 rows


In [11]:
print("\n===== SECOND RUN =====")
result = incremental_load("source_orders.csv", "warehouse_orders.csv")
print("\nTotal warehouse rows:", len(result))



===== SECOND RUN =====

  INCREMENTAL LOAD
Checkpoint found: last_processed_id = 10

Source total rows: 13
New rows (order_id > 10): 3
Warehouse rows BEFORE: 10
Warehouse rows AFTER: 13
Checkpoint updated: last_processed_id = 13

--- Incremental Load Summary ---
New rows loaded: 3
New checkpoint: 13

Total warehouse rows: 13


In [12]:
# No New Data

print("\n===== THIRD RUN (no new data) =====")
result = incremental_load("source_orders.csv", "warehouse_orders.csv")
print("\nTotal warehouse rows:", len(result))



===== THIRD RUN (no new data) =====

  INCREMENTAL LOAD
Checkpoint found: last_processed_id = 13

Source total rows: 13
New rows (order_id > 13): 0
No new data to load. Skipping.

Total warehouse rows: 13
