Skip to content
This repository has been archived by the owner on Mar 11, 2021. It is now read-only.

Commit

Permalink
Export training brackets from Bigtable to GCS
Browse files Browse the repository at this point in the history
Useful when wanting a repeatable series of training sets independent
of the normal RL loop.

As part of this:

  - Flags to control size and sampling of training window
  - Provide random-access version of the two-queue mix
  - Use multiprocessing to speed up batch export
  - Remove individual progress logs when done
    (they're only interesting when incomplete)
  • Loading branch information
gitosaurus committed Apr 16, 2019
1 parent ef1b98b commit 6be6a89
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 17 deletions.
158 changes: 158 additions & 0 deletions batch_exporter.py
@@ -0,0 +1,158 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Copy Minigo training sets from table to GCS..
"""


import bisect
import math
import multiprocessing
import os
import tensorflow as tf
from absl import flags
from absl import app
from tqdm import tqdm
import bigtable_input
import utils


flags.DEFINE_bool('dry_run', False,
'If true, generate and print the windows, rather than export.')

flags.DEFINE_integer('starting_game', None,
'Export beginning with the window that follows this regular game')

flags.DEFINE_integer('training_games', 500000,
'Number of games to include in training window')

flags.DEFINE_integer('training_moves', 2**21,
'Number of moves to select from training window')

flags.DEFINE_float('training_fresh', 0.05,
'Fraction of fresh games in each new training window')

flags.DEFINE_integer('batch_size', 1024,
'How many TFRecords to pull through tf.Session at a time')

flags.DEFINE_string('output_prefix', 'gs://dtj-minigo-us-central1/tryit_',
'Name of output file to receive TFRecords')

flags.DEFINE_integer('concurrency', 4,
'Number of parallel subprocesses')

flags.DEFINE_integer('max_trainings', None,
'Process no more than this many training brackets')

FLAGS = flags.FLAGS


def training_series(cursor_r, cursor_c, mix, increment_fraction=0.05):
"""Given two end-cursors and a mix of games, produce a series of bounds.
"""
while (cursor_r - mix.games_r) >= 0 and (cursor_c - mix.games_c) >= 0:
yield (cursor_r - mix.games_r), cursor_r, (cursor_c - mix.games_c), cursor_c
cursor_r -= math.ceil(mix.games_r * increment_fraction)
cursor_c -= math.ceil(mix.games_c * increment_fraction)


def _export_training_set(args):
spec, start_r, start_c, mix, batch_size, output_url = args
gq_r = bigtable_input.GameQueue(spec.project, spec.instance, spec.table)
gq_c = bigtable_input.GameQueue(spec.project, spec.instance, spec.table + '-nr')
total_moves = mix.moves_r + mix.moves_c

with tf.Session() as sess:
ds = bigtable_input.get_unparsed_moves_from_games(gq_r, gq_c,
start_r, start_c,
mix)
ds = ds.batch(batch_size)
iterator = ds.make_initializable_iterator()
sess.run(iterator.initializer)
get_next = iterator.get_next()
writes = 0
print('Writing to', output_url)
with tf.io.TFRecordWriter(
output_url,
options=tf.io.TFRecordCompressionType.ZLIB) as wr:
log_filename = '/tmp/{}_{}.log'.format(start_r, start_c)
with open(log_filename, 'w') as progress_file:
with tqdm(desc='Records', unit_scale=2, total=total_moves,
file=progress_file) as pbar:
while True:
try:
batch = sess.run(get_next)
pbar.update(len(batch))
for b in batch:
wr.write(b)
writes += 1
if (writes % 10000) == 0:
wr.flush()
except tf.errors.OutOfRangeError:
break
os.unlink(log_filename)


def main(argv):
"""Main program.
"""
del argv # Unused
total_games = FLAGS.training_games
total_moves = FLAGS.training_moves
fresh = FLAGS.training_fresh
batch_size = FLAGS.batch_size
output_prefix = FLAGS.output_prefix

spec = bigtable_input.BigtableSpec(
FLAGS.cbt_project,
FLAGS.cbt_instance,
FLAGS.cbt_table)
gq_r = bigtable_input.GameQueue(spec.project, spec.instance, spec.table)
gq_c = bigtable_input.GameQueue(spec.project, spec.instance, spec.table + '-nr')

mix = bigtable_input.mix_by_decile(total_games, total_moves, 9)
trainings = [(spec, start_r, start_c,
mix, batch_size,
'{}{:0>10}_{:0>10}.tfrecord.zz'.format(output_prefix, start_r, start_c))
for start_r, finish_r, start_c, finish_c
in reversed(list(training_series(gq_r.latest_game_number,
gq_c.latest_game_number,
mix,
fresh)))]

if FLAGS.starting_game:
game = FLAGS.starting_game
starts = [t[1] for t in trainings]
where = bisect.bisect_left(starts, game)
trainings = trainings[where:]

if FLAGS.max_trainings:
trainings = trainings[:FLAGS.max_trainings]

# TODO: have a --dry_run to review
if FLAGS.dry_run:
for t in trainings:
print(t)
raise SystemExit

concurrency = min(FLAGS.concurrency, multiprocessing.cpu_count() * 2)
with tqdm(desc='Training Sets', unit_scale=2, total=len(trainings)) as pbar:
for b in utils.iter_chunks(concurrency, trainings):
with multiprocessing.Pool(processes=concurrency) as pool:
pool.map(_export_training_set, b)
pbar.update(len(b))


if __name__ == '__main__':
app.run(main)
99 changes: 82 additions & 17 deletions bigtable_input.py
Expand Up @@ -92,6 +92,15 @@
['project', 'instance', 'table'])


# Information needed to create a mix of two Game queues.
# r = resign/regular; c = calibration (no-resign)
GameMix = collections.namedtuple(
'GameMix',
['games_r', 'moves_r',
'games_c', 'moves_c',
'selection'])


def cbt_intvalue(value):
"""Decode a big-endian uint64.
Expand Down Expand Up @@ -599,6 +608,33 @@ def set_fresh_watermark(game_queue, count_from, window_size,
game_queue.require_fresh_games(num_to_play)


def mix_by_decile(games, moves, deciles=9):
"""Compute a mix of regular and calibration games by decile.
deciles should be an integer between 0 and 10 inclusive.
"""
assert 0 <= deciles <= 10
# The prefixes and suffixes below have the following meanings:
# ct_: count
# fr_: fraction
# _r: resign (ordinary)
# _nr: no-resign
lesser = 10 - math.floor(deciles)
greater = 10 - lesser
ct_r, ct_nr = greater, lesser
ct_total = ct_r + ct_nr
fr_r = ct_r / ct_total
fr_nr = ct_nr / ct_total
games_r = math.ceil(games * fr_r)
moves_r = math.ceil(moves * fr_r)
games_c = math.floor(games * fr_nr)
moves_c = math.floor(moves * fr_nr)
selection = np.array([0] * ct_r + [1] * ct_nr, dtype=np.int64)
return GameMix(games_r, moves_r,
games_c, moves_c,
selection)


def get_unparsed_moves_from_last_n_games(games, games_nr, n,
moves=2**21,
shuffle=True,
Expand All @@ -622,30 +658,59 @@ def get_unparsed_moves_from_last_n_games(games, games_nr, n,
A dataset containing no more than `moves` examples, sampled
randomly from the last `n` games in the table.
"""
# The prefixes and suffixes below have the following meanings:
# ct_: count
# fr_: fraction
# _r: resign (ordinary)
# _nr: no-resign
ct_r, ct_nr = 9, 1
ct_total = ct_r + ct_nr
fr_r = ct_r / ct_total
fr_nr = ct_nr / ct_total
mix = mix_by_decile(n, moves, 9)
resign = games.moves_from_last_n_games(
math.ceil(n * fr_r),
math.ceil(moves * fr_r),
mix.games_r,
mix.moves_r,
shuffle,
column_family, column)
no_resign = games_nr.moves_from_last_n_games(
math.floor(n * fr_nr),
math.floor(moves * fr_nr),
mix.games_c,
mix.moves_c,
shuffle,
column_family, column)
selection = np.array([0] * ct_r + [1] * ct_nr, dtype=np.int64)
choice = tf.data.Dataset.from_tensor_slices(selection).repeat().take(moves)
ds = tf.contrib.data.choose_from_datasets([resign, no_resign], choice)
choice = tf.data.Dataset.from_tensor_slices(mix.selection).repeat().take(moves)
ds = tf.data.experimental.choose_from_datasets([resign, no_resign], choice)
if shuffle:
ds = ds.shuffle(len(mix.selection) * 2)
if values_only:
ds = ds.map(lambda row_name, s: s)
return ds


def get_unparsed_moves_from_games(games_r, games_c,
start_r, start_c,
mix,
shuffle=True,
column_family=TFEXAMPLE,
column='example',
values_only=True):
"""Get a dataset of serialized TFExamples from a given start point.
Args:
games_r, games_c: GameQueues of the regular selfplay and calibration
(aka 'no resign') games to sample from.
start_r: an integer indicating the game number to start at in games_r.
start_c: an integer indicating the game number to start at in games_c.
mix: the result of mix_by_decile()
shuffle: if True, shuffle the selected move examples.
column_family: name of the column family containing move examples.
column: name of the column containing move examples.
values_only: if True, return only column values, no row keys.
Returns:
A dataset containing no more than the moves implied by `mix`,
sampled randomly from the game ranges implied.
"""
resign = games_r.moves_from_games(
start_r, start_r + mix.games_r, mix.moves_r, shuffle, column_family, column)
calibrated = games_c.moves_from_games(
start_c, start_c + mix.games_c, mix.moves_c, shuffle, column_family, column)
moves = mix.moves_r + mix.moves_c
choice = tf.data.Dataset.from_tensor_slices(mix.selection).repeat().take(moves)
ds = tf.data.experimental.choose_from_datasets([resign, calibrated], choice)
if shuffle:
ds = ds.shuffle(len(selection) * 2)
ds = ds.shuffle(len(mix.selection) * 2)
if values_only:
ds = ds.map(lambda row_name, s: s)
return ds
Expand Down

0 comments on commit 6be6a89

Please sign in to comment.