RabbitMQ Streams Protocol Reference

This is the reference of the RabbitMQ Streams protocol.

The RabbitMQ Stream Java client is currently the reference implementation.

Types

int8, int16, int32, int64 - Signed integers (big endian order)

uint8, uint16, uint32, uint64 - Unsigned integers (big endian order)

bytes - int32 for the length followed by the bytes of content, length of -1 indicates null.

string - int16 for the length followed by the bytes of UTF-8 encoded content, length of -1 indicates null.

arrays - int32 for the length followed by the repetition of the structure, notation uses [], e.g. [int32] for an array of int32.
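The type rules above can be sketched with Python's standard struct module. The helper names are hypothetical, and the array length is assumed to be the number of elements (not a byte count):

```python
import struct

def encode_string(value):
    """string: int16 length + UTF-8 bytes; None is encoded as length -1."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_bytes(value):
    """bytes: int32 length + content; None is encoded as length -1."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value

def encode_array(items, encode_item):
    """array: int32 element count (assumption), then each element in turn."""
    return struct.pack(">i", len(items)) + b"".join(encode_item(i) for i in items)

def decode_string(buf, pos=0):
    """Return (value, next_position) for a string starting at pos."""
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    if length < 0:
        return None, pos
    return buf[pos:pos + length].decode("utf-8"), pos + length
```

All integers use the ">" (big endian) prefix, matching the byte order stated above.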

Frame Structure

Frame => Size (Request | Response | Command)
  Size => uint32 (size without the 4 bytes of the size element)

Request => Key Version (CorrelationId) Content
  Key => uint16
  Version => uint16
  CorrelationId => uint32
  Content => bytes // see command details below

Response => Key Version CorrelationId ResponseCode
  Key => uint16
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Command => Key Version Content
  Key => uint16
  Version => uint16
  Content => bytes // see command details below

Most commands are request/reply, but some commands (e.g. Deliver) are one-direction only and thus do not contain a correlation ID.

Some responses carry more information than just the response code; this is specified in the command definition.
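A minimal framing sketch, assuming the content passed in is the already-encoded command fields and that the correlation ID is simply omitted for one-direction commands. The function names build_frame and split_frames are hypothetical:

```python
import struct

def build_frame(key, version, content=b"", correlation_id=None):
    """Prefix Key, Version, optional CorrelationId and content with the uint32 size."""
    body = struct.pack(">HH", key, version)
    if correlation_id is not None:
        body += struct.pack(">I", correlation_id)
    body += content
    # Size does not count its own 4 bytes.
    return struct.pack(">I", len(body)) + body

def split_frames(buffer):
    """Return (list of complete frame bodies, remaining bytes)."""
    frames = []
    pos = 0
    while len(buffer) - pos >= 4:
        (size,) = struct.unpack_from(">I", buffer, pos)
        if len(buffer) - pos - 4 < size:
            break  # incomplete frame, wait for more bytes
        frames.append(buffer[pos + 4:pos + 4 + size])
        pos += 4 + size
    return frames, buffer[pos:]
```

split_frames is meant to be called on the bytes accumulated from the socket; whatever it returns as the remainder is kept until more data arrives.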

Keys are uint16, but the actual value is defined on the last 15 bits, the most significant bit being used to distinguish a request (0) from a response (1). Example for subscribe (key is 7):

0x0007 => subscribe request
0x8007 => subscribe response
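The request/response bit can be handled with plain bit masks. This sketch uses hypothetical helper names and the DeclarePublisher key (0x0001) as the example:

```python
RESPONSE_FLAG = 0x8000  # most significant bit of the uint16 key
KEY_MASK = 0x7FFF       # lower 15 bits carry the command key

def to_response_key(key):
    """Set the most significant bit to mark a response."""
    return key | RESPONSE_FLAG

def is_response(key):
    """True when the most significant bit is set."""
    return bool(key & RESPONSE_FLAG)

def command_key(key):
    """Strip the response bit to recover the command key."""
    return key & KEY_MASK
```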

Response Codes

Table 1. Stream Protocol Response Codes
Response                                Code
OK                                      0x01
Stream does not exist                   0x02
Subscription ID already exists          0x03
Subscription ID does not exist          0x04
Stream already exists                   0x05
Stream not available                    0x06
SASL mechanism not supported            0x07
Authentication failure                  0x08
SASL error                              0x09
SASL challenge                          0x0a
SASL authentication failure loopback    0x0b
Virtual host access failure             0x0c
Unknown frame                           0x0d
Frame too large                         0x0e
Internal error                          0x0f
Access refused                          0x10
Precondition failed                     0x11
Publisher does not exist                0x12
No offset                               0x13
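A client will typically map these codes to symbolic names. One possible sketch uses an IntEnum; the constant names and the describe helper are made up for illustration:

```python
from enum import IntEnum

class ResponseCode(IntEnum):
    OK = 0x01
    STREAM_DOES_NOT_EXIST = 0x02
    SUBSCRIPTION_ID_ALREADY_EXISTS = 0x03
    SUBSCRIPTION_ID_DOES_NOT_EXIST = 0x04
    STREAM_ALREADY_EXISTS = 0x05
    STREAM_NOT_AVAILABLE = 0x06
    SASL_MECHANISM_NOT_SUPPORTED = 0x07
    AUTHENTICATION_FAILURE = 0x08
    SASL_ERROR = 0x09
    SASL_CHALLENGE = 0x0A
    SASL_AUTHENTICATION_FAILURE_LOOPBACK = 0x0B
    VIRTUAL_HOST_ACCESS_FAILURE = 0x0C
    UNKNOWN_FRAME = 0x0D
    FRAME_TOO_LARGE = 0x0E
    INTERNAL_ERROR = 0x0F
    ACCESS_REFUSED = 0x10
    PRECONDITION_FAILED = 0x11
    PUBLISHER_DOES_NOT_EXIST = 0x12
    NO_OFFSET = 0x13

def describe(code):
    """Return the symbolic name of a response code, or a placeholder if unknown."""
    try:
        return ResponseCode(code).name
    except ValueError:
        return "UNKNOWN_0x%02x" % code
```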

Commands

Table 2. Stream Protocol Commands
Command                  From             Key     Expects response?
DeclarePublisher         Client           0x0001  Yes
Publish                  Client           0x0002  No
PublishConfirm           Server           0x0003  No
PublishError             Server           0x0004  No
QueryPublisherSequence   Client           0x0005  Yes
DeletePublisher          Client           0x0006  Yes
Subscribe                Client           0x0007  Yes
Deliver                  Server           0x0008  No
Credit                   Client           0x0009  No
StoreOffset              Client           0x000a  No
QueryOffset              Client           0x000b  Yes
Unsubscribe              Client           0x000c  Yes
Create                   Client           0x000d  Yes
Delete                   Client           0x000e  Yes
Metadata                 Client           0x000f  Yes
MetadataUpdate           Server           0x0010  No
PeerProperties           Client           0x0011  Yes
SaslHandshake            Client           0x0012  Yes
SaslAuthenticate         Client           0x0013  Yes
Tune                     Server           0x0014  Yes
Open                     Client           0x0015  Yes
Close                    Client & Server  0x0016  Yes
Heartbeat                Client & Server  0x0017  No
Route                    Client           0x0018  Yes
Partitions               Client           0x0019  Yes
ConsumerUpdate           Server           0x001a  Yes
ExchangeCommandVersions  Client           0x001b  Yes
StreamStats              Client           0x001c  Yes
CreateSuperStream        Client           0x001d  Yes
DeleteSuperStream        Client           0x001e  Yes

DeclarePublisher

DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
  Key => uint16 // 0x0001
  Version => uint16
  CorrelationId => uint32
  PublisherId => uint8
  PublisherReference => string // max 256 characters
  Stream => string

DeclarePublisherResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x8001
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
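As an illustration, a DeclarePublisher request could be encoded and its response decoded as follows. The function names are hypothetical, and the version value 1 is an assumption:

```python
import struct

def _string(value):
    """string: int16 length + UTF-8 bytes, -1 for null."""
    if value is None:
        return struct.pack(">h", -1)
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw

def declare_publisher_request(correlation_id, publisher_id, reference, stream, version=1):
    """Build the full frame (size prefix included) for key 0x0001."""
    body = struct.pack(">HHIB", 0x0001, version, correlation_id, publisher_id)
    body += _string(reference) + _string(stream)
    return struct.pack(">I", len(body)) + body

def parse_simple_response(body):
    """Decode Key Version CorrelationId ResponseCode from a frame body (no size prefix)."""
    key, version, correlation_id, code = struct.unpack_from(">HHIH", body, 0)
    return key, version, correlation_id, code
```

parse_simple_response applies to any response that carries only a response code, such as DeletePublisherResponse or UnsubscribeResponse.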

Publish

Version 1

Publish => Key Version PublisherId PublishedMessages
  Key => uint16 // 0x0002
  Version => uint16
  PublisherId => uint8
  PublishedMessages => [PublishedMessage]
  PublishedMessage => PublishingId Message
  PublishingId => uint64
  Message => bytes

Version 2

Publish => Key Version PublisherId PublishedMessages
  Key => uint16 // 0x0002
  Version => uint16
  PublisherId => uint8
  PublishedMessages => [PublishedMessage]
  PublishedMessage => PublishingId FilterValue Message
  PublishingId => uint64
  FilterValue => string
  Message => bytes
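The difference between the two versions is the filter value written before each message. A sketch, with a hypothetical publish_frame function that treats the message payload as opaque bytes:

```python
import struct

def publish_frame(publisher_id, messages, version=1):
    """messages is a list of (publishing_id, filter_value, payload) tuples.

    filter_value is ignored for version 1 and written as a string for version 2
    (None is encoded as a null string, length -1).
    """
    body = struct.pack(">HHB", 0x0002, version, publisher_id)
    body += struct.pack(">i", len(messages))  # array length
    for publishing_id, filter_value, payload in messages:
        body += struct.pack(">Q", publishing_id)
        if version >= 2:
            if filter_value is None:
                body += struct.pack(">h", -1)
            else:
                raw = filter_value.encode("utf-8")
                body += struct.pack(">h", len(raw)) + raw
        body += struct.pack(">i", len(payload)) + payload
    return struct.pack(">I", len(body)) + body
```

Publish has no correlation ID: the outcome is reported asynchronously through PublishConfirm and PublishError, keyed by publishing ID.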

PublishConfirm

PublishConfirm => Key Version PublisherId PublishingIds
  Key => uint16 // 0x0003
  Version => uint16
  PublisherId => uint8
  PublishingIds => [uint64] // to correlate with the messages sent
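Decoding a PublishConfirm body, assuming the fields appear in the order of the field list above (Key, Version, PublisherId, then the array of publishing IDs). The function name is hypothetical:

```python
import struct

def parse_publish_confirm(body):
    """Return (publisher_id, [publishing ids]) from a frame body (no size prefix)."""
    key, version, publisher_id, count = struct.unpack_from(">HHBi", body, 0)
    # The header above is 2 + 2 + 1 + 4 = 9 bytes; the ids follow as uint64 values.
    ids = struct.unpack_from(">%dQ" % count, body, 9)
    return publisher_id, list(ids)
```

A publisher would typically remove the returned IDs from its set of outstanding (unconfirmed) messages.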

PublishError

PublishError => Key Version PublisherId [PublishingError]
  Key => uint16 // 0x0004
  Version => uint16
  PublisherId => uint8
  PublishingError => PublishingId Code
  PublishingId => uint64
  Code => uint16 // code to identify the problem

QueryPublisherSequence

QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
  Key => uint16 // 0x0005
  Version => uint16
  CorrelationId => uint32
  PublisherReference => string // max 256 characters
  Stream => string

QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
  Key => uint16 // 0x8005
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Sequence => uint64

DeletePublisher

DeletePublisherRequest => Key Version CorrelationId PublisherId
  Key => uint16 // 0x0006
  Version => uint16
  CorrelationId => uint32
  PublisherId => uint8

DeletePublisherResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x8006
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Subscribe

Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit Properties
  Key => uint16 // 0x0007
  Version => uint16
  CorrelationId => uint32 // correlation id to correlate the response
  SubscriptionId => uint8 // client-supplied id to identify the subscription
  Stream => string // the name of the stream
  OffsetSpecification => OffsetType Offset
  OffsetType => uint16 // 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
  Offset => uint64 (for offset) | int64 (for timestamp)
  Credit => uint16
  Properties => [Property]
  Property => Key Value
  Key => string
  Value => string

NB: Timestamp is Erlang system time, milliseconds from epoch

Supported properties:

  • single-active-consumer: set to true to enable single active consumer for this subscription.

  • super-stream: set to the name of the super stream the subscribed stream is a partition of.

  • filter. prefix (e.g. filter.0, filter.1, etc.): properties whose key starts with this prefix define the filter values for the subscription.

  • match-unfiltered: whether to return messages without any filter value or not.
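A sketch of a Subscribe frame with an offset specification and properties. Two points are assumptions that the grammar does not settle and that should be checked against the reference client: the Offset field is written only for the offset and timestamp types, and the properties array is always written, even when empty. The function name is hypothetical:

```python
import struct

OFFSET_TYPES = {"first": 1, "last": 2, "next": 3, "offset": 4, "timestamp": 5}

def _string(value):
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw

def subscribe_frame(correlation_id, subscription_id, stream, offset_type,
                    offset=None, credit=1, properties=None, version=1):
    body = struct.pack(">HHIB", 0x0007, version, correlation_id, subscription_id)
    body += _string(stream)
    body += struct.pack(">H", OFFSET_TYPES[offset_type])
    if offset_type == "offset":
        body += struct.pack(">Q", offset)   # uint64 offset
    elif offset_type == "timestamp":
        body += struct.pack(">q", offset)   # int64 milliseconds since epoch
    body += struct.pack(">H", credit)
    properties = properties or {}
    body += struct.pack(">i", len(properties))
    for name, value in properties.items():
        body += _string(name) + _string(value)
    return struct.pack(">I", len(body)) + body
```

For example, subscribe_frame(1, 0, "s", "offset", offset=42, credit=10, properties={"filter.0": "eu"}) subscribes from offset 42 with one filter value.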

Deliver

Version 1

Deliver => Key Version SubscriptionId OsirisChunk
  Key => uint16 // 0x0008
  Version => uint16
  SubscriptionId => uint8
  OsirisChunk => MagicVersion ChunkType NumEntries NumRecords Timestamp Epoch ChunkFirstOffset ChunkCrc DataLength TrailerLength Reserved Messages
  MagicVersion => int8
  ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
  NumEntries => uint16
  NumRecords => uint32
  Timestamp => int64 // erlang system time in milliseconds, since epoch
  Epoch => uint64
  ChunkFirstOffset => uint64
  ChunkCrc => int32
  DataLength => uint32
  TrailerLength => uint32
  Reserved => uint32 // unused 4 bytes
  Messages => [Message] // no int32 for the size for this array; the size is defined by NumEntries field above
  Message => EntryTypeAndSize
  Data => bytes

Version 2

Deliver => Key Version SubscriptionId CommittedChunkId OsirisChunk
  Key => uint16 // 0x0008
  Version => uint16
  SubscriptionId => uint8
  CommittedChunkId => uint64
  OsirisChunk => MagicVersion ChunkType NumEntries NumRecords Timestamp Epoch ChunkFirstOffset ChunkCrc DataLength TrailerLength Reserved Messages
  MagicVersion => int8
  ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
  NumEntries => uint16
  NumRecords => uint32
  Timestamp => int64 // erlang system time in milliseconds, since epoch
  Epoch => uint64
  ChunkFirstOffset => uint64
  ChunkCrc => int32
  DataLength => uint32
  TrailerLength => uint32
  Reserved => uint32 // unused 4 bytes
  Messages => [Message] // no int32 for the size for this array; the size is defined by NumEntries field above
  Message => EntryTypeAndSize
  Data => bytes

NB: See the Osiris project for details on the structure of messages.
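A sketch of decoding the Deliver envelope and the chunk header, assuming the header fields are laid out on the wire in the order of the field list above (48 bytes in total). The entries inside the data section are left undecoded, since their structure is defined by Osiris. All names are hypothetical:

```python
import struct
from collections import namedtuple

ChunkHeader = namedtuple(
    "ChunkHeader",
    "magic_version chunk_type num_entries num_records timestamp epoch "
    "first_offset crc data_length trailer_length reserved",
)
# int8 int8 uint16 uint32 int64 uint64 uint64 int32 uint32 uint32 uint32
HEADER = struct.Struct(">bbHIqQQiIII")

def parse_deliver(body):
    """Return (subscription_id, committed_chunk_id, header, data) from a frame body."""
    key, version, subscription_id = struct.unpack_from(">HHB", body, 0)
    pos = 5
    committed_chunk_id = None
    if version >= 2:
        # Version 2 inserts CommittedChunkId before the chunk.
        (committed_chunk_id,) = struct.unpack_from(">Q", body, pos)
        pos += 8
    header = ChunkHeader(*HEADER.unpack_from(body, pos))
    start = pos + HEADER.size
    data = body[start:start + header.data_length]
    return subscription_id, committed_chunk_id, header, data
```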

Credit

Credit => Key Version SubscriptionId Credit
  Key => uint16 // 0x0009
  Version => uint16
  SubscriptionId => uint8
  Credit => uint16 // the number of chunks that can be sent

CreditResponse => Key Version ResponseCode SubscriptionId
  Key => uint16 // 0x8009
  Version => uint16
  ResponseCode => uint16
  SubscriptionId => uint8

NB: the server sends a response only in case of a problem, e.g. crediting an unknown subscription.
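A sketch of granting credit and decoding the optional error response. The function names are hypothetical and the version value 1 is an assumption:

```python
import struct

def credit_frame(subscription_id, credit, version=1):
    """Grant `credit` additional chunks to a subscription (no correlation ID)."""
    body = struct.pack(">HHBH", 0x0009, version, subscription_id, credit)
    return struct.pack(">I", len(body)) + body

def parse_credit_response(body):
    """Return (response_code, subscription_id); note the code comes before the id."""
    key, version, code, subscription_id = struct.unpack_from(">HHHB", body, 0)
    return code, subscription_id
```

A consumer would usually send one credit after it has processed each delivered chunk, which keeps a bounded number of chunks in flight.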

StoreOffset

StoreOffset => Key Version Reference Stream Offset
  Key => uint16 // 0x000a
  Version => uint16
  Reference => string // max 256 characters
  Stream => string // the name of the stream
  Offset => uint64

QueryOffset

QueryOffsetRequest => Key Version CorrelationId Reference Stream
  Key => uint16 // 0x000b
  Version => uint16
  CorrelationId => uint32
  Reference => string // max 256 characters
  Stream => string

QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
  Key => uint16 // 0x800b
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Offset => uint64

Unsubscribe

Unsubscribe => Key Version CorrelationId SubscriptionId
  Key => uint16 // 0x000c
  Version => uint16
  CorrelationId => uint32
  SubscriptionId => uint8

UnsubscribeResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x800c
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Create

Create => Key Version CorrelationId Stream Arguments
  Key => uint16 // 0x000d
  Version => uint16
  CorrelationId => uint32
  Stream => string
  Arguments => [Argument]
  Argument => Key Value
  Key => string
  Value => string

Delete

Delete => Key Version CorrelationId Stream
  Key => uint16 // 0x000e
  Version => uint16
  CorrelationId => uint32
  Stream => string

Metadata

MetadataQuery => Key Version CorrelationId [Stream]
  Key => uint16 // 0x000f
  Version => uint16
  CorrelationId => uint32
  Stream => string

MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
  Key => uint16 // 0x800f
  Version => uint16
  CorrelationId => uint32
  Broker => Reference Host Port
    Reference => uint16
    Host => string
    Port => uint32
  StreamMetadata => StreamName ResponseCode LeaderReference ReplicasReferences
     StreamName => string
     ResponseCode => uint16
     LeaderReference => uint16
     ReplicasReferences => [uint16]
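The response lists brokers once and lets each stream point to them by reference. A decoding sketch that resolves those references, with hypothetical function names and result layout:

```python
import struct

def _read_string(buf, pos):
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    if length < 0:
        return None, pos
    return buf[pos:pos + length].decode("utf-8"), pos + length

def parse_metadata_response(body):
    """Return (correlation_id, brokers, streams) from a frame body (no size prefix)."""
    key, version, correlation_id = struct.unpack_from(">HHI", body, 0)
    pos = 8
    (n_brokers,) = struct.unpack_from(">i", body, pos)
    pos += 4
    brokers = {}
    for _ in range(n_brokers):
        (reference,) = struct.unpack_from(">H", body, pos)
        host, pos = _read_string(body, pos + 2)
        (port,) = struct.unpack_from(">I", body, pos)
        pos += 4
        brokers[reference] = (host, port)
    (n_streams,) = struct.unpack_from(">i", body, pos)
    pos += 4
    streams = {}
    for _ in range(n_streams):
        name, pos = _read_string(body, pos)
        code, leader, n_replicas = struct.unpack_from(">HHi", body, pos)
        pos += 8
        replicas = struct.unpack_from(">%dH" % n_replicas, body, pos)
        pos += 2 * n_replicas
        streams[name] = {
            "code": code,
            "leader": brokers.get(leader),  # None if the reference is unknown
            "replicas": [brokers.get(r) for r in replicas],
        }
    return correlation_id, brokers, streams
```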

MetadataUpdate

MetadataUpdate => Key Version MetadataInfo
  Key => uint16 // 0x0010
  Version => uint16
  MetadataInfo => Code Stream
  Code => uint16 // code to identify the information
  Stream => string // the stream implied

PeerProperties

PeerPropertiesRequest => Key Version CorrelationId PeerProperties
  Key => uint16 // 0x0011
  Version => uint16
  CorrelationId => uint32
  PeerProperties => [PeerProperty]
  PeerProperty => Key Value
  Key => string
  Value => string

PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
  Key => uint16 // 0x8011
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  PeerProperties => [PeerProperty]
  PeerProperty => Key Value
  Key => string
  Value => string

SaslHandshake

SaslHandshakeRequest => Key Version CorrelationId
  Key => uint16 // 0x0012
  Version => uint16
  CorrelationId => uint32

SaslHandshakeResponse => Key Version CorrelationId ResponseCode Mechanisms
  Key => uint16 // 0x8012
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Mechanisms => [Mechanism]
  Mechanism => string

SaslAuthenticate

SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
  Key => uint16 // 0x0013
  Version => uint16
  CorrelationId => uint32
  Mechanism => string
  SaslOpaqueData => bytes

SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
  Key => uint16 // 0x8013
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  SaslOpaqueData => bytes
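As an example of the opaque data, the sketch below builds a SaslAuthenticate request for the PLAIN mechanism. PLAIN is an assumption here (this document does not name any mechanism); its message layout of an empty authorization identity, a NUL byte, the user name, a NUL byte and the password follows RFC 4616. The mechanism must be one the server returned in SaslHandshakeResponse. The function name is hypothetical:

```python
import struct

def _string(value):
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw

def sasl_plain_authenticate_frame(correlation_id, username, password, version=1):
    """Build a SaslAuthenticate request frame carrying a SASL PLAIN message."""
    # RFC 4616: [authzid] NUL authcid NUL passwd, with an empty authzid here.
    opaque = b"\x00" + username.encode("utf-8") + b"\x00" + password.encode("utf-8")
    body = struct.pack(">HHI", 0x0013, version, correlation_id)
    body += _string("PLAIN")
    body += struct.pack(">i", len(opaque)) + opaque  # bytes: int32 length + content
    return struct.pack(">I", len(body)) + body
```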

Tune

TuneRequest => Key Version FrameMax Heartbeat
  Key => uint16 // 0x0014
  Version => uint16
  FrameMax => uint32 // in bytes, 0 means no limit
  Heartbeat => uint32 // in seconds, 0 means no heartbeat

TuneResponse => TuneRequest
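One way a client could answer the server's Tune frame is sketched below. The negotiation policy (take the smaller value, treating 0 as "no limit" or "no heartbeat") is an assumption, not something this document prescribes, and the reply key 0x8014 follows from the general rule that responses set the most significant bit of the key. Function names are hypothetical:

```python
import struct

def negotiate(server_value, client_value):
    """Pick the smaller value, where 0 means unlimited/disabled (assumed policy)."""
    if server_value == 0:
        return client_value
    if client_value == 0:
        return server_value
    return min(server_value, client_value)

def tune_reply(server_body, client_frame_max, client_heartbeat):
    """Decode the server's Tune body and build the client's reply frame."""
    key, version, frame_max, heartbeat = struct.unpack_from(">HHII", server_body, 0)
    body = struct.pack(
        ">HHII",
        key | 0x8000,  # mark the frame as a response
        version,
        negotiate(frame_max, client_frame_max),
        negotiate(heartbeat, client_heartbeat),
    )
    return struct.pack(">I", len(body)) + body
```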

Open

OpenRequest => Key Version CorrelationId VirtualHost
  Key => uint16 // 0x0015
  Version => uint16
  CorrelationId => uint32
  VirtualHost => string

OpenResponse => Key Version CorrelationId ResponseCode ConnectionProperties
  Key => uint16 // 0x8015
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  ConnectionProperties => [ConnectionProperty]
  ConnectionProperty => Key Value
  Key => string
  Value => string

Close

CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
  Key => uint16 // 0x0016
  Version => uint16
  CorrelationId => uint32
  ClosingCode => uint16
  ClosingReason => string

CloseResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x8016
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Heartbeat

Heartbeat => Key Version
  Key => uint16 // 0x0017
  Version => uint16

Route

RouteQuery => Key Version CorrelationId RoutingKey SuperStream
  Key => uint16 // 0x0018
  Version => uint16
  CorrelationId => uint32
  RoutingKey => string
  SuperStream => string

RouteResponse => Key Version CorrelationId ResponseCode [Stream]
  Key => uint16 // 0x8018
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Stream => string

Partitions

PartitionsQuery => Key Version CorrelationId SuperStream
  Key => uint16 // 0x0019
  Version => uint16
  CorrelationId => uint32
  SuperStream => string

PartitionsResponse => Key Version CorrelationId ResponseCode [Stream]
  Key => uint16 // 0x8019
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Stream => string

ConsumerUpdate

ConsumerUpdateQuery => Key Version CorrelationId SubscriptionId Active
  Key => uint16 // 0x001a
  Version => uint16
  CorrelationId => uint32
  SubscriptionId => uint8
  Active => uint8 (boolean, 0 = false, 1 = true)

ConsumerUpdateResponse => Key Version CorrelationId ResponseCode OffsetSpecification
  Key => uint16 // 0x801a
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  OffsetSpecification => OffsetType Offset
    OffsetType => uint16 // 0 (none), 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
    Offset => uint64 (for offset) | int64 (for timestamp)

ExchangeCommandVersions

CommandVersionsExchangeRequest => Key Version CorrelationId [Command]
  Key => uint16 // 0x001b
  Version => uint16
  CorrelationId => uint32
  Command => Key MinVersion MaxVersion
  Key => uint16
  MinVersion => uint16
  MaxVersion => uint16

CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Command]
  Key => uint16 // 0x801b
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Command => Key MinVersion MaxVersion
  Key => uint16
  MinVersion => uint16
  MaxVersion => uint16

StreamStats

StreamStatsRequest => Key Version CorrelationId Stream
  Key => uint16 // 0x001c
  Version => uint16
  CorrelationId => uint32
  Stream => string

StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
  Key => uint16 // 0x801c
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Stats => [Statistic]
  Statistic => Key Value
  Key => string
  Value => int64

CreateSuperStream

CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments
  Key => uint16 // 0x001d
  Version => uint16
  CorrelationId => uint32
  Name => string
  Partition => string
  BindingKey => string
  Arguments => [Argument]
  Argument => Key Value
  Key => string
  Value => string

DeleteSuperStream

DeleteSuperStream => Key Version CorrelationId Name
  Key => uint16 // 0x001e
  Version => uint16
  CorrelationId => uint32
  Name => string

Authentication

Once a client is connected to the server, it initiates an authentication sequence. The next figure shows the steps of the sequence:

Authentication Sequence
Client                      Server
  +                           +
  | Peer Properties Exchange  |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |      SASL Handshake       |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |     SASL Authenticate     |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |           Tune            |
  |<--------------------------|
  |-------------------------->|
  |                           |
  |           Open            |
  |-------------------------->|
  |<--------------------------|
  |                           |
  +                           +
  • SaslHandshake: the client asks about the SASL mechanisms the server supports. It can then pick one from the list the server returns.

  • SaslAuthenticate: the client answers the server’s challenge(s), using the SASL mechanism it picked. The server will send a Tune frame once it is satisfied with the client authentication response.

  • Tune: the server sends a Tune frame to suggest some settings (max frame size, heartbeat). The client answers with a Tune frame with the settings it agrees on, possibly adjusted from the server’s suggestions.

  • Open: the client sends an Open frame to pick a virtual host to connect to. The server answers whether it accepts the access or not.
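The sequence can be summarized as the ordered list of frame keys each side sends. This is a simplified sketch: it assumes a single SaslAuthenticate round trip (a mechanism with several challenges would repeat that step), and the names are hypothetical:

```python
RESPONSE_FLAG = 0x8000

# (command, key, side that sends the request)
HANDSHAKE_STEPS = [
    ("PeerProperties", 0x0011, "client"),
    ("SaslHandshake", 0x0012, "client"),
    ("SaslAuthenticate", 0x0013, "client"),
    ("Tune", 0x0014, "server"),
    ("Open", 0x0015, "client"),
]

def handshake_frames():
    """Return the expected (sender, key) pairs in wire order."""
    frames = []
    for _name, key, initiator in HANDSHAKE_STEPS:
        responder = "server" if initiator == "client" else "client"
        frames.append((initiator, key))
        frames.append((responder, key | RESPONSE_FLAG))
    return frames
```

Tune is the only step initiated by the server, which is why the arrows for it are reversed in the figure.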

Resources