In [1]:
# convolution based attempt of the game of life
import torch
import numpy as np
import cv2
import os
import gc
import tqdm
from functools import partial
# --- runtime ---------------------------------------------------------------
torch.set_num_threads(2)

# --- automaton -------------------------------------------------------------
# channel counts of the grid and of the convolution output, plus the `groups`
# value handed to torch.conv2d
in_channels = 3
out_channels = 3
groups = 1
batchsize = 1
# (width, height) of the simulated grid
grid_resolution = (480, 270)
# thresholds consumed by game_of_life_rule:
# [survive_min, survive_max, birth_min, birth_max]
rule = [2,3,3,3]
# latest convolution result; overwritten by step() on every call
conv_out = None

# --- output ----------------------------------------------------------------
# destination file for the recording; None disables it
video_output_path = None #'ca-experiment.mp4'
# (width, height) of the displayed / recorded frames
output_resolution = (1920, 1080)
fps = 120
length_seconds = 60


In [2]:
def tocv2_img(src, shape):
    """Convert a batched grid tensor into a uint8 image for OpenCV.

    The tensor is permuted from (batch, channels, height, width) to
    channels-last, averaged over the batch dimension, squeezed, scaled so
    that its maximum maps to 255 and resized with nearest-neighbour
    interpolation.

    Args:
        src: 4-D torch tensor indexed as (batch, channels, height, width).
        shape: target size passed to ``cv2.resize`` as ``dsize``.

    Returns:
        A numpy ``uint8`` array resized to ``shape``.
    """
    img = src.permute(0,2,3,1).mean(0).squeeze().cpu().numpy()
    # An all-zero grid has max() == 0; dividing by it would fill the image
    # with NaNs before the uint8 cast, so only normalise when there is
    # something to normalise.
    peak = img.max()
    if peak > 0:
        img /= peak
    img = (255*img).astype('uint8')
    img = cv2.resize(img, shape, interpolation=cv2.INTER_NEAREST)
    return img

def set_seed(grid, seed, loc, index=None):
    """Write ``seed`` into ``grid`` in place at a given spatial offset.

    Args:
        grid: 4-D tensor to modify; the last two dimensions are the spatial
            ones.
        seed: tensor whose last two dimensions give the patch size; it is
            assigned into the selected window of ``grid``.
        loc: pair (row, column) giving the top-left corner of the window.
            Each entry is converted with ``int()`` (truncating any
            fractional part) so numpy scalars and float offsets can be used
            as slice bounds.
        index: optional pair (batch index, channel index). When omitted the
            window is written for every batch entry and every channel.

    Returns:
        None; ``grid`` is modified in place.
    """
    height, width = seed.shape[-2:]
    row, col = int(loc[0]), int(loc[1])
    if index is None:
        grid[:, :, row:row+height, col:col+width] = seed
    else:
        # the column slice previously started at the undefined name `log`
        grid[index[0], index[1], row:row+height, col:col+width] = seed

def _wrap_pad(grid, pad_h, pad_w):
    """Pad the last two dimensions of ``grid`` by wrapping around the edges."""
    if pad_h > 0:
        grid = torch.cat([grid[:, :, -pad_h:, :], grid, grid[:, :, :pad_h, :]], dim=2)
    if pad_w > 0:
        # rows were wrapped first, so this also fills the corners correctly
        grid = torch.cat([grid[:, :, :, -pad_w:], grid, grid[:, :, :, :pad_w]], dim=3)
    return grid


