In [14]:
import csv
import numpy as np

In [29]:
def read_microdata(filename):
    """Load a micro-data CSV into row ids plus a float matrix.

    The first CSV row is the header. Column 0 of every row is kept as a
    string id; the remaining columns are converted to float.

    Parameters
    ----------
    filename : str or path-like
        Path of the CSV file to read.

    Returns
    -------
    dict
        'headers'        : list[str], header cells after the id column.
        'micro_data_ids' : list[str], column 0 of each data row.
        'data_array'     : np.ndarray of float with shape
                           (n_rows, len(headers)); (0, len(headers)) when
                           the file contains only a header.

    Raises
    ------
    ValueError
        If the file has no header row, a data row does not have as many
        value columns as the header, or a value is not a valid float.
    """
    with open(filename, mode='r', newline='') as file:
        reader = csv.reader(file)
        first_line = next(reader, None)
        if first_line is None:
            raise ValueError(f"{filename!r} is empty: expected a header row")
        headers = first_line[1:]

        micro_data_ids = []
        rows = []
        for row in reader:
            # csv.reader yields [] for blank lines (e.g. a trailing newline).
            if not row:
                continue
            values = row[1:]
            if len(values) != len(headers):
                raise ValueError(
                    f"{filename!r} line {reader.line_num}: expected "
                    f"{len(headers)} value columns, got {len(values)}"
                )
            micro_data_ids.append(row[0])
            rows.append(np.array(values, dtype=float))

    # Stack once at the end: np.append inside the loop would re-copy the
    # whole matrix for every row.
    if rows:
        data_array = np.vstack(rows)
    else:
        data_array = np.empty((0, len(headers)), dtype=float)

    return {
        'headers': headers,
        'micro_data_ids': micro_data_ids,
        'data_array': data_array,
    }

In [30]:
def read_constraints(filename):
    """Load a constraint-target CSV into ids, totals and a float matrix.

    The first CSV row is the header. Column 0 of every row is kept as a
    string id, column 1 is kept as the raw string population total, and the
    remaining columns are converted to float.

    Parameters
    ----------
    filename : str or path-like
        Path of the CSV file to read.

    Returns
    -------
    dict
        'headers'           : list[str], header cells after the first two
                              columns.
        'geo_data_ids'      : list[str], column 0 of each data row.
        'population_totals' : list[str], column 1 of each data row,
                              unconverted (callers cast as needed).
        'data_array'        : np.ndarray of float with shape
                              (n_rows, len(headers)); (0, len(headers))
                              when the file contains only a header.

    Raises
    ------
    ValueError
        If the file has no header row, a data row lacks the id/total
        columns or does not have as many value columns as the header, or a
        value is not a valid float.
    """
    with open(filename, mode='r', newline='') as file:
        reader = csv.reader(file)
        first_line = next(reader, None)
        if first_line is None:
            raise ValueError(f"{filename!r} is empty: expected a header row")
        headers = first_line[2:]

        geo_data_ids = []
        population_totals = []
        rows = []
        for row in reader:
            # csv.reader yields [] for blank lines (e.g. a trailing newline).
            if not row:
                continue
            values = row[2:]
            if len(row) < 2 or len(values) != len(headers):
                raise ValueError(
                    f"{filename!r} line {reader.line_num}: expected an id, a "
                    f"population total and {len(headers)} value columns, "
                    f"got {len(row)} columns"
                )
            geo_data_ids.append(row[0])
            population_totals.append(row[1])
            rows.append(np.array(values, dtype=float))

    # Stack once at the end: np.append inside the loop would re-copy the
    # whole matrix for every row.
    if rows:
        data_array = np.vstack(rows)
    else:
        data_array = np.empty((0, len(headers)), dtype=float)

    return {
        'headers': headers,
        'geo_data_ids': geo_data_ids,
        'population_totals': population_totals,
        'data_array': data_array,
    }

In [31]:
# Input files for this run; both paths are relative to the notebook's
# working directory.
MICRODATA_CSV = 'testdata/microdata_encoded.csv'
CONSTRAINTS_CSV = 'testdata/constraint_targets.csv'

micro_data = read_microdata(MICRODATA_CSV)
constraint_data = read_constraints(CONSTRAINTS_CSV)

