In [None]:
# Mount Google Drive at /content/drive; the output paths used later in this
# notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Khai báo thư viện

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import csv
import sys
import cv2
import json
import os
import glob2

import imageio
from matplotlib.animation import FuncAnimation
import matplotlib.animation as manimation
from IPython import display
from time import sleep

# Các hàm mục tiêu

## Hàm Sphere

In [None]:
def Sphere(x):
  """Sphere benchmark: sum of the squared components of ``x``.

  Args:
    x: Iterable of numbers (or of arrays, which are squared elementwise).

  Returns:
    ``x[0]*x[0] + x[1]*x[1] + ...``; ``0`` when ``x`` is empty.
  """
  return sum(component * component for component in x)

## Hàm Rastrigin

In [None]:
def Rastrigin(x):
  """Rastrigin benchmark.

  Computes ``10 * len(x) + sum(x_i^2 - 10 * cos(2 * pi * x_i))``.

  Args:
    x: Sized iterable of numbers (or of arrays, evaluated elementwise).

  Returns:
    The Rastrigin value at ``x``.
  """
  total = 10.0 * len(x)
  for component in x:
    total += component * component - 10 * np.cos(2 * np.pi * component)
  return total

## Hàm Rosenbrock

In [None]:
def Rosenbrock(x):
  """Rosenbrock benchmark.

  Sums ``100 * (x[i+1] - x[i]^2)^2 + (x[i] - 1)^2`` over consecutive pairs.

  Args:
    x: Sliceable sequence of numbers (or of arrays, evaluated elementwise).

  Returns:
    The Rosenbrock value at ``x``; ``0`` when ``x`` has fewer than two items.
  """
  total = 0
  # Walk over consecutive (x[i], x[i+1]) pairs.
  for current, following in zip(x[:-1], x[1:]):
    gap = following - current * current
    total += 100 * gap * gap + (current - 1) * (current - 1)
  return total

## Hàm Griewank

In [None]:
def Griewank(x):
  """Griewank benchmark.

  Computes ``sum(x_i^2 / 4000) - prod(cos(x_i / sqrt(i))) + 1`` where the
  position ``i`` is counted from 1.

  Args:
    x: Iterable of numbers (or of arrays, evaluated elementwise).

  Returns:
    The Griewank value at ``x``.
  """
  quadratic_sum = 0
  cosine_product = 1
  for position, component in enumerate(x, start=1):
    quadratic_sum += (component * component) / 4000
    cosine_product *= np.cos(component / np.sqrt(position))
  return quadratic_sum - cosine_product + 1

## Hàm Ackley

In [None]:
def Ackley(x):
  """Ackley benchmark.

  Computes
  ``-20 * exp(-0.2 * sqrt(mean(x_i^2))) - exp(mean(cos(2 * pi * x_i))) + 20 + e``.

  Args:
    x: Non-empty sized iterable of numbers (or of arrays, evaluated
      elementwise). An empty ``x`` divides by zero.

  Returns:
    The Ackley value at ``x``.
  """
  square_sum = 0
  cosine_sum = 0
  for component in x:
    square_sum += component * component
    cosine_sum += np.cos(2 * np.pi * component)

  inverse_n = 1 / len(x)
  distance_term = -20 * np.exp(-0.2 * np.sqrt(inverse_n * square_sum))
  cosine_term = np.exp(inverse_n * cosine_sum)
  return distance_term - cosine_term + 20 + np.exp(1)

# Các hàm bổ trợ

## Hàm tính giá trị mục tiêu

In [None]:
def calc_fobj_value(x, fobj_option=0):
  """Evaluate the benchmark objective selected by ``fobj_option`` at ``x``.

  Args:
    x: Candidate solution, passed unchanged to the selected objective.
    fobj_option: 0 = Sphere, 1 = Rastrigin, 2 = Rosenbrock, 3 = Griewank;
      any other value selects Ackley.

  Returns:
    Whatever the selected objective function returns for ``x``.
  """
  if fobj_option == 0:
    return Sphere(x)
  if fobj_option == 1:
    return Rastrigin(x)
  if fobj_option == 2:
    return Rosenbrock(x)
  if fobj_option == 3:
    return Griewank(x)
  # Every remaining option (4 included) falls through to Ackley.
  return Ackley(x)

## Hàm lấy miền giá trị của các hàm mục tiêu

