## Pre train

In [None]:
!pip install git+https://github.com/nnaisense/pgpelib.git#egg=pgpelib

Collecting pgpelib
  Cloning https://github.com/nnaisense/pgpelib.git to /tmp/pip-install-xktxa24y/pgpelib_d1caf9b0bbff45de8461800dd1d50481
  Running command git clone -q https://github.com/nnaisense/pgpelib.git /tmp/pip-install-xktxa24y/pgpelib_d1caf9b0bbff45de8461800dd1d50481
Collecting sacred
  Downloading sacred-0.8.2-py2.py3-none-any.whl (106 kB)
[K     |████████████████████████████████| 106 kB 5.2 MB/s 
Collecting pybullet
  Downloading pybullet-3.2.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (90.8 MB)
[K     |████████████████████████████████| 90.8 MB 1.2 MB/s 
[?25hCollecting ray
  Downloading ray-1.9.1-cp37-cp37m-manylinux2014_x86_64.whl (57.6 MB)
[K     |████████████████████████████████| 57.6 MB 1.3 MB/s 
Collecting box2d-py~=2.3.5
  Downloading box2d_py-2.3.8-cp37-cp37m-manylinux1_x86_64.whl (448 kB)
[K     |████████████████████████████████| 448 kB 44.2 MB/s 
Collecting redis>=3.5.0
  Downloading redis-4.1.0-py3-none-any.whl (171 kB)
[K     |████████████████

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
from PIL import Image, ImageDraw
import os
from datetime import datetime
import time
import numba
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['OMP_NUM_THREADS'] = '1'



import json
import multiprocessing as mp
import re
from pgpelib import PGPE

In [None]:
def img2arr(img):
    """
    Image to numpy
    """
    return np.array(img)

def arr2img(arr):
    """
    numpy to image
    """
    return Image.fromarray(arr)

def rgba2rgb(rgba_img):
    """
    convert image rgba to image rgb
    """
    h, w = rgba_img.size
    rgb_img = Image.new('RGB', (h, w))
    rgb_img.paste(rgba_img)
    return rgb_img

def save_as_gif(fn, imgs, fps=24):
    """
    save gif from list imgs
    """
    img, *imgs = imgs #move first img in list imgs
    with open(fn, 'wb') as fp_out:
        img.save(fp=fp_out, format='GIF', append_images=imgs,
             save_all=True, duration=int(1000./fps), loop=0)

def save_as_frames(fn, imgs, overwrite=True):
    """
    save list imgs in folder
    """
    # save to folder `fn` with sequenced filenames
    os.makedirs(fn, exist_ok=True)
    for i, img in enumerate(imgs):
        this_fn = os.path.join(fn, f'{i:08}.png') # mean 00000x
        if overwrite or not os.path.exists(this_fn):
            save_as_png(this_fn, img)

def save_as_png(fn, img):
    """
    save image .png
    """
    if not fn.endswith('.png'):
        fn = f'{fn}.png'
    img.save(fn)

def isnotebook():
    try:
        shell = get_ipython().__class__.__name__  # type: ignore 
        if shell == 'ZMQInteractiveShell':
            return True   # Jupyter notebook or qtconsole
        elif shell == 'TerminalInteractiveShell':
            return False  # Terminal running IPython
        else:
            return False  # Other type (?)
    except NameError:
        return False      # Probably standard Python interpreter

# Copied from https://github.com/makinacorpus/easydict/blob/master/easydict/__init__.py
class EasyDict(dict):
    def __init__(self, d=None, **kwargs):
        if d is None:
            d = {}
        if kwargs:
            d.update(**kwargs)
        for k, v in d.items():
            setattr(self, k, v)
        # Class attributes
        for k in self.__class__.__dict__.keys():
            if not (k.startswith('__') and k.endswith('__')) and not k in ('update', 'pop'):
                setattr(self, k, getattr(self, k))

    def __setattr__(self, name, value):
        if isinstance(value, (list, tuple)):
            value = [self.__class__(x)
                     if isinstance(x, dict) else x for x in value]
        elif isinstance(value, dict) and not isinstance(value, self.__class__):
            value = self.__class__(value)
        super(EasyDict, self).__setattr__(name, value)
        super(EasyDict, self).__setitem__(name, value)

    __setitem__ = __setattr__

    def update(self, e=None, **f):
        d = e or dict()
        d.update(f)
        for k in d:
            setattr(self, k, d[k])

    def pop(self, k, d=None):
        delattr(self, k)
        return super(EasyDict, self).pop(k, d)

In [None]:


class TrianglesPainter(object):

    def __init__(self, h, w, n_triangle=10, alpha_scale=0.1, coordinate_scale=1.0):
        self.h = h
        self.w = w
        self.n_triangle = n_triangle
        self.alpha_scale = alpha_scale  #transparency 
        self.coordinate_scale = coordinate_scale
        
    @property
    def n_params(self):
        return self.n_triangle * 10 # [x0, y0, x1, y1, x2, y2, r, g, b, a]
         
    def random_params(self):
        return np.random.rand(self.n_params)
    
    def render(self, params, background='noise'):
        h, w = self.h, self.w
        alpha_scale = self.alpha_scale
        coordinate_scale = self.coordinate_scale
        
        params = params.reshape(-1, 10).copy()
        
        n_triangle = params.shape[0]
        n_feature = params.shape[1]
        #Scale các tham số theo cột
        for j in range(n_feature):
            params[:, j] = (params[:, j] - params[:, j].min()) / (params[:, j].max() - params[:, j].min())
        
        if background == 'noise':
            img = Image.fromarray(  (np.random.rand( h, w, 3 ) * 255).astype(np.uint8) )
        elif background == 'white':
            img = Image.new("RGB", (w, h), (255, 255, 255))
        elif background == 'black':
            img = Image.new("RGB", (w, h), (0, 0, 0))
        else:
            assert False
        draw = ImageDraw.Draw(img, 'RGBA')
        
        params = params.tolist()
        for i in range(n_triangle):
            slice_ = params[i]
            ### Note: 0--->x
            ###       |
            ###       |
            ###       |
            ###       y
            x0, y0, x1, y1, x2, y2, r, g, b, a = slice_
            xc, yc = (x0 + x1 + x2) / 3. , (y0 + y1 + y2) / 3.
            
            #shape of triangle not change, adjust coordinate_scale make triangle be bigger or smaller
            x0, y0 = xc + (x0 - xc) * coordinate_scale, yc + (y0 - yc) * coordinate_scale
            x1, y1 = xc + (x1 - xc) * coordinate_scale, yc + (y1 - yc) * coordinate_scale
            x2, y2 = xc + (x2 - xc) * coordinate_scale, yc + (y2 - yc) * coordinate_scale
            
            x0, x1, x2 = int(x0 * h), int(x1 * h), int(x2 * h)
            y0, y1, y2 = int(y0 * w), int(y1 * w), int(y2 * w)
            r, g, b, a = int(r * 255), int(g * 255), int(b * 255), int(a * alpha_scale * 255)
            
            draw.polygon([(y0, x0), (y1, x1), (y2, x2)], (r, g, b, a))

        del draw
        
        img_arr = np.array(img)
        return img_arr

In [None]:


class RectanglesPainter(object):

    def __init__(self, h, w, n_triangle=10, alpha_scale=0.1, coordinate_scale=1.0):
        self.h = h
        self.w = w
        self.n_triangle = n_triangle
        self.alpha_scale = alpha_scale  #transparency 
        self.coordinate_scale = coordinate_scale
        
    @property
    def n_params(self):
        return self.n_triangle * 8 # [x0, y0, x1, y1, x2, y2, r, g, b, a]
         
    def random_params(self):
        return np.random.rand(self.n_params)
    
    def render(self, params, background='noise'):
        h, w = self.h, self.w
        alpha_scale = self.alpha_scale
        coordinate_scale = self.coordinate_scale
        
        params = params.reshape(-1, 8).copy()
        
        n_triangle = params.shape[0]
        n_feature = params.shape[1]
        #Scale các tham số theo cột
        for j in range(n_feature):
            params[:, j] = (params[:, j] - params[:, j].min()) / (params[:, j].max() - params[:, j].min())
        
        if background == 'noise':
            img = Image.fromarray(  (np.random.rand( h, w, 3 ) * 255).astype(np.uint8) )
        elif background == 'white':
            img = Image.new("RGB", (w, h), (255, 255, 255))
        elif background == 'black':
            img = Image.new("RGB", (w, h), (0, 0, 0))
        else:
            assert False
        draw = ImageDraw.Draw(img, 'RGBA')
        
        params = params.tolist()
        for i in range(n_triangle):
            slice_ = params[i]
            ### Note: 0--->x
            ###       |
            ###       |
            ###       |
            ###       y
            x0, y0, x1, y1, r, g, b, a = slice_
            xc, yc = (x0 + x1 ) / 2. , (y0 + y1 ) / 2.
            
            #shape of triangle not change, adjust coordinate_scale make triangle be bigger or smaller
            x0, y0 = xc + (x0 - xc) * coordinate_scale, yc + (y0 - yc) * coordinate_scale
            x1, y1 = xc + (x1 - xc) * coordinate_scale, yc + (y1 - yc) * coordinate_scale

            
            x0, x1 = int(x0 * h), int(x1 * h)
            y0, y1 = int(y0 * w), int(y1 * w)
            r, g, b, a = int(r * 255), int(g * 255), int(b * 255), int(a * alpha_scale * 255)
            
            draw.polygon([(y0,x0), (y1, x0), (y1,x1),(y0,x1)], (r,g,b,a))



        del draw
        
        img_arr = np.array(img)
        return img_arr

In [None]:
#fn mean canvas image
def _tell_fn_pgpe(solver, solutions, fitnesses):
    solver.tell(fitnesses)  # PGPE maximizes.


def get_tell_fn(flavor='pgpe'):
    print(flavor)
    return {'pgpe': _tell_fn_pgpe}[flavor]


def _best_params_fn_pgpe(solver):
    return solver.center


def get_best_params_fn(flavor='pgpe'):
    return {'pgpe': _best_params_fn_pgpe}[flavor]


class Hook(object):
    def __init__(self):
        pass

    def __call__(self, i, solver, fitness_fn, best_params_fn):
        raise NotImplementedError

    def close(self):
        pass


class PrintStepHook(Hook):
    def __init__(self):
        super().__init__()

    def __call__(self, i, solver, fitness_fn, fitnesses_fn, best_params_fn):
        print(i, end=' ... ')


class PrintCostHook(Hook):
    def __init__(self, fitnesses_fn_is_wrapper=True):
        super().__init__()
        self.fitnesses_fn_is_wrapper = fitnesses_fn_is_wrapper

    def __call__(self, i, solver, fitness_fn, fitnesses_fn, best_params_fn):
        best_params = best_params_fn(solver)
        if self.fitnesses_fn_is_wrapper:
            cost = fitnesses_fn(fitness_fn, [best_params])
        else:
            cost = fitnesses_fn([best_params])
        print()
        print(f'[{datetime.now()}]   Iteration: {i}   cost: {cost}')


class SaveCostHook(Hook):
    def __init__(self, save_fp, fitnesses_fn_is_wrapper=True):
        super().__init__()
        self.save_fp = save_fp
        self.fitnesses_fn_is_wrapper = fitnesses_fn_is_wrapper
        self.record = []  # list of (i, cost)

    def __call__(self, i, solver, fitness_fn, fitnesses_fn, best_params_fn):
        best_params = best_params_fn(solver)
        if self.fitnesses_fn_is_wrapper:
            cost = fitnesses_fn(fitness_fn, [best_params])
        else:
            cost = fitnesses_fn([best_params])
        self.record.append(f'[{datetime.now()}]   Iteration: {i}   cost: {cost}')
        with open(self.save_fp, 'w') as fout:
            list(map(lambda r: print(r, file=fout), self.record))


class StoreImageHook(Hook):
    def __init__(self, render_fn, save_fp, fps=12, save_interval=0):
        super().__init__()
        self.render_fn = render_fn
        self.save_fp = save_fp
        self.fps = fps
        self.save_interval = save_interval

        self.imgs = []

    def __call__(self, i, solver, fitness_fn, fitnesses_fn, best_params_fn):
        best_params = best_params_fn(solver)
        img = arr2img(self.render_fn(best_params))
        self.imgs.append(img)
        if i % self.save_interval == 0:
            self.save()

    def close(self):
        self.save()

    def save(self):
        save_as_gif(f'{self.save_fp}.gif', self.imgs, fps=self.fps)
        save_as_frames(f'{self.save_fp}.frames', self.imgs, overwrite=False)


class ShowImageHook(Hook):
    def __init__(self, render_fn):
        super().__init__()
        self.render_fn = render_fn

    def __call__(self, i, solver, fitness_fn, fitnesses_fn, best_params_fn):
        if isnotebook():
            best_params = best_params_fn(solver)
            img = arr2img(self.render_fn(best_params))
            # pylint:disable=undefined-variable
            display(img)  # type: ignore

## Main

### Argument for painting 

In [None]:
def parse_dics_args(**kwargs):

  keys = ['out_dir','height','width','target_fn','n_triangle','loss_type','alpha_scale','coordinate_scale',\
          'fps','n_population','n_iterations','mp_batch_size','solver','report_interval','step_report_interval','save_as_gif_interval','painter']
  dics = {'out_dir':'es_bitmap_out','height':200,'width':-1,'target_fn':"",'n_triangle':50,'loss_type':'l2','alpha_scale':0.5,'coordinate_scale':1.0,\
          'fps':12,'n_population':256,'n_iterations':10000,'mp_batch_size':1,'solver':'pgpe','report_interval':50,'step_report_interval':50,'save_as_gif_interval':50,'painter':"triangle"}
  for key, value in kwargs.items():
    if key in keys:
      dics[key] = value
    else:
      print(f"{key} is invalid")
  if dics["target_fn"] == "":
    print("Input target for training ")
  return dics

In [None]:
def parse_args(cmd_args):

  args = EasyDict()

  args.out_dir = cmd_args['out_dir']
  args.height = cmd_args['height']
  args.width = cmd_args['width']
  args.target_fn = cmd_args['target_fn']
  args.n_triangle = cmd_args['n_triangle']
  args.loss_type = cmd_args['loss_type']
  args.alpha_scale = cmd_args['alpha_scale']
  args.coordinate_scale = cmd_args['coordinate_scale']
  args.fps = cmd_args['fps']
  args.n_population = cmd_args['n_population']
  args.n_iterations = cmd_args['n_iterations']
  args.mp_batch_size = cmd_args['mp_batch_size']
  args.solver = cmd_args['solver']
  args.report_interval = cmd_args['report_interval']
  args.step_report_interval = cmd_args['step_report_interval']
  args.save_as_gif_interval = cmd_args['save_as_gif_interval']
  args.painter = cmd_args['painter']

  return args

### Setup directory

In [None]:
def pre_training_loop(args):

    out_dir = args.out_dir
    os.makedirs(out_dir, exist_ok=True)
    assert os.path.isdir(out_dir)
    prev_ids = [re.match(r'^\d+', fn) for fn in os.listdir(out_dir)]
    new_id = 1 + max([-1] + [int(id_.group()) if id_ else -1 for id_ in prev_ids])
    desc = f'{os.path.splitext(os.path.basename(args.target_fn))[0]}-' \
           f'{args.n_triangle}-{args.painter}-' \
           f'{args.n_iterations}-iterations-' \
           f'{args.n_population}-population-' \
           f'{args.solver}-solver-' \
           f'{args.loss_type}-loss'
    args.working_dir = os.path.join(out_dir, f'{new_id:04d}-{desc}')

    os.makedirs(args.working_dir)
    args_dump_fn = os.path.join(args.working_dir, 'args.json')
    with open(args_dump_fn, 'w') as f:
        json.dump(args, f, indent=4)

In [None]:
def infer_height_and_width(hint_height, hint_width, fn):
  """
  calculate height and width of fn_image
  """
  fn_width, fn_height = Image.open(fn).size
  if hint_height <= 0:
      if hint_width <= 0:
          inferred_height, inferred_width = fn_height, fn_width  # use target image's size
      else:  # hint_width is valid
          inferred_width = hint_width
          inferred_height = hint_width * fn_height // fn_width
  else:  # hint_height is valid
      if hint_width <= 0:
          inferred_height = hint_height
          inferred_width = hint_height * fn_width // fn_height
      else:  # hint_width is valid
          inferred_height, inferred_width = hint_height, hint_width  # use hint size

  print(f'Inferring height and width. '
        f'Hint: {hint_height, hint_width}, File: {fn_width, fn_height}, Inferred: {inferred_height, inferred_width}')

  return inferred_height, inferred_width

In [None]:

def load_target(fn, resize):
    """
    load target image
    """
    img = Image.open(fn)
    img = rgba2rgb(img)
    h, w = resize
    img = img.resize((w, h), Image.LANCZOS)
    img_arr = img2arr(img)
    return img_arr

### Function for evaluating fitness

#### Fitness

In [None]:
def fitness_fn(params, painter, target_arr, loss_type):
    """
    evaluate fitness of solutions
    """
    NUM_ROLLOUTS = 3 # run 5 times
    losses = []
    for _ in range(NUM_ROLLOUTS):
        rendered_arr = painter.render(params)
        rendered_arr_rgb = rendered_arr[..., :3] ## origin shape is [a,b,4]
        rendered_arr_rgb = rendered_arr_rgb.astype(np.float32) / 255.

        target_arr_rgb = target_arr[..., :3]
        target_arr_rgb = target_arr_rgb.astype(np.float32) / 255.

        if loss_type == 'l2':
            pixelwise_l2_loss = (rendered_arr_rgb - target_arr_rgb)**2
            l2_loss = pixelwise_l2_loss.mean()
            loss = l2_loss
        elif loss_type == 'l1':
            pixelwise_l1_loss = np.abs(rendered_arr_rgb - target_arr_rgb)
            l1_loss = pixelwise_l1_loss.mean()
            loss = l1_loss
        else:
            raise ValueError(f'Unsupported loss type \'{loss_type}\'')
        losses.append(loss)

    return -np.mean(losses)  # pgpe *maximizes*


#### For multiprocessing

In [None]:
worker_assets = None

#Not need if dont run multi Processing
def init_worker(painter, target_arr, loss_type):
  global worker_assets
  worker_assets = {'painter': painter, 'target_arr': target_arr, 'loss_type': loss_type}

In [None]:
def fitness_fn_by_worker(params):
  """
  Not need if dont run multi processing
  evaluate fitness 
  """
  global worker_assets
  painter = worker_assets['painter']
  target_arr = worker_assets['target_arr']
  loss_type = worker_assets['loss_type']

  return fitness_fn(params, painter, target_arr, loss_type)

In [None]:

def batch_fitness_fn_by_workers(params_batch):
  """
  Not need if dont run multi processing
  train batch by batch,would prevent the rendered canvas from overfitting and increase the stability in the optimization
  """
  return [fitness_fn_by_worker(params) for params in params_batch]

### Main function

In [None]:
def training_loop(args):
    height, width = infer_height_and_width(args.height, args.width, args.target_fn)

    allowed_painter = ["triangle","rectangle"]
    if args.painter not in allowed_painter:
      raise ValueError(f'Only following solver(s) is/are supported: {allowed_painter}')

    painter = None
    if args.painter == 'triangle':
      painter = TrianglesPainter(
          h=height,
          w=width,
          n_triangle=args.n_triangle,
          alpha_scale=args.alpha_scale,
          coordinate_scale=args.coordinate_scale,
      )
    elif args.painter == 'rectangle':
      painter = RectanglesPainter(
          h=height,
          w=width,
          n_triangle=args.n_triangle,
          alpha_scale=args.alpha_scale,
          coordinate_scale=args.coordinate_scale,)
    else:
        raise ValueError()


    target_arr = load_target(args.target_fn, (height, width))
    save_as_png(os.path.join(args.working_dir, 'target'), arr2img(target_arr))

    # hooks use to save gif, image step, cost ... of canvas
    hooks = [
        (args.step_report_interval, PrintStepHook()),
        (args.report_interval, PrintCostHook()),
        (args.report_interval, SaveCostHook(save_fp=os.path.join(args.working_dir, 'cost.txt'))),
        (
            args.report_interval,
            StoreImageHook(
                render_fn=lambda params: painter.render(params, background='white'),
                save_fp=os.path.join(args.working_dir, 'animate-background=white'),
                fps=args.fps,
                save_interval=args.save_as_gif_interval,
            ),
        ),
        (args.report_interval, ShowImageHook(render_fn=lambda params: painter.render(params, background='white'))),
    ]

    allowed_solver = ['pgpe']
    if args.solver not in allowed_solver:
        raise ValueError(f'Only following solver(s) is/are supported: {allowed_solver}')

    solver = None
    if args.solver == 'pgpe':
        solver = PGPE(
            solution_length=painter.n_params,
            popsize=args.n_population,
            optimizer='clipup',
            optimizer_config={'max_speed': 0.15},
        )
    else:
        raise ValueError()

    tell_fn = get_tell_fn(args.solver)
    best_params_fn = get_best_params_fn(args.solver)
    loss_type = args.loss_type
    # fitnesses_fn is OK to be inefficient as it's for hook's use only.
    fitnesses_fn = lambda fitness_fn, solutions: [fitness_fn(_, painter, target_arr, loss_type) for _ in solutions]
    n_iterations = args.n_iterations
    mp_batch_size = args.mp_batch_size
    proc_pool = mp.Pool(processes=mp.cpu_count(), initializer=init_worker, initargs=(painter, target_arr, loss_type))

    shape = np.array([height,width])
    step_save_param = 500 ###
    file_save_npz = os.path.join(args.working_dir, 'param')
    for i in range(1, 1 + n_iterations):
        solutions = solver.ask()

        batch_it = (solutions[start:start + mp_batch_size] for start in range(0, len(solutions), mp_batch_size))
        batch_output = proc_pool.imap(func=batch_fitness_fn_by_workers, iterable=batch_it)
        fitnesses = [item for batch in batch_output for item in batch]

        tell_fn(solver, solutions, fitnesses)
        

        
        for hook in hooks:
            trigger_itervel, hook_fn_or_obj = hook
            if i % trigger_itervel == 0:
                hook_fn_or_obj(i, solver, fitness_fn, fitnesses_fn, best_params_fn)
    
        if i % step_save_param == 0:
            param = best_params_fn(solver)
            np.savez(file_save_npz,shape,param)
  
    for hook in hooks:
        _, hook_fn_or_obj = hook
        if hasattr(hook_fn_or_obj, 'close') and callable(hook_fn_or_obj.close):
            hook_fn_or_obj.close()

    proc_pool.close()
    proc_pool.join()


tic = time.perf_counter()
dics_args = parse_dics_args(target_fn = "/content/chip-resize.jpg",out_dir="/content/drive/MyDrive/GA/test",n_iterations = 10000,n_triangle=100,
                            painter='triangle',report_interval=100,step_report_interval=100,save_as_gif_interval=100) ###
args = parse_args(dics_args)
pre_training_loop(args)


training_loop(args)

toc = time.perf_counter()
print(toc-tic)