RabbitMQ Stream Protocol Reference

This is the reference for the RabbitMQ stream protocol. Note that the protocol is still under development and subject to change.

The RabbitMQ Stream Java client is currently the reference implementation.

Types

int8, int16, int32, int64 - Signed integers (big endian order)

uint8, uint16, uint32, uint64 - Unsigned integers (big endian order)

bytes - int32 for the length followed by the bytes of content, length of -1 indicates null.

string - int16 for the length followed by the bytes of content, length of -1 indicates null.

arrays - int32 for the length followed by the repetition of the structure, notation uses [], e.g. [int32] for an array of int32.
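
As a rough illustration of the encodings above, the following Python sketch packs a string, a bytes value, and an array of int32 with the standard `struct` module. The function names are hypothetical and not part of any client library, and UTF-8 for string content is an assumption this section does not state.

```python
import struct


def encode_string(value):
    """string: int16 length followed by the content; length -1 means null."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")  # assumption: string content is UTF-8
    return struct.pack(">h", len(data)) + data


def encode_bytes(value):
    """bytes: int32 length followed by the content; length -1 means null."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value


def encode_int32_array(values):
    """[int32]: int32 element count, then each element in big endian order."""
    return struct.pack(">i", len(values)) + b"".join(
        struct.pack(">i", v) for v in values
    )
```

All multi-byte values use the `>` (big endian) prefix, matching the byte order stated for the integer types.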

Frame Structure

Frame => Size (Request | Response | Command)
  Size => uint32 (size without the 4 bytes of the size element)

Request => Key Version (CorrelationId) Content
  Key => uint16
  Version => uint16
  CorrelationId => uint32
  Content => bytes // see command details below

Response => Key Version CorrelationId ResponseCode
  Key => uint16
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Command => Key Version Content
  Key => uint16
  Version => uint16
  Content => bytes // see command details below

Most commands are request/reply, but some commands (e.g. Deliver) are one-direction only and thus do not contain a correlation ID.

Some responses carry more information than just the response code; this is specified in the command definition.
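
The framing described above can be sketched in Python as follows. This is a minimal sketch, not the reference implementation: the helper names are hypothetical, and `content` is assumed to be the already-encoded command-specific fields appended as raw bytes.

```python
import struct


def frame(payload):
    """Prefix a payload with its uint32 size (the size excludes its own 4 bytes)."""
    return struct.pack(">I", len(payload)) + payload


def request(key, version, correlation_id, content=b""):
    """Key, Version, CorrelationId, then the command-specific content."""
    return frame(struct.pack(">HHI", key, version, correlation_id) + content)


def split_frames(buffer):
    """Return (complete frame payloads, leftover bytes) from a receive buffer."""
    frames = []
    pos = 0
    while len(buffer) - pos >= 4:
        (size,) = struct.unpack_from(">I", buffer, pos)
        if len(buffer) - pos - 4 < size:
            break  # the frame has not been fully received yet
        frames.append(buffer[pos + 4 : pos + 4 + size])
        pos += 4 + size
    return frames, buffer[pos:]
```

`split_frames` keeps any trailing partial frame so the caller can prepend it to the next chunk read from the socket.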

Keys are uint16, but the actual value is carried in the lower 15 bits; the most significant bit distinguishes a request (0) from a response (1). Example for subscribe (key is 7):

0b00000000 00000111 => subscribe request
0b10000000 00000111 => subscribe response
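
The request/response bit can be handled with simple masks. The sketch below uses hypothetical helper names and only relies on the rule stated above (most significant bit set for a response).

```python
RESPONSE_FLAG = 0x8000  # most significant bit of the uint16 key
COMMAND_MASK = 0x7FFF   # lower 15 bits carry the command key


def to_response_key(command_key):
    """Return the key a response to the given command would carry."""
    return command_key | RESPONSE_FLAG


def is_response(key):
    """True when the most significant bit marks the frame as a response."""
    return bool(key & RESPONSE_FLAG)


def command_of(key):
    """Strip the request/response bit to recover the command key."""
    return key & COMMAND_MASK
```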

Response Codes

Table 1. Stream Protocol Response Codes
Response                                Code
OK                                      1
Stream does not exist                   2
Subscription ID already exists          3
Subscription ID does not exist          4
Stream already exists                   5
Stream not available                    6
SASL mechanism not supported            7
Authentication failure                  8
SASL error                              9
SASL challenge                          10
SASL authentication failure loopback    11
Virtual host access failure             12
Unknown frame                           13
Frame too large                         14
Internal error                          15
Access refused                          16
Precondition failed                     17
Publisher does not exist                18
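
A client may find it convenient to mirror the response codes above in an enumeration. The Python sketch below is one possible mapping; the constant names and the `describe` helper are illustrative choices, not names defined by the protocol.

```python
from enum import IntEnum


class ResponseCode(IntEnum):
    OK = 1
    STREAM_DOES_NOT_EXIST = 2
    SUBSCRIPTION_ID_ALREADY_EXISTS = 3
    SUBSCRIPTION_ID_DOES_NOT_EXIST = 4
    STREAM_ALREADY_EXISTS = 5
    STREAM_NOT_AVAILABLE = 6
    SASL_MECHANISM_NOT_SUPPORTED = 7
    AUTHENTICATION_FAILURE = 8
    SASL_ERROR = 9
    SASL_CHALLENGE = 10
    SASL_AUTHENTICATION_FAILURE_LOOPBACK = 11
    VIRTUAL_HOST_ACCESS_FAILURE = 12
    UNKNOWN_FRAME = 13
    FRAME_TOO_LARGE = 14
    INTERNAL_ERROR = 15
    ACCESS_REFUSED = 16
    PRECONDITION_FAILED = 17
    PUBLISHER_DOES_NOT_EXIST = 18


def describe(code):
    """Return a readable name, tolerating codes this table does not list."""
    try:
        return ResponseCode(code).name
    except ValueError:
        return f"UNKNOWN({code})"
```

Falling back to `UNKNOWN(...)` rather than raising keeps a client usable if later protocol versions add codes.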

Commands

Table 2. Stream Protocol Commands
Command                     From              Key   Expects response?
DeclarePublisher            Client            1     Yes
Publish                     Client            2     No
PublishConfirm              Server            3     No
PublishError                Server            4     No
QueryPublisherSequence      Client            5     Yes
DeletePublisher             Client            6     Yes
Subscribe                   Client            7     Yes
Deliver                     Server            8     No
Credit                      Client            9     No
StoreOffset                 Client            10    No
QueryOffset                 Client            11    Yes
Unsubscribe                 Client            12    Yes
Create                      Client            13    Yes
Delete                      Client            14    Yes
Metadata                    Client            15    Yes
MetadataUpdate              Server            16    No
PeerProperties              Client            17    Yes
SaslHandshake               Client            18    Yes
SaslAuthenticate            Client            19    Yes
Tune                        Server            20    Yes
Open                        Client            21    Yes
Close                       Client & Server   22    Yes
Heartbeat                   Client & Server   23    No
Route (experimental)        Client            24    Yes
Partitions (experimental)   Client            25    Yes

DeclarePublisher

DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
  Key => uint16 // 1
  Version => uint16
  CorrelationId => uint32
  PublisherId => uint8
  PublisherReference => string // max 256 characters
  Stream => string

DeclarePublisherResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 1
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Publish

Publish => Key Version PublisherId PublishedMessages
  Key => uint16 // 2
  Version => uint16
  PublisherId => uint8
  PublishedMessages => [PublishedMessage]
  PublishedMessage => PublishingId Message
  PublishingId => uint64
  Message => bytes
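
To make the Publish layout concrete, here is a minimal Python sketch that encodes a Publish frame, including the leading frame size. The function name is hypothetical, and the default `version=1` is an assumption, since this document does not state the version value.

```python
import struct

PUBLISH_KEY = 2


def encode_publish(publisher_id, messages, version=1):
    """Encode a Publish frame.

    messages: list of (publishing_id, payload_bytes) pairs.
    version=1 is an assumption; the document does not give a value.
    """
    body = struct.pack(">HHB", PUBLISH_KEY, version, publisher_id)
    body += struct.pack(">i", len(messages))  # array length (int32)
    for publishing_id, payload in messages:
        body += struct.pack(">Q", publishing_id)           # PublishingId (uint64)
        body += struct.pack(">i", len(payload)) + payload  # Message (bytes)
    return struct.pack(">I", len(body)) + body  # frame size prefix (uint32)
```

Publish carries no correlation ID; the publishing IDs are what the server later echoes in PublishConfirm or PublishError.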

PublishConfirm

PublishConfirm => Key Version PublisherId PublishingIds
  Key => uint16 // 3
  Version => uint16
  PublisherId => uint8
  PublishingIds => [uint64] // to correlate with the messages sent

PublishError

PublishError => Key Version PublisherId [PublishingError]
  Key => uint16 // 4
  Version => uint16
  PublisherId => uint8
  PublishingError => PublishingId Code
  PublishingId => uint64
  Code => uint16 // code to identify the problem
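
The two server-sent publish notifications can be decoded along these lines. This Python sketch assumes PublisherId sits between Version and the array, as the field listings suggest, and that `payload` is the frame content after the 4-byte size. The function names are hypothetical.

```python
import struct

_PREFIX = struct.Struct(">HHBi")  # Key, Version, PublisherId, array length


def decode_publish_confirm(payload):
    """Return (publisher_id, [publishing_id, ...])."""
    _key, _version, publisher_id, count = _PREFIX.unpack_from(payload, 0)
    ids = struct.unpack_from(f">{count}Q", payload, _PREFIX.size)
    return publisher_id, list(ids)


def decode_publish_error(payload):
    """Return (publisher_id, [(publishing_id, code), ...])."""
    _key, _version, publisher_id, count = _PREFIX.unpack_from(payload, 0)
    entry = struct.Struct(">QH")  # PublishingId (uint64), Code (uint16)
    errors = [
        entry.unpack_from(payload, _PREFIX.size + i * entry.size)
        for i in range(count)
    ]
    return publisher_id, errors
```

A publisher would typically match the returned publishing IDs against its outstanding messages to mark them confirmed or failed.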

QueryPublisherSequence

QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
  Key => uint16 // 5
  Version => uint16
  CorrelationId => uint32
  PublisherReference => string // max 256 characters
  Stream => string

QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
  Key => uint16 // 5
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Sequence => uint64

DeletePublisher

DeletePublisherRequest => Key Version CorrelationId PublisherId
  Key => uint16 // 6
  Version => uint16
  CorrelationId => uint32
  PublisherId => uint8

DeletePublisherResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 6
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Subscribe

Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit Properties
  Key => uint16 // 7
  Version => uint16
  CorrelationId => uint32 // correlation id to correlate the response
  SubscriptionId => uint8 // client-supplied id to identify the subscription
  Stream => string // the name of the stream
  OffsetSpecification => OffsetType Offset
  OffsetType => uint16 // 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
  Offset => uint64 (for offset) | int64 (for timestamp)
  Credit => uint16
  Properties => [Property]
  Property => Key Value
  Key => string
  Value => string
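
A Subscribe request could be assembled as in the Python sketch below. Several points are assumptions rather than statements from this document: the Offset field is written only for the offset and timestamp types, an empty Properties array is still written as a zero length, strings are UTF-8, and `version=1`. The function and constant names are hypothetical.

```python
import struct

SUBSCRIBE_KEY = 7
OFFSET_FIRST, OFFSET_LAST, OFFSET_NEXT, OFFSET_OFFSET, OFFSET_TIMESTAMP = 1, 2, 3, 4, 5


def _string(value):
    data = value.encode("utf-8")  # assumption: UTF-8
    return struct.pack(">h", len(data)) + data


def encode_subscribe(correlation_id, subscription_id, stream, offset_type,
                     offset=None, credit=10, properties=None, version=1):
    body = struct.pack(">HHIB", SUBSCRIBE_KEY, version, correlation_id,
                       subscription_id)
    body += _string(stream)
    body += struct.pack(">H", offset_type)
    # Assumption: Offset is present only for the offset and timestamp types.
    if offset_type == OFFSET_OFFSET:
        body += struct.pack(">Q", offset)  # uint64 offset
    elif offset_type == OFFSET_TIMESTAMP:
        body += struct.pack(">q", offset)  # int64 timestamp
    body += struct.pack(">H", credit)
    properties = properties or {}
    body += struct.pack(">i", len(properties))
    for name, value in properties.items():
        body += _string(name) + _string(value)
    return struct.pack(">I", len(body)) + body
```

For example, `encode_subscribe(1, 2, "invoices", OFFSET_OFFSET, offset=42)` would ask to start consuming the hypothetical stream `invoices` at offset 42.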

Deliver

Deliver => Key Version SubscriptionId OsirisChunk
  Key => uint16 // 8
  Version => uint16
  SubscriptionId => uint8
  OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
  MagicVersion => int8
  NumEntries => uint16
  NumRecords => uint32
  Epoch => uint64
  ChunkFirstOffset => uint64
  ChunkCrc => int32
  DataLength => uint32
  Messages => [Message] // no int32 for the size for this array
  Message => EntryTypeAndSize
  Data => bytes

NB: See the Osiris project for details on the structure of messages.

Credit

Credit => Key Version SubscriptionId Credit
  Key => uint16 // 9
  Version => uint16
  SubscriptionId => uint8
  Credit => uint16 // the number of chunks that can be sent

CreditResponse => Key Version ResponseCode SubscriptionId
  Key => uint16 // 9
  Version => uint16
  ResponseCode => uint16
  SubscriptionId => uint8

NB: the server sends a response only if there is a problem, e.g. when crediting an unknown subscription.
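
As a small illustration, the Credit command and its optional error response could be handled as follows in Python. The function names are hypothetical and `version=1` is an assumption.

```python
import struct

CREDIT_KEY = 9


def encode_credit(subscription_id, credit, version=1):
    """Grant `credit` more chunks to a subscription (no correlation ID)."""
    body = struct.pack(">HHBH", CREDIT_KEY, version, subscription_id, credit)
    return struct.pack(">I", len(body)) + body


def decode_credit_response(payload):
    """Return (response_code, subscription_id) from a CreditResponse payload."""
    _key, _version, code, subscription_id = struct.unpack_from(">HHHB", payload, 0)
    return code, subscription_id
```

Because the response arrives only on error, a client should not wait for one after sending Credit.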

StoreOffset

StoreOffset => Key Version Reference Stream Offset
  Key => uint16 // 10
  Version => uint16
  Reference => string // max 256 characters
  Stream => string // the name of the stream
  Offset => uint64

QueryOffset

QueryOffsetRequest => Key Version CorrelationId Reference Stream
  Key => uint16 // 11
  Version => uint16
  CorrelationId => uint32
  Reference => string // max 256 characters
  Stream => string

QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
  Key => uint16 // 11
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Offset => uint64

Unsubscribe

Unsubscribe => Key Version CorrelationId SubscriptionId
  Key => uint16 // 12
  Version => uint16
  CorrelationId => uint32
  SubscriptionId => uint8

Create

Create => Key Version CorrelationId Stream Arguments
  Key => uint16 // 13
  Version => uint16
  CorrelationId => uint32
  Stream => string
  Arguments => [Argument]
  Argument => Key Value
  Key => string
  Value => string

Delete

Delete => Key Version CorrelationId Stream
  Key => uint16 // 14
  Version => uint16
  CorrelationId => uint32
  Stream => string

Metadata

MetadataQuery => Key Version CorrelationId [Stream]
  Key => uint16 // 15
  Version => uint16
  CorrelationId => uint32
  Stream => string

MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
  Key => uint16 // 15
  Version => uint16
  CorrelationId => uint32
  Broker => Reference Host Port
    Reference => uint16
    Host => string
    Port => uint32
  StreamMetadata => StreamName ResponseCode LeaderReference ReplicasReferences
     StreamName => string
     ResponseCode => uint16
     LeaderReference => uint16
     ReplicasReferences => [uint16]
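
The metadata response refers to brokers indirectly, through the uint16 references. The Python sketch below decodes a response and resolves leader and replica references to (host, port) pairs. It assumes ResponseCode follows StreamName, as in the field listing, and UTF-8 strings. The `Reader` class and function name are hypothetical.

```python
import struct


class Reader:
    """Sequential big endian reader over a bytes payload."""

    def __init__(self, data):
        self.data, self.pos = data, 0

    def read(self, fmt):
        (value,) = struct.unpack_from(">" + fmt, self.data, self.pos)
        self.pos += struct.calcsize(">" + fmt)
        return value

    def string(self):
        length = self.read("h")
        if length < 0:
            return None  # length -1 means null
        raw = self.data[self.pos : self.pos + length]
        self.pos += length
        return raw.decode("utf-8")


def decode_metadata_response(payload):
    r = Reader(payload)
    _key, _version, correlation_id = r.read("H"), r.read("H"), r.read("I")
    brokers = {}
    for _ in range(r.read("i")):
        reference, host, port = r.read("H"), r.string(), r.read("I")
        brokers[reference] = (host, port)
    streams = {}
    for _ in range(r.read("i")):
        name, code, leader = r.string(), r.read("H"), r.read("H")
        replicas = [r.read("H") for _ in range(r.read("i"))]
        streams[name] = {
            "code": code,
            "leader": brokers.get(leader),
            "replicas": [brokers.get(ref) for ref in replicas],
        }
    return correlation_id, streams
```

References that do not match a listed broker resolve to `None` in this sketch.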

MetadataUpdate

MetadataUpdate => Key Version MetadataInfo
  Key => uint16 // 16
  Version => uint16
  MetadataInfo => Code Stream
  Code => uint16 // code to identify the information
  Stream => string // the stream implied

PeerProperties

PeerPropertiesRequest => Key Version CorrelationId PeerProperties
  Key => uint16 // 17
  Version => uint16
  CorrelationId => uint32
  PeerProperties => [PeerProperty]
  PeerProperty => Key Value
  Key => string
  Value => string

PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
  Key => uint16 // 17
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  PeerProperties => [PeerProperty]
  PeerProperty => Key Value
  Key => string
  Value => string

SaslHandshake

SaslHandshakeRequest => Key Version CorrelationId
  Key => uint16 // 18
  Version => uint16
  CorrelationId => uint32

SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
  Key => uint16 // 18
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Mechanism => string

SaslAuthenticate

SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
  Key => uint16 // 19
  Version => uint16
  CorrelationId => uint32
  Mechanism => string
  SaslOpaqueData => bytes

SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
  Key => uint16 // 19
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  SaslOpaqueData => bytes

Tune

TuneRequest => Key Version FrameMax Heartbeat
  Key => uint16 // 20
  Version => uint16
  FrameMax => uint32 // in bytes, 0 means no limit
  Heartbeat => uint32 // in seconds, 0 means no heartbeat

TuneResponse => TuneRequest
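
One way a client might compute its Tune reply is sketched below in Python. The negotiation rule (take the smaller non-zero value, treating 0 as "no limit" or "no heartbeat") is an assumption, not something this document specifies. Setting the response bit on the reply key is likewise an assumption derived from the key rule in Frame Structure. The names are hypothetical.

```python
import struct

TUNE_KEY = 20
RESPONSE_FLAG = 0x8000


def negotiate(server_value, client_value):
    """Assumed rule: 0 means unlimited/disabled, otherwise the smaller value wins."""
    if server_value == 0 or client_value == 0:
        return max(server_value, client_value)
    return min(server_value, client_value)


def tune_reply(server_payload, client_frame_max, client_heartbeat, version=1):
    """Build the client's Tune frame from the server's Tune payload."""
    _key, _version, frame_max, heartbeat = struct.unpack_from(
        ">HHII", server_payload, 0
    )
    body = struct.pack(
        ">HHII",
        TUNE_KEY | RESPONSE_FLAG,  # assumption: the reply carries the response bit
        version,
        negotiate(frame_max, client_frame_max),
        negotiate(heartbeat, client_heartbeat),
    )
    return struct.pack(">I", len(body)) + body
```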

Open

OpenRequest => Key Version CorrelationId VirtualHost
  Key => uint16 // 21
  Version => uint16
  CorrelationId => uint32
  VirtualHost => string

OpenResponse => Key Version CorrelationId ResponseCode ConnectionProperties
  Key => uint16 // 21
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  ConnectionProperties => [ConnectionProperty]
  ConnectionProperty => Key Value
  Key => string
  Value => string

Close

CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
  Key => uint16 // 22
  Version => uint16
  CorrelationId => uint32
  ClosingCode => uint16
  ClosingReason => string

CloseResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 22
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Heartbeat

Heartbeat => Key Version
  Key => uint16 // 23
  Version => uint16

Route

Experimental

RouteQuery => Key Version CorrelationId RoutingKey SuperStream
  Key => uint16 // 24
  Version => uint16
  CorrelationId => uint32
  RoutingKey => string
  SuperStream => string

RouteResponse => Key Version CorrelationId Stream
  Key => uint16 // 24
  Version => uint16
  CorrelationId => uint32
  Stream => string

Partitions

Experimental

PartitionsQuery => Key Version CorrelationId SuperStream
  Key => uint16 // 25
  Version => uint16
  CorrelationId => uint32
  SuperStream => string

PartitionsResponse => Key Version CorrelationId [Stream]
  Key => uint16 // 25
  Version => uint16
  CorrelationId => uint32
  Stream => string

Authentication

Once a client is connected to the server, it initiates an authentication sequence. The next figure shows the steps of the sequence:

Authentication Sequence
Client                      Server
  +                           +
  | Peer Properties Exchange  |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |      SASL Handshake       |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |     SASL Authenticate     |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |           Tune            |
  |<--------------------------|
  |-------------------------->|
  |                           |
  |           Open            |
  |-------------------------->|
  |<--------------------------|
  |                           |
  +                           +
  • SaslHandshake: the client asks about the SASL mechanisms the server supports. It can then pick one from the list the server returns.

  • SaslAuthenticate: the client answers the server’s challenge(s), using the SASL mechanism it picked. The server will send a Tune frame once it is satisfied with the client authentication response.

  • Tune: the server sends a Tune frame to suggest some settings (max frame size, heartbeat). The client answers with a Tune frame with the settings it agrees on, possibly adjusted from the server’s suggestions.

  • Open: the client sends an Open frame to pick a virtual host to connect to. The server answers whether it accepts the access or not.
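
The ordering of the sequence above can be tracked with a small state machine. The Python sketch below is illustrative only: the class name is hypothetical, it records completed exchanges rather than performing any I/O, and treating response code 10 (SASL challenge) as "repeat the SaslAuthenticate step" is an assumption based on the response code table.

```python
OK, SASL_CHALLENGE = 1, 10

# (command key, name) in the order shown in the authentication sequence
STEPS = [
    (17, "PeerProperties"),
    (18, "SaslHandshake"),
    (19, "SaslAuthenticate"),
    (20, "Tune"),
    (21, "Open"),
]


class Handshake:
    def __init__(self):
        self.index = 0

    @property
    def complete(self):
        return self.index == len(STEPS)

    def next_step(self):
        return None if self.complete else STEPS[self.index][1]

    def record(self, key, response_code=OK):
        """Record a finished exchange; raise if it is out of order or failed."""
        if self.complete:
            raise ValueError("handshake already complete")
        expected_key, name = STEPS[self.index]
        command = key & 0x7FFF  # ignore the request/response bit
        if command != expected_key:
            raise ValueError(f"expected {name}, got key {command}")
        if name == "SaslAuthenticate" and response_code == SASL_CHALLENGE:
            return  # assumption: a challenge means another authenticate round
        if response_code != OK:
            raise ValueError(f"{name} failed with response code {response_code}")
        self.index += 1
```

A client could consult `next_step()` to decide which frame to send or expect next, and refuse other commands until `complete` is true.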

Resources