In [None]:
def get_domain(fobj_option=0):
  """Return the search interval of the objective selected by ``fobj_option``.

  Args:
    fobj_option: 0 = Sphere, 1 = Rastrigin, 2 = Rosenbrock, 3 = Griewank,
      4 = Ackley.

  Returns:
    A fresh one-element list ``[(lower, upper)]``; callers repeat it with
    ``* dimensions`` to get one interval per coordinate.

  Raises:
    KeyError: If ``fobj_option`` is not one of the options above.
  """
  interval_by_option = {
      0: (-5.12, 5.12),      # Sphere
      1: (-5.12, 5.12),      # Rastrigin
      2: (-5, 10),           # Rosenbrock
      3: (-600, 600),        # Griewank
      4: (-32.768, 32.768),  # Ackley
  }
  return [interval_by_option[fobj_option]]

## Hàm kiểm tra kết thúc thuật toán

In [None]:
def is_Stop(num_of_evaluations, dimensions=2):
  """Tell whether the evaluation budget has been exceeded.

  Args:
    num_of_evaluations: Number of objective evaluations spent so far.
    dimensions: Problem dimensionality. The budget is 100000 evaluations
      when it equals 2 and 1000000 for every other value.

  Returns:
    True when ``num_of_evaluations`` is strictly greater than the budget,
    False otherwise.
  """
  budget = 100000 if dimensions == 2 else 1000000
  return bool(num_of_evaluations > budget)

## Hàm ghi chép quá trình tối ưu các hàm mục tiêu

In [None]:
def record(lines, record_path='/content/drive/MyDrive/Hoc_Tap/CS410_MangNeural/Continuous_Optimization/objFuncs_optimizing_record_Sphere.txt'):
  """Append ``lines`` to the optimisation record file and echo them.

  Args:
    lines: Iterable of strings; each is written followed by a newline and
      then printed to stdout.
    record_path: File to append to. Defaults to the path this notebook has
      always used, so existing callers are unaffected.
  """
  # The context manager guarantees the handle is flushed and closed; the
  # previous version opened the file and never closed it.
  with open(record_path, 'a') as record_file:
    for line in lines:
      record_file.write(line)
      record_file.write('\n')
      print(line)

In [None]:
def optimizing_record(type_record=None, fobj_option=None, dimensions=None, popsize=None, algorithm_option=None, experiment_values=None, final_values=None):
  """Format one entry of the optimisation log and pass it to ``record``.

  Args:
    type_record: Kind of entry to write:
      0 = opening banner, 1 = closing banner,
      2 = objective function and dimensions (uses ``fobj_option``, ``dimensions``),
      3 = population size (uses ``popsize``),
      4 = algorithm name (uses ``algorithm_option``),
      5 = result of one seeded run (uses ``experiment_values``),
      6 = mean/std summary (uses ``final_values``).
      Any other value writes nothing.
    fobj_option: Objective index, 0..4.
    dimensions: Problem dimensionality.
    popsize: Population size.
    algorithm_option: 0 = Differential Evolution, 1 = Evolution Strategies.
    experiment_values: ``[seed, (best_answer, objective_value)]``.
    final_values: ``[mean, std]`` of the final objective values.
  """
  obj_func = {0:'Sphere', 1:'Rastrigin', 2:'Rosenbrock', 3:'Griewank', 4:'Ackley'}
  # Was logged as 'Different Evolution'; the algorithm is Differential Evolution.
  algorithms_dict = {0:'Differential Evolution', 1:'Evolution Strategies'}

  if type_record == 0:
    lines = [
        '|------------------Objective Functions Optimization Record------------------|',
        '|-----------------------------------BEGIN-----------------------------------|\n\n',
    ]
  elif type_record == 1:
    lines = ['\n|-----------------------------------DONE-----------------------------------|']
  elif type_record == 2:
    lines = [f'- Objective Function: {obj_func[fobj_option]}, Dimensions: {dimensions}']
  elif type_record == 3:
    lines = [f'\t- Popsize: {popsize}']
  elif type_record == 4:
    lines = [f'\t\t- Algorithm: {algorithms_dict[algorithm_option]}']
  elif type_record == 5:
    lines = [
        f'\t\t\t- Seed: {experiment_values[0]}',
        f'\t\t\t\tBest Answer: {experiment_values[1][0]}',
        f'\t\t\t\tObjective Function Value: {experiment_values[1][1]}',
    ]
  elif type_record == 6:
    # NOTE(review): the second label reports a standard deviation; the
    # wording is kept unchanged so existing record files stay comparable.
    lines = [
        f'\t\t\t- Mean Objective Function Value: {final_values[0]}',
        f'\t\t\t- Standard Objective Function Value: {final_values[1]}\n',
    ]
  else:
    # Unknown entry type: nothing is written, and the file is not touched.
    return

  record(lines)

