## Run Assertion Checks on Raw `exp08` Dataset

In [1]:
import numpy as np
from glob import glob
from datetime import datetime
from os.path import join, dirname, basename, getsize

In [2]:
### MODIFY BEGIN ###

# Specify the name of the experiment to be checked. It is used to build the
# name of the raw dataset folder and the glob patterns for the result folders.
EXP_NAME = "exp08_nym-binaries-v1.1.13_static-http-download"

# Specify path to the parent directory that contains the raw dataset folder
# 'dataset_<EXP_NAME>_raw' holding the raw experimental result files.
RAW_EXP_DIR_PARENT = "/PRIVATE_PATH/"

###  MODIFY END  ###

In [3]:
# Derive RAW_EXP_DIR, the repository of raw experimental result files, from
# the configured parent directory and the experiment name.
RAW_EXP_DIR = join(RAW_EXP_DIR_PARENT, "dataset_{}_raw".format(EXP_NAME))

In [4]:
def check_gw_script(file: str):
    """Assert that the gateway's experiment-running script is configured for exp08.

    Each check counts matching lines in `file` with `grep -c` (run through
    IPython's `!` shell capture, which returns the command output as a list
    of lines) and asserts that the count is exactly 1.

    Checked lines: the experiment ID ("exp08"), the Nym version
    ("nym-binaries-v1.1.13"), and the experiment name composed of both
    variables followed by "_static-http-download".

    Args:
        file: Path to the gateway's experiment-running shell script.

    Raises:
        AssertionError: If an expected line does not occur exactly once.
    """
    
    print(f"-> Checking the gateway's experiment-running script '{basename(file)}'...")
    
    print("\n->-> script_correct_exp_id[0]='1'  =>  Checking that we see the expected ID 'exp08' of the experiment to conduct in the script...")
    # The pattern is anchored with ^ and $, so only an exact full-line match counts.
    script_correct_exp_id = ! grep -c "^mixcorr_exp_id=\"exp08\"$" {file}
    print(f"     {script_correct_exp_id[0]=}")
    assert int(script_correct_exp_id[0]) == 1, f"{int(script_correct_exp_id[0])=} != 1"
    
    print("\n->-> script_correct_nym_version[0]='1'  =>  Checking that we see the expected Nym git tag specified...")
    script_correct_nym_version = ! grep -c "^mixcorr_nym_version=\"nym-binaries-v1.1.13\"$" {file}
    print(f"     {script_correct_nym_version[0]=}")
    assert int(script_correct_nym_version[0]) == 1, f"{int(script_correct_nym_version[0])=} != 1"
    
    print("\n->-> script_correct_exp_name[0]='1'  =>  Checking that we see the expected experiment name specified...")
    # NOTE(review): the doubled braces presumably escape IPython's `{...}` variable
    # expansion so that grep receives literal `${mixcorr_exp_id}` - confirm.
    script_correct_exp_name = ! grep -c "^mixcorr_exp_name=\"\${{mixcorr_exp_id}}_\${{mixcorr_nym_version}}_static-http-download\"$" {file}
    print(f"     {script_correct_exp_name[0]=}")
    assert int(script_correct_exp_name[0]) == 1, f"{int(script_correct_exp_name[0])=} != 1"

In [5]:
def check_gw_log(file: str):
    
    print(f"\n\n-> Checking the gateway's experiment-running log file '{basename(file)}'...")

    print("\n->-> Check that orchestration repository is checked out at the expected tag...")
    correct_orch_tag = ! grep -A 2 "Logging git status of /root/mixcorr/data-collection_live-nym-network_hetzner..." {file}
    assert "HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download" in correct_orch_tag[1], \
        f"'HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download' not in {correct_orch_tag[1]=}"
    assert "nothing to commit, working tree clean" in correct_orch_tag[2], \
        f"'nothing to commit, working tree clean' not in {correct_orch_tag[2]=}"
    
    print("\n->-> Check that experiment scenarios repository is checked out at the expected tag...")
    correct_scens_tag = ! grep -A 2 "Logging git status of /root/mixcorr/data-collection_experiments_nym..." {file}
    assert "HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download" in correct_scens_tag[1], \
        f"'HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download' not in {correct_scens_tag[1]=}"
    assert "nothing to commit, working tree clean" in correct_scens_tag[2], \
        f"'nothing to commit, working tree clean' not in {correct_scens_tag[2]=}"
    
    print("\n->-> Check that no log line contains the message that stored messages were pushed to an endpoint...")
    pushed_stored_msgs_not_present = ! grep "\[MIXCORR\] \[push_stored_messages_to_client\] WARNING At least one on-disk message stored for endpoint " {file}
    print(f"{pushed_stored_msgs_not_present=}")
    assert len(pushed_stored_msgs_not_present) == 0, f"{len(pushed_stored_msgs_not_present)=} != 0"

