
Storage node cluster #180

Merged
11 commits merged into main on Oct 4, 2021
Conversation

@hpihkala hpihkala (Contributor) commented Sep 2, 2021

This PR proposes a change that allows a set of storage nodes connected to the same Cassandra cluster to share load.

Conceptual changes

Currently, streams are assigned to a storage node via the StreamStorageRegistry. However, the registry is actually just a mapping between streams and an Ethereum address; there is no meaning attached to the assigned address as such. We have previously interpreted that address to be the address corresponding to the private key the storage node is configured with, but this does not need to be the case.

Instead, we can interpret the address to be an identifier for a storage cluster, meaning a group of nodes within a single storage facility. The address of the nodes is decoupled from the address of the storage cluster.

Each node in the cluster fetches the stream assignments for the clusterAddress; a node's own address need not be that address.

The idea is illustrated on this Miro board.

Configuring a storage cluster

The following settings are added to the storage plugin config:

    "storage": {
      ...
      "cluster": {
        "clusterAddress": "0x12345",
        "clusterSize": 3,
        "myIndexInCluster": 0
      }
    }

All nodes in the storage cluster must agree on the clusterAddress and clusterSize as well as the Cassandra settings. Each node's myIndexInCluster must be unique, running from 0 to clusterSize-1. For example, these could be the configs of three individual nodes in a storage cluster 0x12345:

      // Node 0xaaaaaa
      "cluster": {
        "clusterAddress": "0x12345",
        "clusterSize": 3,
        "myIndexInCluster": 0
      }

      // Node 0xbbbbb
      "cluster": {
        "clusterAddress": "0x12345",
        "clusterSize": 3,
        "myIndexInCluster": 1
      }

      // Node 0xccccc
      "cluster": {
        "clusterAddress": "0x12345",
        "clusterSize": 3,
        "myIndexInCluster": 2
      }

In a cluster of size 1, all stream-partitions trivially map to the node itself. In fact, the default config for cluster is the single-node case, making the change 100% backwards compatible:

      // Node 0x12345
      "cluster": {
        "clusterAddress": null, // if null, uses node address 0x12345 for assignment discovery like before
        "clusterSize": 1,
        "myIndexInCluster": 0
      }

Load balancing within the cluster

The nodes subscribe and store streams as follows:

  • As usual, each node queries which streams are assigned to storage node 0x12345 (which is now not the address of any single node, but a virtual address of the cluster).
  • Instead of subscribing to all of them, each node subscribes to only a subset.
  • The selection is deterministic: the node hashes a key containing streamId and partition, and derives an index in [0, clusterSize-1] from the hash. The derived index is compared to myIndexInCluster, and the node subscribes to the stream-partition only if they match. This determinism guarantees that all nodes in the cluster agree on who should store what. Simplified, the formula is:
whoShouldStoreIt = hash(streamId + streamPartition) % clusterSize
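To make this concrete, here is a minimal TypeScript sketch of the assignment rule. The helper name whoShouldStoreIt, the md5-based hash, and the key separator are illustrative assumptions; the actual broker code uses Protocol.Utils.keyToArrayIndex for this mapping (see the diff excerpt further down in this conversation).

    import { createHash } from 'crypto'

    // Hypothetical helper: hash a stream-partition key into an index in [0, clusterSize - 1]
    function whoShouldStoreIt(clusterSize: number, streamId: string, partition: number): number {
        const digest = createHash('md5').update(`${streamId}#${partition}`).digest()
        // Interpret the first 4 bytes of the digest as an unsigned integer and wrap it to the cluster size
        return digest.readUInt32BE(0) % clusterSize
    }

    // In a cluster of 3 nodes, exactly one index matches each stream-partition,
    // so exactly one node subscribes to and stores it
    const clusterSize = 3
    for (let partition = 0; partition < 5; partition++) {
        console.log(`partition ${partition} -> node index ${whoShouldStoreIt(clusterSize, 'mystream', partition)}`)
    }

Because every node evaluates the same deterministic function against its own myIndexInCluster, no coordination between nodes is needed at runtime.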

Note that this update does not add any fault tolerance feature, only a load balancing feature. However, it does reduce the chance of the whole storage facility failing at once: if there are three nodes in the cluster and one fails, the two remaining ones can continue to answer queries (for all streams assigned to the cluster) as well as subscribe to and store their share of the load (two thirds of all stream-partitions assigned to the cluster).

Other changes in this PR

  • Refactor parts of the message partitioning logic from client to protocol in order to share the same algorithm with the storage code
  • Add some more unit test coverage for StorageConfig

@github-actions github-actions bot added the broker, client and protocol labels Sep 2, 2021
@teogeb teogeb (Contributor) left a comment

LGTM

packages/broker/src/plugins/storage/StorageConfig.ts (outdated)
packages/broker/src/plugins/storage/StoragePlugin.ts (outdated)
    - const storageConfig = await StorageConfig.createInstance(brokerAddress, apiUrl, this.pluginConfig.storageConfig.refreshInterval)
    + const storageConfig = await StorageConfig.createInstance(
    +     this.pluginConfig.cluster.clusterAddress || brokerAddress,
    +     this.pluginConfig.cluster.clusterSize || 1,
Contributor

The schema specifies that clusterSize and myIndexInCluster are always non-null numbers -> no need to have fallback values

Contributor

Yeah it might be better to have one source for default values to avoid any potential confusion.

@hpihkala hpihkala (Contributor, Author) commented Sep 10, 2021

Removed the fallbacks for clusterSize and myIndexInCluster. I guess there's no way to have the logic

if clusterAddress is null, use the brokerAddress

...in the schema - so that should still stay here?

@streamr-dev streamr-dev deleted a comment from teogeb Sep 9, 2021
@hpihkala hpihkala (Contributor, Author)

I think the test failures in this branch are now the same as in main... 😁

    private belongsToMeInCluster(key: StreamKey): boolean {
        const hashedIndex = Protocol.Utils.keyToArrayIndex(this.clusterSize, key.toString())
        return hashedIndex === this.myIndexInCluster
    }
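For reference, a rough sketch of what keyToArrayIndex might do, assuming a simple 32-bit string hash; this is an assumption for illustration, and the real implementation in the protocol package may differ in hash choice:

    // Sketch only: deterministically map a key to an index in [0, arraySize - 1]
    function keyToArrayIndex(arraySize: number, key: number | string): number {
        if (typeof key === 'number') {
            return Math.abs(key) % arraySize
        }
        // djb2-style string hash, kept within 32-bit integer range
        let hash = 5381
        for (let i = 0; i < key.length; i++) {
            hash = ((hash << 5) + hash + key.charCodeAt(i)) | 0
        }
        return Math.abs(hash) % arraySize
    }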
Contributor

So if the cluster size changes, data may go to a different node, will this break data retrieval?

@hpihkala hpihkala (Contributor, Author) commented Sep 17, 2021

It won't break data retrieval, because all the data still goes into the same Cassandra cluster. All of the nodes in the cluster can equally well retrieve data from that Cassandra cluster.

All nodes must be reconfigured and restarted if the cluster size is changed, so that they are aligned on who subscribes to what - see the answers to your other question.

Contributor

Ahh I didn't catch the "same Cassandra cluster" part, makes sense now.

        apiUrl: string
        private _poller!: ReturnType<typeof setTimeout>
        private _stopPoller: boolean

        // use createInstance method instead: it fetches the up-to-date config from API
    -   constructor(nodeId: string, apiUrl: string) {
    +   constructor(clusterId: string, clusterSize: number, myIndexInCluster: number, apiUrl: string) {
@timoxley timoxley (Contributor) commented Sep 17, 2021

What happens if the clusterSize config differs between nodes? e.g. config updated but nodes not restarted at the same time

What happens if there are multiple nodes with the same myIndexInCluster?

@hpihkala hpihkala (Contributor, Author) commented Sep 17, 2021

> What happens if the clusterSize config differs between nodes? e.g. config updated but nodes not restarted at the same time

In that case the nodes aren't aligned on who handles what. Some streams may be subscribed to by multiple nodes (not a fatal problem, see below), while some streams may not be subscribed to by any nodes (the bigger problem). It's the responsibility of the storage cluster operator to ensure correct configuration (which is acceptable for now IMO).

> What happens if there are multiple nodes with the same myIndexInCluster?

In that case the nodes will subscribe to the exact same subset of streams, and all that data gets written to Cassandra multiple times (first written and then overwritten). It will work, but produces unnecessary load and cleanup overhead for Cassandra.
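To illustrate the misalignment case, here is a small hypothetical simulation reusing the md5-based hash sketched in the PR description above (all names here are illustrative, not the broker's actual code). With one node still on the old clusterSize, some stream-partitions get claimed twice while others go unclaimed:

    import { createHash } from 'crypto'

    // Hypothetical hash helper, same sketch as in the PR description above
    const indexFor = (clusterSize: number, key: string): number =>
        createHash('md5').update(key).digest().readUInt32BE(0) % clusterSize

    // Node A (index 0) was restarted with clusterSize = 3; node B (index 1) still believes clusterSize = 2.
    // Count partitions that nobody subscribes to, and partitions subscribed to twice.
    let unclaimed = 0
    let duplicated = 0
    for (let p = 0; p < 100; p++) {
        const key = `mystream#${p}`
        const aSubscribes = indexFor(3, key) === 0
        const bSubscribes = indexFor(2, key) === 1
        if (!aSubscribes && !bSubscribes) unclaimed++
        if (aSubscribes && bSubscribes) duplicated++
    }
    console.log({ unclaimed, duplicated }) // both counts end up non-zero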

@github-actions github-actions bot removed the client and protocol labels Oct 4, 2021
@harbu harbu merged commit 92cd55e into main Oct 4, 2021
@harbu harbu deleted the storage-node-cluster branch October 4, 2021 14:21
timoxley added a commit that referenced this pull request Oct 4, 2021
* main:
  broker: Storage node cluster (#180)
  ci: workaround for actions/labeler#136
  style: fix label.yml indentation