## Hàm vẽ đồ thị hội tụ

In [None]:
def plot_converging_graph(x, mean_value, std_value, object_function, dimensions):
    """Plot and save the mean +/- std convergence curves of four configurations.

    Args:
        x: Values for the horizontal axis (number of evaluation calls).
        mean_value: Four mean curves, in the order DE n=128, ES n=128,
            DE n=1024, ES n=1024.
        std_value: Four curves, same order, giving the half-width of the band
            drawn around each mean curve.
        object_function: Objective name, used in the title and the file name.
        dimensions: Problem dimensionality, used in the title and the file name.
    """
    # (line colour, legend label, keyword arguments of the shaded band)
    curve_styles = [
        ('b', 'DE with n = 128', {'facecolor': (0,0,1,0.3)}),
        ('r', 'ES with n = 128', {'facecolor': (1,0,0,0.3)}),
        ('g', 'DE with n = 1024', {'facecolor': (0,1,0,0.3)}),
        ('violet', 'ES with n = 1024', {'facecolor': 'violet', 'alpha': 0.3}),
    ]

    plt.figure(figsize=(10,8))

    # All mean lines are drawn first, then all bands.
    for idx, (line_colour, legend_label, _) in enumerate(curve_styles):
        plt.plot(x, mean_value[idx], line_colour, label=legend_label)
    for idx, (_, _, band_kwargs) in enumerate(curve_styles):
        lower_edge = mean_value[idx]-std_value[idx]
        upper_edge = mean_value[idx]+std_value[idx]
        plt.fill_between(x, lower_edge, upper_edge, **band_kwargs)

    plt.xlabel('Number of Evaluation Call')
    plt.ylabel('Objective Function')
    plt.title(f'Errorbar_f_{object_function} with d={dimensions}')
    plt.legend()
    plt.savefig(f'/content/drive/MyDrive/Hoc_Tap/CS410_MangNeural/Continuous_Optimization/graphs/{object_function}_{dimensions}.png')

## Hàm tạo file gif

In [None]:
def animate(i, ln, ax, all_pops):
  """Frame callback: show population ``i`` on the line artist ``ln``.

  Args:
    i: Index of the frame / population to display.
    ln: Artist exposing ``set_data(xs, ys)``.
    ax: Unused; accepted because the caller passes it through ``fargs``.
    all_pops: Indexable of 2-D arrays whose columns 0 and 1 hold the
      coordinates to plot.

  Returns:
    A one-element tuple containing ``ln``.
  """
  frame_population = all_pops[i]
  xs, ys = frame_population[:, 0], frame_population[:, 1]
  ln.set_data(xs, ys)
  return (ln,)

In [None]:
def create_gif(all_pops, fobj_option, file_name, dimensions=2):
  """Render the first populations of a 2-D run over the objective's contours.

  Args:
    all_pops: Sequence of populations, each a 2-D array whose columns 0 and 1
      are the coordinates of the individuals.
    fobj_option: Objective index understood by ``get_domain`` and
      ``calc_fobj_value``.
    file_name: Path of the GIF file to write.
    dimensions: Kept for backward compatibility; the plot is always 2-D.

  Raises:
    ValueError: If ``all_pops`` is empty.
  """
  if len(all_pops) == 0:
    raise ValueError('all_pops must contain at least one population')

  # Scalar bounds are needed here: array-valued bounds made linspace return a
  # (100, 2) array that meshgrid flattened into a 200x200 grid in which every
  # coordinate appeared twice.
  lower_bound, upper_bound = get_domain(fobj_option)[0]
  axis_values = np.linspace(lower_bound, upper_bound, 100)
  X, Y = np.meshgrid(axis_values, axis_values)

  Z = np.asarray(calc_fobj_value([X, Y], fobj_option))

  # Rosenbrock (option 2) is zero at (1, 1); the other objectives at (0, 0).
  optimum = 1.0 if fobj_option == 2 else 0.0

  fig, ax = plt.subplots(figsize=(10,10))
  ax.contourf(X, Y, Z, 20, cmap='viridis', alpha=0.8)
  ax.scatter(optimum, optimum, s=50, marker='*', c='r')
  ln, = ax.plot([], [], '.w')

  # At most 50 frames, and never more than the populations available, so
  # that ``animate`` cannot index past the end of ``all_pops``.
  num_frames = min(50, len(all_pops))
  anim = FuncAnimation(fig, animate, fargs = (ln, ax, all_pops),
                       frames=num_frames, interval=20, blit=True)

  try:
    anim.save(file_name, writer='pillow', fps=5)
  finally:
    # This function is called once per file in a loop; release the figure.
    plt.close(fig)