In [6]:
def check_gw_sphinxflow_logs(flows_dir: str):
    """Assert that every SphinxFlow log in `flows_dir` is well-formed.

    For each '*.sphinxflow' file that contains more than the CSV header line,
    two properties are asserted:
      * the timestamps in the first column are strictly increasing, and
      * every message size in the second column is one of 53, 1675, or 2413.

    Args:
        flows_dir: Directory containing the gateway's '*.sphinxflow' files.

    Raises:
        AssertionError: If a file has non-increasing timestamps or contains
            an unexpected message size.
    """
    
    print(f"\n\n-> Checking SphinxFlow logs in '{flows_dir}'...")
    
    print("\n->-> Check that timestamps are monotonically increasing per SphinxFlow log file and observed message sizes are restricted to be in { 53, 1675, 2413 }...")
    
    flow_files = sorted(glob(join(flows_dir, "*.sphinxflow")))
    expected_msg_sizes = [53, 1675, 2413]
    
    for flow_file in flow_files:
        
        # Skip over any SphinxFlow files which contain only the header line
        # 'msg_timestamp_nanos,msg_size_bytes\n' (i.e., empty).
        if getsize(flow_file) <= 35:
            continue
        
        # ndmin=2 keeps a file with a single data row two-dimensional, so the
        # column indexing below also works for one-message flows.
        flow = np.loadtxt(fname = flow_file, dtype = np.uint64, delimiter = ",", skiprows = 1, ndmin = 2)
        
        # Compare neighbouring timestamps directly instead of using np.diff():
        # subtracting unsigned integers wraps around, so a decreasing pair would
        # yield a huge positive difference and wrongly pass a '> 0' test.
        timestamps = flow[:, 0]
        ts_monotonically_increasing = np.all(timestamps[1:] > timestamps[:-1])
        assert ts_monotonically_increasing, f"{ts_monotonically_increasing=} != True"
        
        sizes_expected = np.all(np.isin(flow[:, 1], expected_msg_sizes))
        assert sizes_expected, f"{sizes_expected=} != True"

In [7]:
def check_raw_gateway_results(path: str):
    """Run all gateway-related checks on the raw gateway results folder `path`.

    The experiment-running script, its log file, and the folder of SphinxFlow
    logs are located below `path` and handed to their respective check
    functions, in that order.
    """
    
    # Check the experiment-running script.
    check_gw_script(join(path, "4-exp08-run-experiments-nym-gateway.sh"))
    
    # Check the log file written while running that script.
    check_gw_log(join(path, "logs_4-exp08-run-experiments-nym-gateway.log"))
    
    # Check the recorded SphinxFlow logs.
    check_gw_sphinxflow_logs(join(path, "gateway_sphinxflows"))

