# Waste Projection workflow

In [36]:
# Import Libraries
#import pandas as pd
import sqlite3
import utils

import importlib
importlib.reload(utils)

from utils import calculate_fefo


## Extract

In [27]:
# Specify the database file to delete
database_name = 'hf_database.db'

# Create a SQLite database and a connection
conn = sqlite3.connect(database_name)

# ---- Load demand data in a dataframe # forecast_df
file_path = '/Users/fil/Documents/my_projects/hf_fefo_waste_projection/datasets/forecast_df_2024_11_20.csv'
forecast_df = pd.read_csv(file_path) # Execute the query and load the result into a pandas DataFrame


# ---- Load Inventory Data in a dataframe # full_stock_df
file_path = '/Users/fil/Documents/my_projects/hf_fefo_waste_projection/datasets/full_stock_df_2024_11_20.csv'
full_stock_df = pd.read_csv(file_path)


# ---- Load Exclusion List in a dataframe # exclusion_df
file_path = '/Users/fil/Documents/my_projects/hf_fefo_waste_projection/datasets/exclusion_df_2024_11_20.csv'
exclusion_df = pd.read_csv(file_path)

# Close the connection
conn.close()

# Print an output for verification
print("Loaded 'forecast_df' with " + str(len(forecast_df)) + " lines") # 61.111 lines
print("Loaded 'full_stock_df' with " + str(len(full_stock_df)) + " lines") # 16.191 lines
print("Loaded 'exclusion_df' with " + str(len(exclusion_df)) + " lines") # 50 lines

Loaded 'forecast_df' with 61111 lines
Loaded 'full_stock_df' with 16191 lines
Loaded 'exclusion_df' with 49 lines