In [32]:
# Compare the value-column headers of the two loaded files; the bare
# expression's result (True/False) is what this cell displays.
micro_data['headers'] == constraint_data['headers']

True

In [None]:
class SimulatedAnnealing:
    """Simulated-annealing search over a population of micro-data rows.

    A population is a list of row indices into ``micro_data``. Its
    aggregate (``macro_data``) is the sum of the selected rows, each scaled
    by ``1 / population_size``. ``run`` perturbs one individual at a time
    and keeps the population with the lowest energy, where the energy is
    the KL divergence between ``constraints`` and ``macro_data``.

    All randomness comes from ``np.random``, so ``np.random.seed(...)``
    makes a run reproducible.
    """

    def __init__(self, as_attributes, pop_attributes):
        """Store the inputs and draw a random initial population.

        Parameters
        ----------
        as_attributes : dict
            Annealing schedule with keys 'initial_temp', 'cooling_rate',
            'min_temp' and 'steps_per_temp'.
        pop_attributes : dict
            Problem description with keys 'pos', 'geo_id',
            'population_size', 'constraints' and 'micro_data'.
            'micro_data' must be a 2-D numpy array (rows are candidate
            individuals). 'constraints' must broadcast against one
            micro-data row -- presumably the target vector for this area;
            confirm against the caller.

        Raises
        ------
        ValueError
            If 'population_size' is below 1 or 'cooling_rate' is not
            positive (the temperature would never fall and ``run`` would
            loop forever).
        KeyError
            If a required key is missing from either dict.
        """
        self.pos = pop_attributes['pos']
        self.geo_id = pop_attributes['geo_id']
        self.population_size = pop_attributes['population_size']
        self.constraints = pop_attributes['constraints']
        self.micro_data = pop_attributes['micro_data']
        rows, cols = self.micro_data.shape
        self.micro_rows = rows
        self.micro_cols = cols

        # SA parameters
        self.initial_temp = as_attributes['initial_temp']
        self.cooling_rate = as_attributes['cooling_rate']
        self.current_temp = self.initial_temp
        self.min_temp = as_attributes['min_temp']
        self.steps_per_temp = as_attributes['steps_per_temp']

        if int(self.population_size) < 1:
            raise ValueError(
                f"population_size must be at least 1, got {self.population_size!r}"
            )
        if self.cooling_rate <= 0:
            raise ValueError(
                f"cooling_rate must be positive, got {self.cooling_rate!r}"
            )

        # Weight of a single individual in the aggregated macro data.
        self.fraction = 1 / self.population_size
        print(self.fraction)

        # Create initial population
        self.population = [
            np.random.randint(self.micro_rows)
            for _ in range(int(self.population_size))
        ]
        self.macro_data = self.calculate_macro_data()

    def calculate_macro_data(self):
        """Return a new array: the selected micro rows summed and scaled by ``fraction``."""
        indices = np.asarray(self.population, dtype=np.intp)
        return self.micro_data[indices].sum(axis=0) * self.fraction

    def chi_squared_distance(self,):
        """Return the chi-squared distance between constraints and macro data."""
        epsilon = 1e-10  # Avoid division by zero
        return np.sum((self.constraints - self.macro_data) ** 2 / (self.constraints + epsilon))

    def kl_divergence(self):
        """Return the KL divergence of macro data from the constraints."""
        epsilon = 1e-10  # Avoid log(0)
        return np.sum(self.constraints * np.log((self.constraints + epsilon) / (self.macro_data + epsilon)))

    def energy(self):
        """Objective minimised by ``run`` (the KL divergence)."""
        return self.kl_divergence()

    def generate_neighbor(self):
        """Return a copy of the population with one individual re-drawn at random."""
        new_population = self.population.copy()
        idx_to_change = np.random.randint(len(new_population))
        new_population[idx_to_change] = np.random.randint(self.micro_rows)
        return new_population

    def acceptance_probability(self, current_energy, new_energy):
        """Return 1.0 for an improvement, else exp(-(increase) / temperature)."""
        if new_energy < current_energy:
            return 1.0
        return float(np.exp((current_energy - new_energy) / self.current_temp))

    def run(self):
        """Execute the simulated annealing algorithm.

        Cools ``current_temp`` geometrically until it is no longer above
        ``min_temp``, trying ``steps_per_temp`` single-individual changes
        at each temperature. On return, ``population`` and ``macro_data``
        hold the best solution found.

        Returns
        -------
        dict
            'population' : list of row indices of the best solution.
            'macro_data' : aggregate of that population.
            'energy'     : its energy.
        """
        current_energy = self.energy()
        best_energy = current_energy
        best_solution = self.population.copy()

        while self.current_temp > self.min_temp:
            for _ in range(self.steps_per_temp):
                # Remember the current state so a rejected move can be
                # undone without recomputing the aggregate.
                old_population = self.population
                old_macro_data = self.macro_data

                self.population = self.generate_neighbor()
                self.macro_data = self.calculate_macro_data()
                new_energy = self.energy()

                # Decide whether to accept neighbor
                if self.acceptance_probability(current_energy, new_energy) > np.random.random():
                    current_energy = new_energy
                    if current_energy < best_energy:
                        best_energy = current_energy
                        best_solution = self.population.copy()
                else:
                    self.population = old_population
                    self.macro_data = old_macro_data

            # Cool system
            self.current_temp *= 1 - self.cooling_rate

        # Return best solution found
        self.population = best_solution
        self.macro_data = self.calculate_macro_data()
        return {
            'population': best_solution,
            'macro_data': self.macro_data,
            'energy': best_energy
        }

