# Evaluation Wrapper

## Imports

In [None]:
import os
import time
import sys
import re
import pandas as pd
from performance_test import *
from kafka_producer import *
from directNeo4jImporter import *
from node_deletion import *
from correctness_test import *
from partition_changer import *
from heapSize_changer import *
from streams_conf_changer import *

## Parameters

#### General Parameters

In [None]:
# Load the evaluation settings (node labels, Kafka topics, Neo4j conf path).
# json is imported explicitly here: the notebook's import cell does not import
# it, so without this line the cell only works if a star-import leaks the name.
import json

settings_path = '../settings.json'
with open(settings_path, encoding='utf-8') as json_file:
    settings = json.load(json_file)
settings


In [None]:
# Static evaluation parameters, mostly derived from settings.json.
# TODO: consider moving the test labels and the port into the settings file.

# Neo4j node labels: the productive ("original") labels come from the settings,
# the "test" labels are fixed names used for the evaluation-only nodes.
node_labels = {
    'original': {
        kind: settings['node_labels_neo4j'][kind]
        for kind in ('block', 'transaction', 'address')
    },
    'test': dict(
        block='Block_test',
        transaction='Transaction_test',
        address='Address_test',
    ),
}

# Port of the Neo4j server, kept as a string.
neo4j_port = '7687'

# Locations of the two Neo4j configuration files edited during the evaluation.
path_to_neo4j_conf_directory = settings['path_to_neo4j_conf_dir']
path_to_neo4j_conf = path_to_neo4j_conf_directory + '/neo4j.conf'
path_to_streams_conf = path_to_neo4j_conf_directory + '/streams.conf'

# Kafka topic names for transactions and blocks.
kafka_topics = {
    topic: settings['kafka_topics'][topic]
    for topic in ('transaction', 'block')
}


#### Changing Parameters

In [None]:
############## Performance parameters

# Node sets to evaluate (original note: set to False if test nodes are inserted).
evaluate_original = True
evaluate_test     = True

# Values the testing loop assigns to `evaluate_original`, in execution order:
# False (test nodes) first, then True (original nodes).
nodeTypesToTest = [
    is_original
    for is_original, enabled in ((False, evaluate_test), (True, evaluate_original))
    if enabled == True
]

# Check correctness of the inserted nodes.
check_correctness = False

# Delete the inserted nodes at the end of each run.
deleteNodes = True

# Number of runs per parameter configuration.
subRuns = 5

#################### Block heights

blockRangeToInsert = dict(
    original=dict(start_block_height=591116, end_block_height=591126),
    test=dict(start_block_height=1, end_block_height=10001),
)

# Whether the nodes to delete were already collected and saved.
originalNodesCached = False

nodesToDeleteFilePath = "deletion_nodes_591116_10.json"

# If True, the configuration without matching on the previous transaction is
# skipped for original nodes.
deactivateMatchOnPreviousOnOriginal = False

############## Evaluation parameters

# Number of Kafka partitions.
kafka_partitions_list = [1, 2, 4, 8]

# Batch sizes (original note: batch processing - MISSING implementation (!)).
# Previously considered values: [1, 100, 400, 800]
batching_list = [0, 1, 10, 100]

# Neo4j heap sizes.
heap_size_list = [5, 8, 16]

# Match inputs on the previous address
# (original note: (?) => previous address is matched in the script (bitcoind)).
match_on_previous_add = True

# Bypass the Kafka pipeline.
bypass_kafka = False

############## Default testing parameter values

# The first entry of each list serves as the default.
default_kafka_partitions = kafka_partitions_list[0]
default_batching_size = batching_list[0]
default_heap_size = heap_size_list[0]
default_match_on_previous_add = True
default_bypass_kafka = False

############## Switches: which parameters are varied during the evaluation
testKafkaPartitions = True
testBatching        = True
testHeapSize        = True
testMatchOnPrevAddr = True
testBypassKafka     = True

nodeTypesToTest

#### Create the list of parameter configurations

