This repository has been archived by the owner on Mar 24, 2022. It is now read-only.
# Streaming Interaction Model Specification
When running in streaming mode, a function invoker MUST behave as a gRPC server and answer to the following rpc (defined by its [full protobuf definition](riff-rpc.proto)):
```
service Riff {
    rpc Invoke (stream InputSignal) returns (stream OutputSignal) {}
}
```
That streaming rpc will be invoked by a *streaming processor* whose responsibility is to send (streaming) input parameters to the invoker, as well as forward back the (streaming) output parameters that it receives from the invoker. Where those input parameters come from, where the output parameters end up or when the streaming processor decides to trigger an rpc (slicing the input streams into *windows*) is entirely the responsibility of the streaming processor and is beyond the scope of this document.
Because gRPC allows only one input parameter and one output parameter in an rpc definition, it is likely that a streaming processor will *multiplex* input parameters and *de-multiplex* output parameters that it receives back. Conversely, it will be the responsibility of the invoker to *de-multiplex* the input parameters and *multiplex* the return parameters back.
![](img/mux-demux.png)
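As a rough illustration of the multiplexing idea, the following Python sketch splits one interleaved sequence into per-parameter sequences and merges them back. Frames are modeled as plain `(index, value)` tuples, and the helper names `demultiplex` and `multiplex` are hypothetical; a real invoker would work on the protobuf frames defined later in this document.

```python
from collections import defaultdict


def demultiplex(frames):
    """Split one interleaved sequence of (index, value) pairs into per-index lists.

    The relative order of values sharing the same index is preserved.
    """
    streams = defaultdict(list)
    for index, value in frames:
        streams[index].append(value)
    return dict(streams)


def multiplex(streams):
    """Tag every value with its stream index and merge into one sequence.

    This sketch simply drains the streams in index order; how values from
    different streams interleave is an arbitrary choice made here.
    """
    for index, values in sorted(streams.items()):
        for value in values:
            yield (index, value)


if __name__ == "__main__":
    interleaved = [(0, "a"), (1, "x"), (0, "b")]
    per_parameter = demultiplex(interleaved)
    print(per_parameter)
    print(list(multiplex(per_parameter)))
```

Only the per-index ordering is preserved by this sketch.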
The streaming protocol relies heavily on the concept of *content-type*: a piece of information (hereafter *headers*) that instructs how to interpret a *payload*, made of a series of bytes. Such a content-type may be used to describe input data, or to instruct a function to produce output data in a certain fashion (see `expectedContentTypes` later in this document).
## Lifecycle
When a streaming invoker starts, it MUST start a gRPC server on the port defined by the `GRPC_PORT` environment variable (it MUST fall back on port 8081 if that variable is not defined). That server MUST be able to satisfy the `Invoke()` rpc defined above. It MAY load the function early. However, an invoker MAY also assume that user functions are poorly written and maintain (mutable) state when they shouldn't, and hence MAY decide to re-load the function at each rpc invocation.
As a general rule, the unit of interaction is the rpc invocation. This means that any state accumulated for a given invocation SHOULD be cleared as soon as the invocation ends, and that two invocations MUST NOT share state.
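The port selection rule can be sketched in a few lines of Python. The helper name `resolve_grpc_port` is hypothetical, and letting a non-numeric value raise `ValueError` is a choice made by this sketch, not something the specification prescribes.

```python
import os

DEFAULT_GRPC_PORT = 8081  # fallback port required when GRPC_PORT is not defined


def resolve_grpc_port(environ=os.environ):
    """Return the port the invoker's gRPC server should listen on."""
    raw = environ.get("GRPC_PORT")
    if raw is None:
        return DEFAULT_GRPC_PORT
    # A malformed value raises ValueError here (an assumption of this sketch).
    return int(raw)


if __name__ == "__main__":
    print(resolve_grpc_port())
```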
## Invocation
Invocations of the rpc by the streaming processor MAY happen concurrently and the invoker MUST maintain isolation of state inside the function as much as possible given the target runtime. In particular, it is expected that each invocation of the rpc will trigger one and exactly one invocation of the function. Function authors MAY decide to maintain state per-invocation or more globally per-function *instance* but it is not the responsibility of the invoker to protect against race conditions in those cases.
For each invocation of the rpc, the invoker will have access to a stream of `InputSignals` that may convey incoming data to the function and a stream of `OutputSignals` that are used to forward data produced by the function back to the streaming processor. Those two streams of data MUST be handled as fully asynchronous and autonomous: the invoker may receive several (including zero) `InputFrames` after the `StartFrame` (see below) before it sends any `OutputFrames`.
This behavior is entirely a consequence of how the function is implemented, and an invoker MUST allow non-blocking receiving of input signals relative to the production of output signals.
## Input Signals
For each invocation of the rpc, an invoker MUST accept the following sequence of `InputSignals` (where an `InputSignal` is either a `StartFrame` or an `InputFrame`), defined using EBNF:
```
input stream ::= StartFrame, InputFrame* ;
```
An invoker MUST check this protocol. If the first frame received on the input stream is not a `StartFrame`, an invoker MUST fail the rpc invocation but SHOULD NOT fail globally. Similarly, if for a given rpc invocation a subsequent `InputSignal` is a `StartFrame` (while it should be an `InputFrame` per the above), it MUST fail the rpc invocation.
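The protocol check can be pictured as a small state machine wrapped around the incoming signals. The sketch below is written in Python with hypothetical stand-in classes (`StartFrame`, `InputFrame`, `ProtocolError`) instead of the generated protobuf types; a real invoker would turn the raised error into a failure of the current rpc invocation only.

```python
from dataclasses import dataclass


@dataclass
class StartFrame:
    """Stand-in for the protobuf StartFrame (illustrative only)."""


@dataclass
class InputFrame:
    """Stand-in for the protobuf InputFrame (illustrative only)."""
    payload: bytes = b""


class ProtocolError(Exception):
    """Raised when the sequence of input signals violates the grammar."""


def validate_input_signals(signals):
    """Yield signals unchanged while enforcing `StartFrame, InputFrame*`.

    An empty stream yields nothing; how to treat it is left open here.
    """
    seen_start = False
    for signal in signals:
        is_start = isinstance(signal, StartFrame)
        if not seen_start and not is_start:
            raise ProtocolError("expected a StartFrame as the first signal")
        if seen_start and is_start:
            raise ProtocolError("received a second StartFrame")
        seen_start = True
        yield signal


if __name__ == "__main__":
    stream = [StartFrame(), InputFrame(b"a"), InputFrame(b"b")]
    print(len(list(validate_input_signals(stream))))
```

Because the check is a generator, the error surfaces only when the offending signal is actually consumed, so the validity of one invocation never affects another.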
The complete protobuf definition relevant to input signals is the following:
```
service Riff {
    rpc Invoke (stream InputSignal) returns (stream OutputSignal) {}
}
// Represents data flowing in when invoking a riff function. A special StartFrame is sent first to specify metadata
// about the invocation
message InputSignal {
    oneof frame {
        StartFrame start = 1;
        InputFrame data = 2;
    }
}
// Contains setup data for an invocation
message StartFrame {
    // The ContentTypes that an invocation is allowed to produce for each output parameter
    repeated string expectedContentTypes = 1;
    // The logical names for input arguments
    repeated string inputNames = 2;
    // The logical names for output arguments
    repeated string outputNames = 3;
}
// Contains actual invocation data, as input events.
message InputFrame {
    // The actual content of the event.
    bytes payload = 1;
    // How to interpret the bytes that make up the payload of this frame.
    string contentType = 2;
    // Additional custom headers.
    map<string, string> headers = 3;
    // The input argument index this frame pertains to.
    int32 argIndex = 4;
}
```
### StartFrame
The role of the `StartFrame` is to carry information to the invoker that is relevant *per invocation*.
Such information is made of:
* *expectedContentTypes*: a list of N strings, where N is the number of output parameters of the function. Each string is a comma-separated list of MIME types and indicates, in order, the preference of the streaming processor when it comes to receiving output data: if the i-th string is equal to `MIME-type1, MIME-type2` then every `OutputFrame` pertaining to the i-th output (that is, where `resultIndex == i`) MUST have its `contentType` field set to a MIME type that is either `MIME-type1` or `MIME-type2` (or a compatible subtype thereof).
* *inputNames*: a list of M strings, where M is the number of input parameters of the function. Each string is a name that the invoker MAY choose to expose to the function. These names are mapped to input streams.
* *outputNames*: a list of N strings (same N as above). Each string is a name that the invoker MAY choose to expose to the function. These names are mapped to output streams.
An invoker MAY decide to wait until it receives the `StartFrame` before loading the function.
In particular, the size of the `expectedContentTypes` field (as well as the `outputNames` field) is equal to the output arity of the function. The same is true for the `inputNames` field and the input arity. These MAY be used by the invoker to look up the function in cases where *e.g.* overloading is supported by the target runtime, or where strict function arity matching is required.
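To make the shape of these fields concrete, here is a minimal Python sketch that splits each `expectedContentTypes` entry into an ordered preference list and compares the field sizes with a function's arity. The helper names `parse_expected_content_types` and `arity_matches` are hypothetical, and the fields are passed as plain lists of strings rather than as a protobuf message.

```python
def parse_expected_content_types(expected_content_types):
    """Turn each comma-separated entry into an ordered list of MIME types."""
    return [
        [mime.strip() for mime in entry.split(",") if mime.strip()]
        for entry in expected_content_types
    ]


def arity_matches(expected_content_types, input_names, output_names,
                  function_input_arity, function_output_arity):
    """Check the StartFrame field sizes against the arity of a function."""
    return (
        len(input_names) == function_input_arity
        and len(expected_content_types) == function_output_arity
        and len(output_names) == function_output_arity
    )


if __name__ == "__main__":
    expected = ["application/json, text/plain", "text/plain"]
    print(parse_expected_content_types(expected))
    print(arity_matches(expected, ["in"], ["out1", "out2"], 1, 2))
```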
### InputFrame
`InputFrames` represent incoming data to the function. Upon reception of an `InputFrame`, an invoker
1. MUST deserialize the *payload* according to the *contentType* field, which SHOULD be set and represent a concrete MIME type (*i.e.* not a wildcard type) or default to `application/octet-stream`. The data structure that corresponds to the result of de-serialization is runtime-specific and is beyond the scope of this specification. To the best extent though, an invoker SHOULD use data structures that are *idiomatic* to the target runtime and pass them to the function if feasible. As an example, if the incoming `contentType` is `application/json` and `payload` contains the bytes of `{"foo": 5}`, then the invoker may use a *map* or a *dictionary* with key `foo`.
2. If the *contentType* is not supported, an invoker MUST craft a gRPC status error of code [InvalidArgument](https://github.com/grpc/grpc/blob/2a7191abe717d3ce33b5fcbfd6bd118367499cde/doc/statuscodes.md#status-codes-and-their-use-in-grpc) and the error message MUST start with `"Invoker: Unsupported Media Type"`. An invoker MAY append error details to that message prefix. These details SHOULD start with punctuation to separate them from the error message prefix.
3. MUST fail this rpc invocation if it cannot otherwise successfully deserialize data using such a MIME type with a crafted status error of code [Unknown](https://github.com/grpc/grpc/blob/2a7191abe717d3ce33b5fcbfd6bd118367499cde/doc/statuscodes.md#status-codes-and-their-use-in-grpc).
4. MAY use any additional hints carried in the custom `headers` field to decide how to deserialize the incoming data.
5. MAY decide to craft a data structure that embeds the `contentType` field, as well as some or all of the custom `headers` and pass that to the function.
6. MUST forward the result of the deserialization to the i-th input *stream* of the function, where `i` is the value of the `argIndex` field.
The ordering guarantees relevant to `InputFrames` are the following: given two `InputFrames` A and B whose `argIndex` field are equal, if A is received before B by the invoker, then the invoker MUST forward A before B to the function.
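The deserialization rules above (steps 1 to 3) might look as follows in Python. This is a sketch under stated assumptions: `InvocationError` is a hypothetical stand-in for a gRPC status error, its `code` is a plain string rather than a real gRPC status code object, and the handler table covers only three MIME types.

```python
import json


class InvocationError(Exception):
    """Hypothetical stand-in for a gRPC status error failing one invocation."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code
        self.message = message


# Illustrative handler table; a real invoker may support more types.
DESERIALIZERS = {
    "application/json": lambda data: json.loads(data.decode("utf-8")),
    "text/plain": lambda data: data.decode("utf-8"),
    "application/octet-stream": lambda data: data,
}


def deserialize(payload, content_type):
    """Decode `payload` according to `content_type`, mapping failures to errors."""
    # Default to application/octet-stream when no content type is set,
    # and ignore MIME parameters such as "; charset=utf-8" in this sketch.
    media_type = (content_type or "application/octet-stream").split(";")[0].strip().lower()
    handler = DESERIALIZERS.get(media_type)
    if handler is None:
        raise InvocationError(
            "INVALID_ARGUMENT",
            "Invoker: Unsupported Media Type: " + media_type,
        )
    try:
        return handler(payload)
    except ValueError as exc:  # covers JSON and UTF-8 decoding failures
        raise InvocationError("UNKNOWN", str(exc)) from exc


if __name__ == "__main__":
    print(deserialize(b'{"foo": 5}', "application/json"))
```

The appended detail starts with punctuation (`: `) so that it stays separable from the mandated message prefix.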
## Output Signals
For each invocation of the rpc, an invoker MUST emit the following sequence of `OutputSignals`, defined using EBNF:
```
output stream ::= OutputFrame* ;
```
That is, an invoker MUST emit zero or more `OutputFrame` per invocation, where each `OutputFrame` corresponds to the production of a result on an output stream of the function.
The complete protobuf definition relevant to output signals is the following:
```
service Riff {
    rpc Invoke (stream InputSignal) returns (stream OutputSignal) {}
}
// Represents data flowing out when invoking a riff function. Represented as a oneof with a single case to allow for
// future extensions
message OutputSignal {
    oneof frame {
        OutputFrame data = 1;
    }
}
// Contains actual function invocation result data, as output events.
message OutputFrame {
    // The actual content of the event.
    bytes payload = 1;
    // How to interpret the bytes that make up the payload of this frame.
    string contentType = 2;
    // Additional custom headers.
    map<string, string> headers = 3;
    // The index of the result this frame pertains to.
    int32 resultIndex = 4;
}
```
### OutputFrame
`OutputFrames` represent data coming out of the function. Upon reception of a result data structure on the j-th output stream of a function, an invoker
1. MUST serialize the data structure to a series of bytes (hereafter the `OutputFrame payload`) by selecting a MIME type that is compatible with the preference-ordered list of MIME types specified in the j-th string of `expectedContentTypes` seen in the [`StartFrame`](#StartFrame). As a consequence, the invoker MUST NOT send any `OutputFrames` until it has received the `StartFrame`.
2. If such a MIME type is not supported, then it MUST fail this rpc invocation with a crafted status error of code [InvalidArgument](https://github.com/grpc/grpc/blob/2a7191abe717d3ce33b5fcbfd6bd118367499cde/doc/statuscodes.md#status-codes-and-their-use-in-grpc) and the error message MUST start with `"Invoker: Not Acceptable"`. An invoker MAY append error details to that message prefix. These details SHOULD start with punctuation to separate them from the error message prefix.
3. If it cannot otherwise successfully serialize data using such a MIME type, then it MUST fail this rpc invocation with a crafted status error of code [Unknown](https://github.com/grpc/grpc/blob/2a7191abe717d3ce33b5fcbfd6bd118367499cde/doc/statuscodes.md#status-codes-and-their-use-in-grpc).
4. Otherwise, it MUST set the value of the `payload` field of the frame to the byte representation of the data, and the `contentType` field to the *concrete* MIME type that was selected for serialization.
5. MUST set the `resultIndex` field of the frame to `j`
6. MAY set additional custom `headers`.
The ordering guarantees relevant to `OutputFrames` are the following: if a function emits two data structures A and B on its j-th output stream, and A is emitted before B, then an invoker MUST send the serialized representation of A before the representation of B back to the streaming processor.
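The selection and serialization steps above can be sketched in Python as follows. Several things here are assumptions of the sketch: the frame is a plain dictionary rather than a protobuf message, `InvocationError` is a hypothetical stand-in for a gRPC status error, "compatible" is interpreted as an exact match or a `type/*` style wildcard, and only three MIME types are supported.

```python
import json
from fnmatch import fnmatchcase


class InvocationError(Exception):
    """Hypothetical stand-in for a gRPC status error failing one invocation."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code
        self.message = message


# Illustrative serializer table keyed by concrete MIME type.
SERIALIZERS = {
    "application/json": lambda value: json.dumps(value).encode("utf-8"),
    "text/plain": lambda value: str(value).encode("utf-8"),
    "application/octet-stream": lambda value: bytes(value),
}


def select_content_type(expected):
    """Pick the first supported concrete type, honoring the preference order."""
    for entry in expected.split(","):
        pattern = entry.split(";")[0].strip().lower()
        for concrete in SERIALIZERS:
            if pattern and fnmatchcase(concrete, pattern):
                return concrete
    raise InvocationError("INVALID_ARGUMENT", "Invoker: Not Acceptable: " + expected)


def to_output_frame(value, result_index, expected_content_types):
    """Build a dictionary mirroring the OutputFrame fields for one result."""
    content_type = select_content_type(expected_content_types[result_index])
    try:
        payload = SERIALIZERS[content_type](value)
    except (TypeError, ValueError) as exc:
        raise InvocationError("UNKNOWN", str(exc)) from exc
    return {
        "payload": payload,
        "contentType": content_type,  # always the concrete type that was used
        "headers": {},
        "resultIndex": result_index,
    }


if __name__ == "__main__":
    print(to_output_frame({"foo": 5}, 0, ["application/json, text/plain"]))
```

Note that a wildcard such as `text/*` in the expectation is resolved to the concrete `text/plain` before it is written to `contentType`.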
[//]: # (Comment: The following section also appears in request-reply.md)
## Supported MIME Types
An invoker SHOULD support the following MIME types, both when dealing with receiving data and when asked to serialize data back to the streaming processor:
* `text/plain`: when receiving data tagged with this content type and a function argument expects a "string", then an invoker MUST be capable of fulfilling that value. Conversely, when asked to produce that content type and receiving a "string" from the function, an invoker MUST be able to serialize the string using that MIME type.
Additionally, when dealing with "byte arrays" in the function signature, an invoker SHOULD be able to serialize/deserialize from/to a value using this MIME type, honoring the value of the `encoding` MIME type parameter if present.
* `application/json`: when receiving data tagged with this content type, an invoker SHOULD attempt to map the JSON structure to the function argument using idiomatic behavior from the target runtime. This MAY involve using general purpose data structures (*e.g.* maps or dictionaries), or trying to map content to structured data. Conversely, when asked to produce that content type, an invoker SHOULD use idiomatic conventions of the target runtime to serialize JSON. The behavior when encountering missing or extra fields, or relative to circular references is beyond the scope of this document and is left at the discretion of the invoker.
* `application/octet-stream`: when receiving data tagged with this content type and a function argument expects a "byte array", then an invoker MUST be capable of fulfilling that value, passing the `payload` as-is. Conversely, when asked to produce that content type and receiving a "byte array" from the function, an invoker MUST be able to produce a payload.
In addition, an invoker SHOULD provide a way for the function to extend the set of supported MIME types, *e.g.* by providing an extension mechanism to register additional "handlers". The specific details of such a mechanism are beyond the scope of this document.
## Stream Completion and Error Conditions
The streaming protocol described in this document is built on top of the gRPC protocol. In many languages, bindings for gRPC adhere to reactive streams semantics, exposing `onNext()`, `onError()` and `onComplete()` events. This section describes how an invoker should behave when such events happen.
* When an invoker receives the (normal) stream completion signal on its input stream, it MUST propagate that termination to all of the function input streams.
* When an invoker receives the error stream completion signal on its input stream, it MUST propagate that error termination to all of the function input streams.
* When a function emits the (normal) stream completion signal on one of its several output streams, an invoker MUST NOT forward that signal back to the main output stream, unless it is the last sub-stream to complete. If the completion signal happens on the last sub-stream to complete, then the invoker MUST trigger the completion of the main output stream.
* When a function emits an error completion signal on one of its several output streams, an invoker MUST immediately forward that signal back to the main output stream.
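The output-side rules can be modeled with a small bookkeeping object. The Python sketch below uses a hypothetical `OutputCompletionTracker` class; each method returns `True` when the corresponding signal should be forwarded to the main output stream, and ignoring signals that arrive after termination is an assumption of this sketch.

```python
class OutputCompletionTracker:
    """Decide when sub-stream signals terminate the main output stream."""

    def __init__(self, output_count):
        self._open = set(range(output_count))  # sub-streams still running
        self.terminated = None  # None, "completed" or "error"

    def on_complete(self, index):
        """Return True only when the last open sub-stream completes."""
        if self.terminated is not None:
            return False
        self._open.discard(index)
        if not self._open:
            self.terminated = "completed"
            return True
        return False

    def on_error(self, index):
        """Return True immediately: an error on any sub-stream is forwarded."""
        if self.terminated is not None:
            return False
        self.terminated = "error"
        return True


if __name__ == "__main__":
    tracker = OutputCompletionTracker(2)
    print(tracker.on_complete(0), tracker.on_complete(1))
```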
## <a name="support-for-request-reply-functions"></a>Support for Request Reply Functions
A streaming invoker MAY support request / reply functions (that is, functions that accept a single value and synchronously produce a single result) by adhering to the following semantics:
* the function MUST be exposed as if it had a single input stream and a single output stream
* each value on the output stream MUST be obtained by applying the synchronous function to each value on the input stream. That is to say, the streaming function MUST apply the [map()](http://reactivex.io/documentation/operators/map.html) operator involving the original function.
* a consequence of the above is that, should the synchronous function produce an error, the output stream MUST terminate with an error, forcing the client to trigger a new invocation if it wants to compute additional values.
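A minimal Python sketch of this adaptation follows, with iterables standing in for reactive streams and a hypothetical helper named `as_streaming`. An exception raised by the wrapped function ends the generator, which mirrors the output stream terminating with an error.

```python
def as_streaming(function):
    """Expose a synchronous one-in, one-out function as a stream transformer."""

    def streaming_function(inputs):
        # Equivalent to map(): one output value per input value, in order.
        for value in inputs:
            yield function(value)

    return streaming_function


if __name__ == "__main__":
    double = as_streaming(lambda x: x * 2)
    print(list(double([1, 2, 3])))
```

Once the wrapped function raises, the generator is finished and yields no further values, so a caller has to start over with a new input stream, much like the client triggering a new invocation.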