In [1]:
from uuid import uuid4
from datetime import datetime
import numpy as np
import json
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
import math
import os
import pandas as pd
import random
import itertools
from pathlib import Path
from scipy.stats import truncnorm, norm, uniform, arcsine, binom
from types import SimpleNamespace

In [2]:
class NpEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()

In [3]:
class Instance():
    def __init__(self, pattern='normal', nb_cst = 50):
        
        self.idx = pattern + '-' + str(nb_cst) + '_' + datetime.now().strftime('%Y%m-%d%H-%M%S-') + str(uuid4())

        # Instance features
        self.nb_cst = nb_cst
        if pattern in ['normal', 'uniform', 'bimodal']:
            self.nb_features = 0
            noise_scale = 1
        elif pattern in ['contextual']:
            self.nb_features = 8
            noise_scale = 2
            self.contextual = {}
            self.contextual['noise_scale'] = noise_scale
            
        if pattern in ['bimodal']:
            difference = 2 * np.random.randint(2, 11, nb_cst)
            mean1 = np.random.randint(10, 51 - difference)
            std1 = np.random.randint(2, 11, nb_cst)
            mean2 = np.random.randint(50 + - difference, 101)
            std2 = np.random.randint(2, 11, nb_cst)
            self.mean_demand = np.vstack([mean1, mean2]).T
            self.std_demand = np.vstack([std1, std2]).T
            self.max_inventory = np.ceil(np.random.randint(2, 5, nb_cst) * self.mean_demand.mean(axis=1)).astype(int)
            total_vehicle_capacity = (3/2) * self.mean_demand.mean(axis=1).sum()
        else:
            self.mean_demand = np.random.randint(10, 101, nb_cst)
            self.std_demand = noise_scale * np.random.randint(2, 11, nb_cst)
            self.max_inventory = (np.random.randint(2, 5, nb_cst) * self.mean_demand)
            total_vehicle_capacity = (3/2) * np.sum(self.mean_demand)
        
        self.holding_cost = np.random.uniform(0.02, 0.1, nb_cst)       
        self.vehicle_capacity = np.ceil(total_vehicle_capacity).astype(int)
 
        # Coordinates
        x = np.floor(np.random.rand(nb_cst + 1) * 500)
        self.x = (x[1:] - x[0]).astype(int)
        y = np.floor(np.random.rand(nb_cst + 1) * 500)
        self.y = (y[1:] - y[0]).astype(int)
    
    def create(self, nb_samples = 5000):
        
        pattern = self.idx.split('-')[0]
        if pattern == 'normal':
            lower, upper = 0, np.array(self.max_inventory)
            mu, sigma = np.array(self.mean_demand), np.array(self.std_demand)
            rv = truncnorm((lower - mu) / sigma, (upper - mu) / sigma, loc=mu, scale=sigma)
            setattr(self, "demand_hist", rv.rvs((50, instance.nb_cst)).transpose())
            setattr(self, "demand_eval", rv.rvs((10, 5, instance.nb_cst)).transpose())
            setattr(self, "demand_test", rv.rvs((90, instance.nb_cst)).transpose())
        elif pattern == 'uniform' :   
            rv = uniform(0, 0.5 * np.array(instance.max_inventory))
            setattr(self, "demand_hist", rv.rvs((50, instance.nb_cst)).transpose())
            setattr(self, "demand_eval", rv.rvs((10, 5, instance.nb_cst)).transpose())
            setattr(self, "demand_test", rv.rvs((90, instance.nb_cst)).transpose())   
        elif pattern == ['contextual']:
            distribution = random.choice(['truncnorm', 'uniform', 'arcsin'])
            n_informative = np.random.randint(2, self.nb_features-2+1)
            rng = np.random.default_rng()
            self.contextual['distribution'] = distribution
            rng = np.random.default_rng()

            if distribution == 'truncnorm':
                lower, upper = -1, 1
                mu, sigma = 0, 1  
                dist = truncnorm((lower - mu) / sigma, (upper - mu) / sigma, loc=mu, scale=sigma)
                X = dist.rvs(size=(self.nb_cst, nb_samples, self.nb_features))
            elif distribution == 'uniform':
                X = (rng.uniform(size=(self.nb_cst, nb_samples, self.nb_features))-0.5)*2
            elif distribution == 'arcsin':
                X = (arcsine.rvs(size=(self.nb_cst, nb_samples, self.nb_features))-0.5)*2
            else:
                raise Exception('distribution not defined')
                
            X = X * np.sqrt(np.random.randint(10, 101, (self.nb_cst, nb_samples, self.nb_features)))
            
            combinations = list(itertools.combinations(range(self.nb_features), 2))
            ground_truth = np.zeros((len(combinations) + self.nb_features, 1))
            ground_truth[:n_informative, :] = rng.uniform(size=(n_informative, 1)) * np.random.choice([-1,1], (n_informative, 1))
            y = self.mean_demand.reshape(self.nb_cst, 1).repeat(nb_samples, axis=1) + np.squeeze(np.dot(X, ground_truth[:self.nb_features,:]))
            for k, (f1, f2) in enumerate(combinations):
                if random.choice([False, True]):
                    ground_truth[self.nb_features + k, :] = rng.uniform(size=(1, 1)) * np.random.choice([-1,1], (1, 1))
                    y += np.squeeze(np.dot((X[:,:,f1] * X[:,:,f2]).reshape(self.nb_cst, nb_samples, 1), ground_truth[self.nb_features+k,:].reshape(1,-1)))

            if self.contextual['noise'] > 0:
                y += rng.normal(scale=self.std_demand.reshape(-1, 1), size=y.shape)

            demand = y.clip(0, self.max_inventory.reshape(-1, 1))
            
            self.contextual['coef'] = ground_truth
            
            self.samples_hist = dict()
            self.samples_eval = dict()
            self.samples_test = dict()

            for i in range(self.nb_cst):
                sample_df = pd.DataFrame(np.hstack([X[i], demand[i].reshape(-1,1)])).sample(frac=1).reset_index(drop=True).apply(lambda x: {'features': x[:-1].values, 'label': x.iloc[-1]}, axis=1)

                hist_set = sample_df.iloc[100:].reset_index(drop=True)
                hist_set.index += 1
                test_set = sample_df.iloc[:25].reset_index(drop=True)
                test_set.index += 1
                self.samples_hist[i+1] = dict(hist_set)
                self.samples_test[i+1] = dict(test_set)

                self.samples_eval[i+1] = dict()
                for scenario in range(5):
                    eval_set = sample_df.iloc[25+scenario*15:40+scenario*15].reset_index(drop=True)
                    eval_set.index += 1
                    self.samples_eval[i+1][scenario] = dict(eval_set)
        elif pattern == 'bimodal':
            # Generate truncated samples from the Gaussian mixture model
            samples1 = truncnorm((0 - self.mean_demand[:,0]) / self.std_demand[:,0], (self.max_inventory - self.mean_demand[:,0]) / self.std_demand[:,0], loc=self.mean_demand[:,0], scale=self.std_demand[:,0])
            samples2 = truncnorm((0 - self.mean_demand[:,1]) / self.std_demand[:,1], (self.max_inventory - self.mean_demand[:,1]) / self.std_demand[:,1], loc=self.mean_demand[:,1], scale=self.std_demand[:,1])

            # Combine the samples from the two distributions
            for demand_set, demand_nb in [('hist', 50), ('eval', 50), ('test', 40)]:
                nb_samples = binom.rvs(n=1, p=0.5, size=demand_nb)
                demand = np.concatenate((samples1.rvs(((nb_samples==0).sum(), self.nb_cst)), samples2.rvs(((nb_samples==1).sum(), self.nb_cst)))).transpose()
                setattr(self, "demand_" + demand_set, demand)

    def save(self):
        with open(self.idx + '.json', 'w') as fp:
            json.dump(instance.__dict__, fp, cls = NpEncoder)  

In [4]:
for i in range(1):
    instance = Instance(pattern='bimodal', nb_cst=10)
    instance.create()
    instance.save()