In [42]:
# importing libraries

import numpy as np
import pandas as pd
from scipy.optimize import minimize_scalar
from scipy.optimize import minimize
import math

##########################################################################################################################

# initialize all given data to variables:

def exploration(c,T,unit_T, t,xt,K,P1,p,z,customers_per_price, test_counts):
     
    
    # c,T,t,P1,P2,K,p,pt,dt,p_opt,p_low,p_upp,z,z_opt,
    # 
    #
    # c     = total inventory.
    # T     = total booking time.
    # t     = time instances to divide booking time.
    # xt    = Unit of Booking time T one instance has.
    # K     = subphases in a phase (K x t = T)
    # P1    = total percent of phase 1.
    # P2    = total percent of phase 2,  P1+P2 = T, a phase can have K subphases.
    # p     = initial price created after imputs of Highest and Lowest price taken form user.
    # df    = data frame for Exploration demannd generated.
    # pt    = price for a particular instance, pt is in range p.
    # z     = dual variable at beginning, can be zero at 1st instance.
    # z_opt = optimal dual variable.
    # p_opt = optimal price in exploration.
    # nabla = Nabla (∇) used in exploration for narrowing price intervals near optimal price.
    # p_low = lower price limit when we narrow price intervals towards optimal price.
    # p_upp = upper price limit when we narrow price intervals towards optimal price.
    # num_steps = Total number of Price wanted
    
    # Customer_data = Empty List to store customer data
    # demand_probability    = demand function for a particular instance, probability of buying at price pt and instance t.



    #  Demand Generation function used in below code to generte demand

    # Setting Parameters for demand generation
    # np.random.seed(42)

    
    customer_data = []
    customer_id = 1

    
    

    for pt in p:
        
        base_prob = logistic_demand(pt,p)
    
        for _ in range(customers_per_price):

            # Add controlled noise
            noise = np.random.normal(0, 0.5)
            prob = np.clip(base_prob + noise, 0.01, 0.99)  # Clamp between [0.01, 0.99]
            buy = int(np.random.rand() < prob)
            customer_data.append([customer_id, pt, buy])
            customer_id += 1
        # Update test_counts for this price
        test_counts = test_counts_function(p, pt)
    
    # Print the generated demand
    # data frame for customer data 
    df = pd.DataFrame(customer_data, columns=["customer_id", "price", "buy"])
    print(df)
    # --- Save to CSV ---
    df.to_csv("customer_demand_log.csv", index=False)
    print("Saved customer data to 'customer_demand_log.csv'")


    # Reduce inventory after demand generation and selling products:
    # Calculate total units sold in this exploration phase
    units_sold = df['buy'].sum()
    # Reduce inventory c by units sold (ensure c is float or int)
    c = float(c) - units_sold
    if c < 0:
        c = 0  # inventory cannot be negative
    print(f"Units sold in this phase: {units_sold}")
    print(f"Remaining inventory after sales: {c}")
    

    #calling - compute demand probability for each price
    demand_probability, delta = compute_demand_probability(df, p, test_counts, T)


    # calling Revenue function to calculate revenue at each price
    total_revenue, revenue_per_price = calculate_revenue(df)
    print("\n*** Revenue Calculation ***")
    for pt in p:
        print(f"Price: {pt}, Revenue: {revenue_per_price.get(pt, 0):.2f}")
    print(f"Total Revenue: {total_revenue:.2f}")


    # call z* dual optimal calculator function, Make sure c is a float if it was entered via input()
    z_opt = dual_objective(float(z), demand_probability, p, float(c))
    print("\n","Optimal dual variable (z*):", z_opt)


    # Calling Optimal price function:
    p_opt = exploration_optimal_price(z_opt, demand_probability, p)


    # Narrowing Price intervals
    p_low, p_upp, p = narrow_price_interval(p_opt, T, customers_per_price, p, context['num_steps'], test_counts, delta)
    print("*New Prices after Narrowing: ",p)


    #Calculate remaining Booking period 
    T=T-(context['num_steps']*xt)
    print ("Remaining booking period = ", T, unit_T)
    
    return df, z_opt, p_opt, c, T, test_counts, delta, p_low, p_upp, p #demand_prob    