In [8]:
def check_endp_script(file: str):
    """Assert that an endpoints instance's experiment-running script is configured for exp08.

    Each check counts matches in `file` with `grep -c` (run through IPython's
    `!` shell capture, which returns the command output as a list of lines)
    and asserts that the count is exactly 1.

    Checked: the experiment ID ("exp08"), the Nym version
    ("nym-binaries-v1.1.13"), the composed experiment name, the file size
    setting (1048576), and the block of hard-coded gateway data exports.

    Args:
        file: Path to the endpoints instance's experiment-running shell script.

    Raises:
        AssertionError: If an expected line or block does not occur exactly once.
    """
    
    print(f"-> Checking experiment-running script of one endpoints instance '{basename(file)}'...")
    
    print("\n->-> script_correct_exp_id[0]='1'  =>  Checking that we see the expected ID 'exp08' of the experiment to conduct in the script...")
    # The pattern is anchored with ^ and $, so only an exact full-line match counts.
    script_correct_exp_id = ! grep -c "^mixcorr_exp_id=\"exp08\"$" {file}
    print(f"     {script_correct_exp_id[0]=}")
    assert int(script_correct_exp_id[0]) == 1, f"{int(script_correct_exp_id[0])=} != 1"
    
    print("\n->-> script_correct_nym_version[0]='1'  =>  Checking that we see the expected Nym git tag specified...")
    script_correct_nym_version = ! grep -c "^mixcorr_nym_version=\"nym-binaries-v1.1.13\"$" {file}
    print(f"     {script_correct_nym_version[0]=}")
    assert int(script_correct_nym_version[0]) == 1, f"{int(script_correct_nym_version[0])=} != 1"
    
    print("\n->-> script_correct_exp_name[0]='1'  =>  Checking that we see the expected experiment name specified...")
    # NOTE(review): the doubled braces presumably escape IPython's `{...}` variable
    # expansion so that grep receives literal `${mixcorr_exp_id}` - confirm.
    script_correct_exp_name = ! grep -c "^mixcorr_exp_name=\"\${{mixcorr_exp_id}}_\${{mixcorr_nym_version}}_static-http-download\"$" {file}
    print(f"     {script_correct_exp_name[0]=}")
    assert int(script_correct_exp_name[0]) == 1, f"{int(script_correct_exp_name[0])=} != 1"
    
    print("\n->-> script_correct_file_size[0]='1'  =>  Checking that we see the correct file size to be generated specified...")
    script_correct_file_size = ! grep -c "^mixcorr_file_http_chars_num=1048576$" {file}
    print(f"     {script_correct_file_size[0]=}")
    assert int(script_correct_file_size[0]) == 1, f"{int(script_correct_file_size[0])=} != 1"
    
    print("\n->-> script_correct_gw_data[0]='1'  =>  Checking that we see the correct hard-coded gateway data...")
    # NOTE(review): -z presumably makes grep treat the whole file as one record so
    # that the -P pattern can span several lines via '\n'; -c would then report 1
    # when the complete block of export lines is present - confirm.
    script_correct_gw_data = ! grep -Pczo 'export mixcorr_static_private_gateway_owner="n16mgskx7xejdgpy9j8wxe944ech4zenrwjvm056"\nexport mixcorr_static_private_gateway_stake="100000000"\nexport mixcorr_static_private_gateway_location="Milky Way, Universe"\nexport mixcorr_static_private_gateway_host="159.69.196.183"\nexport mixcorr_static_private_gateway_mix_host="159.69.196.183:1789"\nexport mixcorr_static_private_gateway_clients_port="9000"\nexport mixcorr_static_private_gateway_version="1.1.13"' {file}
    print(f"     {script_correct_gw_data[0]=}")
    assert int(script_correct_gw_data[0]) == 1, f"{int(script_correct_gw_data[0])=} != 1"

