Permalink
Fetching contributors…
Cannot retrieve contributors at this time
975 lines (626 sloc) 32.9 KB

Classes

AMQPClientEventEmitter
LinkEventEmitter
ReceiverLinkLink
SenderLinkLink
Policy

AMQPClient

Extends: EventEmitter
Emits: client:errorReceived, connection:opened, connection:closed

new AMQPClient([policy], [policyOverrides])

Param Type Description
[policy] Policy Policy to use for connection, sessions, links, etc. Defaults to DefaultPolicy.
[policyOverrides] Obeject Additional overrides for the provided policy

AMQPClient is the top-level class for interacting with node-amqp10. Instantiate this class, connect, and then send/receive as needed and behind the scenes it will do the appropriate work to setup and teardown connections, sessions, and links and manage flow. The code does its best to avoid exposing AMQP-specific types and attempts to convert them where possible, but on the off-chance you need to speak AMQP-specific (e.g. to set a filter to a described-type), you can use node-amqp-encoder and the translator adapter to convert it to our internal types. See simple_eventhub_test.js for an example.

Configuring AMQPClient is done through a Policy class. By default, DefaultPolicy will be used - it assumes AMQP defaults wherever possible, and for values with no spec-defined defaults it tries to assume something reasonable (e.g. timeout, max message size).

