In [5]:
import numpy as np
from scipy import stats

# ---
# Monte Carlo Integration Exercises
# ---
# This script estimates the integral of exp(x) from 0 to 1 using four different
# Monte Carlo simulation techniques as requested in the exercises.

# First, let's find the true analytical value of the integral for comparison:
# Integral of e^x is e^x.
# So, integral from 0 to 1 is e^1 - e^0 = exp(1) - 1
true_value = np.exp(1) - 1
print(f"True Analytical Value: {true_value:.6f}\n")


# Set parameters for the simulations
n = 100  # Number of samples as specified in Exercise 1
np.random.seed(42) # for reproducibility


# ---
# 1. Crude Monte Carlo Estimator
# ---
# The principle is that the integral of f(x) from a to b can be estimated as
# (b-a) * E[f(U)], where U is a uniform random variable on [a, b].
# Since our integral is from 0 to 1, (b-a) = 1.
# The estimator is simply the mean of f(U).

print("--- Exercise 1: Crude Monte Carlo ---")

# Generate 100 random samples from a Uniform(0, 1) distribution
u_crude = np.random.uniform(0, 1, n)

# Apply the function e^x to each sample
y_crude = np.exp(u_crude)

# Point estimate is the mean of the results
point_estimate_crude = np.mean(y_crude)

# Calculate the standard error and the 95% confidence interval
se_crude = np.std(y_crude, ddof=1) / np.sqrt(n)
# Using scipy.stats.t.interval to get a more accurate CI for small samples
ci_crude = stats.t.interval(0.95, df=n-1, loc=point_estimate_crude, scale=se_crude)

print(f"Point Estimate: {point_estimate_crude:.6f}")
print(f"95% CI: ({ci_crude[0]:.4f}, {ci_crude[1]:.4f})")
print(f"CI Width: {ci_crude[1] - ci_crude[0]:.6f}\n")


# ---
# 2. Antithetic Variables
# ---
# This is a variance reduction technique. For each random number U, we also use its
# "antithetic" counterpart, 1-U. This helps to balance out the random sampling.
# We only need to generate n/2 random numbers to get n total evaluations.

print("--- Exercise 2: Antithetic Variables ---")

# Use n/2 = 50 samples to keep computer resources comparable (100 total evaluations)
m = n // 2
u_antithetic = np.random.uniform(0, 1, m)

# Calculate the function for both the original and the antithetic variables
y1 = np.exp(u_antithetic)
y2 = np.exp(1 - u_antithetic)

# The estimator for each pair is the average of the two
y_antithetic_pairs = (y1 + y2) / 2

# The final point estimate is the mean of these paired averages
point_estimate_antithetic = np.mean(y_antithetic_pairs)

# Calculate the CI based on the m = n/2 paired samples
se_antithetic = np.std(y_antithetic_pairs, ddof=1) / np.sqrt(m)
ci_antithetic = stats.t.interval(0.95, df=m-1, loc=point_estimate_antithetic, scale=se_antithetic)

print(f"Point Estimate: {point_estimate_antithetic:.6f}")
print(f"95% CI: ({ci_antithetic[0]:.4f}, {ci_antithetic[1]:.4f})")
print(f"CI Width: {ci_antithetic[1] - ci_antithetic[0]:.6f}\n")


# ---
# 3. Control Variable
# ---
# We use a second function that is highly correlated with our target function e^x,
# but whose integral we know exactly. A good choice is g(x) = x.
# We know Integral(x) from 0 to 1 is [x^2/2]_0^1 = 0.5.

print("--- Exercise 3: Control Variable ---")

# We use the same 100 uniform samples as the crude method
u_control = u_crude 

# Y is our variable of interest, X is the control variable
Y = np.exp(u_control)
X = u_control
mu_x = 0.5 # The known true mean of the control variable

# Estimate the optimal constant 'c'
c_hat = np.cov(Y, X)[0, 1] / np.var(X)

# The new control variate estimator
Y_control = Y - c_hat * (X - mu_x)

# The final point estimate is the mean of these new values
point_estimate_control = np.mean(Y_control)

# Calculate the CI based on the control variate samples
se_control = np.std(Y_control, ddof=1) / np.sqrt(n)
ci_control = stats.t.interval(0.95, df=n-1, loc=point_estimate_control, scale=se_control)

print(f"Point Estimate: {point_estimate_control:.6f}")
print(f"95% CI: ({ci_control[0]:.4f}, {ci_control[1]:.4f})")
print(f"CI Width: {ci_control[1] - ci_control[0]:.6f}\n")


# ---
# 4. Stratified Sampling
# ---
# We divide the interval [0, 1] into smaller, non-overlapping strata.
# We then perform a Monte Carlo estimate in each stratum and combine the results.
# To keep resources comparable, we'll use 10 strata with 10 samples each (100 total).

