Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement streaming operator and API #858

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

senderista
Copy link
Contributor

This first draft doesn't even work properly for all but the simplest examples, but I wanted to get feedback on the design sooner rather than later. Briefly, I'm introducing a new MyriaL statement stream(relationVar) analogous to store() and sink(). Whenever a stream() statement appears, RACO inserts a Stream pseudo-operator which is expanded into a chain of MyriaX operators CollectProducer->CollectConsumer->StreamingSink (the latter is a pseudo-operator defined in MyriaX). On the MyriaX side, StreamingSink is expanded from its encoding to a TupleSink with a PipeSink instance of its DataSink member, so we can get an InputStream with the query results. In StreamingSink.construct(), the PipeSink's InputStream is registered with the QueryManager under its query ID so we can retrieve it later and connect it to the HTTP Response object. In QueryResource.postNewStreamingQuery(), which is mapped to the new /query/stream endpoint, we retrieve all registered InputStreams from PipeSinks instantiated as part of a StreamingSink in the query plan, and form a SequenceInputStream which we pass to ResponseBuilder.entity(), so the HTTP client receives all outputs in the order in which their respective stream() statements appeared in the MyriaL query.

This does seem to work for simple queries like this:

E = scan(TwitterK);
V = select distinct E.$0 from E;
stream(V);

But it fails with the connected components sample query from myria-web, and also with sequenced stream() statements like this:

T1 = load("file://Users/tdbaker/sequence_test_1.txt",
csv(schema(col1:string),skip=0));
stream(T1);
T2 = load("file://Users/tdbaker/sequence_test_2.txt",
csv(schema(col1:string),skip=0));
stream(T2);
T3 = load("file://Users/tdbaker/sequence_test_3.txt",
csv(schema(col1:string),skip=0));
stream(T3);

I haven't diagnosed the CC query failure yet, but I think the sequence query failure is due to a simple deadlock. I think the Myria Sequence operator has to wait for each subquery to finish before running the next one, but that requires the client to consume all tuples, and somehow the SequenceInputStream that combines subquery results isn't flushed until the entire query has finished.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.07%) to 27.232% when pulling 0da4542 on streaming_query_results into 4aff005 on master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.08%) to 27.224% when pulling a32e40a on streaming_query_results into 4aff005 on master.

@senderista
Copy link
Contributor Author

One thing I forgot to mention that might be a problem: we can't set Response.entity() until after the query has been submitted. The reason is that we can only retrieve the InputStreams associated with this query by its QueryId, and we don't have the QueryId until we submit the query. Allowing queries to be identified by client-supplied tokens would fix this problem.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.04%) to 26.719% when pulling f4e476d on streaming_query_results into 0141614 on master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants