### osu!nn #7: GAN map flow generator

Generate a nice map using a GAN and the data we have gathered until now.

Synthesis of "flowData"
* training_flowData x 10 ~ 99 (Quality: 60+)
* rhythmData x 1
* momentumData x 1
* (Discriminator) x 1
* (Generator) x 1

Synthesis Time: ~15 mins

Last edit: 2018/8/16

First of all, let's welcome -

## Cute Sophie!!

<img src="https://ar3.moe/files/sophie.jpg" />

In the previous notebook we predicted (or rather estimated) our rhythm. Now we will try to create the new map by imitating the existing dataset, using a Generative Adversarial Network (GAN).

GANs are kind of hard to train, and I personally felt the pain when I downloaded 10+ GitHub repos, none of which worked. All of that code is based on MNIST, and it either fails to produce any image that looks good or simply fails to run.

As a consequence, I coded this notebook using tf.contrib.eager and tf.keras. It took a while to find out how to modify the loss function. The biggest obstacle was that every tensor has one more dimension than the data: the batch, which is the first dimension.

tf.contrib.eager is also said to be a pretty new feature in TensorFlow. ~~Idk how to write the other style of code using sessions without getting error anyways,~~ so make sure TensorFlow has the right version. My env is TensorFlow v1.9.0 on Win10, Python 3.5, no CUDA.

## Setup

In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os, re, subprocess, json
from datetime import datetime

# this line below can only run once in the notebook! otherwise it will cause errors
tf.enable_eager_execution();
tfe = tf.contrib.eager;

Import the rhythm data.

In [2]:
def read_npz(fn):
    with np.load(fn) as data:
        objs = data["objs"];
        obj_indices = [i for i,k in enumerate(objs) if k == 1];
        predictions = data["predictions"];
        momenta = data["momenta"];
        ticks = data["ticks"];
        timestamps = data["timestamps"];
        sv = data["sv"];
        dist_multiplier = data["dist_multiplier"];
    return objs, obj_indices, predictions, momenta, ticks, timestamps, sv, dist_multiplier;

unfiltered_objs, obj_indices, unfiltered_predictions, unfiltered_momenta, unfiltered_ticks, unfiltered_timestamps, sv, dist_multiplier = read_npz("rhythm_data.npz");

first_step_objs =        unfiltered_objs[obj_indices];
first_step_predictions = unfiltered_predictions[obj_indices];
first_step_momenta =     unfiltered_momenta[obj_indices];
first_step_ticks =       unfiltered_ticks[obj_indices];
first_step_timestamps =  unfiltered_timestamps[obj_indices];

momentum_multiplier = 1.0;
angular_momentum_multiplier = 1.0;

first_step_is_slider = first_step_predictions[:, 2];
first_step_is_spinner = first_step_predictions[:, 3];
first_step_is_sliding = first_step_predictions[:, 4];
first_step_is_spinning = first_step_predictions[:, 5];
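
The filtering in the cell above keeps only the ticks where the rhythm model placed an object (`objs == 1`) and applies that one index list to every parallel array. Here is a minimal pure-Python sketch of the idea with made-up values. The notebook itself works on NumPy arrays loaded from `rhythm_data.npz`, and `select_rows` is a hypothetical helper name, not part of the notebook.

```python
def select_rows(rows, indices):
    """Return the entries of `rows` at the given positions, in order."""
    return [rows[i] for i in indices]

# Made-up example data: one entry per tick.
objs = [0, 1, 0, 1, 1]                 # 1 = an object starts on this tick
ticks = [0, 1, 2, 3, 4]
timestamps = [0, 125, 250, 375, 500]   # milliseconds

# Same rule as in read_npz: remember where objs == 1 ...
obj_indices = [i for i, k in enumerate(objs) if k == 1]

# ... and reuse that one index list for every parallel array.
kept_ticks = select_rows(ticks, obj_indices)
kept_timestamps = select_rows(timestamps, obj_indices)

print(obj_indices, kept_ticks, kept_timestamps)
```

Using a single index list keeps all the arrays aligned, which the later cells rely on when they index `momenta`, `ticks` and `timestamps` by note number.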

In [3]:
# convert notes with is_slider flag to sliders
# if there is next note, slide to next note
# else, slide for 4 ticks

# Problems:
# - how to deal with momenta? (change the slider speed! which is obviously not good)
# - do we use AM?
# - do we use is_sliding?
# - do we use the slider model? it's heavily overfit... (try with some other dataset, other than sota!!!)
# - does the shape cause overlapping? add some penalty loss? let it learn from classifier? ...
# - and many more!
skip_this = False;
new_obj_indices = [];
slider_ticks = [];
for i in range(len(first_step_objs)):
    if skip_this:
        first_step_is_slider[i] = 0;
        skip_this = False;
        continue;
    if first_step_is_slider[i]: # this one is a slider!!
        if i == first_step_objs.shape[0]-1: # Last Note.
            new_obj_indices.append(i);
            slider_ticks.append(4);
            continue;
        if first_step_ticks[i+1] >= first_step_ticks[i] + 5: # too long! end here
            new_obj_indices.append(i);
            slider_ticks.append(4);
        else:
            skip_this = True;
            new_obj_indices.append(i);
            slider_ticks.append(max(1, first_step_ticks[i+1] - first_step_ticks[i]));
    else: # not a slider!
        new_obj_indices.append(i);
        slider_ticks.append(0);

# Filter the removed objects out!
objs =        first_step_objs[new_obj_indices];
predictions = first_step_predictions[new_obj_indices];
momenta =     first_step_momenta[new_obj_indices];
ticks =       first_step_ticks[new_obj_indices];
timestamps =  first_step_timestamps[new_obj_indices];
is_slider =   first_step_is_slider[new_obj_indices];
is_spinner =  first_step_is_spinner[new_obj_indices];
is_sliding =  first_step_is_sliding[new_obj_indices];
is_spinning = first_step_is_spinning[new_obj_indices];
slider_ticks = np.array(slider_ticks);

# should be slider length each tick, which is usually SV * SMP * 100 / 4
# e.g. SV 1.6, timing section x1.00, 1/4 divisor, then slider_length_base = 40
slider_length_base = sv // 4;

# these data must be kept consistent with the sliderTypes in load_map.js
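# TODO: the slider type is picked at random for now and still needs to be determined properly.
# Note that randint(0, 3) only draws types 0..2, although 5 types are defined below.
slider_types = np.random.randint(0, 3, is_slider.shape).astype(int);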
slider_type_rotation = np.array([0, -0.40703540572409336, 0.40703540572409336, -0.20131710837464062, 0.20131710837464062]);
slider_cos = np.cos(slider_type_rotation);
slider_sin = np.sin(slider_type_rotation);

# this is vector length! I should change the variable name probably...
slider_type_length = np.array([1.0, 0.97, 0.97, 0.97, 0.97]);

slider_lengths = np.array([slider_type_length[int(k)] * slider_length_base[i] for i, k in enumerate(slider_types)]) * slider_ticks;

# print(slider_lengths.shape)
# print(timestamps.shape)
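
The slider conversion rule from the cell above is easier to follow on a tiny example. This is a simplified pure-Python sketch, assuming plain lists instead of NumPy arrays. `merge_sliders` is a hypothetical name, and unlike the notebook it does not reset the `is_slider` flag of the swallowed note.

```python
def merge_sliders(ticks, is_slider, max_gap=5, default_len=4):
    """Return (kept_indices, slider_ticks) for a list of notes.

    A slider that is followed closely by another note swallows that note
    and lasts until it; otherwise it gets a default length.
    """
    kept, lengths = [], []
    skip_next = False
    for i in range(len(ticks)):
        if skip_next:            # this note became the tail of the previous slider
            skip_next = False
            continue
        kept.append(i)
        if not is_slider[i]:
            lengths.append(0)    # plain circle
        elif i == len(ticks) - 1 or ticks[i + 1] >= ticks[i] + max_gap:
            lengths.append(default_len)   # last note, or the next note is too far away
        else:
            lengths.append(max(1, ticks[i + 1] - ticks[i]))
            skip_next = True
    return kept, lengths

kept, lengths = merge_sliders([0, 1, 2, 10, 12], [1, 0, 0, 1, 1])
print(kept, lengths)
```

In this made-up input the notes at index 1 and 4 disappear, because they become the end points of the sliders that start at index 0 and 3.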

In [4]:
timestamps_plus_1 = np.concatenate([timestamps[1:], timestamps[-1:] + (timestamps[-1:] - timestamps[-2:-1])])
timestamps_after = timestamps_plus_1 - timestamps;
timestamps_before = np.concatenate([[4777], timestamps_after[:-1]]); # why 4777????

note_distances = timestamps_before * momenta[:, 0] * momentum_multiplier;
note_angles = timestamps_before * momenta[:, 1] * angular_momentum_multiplier;
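
The cell above turns timing into spacing: the distance to each note is the time gap before it, times the predicted momentum, times a multiplier. A small sketch with made-up numbers follows. The function names are hypothetical, and `first_gap` stands in for the hard-coded value used for the first note, which has no predecessor.

```python
def note_gaps(timestamps, first_gap):
    """Time (ms) between each note and the one before it."""
    return [first_gap] + [b - a for a, b in zip(timestamps, timestamps[1:])]

def distances_before_notes(timestamps, speeds, first_gap, multiplier=1.0):
    """Distance to travel before each note: gap * speed * multiplier."""
    gaps = note_gaps(timestamps, first_gap)
    return [g * s * multiplier for g, s in zip(gaps, speeds)]

timestamps = [0, 500, 1000, 2000]    # made-up values, in ms
speeds = [0.5, 0.5, 0.5, 0.5]        # made-up momenta (distance per ms)
distances = distances_before_notes(timestamps, speeds, first_gap=1000)
print(distances)
```

With a constant momentum, a gap twice as long gives a jump twice as far, which is the behaviour the generator later has to respect when it places the notes.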

In [5]:
is_slider = predictions[:, 2];
is_sliding = predictions[:, 4];
#print(is_slider * is_sliding - is_slider); # is all 0!!
# print(is_slider * is_sliding);

A line-plotting helper class found on the web.

Unfortunately I forgot where I found it, so let's assume Stack Overflow.

In [6]:
import matplotlib.pyplot as plt
import matplotlib.lines as lines
import matplotlib.transforms as mtransforms
import matplotlib.text as mtext


class MyLine(lines.Line2D):
    def __init__(self, *args, **kwargs):
        # we'll update the position when the line data is set
        self.text = mtext.Text(0, 0, '')
        lines.Line2D.__init__(self, *args, **kwargs)

        # we can't access the label attr until *after* the line is
        # inited
        self.text.set_text(self.get_label())

    def set_figure(self, figure):
        self.text.set_figure(figure)
        lines.Line2D.set_figure(self, figure)

    def set_axes(self, axes):
        self.text.set_axes(axes)
        lines.Line2D.set_axes(self, axes)

    def set_transform(self, transform):
        # 2 pixel offset
        texttrans = transform + mtransforms.Affine2D().translate(2, 2)
        self.text.set_transform(texttrans)
        lines.Line2D.set_transform(self, transform)

    def set_data(self, x, y):
        if len(x):
            self.text.set_position((x[-1], y[-1]))

        lines.Line2D.set_data(self, x, y)

    def draw(self, renderer):
        # draw my label at the end of the line with 2 pixel offset
        lines.Line2D.draw(self, renderer)
        self.text.draw(renderer)

Load the data for the classifier.

In [7]:
import os;
import numpy as np;

root = ".\\mapdata";

chunk_size = 10;
step_size = 5;
divisor = 4;

max_x = 512;
max_y = 384;

# "TICK", "TIME", "TYPE", "X", "Y", "IN_DX", "IN_DY", "OUT_DX", "OUT_DY", "END_X", "END_Y"
def read_map_npz(file_path):
    with np.load(file_path) as data:
        flow_data = data["flow"];
    return flow_data;

# TICK, TIME, TYPE, X, Y, IN_DX, IN_DY, OUT_DX, OUT_DY, END_X, END_Y
def read_maps():
    result = [];
    for file in os.listdir(root):
        if file.endswith(".npz"):
            #print(os.path.join(root, file));
            flow_data = read_map_npz(os.path.join(root, file));
            for i in range(0, (flow_data.shape[0] - chunk_size) // step_size):
                chunk = flow_data[i * step_size:i * step_size + chunk_size];
                result.append(chunk);
                
    # normalize the TICK col and remove TIME col
    result = np.array(result)[:, :, [0, 2, 3, 4, 5, 6, 7, 8, 9, 10]];
    result[:, :, 0] %= divisor;
    result[:, :, 2] /= max_x;
    result[:, :, 3] /= max_y;
    result[:, :, 8] /= max_x;
    result[:, :, 9] /= max_y;
    
    # TICK, TYPE, X, Y, IN_DX, IN_DY, OUT_DX, OUT_DY, END_X, END_Y
    # only use X,Y,OUT_DX,OUT_DY,END_X,END_Y
    result = np.array(result)[:, :, [2, 3, 6, 7, 8, 9]];
    return result;

# The default dataset so people don't have to come up with a whole dataset to use this.
# To save the flow data to a flow_dataset.npz, it is simple - just run the following after reading maps:
# np.savez_compressed("flow_dataset", maps = maps);
try:
    maps = read_maps();
    labels = np.ones(maps.shape[0]);
except Exception:
    # no readable map data under `root`: fall back to the bundled default dataset
    with np.load("flow_dataset.npz") as flow_dataset:
        maps = flow_dataset["maps"];
        labels = np.ones(maps.shape[0]);

order2 = np.argsort(np.random.random(maps.shape[0]));
special_train_data = maps[order2];
special_train_labels = labels[order2];
# order3 = np.argsort(np.random.random(false_maps.shape[0]));
# special_false_data = false_maps[order2];
# special_false_labels = false_labels[order2];
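
`read_maps` cuts every map into overlapping windows of `chunk_size` notes, `step_size` notes apart, so the classifier only ever sees short pieces of flow. A minimal sketch of that windowing on a plain list (`cut_chunks` is a hypothetical name, and the rows are just integers standing in for notes):

```python
def cut_chunks(rows, chunk_size=10, step_size=5):
    """Cut overlapping windows of `chunk_size` rows, `step_size` rows apart."""
    count = (len(rows) - chunk_size) // step_size
    return [rows[i * step_size:i * step_size + chunk_size] for i in range(count)]

rows = list(range(30))          # stand-in for the 30 notes of one map
chunks = cut_chunks(rows)
print(len(chunks), [c[0] for c in chunks])
```

With this window count, the last full window (rows 20 to 29 in this example) is not emitted, so a few notes at the end of each map are only covered by the second half of the previous window.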


Define the classifier model.

The model structure can probably be optimized, but I currently have no good idea how.

In [8]:
from tensorflow import keras;

def build_classifier_model():
    model = keras.Sequential([
        keras.layers.SimpleRNN(64, input_shape=(special_train_data.shape[1], special_train_data.shape[2])),
        keras.layers.Dense(64),# activation=tf.nn.elu, input_shape=(train_data.shape[1],)),
        keras.layers.Dense(64, activation=tf.nn.relu),
        keras.layers.Dense(64, activation=tf.nn.tanh),
        keras.layers.Dense(64, activation=tf.nn.relu),
        keras.layers.Dense(1, activation=tf.nn.tanh),
        keras.layers.Lambda(lambda x: (x+1)/2, output_shape=(1,)),
    ])

    optimizer = tf.train.AdamOptimizer(0.001) #Adamoptimizer?

    model.compile(loss='mse',
                optimizer=optimizer,
                metrics=['mae'])
    return model
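
The last two layers of the classifier are a `tanh` unit followed by a `Lambda` that applies `(x + 1) / 2`, so the score ends up between 0 and 1 and can be compared with the labels (1 for real maps, 0 for generated ones). A tiny sketch of just that rescaling, using `math.tanh` instead of TensorFlow (`to_unit_interval` is a hypothetical name):

```python
import math

def to_unit_interval(x):
    """Map a raw activation through tanh, then rescale from (-1, 1) to (0, 1)."""
    return (math.tanh(x) + 1) / 2

for raw in (-3.0, 0.0, 3.0):
    print(raw, round(to_unit_interval(raw), 4))
```

A raw activation of 0 lands exactly in the middle at 0.5, which reads as "no idea whether this map is real".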


Define a plotting function to show the generator and discriminator losses.

In [9]:
import matplotlib.pyplot as plt


def plot_history(history):
    plt.figure()
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.plot(history.epoch, np.array(history.history['loss']), 
           label='Train Loss')
    plt.plot(history.epoch, np.array(history.history['val_loss']),
           label = 'Val loss')
    plt.legend()
    plt.show()

early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=20)

# actual_train_data = np.concatenate((special_train_data[0:1], special_false_data[0:1]), axis=0);
# actual_train_labels = np.concatenate((special_train_labels[0:1], special_false_labels[0:1]), axis=0);

# history = classifier_model.fit(actual_train_data, actual_train_labels, epochs=1,
#                     validation_split=0.2, verbose=0,
#                     callbacks=[])

# plot_history(history)

Functions for map generation.

In [10]:
# A regularizer to keep the map inside the box.
# It's so the sliders and notes don't randomly fly out of the screen!
def inblock_loss(vg):
    #priority = tf.constant(list(k**2 for k in range(vg.shape[0], 0, -1)))
    wall_var_l = tf.to_float(tf.less(vg, 0.2)) * tf.square(0.3 - vg);
    wall_var_r = tf.to_float(tf.greater(vg, 0.8)) * tf.square(vg - 0.7);
#     wall_var_ins = -1 * tf.to_float(tf.greater(vg, 0.2)) * tf.to_float(tf.less(vg, 0.8)) * tf.square(vg - 0.5);
    return tf.reduce_mean(tf.reduce_mean(wall_var_l + wall_var_r, axis=2), axis=1);

def inblock_trueness(vg):
    #priority = tf.constant(list(k**2 for k in range(vg.shape[0], 0, -1)))
    wall_var_l = tf.to_float(tf.less(vg, 0));
    wall_var_r = tf.to_float(tf.greater(vg, 1));
#     wall_var_ins = -1 * tf.to_float(tf.greater(vg, 0.2)) * tf.to_float(tf.less(vg, 0.8)) * tf.square(vg - 0.5);
    return tf.reduce_mean(tf.reduce_mean(wall_var_l + wall_var_r, axis=2), axis=1);

def cut_map_chunks(c):
    r = [];
    for i in range(0, (c.shape[0] - chunk_size) // step_size):
        chunk = c[i * step_size:i * step_size + chunk_size];
        r.append(chunk);
    return tf.stack(r);

# This function is kind of unused now - should be!
# TODO: delete this.
def construct_map(var_tensor):
    out = [];
    cp = tf.constant([0.5, 0.5]);
    l = 0.3;
    cos_list = l* tf.cos(var_tensor * 6.283);
    sin_list = l* tf.sin(var_tensor * 6.283);
    for k, _ in enumerate(cos_list):
        cp = tf.add(cp, tf.stack([cos_list[k], sin_list[k]]));
        out.append(cp);
    return tf.stack(out, axis=0);

# This function is kind of unused now - I didn't delete it for it could be a backup.
# TODO: delete this.
def construct_map_without_sliders(var_tensor, extvar={}):
    
    var_tensor = tf.to_float(var_tensor);
    wall_l = 0.15;
    wall_r = 0.85;
    x_max = 512;
    y_max = 384;
    out = [];
    cp = tf.constant([256, 192, 0, 0]);
    phase = 0;
    half_tensor = var_tensor.shape[1]//4;
    
    # length multiplier
    if "length_multiplier" in extvar:
        length_multiplier = extvar["length_multiplier"];
    else:
        length_multiplier = 1;

    # notedists
    if "begin" in extvar:
        begin_offset = extvar["begin"];
    else:
        begin_offset = 0;
    batch_size = var_tensor.shape[0];
    note_distances_now = length_multiplier * np.tile(np.expand_dims(note_distances[begin_offset:begin_offset+half_tensor], axis=0), (batch_size, 1));
    note_angles_now = np.tile(np.expand_dims(note_angles[begin_offset:begin_offset+half_tensor], axis=0), (batch_size, 1));

    # init
    l = tf.convert_to_tensor(note_distances_now, dtype="float32");
    sl = l * 0.7;
    sr = tf.convert_to_tensor(note_angles_now, dtype="float32");
#     ntensor_list = ((var_tensor - 0.5)) * 2 * 6.283;
    cos_list = var_tensor[:, 0:half_tensor * 2];
    sin_list = var_tensor[:, half_tensor * 2:];
    len_list = tf.sqrt(tf.square(cos_list) + tf.square(sin_list));
    cos_list = cos_list / len_list;
    sin_list = sin_list / len_list;
#     cos_list2 = tf.cos(var_tensor * 6.283 + phase);
#     sin_list2 = tf.sin(var_tensor * 6.283 + phase);
    wall_l = 0.05 * x_max + l * 0.5;
    wall_r = 0.95 * x_max - l * 0.5;
    wall_t = 0.05 * y_max + l * 0.5;
    wall_b = 0.95 * y_max - l * 0.5;
    rerand = tf.to_float(tf.greater(l, y_max / 2));
    not_rerand = tf.to_float(tf.less_equal(l, y_max / 2));

    # generate
    if "start_pos" in extvar:
        _px, _py = extvar["start_pos"];
    else:
        _px = 256;
        _py = 192;
    _pa = 0;
    # this is not important since the first position starts at _ppos + Δpos
    _x = 256;
    _y = 192;
    for k in range(half_tensor):
        # r_max = 192, r = 192 * k, theta = k * 10
        rerand_x = 256 + 256 * var_tensor[:, k];
        rerand_y = 192 + 192 * var_tensor[:, k + half_tensor*2];

        delta_value_x = l[:, k] * cos_list[:, k];
        delta_value_y = l[:, k] * sin_list[:, k];

        wall_value_l = tf.to_float(tf.less(_px, wall_l[:, k]));
        wall_value_r = tf.to_float(tf.greater(_px, wall_r[:, k]));
        wall_value_xmid = tf.to_float(tf.greater(_px, wall_l[:, k])) * tf.to_float(tf.less(_px, wall_r[:, k]));
        wall_value_t = tf.to_float(tf.less(_py, wall_t[:, k]));
        wall_value_b = tf.to_float(tf.greater(_py, wall_b[:, k]));
        wall_value_ymid = tf.to_float(tf.greater(_py, wall_t[:, k])) * tf.to_float(tf.less(_py, wall_b[:, k]));

        x_delta = tf.abs(delta_value_x) * wall_value_l - tf.abs(delta_value_x) * wall_value_r + delta_value_x * wall_value_xmid;
        y_delta = tf.abs(delta_value_y) * wall_value_t - tf.abs(delta_value_y) * wall_value_b + delta_value_y * wall_value_ymid;

        _x = rerand[:, k] * rerand_x + not_rerand[:, k] * (_px + x_delta);
        _y = rerand[:, k] * rerand_y + not_rerand[:, k] * (_py + y_delta);
#         _x = _px + x_delta;
#         _y = _py + y_delta;
        
        # calculate output vector
        _a = rerand[:, k] * cos_list[:, k + half_tensor] + not_rerand[:, k] * cos_list[:, k];
        _b = rerand[:, k] * sin_list[:, k + half_tensor] + not_rerand[:, k] * sin_list[:, k];
        cp = tf.transpose(tf.stack([_x / x_max, _y / y_max, _a, _b]));
        out.append(cp);
        _px = _x;
        _py = _y;
        _pa = tf.mod(_pa + 0, 6.283);
    return tf.transpose(tf.stack(out, axis=0), [1, 0, 2]);

def construct_map_with_sliders(var_tensor, extvar={}):

    var_tensor = tf.to_float(var_tensor);
    wall_l = 0.15;
    wall_r = 0.85;
    x_max = 512;
    y_max = 384;
    out = [];
    cp = tf.constant([256, 192, 0, 0]);
    phase = 0;
    half_tensor = var_tensor.shape[1]//4;
    
    # length multiplier
    if "length_multiplier" in extvar:
        length_multiplier = extvar["length_multiplier"];
    else:
        length_multiplier = 1;

    # notedists
    if "begin" in extvar:
        begin_offset = extvar["begin"];
    else:
        begin_offset = 0;
    batch_size = var_tensor.shape[0];
    note_distances_now = length_multiplier * np.tile(np.expand_dims(note_distances[begin_offset:begin_offset+half_tensor], axis=0), (batch_size, 1));
    note_angles_now = np.tile(np.expand_dims(note_angles[begin_offset:begin_offset+half_tensor], axis=0), (batch_size, 1));

    # init
    l = tf.convert_to_tensor(note_distances_now, dtype="float32");
    sl = l * 0.7;
    sr = tf.convert_to_tensor(note_angles_now, dtype="float32");
#     ntensor_list = ((var_tensor - 0.5)) * 2 * 6.283;
    cos_list = var_tensor[:, 0:half_tensor * 2];
    sin_list = var_tensor[:, half_tensor * 2:];
    len_list = tf.sqrt(tf.square(cos_list) + tf.square(sin_list));
    cos_list = cos_list / len_list;
    sin_list = sin_list / len_list;
#     cos_list2 = tf.cos(var_tensor * 6.283 + phase);
#     sin_list2 = tf.sin(var_tensor * 6.283 + phase);
    wall_l = 0.05 * x_max + l * 0.5;
    wall_r = 0.95 * x_max - l * 0.5;
    wall_t = 0.05 * y_max + l * 0.5;
    wall_b = 0.95 * y_max - l * 0.5;
    rerand = tf.to_float(tf.greater(l, y_max / 2));
    not_rerand = tf.to_float(tf.less_equal(l, y_max / 2));

    # generate
    if "start_pos" in extvar:
        _px, _py = extvar["start_pos"];
    else:
        _px = 256;
        _py = 192;
    _pa = 0;
    # this is not important since the first position starts at _ppos + Δpos
    _x = 256;
    _y = 192;
    for k in range(half_tensor):
        note_index = begin_offset + k;
        # r_max = 192, r = 192 * k, theta = k * 10
        rerand_x = 256 + 256 * var_tensor[:, k];
        rerand_y = 192 + 192 * var_tensor[:, k + half_tensor*2];

        delta_value_x = l[:, k] * cos_list[:, k];
        delta_value_y = l[:, k] * sin_list[:, k];

        # It is tensor calculation batched 8~32 each call, so if/else do not work here.
        wall_value_l = tf.to_float(tf.less(_px, wall_l[:, k]));
        wall_value_r = tf.to_float(tf.greater(_px, wall_r[:, k]));
        wall_value_xmid = tf.to_float(tf.greater(_px, wall_l[:, k])) * tf.to_float(tf.less(_px, wall_r[:, k]));
        wall_value_t = tf.to_float(tf.less(_py, wall_t[:, k]));
        wall_value_b = tf.to_float(tf.greater(_py, wall_b[:, k]));
        wall_value_ymid = tf.to_float(tf.greater(_py, wall_t[:, k])) * tf.to_float(tf.less(_py, wall_b[:, k]));

        x_delta = tf.abs(delta_value_x) * wall_value_l - tf.abs(delta_value_x) * wall_value_r + delta_value_x * wall_value_xmid;
        y_delta = tf.abs(delta_value_y) * wall_value_t - tf.abs(delta_value_y) * wall_value_b + delta_value_y * wall_value_ymid;

        _x = rerand[:, k] * rerand_x + not_rerand[:, k] * (_px + x_delta);
        _y = rerand[:, k] * rerand_y + not_rerand[:, k] * (_py + y_delta);
#         _x = _px + x_delta;
#         _y = _py + y_delta;
        
        # calculate output vector
        if is_slider[note_index]:
            sln = slider_lengths[note_index];
            slider_type = slider_types[note_index];
            scos = slider_cos[slider_type];
            ssin = slider_sin[slider_type];
            _a = cos_list[:, k + half_tensor];
            _b = sin_list[:, k + half_tensor];
            # cos(a+θ) = cosa cosθ - sina sinθ
            # sin(a+θ) = cosa sinθ + sina cosθ
            _oa = _a * scos - _b * ssin;
            _ob = _a * ssin + _b * scos;
            cp = tf.transpose(tf.stack([_x / x_max, _y / y_max, _oa, _ob, (_x + _a * sln) / x_max, (_y + _b * sln) / y_max]));
            out.append(cp);
            _px = _x + _a * sln;
            _py = _y + _b * sln;
        else:
            _a = rerand[:, k] * cos_list[:, k + half_tensor] + not_rerand[:, k] * cos_list[:, k];
            _b = rerand[:, k] * sin_list[:, k + half_tensor] + not_rerand[:, k] * sin_list[:, k];
            cp = tf.transpose(tf.stack([_x / x_max, _y / y_max, _a, _b, _x / x_max, _y / y_max]));
            out.append(cp);
            _px = _x;
            _py = _y;
        _pa = tf.mod(_pa + 0, 6.283);
    return tf.transpose(tf.stack(out, axis=0), [1, 0, 2]);

def stack_loss(tensor):
    complex_list = tf.complex(tensor[:, :, 0] * 512, tensor[:, :, 1] * 384);
    stack_limit = 30;
    precise_limit = 1;
    a = [];
    for k in range(tensor.shape[1]):
        w = tf.tile(tf.expand_dims(complex_list[:, k], axis=1), [1, tensor.shape[1]]);
        r = tf.abs(w - complex_list);
        rless = tf.to_float(tf.less(r, stack_limit)) * tf.to_float(tf.greater(r, precise_limit));
        rmean = tf.reduce_mean(rless * (stack_limit - r) / stack_limit);
        a.append(rmean);
    b = tf.reduce_sum(a);
#         print(tf.tile(w, [1, tensor.shape[1]]));
#         print(complex_list);
    return b;

# This polygon loss was an attempt to make the map less likely to overlap each other.
# The idea is: calculate the area of polygon formed from the note positions;
# If it is big, then it is good - they form a convex shape, no overlap.
# ... of course it totally doesn't work like that.
def polygon_loss(tensor):
    tensor_this = tensor[:, :, 0:2];
    tensor_next = tf.concat([tensor[:, 1:, 0:2], tensor[:, 0:1, 0:2]], axis=1);
    # trapezoid (shoelace) formula: sum over (x_i + x_next) * (y_next - y_i)
    sa = (tensor_this[:, :, 0] + tensor_next[:, :, 0]) * (tensor_next[:, :, 1] - tensor_this[:, :, 1]);
    surface = tf.abs(tf.reduce_sum(sa, axis=1))/2;
    return surface;

def construct_map_and_calc_loss(var_tensor, extvar):
    # first make a map from the outputs of generator, then ask the classifier (discriminator) to classify it
    classifier_model = extvar["classifier_model"]
    out = construct_map_with_sliders(var_tensor, extvar=extvar);
    cm = classifier_model(out);
    predmean = 1 - tf.reduce_mean(cm, axis=1);
#    regulator = tf.reduce_mean(tf.reduce_mean(- 0.1 * tf.square(out[:, :, 1:2] - 0.5), axis=2), axis=1); # * tf.square(out[:, :, 1:2] - 2)
    box_loss = inblock_loss(out[:, :, 0:2]);
    box_loss2 = inblock_loss(out[:, :, 4:6]);
#     polygon = polygon_loss(out);
    # print(out.shape); shape is (10, X, 4)
    #return predmean + box_loss*100;
    return predmean + box_loss + box_loss2;
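
To see what `construct_map_and_calc_loss` hands back to the generator, here is a scalar sketch of the same sum: one minus the classifier score, plus a border penalty for the note positions and one for the slider end points. It uses plain floats for a single map instead of batched tensors. The thresholds are copied from `inblock_loss`, while the function names and the sample numbers are made up.

```python
def box_penalty(values):
    """Mean squared penalty for normalized coordinates near the border.

    Mirrors inblock_loss: below 0.2 the cost is (0.3 - v)^2,
    above 0.8 it is (v - 0.7)^2, otherwise 0.
    """
    total = 0.0
    for v in values:
        if v < 0.2:
            total += (0.3 - v) ** 2
        elif v > 0.8:
            total += (v - 0.7) ** 2
    return total / len(values)

def generator_loss(realness, start_coords, end_coords):
    """(1 - classifier score) plus the border penalties for both point sets."""
    return (1 - realness) + box_penalty(start_coords) + box_penalty(end_coords)

# Made-up normalized coordinates (x and y flattened into one list).
inside = [0.5, 0.5, 0.4, 0.6]
near_edge = [0.05, 0.5, 0.4, 0.95]
print(generator_loss(0.9, inside, inside))
print(generator_loss(0.9, near_edge, inside))
```

A map that fools the classifier but hugs the edge of the playfield still gets a higher loss than one that stays in the middle, which is what keeps notes and sliders on screen.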


Now we can train the model!

TODO: add more info here

TODO: strip variables group_note_count, good_maps_each_epoch, bad_maps_each_epoch, epoch_g, epoch_c, epoch_both_good, epoch_both_max
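
Until the TODO above is filled in, here is a rough sketch of the schedule that the training cell follows for each group of notes: the generator and the classifier take turns, and training stops early once the sample map stays inside the playfield. The numbers (7 generator passes, 3 classifier passes, at most 30 rounds, no early stop before round 6) are taken from the cell below. The function name and the stub callables are hypothetical stand-ins for the Keras `fit()` calls.

```python
def run_schedule(train_generator, train_classifier, is_inside_playfield,
                 max_epoch=30, good_epoch=5, g_passes=7, c_passes=3):
    """Alternate generator and classifier training with an early stop.

    Returns the number of rounds that were run.
    """
    for i in range(max_epoch):
        train_generator(g_passes)      # generator tries to fool the classifier
        train_classifier(c_passes)     # classifier learns real vs. generated
        if i >= good_epoch and is_inside_playfield():
            return i + 1
    return max_epoch

# Stub callables that only count how many passes were requested.
calls = {"g": 0, "c": 0}

def fake_train_g(n):
    calls["g"] += n

def fake_train_c(n):
    calls["c"] += n

rounds = run_schedule(fake_train_g, fake_train_c, lambda: True)
print(rounds, calls)
```

This matches the log further down, where the finished groups stop at epoch 6, the earliest round at which the early-stop check is allowed to fire.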

In [11]:
inputs  = tfe.Variable(tf.random_normal(shape=[40]))

Cs = []
loss_ma = [90, 90, 90];
extvar = {"begin": 10};

def plot_current_map(inputs):
    # plot it each epoch
    mp = construct_map_with_sliders(inputs, extvar=extvar);
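    # prepend the start position to make the plot clearer;
    # pad it with zeros to match the 6 columns of the constructed map
    npa = np.concatenate([[np.concatenate([extvar["start_pos"] / np.array([512, 384]), [0, 0, 0, 0]])], tf.stack(mp).numpy().squeeze()])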
    fig, ax = plt.subplots()
    x, y = np.transpose(npa)[0:2]
    #x, y = np.random.rand(2, 20)
    line = MyLine(x, y, mfc='red', ms=12)
    line.text.set_color('red')
    line.text.set_fontsize(16)
    ax.add_line(line)
    plt.show()

def generative_model(in_params, out_params, loss_func):
    model = keras.Sequential([
        keras.layers.Dense(128, input_shape=(in_params,)),# activation=tf.nn.elu, input_shape=(train_data.shape[1],)),
        keras.layers.Dense(128, activation=tf.nn.relu),
        keras.layers.Dense(128, activation=tf.nn.tanh),
        keras.layers.Dense(128, activation=tf.nn.relu),
        keras.layers.Dense(out_params, activation=tf.nn.tanh)#,
#         keras.layers.Lambda(lambda x: (x+1)/2, output_shape=(out_params,))
    ])

    optimizer = tf.train.AdamOptimizer(0.002)

    model.compile(loss=loss_func,
                optimizer=optimizer,
                metrics=['mae'])
    return model

class PrintDot(keras.callbacks.Callback):
    def on_epoch_end(self,epoch,logs):
        if epoch % 100 == 0: print('')
        print('.', end='')

class PrintCross(keras.callbacks.Callback):
    def on_epoch_end(self,epoch,logs):
        if epoch % 100 == 0: print('')
        print('x', end='')


plot_noise = np.random.random((1, 50));

# Pre-fit classifier for 1 epoch
# history = classifier_model.fit(actual_train_data, actual_train_labels, epochs=1,
#                     validation_split=0.2, verbose=0,
#                     callbacks=[])



# we can train all the classifiers first, onto Epoch X [x = 1~10]
# then train the generators to fit to them
# to reduce some training time.
# but i think it doesn't work too well since it's the generator which is slow...

def generate_set(begin = 0, start_pos=[256, 192], group_id=-1, length_multiplier=1, plot_map=True):
    extvar["begin"] = begin;
    extvar["start_pos"] = start_pos;
    extvar["length_multiplier"] = length_multiplier;
    
    classifier_model = build_classifier_model()
    extvar["classifier_model"] = classifier_model;
    
    def loss_function_for_generative_model(y_true, y_pred):
        #print(y_pred.shape); #(?, 20)
        return construct_map_and_calc_loss(y_pred, extvar);
    
#     classifier_true_set_group = special_train_data[np.random.randint(0, special_train_data.shape[0], (500,))];
    
#     classifier_model.summary()
    gmodel = generative_model(50, 40, loss_function_for_generative_model);
#     gmodel.summary()
    max_epoch = 30
    good_epoch = 5

    for i in range(max_epoch):
        g_multiplier = 7;
        c_multiplier = 3;
        gnoise = np.random.random((50, 50));
        glabel = np.ones((50,))

        history = gmodel.fit(gnoise, glabel, epochs=g_multiplier,
                            validation_split=0.2, verbose=0,
                            callbacks=[]) #PrintDot()
        
        predicted_maps_data = gmodel.predict(np.random.random((10, 50)));
        new_false_maps = construct_map_with_sliders(tf.convert_to_tensor(predicted_maps_data, dtype="float32"), extvar=extvar).numpy();
        new_false_labels = np.zeros(10);
        
#         not_predicted_maps_data = np.random.random((10, 40));
    #     new_false_maps2 = construct_map_without_sliders(tf.convert_to_tensor(not_predicted_maps_data, dtype="float32")).numpy();
    #     new_false_labels2 = np.zeros(10);

        rn = np.random.randint(0, special_train_data.shape[0], (50,))
        actual_train_data = np.concatenate((new_false_maps, special_train_data[rn]), axis=0); #special_false_data[st:se], 
        actual_train_labels = np.concatenate((new_false_labels, special_train_labels[rn]), axis=0); #special_false_labels[st:se], 
    #     actual_train_data = special_train_data[st:se];
    #     actual_train_labels = special_train_labels[st:se];

        history2 = classifier_model.fit(actual_train_data, actual_train_labels, epochs=c_multiplier,
                            validation_split=0.2, verbose=0,
                            callbacks=[]) #PrintCross()

        g_loss = np.mean(history.history['loss']);
        c_loss = np.mean(history2.history['loss']);
        print("Group {}, Epoch {}: G loss: {} vs. C loss: {}".format(group_id, 1+i, g_loss, c_loss));
        res = gmodel.predict(plot_noise);
        if plot_map:
            plot_current_map(tf.convert_to_tensor(res));
        
        # early return if found a good solution
        if i >= good_epoch:
            current_map = construct_map_with_sliders(tf.convert_to_tensor(res, dtype="float32"), extvar=extvar);
            if inblock_trueness(current_map[:, :, 0:2]).numpy()[0] == 0 and inblock_trueness(current_map[:, :, 4:6]).numpy()[0] == 0:
                break;

#     plot_history(history);
#     plot_history(history2);
    if plot_map:
        for i in range(3): # it's the same anyways
            res = gmodel.predict(np.random.random((1, 50)));
#         print(construct_map_with_sliders(tf.convert_to_tensor(res), extvar=extvar).numpy().squeeze());
            plot_current_map(tf.convert_to_tensor(res));

    onoise = np.random.random((1, 50));
    
    return construct_map_with_sliders(tf.convert_to_tensor(gmodel.predict(onoise)), extvar=extvar).numpy().squeeze();

# yes! dist_multiplier in #6 is used here!!
def generate_map():
    o = [];
    pos = [np.random.randint(100, 412), np.random.randint(80, 304)];
    for i in range(timestamps.shape[0] // 10):
        z = generate_set(begin = i * 10, start_pos = pos, length_multiplier = dist_multiplier, group_id = i, plot_map=False) * np.array([512, 384, 1, 1, 512, 384]);
        pos = z[-1, 0:2];
        o.append(z);
    a = np.concatenate(o, axis=0);
    return a;

# quick test: generate (and plot) a single group of notes
def generate_test():
    pos = [384, 288];
    generate_set(begin = 30, start_pos = pos, length_multiplier = dist_multiplier, group_id = 3, plot_map=True);

# For debugging only! The real output goes through "node load_map.js c" instead.
def print_osu_text(a):
    for i, ai in enumerate(a):
        if not is_slider[i]:
            print("{},{},{},1,0,0:0:0".format(int(ai[0]), int(ai[1]), int(timestamps[i])));
        else:
            print("{},{},{},2,0,L|{}:{},1,{},0:0:0".format(int(ai[0]), int(ai[1]), int(timestamps[i]), int(round(ai[0] + ai[2] * slider_lengths[i])), int(round(ai[1] + ai[3] * slider_lengths[i])), int(slider_length_base[i] * slider_ticks[i])));
    
osu_a = generate_map();
# generate_test();

Group 0, Epoch 1: G loss: 0.27958315185138155 vs. C loss: 0.1292490524550279
Group 0, Epoch 2: G loss: 0.3746268285172326 vs. C loss: 0.20244843595557746
Group 0, Epoch 3: G loss: 0.10400864992822918 vs. C loss: 0.18398718370331657
Group 0, Epoch 4: G loss: 0.22418356708117895 vs. C loss: 0.18450145588980782
Group 0, Epoch 5: G loss: 0.30156519817454475 vs. C loss: 0.15179098976982963
Group 0, Epoch 6: G loss: 0.3019875896828515 vs. C loss: 0.21671335895856222
Group 1, Epoch 1: G loss: 0.3243919619492122 vs. C loss: 0.18815342916382685
Group 1, Epoch 2: G loss: 0.312267028433936 vs. C loss: 0.15512682580285603
Group 1, Epoch 3: G loss: 0.4788761164460863 vs. C loss: 0.14936687548955283
Group 1, Epoch 4: G loss: 0.24844115759645188 vs. C loss: 0.23109460208151075
Group 1, Epoch 5: G loss: 0.017349985986948015 vs. C loss: 0.20790315833356646
Group 1, Epoch 6: G loss: 0.012608666558350834 vs. C loss: 0.2049229939778646
Group 2, Epoch 1: G loss: 0.5207503395421165 vs. C loss: 0.23599143988
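
Each "Group" in the log above is a separate small GAN run for 10 notes, and `generate_map` glues the groups together by starting every group at the last position of the previous one. A minimal sketch of that chaining, with a made-up generator that just walks to the right (`chain_groups` and `walk_right` are hypothetical names, not the notebook's functions):

```python
def chain_groups(note_count, group_size, start_pos, generate_group):
    """Generate notes group by group; each group starts where the last ended."""
    notes, pos = [], start_pos
    for g in range(note_count // group_size):
        group = generate_group(g * group_size, pos, group_size)
        pos = group[-1]            # hand the last position to the next group
        notes.extend(group)
    return notes

# Hypothetical stand-in for generate_set: move 10 px to the right per note.
def walk_right(begin, start_pos, count):
    x, y = start_pos
    return [(x + 10 * (k + 1), y) for k in range(count)]

notes = chain_groups(25, 10, (100, 192), walk_right)
print(len(notes), notes[0], notes[9], notes[10], notes[-1])
```

Because of the integer division, notes beyond the last full group (5 of the 25 in this example) are not generated.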

Now convert the generated flow data to a dict, and mix it into the JSON file converted from .osu.

In [12]:
# This needs to be JSON serializable, so we need to convert the types carefully.
# NumPy has its own types such as numpy.int64, which the json module cannot serialize.
def convert_to_osu_obj(obj_array):
    output = [];
    for i, obj in enumerate(obj_array):
        if not is_slider[i]: # is a circle; does not consider spinner for now.
            obj_dict = {
                "x": int(obj[0]),
                "y": int(obj[1]),
                "type": 1,
                "time": int(timestamps[i]),
                "hitsounds": 0,
                "extHitsounds": "0:0:0",
                "index": i
            };
        else:
            obj_dict = {
                "x": int(obj[0]),
                "y": int(obj[1]),
                "type": 2,
                "time": int(timestamps[i]),
                "hitsounds": 0,
                "extHitsounds": "0:0:0",
                "sliderGenerator": {
                    "type": int(slider_types[i]),
                    "dOut": [float(obj[2]), float(obj[3])],
                    "len": float(slider_length_base[i] * slider_ticks[i]),
                    "ticks": int(slider_ticks[i]),
                    "endpoint": [int(obj[4]), int(obj[5])]
                },
                "index": i
            };
        output.append(obj_dict);
    return output;

def get_osu_file_name(metadata):
    artist = metadata["artist"];
    title = metadata["title"];
    creator = metadata["creator"];
    diffname = metadata["diffname"];
    outname = (artist+" - " if len(artist) > 0 else "") + title + " (" + creator + ") [" + diffname + "].osu";
    # raw string, so the backslashes reach the regex engine untouched
    outname = re.sub(r"[^a-zA-Z0-9\(\)\[\] \.\,\!\~\`\{\}\-\_\=\+\&\^\@\#\$\%\;\']","", outname);
    return outname;

osu_obj_array = convert_to_osu_obj(osu_a);

with open("mapthis.json", encoding="utf-8") as map_json:
    map_dict = json.load(map_json);
    map_meta = map_dict["meta"];
    filename = get_osu_file_name(map_meta);
    map_dict["obj"] = osu_obj_array;

with open('mapthis.json', 'w', encoding="utf-8") as outfile:
    json.dump(map_dict, outfile, ensure_ascii=False);
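
The file name logic above can be tried on its own. This sketch rebuilds the same "Artist - Title (Creator) [Diff].osu" pattern and strips every character outside a whitelist. The metadata is made up, and `osu_file_name` and `NOT_ALLOWED` are hypothetical names.

```python
import re

# Matches every character that is NOT on the whitelist (same idea as above).
NOT_ALLOWED = re.compile(r"[^a-zA-Z0-9()\[\] .,!~`{}\-_=+&^@#$%;']")

def osu_file_name(meta):
    """Build 'Artist - Title (Creator) [Diff].osu' and drop unsafe characters."""
    prefix = meta["artist"] + " - " if meta["artist"] else ""
    name = "{}{} ({}) [{}].osu".format(
        prefix, meta["title"], meta["creator"], meta["diffname"])
    return NOT_ALLOWED.sub("", name)

# Made-up metadata for illustration.
meta = {"artist": "Some Artist", "title": "Song: Title?",
        "creator": "mapper", "diffname": "Hard"}
print(osu_file_name(meta))
```

Characters such as `:` and `?` are dropped, since they are not allowed in Windows file names.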

Finally, ask node.js to convert the JSON back to a .osu file.

In [13]:
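# only report success if node exits cleanly
returncode = subprocess.call(["node", "load_map.js", "c", "mapthis.json", filename]);
status = "success" if returncode == 0 else "failed (exit code {})".format(returncode);
print("{}! finished on: {}".format(status, datetime.now()));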

success! finished on: 2018-08-17 00:25:54.916286
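
If the conversion step misbehaves, it helps to look at the exit code of the external command rather than assuming it worked. A small sketch of such a check, assuming Python 3.5+ for `subprocess.run` (`run_converter` is a hypothetical helper, and the demo calls the Python interpreter itself so that it runs without node installed):

```python
import subprocess
import sys

def run_converter(command):
    """Run an external command and report whether it exited successfully."""
    try:
        completed = subprocess.run(command)
    except FileNotFoundError:
        # The executable (e.g. node) is not installed or not on PATH.
        return False
    return completed.returncode == 0

# Demonstration with the Python interpreter, so it runs anywhere.
ok = run_converter([sys.executable, "-c", "pass"])
failed = run_converter([sys.executable, "-c", "import sys; sys.exit(3)"])
print(ok, failed)
```

In this notebook the command would be `["node", "load_map.js", "c", "mapthis.json", filename]`.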


If everything worked, you should now have a nice .osu file in the same folder as these notebooks!

If it does not work, please tell me about the problem so I can try to fix it!

@kotri_lv204 / ar3sgice, 2018/8/16