Skip to content
Permalink
Browse files

Chaos Monkey now messes up with network (#1514)

* Chaos Monkey now messes up with network

* Fixing a typo

* Adding missing shell command

* Marking `local_network` monkey as existing

* Adding one of the new tests that was previously missing
  • Loading branch information...
SkidanovAlex authored and ilblackdragon committed Oct 22, 2019
1 parent 864d6c9 commit f3c72387b153e7c21dba14575b0e56a7ec9dcc96
Showing with 171 additions and 20 deletions.
  1. +3 −0 .gitignore
  2. +2 −2 pytest/lib/cluster.py
  3. +47 −0 pytest/lib/network.py
  4. +61 −0 pytest/tests/stress/network_stress.py
  5. +58 −18 pytest/tests/stress/stress.py
@@ -41,3 +41,6 @@ tmp/

# Logs
*.log

# Vim tmp files
*.swp
@@ -67,15 +67,15 @@ def json_rpc(self, method, params):
'id': 'dontcare',
'jsonrpc': '2.0'
}
r = requests.post("http://%s:%s" % self.rpc_addr(), json=j)
r = requests.post("http://%s:%s" % self.rpc_addr(), json=j, timeout=1)
r.raise_for_status()
return json.loads(r.content)

def send_tx(self, signed_tx):
return self.json_rpc('broadcast_tx_async', [base64.b64encode(signed_tx).decode('utf8')])

def get_status(self):
r = requests.get("http://%s:%s/status" % self.rpc_addr())
r = requests.get("http://%s:%s/status" % self.rpc_addr(), timeout=1)
r.raise_for_status()
return json.loads(r.content)

@@ -0,0 +1,47 @@
import subprocess, sys

def _run_process(cmd):
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = process.communicate()
return (process.returncode, out, err)

def init_network_pillager():
_run_process(["mkdir", "/sys/fs/cgroup/net_cls/block"])
try:
with open("/sys/fs/cgroup/net_cls/block/net_cls.classid", 'w') as f:
f.write("42")
except IOError as e:
if e[0] == 13:
print("Failed to modify `/sys/fs/cgroup/net_cls/block/net_cls.classid`.")
print("Make sure the current user has access to it, e.g. by changing the owner:")
print("")
print(" chown <group>.<user> /sys/fs/cgroup/net_cls/block/net_cls.classid")
print("")
sys.exit(1)
_run_process(["iptables", "-A", "OUTPUT", "-m", "cgroup", "--cgroup", "42", "-j", "DROP"])

def stop_network(pid):
with open('/sys/fs/cgroup/net_cls/block/tasks', 'w') as f:
f.write(str(pid))

def resume_network(pid):
with open('/sys/fs/cgroup/net_cls/tasks', 'w') as f:
f.write(str(pid))