In [None]:
# Build the list of parameter configurations to evaluate. Each entry is
# [kafka_partitions, batching_size, heap_size, match_on_previous_add, bypass_kafka].
# The first entry holds all default values; every further entry deviates from
# the defaults in exactly one position.

default_configuration = [default_kafka_partitions,
                         default_batching_size,
                         default_heap_size,
                         default_match_on_previous_add,
                         default_bypass_kafka]

# One row per parameter: (switch, position inside a configuration, values to try).
# The two boolean parameters only have a single alternative: the negated default.
parameter_sweeps = (
    (testKafkaPartitions, 0, kafka_partitions_list),
    (testBatching,        1, batching_list),
    (testHeapSize,        2, heap_size_list),
    (testMatchOnPrevAddr, 3, [not default_match_on_previous_add]),
    (testBypassKafka,     4, [not default_bypass_kafka]),
)

configurations = [list(default_configuration)]
for sweep_enabled, position, candidates in parameter_sweeps:
    if sweep_enabled != True:
        continue
    for candidate in candidates:
        # the default value is already covered by the first configuration
        if candidate == default_configuration[position]:
            continue
        variant = list(default_configuration)
        variant[position] = candidate
        configurations.append(variant)

display('[kafka_partitions, batching_size, heap_size, match_on_previous_add, bypass_kafka]')
configurations

#### Preparation

In [None]:
# Empty results table. The testing loop appends one row per
# (experimentRun, subRun) using exactly this column order.
columns = (
    'experimentRun subRun evaluate_original '
    'start_block_height end_block_height '
    'kafka_partitions batching match_on_previous_add heap_size bypass_kafka '
    'check_correctness '
    'endTimePart1 endTimePart2 totalExecutionTime timeOutReached'
).split()

results_df = pd.DataFrame(columns=columns)

In [None]:
# Create the output directory structure: results/ and results/mismatches/.
# exist_ok=True makes the call idempotent and removes the separate existence
# checks; creating the nested path also creates the parent directory.
result_dir     = 'results'
mismatches_dir = 'mismatches'
os.makedirs(os.path.join(result_dir, mismatches_dir), exist_ok=True)
    

In [None]:
# Collect the nodes of the original database that need to be deleted when data
# is inserted into the original topics, and cache them in a JSON file.
#
# json is imported unconditionally (not inside the `if`), so the name is also
# available to the following cell, which reloads the cache file, when the
# collection step is skipped because the nodes are already cached.
import json

if not originalNodesCached:

    # collect nodes of the original database
    noderange = 'original'

    # TODO: check correctness of the starting height (e.g. by querying neo4j)

    deletion_nodes = getDeletionList(start_block_height = blockRangeToInsert[noderange]['start_block_height'],
                                     end_block_height = blockRangeToInsert[noderange]['end_block_height'],
                                     label_address = node_labels['original']['address'],
                                     neo4j_location = 'server',
                                     # use the configured port instead of repeating the literal
                                     neo4j_port = neo4j_port)

    # save deletion nodes to the json cache file
    with open(nodesToDeleteFilePath, "w") as outfile:
        json.dump(deletion_nodes, outfile)

In [None]:
# Reload the cached deletion nodes from disk. json is imported here as well so
# this cell does not depend on an import executed inside another cell.
import json

with open(nodesToDeleteFilePath) as infile:
    deletion_nodes = json.load(infile)

#### Testing procedure

In [None]:
#%%time
# Main evaluation loop. For every node type (test / original) and every
# parameter configuration it
#   1. rewrites streams.conf (cypher templates and batching parameters),
#   2. adjusts the Kafka partition count and the Neo4j heap size when they
#      differ from the values requested by the configuration,
#   3. performs `subRuns` timed insertions, optionally checks correctness,
#      records the timings and deletes the inserted nodes again.
import subprocess