def step(grid, kernels, rules, detach=True, circular_padding=True, conv_groups=None):
    """Advance the cellular automaton by one generation.

    The grid is padded by half the kernel size on every side, convolved with
    ``kernels`` and the result is handed to each rule together with the
    current grid. Odd kernel sizes are assumed so that the convolution
    output has the same spatial shape as ``grid``.

    Args:
        grid: 4-D tensor indexed as (batch, channels, height, width).
        kernels: convolution weights passed to ``torch.conv2d``.
        rules: iterable of callables ``rule(conv_out, grid)`` returning a
            mask with the same shape as ``grid``.
        detach: accepted for interface compatibility; not used.
        circular_padding: wrap the grid around its edges when True,
            otherwise pad with zeros.
        conv_groups: ``groups`` argument for ``torch.conv2d``; defaults to
            the module-level ``groups`` setting.

    Returns:
        The next grid. The module-level ``conv_out`` is also updated with
        the raw convolution result.
    """
    global conv_out
    if conv_groups is None:
        conv_groups = groups

    # Floor division keeps the pad sizes integral under true division.
    pad_h = kernels.shape[-2] // 2
    pad_w = kernels.shape[-1] // 2
    if circular_padding:
        inps = _wrap_pad(grid, pad_h, pad_w)
    else:
        # F.pad lists the last dimension first: (left, right, top, bottom)
        inps = torch.nn.functional.pad(grid, (pad_w, pad_w, pad_h, pad_h))

    # convolve grid with kernels
    conv_out = torch.conv2d(inps, kernels, groups=conv_groups)

    # Update grid from rules. Starting from -grid and adding each rule's mask
    # means a single rule replaces the grid with that rule's output; several
    # rules have their masks summed (cells hit by more than one rule count
    # more than once, this is not a logical or).
    update = -grid
    for rule_fn in rules:
        update = update + rule_fn(conv_out, grid).float()
    return grid + update

def game_of_life_rule(sum_grid, grid, rule=[2,3,3,3]):
    """Return the mask of cells that are alive in the next generation.

    ``sum_grid`` is the neighbourhood sum including the cell itself and
    ``grid`` the current state. ``rule`` holds four thresholds: a live cell
    (value 1) stays alive when ``sum_grid - grid`` lies in
    ``[rule[0], rule[1]]``; a dead cell (value 0) becomes alive when
    ``sum_grid`` lies in ``[rule[2], rule[3]]``.
    """
    survive_lo, survive_hi = rule[0], rule[1]
    birth_lo, birth_hi = rule[2], rule[3]

    # neighbour count with the cell's own value removed
    others = sum_grid - grid

    alive = grid == 1
    dead = grid == 0
    stays_alive = alive & ((others >= survive_lo) & (others <= survive_hi))
    is_born = dead & ((sum_grid >= birth_lo) & (sum_grid <= birth_hi))
    return stays_alive | is_born

def buggy_game_of_life_rule(sum_grid, grid, rule=[2,3,3,3]):
    """Return two (mask, value) pairs derived from the neighbour count.

    The first pair flags cells whose neighbour count is below ``rule[0]`` or
    above ``rule[1]`` and carries the value 0; the second flags cells whose
    neighbour count equals ``rule[0]`` or ``rule[3]`` and carries the value 1.
    """
    # neighbour count with the cell's own value removed
    others = sum_grid - grid

    out_of_range = (others < rule[0]) | (others > rule[1])
    on_threshold = (others == rule[0]) | (others == rule[3])
    return [(out_of_range, 0), (on_threshold, 1)]



In [3]:
# torch.conv2d works on minibatches, so the grid carries leading batch and
# channel dimensions; grid_resolution is read back to front for the two
# spatial dimensions.
grid = torch.zeros(
    batchsize,
    in_channels,
    grid_resolution[-1],
    grid_resolution[-2],
)
# convolving with an all-ones 3x3 filter adds up the number of ones in each 3x3 neighborhood
# torch.conv2d expects weights laid out as (output channels, input channels / groups, height, width)
def build_filters(n_in=None, n_out=None):
    """Build the three 3x3 neighbourhood-counting filter banks.

    Weights use the layout ``torch.conv2d`` expects with ``groups == 1``:
    (output channels, input channels, height, width).

    Args:
        n_in: number of input channels; defaults to the module-level
            ``in_channels``.
        n_out: number of output channels; defaults to the module-level
            ``out_channels``.

    Returns:
        A list of three tensors of shape ``(n_out, n_in, 3, 3)``:
          0. all ones, so every output channel sums every input channel;
          1. ones only where output and input channel index match, so each
             channel is summed independently;
          2. like (1), plus a random 0/1 mask (each tap is 1 with
             probability 1/9) coupling output channel ``c`` to input channel
             ``(c + 1) % n`` with ``n = min(n_in, n_out)``. For three
             channels these are the (0, 1), (1, 2) and (2, 0) entries.
    """
    if n_in is None:
        n_in = in_channels
    if n_out is None:
        n_out = out_channels

    sum_filter_3_3 = torch.ones(n_out, n_in, 3, 3)
    # channel-wise identity, broadcast over the 3x3 window
    ind_sum_filter_3_3 = torch.eye(n_out, n_in).reshape(n_out, n_in, 1, 1).repeat(1, 1, 3, 3)

    two_channel_sum_filter_3_3 = ind_sum_filter_3_3.clone()
    coupled = min(n_in, n_out)
    # With a single channel the "next" channel would be the channel itself;
    # leave its counting filter untouched in that case.
    if coupled > 1:
        for channel in range(coupled):
            neighbour = (channel + 1) % coupled
            two_channel_sum_filter_3_3[channel, neighbour] = torch.ones(3, 3).bernoulli_(1.0/9.0)
    return [sum_filter_3_3, ind_sum_filter_3_3, two_channel_sum_filter_3_3]