if __name__ == "__main__":
import time
init_network_pillager()
handle = subprocess.Popen(["ping", "8.8.8.8"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(handle.pid)
time.sleep(3)
stop_network(handle.pid)
time.sleep(3)
resume_network(handle.pid)
time.sleep(3)
handle.kill()
out, err = handle.communicate()
print("STDOUT (expect ~6 entries if all goes well):")
print(out)
print("STDERR (expect ~3 entries if all goes well):")
print(err)

@@ -0,0 +1,61 @@
import sys, random, time, base58, requests

sys.path.append('lib')

from stress import stress_process, doit, monkey_staking, get_validator_ids, get_the_guy_to_mess_up_with, get_recent_hash, sign_payment_tx, expect_network_issues
from network import stop_network, resume_network, init_network_pillager

TIMEOUT = 300

@stress_process
def monkey_transactions_noval(stopped, error, nodes, nonces):
while stopped.value == 0:
validator_ids = get_validator_ids(nodes)

from_ = random.randint(0, len(nodes) - 1)
to = random.randint(0, len(nodes) - 1)
while from_ == to:
to = random.randint(0, len(nodes) - 1)
amt = random.randint(0, 100)
nonce_val, nonce_lock = nonces[from_]

hash_, _ = get_recent_hash(nodes[-1])

with nonce_lock:
tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to, amt, nonce_val.value, base58.b58decode(hash_.encode('utf8')))
for validator_id in validator_ids:
try:
tx_hash = nodes[validator_id].send_tx(tx)['result']
except requests.exceptions.ReadTimeout:
pass

nonce_val.value = nonce_val.value + 1

time.sleep(0.1)

@stress_process
def monkey_network_hammering(stopped, error, nodes, nonces):
s = [False for x in nodes]
while stopped.value == 0:
node_idx = random.randint(0, len(nodes) - 2)
pid = nodes[node_idx].handle.pid
if s[node_idx]:
print("Resuming network for process %s" % pid)
resume_network(pid)
else:
print("Stopping network for process %s" % pid)
stop_network(pid)
s[node_idx] = not s[node_idx]

time.sleep(0.5)
for i, x in enumerate(s):
if x:
pid = nodes[i].handle.pid
print("Resuming network for process %s" % pid)
resume_network(pid)


expect_network_issues()
init_network_pillager()
doit(3, 3, 3, 0, [monkey_network_hammering, monkey_transactions_noval, monkey_staking], TIMEOUT)

@@ -14,34 +14,41 @@
# `k`: number of observers (technically `k+1`, one more observer is used by the test)
# `monkeys`: enabled monkeys (see below)
# Supports the following monkeys:
# `node_set`: ocasionally spins up new nodes or kills existing ones, as long as the number of nodes doesn't exceed `N` and doesn't go below `n`. Also makes sure that for each shard there's at least one node that has been live sufficiently long
# `node_restart`: ocasionally restarts nodes
# `local_network`: ocasionally shuts down the network connection between random nodes
# `global_network`: ocasionally shots down the network globally for several seconds
# `transactions`: sends random transactions keeping track of expected balances
# `staking`: runs staking transactions for validators. Presently the test doesn't track staking invariants, relying on asserts in the nearcore.
# `staking2.py` tests some basic stake invariants
# [ ] `node_set`: ocasionally spins up new nodes or kills existing ones, as long as the number of nodes doesn't exceed `N` and doesn't go below `n`. Also makes sure that for each shard there's at least one node that has been live sufficiently long
# [ ] `node_restart`: ocasionally restarts nodes
# [v] `local_network`: ocasionally briefly shuts down the network connection for a specific node
# [ ] `global_network`: ocasionally shots down the network globally for several seconds
# [v] `transactions`: sends random transactions keeping track of expected balances
# [v] `staking`: runs staking transactions for validators. Presently the test doesn't track staking invariants, relying on asserts in the nearcore.
# `staking2.py` tests some basic stake invariants
# This test also completely disables rewards, which simplifies ensuring total supply invariance and balance invariances

import sys, time, base58, random, inspect, traceback
import sys, time, base58, random, inspect, traceback, requests
from multiprocessing import Process, Value, Lock

sys.path.append('lib')

from cluster import init_cluster, spin_up_node
from utils import TxContext
from transaction import sign_payment_tx, sign_staking_tx
from network import init_network_pillager, stop_network, resume_network

TIMEOUT = 1500 # after how much time to shut down the test
TIMEOUT_SHUTDOWN = 60 # time to wait after the shutdown was initiated before
BLOCK_TIMEOUT = 20 # if two blocks are not produced within that many seconds, the test will fail
BALANCES_TIMEOUT = 10 # how long to tolerate for balances to update after txs are sent
BALANCES_TIMEOUT = 15 # how long to tolerate for balances to update after txs are sent
MAX_STAKE = int(1e26)
EPOCH_LENGTH = 20

assert BALANCES_TIMEOUT * 2 <= TIMEOUT_SHUTDOWN
assert BLOCK_TIMEOUT * 2 <= TIMEOUT_SHUTDOWN

network_issues_expected = False

def expect_network_issues():
global network_issues_expected
network_issues_expected = True

def stress_process(func):
def wrapper(stopped, error, *args):
try:
@@ -76,8 +83,20 @@ def monkey_node_restart():
pass

@stress_process
def monkey_local_network():
pass
def monkey_local_network(stopped, error, nodes, nonces):
while stopped.value == 0:
# "- 2" below is because we don't want to kill the node we use to check stats
node_idx = random.randint(0, len(nodes) - 2)
pid = nodes[node_idx].handle.pid
print("Stopping network for process %s" % pid)
stop_network(pid)
if node_idx == get_the_guy_to_mess_up_with(nodes):
time.sleep(5)
else:
time.sleep(1)
print("Resuming network for process %s" % pid)
resume_network(pid)
time.sleep(5)

@stress_process
def monkey_global_network():
@@ -150,7 +169,12 @@ def get_balances():
with nonce_lock:
tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to, amt, nonce_val.value, base58.b58decode(hash_.encode('utf8')))
for validator_id in validator_ids:
tx_hash = nodes[validator_id].send_tx(tx)['result']
try:
tx_hash = nodes[validator_id].send_tx(tx)['result']
except requests.exceptions.ReadTimeout:
if not network_issues_expected:
raise