###########################################################################################################################
# (Logistic) - Demand function for Simulation 

"""Simulates demand using logistic function"""
def logistic_demand(pt, p, p0=0,L=1, k=0.025 ):
        p0=(p.min()+p.max())/2
        return L / (1 + np.exp(k * (pt - p0)))   
"""Returns the base probability of purchase at price p"""


##########################################################################################################################
# Test Counts Function - counts how many instances a price was tested in exploration
def test_counts_function(p: list, pt: float = None) -> dict:
    global test_counts
    if pt is None:
        test_counts = {float(price): 0 for price in p}  # Ensure keys are floats
        return test_counts
    if pt in p and pt in test_counts:
        test_counts[pt] += 1
    return test_counts


############################################################################################################################
# Compute Demand Probability Function - # Dx(p)

# Global dictionary to store empirical demand probabilities per price
demand_probability = {}




# Modified Compute Demand Probability Function
def compute_demand_probability(df, p, test_counts, T):
    """
    Calculates empirical demand probabilities and confidence bounds from exploration data.
    Uses 'p' list and 'pt' to access prices as per user's convention.
    Stores result in global variable `demand_probability`.
    Returns demand_probability and delta dictionaries.
    """
    global demand_probability
    demand_probability = {}
    delta = {}

    for pt in p:
        subset = df[df['price'] == pt]
        if not subset.empty:
            demand_prob = subset['buy'].sum() / subset['buy'].count()
            demand_probability[pt] = demand_prob
            # Compute delta if tested
            if test_counts.get(pt, 0) > 0:
                delta[pt] = math.sqrt((2 * math.log(2 * T * len(p) / 0.05)) / test_counts[pt])
            else:
                delta[pt] = float('inf')  # Large delta if no tests
        else:
            demand_probability[pt] = 0.0
            delta[pt] = float('inf')

    print("\n*** Empirical Demand Probabilities and Confidence Bounds ***")
    for pt in p:
        print(f"Price: {pt}, Demand Probability: {demand_probability[pt]:.2f}, Delta: {delta[pt]:.4f}")
    
    return demand_probability, delta



#################################################################################################
# Compute Revenue for each price

def calculate_revenue(df):
    """
    Calculates total revenue from exploration sales data
    :param df: DataFrame containing sales data (must have 'price' and 'buy' columns)
    :return: Total revenue, Revenue per price point dictionary
    """
    # Calculate revenue for each transaction
    df['revenue'] = df['price'] * df['buy']
    
    # Calculate total revenue
    total_revenue = df['revenue'].sum()
    
    # Calculate revenue per price point
    revenue_per_price = df.groupby('price')['revenue'].sum().to_dict()
    
    return total_revenue, revenue_per_price


###################################################################################################
# # Dual optimization: Find optimal value i.e. - z_opt in code and z* in paper 

"""For bounds from 0 to maximum price"""
""" Minimize_scalar used to optimize"""
# def dual_objective(z, demand_probability, p, c):
#     def objective(z):
#     # def objective(z_val):
#         return max((pt - z) * demand_probability[pt] for pt in p) + z * c

#     z_opt = minimize_scalar(objective, bounds=(0, max(p)), method='bounded')
#     return z_opt.x

""" For bounds from z (one unit of inventory - given by user at beginning) to maximum price"""
""" only Minimize used to optimize"""
def dual_objective(z, demand_probability, p, c):
    def objective(z_val):
        z_val = z_val[0]
        return (max((pt - z_val) * demand_probability[pt] for pt in p) + z_val * c)

    result = minimize(objective, x0=[z], bounds=[(0, max(p))])
    return result.x[0]