# Thuật toán Differential Evolution (DE)

In [None]:
def DE(fobj_option=0, dimensions=2, F_scale=0.8, cross_prob=0.7, popsize=32, max_iters=100):
  """Minimise a benchmark objective with Differential Evolution.

  Each individual ``j`` is challenged by a trial vector built from three
  other, mutually distinct individuals ``a + F_scale * (b - c)`` (clipped to
  the domain) mixed with ``pop[j]`` by per-coordinate crossover; the trial
  replaces ``pop[j]`` when it is strictly better. The loop ends when
  ``is_Stop`` reports that the evaluation budget is exceeded.

  Args:
    fobj_option: Objective index understood by ``calc_fobj_value``.
    dimensions: Number of coordinates of a solution.
    F_scale: Scale factor applied to the difference vector.
    cross_prob: Per-coordinate probability of taking the mutant's value.
    popsize: Number of individuals; must be at least 4.
    max_iters: Unused; kept for backward compatibility (the stopping rule is
      the evaluation budget of ``is_Stop``).

  Returns:
    Tuple ``(results, all_pops, num_of_evaluations)`` where ``results`` is an
    object array of shape (generations + 1, 2) holding
    ``[best_solution_as_list, best_fitness]`` rows, ``all_pops`` stacks a copy
    of the population after initialisation and after every generation, and
    ``num_of_evaluations`` is the number of objective evaluations spent.

  Raises:
    ValueError: If ``popsize`` is smaller than 4.
  """
  if popsize < 4:
    raise ValueError('DE needs popsize >= 4 to pick three distinct donors')

  num_of_evaluations = 0

  lower_bound, upper_bound = np.asarray(get_domain(fobj_option)*dimensions).T
  diff = np.fabs(lower_bound - upper_bound)

  pop = lower_bound + diff * np.random.rand(popsize, dimensions)

  fitness = np.asarray([calc_fobj_value(ind, fobj_option) for ind in pop])
  num_of_evaluations += popsize

  best_idx = np.argmin(fitness)

  # Only the index of the best individual is tracked and the solution is read
  # from ``pop`` when recording. A separately stored ``best`` vector went
  # stale when the incumbent row itself improved in a later generation.
  results = [(pop[best_idx].tolist(), float(fitness[best_idx]))]
  all_pops = [np.copy(pop)]

  while not is_Stop(num_of_evaluations, dimensions):
    for j in range(popsize):
      # Three distinct donors, none equal to j: draw without replacement from
      # popsize - 1 slots, then shift the slots at or above j up by one.
      donors = np.random.choice(popsize - 1, 3, replace=False)
      donors[donors >= j] += 1
      a, b, c = pop[donors]
      mutant = np.clip(a + F_scale * (b - c), lower_bound, upper_bound)

      cross_points = np.random.rand(dimensions) < cross_prob
      if not np.any(cross_points):
        # Guarantee that at least one coordinate comes from the mutant.
        cross_points[np.random.randint(0, dimensions)] = True

      trial = np.where(cross_points, mutant, pop[j])
      f = calc_fobj_value(trial, fobj_option)
      num_of_evaluations += 1

      if f < fitness[j]:
        pop[j] = trial
        fitness[j] = f
        if f < fitness[best_idx]:
          best_idx = j

    results.append((pop[best_idx].tolist(), float(fitness[best_idx])))
    all_pops.append(np.copy(pop))

  # Rows mix a list with a scalar, so an explicit object dtype is required;
  # NumPy >= 1.24 raises on ragged input otherwise.
  results = np.asarray(results, dtype=object)
  all_pops = np.asarray(all_pops)

  return results, all_pops, num_of_evaluations

