WIP: Data processing server #56

Merged
merged 10 commits from server into master on Mar 28, 2015

Conversation

bartvm commented Mar 27, 2015

As requested in #7. We already have some basic multi-processing, but we've run into issues with HDF5 files (#47), so I figured it was worth revisiting the idea of having a separate server that pushes data around through sockets (optimized for NumPy data).

As I'm kind of figuring out ZMQ and messaging frameworks as I go along, this is a very rough draft, and I might be doing it completely wrong. I'm basically abusing the extended request-reply pattern, making the broker act as a buffer. The idea is that the user creates their data stream in one file and calls start_server(data_stream) at the end of it. In the script with the main loop, the data stream is then given as ServerDataStream().
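
(For anyone unfamiliar with the pattern, a minimal sketch of a zguide-style extended request-reply broker looks roughly like the code below; the ports and the plain zmq.proxy forwarding are just illustrative, not necessarily what this PR ends up doing.)

import zmq

context = zmq.Context()

# Clients (e.g. the training script) connect to the frontend with REQ sockets.
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# The process that owns the data stream connects to the backend with a REP socket.
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")

# Forward requests to the data producer and replies back to the clients.
# A buffering broker would additionally queue ready batches at this point.
zmq.proxy(frontend, backend)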

Long to-do list:

  • If the client dies, it can't reconnect to the server. If we want to allow this, we need something like the lazy pirate pattern I think.
  • Support for multiple sources.
  • Server needs to send stop iteration message to client
  • Documentation
  • Client interface (ServerDataStream)
  • Benchmarking
  • Python 3 compatibility (byte strings, buffer interface)
  • Checkpointing support
  • Iterator protocol support (e.g. go to next iteration)
  • Automate source discovery

Any feedback welcome. Like I said, making this up as I go along.

bartvm commented Mar 27, 2015

I've added a crude way of sending StopIteration messages, and also wrote a ServerDataStream class (which largely ignores the data stream protocol). This should work now:

# In one terminal, start the server:
python fuel/server.py

# In another process, connect a client:
from fuel.streams import ServerDataStream
data_stream = ServerDataStream(('features',))
it = data_stream.get_epoch_iterator()
next(it)

bartvm commented Mar 27, 2015

Woohoo! For my dogs vs. cats model, testing this on a single run, the time to read data fell from 76.84 seconds per epoch to 5.61 seconds. That means it went from being 38% of the training time to 4% of the training time :)

bartvm commented Mar 27, 2015

Whoever wants to have a look, this should be good for initial review (@dwf, @vdumoulin, @rizar)

vdumoulin commented:

I'm by no means acquainted with networking, but if you provide a minimal starting example I can "kick the tires" and tell you what I think.

bartvm commented Mar 27, 2015

# server.py
from fuel.datasets import MNIST
from fuel.streams import DataStream
from fuel.schemes import SequentialScheme
mnist = MNIST('train')
data_stream = DataStream(
    mnist, iteration_scheme=SequentialScheme(1500, 500)
)
start_server(data_stream)

# train.py
from fuel.streams import ServerDataStream
data_stream = ServerDataStream(('features', 'targets'))
epoch = data_stream.get_epoch_iterator()
batch = next(epoch)
print(batch)

Running python server.py followed by python train.py should work now.

vdumoulin commented:

Thanks, I'll have a look at it.

vdumoulin commented:

I still haven't read your code in detail, but if I call python train.py twice after calling python server.py (which needs an import for start_server to work), it gets stuck on the second call.

bartvm commented Mar 27, 2015

Oops, forgot the start_server import.

I'm aware of the inability of the client to reconnect; it's the "lazy pirate pattern" problem I referred to in the description. Basically, ZMQ binds the processes together in the background somehow. If the client dies and tries to reconnect, its ID changes and it can't connect to the server anymore. The only way to fix this is by manually implementing a pattern where the client forcefully closes the socket, deregisters from the poll, and reconnects (see e.g. this code).

I was planning on fixing that in a later PR, if at all. It normally doesn't make sense to reconnect the client without restarting the server, since the client would start receiving data from the middle of an epoch.
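
(For reference, the reconnect logic would look roughly like this lazy-pirate-style sketch; the endpoint, timeout and retry count are made up for illustration.)

import zmq

REQUEST_TIMEOUT = 2500  # milliseconds
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5559"

context = zmq.Context()

def request(message):
    """Send ``message`` (bytes), recreating the socket if the server stalls."""
    for _ in range(REQUEST_RETRIES):
        client = context.socket(zmq.REQ)
        client.connect(SERVER_ENDPOINT)
        poller = zmq.Poller()
        poller.register(client, zmq.POLLIN)
        client.send(message)
        if poller.poll(REQUEST_TIMEOUT):
            reply = client.recv()
            client.close()
            return reply
        # No reply in time: deregister, forcefully close, and reconnect.
        poller.unregister(client)
        client.setsockopt(zmq.LINGER, 0)
        client.close()
    raise IOError("server seems to be offline")

# Usage: batch = request(b'next')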

vdumoulin commented:

Makes sense.



Inline review comment on the send_arrays helper:

def send_arrays(socket, arrays, flags=0, copy=True, track=False, stop=False):
    """Send a NumPy array using the buffer interface and some metadata.

A contributor commented:

singular, yet below you say we're sending a list of numpy arrays

bartvm (author) replied:

Right, should be plural.
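
(For context, the usual pyzmq recipe this kind of helper builds on sends a JSON metadata frame followed by the raw array buffers; a rough sketch of that idea, with a stop flag for end-of-epoch, not the PR's exact code — it assumes C-contiguous arrays.)

import zmq
import numpy


def send_arrays(socket, arrays, stop=False):
    """Send a list of NumPy arrays as a metadata frame plus one buffer each."""
    if stop:
        # Signal the end of the epoch instead of sending data.
        return socket.send_json({'stop': True})
    headers = [{'dtype': str(array.dtype), 'shape': array.shape}
               for array in arrays]
    socket.send_json(headers, zmq.SNDMORE)
    for array in arrays[:-1]:
        socket.send(array, zmq.SNDMORE, copy=False)
    return socket.send(arrays[-1], copy=False)


def recv_arrays(socket):
    """Receive arrays sent by ``send_arrays``; raise StopIteration on stop."""
    headers = socket.recv_json()
    if 'stop' in headers:
        raise StopIteration
    arrays = []
    for header in headers:
        frame = socket.recv(copy=False)
        array = numpy.frombuffer(frame.buffer, dtype=header['dtype'])
        arrays.append(array.reshape(header['shape']))
    return arrays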

bartvm commented Mar 28, 2015

Nothing I can do about the drop in coverage by the way. Coverage doesn't count lines run in subprocesses. It actually has an experimental flag for this in the latest alpha releases, but I can't get that to play nice with Coveralls (whose support for Coverage 4 is alpha level, so alpha level support for an alpha level release...). Give it a month or two and I guess we should be able to enable it.

bartvm commented Mar 28, 2015

As I said, I'm figuring this out as I go along: after reading the documentation a bit more, I realized I could switch to a push-pull pattern and use ZeroMQ's high-water mark to limit the queue instead, which turns out to be significantly faster. Reading time for my Dogs vs. Cats model (originally 76.84 s) went from 5.61 s to 1.9 s, about 3 times faster and now only 1.4% of the training time.
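
(Roughly, the new setup looks like the sketch below; the high-water mark value and port are just illustrative.)

import zmq

context = zmq.Context()

# Producer side (the process that owns the data stream): PUSH with a small
# high-water mark, so only a few batches are ever queued in memory.
sender = context.socket(zmq.PUSH)
sender.set_hwm(10)
sender.bind("tcp://*:5557")
# sender.send_pyobj(batch) blocks once the queue is full.

# Consumer side (the training script): PULL simply receives batches in order.
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# batch = receiver.recv_pyobj()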

bartvm added a commit that referenced this pull request Mar 28, 2015
WIP: Data processing server
bartvm merged commit c36524a into master on Mar 28, 2015
bartvm deleted the server branch on September 22, 2015