print("--- Exercise 4: Stratified Sampling ---")

num_strata = 10
samples_per_stratum = n // num_strata

strata_means = np.zeros(num_strata)
strata_vars = np.zeros(num_strata)
strata_weights = 1 / num_strata # All strata have equal width (0.1)

for i in range(num_strata):
    # Lower and upper bounds of the stratum
    lower_bound = i / num_strata
    upper_bound = (i + 1) / num_strata
    
    # Generate uniform samples within the stratum
    stratum_samples = np.random.uniform(lower_bound, upper_bound, samples_per_stratum)
    
    # Calculate e^x for these samples
    y_stratum = np.exp(stratum_samples)
    
    # Store the mean and variance for this stratum
    strata_means[i] = np.mean(y_stratum)
    strata_vars[i] = np.var(y_stratum, ddof=1)

# The overall point estimate is the weighted sum of the strata means
point_estimate_stratified = np.sum(strata_weights * strata_means)

# The variance of the stratified estimator is the sum of the variances of the strata estimators
# Var(estimator) = Sum over i [ (weight_i^2 * var_i) / samples_i ]
var_stratified = np.sum(strata_weights**2 * (strata_vars / samples_per_stratum))
se_stratified = np.sqrt(var_stratified)

# CI calculation for stratified sampling is more complex, but for large n, z-score is a good approximation
ci_stratified = (point_estimate_stratified - 1.96 * se_stratified, 
                 point_estimate_stratified + 1.96 * se_stratified)

print(f"Point Estimate: {point_estimate_stratified:.6f}")
print(f"95% CI: ({ci_stratified[0]:.4f}, {ci_stratified[1]:.4f})")
print(f"CI Width: {ci_stratified[1] - ci_stratified[0]:.6f}\n")

True Analytical Value: 1.718282

--- Exercise 1: Crude Monte Carlo ---
Point Estimate: 1.672061
95% CI: (1.5728, 1.7713)
CI Width: 0.198475

--- Exercise 2: Antithetic Variables ---
Point Estimate: 1.721533
95% CI: (1.7037, 1.7393)
CI Width: 0.035572

--- Exercise 3: Control Variable ---
Point Estimate: 1.722307
95% CI: (1.7099, 1.7347)
CI Width: 0.024734

--- Exercise 4: Stratified Sampling ---
Point Estimate: 1.723107
95% CI: (1.7129, 1.7333)
CI Width: 0.020475



In [7]:
import numpy as np
from scipy import stats
import math

# ---
# Discrete-Event Simulation of a Blocking System (M/M/m/m Queue)
# with Control Variate for Variance Reduction
# ---

# --- 1. Simulation Parameters ---
# These are taken from the user's description of the exercises.
m = 10                  # Number of service units (servers)
mean_service_time = 8.0   # Mean time each customer spends with a server
mean_interarrival_time = 1.0 # Mean time between new customers arriving
num_customers = 10000     # Number of customers to simulate in a single run
num_replications = 100    # Number of times we repeat the simulation to get a sample

# --- 2. Analytical Solution (Erlang-B Formula) for Verification ---
# This is the true, theoretical blocking probability for this system.
# It allows us to check if our simulation is behaving correctly.
A = mean_service_time / mean_interarrival_time  # Offered traffic in Erlang

def erlang_b(A, m):
    """Calculates the blocking probability using the Erlang-B formula."""
    inv_b = 1.0
    for i in range(1, m + 1):
        inv_b = 1.0 + (i / A) * inv_b
    return 1.0 / inv_b

true_blocking_prob = erlang_b(A, m)
print(f"--- System Parameters ---")
print(f"Servers (m): {m}")
print(f"Offered Traffic (A): {A:.2f} Erlang")
print(f"Analytical Blocking Probability (Erlang-B): {true_blocking_prob:.6f}\n")


# --- 3. Discrete-Event Simulation Function ---
# This function simulates one full run of the system for a given number of customers.
def simulate_blocking_system(m, mean_service_time, mean_interarrival_time, num_customers):
    """
    Simulates a single run of the M/M/m/m blocking system.
    
    Returns:
        blocking_probability (float): The fraction of customers blocked in this run.
        empirical_offered_load (float): The measured offered load in this run.
    """
    # Server_free_times keeps track of when each of the 'm' servers will become free.
    # Initialize all servers to be free at time 0.
    server_free_times = np.zeros(m)
    
    clock = 0.0
    blocked_customers = 0
    total_service_duration = 0.0 # To calculate empirical service time
    
    for i in range(num_customers):
        # Generate the time until the next customer arrives from an exponential distribution
        interarrival_time = np.random.exponential(mean_interarrival_time)
        clock += interarrival_time
        
        # Find the server that will be free the soonest.
        # This server is only available if it's free *before* the current customer arrives.
        earliest_free_time = np.min(server_free_times)
        
        if earliest_free_time <= clock:
            # At least one server is free. The customer is served.
            # Find the index of the server that becomes free earliest.
            free_server_index = np.argmin(server_free_times)
            
            # Generate the service time for this customer from an exponential distribution
            service_time = np.random.exponential(mean_service_time)
            
            # The server now becomes busy until the service is complete.
            server_free_times[free_server_index] = clock + service_time
            total_service_duration += service_time
        else:
            # No server is free at the time of arrival. The customer is blocked.
            blocked_customers += 1
            
    # --- Calculate results for this single run ---
    blocking_probability = blocked_customers / num_customers
    
    # Calculate the empirical (measured) offered load for the control variate
    # Empirical arrival rate = total customers / total time
    empirical_lambda = num_customers / clock
    # Empirical service time = total service duration / number of served customers
    served_customers = num_customers - blocked_customers
    empirical_s = total_service_duration / served_customers if served_customers > 0 else 0
    empirical_offered_load = empirical_lambda * empirical_s
    
    return blocking_probability, empirical_offered_load