To define a new policy, you can merge your values into an existing one by calling AMQPClient.policies.merge(yourPolicy, existingPolicy). This does a deep-merge, allowing you to only replace values you need. For instance, if you wanted the default sender settle policy to be auto-settle instead of mixed, you could just use

 var AMQP = require('amqp10');
 var client = new AMQP.Client(AMQP.Policy.merge({
   senderLink: {
     attach: {
       senderSettleMode: AMQP.Constants.senderSettleMode.settled
     }
   }
 });
 

Obviously, setting some of these options requires some in-depth knowledge of AMQP, so I've tried to define specific policies where I can. For instance, for Azure EventHub connections, you can use the pre-build EventHubPolicy.

Also, within the policy, see the encoder and decoder defined in the send/receive policies. These define what to do with the message sent/received, and by default do a simple pass-through, leaving the encoding to/decoding from AMQP-specific types up to the library which does a best-effort job. See EventHubPolicy for a more complicated example, turning objects into UTF8-encoded buffers of JSON-strings.

amqpClient.connect(url, [policyOverrides])

Param Type Description
url string URI to connect to - right now only supports amqp
[policyOverrides] object Policy overrides used for creating this connection

Connects to a given AMQP server endpoint. Sets the default queue, so e.g. amqp://my-activemq-host/my-queue-name would set the default queue to my-queue-name for future send/receive calls.

amqpClient.createReceiver(address, [policyOverrides])

Param Type Description
address string An address to connect this link to. If not provided will use default queue from connection uri.
[policyOverrides] object Policy overrides used for creating this receiver link
[policyOverrides.name] string Explicitly set a name for this link, this is an alias to [policyOverrides.attach.name]

Creates a receiver link for the given address, with optional link policy. The promise returned resolves to a link that is an EventEmitter, which can be used to listen for 'message' events.

amqpClient.createReceiverStream(address, [policyOverrides])

Param Type Description
address string Address used for link creation
[policyOverrides] object Policy overrides used for creating the receiver link

Creates a receiver link wrapped as a Readable stream

amqpClient.createSender(address, [policyOverrides])

Param Type Description
address string An address to connect this link to. If not provided will use default queue from connection uri.
[policyOverrides] object Policy overrides used for creating this sender link
[policyOverrides.name] string Explicitly set a name for this link, this is an alias to [policyOverrides.attach.name]

Creates a sender link for the given address, with optional link policy

amqpClient.createSenderStream(address, [policyOverrides])

Param Type Description
address string Address used for link creation
[policyOverrides] object Policy overrides used for creating this sender link

Creates a sender link wrapped as a Writable stream

amqpClient.disconnect()

Disconnect tears down any existing connection with appropriate Close performatives and TCP socket teardowns.

Event: 'client:errorReceived'

Properties

Name Type Description
the object error received

Error received events

Event: 'connection:closed'

Connection closed event.

Event: 'connection:opened'

Connection opened event.

Link

Extends: EventEmitter
Emits: errorReceived, attached, detached

link.detach([options])

Param Type Description
[options] object detach frame options

Detach the link from the session

Event: 'attached'

Attached event

Event: 'detached'

Detached event

Event: 'errorReceived'

Param Type Description
error object the received error

Error received event

ReceiverLink

Extends: Link
Emits: message

receiverLink.accept(message)

Param Type Description
message string | array message, or array of messages to settle

Settle a message (or array of messages) with an Accepted delivery outcome

receiverLink.addCredits(credits, [flowOptions])

Param Type Description
credits number number of credits to add
[flowOptions] object additional options to include in flow frame

Add credits to this link

receiverLink.detach([options])

Param Type Description
[options] object detach frame options

Detach the link from the session

receiverLink.modify(message, [options])

Param Type Description
message string | array message, or array of messages to settle
[options] object options used for a Modified outcome
[options.deliveryFailed] boolean count the transfer as an unsuccessful delivery attempt
[options.undeliverableHere] boolean prevent redelivery
[options.messageAnnotations] object message attributes to combine with existing annotations

Settle a message (or array of messages) with a Modified delivery outcome

receiverLink.reject(message, [error])

Param Type Description
message string | array message, or array of messages to settle
[error] string error that caused the message to be rejected

Settle a message (or array of messages) with a Rejected delivery outcome

receiverLink.release(message)

Param Type Description
message string | array message, or array of messages to settle

Settle a message (or array of messages) with a Released delivery outcome

receiverLink.settle(message, [state])

Param Type Description
message string | array message, or array of messages to settle
[state] object outcome of message delivery

Settle a message (or array of messages) with a given delivery state

Event: 'attached'

Attached event

Event: 'creditChange'

Credit change event

Event: 'detached'

Detached event

Event: 'errorReceived'

Param Type Description
error object the received error

Error received event

Event: 'message'

Param Type Description
message object the received message
[transferFrame] object the transfer frame the message was extracted from

Message received event. Message payload given as argument.

SenderLink

Extends: Link

senderLink.detach([options])

Param Type Description
[options] object detach frame options

Detach the link from the session

senderLink.send(msg, [options])

Param Type Description
msg object | string | array Message to send. Will be encoded using sender link policy's encoder.
[options] object An object of options to attach to the message including: annotations, properties, and application properties
[options.callback] string Determines when the send operation should callback. Possible options are: 'sent', 'settled' and 'none'. For the best performance choose 'none', which is essentially "send and forget" and notably will not return a promise.
[options.annotations] object Annotations for the message, if any. See AMQP spec for details, and server for specific annotations that might be relevant (e.g. x-opt-partition-key on EventHub). If node-amqp-encoder'd map is given, it will be translated to appropriate internal types. Simple maps will be converted to AMQP Fields type as defined in the spec.

Sends the given message, with the given options on this link

Event: 'attached'

Attached event

Event: 'detached'

Detached event

Event: 'errorReceived'

Param Type Description
error object the received error

Error received event

Policy

new Policy(overrides)

Param Type Description
overrides object override values for the default policy

The default policy for amqp10 clients

policy.parseAddress(address)

Param Type Description
address string the address to parse

Parses an address for use when connecting to an AMQP 1.0 broker

policy.parseLinkAddress(address)

Param Type Description
address string the address to parse

Parses a link address used for creating Sender and Receiver links.

The resulting object has a required name property (used as the source address in the attach performative), as well as an optional subject property which (if specified) will automatically create a source filter.

Policy.connect

connect.options

Options passed into the open performative on initial connection

options.channelMax

The channel-max value is the highest channel number that can be used on the connection

options.containerId

The id of the source container

options.desiredCapabilities

The desired-capability list defines which extension capabilities the sender may use if the receiver offers them

options.hostname

The name of the target host

options.idleTimeout

The idle timeout required by the sender

options.incomingLocales

A list of locales that the sending peer permits for incoming informational text

options.maxFrameSize

The largest frame size that the sending peer is able to accept on this connection

options.offeredCapabilities

A list of extension capabilities the peer may use if the sender offers them

options.outgoingLocales

A list of the locales that the peer supports for sending informational text

options.properties

The properties map contains a set of fields intended to indicate information about the connection and its container

options.sslOptions

Options used to initiate a TLS/SSL connection, with the exception of the following options all options in this object are passed directly to node's tls.connect method.

sslOptions.caFile

Path to the file containing the trusted cert for the client

sslOptions.certFile

Path to the file containing the certificate key for the client

sslOptions.keyFile

Path to the file containing the private key for the client

connect.saslMechanism

Allows the sasl mechanism to be overriden by policy

Policy.defaultSubjects

support subjects in link names with the following characteristics: receiver: "amq.topic/news", means a filter on the ReceiverLink will be made for messages send with a subject "news"

sender: "amq.topic/news", will automatically set "news" as the subject for messages sent on this link, unless the user explicitly overrides the subject.

Policy.receiverLink

receiverLink.attach

Options passed into the attach performative on link attachment

attach.initialDeliveryCount

This must not be null if role is sender, and it is ignored if the role is receiver.

attach.maxMessageSize

The maximum message size supported by the link endpoint

attach.name

This name uniquely identifies the link from the container of the source to the container of the target node

attach.receiverSettleMode

The delivery settlement policy for the receiver

attach.role

The role being played by the peer

receiverLink.credit

A function that determines when (if ever) to refresh the receiver link's credit

receiverLink.creditQuantum

Quantum used in pre-defined credit policy functions

receiverLink.decoder

The decoder used for all incoming data

receiverLink.reattach

Whether the link should attempt reattach on detach

Policy.reconnect

Options related to the reconnect behavior of the client. If this value is null reconnect is effectively disabled

reconnect.forever

Whether or not to attempt reconnection forever

reconnect.retries

How many times to attempt reconnection

reconnect.strategy

The algorithm used for backoff. Can be fibonacci or exponential

Policy.senderLink

senderLink.attach

Options passed into the attach performative on link attachment

attach.initialDeliveryCount

This must not be null if role is sender, and it is ignored if the role is receiver.

attach.maxMessageSize

The maximum message size supported by the link endpoint

attach.name

This name uniquely identifies the link from the container of the source to the container of the target node

attach.role

The role being played by the peer

attach.senderSettleMode

The delivery settlement policy for the sender

senderLink.callback

Determines when a send should call its callback.

senderLink.encoder

The encoder used for all outgoing sends

senderLink.reattach

Whether the link should attempt reattach on detach

Policy.session

session.enableSessionFlowControl

Whether or not session flow control should be performed at all

session.options

Options passed into the begin performative on session start

options.incomingWindow

The maximum number of incoming transfer frames that the endpoint can currently receive

options.nextOutgoingId

The transfer-id to assign to the next transfer frame

options.outgoingWindow

The maximum number of outgoing transfer frames that the endpoint can currently send

session.window

A function used to calculate how/when the flow control window should change

session.windowQuantum

Quantum used in predefined window policies