#######################################################################################################
# Calculate and find optimal price - p* in paper

def exploration_optimal_price (z_opt, demand_probability, p):
    """
    Finds optimal price p* that maximizes (p - z*) * D_prob.(pt) (demand_probability[pt])
    where D_prob.(pt) is the empirical demand probability at price p.
    
    Parameters:
    - z_opt: optimal dual value (float)
    - demand_probability: dict mapping price -> demand probability
    - p: list or array of price points

    Returns:
    - p_opt: optimal price
    """
    optimal_price = None
    best_value = float('-inf')

    for pt in p:
        demand = demand_probability.get(pt, 0)
        value = (pt - float(z_opt)) * demand
        if value > best_value:
            best_value = value
            optimal_price = pt

    print(f"\n*** Optimal Price Calculation ***")
    print(f"Optimal Price (p*): {optimal_price}, Objective Value: {best_value:.4f}")
    return optimal_price


################################################################################################
# Modified Narrow Price Interval Function
def narrow_price_interval(p_opt, T, customers_per_price, p, num_steps, test_counts, delta):
    global demand_probability
    # global  test_counts
    if not all(test_counts.get(pt, 0) >= 1 for pt in p):
        print("\n*** Not enough tests to narrow price interval ***")
        return p[0], p[-1], p
    
    # Check confidence bounds
    if not all(delta.get(pt, float('inf')) < 0.6 for pt in p):
        print("\n*** Confidence bounds too large to narrow price interval ***")
        return p[0], p[-1], p
    
    # Estimate derivative for gradient
    def estimate_demand_derivative(pt, p):
        sorted_p = sorted(p)
        idx = sorted_p.index(pt) if pt in sorted_p else np.searchsorted(sorted_p, pt)
        if idx == 0 or idx == len(sorted_p) - 1:
            return 0.0
        p_prev, p_next = sorted_p[idx - 1], sorted_p[idx + 1]
        return (demand_probability.get(p_next, 0) - demand_probability.get(p_prev, 0)) / (p_next - p_prev)
    
    # Compute gradient at p_opt
    nabla_p = demand_probability[p_opt] + (p_opt - z_opt) * estimate_demand_derivative(p_opt, p)
    
    # Narrow based on gradient
    sorted_p = sorted(p)
    idx = sorted_p.index(p_opt) if p_opt in sorted_p else np.searchsorted(sorted_p, p_opt)
    if nabla_p > 0:
        new_p = [x for x in sorted_p if x >= sorted_p[max(idx - 1, 0)]]
    elif nabla_p < 0:
        new_p = [x for x in sorted_p if x <= sorted_p[min(idx + 1, len(sorted_p) - 1)]]
    else:
        new_p = sorted_p[max(0, idx - 1):min(len(sorted_p), idx + 2)]
    
    # Refine grid
    p_low, p_upp = min(new_p), max(new_p)
    new_step = (p_upp - p_low) / (num_steps - 1)
    new_p = [float(x) for x in np.linspace(p_low, p_upp, num_steps)]
    
    # Transfer data
    new_demand_probability = {pt: 0.0 for pt in new_p}
    for old_p in demand_probability:
        closest_p = min(new_p, key=lambda x: abs(x - old_p))
        new_demand_probability[closest_p] = demand_probability[old_p]  # Use closest probability
    test_counts = {pt: 0 for pt in new_p}  # Reset here
    demand_probability = new_demand_probability



    print(f"\n*** Price Interval Narrowed ***\nGradient = {nabla_p:.4f}")
    print(f"Narrowed Interval: [{p_low:.2f}, {p_upp:.2f}]")
    print(f"New Price List: {new_p}")
    
    return p_low, p_upp, new_p

###################################################################################################
# Generate price ranges step wise as mentioned by user

# shared object just for num_steps: number of price points
# made this as num_steps is going to be used in two different functions
context = {}

