Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,40 @@ Here's a quick example. More detailed examples for the browser and node.js can b
```javascript
// Create a StreamrClient instance
var client = new StreamrClient({
// See below for connection options
// See below for options
})

// Subscribe to a stream
var sub = client.subscribe(
'stream-id',
function(message, streamId, timestamp, counter) {
// Do something with a message, which is an object
},
{
// Resend options, see below
stream: 'streamId',
partition: 0, // Optional, defaults to zero. Use for partitioned streams to select partition.
authKey: 'authKey' // Optional. If not given, uses the authKey given at client creation time.
// optional resend options here
},
function(message, metadata) {
// Do something with the message, which is an object
}
)
```

### Handling messages

The second argument to `client.subscribe(streamId, callback, resendOptions)` is the callback function that will be called for each message as they arrive. Its arguments are as follows:

Argument | Description
-------- | -----------
message | A javascript object containing the message itself
streamId | The id of the stream the message belongs to
timestamp| (optional) A javascript Date object containing the timestamp for this message, if available.
counter | (optional) A sequence number for this message, if available.


### Connection options
### Client options

Option | Default value | Description
------ | ------------- | -----------
url | ws://www.streamr.com/api/v1/ws | Address of the Streamr websocket endpoint to connect to.
autoConnect | true | If set to `true`, the client connects automatically on the first call to `subscribe()`. Otherwise an explicit call to `connect()` is required.
autoDisconnect | true  | If set to `true`, the client automatically disconnects when the last stream is unsubscribed. Otherwise the connection is left open and can be disconnected explicitly by calling `disconnect()`.
authKey | null | Define default authKey to use when none is specified in the call to `client.subscribe`.

### Message handler callback

The second argument to `client.subscribe(options, callback)` is the callback function that will be called for each message as they arrive. Its arguments are as follows:

Argument | Description
-------- | -----------
message | A javascript object containing the message itself
metadata | Metadata for the message, for example `metadata.timestamp` etc.

### Resend options

Expand All @@ -73,7 +73,7 @@ Name | Description
connect() | Connects to the server, and also subscribes to any streams for which `subscribe()` has been called before calling `connect()`.
disconnect() | Disconnects from the server, clearing all subscriptions.
pause() | Disconnects from the server without clearing subscriptions.
subscribe(streamId, callback, resendOptions) | Subscribes to a stream identified by the string `streamId`. Messages in this stream are passed to the `callback` function. See the above table for `resendOptions`. Returns a `Subscription` object.
subscribe(streamId, authId, callback, resendOptions) | Subscribes to a stream identified by the string `streamId`. Authentication key `authId` is used. Messages in this stream are passed to the `callback` function. See the above table for `resendOptions`. Returns a `Subscription` object.
unsubscribe(Subscription) | Unsubscribes the given `Subscription`.
unsubscribeAll(`streamId`) | Unsubscribes all `Subscriptions` for `streamId`.
getSubscriptions(`streamId`) | Returns a list of `Subscriptions` for `streamId`.
Expand Down
13 changes: 6 additions & 7 deletions examples/browser.html
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<html>
<head>
<!-- For debug messages, include debug.js from https://github.com/visionmedia/debug and set localStorage.debug = 'StreamrClient' -->
<script src="https://cdn.socket.io/socket.io-1.3.7.js"></script>
<!-- For debug messages, include debug.js and set localStorage.debug = 'StreamrClient'. See from https://github.com/visionmedia/debug -->
<script src="../streamr-client.js"></script>

<script>
Expand All @@ -15,14 +14,14 @@
var client = new StreamrClient()
// Subscribe to a stream
var subscription = client.subscribe(
'1ef8TbyGTFiAlZ8R2gCaJw',
{
stream: '1ef8TbyGTFiAlZ8R2gCaJw',
// Resend the last 10 messages on connect
resend_last: 10
},
function(message) {
// Handle the messages in this stream
log(JSON.stringify(message))
},
{
// Resend the last 10 messages on connect
resend_last: 10
}
)