# Thuật toán Evolution Strategies (ES)

In [None]:
def ES(fobj_option=0, dimensions=2, sigma_init=1.0, c_inc=1.1, c_dec=0.6, popsize=32, max_iters=100):
  """Minimise a benchmark objective with a single-parent Evolution Strategy.

  Every generation samples ``popsize`` offspring ``mu + sigma * N(0, I)``,
  clips them to the domain and evaluates them. If the best offspring is not
  worse than ``mu`` it becomes the new ``mu`` and ``sigma`` is multiplied by
  ``c_inc``; otherwise ``sigma`` is multiplied by ``c_dec``. The loop ends
  when ``is_Stop`` reports that the evaluation budget is exceeded.

  Args:
    fobj_option: Objective index understood by ``calc_fobj_value``.
    dimensions: Number of coordinates of a solution.
    sigma_init: Initial step size.
    c_inc: Step-size multiplier after a successful generation.
    c_dec: Step-size multiplier after an unsuccessful generation.
    popsize: Number of offspring per generation.
    max_iters: Unused; kept for backward compatibility (the stopping rule is
      the evaluation budget of ``is_Stop``).

  Returns:
    Tuple ``(results, all_pops, num_of_evaluations)`` where ``results`` is an
    object array of shape (generations + 1, 2) holding
    ``[mu_as_list, mu_fitness]`` rows, ``all_pops`` stacks the offspring of
    every generation, and ``num_of_evaluations`` is the number of objective
    evaluations spent.
  """
  num_of_evaluations = 0

  lower_bound, upper_bound = np.asarray(get_domain(fobj_option)*dimensions).T
  diff = np.fabs(lower_bound - upper_bound)

  mu = lower_bound + diff * np.random.rand(dimensions)
  mu_fitness = calc_fobj_value(mu, fobj_option)
  num_of_evaluations += 1

  results = [(mu.copy().tolist(), float(mu_fitness))]
  all_pops = []
  sigma = sigma_init

  while not is_Stop(num_of_evaluations, dimensions):
    epsilon = np.random.randn(popsize, dimensions)
    offspring = np.clip(mu + sigma * epsilon, lower_bound, upper_bound)
    offspring_fitness = np.asarray([calc_fobj_value(child, fobj_option) for child in offspring])
    num_of_evaluations += popsize

    best_idx = offspring_fitness.argmin()
    best_fitness = offspring_fitness[best_idx]

    if best_fitness <= mu_fitness:
      # Success: move the parent and widen the search.
      mu = offspring[best_idx].copy()
      mu_fitness = best_fitness
      sigma *= c_inc
    else:
      # Failure: keep the parent and narrow the search.
      sigma *= c_dec

    results.append((mu.copy().tolist(), float(mu_fitness)))
    all_pops.append(np.copy(offspring))

  # Rows mix a list with a scalar, so an explicit object dtype is required;
  # NumPy >= 1.24 raises on ragged input otherwise.
  results = np.asarray(results, dtype=object)
  all_pops = np.asarray(all_pops)

  return results, all_pops, num_of_evaluations

# Chạy và đánh giá kết quả

## Hàm chạy thực nghiệm