# Generate price ranges step wise as mentioned by user
def generate_price_list():
    min_price = float(100)  #float(input("Enter the minimum price: "))
    max_price = float(500)  #float(input("Enter the maximum price: "))
    num_steps = int(5)   #int(input("Enter how many price points you want: "))
    context['num_steps'] = num_steps
    price_list = (np.linspace(min_price, max_price, num_steps))
    return price_list


####################################################################################################
# Calculate how much unit of time is allocated to Phase 1 

def phase_one_days_calc():
    xp=int(33)    #int(input("Enter % of Phase 1, between 0 to 100:"))
    num_steps = context['num_steps']
    if (T*(xp/100)) % num_steps == 0:
        phase = T*xp
    else:
        phase = (T*(xp/100)) - ((T*(xp/100))%num_steps)
    return phase


#####################################################################################################
# Take input from the user
# or
# Give given data to code basically

c     = float(500) #float(input("Total Inventory 'c' :   "))
T     = float(100)  #float(input("Total booking peroid 'T' :  "))
unit_T= "days" #input("Enter unit of booking peroid (Usually days):")
t     = float(50)  #float(input("Instances 't' to divide Time peroid T into :  "))
xt    = T/t
p     = generate_price_list()
P1    = phase_one_days_calc()   
K     = int (P1/(context['num_steps']*xt))
z     = 80   #input("Enter imitial value of 1 unit of inventory (Can be zero): ")
customers_per_price = 10    #int(input("Enter number of customer approaching per price (e.g. 10):"))


print("*** USER INPUTS ***")
print("Total Inventory c:", c)
print("Total booking peroid T:", T,unit_T)
print("1 time Instance from t i.e. xt:", xt, unit_T)
print("Generated prices:", p)
print("Total time allocated to Phase 1:",P1)
print("Total subphases 'K' in Phase 1: ",K, '\n')

######################################################################################################################
# Conduct Phase 1   i.e.  ***Exploration***: Call the function with user inputs as given above
# Or
# Phase 1 - Loop

# test_counts = test_counts_function(p)

for ki in range(K): 

    if c>0: # Inventory should exist  before sale
    
        print("\n","***Expoloration Detailed Data:***","Phase 1 (P1), Subphase:",ki+1)
        
        df, z_opt, p_opt, c, T,test_counts, delta, p_low, p_upp, p = exploration(c, T, unit_T, t, xt, K, P1, p, z, customers_per_price, test_counts)
        
        # test_counts = test_counts_function(p)
        
        if ki == K-1:
            print("\n","***Exploration finished over here***","\n")
            print("\n",'The final values after Exploration are:',"\n")
            print("\n","Dual variable, z* = ",z_opt)
            print("Optimal Price, p* = ",p_opt)
            print("Remaining Inventory, c = ",c)
            print("Remaining Booking Period, T = ",T)

    else:
        print("***NOT SUFFICIENT INVENTORY***")





*** USER INPUTS ***
Total Inventory c: 500.0
Total booking peroid T: 100.0 days
1 time Instance from t i.e. xt: 2.0 days
Generated prices: [100. 200. 300. 400. 500.]
Total time allocated to Phase 1: 30.0
Total subphases 'K' in Phase 1:  3 


 ***Expoloration Detailed Data:*** Phase 1 (P1), Subphase: 1
    customer_id  price  buy
0             1  100.0    0
1             2  100.0    1
2             3  100.0    1
3             4  100.0    1
4             5  100.0    1
5             6  100.0    1
6             7  100.0    1
7             8  100.0    1
8             9  100.0    1
9            10  100.0    1
10           11  200.0    0
11           12  200.0    1
12           13  200.0    1
13           14  200.0    1
14           15  200.0    0
15           16  200.0    1
16           17  200.0    0
17           18  200.0    1
18           19  200.0    1
19           20  200.0    1
20           21  300.0    0
21           22  300.0    1
22           23  300.0    1
23           24  300.0   