# the seed as a sub image: one 4x4 pattern per channel
seed = torch.tensor([[[0,1,1,0],
                      [1,1,0,0],
                      [0,1,0,0],
                      [0,0,0,0]],
                     [[0,0,0,0],
                      [0,1,1,0],
                      [0,0,1,1],
                      [0,0,1,0]],
                     [[0,0,0,0],
                      [0,0,1,0],
                      [0,0,1,1],
                      [0,1,1,0]]]).float()
# add the batch dimension so the seed lines up with the grid layout
seed = seed.repeat(batchsize, 1, 1, 1)

# initialize grid: place the seed in the centre. Floor division keeps the
# offsets integral so they stay valid slice bounds under true division.
set_seed(grid, seed, ((grid.shape[-2]-seed.shape[-2])//2, (grid.shape[-1]-seed.shape[-1])//2))

# scatter 1000 random 0/1 patches of the seed's shape, keeping the corners
# at least 8 cells away from the grid border
locs = np.random.uniform([8,8], [grid.shape[-2]-8, grid.shape[-1]-8], size=(1000,2)).astype('uint32')
for loc in locs:
    set_seed(grid, (torch.rand_like(seed) > 0.5).float(), loc)


kernels_list = build_filters()

# bind the configured thresholds so the rule can be called as rule(conv_out, grid)
gol_rule = partial(game_of_life_rule, rule=rule)

In [None]:
cv2.destroyAllWindows()

# Open a video writer only when an output path is configured. Any failure to
# open falls back to preview-only mode (vw is None) instead of aborting.
vw = None
if video_output_path is not None:
    try:
        fourcc = cv2.VideoWriter_fourcc(*'H264')
        vw = cv2.VideoWriter(video_output_path, fourcc, fps, output_resolution)
        if not vw.isOpened():
            print('could not open video writer for %s' % video_output_path)
            vw.release()
            vw = None
    except Exception as e:
        print(e)
        vw = None

if torch.cuda.is_available():
    grid = grid.cuda()
    kernels_list = [kernels.cuda() for kernels in kernels_list]

conv_out = grid.clone()
# waitKey(0) blocks until a key is pressed, so never let the delay reach 0
frame_delay_ms = max(1, int(1000.0/fps))
try:
    for i in tqdm.tqdm(range(int(fps*length_seconds))):
        # every once in a while, change the way neighbours are counted
        if i%47 == 1:
            kernels_list = build_filters()
            if torch.cuda.is_available():
                kernels_list = [kernels.cuda() for kernels in kernels_list]
        # every once in a while, drop random seeds at random locations
        if i%31 == 1:
            locs = np.random.uniform([8,8], [grid.shape[-2]-8, grid.shape[-1]-8], size=(3,2)).astype('uint32')
            for loc in locs:
                set_seed(grid, (torch.rand_like(seed) > 0.5).float(), loc)
        # use the filter bank with random cross-channel coupling
        kernels = kernels_list[2]
        # convert to image with channels as last dimension, and desired output resolution
        grid_img = tocv2_img(grid, output_resolution)

        # show current grid state
        cv2.imshow('big bang orig', grid_img)
        cv2.waitKey(frame_delay_ms)

        # write output
        if vw is not None:
            vw.write(grid_img)

        # step the cellular automaton
        grid = step(grid, kernels, (gol_rule,))

        # clean up memory
        gc.collect()
        torch.cuda.empty_cache()
finally:
    # Release the writer and close the preview window even if the loop is
    # interrupted; vw is None when recording is disabled.
    if vw is not None:
        vw.release()
    cv2.destroyAllWindows()

  4%|▍         | 278/7200 [00:11<04:51, 23.74it/s]