# HNN Knapsack

## Section 2: HNN Model - Without Parameter Tuning

### 1. Load Packages

In [None]:
import numpy as np
import pandas as pd
import json
import time
from tqdm import tqdm

### 2. Read Processed Data

In [None]:
def read_json_to_dict(file_path):
    """Load a JSON file and return the decoded object.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The Python object produced by ``json.load`` (a dict when the file's
        top-level value is a JSON object).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode as UTF-8 explicitly: without it, open() falls back to the
    # platform locale encoding and non-ASCII text may be misread.
    with open(file_path, 'r', encoding='utf-8') as json_file:
        return json.load(json_file)

# Load the processed input data. The solving loop further down reads entries
# by stringified index (data["0"], data["1"], ...).
data = read_json_to_dict('data/data2.json')

### 3. HNN Model

In [None]:
import torch
import torch.nn as nn

class HNNKnapsackYT(nn.Module):
    """HNN-based iterative solver for one store's product-option selection.

    Every per-option field of ``data`` is flattened into a 1-D float32 tensor
    on the selected device. ``forward()`` then repeatedly updates a relaxed
    selection vector ``V`` (one entry per option) with a sigmoid of the
    option's objective weight minus penalty differences, while the
    temperature-like parameter ``T`` decays across epochs.

    Args:
        data: Mapping with the keys ``stor_idnt``, ``stor_spco``,
            ``stor_spcs`` (kept as loaded) and ``prod_idnt``, ``prod_spco``,
            ``prod_kpi1``, ``prod_kpi2``, ``prod_kpic``, ``prod_optg``,
            ``prod_sztg``, ``prod_sztn`` (converted to tensors).
        alpha: Initial penalization weight.
        rep: Number of epochs per tuning pass; also the length of ``T_List``.
        is_solve_special_zone: When true, penalize the special-zone and
            remaining-space constraints separately.
        is_solve_opt_priority: When true, add the option-priority penalty.
    """

    def __init__(self, data, alpha, rep, is_solve_special_zone=True, is_solve_opt_priority=True):
        super().__init__()

        # Parameters
        self.alpha = alpha
        self.rep = rep
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.is_solve_special_zone = is_solve_special_zone
        self.is_solve_opt_priority = is_solve_opt_priority

        # Load data
        self.data = data
        self.stor_idnt = data['stor_idnt']
        self.stor_spco = data['stor_spco']
        self.stor_spcs = data['stor_spcs']
        self.prod_idnt = torch.tensor(data['prod_idnt'], dtype=torch.float32).flatten().to(self.device)
        self.prod_spco = torch.tensor(data['prod_spco'], dtype=torch.float32).flatten().to(self.device)
        self.prod_kpi1 = torch.tensor(data['prod_kpi1'], dtype=torch.float32).flatten().to(self.device)
        self.prod_kpi2 = torch.tensor(data['prod_kpi2'], dtype=torch.float32).flatten().to(self.device)
        self.prod_kpic = torch.tensor(data['prod_kpic'], dtype=torch.float32).flatten().to(self.device)
        self.prod_optg = torch.tensor(data['prod_optg'], dtype=torch.float32).flatten().to(self.device)
        self.prod_sztg = torch.tensor(data['prod_sztg'], dtype=torch.float32).flatten().to(self.device)
        self.prod_sztn = torch.tensor(data['prod_sztn'], dtype=torch.float32).flatten().to(self.device)

        # Temperature schedule: 10 * 0.99**e for `rep` exponents evenly spaced
        # over [0, 2000], i.e. decaying from 10 towards ~0 across the epochs.
        self.T_List = (10 * torch.logspace(0, 2000, steps=rep, base=0.99)).to(self.device)

    # Activation and penalization functions
    # Activation function
    def sigmoid(self, value):
        """Return the element-wise logistic sigmoid of ``value``."""
        return torch.sigmoid(value)

    # Penalty function for space constraints
    def pnf1(self, value, alpha):
        """Return ``50 * alpha * relu(value)``: linear penalty on positive excess."""
        return 50 * alpha * torch.relu(value)

    # Penalty function for selection constraints
    def pnf2(self, value):
        """Return ``50 * value ** 2``: quadratic penalty on any deviation."""
        return 50 * value ** 2

    # Penalty function for priority constraints
    def pnf3(self, value, alpha):
        """Return ``alpha * value`` clamped below at zero."""
        return torch.clamp(alpha * value, min=0)

    # Function to create a matrix that identifies products
    def create_product_identify_matrix(self, product_id):
        """Build a one-hot membership matrix from per-option product ids.

        Args:
            product_id: Tensor of product ids, one per option (flattened here).

        Returns:
            A float32 tensor of shape ``(n_unique_ids, n_options)`` where
            entry ``[k, j]`` is 1 when option ``j`` carries the ``k``-th
            unique id (in the order returned by ``torch.unique``), else 0.
        """
        product_id = product_id.flatten()
        unique_ids, inverse_indices = torch.unique(product_id, return_inverse=True)
        C = torch.zeros(len(unique_ids), len(product_id), dtype=torch.float32, device=self.device)
        C[inverse_indices, torch.arange(len(product_id), device=self.device)] = 1
        return C

    # Model computation
    def forward(self):
        """Run the iterative solver and return the best selection vector.

        Prints progress and elapsed time as a side effect.

        Returns:
            A 1-D float32 tensor with one entry per option: the value of ``V``
            from the tuning pass with the highest ``sum(prod_kpic * V)``.
        """
        t0 = time.time()

        print("==========================================================")
        print(f"Solving Store ID : {self.stor_idnt}")

        # Create identification matrix (rows: products, columns: options)
        C = self.create_product_identify_matrix(self.prod_idnt)

        n_option = len(self.prod_idnt)

        # Loop-invariant weights, computed once instead of on every option
        # update. Only the ones the enabled constraints need are built.
        if self.is_solve_special_zone:
            special_space = self.prod_spco * self.prod_sztg
            normal_space = self.prod_spco * self.prod_sztn
        if self.is_solve_opt_priority:
            priority_matrix = C * self.prod_optg

        # Initialize selection variable
        V = torch.full((n_option,), 0.5, dtype=torch.float32, device=self.device)

        # Initialize penalization parameter (alpha) and tuning bookkeeping
        initial_alpha = self.alpha
        max_iter = 10
        V_best = V.clone()
        U_best = float('-inf')
        U_tolr = 0.01

        for tuning_round in range(1, max_iter + 1):

            print(f"Solving Iteration {tuning_round}.")

            # Set the penalization parameter for each tuning iteration
            current_alpha = initial_alpha / tuning_round

            # For each epoch
            for t in tqdm(range(self.rep), desc="Solving Epochs"):

                # Temperature for this epoch
                T = self.T_List[t]

                # Update each option once, in shuffled order
                for i in torch.randperm(n_option):
                    # Candidate states with option i forced on (V1) / off (V0);
                    # each penalty part is the difference between the two.
                    V1 = V.clone()
                    V0 = V.clone()
                    V1[i] = 1
                    V0[i] = 0

                    # 1. Penalize inequality constraints
                    # (1) Space constraints
                    if self.is_solve_special_zone:
                        special_used1 = special_space @ V1
                        special_used0 = special_space @ V0
                        # Special-zone usage must not exceed stor_spcs
                        PartA = self.pnf1((special_used1 - self.stor_spcs), alpha=current_alpha) - \
                                self.pnf1((special_used0 - self.stor_spcs), alpha=current_alpha)
                        # Remaining usage must fit in stor_spco minus special-zone usage
                        PartB = self.pnf1((normal_space @ V1 - (self.stor_spco - special_used1)), alpha=current_alpha) - \
                                self.pnf1((normal_space @ V0 - (self.stor_spco - special_used0)), alpha=current_alpha)
                    else:
                        # NOTE(review): this compares total used space with
                        # stor_spcs, not stor_spco -- confirm this is intended.
                        PartA = self.pnf1((self.prod_spco @ V1 - self.stor_spcs), alpha=current_alpha) - \
                                self.pnf1((self.prod_spco @ V0 - self.stor_spcs), alpha=current_alpha)
                        PartB = 0

                    # (2) Option priority constraint
                    if self.is_solve_opt_priority:
                        diff1 = priority_matrix @ V1
                        diff0 = priority_matrix @ V0
                        # Pairwise differences between products, penalized
                        # where they exceed 1.
                        PartC = 2 * torch.sum(self.pnf3((diff1.unsqueeze(1) - diff1.unsqueeze(0) - 1), alpha=current_alpha)) - \
                                2 * torch.sum(self.pnf3((diff0.unsqueeze(1) - diff0.unsqueeze(0) - 1), alpha=current_alpha))
                    else:
                        PartC = 0

                    # 2. Penalize equality constraints
                    # (3) Selection constraint: per-product selections sum to 1
                    PartD = torch.sum(self.pnf2(C @ V1 - 1)) - \
                            torch.sum(self.pnf2(C @ V0 - 1))

                    # Calculate the penalized update and new selection value
                    V[i] = self.sigmoid((-1 / T) * (-self.prod_kpic[i] + (current_alpha / T) * (PartA + PartB + PartC + PartD)))

            # Compute U after tuning iteration
            U = torch.sum(self.prod_kpic * V)

            # If U improves, update the best U and tuned selection variable
            if U > U_best:
                U_best = U
                V_best = V.clone()

            # Check if stopping condition is met.
            # NOTE(review): U_best was just set to U whenever U improved, so
            # this difference is 0 on every improving pass -- including the
            # first, where U_best starts at -inf. In practice the loop exits
            # after one pass. Kept as-is to preserve current results; compare
            # against the previous U_best if multi-pass tuning is wanted.
            if abs(U - U_best) < U_tolr:
                break

        print(f"Finish. Time used: {round((time.time()-t0)/60,2)} minutes.")
        print("==========================================================")
        print()

        # Return the best selection variable
        return V_best

### 4. Solving the Problem with Model

In [None]:
t0 = time.time()

# Solve every entry of `data`; entries are addressed by stringified index.
for i in range(len(data)):
    store_key = str(i)

    try:
        hnn_model = HNNKnapsackYT(data=data[store_key], alpha=0.1, rep=1000)
        result = hnn_model()
        data[store_key]["solution"] = result.tolist()

    except Exception as exc:
        # Best-effort batch: a failing entry must not stop the others, but it
        # is reported so a missing "solution" can be traced back. Catching
        # Exception (not a bare except) lets Ctrl-C still interrupt the run.
        print(f"Skipping entry {store_key}: {exc!r}")

In [None]:
# Persist the data (including any added "solution" entries). The file is
# written as UTF-8 explicitly because ensure_ascii=False emits non-ASCII
# characters verbatim, which the platform default encoding may not support.
with open("data/data3b.json", "w", encoding="utf-8") as json_file:
    json.dump(data, json_file, indent=4, ensure_ascii=False)