# Values written to streams.conf when batching is at its default setting.
DEFAULT_FETCH_MAX_WAIT_MS = 500
DEFAULT_FETCH_MIN_BYTES = 1
# Bytes per object (transaction). The same constant is used to compute
# kafka.fetch.min.bytes and to recognise an already applied setting; using two
# different numbers for this made the "already applied" check never succeed.
BYTES_PER_TRANSACTION = 555
# 0.1 second of maximum fetch wait per batched transaction.
WAIT_MS_PER_TRANSACTION = 100


def _read_conf_int(path, key):
    """Return the first integer in the value of the last line of *path* containing *key*.

    The value is the text between the first and second '=' of the line.
    Returns None when no such line exists or no value contains digits.
    """
    value = None
    with open(path, 'r') as conf_file:
        for line in conf_file:
            if key in line and '=' in line:
                digits = re.findall(r'\d+', line.split('=')[1])
                if digits:
                    value = int(digits[0])
    return value


def _set_conf_value(path, key, value):
    """Replace every line of *path* containing *key* with ``key=value`` via sed.

    subprocess.run blocks until sed has finished, so the file is in its final
    state when this function returns.
    """
    subprocess.run(['sed', '-i', f'/{key}/c\\{key}={value}', path])


def _current_partition_count(topic):
    """Return the PartitionCount reported by kafka-topics.sh for *topic*.

    Returns None when the command output contains no parsable PartitionCount.
    """
    command = f"/home/kafka/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic {topic}"
    with os.popen(command) as stream:
        output = stream.read()
    found = re.search(r'PartitionCount:\s*(\d+)', output)
    return int(found.group(1)) if found else None


EvaluationStartTime = time.time()