In [35]:
full_stock_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16191 entries, 0 to 16190
Data columns (total 21 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   sku_code                 16191 non-null  object 
 1   dc_code                  16191 non-null  object 
 2   tm_id__po_id             16191 non-null  object 
 3   lot_code                 0 non-null      float64
 4   location_id              14290 non-null  object 
 5   expiration_date          16191 non-null  object 
 6   quantity                 16191 non-null  float64
 7   category                 16191 non-null  object 
 8   unit_cost                16191 non-null  float64
 9   discardment_date         16191 non-null  object 
 10  opening_stock__arr_date  16191 non-null  object 
 11  snapshot_time            14290 non-null  object 
 12  product_types            16142 non-null  object 
 13  hellofresh_week          16079 non-null  object 
 14  hf_week_out           

In [37]:
# Clean Inventory Data by Filtering Out the Eclusion List | # merged = full stock | # cleaned = stock

# Merge the inventory list and the exclusion list
merged_inventory_df = full_stock_df.merge(exclusion_df, on=['sku_code', 'supplier_code', 'data_source'], how='left', indicator=True)

# Filter out: from the _merge column keep only values that do not appear in the right (exclusion) table
stock_df = merged_inventory_df[merged_inventory_df['_merge'] == 'left_only'].drop(columns='_merge')

## Run the Calculation of FEFO

In [38]:
# Calculate FEFO
calc_df = calculate_fefo(forecast_df, stock_df)

# Print an output for verification
print("Calculation complete: " + str(len(calc_df)) + " lines in the output")

Calculation complete: 16186 lines in the output


In [40]:
calc_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16186 entries, 0 to 16185
Data columns (total 21 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   sku_id            16186 non-null  object        
 1   batch_id          0 non-null      float64       
 2   pallet_id         16186 non-null  object        
 3   expiration_date   16186 non-null  datetime64[ns]
 4   discardment_date  16186 non-null  datetime64[ns]
 5   remaining_qty     16186 non-null  float64       
 6   consumed_qty      16186 non-null  float64       
 7   dc                16186 non-null  object        
 8   location          14289 non-null  object        
 9   category          16186 non-null  object        
 10  unit_cost         16186 non-null  float64       
 11  line_cost         16186 non-null  float64       
 12  type              16137 non-null  object        
 13  hf_week           16074 non-null  object        
 14  hf_week_out       1618

In [39]:
calc_df.head()

Unnamed: 0,sku_id,batch_id,pallet_id,expiration_date,discardment_date,remaining_qty,consumed_qty,dc,location,category,...,line_cost,type,hf_week,hf_week_out,temp_class,data_source,logical_mlor,mlor_source,snapshot_time,supplier_code
0,C_1-10344,,id_493476,2025-02-24,2025-02-20,3213.0,6787.0,FI,,C_1,...,3148.74,Ingredient SKU,2025-W08,3,a1,po,84.0,fixed_value_MLOR,NaT,s_5230
1,C_1-10344,,id_211571,2025-05-02,2025-04-28,200.0,0.0,FI,loc-2115,C_1,...,196.0,Ingredient SKU,2025-W18,3,a1,in,,,2024-11-19 23:45:04.657,
2,C_1-10344,,id_211648,2025-05-02,2025-04-28,1200.0,0.0,FI,loc-1818,C_1,...,1176.0,Ingredient SKU,2025-W18,3,a1,in,,,2024-11-19 23:45:04.657,
3,C_1-10344,,id_871980,2025-05-02,2025-04-28,5.0,0.0,FI,loc-6725,C_1,...,4.9,Ingredient SKU,2025-W18,3,a1,in,,,2024-11-19 23:45:04.657,
4,C_1-10344,,id_915969,2025-05-02,2025-04-28,8.0,0.0,FI,loc-6769,C_1,...,7.84,Ingredient SKU,2025-W18,3,a1,in,,,2024-11-19 23:45:04.657,


## Aggregations

In [None]:

# Define the weeks you want to filter for
window_1_3 = [1, 2, 3]  # Weeks 1-24
window_1_2 = [1, 2]  # Weeks 1-12
window_2 = [2]  # Weeks 7-12

# Define the mapping from HF Week Out to the desired name
hf_week_out_mapping = {
    0: 'W <0',
    1: 'W 01-06',
    2: 'W 07-12',
    3: 'W 13-24',
    4: 'W > 24'
}

# Map the HF Week Out to the desired names
calc_df['HF Week Out Name'] = calc_df['hf_week_out'].map(hf_week_out_mapping)



            ### Table that shows aggregate sum of Cost by Category and Window
# Filter the DataFrame for the specific HF Week values
window_1_3_df = calc_df[calc_df['hf_week_out'].isin(window_1_3)]

# Pivot the table to have DC as columns, aggregating the sum of Line Cost
category_agg_window_df = window_1_3_df.pivot_table(index=['category', 'HF Week Out Name'], columns='dc', values='line_cost', aggfunc='sum', fill_value=0).reset_index()

# Sort the pivoted DataFrame by 'Category' and 'HF Week Out Name'
category_agg_window_df = category_agg_window_df.sort_values(by=['HF Week Out Name', 'category'])



            ### Table that shows aggregate sum of Cost by Category, weekly
# Filter the DataFrame for the specific HF Week values
window_1_2_df = calc_df[calc_df['hf_week_out'].isin(window_1_2)]

# Pivot the table to have DC as columns, aggregating the sum of Line Cost
category_agg_weekly_df = window_1_2_df.pivot_table(index=['dc', 'category'], columns='hf_week', values='line_cost', aggfunc='sum', fill_value=0).reset_index()

# Sort the pivoted DataFrame by 'Category' and 'HF Week Out Name'
category_agg_weekly_df = category_agg_weekly_df.sort_values(by=['dc', 'category'])



INFO:py4j.clientserver:Received command c on object id p0


"\n            ### Table that shows Top 5 SKUs by Cost by Category and by Window\n\n\n# Filter the DataFrame for the specific HF Week values\nwindow_2 = calc_df[calc_df['hf_week_out'].isin(window_2)]\n\n# Group by SKU ID, DC, and Category, then calculate the sum of Line Cost for each group\nranked_cat_window = window_2.groupby(['sku_id', 'dc', 'category'])['line_cost'].sum().reset_index()\n\n# Rank the Line Cost within each DC and Category group in descending order\nranked_cat_window['rank'] = ranked_cat_window.groupby(['dc', 'category'])['line_cost'].rank(ascending=False, method='dense')\n\n# Filter to keep only the top 5 SKUs by cost within each Category and DC\ntop_5_ranked_cat_window = ranked_cat_window[ranked_cat_window['rank'] <= 5]\n\n# Merge the top SKUs back with the original DataFrame to include SKU names and other details\ntop_5_ranked_cat_window = pd.merge(top_5_ranked_cat_window, window_2[['sku_id', 'HF Week Out Name']], on='sku_id').drop_duplicates()\n\n# Select only the 

## create Top dataframe

In [None]:
# Grouping the data and aggregating
aggregated_data = calc_df_gsheet\
    .groupby(['SKU ID', 'DC', 'HF Week'])\
    .agg({'Line Cost': 'sum'})\
    .reset_index()

# Add 'Time Stamp' column to aggregated_data
aggregated_data['Time Stamp'] = current_time_berlin

# Filtering the aggregated results
top_waste = aggregated_data[aggregated_data['Line Cost'] > 3000]

print(top_waste.head)

INFO:py4j.clientserver:Received command c on object id p0


<bound method NDFrame.head of                SKU ID  DC   HF Week  Line Cost           Time Stamp
6     BAK-00-002995-5  VE  2025-W35   5581.800  2024-10-03 19:29:24
7     BAK-00-002996-5  BX  2025-W37   4622.700  2024-10-03 19:29:24
9     BAK-00-002996-5  VE  2024-W40   7914.450  2024-10-03 19:29:24
26    BAK-00-003853-5  VE  2025-W07   3833.364  2024-10-03 19:29:24
33    BAK-00-003854-5  VE  2025-W08   3529.440  2024-10-03 19:29:24
...               ...  ..       ...        ...                  ...
3437   SPI-00-90517-5  VE  2025-W45   4768.160  2024-10-03 19:29:24
3459   SPI-00-90523-5  BX  2025-W26   3605.760  2024-10-03 19:29:24
3465  SPI-11-119380-5  VE  2025-W09   3091.200  2024-10-03 19:29:24
3466  SPI-11-119380-5  VE  2025-W13   3709.440  2024-10-03 19:29:24
3468  SPI-11-134623-5  BX  2025-W14   4022.370  2024-10-03 19:29:24

[188 rows x 5 columns]>


## Write to databricks 

### append a ts for identification of the batch

In [None]:
# Add a timestamp column in local timezone
berlin_tz = pytz.timezone('Europe/Berlin')

# Get the current time in Berlin
current_time_berlin = datetime.now(berlin_tz).strftime('%Y-%m-%d %H:%M:%S')

current_time_berlin

'2024-10-03 19:29:24'

In [None]:
# If current_time_berlin is a string, convert it to a timestamp
calc_df['ts'] = pd.to_datetime(current_time_berlin)

### convert to spark and push to databricks

In [None]:
calc_spark = spark.createDataFrame(calc_df)

convert timestampts to strings and blank mlors to numbers

In [None]:
from pyspark.sql.functions import col, date_format
calc_spark = (calc_spark
  .withColumn("expiration_date", date_format("expiration_date", "yyyy-MM-dd"))
  .withColumn("discardment_date", date_format("discardment_date", "yyyy-MM-dd"))
  .withColumn("logical_mlor", col("logical_mlor").cast("double"))
)


convert blanks to nulls

In [None]:
from pyspark.sql.functions import when

# Step 1: Identify all string columns in the DataFrame
string_columns = [col_name for col_name, dtype in calc_spark.dtypes if dtype == "string"]

# Step 2: Replace empty strings with null for all string columns
for col_name in string_columns:
    calc_spark = calc_spark.withColumn(col_name, when(col(col_name) == "", None).otherwise(col(col_name)))

In [None]:
calc_spark.write.mode("append").saveAsTable("views_analysts.fefo_waste_projection")

## Push data to file

### Push Calculation Dataframe to Gsheet

In [None]:
# Save the entire DataFrame to a Google Sheet
spreadsheet = client.open("FEFO Inventoy Management")
worksheet = spreadsheet.worksheet('Calculation')

# Convert datetime.date columns to strings for Google Sheets
calc_df['expiration_date'] = calc_df['expiration_date'].astype(str)
calc_df['discardment_date'] = calc_df['discardment_date'].astype(str)
calc_df['snapshot_time'] = calc_df['snapshot_time'].astype(str)
calc_df['pallet_id'] = calc_df['pallet_id'].astype(str)
calc_df['ts'] = calc_df['ts'].astype(str)


# Modify the column names to make them readable
calc_df_gsheet = calc_df.rename(columns={
    'sku_id': 'SKU ID',
    'batch_id': 'Batch ID',
    'pallet_id': 'ID',
    'expiration_date': 'Expiration Date',
    'discardment_date': 'Discardment Date',
    'remaining_qty': 'Remaining Qty',
    'consumed_qty': 'Consumed Qty',
    'dc': 'DC',
    'name': 'Name',
    'location': 'Location',
    'category': 'Category',
    'unit_cost': 'Unit Cost',
    'line_cost': 'Line Cost',
    'type': 'Type',
    'hf_week': 'HF Week',
    'hf_week_out': 'HF Week Out',
    'temp_class': 'Temperature Class',
    'data_source': 'Data Source',
    'logical_mlor': 'Logical MLOR',
    'mlor_source': 'MLOR source',
    'snapshot_time': 'Snapshot Time',
    'supplier_code': 'Supplier Code'
})

# Convert the DataFrame to a list of lists
data = [calc_df_gsheet.columns.values.tolist()] + calc_df_gsheet.values.tolist()

# Write the entire DataFrame to the Google Sheet
worksheet.clear()  # Clear existing data
worksheet.update('A1', data)  # Update with new data starting from cell A1

print("FEFO Calculation data successfully written to Google Sheet.")

INFO:py4j.clientserver:Received command c on object id p0
  worksheet.update('A1', data)  # Update with new data starting from cell A1


FEFO Calculation data successfully written to Google Sheet.


## Log script end

In [None]:
# Select "Sheet4" within the Google Sheet
worksheet = spreadsheet.worksheet("log")

# Create the message to write
message = f"Job FEFO WASTE CALCULATION executed succesfully at {current_time_berlin}"

# Find the next empty row
next_empty_row = len(worksheet.col_values(1)) + 1

# Write the message in the next empty row
worksheet.update_cell(next_empty_row, 1, message)

INFO:py4j.clientserver:Received command c on object id p0


{'spreadsheetId': '1K96XSVvJmy2qpk0tG7gjrFrKlC-0ygpgyy2dQglqZBA',
 'updatedRange': 'log!A164',
 'updatedRows': 1,
 'updatedColumns': 1,
 'updatedCells': 1}

## Aggregations

In [None]:

# Define the weeks you want to filter for
window_1_3 = [1, 2, 3]  # Weeks 1-24
window_1_2 = [1, 2]  # Weeks 1-12
window_2 = [2]  # Weeks 7-12

# Define the mapping from HF Week Out to the desired name
hf_week_out_mapping = {
    0: 'W <0',
    1: 'W 01-06',
    2: 'W 07-12',
    3: 'W 13-24',
    4: 'W > 24'
}

# Map the HF Week Out to the desired names
calc_df['HF Week Out Name'] = calc_df['hf_week_out'].map(hf_week_out_mapping)



            ### Table that shows aggregate sum of Cost by Category and Window
# Filter the DataFrame for the specific HF Week values
window_1_3_df = calc_df[calc_df['hf_week_out'].isin(window_1_3)]

# Pivot the table to have DC as columns, aggregating the sum of Line Cost
category_agg_window_df = window_1_3_df.pivot_table(index=['category', 'HF Week Out Name'], columns='dc', values='line_cost', aggfunc='sum', fill_value=0).reset_index()

# Sort the pivoted DataFrame by 'Category' and 'HF Week Out Name'
category_agg_window_df = category_agg_window_df.sort_values(by=['HF Week Out Name', 'category'])



            ### Table that shows aggregate sum of Cost by Category, weekly
# Filter the DataFrame for the specific HF Week values
window_1_2_df = calc_df[calc_df['hf_week_out'].isin(window_1_2)]

# Pivot the table to have DC as columns, aggregating the sum of Line Cost
category_agg_weekly_df = window_1_2_df.pivot_table(index=['dc', 'category'], columns='hf_week', values='line_cost', aggfunc='sum', fill_value=0).reset_index()

# Sort the pivoted DataFrame by 'Category' and 'HF Week Out Name'
category_agg_weekly_df = category_agg_weekly_df.sort_values(by=['dc', 'category'])



INFO:py4j.clientserver:Received command c on object id p0


"\n            ### Table that shows Top 5 SKUs by Cost by Category and by Window\n\n\n# Filter the DataFrame for the specific HF Week values\nwindow_2 = calc_df[calc_df['hf_week_out'].isin(window_2)]\n\n# Group by SKU ID, DC, and Category, then calculate the sum of Line Cost for each group\nranked_cat_window = window_2.groupby(['sku_id', 'dc', 'category'])['line_cost'].sum().reset_index()\n\n# Rank the Line Cost within each DC and Category group in descending order\nranked_cat_window['rank'] = ranked_cat_window.groupby(['dc', 'category'])['line_cost'].rank(ascending=False, method='dense')\n\n# Filter to keep only the top 5 SKUs by cost within each Category and DC\ntop_5_ranked_cat_window = ranked_cat_window[ranked_cat_window['rank'] <= 5]\n\n# Merge the top SKUs back with the original DataFrame to include SKU names and other details\ntop_5_ranked_cat_window = pd.merge(top_5_ranked_cat_window, window_2[['sku_id', 'HF Week Out Name']], on='sku_id').drop_duplicates()\n\n# Select only the 

### Push aggregations data to sheet

In [None]:
# Save the entire DataFrame to a Google Sheet
spreadsheet = client.open("FEFO Inventoy Management")
worksheet = spreadsheet.worksheet('Overview')


# Convert the DataFrame to a list of lists
data_1 = [category_agg_window_df.columns.values.tolist()] + category_agg_window_df.values.tolist()

# Write the entire DataFrame to the Google Sheet
worksheet.clear()  # Clear existing data
worksheet.update(range_name='A1', values=data_1)  # Update with new data starting from cell A1



# 
# Convert the DataFrame to a list of lists
data_2 = [category_agg_weekly_df.columns.values.tolist()] + category_agg_weekly_df.values.tolist()

# Write the entire DataFrame to the Google Sheet
worksheet.update(range_name='A31', values=data_2)  # Update with new data starting from cell A1


INFO:py4j.clientserver:Received command c on object id p0


{'spreadsheetId': '1K96XSVvJmy2qpk0tG7gjrFrKlC-0ygpgyy2dQglqZBA',
 'updatedRange': 'Overview!A31:N47',
 'updatedRows': 17,
 'updatedColumns': 14,
 'updatedCells': 238}

### Push aggregation data to PROCUREMENT PERFORMANCE

In [None]:
# Introduce a sleep time to avoid hitting API limits
time.sleep(3)

INFO:py4j.clientserver:Received command c on object id p0


In [None]:
import logging

# Set up logging (optional)
logging.basicConfig(level=logging.INFO)

retries = 3
for attempt in range(retries):
    try:
        # Perform your Google Sheets operation
        spreadsheet = client.open("Procurement Performance")
        worksheet = spreadsheet.worksheet('waste_projection_pivots')
        worksheet.clear()
        worksheet.update(range_name='C4', values=data_2) 
        worksheet.update('A1', [[message]])
        worksheet.update('E2', [["Online Menu W:01-06"]])
        worksheet.update('K2', [["Offline Menu W:07-12"]])

        # Log the successful attempt
        logging.info(f"Operation succeeded on attempt {attempt + 1}")
        # Optionally, you can print instead of logging:
        # print(f"Operation succeeded on attempt {attempt + 1}")
        
        break  # Exit loop on success
    except APIError as e:
        if attempt < retries - 1:
            logging.warning(f"Attempt {attempt + 1} failed, retrying...")
            time.sleep(2)  # Add delay before retrying
        else:
            logging.error("All attempts failed.")
            raise e  # If last attempt fails, raise the exception

INFO:py4j.clientserver:Received command c on object id p0
  worksheet.update('A1', [[message]])
  worksheet.update('E2', [["Online Menu W:01-06"]])
  worksheet.update('K2', [["Offline Menu W:07-12"]])
INFO:root:Operation succeeded on attempt 1


## create Top dataframe

In [None]:
# Grouping the data and aggregating
aggregated_data = calc_df_gsheet\
    .groupby(['SKU ID', 'DC', 'HF Week'])\
    .agg({'Line Cost': 'sum'})\
    .reset_index()

# Add 'Time Stamp' column to aggregated_data
aggregated_data['Time Stamp'] = current_time_berlin

# Filtering the aggregated results
top_waste = aggregated_data[aggregated_data['Line Cost'] > 3000]

print(top_waste.head)

INFO:py4j.clientserver:Received command c on object id p0


<bound method NDFrame.head of                SKU ID  DC   HF Week  Line Cost           Time Stamp
6     BAK-00-002995-5  VE  2025-W35   5581.800  2024-10-03 19:29:24
7     BAK-00-002996-5  BX  2025-W37   4622.700  2024-10-03 19:29:24
9     BAK-00-002996-5  VE  2024-W40   7914.450  2024-10-03 19:29:24
26    BAK-00-003853-5  VE  2025-W07   3833.364  2024-10-03 19:29:24
33    BAK-00-003854-5  VE  2025-W08   3529.440  2024-10-03 19:29:24
...               ...  ..       ...        ...                  ...
3437   SPI-00-90517-5  VE  2025-W45   4768.160  2024-10-03 19:29:24
3459   SPI-00-90523-5  BX  2025-W26   3605.760  2024-10-03 19:29:24
3465  SPI-11-119380-5  VE  2025-W09   3091.200  2024-10-03 19:29:24
3466  SPI-11-119380-5  VE  2025-W13   3709.440  2024-10-03 19:29:24
3468  SPI-11-134623-5  BX  2025-W14   4022.370  2024-10-03 19:29:24

[188 rows x 5 columns]>


### load the top to a temporary storage

In [None]:
# Convert Pandas DataFrame to Spark DataFrame
top_waste_spark = spark.createDataFrame(top_waste)

# Now write the Spark DataFrame to disk
top_waste_spark.write.format("parquet").mode("overwrite").save("/dbfs/tmp/top_waste.parquet")

INFO:py4j.clientserver:Received command c on object id p0