In [None]:
def run_experiments(fobj_option=0, dimensions=2, popsize=32, algorithm_option=0):
  """Run one algorithm on one problem with 10 consecutive seeds.

  Every run is logged through ``optimizing_record`` (type 5) and the summary
  of the final values through type 6.

  Args:
    fobj_option: Objective index passed on to the algorithm.
    dimensions: Problem dimensionality passed on to the algorithm.
    popsize: Population size passed on to the algorithm.
    algorithm_option: 0 runs ``DE``, 1 runs ``ES``.

  Returns:
    Tuple ``(mean_obj_funcs_lst, std_obj_funcs_lst, num_of_evaluations,
    best_all_pops, results_lst)``: the per-generation mean and standard
    deviation (rounded to 2 decimals) of the recorded objective value across
    seeds, the evaluation count of the last run, a copy of the populations of
    the run with the lowest final objective value, and the per-seed results
    as nested lists.

  Raises:
    ValueError: If ``algorithm_option`` is neither 0 nor 1.
  """
  if algorithm_option == 0:
    algorithm = DE
  elif algorithm_option == 1:
    algorithm = ES
  else:
    raise ValueError(f'algorithm_option must be 0 (DE) or 1 (ES), got {algorithm_option!r}')

  # Base seed; the runs use seeds mssv .. mssv + 9.
  # NOTE(review): presumably the author's student ID; confirm.
  mssv = 19521943

  fobj_value_lst = []
  results_lst = []
  best_fobj_value = float('inf')
  best_all_pops = None
  num_of_evaluations = 0

  for seed in range(mssv, mssv+10):
    # Seed the global NumPy generator used by DE and ES. The previous call,
    # np.random.rand(seed), only allocated `seed` random numbers and left the
    # run unseeded.
    np.random.seed(seed)
    results, all_pops, num_of_evaluations = algorithm(fobj_option=fobj_option, dimensions=dimensions, popsize=popsize)
    optimizing_record(type_record=5, experiment_values=[seed, results[-1]])
    results_lst.append(results.tolist())

    # Keep the populations of the run with the lowest final objective value.
    # The old code stored the empty slice results[-1:1] as the incumbent, so
    # no later run could ever replace the first one.
    final_value = results[-1][1]
    if final_value < best_fobj_value:
      best_fobj_value = final_value
      best_all_pops = np.copy(all_pops)

    fobj_value_lst.append(results[:,1])

  # Shape (seeds, generations + 1); statistics are taken across seeds.
  fobj_values = np.asarray(fobj_value_lst, dtype=float)
  mean_obj_funcs_lst = np.round(fobj_values.mean(axis=0), 2)
  std_obj_funcs_lst = np.round(fobj_values.std(axis=0), 2)
  optimizing_record(type_record=6, final_values=[mean_obj_funcs_lst[-1], std_obj_funcs_lst[-1]])

  return mean_obj_funcs_lst, std_obj_funcs_lst, num_of_evaluations, best_all_pops, results_lst

## Chạy thực nghiệm và ghi chú kết quả

### Tạo các bộ thực nghiệm

In [None]:
# Objective indices and dimensionalities to run; every combination of the
# two becomes one (objective, dimensions) experiment.
fobjs_lst = [0]
dimensions_lst = [2, 10]

experiments_lst = [
    (objective_index, num_dims)
    for objective_index in fobjs_lst
    for num_dims in dimensions_lst
]

### Chạy

In [None]:
# Run every (objective, dimensions) experiment for every population size and
# both algorithms, writing three kinds of output under `path`:
#   * results_of_the_best_solution/<objective>_<d>.csv  - one summary row per popsize
#   * results_2_graph/<objective>_<d>_v4p.txt           - per-seed histories (popsize 128, 1024)
#   * results_2_create_gif_file/..._pops.txt            - populations of the best 2-D run
obj_func = {0:'Sphere', 1:'Rastrigin', 2:'Rosenbrock', 3:'Griewank', 4:'Ackley'}
header = ['Popsize N/Lambda', 'mean_DE', 'std_DE', 'numEva_DE', 'mean_ES', 'std_ES', 'numEva_ES']
algorithms_dict = {0:'DE', 1:'ES'}
popsizes_lst = [32, 64, 128, 256, 512, 1024]
algorithms_lst = [0, 1]
path = '/content/drive/MyDrive/Hoc_Tap/CS410_MangNeural/Continuous_Optimization/'
# Population sizes whose full histories are kept for the convergence plots.
graph_popsizes = (128, 1024)

optimizing_record(type_record=0)
for fobj_option, dimensions in experiments_lst:
  optimizing_record(type_record=2, fobj_option=fobj_option, dimensions=dimensions)

  file_name = f'{path}results_of_the_best_solution/{obj_func[fobj_option]}_{dimensions}.csv'
  result = []

  # `with` closes the CSV even if an experiment raises; newline='' is what
  # the csv module asks for so that it controls the line endings itself.
  with open(file_name, 'w', newline='') as fi:
    writer = csv.writer(fi)
    writer.writerow(header)

    for popsize in popsizes_lst:
      optimizing_record(type_record=3, popsize=popsize)

      # One CSV row: popsize, then (mean, std, evaluations) per algorithm.
      best_value = [popsize]
      for algorithm_option in algorithms_lst:
        optimizing_record(type_record=4, algorithm_option=algorithm_option)

        mean_obj_funcs_lst, std_obj_funcs_lst, num_of_evaluations, all_pops, results_lst = run_experiments(fobj_option,dimensions,popsize,algorithm_option)

        best_value.extend([mean_obj_funcs_lst[-1], std_obj_funcs_lst[-1], num_of_evaluations])

        if popsize in graph_popsizes:
          data = {f'{algorithms_dict[algorithm_option]}_{popsize}': {'Results_lst': results_lst}}
          result.append(data)

        if dimensions == 2:
          pops_result = {'all_pops': all_pops.tolist()}
          with open(f'{path}results_2_create_gif_file/{obj_func[fobj_option]}_{popsize}_{algorithms_dict[algorithm_option]}_pops.txt', 'w') as outfile:
            json.dump(pops_result, outfile)

      writer.writerow(best_value)

  with open(f'{path}results_2_graph/{obj_func[fobj_option]}_{dimensions}_v4p.txt', 'w') as outfile:
    json.dump(result, outfile)

