Skip to content

Commit

Permalink
refactor queueinputtrainer a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
ppwwyyxx committed Apr 24, 2016
1 parent 0e7f338 commit 08821b5
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 148 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
sys.path.insert(0, os.path.abspath('../'))

import mock
MOCK_MODULES = ['numpy', 'scipy', 'tensorflow']
MOCK_MODULES = ['numpy', 'scipy', 'tensorflow', 'scipy.misc', 'h5py', 'nltk']
for mod_name in MOCK_MODULES:
sys.modules[mod_name] = mock.Mock()

Expand Down
2 changes: 1 addition & 1 deletion examples/ResNet/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

## ResNet

Implements the paper "Deep Residual Learning for Image Recognition", [http://arxiv.org/abs/1512.03385](http://arxiv.org/abs/1512.03385)
Implement the paper "Deep Residual Learning for Image Recognition", [http://arxiv.org/abs/1512.03385](http://arxiv.org/abs/1512.03385)
with the variants proposed in "Identity Mappings in Deep Residual Networks", [https://arxiv.org/abs/1603.05027](https://arxiv.org/abs/1603.05027).

The train error shown here is a moving average of the error rate of each batch in training.
Expand Down
5 changes: 3 additions & 2 deletions examples/mnist-convnet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: mnist_convnet.py
# File: mnist-convnet.py
# Author: Yuxin Wu <ppwwyyxx@gmail.com>

import tensorflow as tf
Expand Down Expand Up @@ -121,5 +121,6 @@ def get_config():
config = get_config()
if args.load:
config.session_init = SaverRestore(args.load)
tp.SimpleTrainer(config).train()
#tp.SimpleTrainer(config).train()
tp.QueueInputTrainer(config).train()

22 changes: 1 addition & 21 deletions tensorpack/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,8 @@ Define a `DataFlow` instance to feed data.
See [Dataflow documentation](https://github.com/ppwwyyxx/tensorpack/tree/master/tensorpack/dataflow)

### How to define a model
Take a look at the `get_model` function in [mnist example](https://github.com/ppwwyyxx/tensorpack/blob/master/example_mnist.py) first.

To define a model, write a `get_model` function which accepts two arguments:
+ inputs: a list of variables used as input in training. inputs could be batched or not batched (see
[training](#how-to-perform-training))
+ is_training: the graph for training and inference could be different (e.g. dropout, augmentation),
`get_model` function should use this variable to know is it doing training or inference.

The function should define a graph based on input variables.
It could use any pre-defined routines in [tensorpack/models](https://github.com/ppwwyyxx/tensorpack/tree/master/tensorpack/models),
or use tensorflow symbolic functions.

It may also define other helper variables to monitor the training,
(e.g. accuracy), and add tensorboard summaries you need. (See [howto summary](#use-tensorboard-summary))

Also, it's helpful to give names to some important variables used in inference. (See
[inference](#how-to-perform-inference)).

The function should at last return the cost to minimize.
Take a look at [mnist example](https://github.com/ppwwyyxx/tensorpack/blob/master/example_mnist.py) first.

### How to perform training

Expand All @@ -35,6 +18,3 @@ The function should at last return the cost to minimize.
### How to add new models

### Use tensorboard summary
<!--
- what will be automatically summaried
-->
12 changes: 6 additions & 6 deletions tensorpack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
# File: __init__.py
# Author: Yuxin Wu <ppwwyyxx@gmail.com>

import models
import train
import utils
import tfutils
import callbacks
import dataflow
from . import models
from . import train
from . import utils
from . import tfutils
from . import callbacks
from . import dataflow

from .train import *
from .models import *
Expand Down
36 changes: 35 additions & 1 deletion tensorpack/dataflow/dftools.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
# Author: Yuxin Wu <ppwwyyxx@gmail.com>

import sys, os
import multiprocessing
from scipy.misc import imsave

from ..utils.fs import mkdir_p

# TODO name_func to write label?
__all__ = ['dump_dataset_images', 'dataflow_to_process_queue']

# TODO pass a name_func to write label as filename?
def dump_dataset_images(ds, dirname, max_count=None, index=0):
""" Dump images from a `DataFlow` to a directory.
Expand All @@ -25,3 +28,34 @@ def dump_dataset_images(ds, dirname, max_count=None, index=0):
return
img = dp[index]
imsave(os.path.join(dirname, "{}.jpg".format(i)), img)


def dataflow_to_process_queue(ds, size, nr_consumer):
"""
Convert a `DataFlow` to a multiprocessing.Queue.
:param ds: a `DataFlow`
:param size: size of the queue
:param nr_consumer: number of consumer of the queue.
will add this many of `DIE` sentinel to the end of the queue.
:returns: (queue, process). The process will take data from `ds` to fill
the queue once you start it.
"""
q = multiprocessing.Queue(size)
class EnqueProc(multiprocessing.Process):
def __init__(self, ds, q, nr_consumer):
super(EnqueProc, self).__init__()
self.ds = ds
self.q = q

def run(self):
try:
for idx, dp in enumerate(self.ds.get_data()):
self.q.put((idx, dp))
finally:
for _ in range(nr_consumer):
self.q.put((DIE, None))

proc = EnqueProc(ds, q, nr_consumer)
return q, proc


10 changes: 0 additions & 10 deletions tensorpack/models/model_desc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ def reuse_input_vars(self):
def _get_input_vars(self):
pass

# TODO move this to QueueInputTrainer
def get_input_queue(self, input_vars):
"""
return the queue for input. the dequeued elements will be fed to self.get_cost
if queue is None, datapoints from dataflow will be fed to the graph directly.
when running with multiGPU, queue cannot be None
"""
assert input_vars is not None
return tf.FIFOQueue(100, [x.dtype for x in input_vars], name='input_queue')

def get_cost(self, input_vars, is_training):
"""
:param input_vars: a list of input variable in the graph
Expand Down
42 changes: 7 additions & 35 deletions tensorpack/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
# Author: Yuxin Wu <ppwwyyxx@gmail.com>

import tensorflow as tf
from itertools import count
import argparse
from collections import namedtuple
import numpy as np
from collections import namedtuple
from tqdm import tqdm
from six.moves import zip
from six.moves import zip, range

import multiprocessing
from .utils.concurrency import ensure_proc_terminate, OrderedResultGatherProc, DIE
Expand All @@ -17,6 +15,7 @@
from .utils import logger
from .tfutils.modelutils import describe_model
from .dataflow import DataFlow, BatchData
from .dataflow.dftools import dataflow_to_process_queue

__all__ = ['PredictConfig', 'DatasetPredictor', 'get_predict_func']

Expand Down Expand Up @@ -102,7 +101,7 @@ class PredictWorker(multiprocessing.Process):
""" A worker process to run predictor on one GPU """
def __init__(self, idx, gpuid, inqueue, outqueue, config):
"""
:param idx: index of the worker
:param idx: index of the worker. the 0th worker will print log.
:param gpuid: id of the GPU to be used
:param inqueue: input queue to get data point
:param outqueue: output queue put result
Expand All @@ -118,7 +117,7 @@ def __init__(self, idx, gpuid, inqueue, outqueue, config):
def run(self):
os.environ['CUDA_VISIBLE_DEVICES'] = self.gpuid
G = tf.Graph() # build a graph for each process, because they don't need to share anything
with G.as_default(), tf.device('/gpu:{}'.format(self.idx)):
with G.as_default(), tf.device('/gpu:0'):
self.func = get_predict_func(self.config)
if self.idx == 0:
describe_model()
Expand All @@ -131,33 +130,6 @@ def run(self):
res = PredictResult(dp, self.func(dp))
self.outqueue.put((tid, res))

def DFtoQueue(ds, size, nr_consumer):
"""
Build a queue that produce data from `DataFlow`, and a process
that fills the queue.
:param ds: a `DataFlow`
:param size: size of the queue
:param nr_consumer: number of consumer of the queue.
will add this many of `DIE` sentinel to the end of the queue.
:returns: (queue, process)
"""
q = multiprocessing.Queue(size)
class EnqueProc(multiprocessing.Process):
def __init__(self, ds, q, nr_consumer):
super(EnqueProc, self).__init__()
self.ds = ds
self.q = q

def run(self):
for idx, dp in enumerate(self.ds.get_data()):
self.q.put((idx, dp))
print "Enqueue ends"
for _ in range(nr_consumer):
self.q.put((DIE, None))

proc = EnqueProc(ds, q, nr_consumer)
return q, proc

class DatasetPredictor(object):
"""
Run the predict_config on a given `DataFlow`.
Expand All @@ -171,12 +143,12 @@ def __init__(self, config, dataset):
self.ds = dataset
self.nr_gpu = config.nr_gpu
if self.nr_gpu > 1:
self.inqueue, self.inqueue_proc = DFtoQueue(self.ds, 10, self.nr_gpu)
self.inqueue, self.inqueue_proc = dataflow_to_process_queue(self.ds, 10, self.nr_gpu)
self.outqueue = multiprocessing.Queue()
try:
gpus = os.environ['CUDA_VISIBLE_DEVICES'].split(',')
except KeyError:
gpus = range(self.nr_gpu)
gpus = list(range(self.nr_gpu))
self.workers = [PredictWorker(i, gpus[i], self.inqueue, self.outqueue, config)
for i in range(self.nr_gpu)]
self.result_queue = OrderedResultGatherProc(self.outqueue)
Expand Down
2 changes: 1 addition & 1 deletion tensorpack/tfutils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ..utils.naming import *
import tensorflow as tf

def get_default_sess_config(mem_fraction=0.99):
def get_default_sess_config(mem_fraction=0.9):
"""
Return a better session config to use as default.
Tensorflow default session config consume too much resources.
Expand Down
14 changes: 5 additions & 9 deletions tensorpack/tfutils/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import six
import tensorflow as tf
import re

from ..utils import *
from . import get_global_step_var
Expand Down Expand Up @@ -69,23 +70,18 @@ def perform(var, action):
for act in actions:
perform(p, act)

# TODO get rid of the cost_var thing...
def summary_moving_average(cost_var):
def summary_moving_average():
""" Create a MovingAverage op and summary for all variables in
MOVING_SUMMARY_VARS_KEY, as well as `cost_var`.
MOVING_SUMMARY_VARS_KEY.
:returns: a op to maintain these average.
"""
global_step_var = get_global_step_var()
averager = tf.train.ExponentialMovingAverage(
0.99, num_updates=global_step_var, name='moving_averages')
vars_to_summary = [cost_var] + \
tf.get_collection(MOVING_SUMMARY_VARS_KEY)
vars_to_summary = tf.get_collection(MOVING_SUMMARY_VARS_KEY)
avg_maintain_op = averager.apply(vars_to_summary)
for idx, c in enumerate(vars_to_summary):
name = c.op.name
if idx == 0:
name = 'train_cost'
name = re.sub('tower[0-9]+/', '', c.op.name)
tf.scalar_summary(name, averager.average(c))
return avg_maintain_op

2 changes: 1 addition & 1 deletion tensorpack/train/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _process_summary(self, summary_str):
summary = tf.Summary.FromString(summary_str)
for val in summary.value:
if val.WhichOneof('value') == 'simple_value':
val.tag = re.sub('tower[0-9]*/', '', val.tag) # TODO move to subclasses
val.tag = re.sub('tower[0-9]+/', '', val.tag) # TODO move to subclasses
self.stat_holder.add_stat(val.tag, val.simple_value)
self.summary_writer.add_summary(summary, self.global_step)

Expand Down

0 comments on commit 08821b5

Please sign in to comment.