# hCassandra runTests

## Description

---

### Objective

   This test aims to **automate**:
   
   (1) The execution of the Hydra Cassandra Stress Test (hCassandra) for increasing client load.
   
   (2) The generation of performance results presented in the form of tables and graphs for relevant metrics. 
   
   To this end, the performance of the Cassandra cluster is measured as the number of clients writing to and reading from the database is increased. The number of clients can be defined by the user. 
   
 
### Customize the Test

   Modify **total_num_clients** to change the sets of clients for which you wish to execute the test.
   
   Current tests have been run for a maximum of **10000** clients and a duration of 5 minutes against a 3-node Cluster (for further details on Software & Hardware specs please refer to the *Software & Hardware Specs* section).
   
### Useful HINTS for running the test

- If the test has been run before and its output is still shown, you can clear the former results by selecting Cell -> All Output -> Clear in the top menu.
- To run the test, select each code cell and press the 'run cell' button in the top menu. To run all cells automatically, select Cell -> Run All in the top menu.
- To store your results, wait until the run is finished, then generate your own report by selecting File -> Download as -> Markdown (.md) (or any other preferred format).

## Software & Hardware Specs

---

The tests were executed on Google Cloud Servers, with the following specs:

#### Cassandra Cluster

- 3 Node Cluster, each with the following specs:
   - 16 vCPUs
   - RAM: 60 GB
   - Disk: 60 GB
   - OS: Debian 3.16.7-ckt25-2
  
- Cassandra + Cassandra-Tools Version: 3.0.6

#### Hydra Cluster

- **MASTER**: 1 Server
   - 4 vCPUs
   - RAM: 15 GB
   - OS: Ubuntu 14.04

- **SLAVES**: 9 Servers (hosts to the cassandra-stress tool)
   - 16 vCPUs 
   - RAM: 60 GB
   - Disk: 60 GB
   - OS: Debian 3.16.7-ckt25-2
   
### Important 

- For the performance tests, the maximum open-file limit (ulimit) had to be increased on the Master node.
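
The open-file limit can be inspected from Python before starting a run. The following is a minimal sketch using the standard `resource` module (Unix only). The threshold `MIN_OPEN_FILES` and both helper functions are illustrative assumptions, not values or code taken from these tests.

```python
import resource

# Hypothetical threshold for illustration; pick a value suited to your client count.
MIN_OPEN_FILES = 4096

def open_file_limits():
    """Return the (soft, hard) limits on open file descriptors for this process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)

def limit_is_sufficient(soft, minimum=MIN_OPEN_FILES):
    """True if the soft limit is unlimited or at least `minimum`."""
    return soft == resource.RLIM_INFINITY or soft >= minimum

soft, hard = open_file_limits()
print("open files: soft=%s hard=%s" % (soft, hard))
if not limit_is_sufficient(soft):
    print("Soft limit is below %d; consider raising it before running the tests." % MIN_OPEN_FILES)
```

The check only reports the limit of the current process; raising it (for example with `ulimit -n` in the shell that starts the notebook) is left to the operator.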

---

## hCassandra Test 1: Fixed Number of Stress Clients (Debug Mode) 

---

The following test runs a SINGLE execution of the Cassandra Test for a fixed number of clients (total_client_count) and operations (total_ops_count). It runs in debug mode, showing logger info during execution. 

In [None]:
!python ./src/hCassandra_test.py --cluster_ips='10.10.1.108,10.10.1.165,10.10.1.162' --total_client_count=20 --total_ops_count=1000 --test_duration=5 

---

## hCassandra Test 2: Increasing the Number of stress clients (multiple runs)

---


**IMPORTANT**:

   If you want to change the number of clients and/or the number of operations for your test, set the desired values in the following section:


In [None]:
# Define num Client(s) / Operation(s)
total_num_clients = [10, 100, 200, 400, 800, 1600, 3200, 5000, 6000, 7000, 8000]
duration_array = [5, 10, 10, 10, 10, 20, 40, 60, 60, 70, 80]
total_ops_count = [1000000]
simulate_failure = False
# Set IPs of Nodes in Cassandra Cluster
cassandra_cluster_ips = '10.10.1.108,10.10.1.165,10.10.1.162'
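
The execution loop further down looks up the duration of each run with `duration_array[idx1]`, where `idx1` is the position of the client count in `total_num_clients`, so the two lists need the same length. A small sanity check can catch a mismatch before a long run starts. The helper `validate_config` is a hypothetical addition for illustration, not part of the test suite.

```python
def validate_config(client_counts, durations):
    """Raise ValueError unless every client count has a matching positive duration."""
    if len(client_counts) != len(durations):
        raise ValueError("total_num_clients has %d entries but duration_array has %d"
                         % (len(client_counts), len(durations)))
    for clients, duration in zip(client_counts, durations):
        if clients <= 0 or duration <= 0:
            raise ValueError("invalid pair: clients=%s, duration=%s" % (clients, duration))
    # Return the (clients, duration) pairs in execution order
    return list(zip(client_counts, durations))

# Values copied from the configuration cell above
total_num_clients = [10, 100, 200, 400, 800, 1600, 3200, 5000, 6000, 7000, 8000]
duration_array = [5, 10, 10, 10, 10, 20, 40, 60, 60, 70, 80]

pairs = validate_config(total_num_clients, duration_array)
print("first run: %s, last run: %s" % (pairs[0], pairs[-1]))
```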

**UTIL FUNCTIONS**

In [None]:
import json
import ast

RESULTS_MARKER = 'Cassandra Stress Results: \n'

def get_result(test_stdout):
    """Extract the Cassandra test results (a dict literal) from the test's stdout."""
    index_start = test_stdout.find(RESULTS_MARKER)
    if index_start == -1:
        return {}
    index_end = test_stdout.find('Calling Server shutdown', index_start)
    if index_end == -1:
        # No shutdown message found: parse everything after the marker
        index_end = len(test_stdout)
    results = test_stdout[index_start + len(RESULTS_MARKER):index_end]
    return ast.literal_eval(results.strip())
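
To see the marker-based extraction in isolation, the standalone sketch below applies the same idea to a synthetic stdout string. The function `extract_results` and the sample text are assumptions made up for illustration; real output from `hCassandra_test.py` contains more metrics and log lines.

```python
import ast

START_MARKER = 'Cassandra Stress Results: \n'
END_MARKER = 'Calling Server shutdown'

def extract_results(stdout_text):
    """Return the dict literal found between the two markers, or {} if absent."""
    start = stdout_text.find(START_MARKER)
    if start == -1:
        return {}
    start += len(START_MARKER)
    end = stdout_text.find(END_MARKER, start)
    if end == -1:
        # End marker missing: take everything after the start marker
        end = len(stdout_text)
    return ast.literal_eval(stdout_text[start:end].strip())

# Synthetic stdout, invented purely for illustration (not real test output)
sample_stdout = (
    "INFO starting test\n"
    "Cassandra Stress Results: \n"
    "{'write': {'op/s': [100.0, 120.0]}, 'read': {'op/s': [90.0]}}\n"
    "Calling Server shutdown\n"
)

parsed = extract_results(sample_stdout)
print(sorted(parsed.keys()))
print(extract_results("no results here"))
```

`ast.literal_eval` only accepts Python literals, so arbitrary code in the captured output is not executed.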

The following block of code is the actual **EXECUTION OF THE CASSANDRA SCALE TESTS**. Run time depends on the values in **duration_array**, so the full series may take much longer than a few minutes:

In [None]:
import subprocess
import os
import json

hCassandra_results = dict()
total_tests = len(total_num_clients) * len(total_ops_count)

print 'STARTING CASSANDRA STRESS TESTS \n'
# Run hCassandra_test.py once per (client count, ops count) combination
for idx1, clients in enumerate(total_num_clients):
    for idx2, ops in enumerate(total_ops_count):
        print 'Test (%s/%s) in progress.. Please wait until test is completed..' % ((len(total_ops_count) * idx1) + idx2 + 1, total_tests)
        # NOTE: simulate_failure is currently not passed on to hCassandra_test.py
        hcass_cmd = "python ./src/hCassandra_test.py --cluster_ips=%s --total_client_count=%s --total_ops_count=%s --test_duration=%s" % (cassandra_cluster_ips, clients, ops, duration_array[idx1])
        stress_test = subprocess.Popen(hcass_cmd, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid)
        stdout, stderr = stress_test.communicate()
        results_dict = get_result(stdout)
        if len(results_dict) <= 1:
            print 'There was an ERROR while attempting to parse stdout...'
            print 'STDOUT: %s' % stdout
            print 'STDERR: %s' % stderr
        else:
            print 'Test SUCCESSFULLY completed... \n'
        if str(clients) not in hCassandra_results:
            hCassandra_results[str(clients)] = dict()
        hCassandra_results[str(clients)][str(ops)] = results_dict
        # Persist partial results after every run so they survive a crash
        with open('results_hcassandra.txt', 'w') as outfile:
            json.dump(hCassandra_results, outfile)

print 'END OF TESTS:'
print 'ALL TESTS HAVE BEEN COMPLETED. PLEASE PROCEED TO GENERATE GRAPHS & TABLES WITH PERFORMANCE RESULTS.'
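
The loop above builds the command as a single shell string. As an alternative, the arguments can be assembled as a list, which avoids shell quoting issues. The helper `build_command` below is a hypothetical sketch; it uses only the flags that already appear in this notebook.

```python
def build_command(cluster_ips, clients, ops, duration,
                  script='./src/hCassandra_test.py'):
    """Return the hCassandra_test.py invocation as an argument list."""
    return [
        'python', script,
        '--cluster_ips=%s' % cluster_ips,
        '--total_client_count=%d' % clients,
        '--total_ops_count=%d' % ops,
        '--test_duration=%d' % duration,
    ]

cmd = build_command('10.10.1.108,10.10.1.165,10.10.1.162', 20, 1000, 5)
print(' '.join(cmd))
```

A list built this way can be passed to `subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)` without `shell=True`.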

  **NOTE:**
  
  ---
   
   Wait until RESULTS (**hCassandra_results**) are generated for all cases, and then execute the following blocks to generate:
   (1) Tables with results (markdown compatible) and 
   (2) Graphs.
   
   The **END OF TEST** is indicated by a message. Please wait...
   
   ---

### RESULT PROCESSING & TABLE/ GRAPH GENERATION

---

In this section, we process the results to generate tables of performance values and graphs showing the number of operations per second and the latency as the number of clients increases. 

**NOTE**
To plot any other performance metric, follow the pattern used for either of the two graphs already provided. 

Persist results to *results_hcassandra.txt* file. 

In [None]:
import json
with open('results_hcassandra.txt', 'w') as outfile:
    json.dump(hCassandra_results, outfile)
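
Because the results are stored as JSON, they can be reloaded later (for example after a kernel restart) without re-running the tests. The sketch below round-trips a placeholder structure through a temporary file. The helpers `save_results` and `load_results` and the example values are assumptions for illustration only.

```python
import json
import os
import tempfile

def save_results(results, path):
    """Write the nested results dict to `path` as JSON."""
    with open(path, 'w') as outfile:
        json.dump(results, outfile)

def load_results(path):
    """Read a results dict previously written by save_results."""
    with open(path) as infile:
        return json.load(infile)

# Placeholder structure for illustration only; real values come from the test runs
example_results = {'10': {'1000000': {'write': {'op/s': [1.0]}, 'read': {'op/s': [2.0]}}}}

tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, 'results_hcassandra.txt')
save_results(example_results, path)
restored = load_results(path)
print(restored == example_results)
```

JSON object keys are always strings, which matches the `str(clients)` and `str(ops)` keys used when the results are collected.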

The following class renders a 2-dimensional list as an HTML-formatted table. 

In [None]:
class ListTable(list):
    """ Overridden list class which takes a 2-dimensional list of 
        the form [[1,2,3],[4,5,6]], and renders an HTML Table in 
        IPython Notebook. """
    
    def _repr_html_(self):
        html = ["<table>"]
        for row in self:
            html.append("<tr>")
            
            for col in row:
                html.append("<td>{0}</td>".format(col))
            
            html.append("</tr>")
        html.append("</table>")
        return ''.join(html)

Process and format results.

In [None]:
import ast
import numpy

results_per_ops = dict()

# Table Format: Metrics
header = [
            '# Clients',
            'total_ops',
            'op/s',
            'med',
            '.95',
            '.99',
            'max',
            'op_time'
        ]

data_matrix_write = ListTable()
data_matrix_read = ListTable()

data_matrix_write.append(header)
data_matrix_read.append(header)

def build_row(clients, stats, op_time):
    """Aggregate the per-metric value lists of one operation type into a table row."""
    return [clients, sum(stats['total ops']), sum(stats['op/s']),
            numpy.median(stats['med']), numpy.percentile(stats['.95'], 95),
            numpy.percentile(stats['.99'], 99), max(stats['max']), op_time]

ops_key = str(total_ops_count[0])
results_per_ops[ops_key] = dict()
for idx1, clients in enumerate(total_num_clients):
    if str(clients) not in hCassandra_results:
        continue
    raw = hCassandra_results[str(clients)][ops_key]
    # Stored results may be a dict or the string representation of one
    res_dict = ast.literal_eval(raw) if isinstance(raw, basestring) else raw
    write_stats = res_dict.get('write', {})
    read_stats = res_dict.get('read', {})
    if 'op/s' not in write_stats or 'op/s' not in read_stats:
        print ("Removing Results for client count %s. No results found." % clients)
        continue
    has_write = len(write_stats['op/s']) != 0
    has_read = len(read_stats['op/s']) != 0
    if has_write and has_read:
        # Only complete results (write and read) are kept for the graphs
        results_per_ops[ops_key][str(clients)] = res_dict
    if has_write:
        data_matrix_write.append(build_row(clients, write_stats, write_stats['op_time'][0]))
    if has_read:
        data_matrix_read.append(build_row(clients, read_stats, duration_array[idx1]))
    if not (has_write or has_read):
        print ("Removing Results for client count %s. No results found." % clients)
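
The processing step collapses each metric, which is stored as a list of values, into a single number per client count: throughput is summed, the median latency is the median of the list, and the tail latencies are percentiles of the list. The sketch below reproduces that aggregation in pure Python on made-up numbers. It assumes each list holds one value per stress host, and its `percentile` helper uses linear interpolation between closest ranks (the default method of `numpy.percentile`). All names and values are illustrative.

```python
import math

def median(values):
    """Median of a non-empty list."""
    ordered = sorted(values)
    mid = len(ordered) // 2
    if len(ordered) % 2:
        return ordered[mid]
    return (ordered[mid - 1] + ordered[mid]) / 2.0

def percentile(values, q):
    """q-th percentile with linear interpolation between closest ranks."""
    ordered = sorted(values)
    rank = (len(ordered) - 1) * q / 100.0
    lower = int(math.floor(rank))
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (rank - lower) * (ordered[upper] - ordered[lower])

def summarize(stats):
    """Collapse per-host metric lists into one value per metric."""
    return {
        'op/s': sum(stats['op/s']),
        'med': median(stats['med']),
        '.95': percentile(stats['.95'], 95),
        'max': max(stats['max']),
    }

# Made-up per-host values for three stress hosts (illustration only)
example_stats = {
    'op/s': [1000.0, 1200.0, 800.0],
    'med': [1.0, 2.0, 4.0],
    '.95': [10.0, 20.0, 30.0],
    'max': [50.0, 80.0, 65.0],
}
summary = summarize(example_stats)
print(summary)
```

Note that a percentile taken over per-host percentiles is generally not the same as the percentile over all individual requests, so the tail-latency columns are best read as an approximation.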

### Result Generation: Table

Next, results are displayed in a Table, following the markdown format. 
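
If a plain Markdown table is preferred over the rendered one (for example to paste into a report), the rows can be converted with a few lines of code. The function `to_markdown` is a hypothetical helper and the rows are placeholders, not measured values.

```python
def to_markdown(rows):
    """Render a list of rows (first row = header) as a Markdown pipe table."""
    header, body = rows[0], rows[1:]
    lines = ['| ' + ' | '.join(str(c) for c in header) + ' |',
             '| ' + ' | '.join('---' for _ in header) + ' |']
    for row in body:
        lines.append('| ' + ' | '.join(str(c) for c in row) + ' |')
    return '\n'.join(lines)

# Placeholder rows for illustration only (not measured values)
example_rows = [['# Clients', 'op/s'], [10, 1.5], [100, 2.5]]
print(to_markdown(example_rows))
```

Since `ListTable` is a list subclass, `data_matrix_write` or `data_matrix_read` could be passed to such a function directly.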



Save results for 'WRITE' operation in a file. This will be a backup of test results in case of failure.

In [None]:
from datetime import datetime

# Timestamped file name: write_stats_<MMDDYYYY>_<HHMMSS>.txt
write_stats_filename = "write_stats_" + datetime.now().strftime("%m%d%Y_%H%M%S") + ".txt"
with open(write_stats_filename, "w") as text_file:
    text_file.write("%s" % data_matrix_write)

The next table represents the results for the **WRITE** Operations:

---

*Table 1. "Cassandra Performance over WRITE Operation."*

In [None]:
data_matrix_write

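Save results for 'READ' operation in a file. As above, this is a backup of test results in case of failure.

In [None]:
# Timestamped file name: read_stats_<MMDDYYYY>_<HHMMSS>.txt
read_stats_filename = "read_stats_" + datetime.now().strftime("%m%d%Y_%H%M%S") + ".txt"
with open(read_stats_filename, "w") as text_file:
    text_file.write("%s" % data_matrix_read)

The next table represents the results for the **READ** Operations:

---

*Table 2. "Cassandra Performance over READ Operation."*

In [None]:
data_matrix_read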

### Result Generation: Graphs

Next, results are displayed in Graphs. 

--- 

**IMPORTANT**

Modify the graph file names here if desired. By default, each name is suffixed with the current date and time. 



---

In [None]:
from datetime import datetime

ops_second_graph_filename = "hCassandra_ops_" + str(datetime.now().strftime("%m%d%Y_%H%M%S"))
median_latency_graph_filename = "hCassandra_med_" + str(datetime.now().strftime("%m%d%Y_%H%M%S"))

In [None]:
import sys

def asint(s):
    """Sort key: numeric strings by integer value, non-numeric strings last."""
    try:
        return int(s), ''
    except ValueError:
        return sys.maxint, s
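
The client counts are stored as string keys, so a plain sort would order them alphabetically ('100' before '800' before '3200' fails, for instance). The sketch below shows the effect of the `asint` key on a small example. The fallback to `sys.maxsize` is an addition so that the snippet also runs on Python 3, where `sys.maxint` does not exist, and the key list is made up for illustration.

```python
import sys

# sys.maxint exists only on Python 2; fall back to sys.maxsize elsewhere
MAX_INT = getattr(sys, 'maxint', sys.maxsize)

def asint(s):
    """Sort key: numeric strings by integer value, non-numeric strings last."""
    try:
        return int(s), ''
    except ValueError:
        return MAX_INT, s

keys = ['800', '10', '3200', '100', 'n/a']
print(sorted(keys))             # plain string sort
print(sorted(keys, key=asint))  # numeric sort
```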

In [None]:
%%capture

import plotly.plotly as py
import plotly.offline as offline
from plotly.graph_objs import *
import operator
import numpy
import collections

# run at the start of every ipython notebook to use plotly.offline
offline.init_notebook_mode(connected=True)

data_matrix = [['# ops', '# Clients', 'total_ops', 'op/s', 'pk/s', 'med', '.95', '.99', 'max', 'max_ms', 'sdv_ms', 'op_time']]

traces_plot1 = []
traces_plot2 = []

# One group of traces per operation count (the keys of results_per_ops)
for ops_count, tests_per_trace in results_per_ops.iteritems():
    
    total_ops = []
    op_s = []
    op_s_r = []
    med = []
    med_r = []
    p99 = []
    p99_r = []
    max_lat = []
    max_lat_r = []
    
    clients = []
    # Sort list by # Clients
    sortedlist = [(k, tests_per_trace[k]) for k in sorted(tests_per_trace, key=asint)]
    
    for test in sortedlist:
        clients.append(test[0])
        op_s.append(sum((ops for ops in test[1]['write']['op/s'])))
        med.append(numpy.median(test[1]['write']['med']))
        op_s_r.append(sum((ops for ops in test[1]['read']['op/s'])))
        med_r.append(numpy.median(test[1]['read']['med']))
        p99.append(numpy.percentile(test[1]['write']['.99'], 99))
        p99_r.append(numpy.percentile(test[1]['read']['.99'], 99))
        max_lat.append(max(test[1]['write']['max']))
        max_lat_r.append(max(test[1]['read']['max']))
        
    trace_plot1 = Scatter(
          x=clients,
          y=op_s, 
          mode = 'lines+markers',
          name = 'WRITE',
          marker = dict(
            size = 10,
            color = 'rgb(91,79,224)')
        )
    
    trace_plot2 = Scatter(
          x=clients,
          y=op_s_r, 
          mode = 'lines+markers',
          name = 'READ',
          marker = dict(
            size = 10,
            color = 'rgb(212,224,79)')
        )
        
    trace_plot3 = Scatter(
          x=clients,
          y=med, 
          mode = 'lines+markers',
          name = 'WRITE-median', 
          marker = dict(
            size = 10,
            color = 'rgb(91,79,224)')
        )

    trace_plot4 = Scatter(
          x=clients,
          y=med_r, 
          mode = 'lines+markers',
          name = 'READ-median', 
          marker = dict(
            size = 10,
            color = 'rgb(212,224,79)')
        )
    trace_plot5 = Scatter(
          x=clients,
          y=p99, 
          mode = 'lines+markers',
          name = 'WRITE-percentile 99', 
          marker = dict(
            size = 10,
            color = 'rgb(222,44,118)')
        )
    trace_plot7 = Scatter(
          x=clients,
          y=max_lat, 
          mode = 'lines+markers',
          name = 'WRITE-max', 
          marker = dict(
            size = 10,
            color = 'rgb(29,113,204)')
        )
    trace_plot8 = Scatter(
          x=clients,
          y=p99_r, 
          mode = 'lines+markers',
          name = 'READ-percentile 99', 
          marker = dict(
            size = 10,
            color = 'rgb(255,151,5)')
        )
    trace_plot9 = Scatter(
          x=clients,
          y=max_lat_r, 
          mode = 'lines+markers',
          name = 'READ-max', 
          marker = dict(
            size = 10,
            color = 'rgb(36,218,242)')
        )
    
    traces_plot1.append(trace_plot1)
    traces_plot1.append(trace_plot2)
    traces_plot2.append(trace_plot3)
    traces_plot2.append(trace_plot4)
    traces_plot2.append(trace_plot5)
    traces_plot2.append(trace_plot7)
    traces_plot2.append(trace_plot8)
    traces_plot2.append(trace_plot9)

### Result Generation: operations per second vs. client count

The following graph illustrates how the number of operations per second changes as the number of clients increases.

In [None]:
%%capture plot_ops --no-stdout

data = Data(traces_plot1)
# Edit the layout
layout = dict(title = 'op/s vs. # Clients',
              xaxis = dict(title = '# clients'),
              yaxis = dict(title = 'op/s'),
              )

# Plot and embed in notebook
fig = dict(data=data, layout=layout)
offline.plot(fig, filename=ops_second_graph_filename + "_offline")
py.iplot(fig, filename = ops_second_graph_filename)

### Result Generation: latency vs. client count

The following graph illustrates the median, 99th-percentile, and maximum latency in milliseconds of WRITE and READ operations as the number of clients increases. The y-axis is logarithmic. 

In [None]:
%%capture plot_med --no-stdout

data = Data(traces_plot2)
# Edit the layout
layout = dict(title = 'Latency vs. Client Count',
              xaxis = dict(title = '# Clients'),
              yaxis = dict(type='log', title = 'Latency [ms]'),
              )

# Plot and embed in notebook
fig = dict(data=data, layout=layout)
offline.plot(fig, filename = median_latency_graph_filename + "_offline")
py.iplot(fig, filename = median_latency_graph_filename)

Overall, these benchmarks represent the **maximum throughput** of a 3-node cluster for the *default* model generated by the cassandra-stress tool. For an accurate performance assessment of an application, a range of parameters (including data model, queries, etc.) needs to be adjusted. 