# --- 4. Running the Replications ---
# We run the simulation many times to get a distribution of results.
print("--- Running Simulations ---")
print(f"Simulating {num_replications} replications of {num_customers} customers each...")

# Store the results from each replication
blocking_probabilities = np.zeros(num_replications)
empirical_loads = np.zeros(num_replications)

for i in range(num_replications):
    prob, load = simulate_blocking_system(m, mean_service_time, mean_interarrival_time, num_customers)
    blocking_probabilities[i] = prob
    empirical_loads[i] = load

print("Simulations complete.\n")


# --- 5. Standard Estimator (No Variance Reduction) ---
# This is the "crude" estimate based on the average of the replications.
print("--- Result 1: Standard Estimator (No Control Variate) ---")
mean_blocking_prob_std = np.mean(blocking_probabilities)
se_std = np.std(blocking_probabilities, ddof=1) / np.sqrt(num_replications)
ci_std = stats.t.interval(0.95, df=num_replications-1, loc=mean_blocking_prob_std, scale=se_std)

print(f"Point Estimate of Blocking Probability: {mean_blocking_prob_std:.6f}")
print(f"95% Confidence Interval: ({ci_std[0]:.6f}, {ci_std[1]:.6f})")
print(f"Confidence Interval Width: {ci_std[1] - ci_std[0]:.6f}\n")


# --- 6. Control Variate Estimator ---
# Here, we use our knowledge of the true offered load (A=8) to improve the estimate.
print("--- Result 2: Control Variate Estimator ---")

# Y is our variable of interest (what we want to estimate)
# X is the control variate (what we are using to help)
Y = blocking_probabilities
X = empirical_loads
mu_x = A  # The known true mean of the control variable (the offered load)

# Estimate the optimal constant 'c' that minimizes the variance.
# c = Cov(Y, X) / Var(X)
c_hat = np.cov(Y, X)[0, 1] / np.var(X, ddof=1)

# Apply the control variate formula to create a new, improved set of estimates
Y_cv = Y - c_hat * (X - mu_x)

# The final point estimate is the mean of these new values
mean_blocking_prob_cv = np.mean(Y_cv)
se_cv = np.std(Y_cv, ddof=1) / np.sqrt(num_replications)
ci_cv = stats.t.interval(0.95, df=num_replications-1, loc=mean_blocking_prob_cv, scale=se_cv)

print(f"Point Estimate of Blocking Probability: {mean_blocking_prob_cv:.6f}")
print(f"95% Confidence Interval: ({ci_cv[0]:.6f}, {ci_cv[1]:.6f})")
print(f"Confidence Interval Width: {ci_cv[1] - ci_cv[0]:.6f}\n")

# --- 7. Comparison and Conclusion ---
print("--- Comparison ---")
variance_reduction = (se_std**2 - se_cv**2) / se_std**2
print(f"Variance Reduction: {variance_reduction * 100:.2f}%")
print("The control variate method provides a more precise estimate (narrower confidence interval) for the same computational effort.")


--- System Parameters ---
Servers (m): 10
Offered Traffic (A): 8.00 Erlang
Analytical Blocking Probability (Erlang-B): 0.121661

--- Running Simulations ---
Simulating 100 replications of 10000 customers each...
Simulations complete.

--- Result 1: Standard Estimator (No Control Variate) ---
Point Estimate of Blocking Probability: 0.122478
95% Confidence Interval: (0.121255, 0.123701)
Confidence Interval Width: 0.002445

--- Result 2: Control Variate Estimator ---
Point Estimate of Blocking Probability: 0.121650
95% Confidence Interval: (0.121020, 0.122281)
Confidence Interval Width: 0.001261

--- Comparison ---
Variance Reduction: 73.42%
The control variate method provides a more precise estimate (narrower confidence interval) for the same computational effort.