optimizing_record(type_record=1)

## Trực quan hóa kết quả

### Vẽ đồ thị hội tụ

In [None]:
# Build one convergence figure per results file found in results_2_graph/.
obj_func = {0:'Sphere', 1:'Rastrigin', 2:'Rosenbrock', 3:'Griewank', 4:'Ackley'}
# Order in which the run cell appended the entries of each results file.
algoSize_dict = {0: 'DE_128', 1:'ES_128', 2:'DE_1024', 3:'ES_1024'}
# Upper end of the plotted evaluation axis, per dimensionality.
dimensions_dict = {2: 50000, 10: 500000}
path = '/content/drive/MyDrive/Hoc_Tap/CS410_MangNeural/Continuous_Optimization/results_2_graph/'

all_files = glob2.glob(os.path.join(path, '*.txt'))

for file in all_files:
  with open(file) as json_file:
    data = json.load(json_file)

  # File names look like <objective>_<dimensions>_v4p.txt. The name is kept
  # in its own variable so the `obj_func` lookup table above is not replaced
  # by a string.
  func_name, dimensions, _ = os.path.basename(file).split('_')
  dimensions = int(dimensions)
  x = np.arange(0,dimensions_dict[dimensions],dimensions_dict[dimensions]//100)

  mean_fobj_lst = []
  std_fobj_lst = []
  for i in range(len(data)):
    popsize = int(algoSize_dict[i].split('_')[-1])
    results = data[i][algoSize_dict[i]]['Results_lst']

    fobj_interp_lst = []
    for result in results:
      fobj = np.array([entry[-1] for entry in result])
      # One recorded value per generation, `popsize` evaluations apart. This
      # equals the previous hard-coded np.arange(0, total, popsize) axes and
      # always matches the length of `fobj`.
      num_eva = np.arange(len(fobj)) * popsize
      # Resample every seed onto the common axis `x`.
      fobj_interp_lst.append(np.interp(x, num_eva, fobj))
    fobj_interp_lst = np.array(fobj_interp_lst)

    # Statistics across seeds at every point of the common axis.
    mean_fobj_lst.append(fobj_interp_lst.mean(axis=0))
    std_fobj_lst.append(fobj_interp_lst.std(axis=0))

  plot_converging_graph(x, mean_fobj_lst, std_fobj_lst, func_name, dimensions)

### Tạo file gif

In [None]:
# Turn every saved population file in results_2_create_gif_file/ into a GIF.
obj_func_dict = {'Sphere': 0, 'Rastrigin': 1, 'Rosenbrock': 2, 'Griewank': 3, 'Ackley': 4}
input_path = '/content/drive/MyDrive/Hoc_Tap/CS410_MangNeural/Continuous_Optimization/results_2_create_gif_file/'
output_path = '/content/drive/MyDrive/Hoc_Tap/CS410_MangNeural/Continuous_Optimization/gif_files/'

all_files = glob2.glob(os.path.join(input_path, '*.txt'))

for file in all_files:
  # Read the JSON and close the file before the slow rendering step.
  with open(file) as json_file:
    data = json.load(json_file)

  # File names look like <objective>_<popsize>_<algorithm>_pops.txt. The
  # objective name is kept in its own variable so the `obj_func` table defined
  # by earlier cells is not overwritten with a string.
  base_name = os.path.basename(file)
  file_name = output_path + base_name.split('.')[0] + '.gif'
  func_name = base_name.split('_')[0]
  create_gif(np.array(data['all_pops']), obj_func_dict[func_name], file_name)
  print(f'Done: {file_name}')