In [None]:

# Problem description for the first geography (row 0 of the constraint file).
# NOTE(review): 'constraints' receives the whole 2-D constraint table while
# every other field refers to row 0 -- confirm whether
# constraint_data['data_array'][0] was intended.
pop_attributes = {
    'pos': 0,
    'geo_id': constraint_data['geo_data_ids'][0],
    'population_size': float(constraint_data['population_totals'][0]),
    'constraints': constraint_data['data_array'],
    'micro_data': micro_data['data_array']
}
# Annealing schedule.
sa_atteributes = {
    'initial_temp': 1000,
    'cooling_rate': 0.003,
    'min_temp': 1,
    'steps_per_temp': 100
}

# The constructor signature is (as_attributes, pop_attributes). Passing the
# two dicts positionally in the opposite order makes it look up 'pos' in the
# schedule dict and fail with KeyError, so bind them by keyword.
test = SimulatedAnnealing(as_attributes=sa_atteributes, pop_attributes=pop_attributes)

8.814455707360071e-05


In [118]:
# Chi-squared distance between the constraints and the macro data of the
# instance created above; the value is displayed as the cell output.
test.chi_squared_distance()

np.float64(7.893688397038704)

In [None]:
import numpy as np
import math
import random

class SimulatedAnnealing:
    """Simulated-annealing search over a population of micro-data rows.

    A population is a list of row indices into ``micro_data``. Its
    aggregate (``macro_data``) is the sum of the selected rows, each scaled
    by ``1 / population_size``. ``run`` perturbs one individual at a time
    and keeps the population with the lowest energy.

    The initial population is drawn with ``np.random``; neighbour moves and
    acceptance draws use the ``random`` module, so seed both for a
    reproducible run.
    """

    # Schedule used when the caller does not override a value.
    DEFAULT_SA_PARAMS = {
        'initial_temp': 1000,
        'cooling_rate': 0.003,
        'min_temp': 1,
        'steps_per_temp': 100,
    }

    def __init__(self, attributes, sa_attributes=None):
        """Store the inputs and draw a random initial population.

        Parameters
        ----------
        attributes : dict
            Problem description with keys 'pos', 'geo_id',
            'population_size', 'constraints' and 'micro_data'.
            'micro_data' must be a 2-D numpy array (rows are candidate
            individuals). 'constraints' must broadcast against one
            micro-data row -- presumably the target vector for this area;
            confirm against the caller.
        sa_attributes : dict, optional
            Overrides for any of 'initial_temp', 'cooling_rate', 'min_temp'
            and 'steps_per_temp'. Missing keys keep the defaults in
            ``DEFAULT_SA_PARAMS``.

        Raises
        ------
        ValueError
            If 'population_size' is below 1, ``sa_attributes`` contains an
            unknown key, or 'cooling_rate' is not positive (the temperature
            would never fall and ``run`` would loop forever).
        KeyError
            If a required key is missing from ``attributes``.
        """
        # Initialize parameters
        self.pos = attributes['pos']
        self.geo_id = attributes['geo_id']
        self.population_size = attributes['population_size']
        self.constraints = attributes['constraints']
        self.micro_data = attributes['micro_data']

        if int(self.population_size) < 1:
            raise ValueError(
                f"population_size must be at least 1, got {self.population_size!r}"
            )

        # Problem dimensions
        rows, cols = self.micro_data.shape
        self.micro_rows = rows
        self.micro_cols = cols
        # Weight of a single individual in the aggregated macro data.
        self.fraction = 1 / self.population_size

        # Create initial population and macro data
        self.population = self.initialize_population()
        self.macro_data = self.calculate_macro_data()

        # SA parameters: defaults, overridden by whatever the caller passes.
        params = dict(self.DEFAULT_SA_PARAMS)
        if sa_attributes is not None:
            unknown = set(sa_attributes) - set(params)
            if unknown:
                raise ValueError(
                    f"unknown simulated annealing parameter(s): {sorted(unknown)}"
                )
            params.update(sa_attributes)
        if params['cooling_rate'] <= 0:
            raise ValueError(
                f"cooling_rate must be positive, got {params['cooling_rate']!r}"
            )
        self.initial_temp = params['initial_temp']
        self.cooling_rate = params['cooling_rate']
        self.current_temp = self.initial_temp
        self.min_temp = params['min_temp']
        self.steps_per_temp = params['steps_per_temp']

    def initialize_population(self):
        """Create random initial population"""
        return [np.random.randint(self.micro_rows) for _ in range(int(self.population_size))]

    def calculate_macro_data(self):
        """Return a new array: the selected micro rows summed and scaled by ``fraction``."""
        indices = np.asarray(self.population, dtype=np.intp)
        return self.micro_data[indices].sum(axis=0) * self.fraction

    def chi_squared_distance(self):
        """Calculate chi-squared distance between constraints and macro data"""
        epsilon = 1e-10  # Avoid division by zero
        return np.sum((self.constraints - self.macro_data) ** 2 / (self.constraints + epsilon))

    def kl_divergence(self):
        """Calculate KL divergence between constraints and macro data"""
        epsilon = 1e-10  # Avoid log(0)
        return np.sum(self.constraints * np.log((self.constraints + epsilon) / (self.macro_data + epsilon)))

    def energy(self):
        """Objective function to minimize (using KL divergence here)"""
        return self.kl_divergence()

    def generate_neighbor(self):
        """Generate a neighboring solution by perturbing one individual"""
        new_population = self.population.copy()
        idx_to_change = random.randint(0, len(new_population)-1)
        new_population[idx_to_change] = random.randint(0, self.micro_rows-1)
        return new_population

    def acceptance_probability(self, current_energy, new_energy):
        """Return 1.0 for an improvement, else exp(-(increase) / temperature)."""
        if new_energy < current_energy:
            return 1.0
        return math.exp((current_energy - new_energy) / self.current_temp)

    def run(self):
        """Execute the simulated annealing algorithm.

        Cools ``current_temp`` geometrically until it is no longer above
        ``min_temp``, trying ``steps_per_temp`` single-individual changes
        at each temperature. On return, ``population`` and ``macro_data``
        hold the best solution found.

        Returns
        -------
        dict
            'population' : list of row indices of the best solution.
            'macro_data' : aggregate of that population.
            'energy'     : its energy.
        """
        current_energy = self.energy()
        best_energy = current_energy
        best_solution = self.population.copy()

        while self.current_temp > self.min_temp:
            for _ in range(self.steps_per_temp):
                # Remember the current state so a rejected move can be
                # undone without recomputing the aggregate.
                old_population = self.population
                old_macro_data = self.macro_data

                self.population = self.generate_neighbor()
                self.macro_data = self.calculate_macro_data()
                new_energy = self.energy()

                if self.acceptance_probability(current_energy, new_energy) > random.random():
                    current_energy = new_energy
                    if current_energy < best_energy:
                        best_energy = current_energy
                        best_solution = self.population.copy()
                else:
                    self.population = old_population
                    self.macro_data = old_macro_data

            # Cool system
            self.current_temp *= 1 - self.cooling_rate

        self.population = best_solution
        self.macro_data = self.calculate_macro_data()
        return {
            'population': best_solution,
            'macro_data': self.macro_data,
            'energy': best_energy
        }