In [9]:
def check_endp_log(results_dir: str, file: str):
    
    print(f"\n\n-> Checking experiment log file '{basename(file)}'...")
    
    print("\n->-> log_correct_gw_identity_key[0]='1'  =>  Checking that we see the correct gateway identity key in the log...")
    log_correct_gw_identity_key = ! grep -c "\- mixcorr_static_private_gateway_identity_key='9cXBAoeGj7sWmMDk39XV3mP5GFY7JQv9LQ1wUrXc1B7u'$" {file}
    print(f"     {log_correct_gw_identity_key[0]=}")
    assert int(log_correct_gw_identity_key[0]) == 1, f"{int(log_correct_gw_identity_key[0])=} != 1"
    
    print("\n->-> log_correct_gw_sphinx_key[0]='1'  =>  Checking that we see the correct gateway sphinx key in the log...")
    log_correct_gw_sphinx_key = ! grep -c "\- mixcorr_static_private_gateway_sphinx_key='7QhSZxbLY6ryrzsWTNAMEawLPuhpT5P1Tk8i6aoJNn1g'$" {file}
    print(f"     {log_correct_gw_sphinx_key[0]=}")
    assert int(log_correct_gw_sphinx_key[0]) == 1, f"{int(log_correct_gw_sphinx_key[0])=} != 1"
    
    print("\n->-> Check that orchestration repository is checked out at the expected tag...")
    log_correct_orch_tag = ! grep -A 2 "Logging git status of /root/mixcorr/data-collection_live-nym-network_hetzner..." {file}
    assert "HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download" in log_correct_orch_tag[1], \
        f"'HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download' not in {log_correct_orch_tag[1]=}"
    assert "nothing to commit, working tree clean" in log_correct_orch_tag[2], \
        f"'nothing to commit, working tree clean' not in {log_correct_orch_tag[2]=}"
    
    print("\n->-> Check that experiment scenarios repository is checked out at the expected tag...")
    log_correct_scens_tag = ! grep -A 2 "Logging git status of /root/mixcorr/data-collection_experiments_nym..." {file}
    assert "HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download" in log_correct_scens_tag[1], \
        f"'HEAD detached at VERSION_FOR_exp08_nym-binaries-v1.1.13_static-http-download' not in {log_correct_scens_tag[1]=}"
    assert "nothing to commit, working tree clean" in log_correct_scens_tag[2], \
        f"'nothing to commit, working tree clean' not in {log_correct_scens_tag[2]=}"
    
    print("\n->-> Checking that random file of correct size has been created...")
    exp_file_generated = ! grep -A 1 "Generating 1048566 random characters for experiment document of size 1048576 Bytes (10 remaining characters will be run-specific ID)..." {file}
    exp_file_generated = "\n".join(exp_file_generated)
    assert "-rw-r--r-- 1 root root 1048566 " in exp_file_generated, f"Unexpected permissions and/or size or generated file: '{exp_file_generated}'"

