Streamr Java Client

Using this library, you can easily interact with Streamr over HTTP and websocket APIs from Java-based applications.

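This library covers the following functionality: authenticating with an Ethereum private key, signing and encrypting events, creating and looking up streams, publishing events, subscribing to streams, and working with Data Unions.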

Installation

This library is published to the Maven Central repository.

Using Maven

In your pom.xml, add the repository:

<repositories>
  <repository>
    <url>https://dl.bintray.com/ethereum/maven</url>
  </repository>
  ...
</repositories>

And the artifact itself (replace x.y.z with the latest version):

<dependencies>
  <dependency>
    <groupId>com.streamr</groupId>
    <artifactId>client</artifactId>
    <version>x.y.z</version>
  </dependency>
  ...
</dependencies>

Using Gradle

In your build.gradle, add the repository:

repositories {
    maven {
        url "https://dl.bintray.com/ethereum/maven/"
    }
}

And the artifact itself (replace x.y.z with the latest version):

dependencies {
    implementation 'com.streamr:client:x.y.z'
}

Usage

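Every interaction with Streamr is done through a StreamrClient instance. The following sections show how to instantiate and configure the client, handle errors, create and look up streams, publish events, and subscribe to streams.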

Instantiation and options

Quickstart (unauthenticated):

StreamrClient client = new StreamrClient();

Quickstart (authenticated):

// An Ethereum private key to use for signing and identity
String myPrivateKey = "0x..."; 
StreamrClient client = new StreamrClient(new EthereumAuthenticationMethod(myPrivateKey));

For full configuration of the client's behavior, you can construct the client with StreamrClientOptions:

StreamrClientOptions options = new StreamrClientOptions(...);
StreamrClient client = new StreamrClient(options);

The complete constructor of the StreamrClientOptions has the following signature:

StreamrClientOptions(
  AuthenticationMethod authenticationMethod,
  SigningOptions signingOptions,
  EncryptionOptions encryptionOptions,
  String websocketApiUrl,
  String restApiUrl,
  int propagationTimeout,
  int resendTimeout,
  boolean skipGapsOnFullQueue
)

The next subsections will cover every parameter of the StreamrClientOptions constructor:

Authentication

To authenticate as a Streamr user, provide an AuthenticationMethod instance. There is one concrete class that extends AuthenticationMethod:

  • EthereumAuthenticationMethod(String ethereumPrivateKey)

To authenticate with an Ethereum account, create an EthereumAuthenticationMethod instance and pass it to the StreamrClient constructor:

StreamrClient client = new StreamrClient(new EthereumAuthenticationMethod(myEthereumPrivateKey)); 

The library will automatically initiate a challenge-response protocol to allow you to prove that you own the Ethereum private key without revealing it. You will be identified with the associated Ethereum public address. At the end of the protocol, the library will fetch a session token to allow authenticated requests to be made.
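
The idea behind a challenge-response login can be sketched with the JDK alone. This is a minimal illustration, not the library's code: the class and method names are hypothetical, and it uses the JDK's default 256-bit EC curve with SHA-256 as a stand-in, whereas Ethereum keys use secp256k1 and Keccak-256.

```java
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;
import java.security.Signature;

// Hypothetical sketch of a challenge-response exchange (not the Streamr implementation).
final class ChallengeResponseSketch {

    // Stand-in key pair; assumption: a generic 256-bit EC key instead of an Ethereum key.
    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("EC");
            generator.initialize(256);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Server side: issue a fresh random challenge for each login attempt.
    static byte[] newChallenge() {
        byte[] challenge = new byte[32];
        new SecureRandom().nextBytes(challenge);
        return challenge;
    }

    // Client side: sign the challenge. Only the signature is sent, never the private key.
    static byte[] sign(PrivateKey privateKey, byte[] challenge) {
        try {
            Signature signer = Signature.getInstance("SHA256withECDSA");
            signer.initSign(privateKey);
            signer.update(challenge);
            return signer.sign();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Server side: check the signature against the public key and the issued challenge.
    static boolean verify(PublicKey publicKey, byte[] challenge, byte[] signature) {
        try {
            Signature verifier = Signature.getInstance("SHA256withECDSA");
            verifier.initVerify(publicKey);
            verifier.update(challenge);
            return verifier.verify(signature);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A signature produced for one challenge does not verify against a different challenge, which is what prevents a captured response from being replayed.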

You can access public resources without authenticating. In this case you can create the instance without any arguments:

StreamrClient client = new StreamrClient();

Signing

The events published to streams can be signed using an Ethereum private key and verified using the corresponding Ethereum public key. The signing options define two policies: one deciding when to sign, the other when to verify.

The SigningOptions instance can be constructed as follows:

SigningOptions.SignatureComputationPolicy signPol = SigningOptions.SignatureComputationPolicy.AUTO; // or ALWAYS or NEVER
SigningOptions.SignatureVerificationPolicy verPol = SigningOptions.SignatureVerificationPolicy.AUTO; // or ALWAYS or NEVER
SigningOptions signingOptions = new SigningOptions(signPol, verPol);

The following table describes the meaning of the different values for the SignatureComputationPolicy enum.

| Option value | Description |
| --- | --- |
| AUTO | Default value. Published events will be signed if and only if the client is authenticated using the EthereumAuthenticationMethod. |
| ALWAYS | The constructor will throw if the authentication method is not EthereumAuthenticationMethod. Will sign events otherwise. |
| NEVER | Won't sign published events. |
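
The signing rules in this table can be restated as a small decision function. The sketch below is only a restatement of the table: the class, enum, and method names are hypothetical, and the exception type is an assumption, since the text only says that the constructor throws.

```java
// Hypothetical restatement of the SignatureComputationPolicy table (not library code).
final class SigningPolicySketch {

    enum ComputationPolicy { AUTO, ALWAYS, NEVER }

    // Returns true if published events should be signed under the given policy.
    static boolean shouldSign(ComputationPolicy policy, boolean hasEthereumAuth) {
        switch (policy) {
            case ALWAYS:
                // Signing needs an Ethereum private key, so fail early without one.
                if (!hasEthereumAuth) {
                    throw new IllegalStateException("ALWAYS requires Ethereum authentication");
                }
                return true;
            case NEVER:
                return false;
            default: // AUTO: sign exactly when an Ethereum key is available
                return hasEthereumAuth;
        }
    }
}
```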

The following table describes the meaning of the different values for the SignatureVerificationPolicy enum. Note that every stream has a list of valid Ethereum addresses that are allowed to publish. Every stream also has a metadata boolean flag set by the creator of the stream that determines whether events on the stream are supposed to be signed or not.

In the following table, by "verify" we mean:

  1. Extract the Ethereum address from the signature and check it's equal to the publisher's address (verify the signature itself)
  2. Check that the set of valid publishers' Ethereum addresses contains the publisher's address.

| Option value | Description |
| --- | --- |
| AUTO | Default value. All signed events are verified. Unsigned events are accepted if and only if the stream does not require signed data according to the metadata boolean flag. |
| ALWAYS | Only signed and verified events are accepted. |
| NEVER | All signed events are verified. Unsigned events are always accepted. |
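
Read as a decision rule, the verification table looks like the following sketch. It follows the table as written; all names are hypothetical, and `signatureValid` stands for the outcome of both "verify" steps listed above.

```java
// Hypothetical restatement of the SignatureVerificationPolicy table (not library code).
final class VerificationPolicySketch {

    enum VerificationPolicy { AUTO, ALWAYS, NEVER }

    /**
     * Decides whether a received event is accepted.
     *
     * @param signed               whether the event carries a signature
     * @param signatureValid       result of the two "verify" steps; ignored for unsigned events
     * @param streamRequiresSigned the stream's metadata flag set by its creator
     */
    static boolean accept(VerificationPolicy policy, boolean signed,
                          boolean signatureValid, boolean streamRequiresSigned) {
        if (signed) {
            // Per the table, signed events are verified under every policy.
            return signatureValid;
        }
        switch (policy) {
            case ALWAYS:
                return false;                 // unsigned events are never accepted
            case NEVER:
                return true;                  // unsigned events are always accepted
            default:                          // AUTO: defer to the stream's flag
                return !streamRequiresSigned;
        }
    }
}
```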

Encryption

We first introduce the GroupKey class: it defines a symmetric AES-256 group key used by the publisher to encrypt data and by the subscriber to decrypt data. A new, random GroupKey can be generated as follows:

GroupKey groupKey = GroupKey.generate();

The GroupKey can then be passed to the publish method to publish end-to-end encrypted messages:

client.publish(stream, payload, groupKey);

To rotate the key, generate a new one and publish with it. This announces the new key to everyone who has the current key. Rotating the key every now and then establishes forward secrecy: a compromised future GroupKey will not reveal previous messages.

groupKey = GroupKey.generate(); // Generate new key
client.publish(stream, payload, groupKey); // Publish as usual
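
For readers unfamiliar with symmetric group keys, the following JDK-only sketch shows what an AES-256 key does on the publisher and subscriber side. It is an illustration under stated assumptions, not the GroupKey implementation: the class name is hypothetical, and the cipher mode (GCM), the 12-byte IV, and the "IV followed by ciphertext" layout are choices made for this example only.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical sketch of symmetric AES-256 encryption (not the library's GroupKey class).
final class GroupKeySketch {
    private static final int IV_BYTES = 12;   // assumption: standard GCM nonce size
    private static final int TAG_BITS = 128;  // assumption: full-length GCM tag

    // Generates a new random 256-bit AES key.
    static SecretKey generate() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(256);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Publisher side: returns a fresh random IV followed by the ciphertext.
    static byte[] encrypt(SecretKey key, byte[] plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            byte[] message = new byte[IV_BYTES + ciphertext.length];
            System.arraycopy(iv, 0, message, 0, IV_BYTES);
            System.arraycopy(ciphertext, 0, message, IV_BYTES, ciphertext.length);
            return message;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Subscriber side: reads the IV from the front of the message and decrypts the rest.
    static byte[] decrypt(SecretKey key, byte[] message) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, message, 0, IV_BYTES));
            return cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("decryption failed (wrong key or corrupted data)", e);
        }
    }
}
```

A message encrypted under one key cannot be decrypted with a different one, which is why subscribers need each new key after a rotation.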

Subscribers normally obtain the GroupKey via an automatic key exchange mechanism, which is triggered if the subscriber receives messages for which they don't have the key. As an alternative, keys can also be pre-shared manually and configured on the client like this:

client.getKeyStore().add(streamId, new GroupKey(keyId, groupKeyHex));

We also need a way to revoke subscribers whose subscription has expired. This is accomplished with a rekey, which means that a new group key is chosen by the publisher and sent to the remaining valid subscribers but not to the revoked ones. The rekey is a fairly intensive operation which should be used only when necessary.

There are two ways to rekey (examples below):

  • By using the automatic built-in revocation mechanism: it periodically checks how many subscribers should be revoked and rekeys if the number reaches a threshold (5 subscribers).
  • By explicitly calling the client.rekey(stream) method at any time.

// autoRevoke determines whether the automatic revocation mechanism is used.
// In this case, it is deactivated.
boolean autoRevoke = false; // default is true
EncryptionOptions encryptionOptions = new EncryptionOptions(autoRevoke);

StreamrClient client = new StreamrClient(new StreamrClientOptions(...)); // passing the encryptionOptions here

GroupKey key = GroupKey.generate();
client.publish(stream, payload, key); // publishing some message with an initial key

// You can trigger a rekey of the stream at any moment to revoke any expired subscribers from the next message.
key = client.rekey(stream);

// Publish with the new key generated during the rekey.
client.publish(stream, payload, key);

Other options

The following table describes the other options of the StreamrClientOptions constructor and their default values.

| Option | Default value | Description |
| --- | --- | --- |
| websocketApiUrl | wss://streamr.network/api/v1/ws | Address of the websocket endpoint to connect to. |
| restApiUrl | https://streamr.network/api/v1 | Base URL of the Streamr REST API. |
| propagationTimeout | 5 seconds | When a gap between two received events is detected, a resend request is sent periodically until the gap is resolved. This option determines that period. |
| resendTimeout | 5 seconds | When subscribing with a resend option (see Subscribing and unsubscribing to streams), the messages requested by the first resend request might not be available yet. This option determines how long to wait before the resend is requested a second time. |
| skipGapsOnFullQueue | true | Determines behaviour when gap filling fails. With the default (true), the client clears its internal queue of messages and immediately starts processing new incoming messages, so any queued messages are skipped. If processing every message matters more than latency, set this to false: after a gap filling failure, the next messages (and potential gaps) in the queue are processed in order, at the expense of real-time delivery. |
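
To make the notion of a "gap" concrete, here is a minimal sketch of gap detection over consecutive sequence numbers. It is a simplified model under an assumption: messages are numbered 0, 1, 2, ... within a chain. The class name is hypothetical and the client's actual ordering logic is not described in this document.

```java
// Hypothetical sketch of gap detection (simplified; not the client's ordering logic).
final class GapDetectorSketch {
    private long lastSeen = -1; // -1 means no message has been seen yet

    /**
     * Records an incoming sequence number.
     *
     * @return the inclusive range {from, to} of missing sequence numbers,
     *         or null if the message does not open a gap
     */
    long[] onMessage(long sequenceNumber) {
        long[] gap = null;
        if (lastSeen >= 0 && sequenceNumber > lastSeen + 1) {
            // Everything between the last seen message and this one is missing;
            // this is the range a resend request would ask for.
            gap = new long[] { lastSeen + 1, sequenceNumber - 1 };
        }
        lastSeen = Math.max(lastSeen, sequenceNumber);
        return gap;
    }
}
```

In this model, a detected range would be re-requested every propagationTimeout until the missing messages arrive.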

Handling Errors

You can customize error handling by registering an error handler.

client.setErrorMessageHandler((ErrorResponse error) -> {
    // handle error
});

If no error message handler is registered, the error is logged.

Creating streams

You create streams via the createStream(Stream) method, passing in a prototype Stream object with fields set as you wish. The method returns the Stream object that was actually created.

Stream created = client.createStream(new Stream("Stream name", "Stream description"));

Looking up streams

You can look up streams by id:

Stream stream = client.getStream("id-of-the-stream");

Or by the name of the stream (expects a unique result):

Stream stream = client.getStreamByName("My Fancy Stream");

Publishing events to Streams

Events in Streams are key-value pairs, represented in Java as Map objects. Below is an example of creating an event payload and publishing it into a Stream:

// Create the message payload, which is represented as a Map
// Each 'Object' in the Map must be serializable to JSON.
Map<String, Object> msg = new LinkedHashMap<>();
msg.put("foo", "bar");
msg.put("random", Math.random());

// Then publish it!
client.publish(stream, msg);

All events are timestamped. The above example assigns the current timestamp to the new event, but you can also provide a timestamp explicitly:

client.publish(stream, msg, new Date());

By default streams have one partition. For streams with multiple partitions, you can map messages to partitions using a partition key. The same partition key always maps to the same partition:

client.publish(stream, msg, new Date(), "myPartitionKey");
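
The guarantee that a key always lands on the same partition typically comes from hashing the key. The sketch below shows the general technique only: the class name is hypothetical, and the use of String.hashCode() is an assumption, since this document does not state which hash function the library uses.

```java
// Hypothetical sketch of key-to-partition mapping (the library's hash function may differ).
final class PartitionSketch {

    // Maps a partition key to a partition index in [0, partitionCount).
    static int partitionFor(String partitionKey, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }
}
```

Because the mapping is a pure function of the key, all events published with the same partition key are delivered through the same partition, which keeps them in order relative to each other.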

The events can be end-to-end encrypted by passing a GroupKey to the publish method:

GroupKey key = GroupKey.generate();
client.publish(stream, msg, key);

// You can rotate the key at any time
GroupKey newKey = GroupKey.generate();
client.publish(stream, msg2, newKey); // message is encrypted with newKey instead of key

Subscribing and unsubscribing to streams

By subscribing to streams, your application gets immediately notified about new events in the stream. You provide a MessageHandler which gets called with new events.

Subscription sub = client.subscribe(stream, new MessageHandler() {
    @Override
    public void onMessage(Subscription s, StreamMessage message) {
        // Here you can react to the latest message
        System.out.println(message.getParsedContent().toString());
    }
});

You can also choose other options such as a specific partition to subscribe to (for load balancing high-volume, partitioned streams), or specify a resend option:

int partition = 0;
MessageHandler handler = ...
ResendOption resendOption = ...
Subscription sub = client.subscribe(stream, partition, handler, resendOption);

Below are examples of ways to construct the ResendOption.

// Resends the last 10 events
ResendOption lastOpt = new ResendLastOption(10);

// Resends the events from a specific timestamp (and sequence number) for a particular message chain of a publisher
Date from = new Date(341298709);
int sequenceNumber = 0;
ResendOption fromOpt = new ResendFromOption(from, sequenceNumber, "publisherId", "msgChainId");

// Resends the events between two timestamps for a particular message chain of a publisher
Date rangeFrom = new Date(341298709);
Date rangeTo = new Date(341299000);
// the 0s are sequence numbers
ResendOption rangeOpt = new ResendRangeOption(rangeFrom, 0, rangeTo, 0, "publisherId", "msgChainId");

To stop receiving events from a stream, pass the Subscription object you got when subscribing to the unsubscribe method:

client.unsubscribe(sub);

Data Unions

This library provides functions for working with Data Unions. The Data Union is a system of efficient revenue splitting contracts that have components on the mainnet and a sidechain. Please see the Data Unions README for more details. The mainnet contract is basically a conduit to the sidechain contract, which handles member addition and removal, does accounting, and stores tokens.

To get a Data Union client instance, call:

client.dataUnionClient(mainnetPrivateKey, sideChainPrivateKey)

The client can be used to deploy new Data Unions and connect to existing ones. mainnetPrivateKey and sideChainPrivateKey are the keys that will be used to sign transactions in this session.

To deploy a new Data Union, call:

dataUnionClient.deployDataUnion(name, adminAddress, adminFeeFraction, agents)

Note that the deployed address is a function of name + mainnetKey.

To get an existing instance of Data Union, call:

dataUnionClient.dataUnionFromMainnetAddress(mainnetAddress) or dataUnionClient.dataUnionFromName(name)

Functions that trigger mainnet transactions (possibly expensive)

| Name | Returns | Description |
| --- | --- | --- |
| DataUnionClient.deployDataUnion() | DataUnion | Deploys a new Data Union |
| DataUnionClient.portTxsToMainnet(txHash, prvKey) | List<TransactionReceipt> | Takes a sidechain TX as input, and ports all triggered bridge TXs to mainnet |
| DataUnion.sendTokensToBridge() | TransactionReceipt | Sends tokens stored in mainnet DU to sidechain DU |
| DataUnion.setAdminFeeFraction(fraction) | TransactionReceipt | Sets the fraction that will be kept by admin (admin-only function) |

Functions that trigger sidechain transactions (cheap)

| Name | Returns | Description |
| --- | --- | --- |
| addMembers(String ...members) | TransactionReceipt | Add members |
| partMembers(String ...members) | TransactionReceipt | Remove members from Data Union |
| withdrawTokensForSelfOrAsAdmin(String memberAddress, BigInteger amount, boolean sendWithdrawToMainnet) | TransactionReceipt | Withdraw member's tokens to given address |
| withdrawTokensForMember(BigInteger privateKey, String to, BigInteger amount, boolean sendWithdrawToMainnet) | TransactionReceipt | Withdraw member's tokens |

Adding members using admin functions is not at feature parity with the member function join: the newly added member is not granted publish permissions to the streams inside the Data Union. These must be granted manually using streamr.grantPermission(stream_publish, user). Similarly, after removing a member using the admin function partMembers, the publish permissions must be revoked in a second step using revokePermission(permissionId).

When withdrawing, you can choose to send tokens to sidechain or mainnet with the sendWithdrawToMainnet boolean. If you withdraw to mainnet, you must port the resulting TransactionReceipt across the bridge with DataUnionClient.portTxsToMainnet(sidechainTxReceipt, prvKey), which costs ETH. If you keep tokens on sidechain, you can use the xDai bridge to transfer them to mainnet at a later time.

Read-only functions (free)

| Name | Returns | Description |
| --- | --- | --- |
| isDeployed() | boolean | Check if Data Union deployment has completed |
| getWithdrawableEarnings(member) | BigInteger | Get member's earnings |
| isMemberActive(String memberAddress) | boolean | true if member is active |
| isMemberInactive(String memberAddress) | boolean | true if member was removed |
| totalEarnings() | BigInteger | |
| totalEarningsWithdrawn() | BigInteger | |
| activeMemberCount() | BigInteger | |
| inactiveMemberCount() | BigInteger | |
| lifetimeMemberEarnings() | BigInteger | |
| joinPartAgentCount() | BigInteger | |
| getAdminFeeFraction() | BigInteger | Fee fraction expressed in wei (i.e. 10^18 means 1) |
| getEarnings(String member) | BigInteger | |
| getWithdrawn(String member) | BigInteger | |
| getWithdrawableEarnings(String member) | BigInteger | |
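
Since getAdminFeeFraction() reports the fraction scaled by 10^18, it can help to see the conversion spelled out. The helper below is a hypothetical convenience sketch, not part of the library; it takes the fraction as a decimal string to avoid floating-point rounding.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper for the "10^18 means 1" convention (not part of the library).
final class FeeFractionSketch {
    private static final BigDecimal ONE_IN_WEI = BigDecimal.TEN.pow(18);

    // Converts a fraction such as "0.02" into its wei-scaled integer form.
    static BigInteger toWei(String fraction) {
        BigDecimal value = new BigDecimal(fraction);
        if (value.signum() < 0 || value.compareTo(BigDecimal.ONE) > 0) {
            throw new IllegalArgumentException("fraction must be between 0 and 1");
        }
        // Throws ArithmeticException if the fraction has more than 18 decimal places.
        return value.multiply(ONE_IN_WEI).toBigIntegerExact();
    }
}
```

For example, a 2% admin fee corresponds to toWei("0.02"), which is 2 × 10^16.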

Code Examples

Deploy a data union contract and set the admin fee:

DataUnionClient dataUnionClient = new StreamrClient(new StreamrClientOptions()).dataUnionClient("mainnetAdminPrvKey", "sidechainAdminPrvKey");
// 2% of mainnet revenue will go to admin fee
double adminFeeFraction = 0.02;
List<String> agents = Arrays.asList("0x<address of agent>");
DataUnion dataUnion = dataUnionClient.deployDataUnion("Cool Data Union", adminAddress, adminFeeFraction, agents);

Withdraw on behalf of another member (i.e. the withdrawer's key signs the transaction and the DataUnionClient key pays for it):

// Hex digits of the withdrawer's private key, without the 0x prefix (BigInteger does not parse "0x")
BigInteger withdrawerPrivateKey = new BigInteger("...", 16);
String to = "0x....";
EthereumTransactionReceipt receipt = dataUnion.withdrawAllTokensForMember(withdrawerPrivateKey, to);

See also DataUnion.createWithdrawRequest, which creates the withdrawal request that the above code signs. The above method both creates and signs the request, but the withdrawer can also create the signature separately.

Here's an example of how to get a member's withdrawable token balance (in "wei", where 1 DATA = 10^18 wei):

BigInteger withdrawable = dataUnion.getWithdrawableEarnings(member);
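
To display such a balance in DATA rather than wei, the value can be shifted by 18 decimal places. This is a small, hypothetical formatting helper that uses only the JDK; it is not part of the library.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical display helper: converts a wei amount to DATA (1 DATA = 10^18 wei).
final class DataAmountSketch {

    // Returns the amount in DATA as a plain decimal string without trailing zeros.
    static String weiToData(BigInteger wei) {
        // BigDecimal(unscaledValue, scale) interprets wei as wei * 10^-18.
        return new BigDecimal(wei, 18).stripTrailingZeros().toPlainString();
    }
}
```

Keeping amounts as BigInteger until the moment of display avoids the rounding errors that double arithmetic would introduce.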

Contributions

This library is officially developed and maintained by the Streamr core dev team, but community contributions are very welcome!