Expand Down
10 changes: 5 additions & 5 deletions examples/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ var StreamrClient = require('../streamr-client')
var client = new StreamrClient()
// Subscribe to a stream
var subscription = client.subscribe(
'1ef8TbyGTFiAlZ8R2gCaJw',
{
stream: '1ef8TbyGTFiAlZ8R2gCaJw',
// Resend the last 10 messages on connect
resend_last: 10
},
function(message) {
// Handle the messages in this stream
console.log(message)
},
{
// Resend the last 10 messages on connect
resend_last: 10
}
)

Expand Down
43 changes: 30 additions & 13 deletions streamr-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@
return id.toString()
};

function Subscription(streamId, callback, options) {
function Subscription(streamId, streamPartition, authKey, callback, options) {
EventEmitter.call(this); // call parent constructor

if (!streamId)
Expand All @@ -492,6 +492,8 @@

this.id = generateSubscriptionId()
this.streamId = streamId
this.streamPartition = streamPartition
this.authKey = authKey
this.callback = callback
this.options = options || {}
this.queue = []
Expand Down Expand Up @@ -607,7 +609,7 @@
// Normal case where prevOffset == null || lastReceivedOffset == null || prevOffset === lastReceivedOffset
else {
this.lastReceivedOffset = offset
this.callback(content, this.streamId, timestamp, offset)
this.callback(content, msg)
if (content[BYE_KEY]) {
this.emit('done')
}
Expand Down Expand Up @@ -679,7 +681,8 @@
// Automatically connect on first subscribe
autoConnect: true,
// Automatically disconnect on last unsubscribe
autoDisconnect: true
autoDisconnect: true,
authKey: null
}
this.subsByStream = {}
this.subById = {}
Expand Down Expand Up @@ -720,19 +723,33 @@
return this.subsByStream[streamId] || []
}

StreamrClient.prototype.subscribe = function(streamId, callback, options) {
StreamrClient.prototype.subscribe = function(options, callback, legacyOptions) {
var _this = this

if (!streamId)
throw "subscribe: Invalid arguments: stream id is required!"
else if (typeof streamId !== 'string')
throw "subscribe: stream id must be a string!"

if (!callback)
if (!options) {
throw "subscribe: Invalid arguments: subscription options is required!"
} else if (!callback) {
throw "subscribe: Invalid arguments: callback is required!"
}

// Backwards compatibility for giving a streamId as first argument
if (typeof options === 'string') {
options = {
stream: options
}
} else if (typeof options !== 'object') {
throw "subscribe: options must be an object"
}

// Backwards compatibility for giving an options object as third argument
extend(options, legacyOptions)

if (!options.stream) {
throw "subscribe: Invalid arguments: options.stream is not given"
}

// Create the Subscription object and bind handlers
var sub = new Subscription(streamId, callback, options)
var sub = new Subscription(options.stream, options.partition || 0, options.authKey || this.options.authKey, callback, options)
sub.on('gap', function(from, to) {
_this._requestResend(sub, {resend_from: from, resend_to: to})
})
Expand Down Expand Up @@ -985,7 +1002,7 @@

// If this is the first subscription for this stream, send a subscription request to the server
if (!subs._subscribing && subscribedSubs.length === 0) {
var req = extend({}, sub.options, {type: 'subscribe', stream: sub.streamId})
var req = extend({}, sub.options, { type: 'subscribe', stream: sub.streamId, authKey: sub.authKey })
debug("_requestSubscribe: subscribing client: %o", req)
subs._subscribing = true
_this.connection.send(req)
Expand Down Expand Up @@ -1021,7 +1038,7 @@

sub.resending = true

var request = extend({}, options, resendOptions, {type: 'resend', stream: sub.streamId, sub: sub.id})
var request = extend({}, options, resendOptions, {type: 'resend', stream: sub.streamId, partition: sub.streamPartition, authKey: sub.authKey, sub: sub.id})
debug("_requestResend: %o", request)
this.connection.send(request)
}
Expand Down
Loading