In [10]:
def check_runs(results_dir: str):
    
    print("\n\n-> Checking result folders of all runs in this experiment...")
    
    # Find all succeeded and failed runs.
    runs_dirs = sorted(glob(join(results_dir, "exp08_curl_run_*")))
    runs_succeeded_files = sorted(glob(join(results_dir, "exp08_curl_run_*", "SUCCEEDED")))
    runs_failed_files = sorted(glob(join(results_dir, "exp08_curl_run_*", "FAILED")))
    runs_completed_files = [ *runs_succeeded_files, *runs_failed_files ]
    
    assert len(runs_completed_files) == (len(runs_succeeded_files) + len(runs_failed_files)), \
        f"{len(runs_completed_files)=} != ({len(runs_succeeded_files)=} + {len(runs_failed_files)=})"
    
    print(f"\n->-> Of a total of {len(runs_dirs)=} runs, {len(runs_succeeded_files)=} SUCCEEDED and {len(runs_failed_files)=} FAILED...")
    
    # Convert lists with run indicators (SUCCEEDED or FAILED files) into lists containing
    # the files respective parent directories, i.e., the individual run folders.
    runs_succeeded = sorted(list(map(lambda file: dirname(file), runs_succeeded_files)))
    runs_failed = sorted(list(map(lambda file: dirname(file), runs_failed_files)))
    runs_completed = sorted(list(map(lambda file: dirname(file), runs_completed_files)))
    
    # Glob for nym-network-requester configuration file in each SUCCEEDED run folder.
    networkreq_confs = list(map(lambda run_dir: 
                            glob(join(run_dir, "nym_folder", "service-providers", "network-requester", "network_requester_*", "config", "config.toml"))[0],
                            runs_succeeded))
    
    # Glob for nym-socks5-client configuration file in each SUCCEEDED run folder.
    socks5client_confs = list(map(lambda run_dir:
                                  glob(join(run_dir, "nym_folder", "socks5-clients", "socks5-client_*", "config", "config.toml"))[0],
                                  runs_succeeded))
    
    
    print("\n->-> Check that only a single result file (SUCCEEDED, FAILED) exists in each run directory...")
    runs_completed_unique = set(runs_completed)
    print(f"{len(runs_completed)=}")
    print(f"{len(runs_completed_unique)=}")
    assert len(runs_completed) == len(runs_completed_unique), f"{len(runs_completed)=} != {len(runs_completed_unique)=}"
    
    
    print("\n->-> Check that each SUCCEEDED run actually used the SOCKS5 client to download the file (by way of checking for \"Proxy (started|finished)\" log lines)...")
    for run_succeeded_file in runs_succeeded_files:
        
        # Obtain the two respective log files (socks5-client, requester-server) of this SUCCEEDED run.
        log_socks5_client = glob(join(dirname(run_succeeded_file), "logs_socks5-client_run-*.log"))[0]
        log_requester_server = glob(join(dirname(run_succeeded_file), "logs_requester-server_run-*.log"))[0]
        
        # Ensure that the log line is present in the socks5-client log that the proxied HTTP request was started.
        log_socks5_client_proxy_start = ! grep -c "> Starting proxy for 127.0.0.1:9909 (id: " {log_socks5_client}
        assert int(log_socks5_client_proxy_start[0]) == 1, f"{int(log_socks5_client_proxy_start[0])=} != 1"
        
        # Ensure that the log line is present in the network-requester log that the proxied HTTP request was started.
        log_requester_server_proxy_start = ! grep -c "> Starting proxy for 127.0.0.1:9909 (currently there are 1 proxies being handled)$" {log_requester_server}
        assert int(log_requester_server_proxy_start[0]) == 1, f"{int(log_requester_server_proxy_start[0])=} != 1"
        
        # Ensure that the log line is present in the network-requester log that the proxied HTTP request concluded.
        log_requester_server_proxy_end = ! grep -c "> Proxy for 127.0.0.1:9909 is finished  (currently there are 0 proxies being handled)$" {log_requester_server}
        assert int(log_requester_server_proxy_end[0]) == 1, f"{int(log_requester_server_proxy_end[0])=} != 1"
        
        # Ensure that the log line is present in the socks5-client log that the proxied HTTP request concluded.
        log_socks5_client_proxy_end = ! grep -c "> Proxy for 127.0.0.1:9909 is finished (id: " {log_socks5_client}
        assert int(log_socks5_client_proxy_end[0]) == 1, f"{int(log_socks5_client_proxy_end[0])=} != 1"
    
    
    print("\n->-> Check that the requester-server log of each SUCCEEDED run shows the correct two document.txt sizes...")
    for run_succeeded_file in runs_succeeded_files:
        
        # Obtain the respective requester-server log file of this SUCCEEDED run.
        log_requester_server = glob(join(dirname(run_succeeded_file), "logs_requester-server_run-*.log"))[0]
        
        # Look for log line listing document.txt before runID was appended to it.
        log_requester_server_doc_size_initial = ! grep "^-rw-r--r-- 1 root root 1048566 " {log_requester_server} | grep -c "/webserver_directory/document.txt"
        assert int(log_requester_server_doc_size_initial[0]) == 1, f"{int(log_requester_server_doc_size_initial[0])=} != 1"
        
        # Look for log line listing document.txt after runID was appended to it.
        log_requester_server_doc_size_final = ! grep "^-rw-r--r-- 1 root root 1048576 " {log_requester_server} | grep -c "/webserver_directory/document.txt"
        assert int(log_requester_server_doc_size_final[0]) == 1, f"{int(log_requester_server_doc_size_final[0])=} != 1"
    
    
    print("\n->-> Check that the SHA512 hashes of the curl client and webserver documents are equal...")
    webserver_doc_hashes = np.zeros(len(runs_succeeded), dtype = (np.unicode_, 128))
    
    for idx, run_succeeded_file in enumerate(runs_succeeded_files):
        
        with open(run_succeeded_file, mode = "r", encoding = "utf-8") as run_succeeded_fp:
            
            # Extract both documents' SHA512 hash from file (first part of each line).
            curl_doc_hash = str(run_succeeded_fp.readline().split(" ")[0])
            webserver_doc_hash = str(run_succeeded_fp.readline().split(" ")[0])
            assert curl_doc_hash == webserver_doc_hash, f"{curl_doc_hash=} != {webserver_doc_hash=}"
            
            # Store hash of webserver's document for later check.
            webserver_doc_hashes[idx] = webserver_doc_hash
    
    
    print("\n->-> Check that all found webserver documents are unique...")
    webserver_doc_hashes_unique = np.unique(webserver_doc_hashes)
    print(f"{len(webserver_doc_hashes)=}")
    print(f"{len(webserver_doc_hashes_unique)=}")
    assert len(webserver_doc_hashes_unique) == len(webserver_doc_hashes), \
        f"{len(webserver_doc_hashes_unique)=} != {len(webserver_doc_hashes)=}"
    assert webserver_doc_hashes_unique.size == webserver_doc_hashes.size, \
        f"{webserver_doc_hashes_unique.size=} != {webserver_doc_hashes.size=}"
    assert webserver_doc_hashes_unique.shape == webserver_doc_hashes.shape, \
        f"{webserver_doc_hashes_unique.shape=} != {webserver_doc_hashes.shape=}"
    
    
    print("\n->-> Checking that all SUCCEEDED client configuration values related to timing/delay are what we expect for exp08...")
    
    print(f"{len(networkreq_confs)=}")
    print(f"{len(socks5client_confs)=}")
    assert len(networkreq_confs) == len(socks5client_confs), f"{len(networkreq_confs)=} != {len(socks5client_confs)=}"
    
    # Ensure that all SUCCEEDED run nym-network-requester configuration files
    # contain the expected part regarding timing and delay values.
    for networkreq_conf in networkreq_confs:
        
        with open(networkreq_conf, mode = "r", encoding = "utf-8") as conf:
            
            conf_data = conf.read()
            assert "average_packet_delay = '50ms'\naverage_ack_delay = '50ms'\nloop_cover_traffic_average_delay = '200ms'\nmessage_sending_average_delay = '20ms'\n" in conf_data
    
    # Ensure that all SUCCEEDED run nym-socks5-client configuration files
    # contain the expected part regarding timing and delay values.
    for socks5client_conf in socks5client_confs:
        
        with open(socks5client_conf, mode = "r", encoding = "utf-8") as conf:
            
            conf_data = conf.read()
            assert "average_packet_delay = '50ms'\naverage_ack_delay = '50ms'\nloop_cover_traffic_average_delay = '200ms'\nmessage_sending_average_delay = '20ms'\n" in conf_data
    
    
    print("\n->-> Check that FAILED runs fail for the single known failure reason...")
    for idx, run_failed_file in enumerate(runs_failed_files):
        
        with open(run_failed_file, mode = "r", encoding = "utf-8") as run_failed_fp:
            
            err_msg = str(run_failed_fp.read().strip())
            assert err_msg == "Proxy between curl=>socks5-client and network-requester=>webserver did not complete", \
                f"{err_msg=} != 'Proxy between curl=>socks5-client and network-requester=>webserver did not complete'"