# loop over test nodes (False) and/or original nodes (True);
# note that this rebinds the notebook-level `evaluate_original`
for evaluate_original in nodeTypesToTest:

    # counter of the parameter configurations, restarted per node type;
    # it is also handed over to the correctness check
    experimentRun = 0

    noderange = 'original' if evaluate_original else 'test'
    start_block_height = blockRangeToInsert[noderange]['start_block_height']
    end_block_height   = blockRangeToInsert[noderange]['end_block_height']

    for kafka_partitions, batching_size, heap_size, match_on_previous_add, bypass_kafka in configurations:
        experimentRun += 1

        # optionally skip the configuration without matching on the previous
        # transaction when evaluating original nodes
        if deactivateMatchOnPreviousOnOriginal and not match_on_previous_add and evaluate_original:
            continue

        print('Experiment run: ', experimentRun)

        # change cypher templates - match_on_previous_add can change
        changeStreamsFile(path = path_to_streams_conf,
                          kafka_topics = kafka_topics,
                          evaluate_original = evaluate_original,
                          matchOnAddress = match_on_previous_add,
                          getTemplate = True,    # to retrieve cypher template or query for direct insertion
                          node_labels = node_labels,
                          evaluation = True)

        # ---- batching parameters -------------------------------------
        if batching_size == default_batching_size:
            batching_waittime = DEFAULT_FETCH_MAX_WAIT_MS
            batching_minbytes = DEFAULT_FETCH_MIN_BYTES
        else:
            batching_waittime = WAIT_MS_PER_TRANSACTION * batching_size
            batching_minbytes = BYTES_PER_TRANSACTION * batching_size

        # only kafka.fetch.min.bytes is inspected to decide whether the file
        # already carries the requested batching setting
        currentBatching_minbytes = _read_conf_int(path_to_streams_conf, 'kafka.fetch.min.bytes')
        changeBatching = currentBatching_minbytes != batching_minbytes

        if changeBatching:
            # edit the same file that is read above, not a hard-coded location
            _set_conf_value(path_to_streams_conf, 'kafka.fetch.max.wait.ms', batching_waittime)
            _set_conf_value(path_to_streams_conf, 'kafka.fetch.min.bytes', batching_minbytes)

        # pause kept from the original procedure
        # NOTE(review): presumably no longer required now that sed is awaited - confirm
        time.sleep(5)

        # re-read the file to verify that the batching setting is in place
        currentBatching_minbytes = _read_conf_int(path_to_streams_conf, 'kafka.fetch.min.bytes')
        if currentBatching_minbytes != batching_minbytes:
            print(f'WARNING: kafka.fetch.min.bytes is {currentBatching_minbytes}, expected {batching_minbytes}')

        # ---- kafka partitions ----------------------------------------
        # compare against the partition count requested by this configuration
        # (not the default); an unparsable count (None) also triggers a change
        numberOfCurrentPartitions = _current_partition_count(kafka_topics['block'])
        if numberOfCurrentPartitions != kafka_partitions:
            partitionChanger(kafka_partitions)

        # ---- heap size -----------------------------------------------
        # initial and max heap size must both equal the requested heap size
        initialHeapSize = _read_conf_int(path_to_neo4j_conf, 'dbms.memory.heap.initial_size')
        maxHeapSize = _read_conf_int(path_to_neo4j_conf, 'dbms.memory.heap.max_size')
        if initialHeapSize != heap_size or maxHeapSize != heap_size:
            heapSizeChanger(heap_size)

        # ---- repeated runs of this configuration -----------------------
        for subRun in range(1, subRuns + 1):

            # insertion and performance testing
            endTimePart1, endTimePart2, totalExecutionTime, timeOutReached = runPerformanceTest(
                evaluate_original,
                node_labels,
                bypass_kafka,
                start_block_height,
                end_block_height,
                match_on_previous_add,
                kafka_topics)

            # correctness testing (only performed for test nodes)
            if not evaluate_original and check_correctness:
                checkCorrectness(start_block_height,
                                 end_block_height,
                                 node_labels,
                                 neo4j_port,
                                 experimentRun,
                                 printMismatches=False,
                                 saveMismatches=True)

            # result collection - same order as `columns`
            data = [[
                experimentRun,
                subRun,
                evaluate_original,
                start_block_height,
                end_block_height,
                kafka_partitions,
                batching_size,
                match_on_previous_add,
                heap_size,
                bypass_kafka,
                check_correctness,
                endTimePart1,
                endTimePart2,
                totalExecutionTime,
                timeOutReached]]

            new_results_entry = pd.DataFrame(columns=columns, data=data)
            # ignore_index gives every row its own label instead of repeating 0
            results_df = pd.concat([results_df, new_results_entry], ignore_index=True)

            # write results after every sub run so partial results survive an abort
            results_df.to_csv('./results/evaluation_results.csv', index=False)

            # wait 1 ms per inserted block before deleting
            time.sleep((end_block_height - start_block_height) * 0.001)

            # deletion of inserted nodes
            if deleteNodes:
                if evaluate_original:
                    deleteOriginalEvaluationNodes(deletion_nodes = deletion_nodes,
                                                  node_labels = node_labels,
                                                  neo4j_location = 'server',
                                                  neo4j_port = neo4j_port)
                else:
                    deleteTestEvaluationNodes(node_labels = node_labels,
                                              neo4j_location = 'server',
                                              neo4j_port = neo4j_port)

            display(results_df)
            print(f'Run {experimentRun} finished \n')


display(results_df)
EvaluationTime = time.time() - EvaluationStartTime
# built-in round: numpy is never imported by this notebook
print('Evaluation Time: ', round(EvaluationTime / 60, 2), ' min')

## After the loop: evaluation process cleanup

Restore the streams file:
- remove the insertion time

### Save Evaluation results

In [None]:
# Write the complete results table to the CSV file once more, without the row index.
results_df.to_csv(path_or_buf='./results/evaluation_results.csv', index=False)

### Restore streams file

In [None]:
# Restore the streams file after the evaluation: same call as inside the
# testing loop, but for the original topics, with address matching enabled
# and with the evaluation flag switched off.
restore_arguments = dict(
    path=path_to_streams_conf,
    kafka_topics=kafka_topics,
    evaluate_original=True,
    matchOnAddress=True,
    getTemplate=True,
    node_labels=node_labels,
    evaluation=False,
)
changeStreamsFile(**restore_arguments)