last_tx_set.append((tx, from_, to, tx_hash, amt))
nonce_val.value = nonce_val.value + 1

@@ -202,7 +226,11 @@ def monkey_staking(stopped, error, nodes, nonces):

tx = sign_staking_tx(nodes[whom].signer_key, nodes[whom].validator_key, stake, nonce_val.value, base58.b58decode(hash_.encode('utf8')))
for validator_id in validator_ids:
nodes[validator_id].send_tx(tx)
try:
nodes[validator_id].send_tx(tx)
except requests.exceptions.ReadTimeout:
if not network_issues_expected:
raise
nonce_val.value = nonce_val.value + 1

time.sleep(1)
@@ -292,10 +320,15 @@ def blocks_tracker(stopped, error, nodes, nonces):
print("Largest height: %s" % largest_height)
print("Largest divergence: %s" % largest_divergence)

assert largest_divergence < len(nodes)
if not network_issues_expected:
assert largest_divergence < len(nodes)
else:
assert largest_divergence < 2 * len(nodes)


def doit(s, n, N, k, monkeys):
def doit(s, n, N, k, monkeys, timeout):
global network_issues_expected

assert 2 <= n <= N

config = {'local': True, 'near_root': '../target/debug/'}
@@ -319,6 +352,13 @@ def doit(s, n, N, k, monkeys):
else:
nodes.append(None)

monkey_names = [x.__name__ for x in monkeys]
print(monkey_names)
if 'monkey_local_network' in monkey_names or 'monkey_global_network' in monkey_names:
print("There are monkeys messing up with network, initializing the infra")
init_network_pillager()
network_issues_expected = True

stopped = Value('i', 0)
error = Value('i', 0)
ps = []
@@ -339,12 +379,12 @@ def check_errors():
assert False, "At least one process failed, check error messages above"

for monkey in monkeys:
launch_process(globals()['monkey_%s' % monkey])
launch_process(monkey)

launch_process(blocks_tracker)

started = time.time()
while time.time() - started < TIMEOUT:
while time.time() - started < timeout:
check_errors()
time.sleep(1)

@@ -386,5 +426,5 @@ def check_errors():
for monkey in monkeys:
assert monkey in MONKEYS, "Unknown monkey \"%s\"" % monkey

doit(s, n, N, k, monkeys)
doit(s, n, N, k, [globals()["monkey_%s" % x] for x in monkeys], TIMEOUT)

0 comments on commit f3c7238

Please sign in to comment.
You can’t perform that action at this time.