In [11]:
def check_raw_endpoints_results(path: str):
    """Run all endpoints-related checks on the raw endpoints results folder `path`.

    The experiment-running script and its log file are located below `path`
    and checked first; afterwards, the individual run folders below `path`
    are checked.
    """
    
    # Check the experiment-running script.
    check_endp_script(join(path, "5-exp08-run-experiments-nym-endpoints.sh"))
    
    # Check the log file written while running that script.
    check_endp_log(path, join(path, "logs_5-exp08-run-experiments-nym-endpoints.log"))
    
    # Check the result folders of all runs.
    check_runs(path)

In [12]:
# Locate the gateway results folder inside the raw dataset. If several folders
# match, the first one in sorted order is used. Fail with a descriptive message
# instead of a bare IndexError when no folder matches.
raw_gateway_folders = sorted(glob(join(RAW_EXP_DIR, f"*_mixcorr-nym-gateway-ccx22-exp08_{EXP_NAME}")))
assert len(raw_gateway_folders) > 0, f"No raw gateway results folder found in '{RAW_EXP_DIR}'"
raw_gateway_folder = raw_gateway_folders[0]

# Locate the results folders of all endpoints instances inside the raw dataset.
raw_endpoints_folders = sorted(glob(join(RAW_EXP_DIR, f"*_mixcorr-nym-endpoints-ccx22-exp08-*_{EXP_NAME}")))

