Clustering and distributed execution #140

Open
liclac opened this Issue Mar 7, 2017 · 7 comments

@liclac
Collaborator

liclac commented Mar 7, 2017

This is something that isn't top priority at the moment, but it's going to take a lot of design work, so I'd like to get the ball rolling on the actual planning.

My current idea is to have a k6 hive command (name subject to change), which hooks up to etcd - a lovely piece of software that can handle most of the heavy lifting around clustering, and is also the backbone that makes, among other things, Kubernetes tick.

Each instance registers itself, along with how many VUs it can handle, exposes an API to talk to the cluster, and triggers a leader election. The information registered in etcd might have a structure like:

  • /loadimpact/k6/nodes/nodeA - node data (JSON)
  • /loadimpact/k6/nodes/nodeB - node data (JSON)
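For illustration, a minimal sketch of what that registration could look like with the etcd v3 Go client - the node data shape, lease TTL and endpoints here are assumptions, not a settled design:

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// nodeInfo is a hypothetical shape for the per-node registration data.
type nodeInfo struct {
	Name   string `json:"name"`
	MaxVUs int64  `json:"max_vus"`
	API    string `json:"api"` // address of this node's cluster API
}

// register writes the node's data under its key, attached to a lease so the
// entry disappears automatically if the node dies.
func register(ctx context.Context, cli *clientv3.Client, n nodeInfo) error {
	data, err := json.Marshal(n)
	if err != nil {
		return err
	}
	lease, err := cli.Grant(ctx, 10) // 10s TTL
	if err != nil {
		return err
	}
	if _, err := cli.Put(ctx, "/loadimpact/k6/nodes/"+n.Name, string(data), clientv3.WithLease(lease.ID)); err != nil {
		return err
	}
	_, err = cli.KeepAlive(ctx, lease.ID) // keep the lease alive while the node runs
	return err
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	n := nodeInfo{Name: "nodeA", MaxVUs: 1000, API: "https://a.node.address/"}
	if err := register(context.Background(), cli, n); err != nil {
		log.Fatal(err)
	}
}
```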

Running a test on the cluster is a matter of calling k6 run --remote=https://a.node.address/ script.js. Instead of running the test locally, this rolls up all the data and files needed and pushes them to the cluster, where they're stored in etcd and available from every node:

  • /loadimpact/k6/test - test data (JSON)
  • /loadimpact/k6/test/src/... - root for the pushed filesystem
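A rough sketch of the client-side push under the same assumptions, reusing the clientv3 client and imports from the registration sketch above (the archive layout is illustrative; note that etcd limits request sizes - around 1.5 MB by default - so large static files would need chunking or a different store):

```go
// pushTest stores the test metadata and the bundled source files in etcd so
// that every node can read them from the shared keyspace.
func pushTest(ctx context.Context, cli *clientv3.Client, testJSON []byte, files map[string][]byte) error {
	if _, err := cli.Put(ctx, "/loadimpact/k6/test", string(testJSON)); err != nil {
		return err
	}
	for path, contents := range files {
		if _, err := cli.Put(ctx, "/loadimpact/k6/test/src/"+path, string(contents)); err != nil {
			return err
		}
	}
	return nil
}
```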

When the test data is loaded, each node instantiates an engine and its maximum number of VUs right away, then watches its own registry entry for changes. The elected leader then takes care of patching the other nodes' entries to distribute VUs evenly across the cluster.
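The node side of "watches its own registry entry" could be a plain etcd watch, again reusing the client and imports from the registration sketch; the allocated_vus field is just an assumption about what the leader would patch in:

```go
// watchAllocation reacts whenever the leader patches this node's entry,
// e.g. to change how many VUs it should be running.
func watchAllocation(ctx context.Context, cli *clientv3.Client, nodeName string, scale func(vus int64)) {
	for resp := range cli.Watch(ctx, "/loadimpact/k6/nodes/"+nodeName) {
		for _, ev := range resp.Events {
			var n struct {
				AllocatedVUs int64 `json:"allocated_vus"` // hypothetical field
			}
			if err := json.Unmarshal(ev.Kv.Value, &n); err != nil {
				continue // deleted or malformed entry; ignore for this sketch
			}
			scale(n.AllocatedVUs)
		}
	}
}
```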

@neilstuartcraig
Collaborator

neilstuartcraig commented May 8, 2017
WRT #202 / request for comments:

I don't have direct experience with etcd, so I don't have an opinion there, but the notion of using existing, proven software is clearly sound, so I'd welcome that.

My initial thought is: how would someone know how many VUs a machine can handle? If there's an automated estimation based on system resources (though obviously e.g. a CPU core on AWS is definitely not the same as a CPU core on dedicated hardware), that would at least provide consistency, which would be good. I'm aware, though, that it'd be quite easy to overload a load generator with work and thus skew its output, as it would lack the system resources to measure accurately. However it's implemented, I imagine it will be necessary to allow users to set/amend the VU capability, and a good user guide would help a lot - i.e. one defining a way in which users can calculate/estimate capacity. That might actually be the best way to start: keep it simple and iterate/add from there.


@liclac
Collaborator

liclac commented May 8, 2017
Honestly, your best shot is probably trial and error. The limiting factor is typically not CPU power, but rather local socket usage and, to a lesser extent, RAM usage, both of which vary slightly between scripts. A good start would be to simply split your desired number of VUs across as many hosts as you want and see if it flies or not.


@aidylewis
Collaborator

aidylewis commented May 10, 2017
Linux has a maximum of 64K ephemeral ports - that's your connection limit. I'd also do some kernel tuning for TIME_WAIT etc.:
http://gatling.io/docs/current/general/operations/#id3
Telegraf should have a minimal footprint as well: https://github.com/influxdata/telegraf
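As a quick way to see what a given generator actually has to work with, here is a small Linux-only sketch that reads the configured ephemeral port range - the untuned default is usually well below the theoretical 64K:

```go
package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
)

func main() {
	// The ephemeral range bounds concurrent outbound connections per
	// (source IP, destination IP, destination port) tuple.
	raw, err := os.ReadFile("/proc/sys/net/ipv4/ip_local_port_range")
	if err != nil {
		log.Fatal(err)
	}
	fields := strings.Fields(string(raw)) // e.g. ["32768", "60999"]
	if len(fields) != 2 {
		log.Fatalf("unexpected format: %q", raw)
	}
	lo, _ := strconv.Atoi(fields[0])
	hi, _ := strconv.Atoi(fields[1])
	fmt.Printf("ephemeral ports available: %d (%d-%d)\n", hi-lo+1, lo, hi)
}
```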


@arichiardi

arichiardi commented Nov 30, 2017
Thanks for this, I am very interested!

I was wondering if there would be a way to avoid adding a new service to the pool.

The need for extra shared metadata is going to be there anyway, because you would probably want to direct the load test output to a single InfluxDB instance - so maybe we could save this kind of metadata directly there?

I know this would force folks to stick with InfluxDB, but if we used etcd, people would in any case need to custom-tailor something to collect and aggregate results.


@liclac
Collaborator

liclac commented Dec 20, 2017
@arichiardi I think it's more important that we look at how we can best implement this, using all available tools, rather than at how to minimise dependencies right off the bat. I'm not saying we should introduce dependencies for the sake of it, but we want this done right.

The current requirements for this to be implemented are as follows:

Prerequisite: Leader assignment

Most of the below requirements have one prerequisite: we need a central point to make all decisions from. The leader doesn't need a lot of processing power, it just needs to keep an eye on things, so to speak.

  1. The initiating client acts as the brain.

    This is the absolute simplest solution, with the drawback that the test will have to be aborted if the client loses connection.

  2. Dedicated master, à la Kubernetes.

    Second simplest. Needs a coordinating process that can communicate with all instances, either directly or through an indirection layer (e.g. etcd, redis, etc.). Because this can be expected to run in the cluster, and we have a persistence mechanism for the test, we could technically recover from a master failure/network hiccup by just skipping the chunk of the timeline that never passed.

  3. Leader election.

    This is fortunately not an algorithm we have to implement ourselves (etcd and similar provide it with a single function call - see the sketch after this list), but it does add a hard dependency on something that can provide it. If one of the instances in a cluster can be dynamically elected as the acting master, it would theoretically simplify deployment; I just have a bad feeling it might open a can of worms of synchronisation bugs.
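To make option 3 concrete, the "single function call" with etcd looks roughly like this - a sketch using the concurrency helper package, with the election prefix, TTL and endpoints made up:

```go
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// runElection campaigns for leadership and calls lead() once this node wins.
func runElection(ctx context.Context, cli *clientv3.Client, nodeName string, lead func(ctx context.Context)) error {
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		return err
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/loadimpact/k6/leader")
	// Campaign blocks until this node becomes the leader (or ctx is cancelled).
	if err := election.Campaign(ctx, nodeName); err != nil {
		return err
	}
	defer election.Resign(context.Background())

	lead(ctx)
	return nil
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}, DialTimeout: 5 * time.Second})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	err = runElection(context.Background(), cli, "nodeA", func(ctx context.Context) {
		log.Println("this node is now the acting master")
	})
	if err != nil {
		log.Fatal(err)
	}
}
```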

Spreading VUs across instances.

The algorithm for this could simply be to spread VUs evenly across all available instances, respecting their caps. We could possibly do some weighting, e.g. between an instance with a max of 1000 VUs and one with a max of 2000 VUs, the latter could get 2x as many VUs allocated to it.
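That weighting is easy to sketch as plain code - allocateVUs and its largest-remainder rounding are purely illustrative, not a committed algorithm:

```go
// allocateVUs splits total VUs across instances proportionally to their caps,
// never exceeding any instance's cap. Returns nil if the cluster can't fit the total.
func allocateVUs(total int64, caps map[string]int64) map[string]int64 {
	var capacity int64
	for _, c := range caps {
		capacity += c
	}
	if capacity < total {
		return nil // not enough capacity in the cluster
	}
	alloc := make(map[string]int64, len(caps))
	var assigned int64
	for name, c := range caps {
		share := total * c / capacity // proportional share, rounded down
		alloc[name] = share
		assigned += share
	}
	// Hand out the rounding remainder to instances that still have headroom.
	for name, c := range caps {
		if assigned == total {
			break
		}
		if alloc[name] < c {
			alloc[name]++
			assigned++
		}
	}
	return alloc
}
```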

Possible implementations I can see:

  1. All instances hold persistent connections to the master.

    I don't like this. Connections break. It's real simple though.

  2. Key-Value Observing.

    All instances register themselves in a control plane of some kind (etcd, consul, redis), then watch their own keyspaces, and the master node updates them (see the sketch after this list). Reliability is offloaded to the control plane, so we don't have to worry about it.
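The master side of that would then just write the new allocations back through the control plane - etcd again for the sketch, with the keyspace and allocated_vus field carried over from the assumptions earlier in the thread. A real implementation would want a compare-and-swap transaction instead of this naive read-modify-write:

```go
// pushAllocations patches each instance's registry entry with its new VU count.
func pushAllocations(ctx context.Context, cli *clientv3.Client, alloc map[string]int64) error {
	for name, vus := range alloc {
		key := "/loadimpact/k6/nodes/" + name
		resp, err := cli.Get(ctx, key)
		if err != nil {
			return err
		}
		if len(resp.Kvs) == 0 {
			continue // node vanished; its VUs get redistributed on the next pass
		}
		var node map[string]interface{}
		if err := json.Unmarshal(resp.Kvs[0].Value, &node); err != nil {
			return err
		}
		node["allocated_vus"] = vus
		data, err := json.Marshal(node)
		if err != nil {
			return err
		}
		if _, err := cli.Put(ctx, key, string(data)); err != nil {
			return err
		}
	}
	return nil
}
```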

Central execution of thresholds, from a data source.

This would be fairly simple using something like InfluxDB: we can parse threshold snippets for the variables they refer to (there's some code for that already), then query them out of the database, using the starting timestamp of the test as a delimiter.
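As a very rough sketch, a threshold over e.g. the 95th percentile of a metric could be translated into an InfluxQL query bounded by the test's start time - the measurement/field layout here is an assumption, not necessarily what k6 writes:

```go
import (
	"fmt"
	"time"
)

// thresholdQuery builds an InfluxQL query for a single hypothetical threshold,
// "p(95) of <metric> since the test started". Bare integers in InfluxQL time
// comparisons are interpreted as epoch nanoseconds.
//
// thresholdQuery("http_req_duration", start) =>
//   SELECT PERCENTILE("value", 95) FROM "http_req_duration" WHERE time >= 1513728000000000000
func thresholdQuery(metric string, testStart time.Time) string {
	return fmt.Sprintf(
		`SELECT PERCENTILE("value", 95) FROM %q WHERE time >= %d`,
		metric, testStart.UnixNano(),
	)
}
```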

We could do something with shipping samples back to the master, but that feels... a little silly.

Distributed rate limiting.

The --rps flag, and possible future rate limits, need to be distributed to work properly.
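The simplest way to distribute --rps would be to split the global budget the same way the VUs are split - a sketch, ignoring fancier options like a shared token bucket:

```go
// splitRPS divides a global requests-per-second budget across instances in
// proportion to the VUs allocated to each of them.
func splitRPS(globalRPS float64, alloc map[string]int64) map[string]float64 {
	var totalVUs int64
	for _, vus := range alloc {
		totalVUs += vus
	}
	out := make(map[string]float64, len(alloc))
	if totalVUs == 0 {
		return out
	}
	for name, vus := range alloc {
		out[name] = globalRPS * float64(vus) / float64(totalVUs)
	}
	return out
}
```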

Distributed data storage.

We need to be able to store two kinds of things:

  • Archive data - script files, static files, options
  • Runtime data like the fixed seed used for the init context, setup data (#194), etc

This can be anything that can store keys and values of arbitrary size.
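In code terms that boils down to a small storage interface that etcd, consul, redis or similar could all satisfy - illustrative only:

```go
import "context"

// ClusterStore is the minimal storage abstraction the above needs: arbitrary
// keys and values, plus a way to react to changes (for the observing pattern).
type ClusterStore interface {
	Put(ctx context.Context, key string, value []byte) error
	Get(ctx context.Context, key string) ([]byte, error)
	// Watch delivers the new value every time key changes, until ctx is done.
	Watch(ctx context.Context, key string) (<-chan []byte, error)
}
```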


@coderlifter

coderlifter commented Oct 16, 2018
Have you considered implementing something similar to what was done for Locust?

They have a master/slave architecture where the synchronization happens via ZMQ (TCP), which is lightweight enough.

One advantage, in this case, is that there is no need to introduce a hard dependency. The master/slave synchronization can be implemented via ZMQ, HTTP or whatever network protocol you might consider.

IMHO, the only disadvantage of the Locust implementation is that it is a stateful system, where the master must always be started first and can't recover if a slave disappears and then comes back.

I would rather see a stateless system that can handle connectivity issues gracefully.
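For what it's worth, the dependency-free variant could be as small as a periodic HTTP heartbeat from each worker to the master, so the master stays stateless and workers can come and go - the endpoint and payload below are entirely made up:

```go
package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
	"time"
)

// heartbeat is a hypothetical worker→master status message; a stateless master
// can rebuild its view of the cluster from these alone.
type heartbeat struct {
	Node       string `json:"node"`
	MaxVUs     int64  `json:"max_vus"`
	RunningVUs int64  `json:"running_vus"`
}

func main() {
	master := "http://master.example:6565/v1/heartbeat" // assumed endpoint
	for range time.Tick(5 * time.Second) {
		body, _ := json.Marshal(heartbeat{Node: "nodeA", MaxVUs: 1000, RunningVUs: 250})
		resp, err := http.Post(master, "application/json", bytes.NewReader(body))
		if err != nil {
			log.Printf("master unreachable, will retry: %v", err)
			continue // tolerate connectivity blips instead of aborting
		}
		resp.Body.Close()
	}
}
```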


@na--
Member

na-- commented Oct 19, 2018
@coderlifter, thanks - we haven't finalized the k6 distributed execution design yet, so we'll definitely consider this approach when we get to it. We'll post a final design/RFC here when we start implementing this, so it can be discussed by anyone interested.