In [13]:
print(f"----- [{datetime.now().strftime('%Y-%m-%d_%H:%M:%S.%f')}] Begin merging previously `split` files again -----\n")

# Find all files ending in '.[0-9][0-9]', indicating they are part of a larger file that
# was previously `split` into smaller ones in order to be able to upload to GitHub.
prev_split_files = ! find {RAW_EXP_DIR} -type f -name "*.[0-9][0-9]" | sort -d

if len(prev_split_files) > 0:
    
    print("-> Going to merge again the following previously `split` files:")
    print(*prev_split_files, sep = "\n")
    print()
    
    prev_split_files_bases = set()
    
    for prev_split_file in prev_split_files:
        prev_split_files_bases.add(prev_split_file.rsplit(".", 1)[0])
    
    for prev_split_files_base in prev_split_files_bases:
        ! cat {prev_split_files_base}.?? > {prev_split_files_base}

print(f"----- [{datetime.now().strftime('%Y-%m-%d_%H:%M:%S.%f')}] End merging previously `split` files again -----\n\n\n")



print(f"----- [{datetime.now().strftime('%Y-%m-%d_%H:%M:%S.%f')}] Begin running assertion checks on raw exp08 dataset -----\n")


# Start with checking the raw gateway result files.
print(f"\n\n----- BEGIN {raw_gateway_folder} -----\n")
check_raw_gateway_results(raw_gateway_folder)
print(f"\n\n-----  END  {raw_gateway_folder} -----\n\n\n")


for raw_endpoints_folder in raw_endpoints_folders:
    
    # Then, check each endpoints-related raw results folder.
    print(f"\n\n----- BEGIN {raw_endpoints_folder} -----\n")
    check_raw_endpoints_results(raw_endpoints_folder)
    print(f"\n\n-----  END  {raw_endpoints_folder} -----\n\n\n")

    
print(f"----- [{datetime.now().strftime('%Y-%m-%d_%H:%M:%S.%f')}] Done running assertion checks on raw exp08 dataset -----\n")

----- [2023-04-24_15:33:10.322822] Begin merging previously `split` files again -----

-> Going to merge again the following previously `split` files:
/PRIVATE_PATH/dataset_exp08_nym-binaries-v1.1.13_static-http-download_raw/2023-04-14_16-21-52_mixcorr-nym-gateway-ccx22-exp08_exp08_nym-binaries-v1.1.13_static-http-download/gateway_sphinxflows/9Pff2ZEXMUozH6DwcwJHzn8sojymc1NmYsdak2NpuMtF_gateway-to-endpoint.sphinxflow.01
/PRIVATE_PATH/dataset_exp08_nym-binaries-v1.1.13_static-http-download_raw/2023-04-14_16-21-52_mixcorr-nym-gateway-ccx22-exp08_exp08_nym-binaries-v1.1.13_static-http-download/gateway_sphinxflows/9Pff2ZEXMUozH6DwcwJHzn8sojymc1NmYsdak2NpuMtF_gateway-to-endpoint.sphinxflow.02
/PRIVATE_PATH/dataset_exp08_nym-binaries-v1.1.13_static-http-download_raw/2023-04-14_16-21-52_mixcorr-nym-gateway-ccx22-exp08_exp08_nym-binaries-v1.1.13_static-http-download/logs_4-exp08-run-experiments-nym-gateway.log.01
/PRIVATE_PATH/dataset_exp08_nym-binaries-v1.1.13_static-http-download_raw/2023-04