In [1]:
import numpy as np
import pandas as pd
from subprocess import call
import concurrent.futures
import sys, os
import time

import pubsub

def kf_create_topic(topic):
    """Create a Kafka topic through the ``kf-topic`` admin client.

    Runs ``./kf-topic create <topic>`` with ``../kafka/client/admin/`` as the
    working directory.

    Args:
        topic: Name of the topic to create.

    Raises:
        SystemExit: If the command returns a non-zero exit status. An error
            message is printed before exiting.
    """
    if call(["./kf-topic", "create", topic], cwd="../kafka/client/admin/") != 0:
        # The message names the step that actually failed (it used to say
        # "running producer"), in line with the sq_* helpers.
        print("error occurred on create topic")
        sys.exit()

def kf_delete_topic(topic):
    """Delete a Kafka topic through the ``kf-topic`` admin client.

    Runs ``./kf-topic delete <topic>`` with ``../kafka/client/admin/`` as the
    working directory.

    Args:
        topic: Name of the topic to delete.

    Raises:
        SystemExit: If the command returns a non-zero exit status. An error
            message is printed before exiting.
    """
    if call(["./kf-topic", "delete", topic], cwd="../kafka/client/admin/") != 0:
        # The message names the step that actually failed (it used to say
        # "running producer"), in line with the sq_* helpers.
        print("error occurred on delete topic")
        sys.exit()

def sq_create_topic(topic):
    """Create a ShapleQ topic via ``./sq-topic create <topic>``.

    The command is run with ``../shapleQ/client/admin/`` as the working
    directory. On a non-zero exit status an error is printed and
    ``sys.exit()`` is called (raising ``SystemExit``).
    """
    exit_status = call(
        ["./sq-topic", "create", topic],
        cwd="../shapleQ/client/admin/",
    )
    if exit_status == 0:
        return
    print("error occurred on create topic")
    sys.exit()

def sq_delete_topic(topic):
    """Delete a ShapleQ topic via ``./sq-topic delete <topic>``.

    The command is run with ``../shapleQ/client/admin/`` as the working
    directory. On a non-zero exit status an error is printed and
    ``sys.exit()`` is called (raising ``SystemExit``).
    """
    exit_status = call(
        ["./sq-topic", "delete", topic],
        cwd="../shapleQ/client/admin/",
    )
    if exit_status == 0:
        return
    print("error occurred on delete topic")
    sys.exit()



In [2]:
def run_process(num_producer, num_consumer, topic, file_path, num_data, p, c):
    """Run consumer and producer callables in a process pool and collect results.

    Consumers are submitted first, then (after a one-second pause) the
    producers. Producer results are gathered before consumer results.

    Args:
        num_producer: Number of producer tasks to submit.
        num_consumer: Number of consumer tasks to submit.
        topic: Topic name passed to every task.
        file_path: Passed to each producer as its second argument.
        num_data: Passed to each producer as its third argument; each
            consumer receives ``num_data * num_producer`` instead.
        p: Producer callable, invoked as ``p(topic, file_path, num_data)``.
        c: Consumer callable, invoked as ``c(topic, num_data * num_producer)``.

    Returns:
        A tuple ``(start_produce_ts, end_consume_ts)``: the list of values
        returned by the producers and the list returned by the consumers,
        each in submission order.

    Raises:
        Whatever a producer or consumer task raised, re-raised by
        ``Future.result()``.
    """
    # One worker per task. With the default pool size (the machine's CPU
    # count), a run with many consumers can occupy every worker, so the
    # producers queued behind them never start. Presumably the consumers block
    # until data arrives, in which case the run would hang.
    worker_count = max(1, num_producer + num_consumer)
    pool = concurrent.futures.ProcessPoolExecutor(max_workers=worker_count)
    try:
        # Set up the consumer processes first.
        consumer_futures = [
            pool.submit(c, topic, num_data * num_producer)
            for _ in range(num_consumer)
        ]

        # Pause before producing (the original delay is kept as-is).
        time.sleep(1)

        # Set up the producer processes.
        producer_futures = [
            pool.submit(p, topic, file_path, num_data)
            for _ in range(num_producer)
        ]

        start_produce_ts = [future.result() for future in producer_futures]
        end_consume_ts = [future.result() for future in consumer_futures]
    finally:
        # Release the pool. On success every future is already done; on
        # failure do not block on tasks that may never finish.
        pool.shutdown(wait=False)

    return start_produce_ts, end_consume_ts

def bench_kafka(num_producer, num_consumer, topic, file_path, num_data):
    """Run one Kafka benchmark on a freshly created topic.

    Creates ``topic``, calls ``pubsub.kf_publish(topic, file_path, 1)`` once
    before the measured run, then runs the producers and consumers through
    ``run_process``.

    Args:
        num_producer: Number of producer tasks.
        num_consumer: Number of consumer tasks.
        topic: Topic to create, use, and delete.
        file_path: Dataset path handed to the producers.
        num_data: Per-producer count handed to ``run_process``.

    Returns:
        The ``(producer results, consumer results)`` tuple returned by
        ``run_process``.
    """
    # Create topic
    kf_create_topic(topic)
    try:
        # Single publish with a count of 1 ahead of the measured run.
        pubsub.kf_publish(topic, file_path, 1)
        return run_process(
            num_producer, num_consumer, topic, file_path, num_data,
            pubsub.kf_publish, pubsub.kf_subscribe,
        )
    finally:
        # Always remove the topic, even when the run fails, so a failed
        # benchmark does not leave the topic behind.
        kf_delete_topic(topic)

def bench_shapleq(num_producer, num_consumer, topic, file_path, num_data):
    """Run one ShapleQ benchmark on a freshly created topic.

    Creates ``topic``, calls ``pubsub.sq_publish(topic, file_path, 1)`` once
    before the measured run, then runs the producers and consumers through
    ``run_process``.

    Args:
        num_producer: Number of producer tasks.
        num_consumer: Number of consumer tasks.
        topic: Topic to create, use, and delete.
        file_path: Dataset path handed to the producers.
        num_data: Per-producer count handed to ``run_process``.

    Returns:
        The ``(producer results, consumer results)`` tuple returned by
        ``run_process``.
    """
    # Create topic
    sq_create_topic(topic)
    try:
        # Single publish with a count of 1 ahead of the measured run.
        pubsub.sq_publish(topic, file_path, 1)
        return run_process(
            num_producer, num_consumer, topic, file_path, num_data,
            pubsub.sq_publish, pubsub.sq_subscribe,
        )
    finally:
        # Always remove the topic, even when the run fails, so a failed
        # benchmark does not leave the topic behind.
        sq_delete_topic(topic)

In [6]:
# Base topic name used by the benchmark cells.
test_topic = "b-tf"
# Consumer counts to sweep: 1, 4, 7, ..., 28.
consumer_num_list = list(range(1, 31, 3))

In [5]:
# Single ShapleQ run: 1 producer, 3 consumers.
# NOTE(review): despite their names, the two variables receive the list of
# producer results and the list of consumer results returned by bench_shapleq.
sq_time_max, sq_time_avg = bench_shapleq(
    num_producer=1,
    num_consumer=3,
    topic=test_topic,
    file_path="/Users/elon/go/src/github.com/paust-team/shapleq/benchmark/test-dataset.tsv",
    num_data=1000,
)
print(sq_time_max, sq_time_avg)

[-10011] [11005, 11007, 10970]


In [7]:
# Single Kafka run: 1 producer, 3 consumers.
# NOTE(review): despite their names, the two variables receive the list of
# producer results and the list of consumer results returned by bench_kafka.
kf_time_max, kf_time_avg = bench_kafka(
    num_producer=1,
    num_consumer=3,
    topic=test_topic,
    file_path="/Users/elon/go/src/github.com/paust-team/shapleq/benchmark/test-dataset.tsv",
    num_data=1000,
)
print(kf_time_max, kf_time_avg)

[-1767] [3155, 3155, 3152]


In [None]:
# ShapleQ series for the plot: print_sp_times appends one entry per call.
sq_times = []

# Warm-up run (currently disabled)
#bench_shapleq(1, 32, test_topic+"_h", "/Users/paust/go/src/github.com/paust-team/shapleq/benchmark/python/test-dataset.tsv", 10000)
    
def print_sp_times(num):
    """Benchmark ShapleQ with one producer and ``num`` consumers.

    The topic name is ``test_topic`` followed by ``num``. The mean of the
    consumer results is printed and appended to ``sq_times``.
    """
    topic_name = test_topic + str(num)
    _, consumer_results = bench_shapleq(
        1,
        num,
        topic_name,
        "/Users/elon/go/src/github.com/paust-team/shapleq/benchmark/test-dataset.tsv",
        1000,
    )
    mean_result = np.mean(consumer_results)
    print(mean_result)
    sq_times.append(mean_result)

# Sweep every consumer count; each call appends one entry to sq_times, so the
# list ends up aligned with consumer_num_list.
for num in consumer_num_list:
    print_sp_times(num)


In [None]:
# Manual ShapleQ run with 1 consumer; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(1)

In [None]:
# Manual ShapleQ run with 4 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(4)

In [None]:
# Manual ShapleQ run with 7 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(7)

In [None]:
# Manual ShapleQ run with 10 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(10)

In [None]:
# Manual ShapleQ run with 13 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(13)

In [None]:
# Manual ShapleQ run with 16 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(16)

In [None]:
# Manual ShapleQ run with 19 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(19)

In [None]:
# Manual ShapleQ run with 22 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(22)

In [None]:
# Manual ShapleQ run with 25 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(25)

In [None]:
# Manual ShapleQ run with 28 consumers; appends one more entry to sq_times.
# NOTE(review): running this on top of the sweep loop leaves sq_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_sp_times(28)

In [None]:
# Kafka series for the plot: print_kf_times appends one entry per call.
kf_times = []

# Warm-up run (currently disabled)
#bench_kafka(1, 32, test_topic+"_h", "/Users/paust/go/src/github.com/paust-team/shapleq/benchmark/python/test-dataset.tsv", 10000)

def print_kf_times(num):
    """Benchmark Kafka with one producer and ``num`` consumers.

    The topic name is ``test_topic`` followed by ``num``. Prints the
    wall-clock duration of the whole ``bench_kafka`` call together with the
    mean of the consumer results, and appends that mean to ``kf_times``.

    Args:
        num: Number of consumers to run.
    """
    t = time.perf_counter()
    # Same dataset file as the ShapleQ benchmark, so both systems are fed
    # identical input.
    _, cm_times = bench_kafka(
        1,
        num,
        test_topic + str(num),
        "/Users/elon/go/src/github.com/paust-team/shapleq/benchmark/test-dataset.tsv",
        1000,
    )
    # Wall-clock time covers everything bench_kafka does (topic creation,
    # warm-up publish, the run itself, topic deletion). It is printed for
    # reference only.
    wall_clock = time.perf_counter() - t
    mean_consumer_time = np.mean(cm_times)
    print(wall_clock, mean_consumer_time)
    # Record the same metric print_sp_times records (mean consumer result), so
    # the two plotted series are comparable.
    kf_times.append(mean_consumer_time)
    
# Sweep every consumer count; each call appends one entry to kf_times, so the
# list ends up aligned with consumer_num_list.
for num in consumer_num_list:
    print_kf_times(num)


In [None]:
# Manual Kafka run with 1 consumer; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(1)

In [None]:
# Manual Kafka run with 4 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(4)

In [None]:
# Manual Kafka run with 7 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(7)

In [None]:
# Manual Kafka run with 10 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(10)

In [None]:
# Manual Kafka run with 13 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(13)

In [None]:
# Manual Kafka run with 16 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(16)

In [None]:
# Manual Kafka run with 19 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(19)

In [None]:
# Manual Kafka run with 22 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(22)

In [None]:
# Manual Kafka run with 25 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(25)

In [None]:
# Manual Kafka run with 28 consumers; appends one more entry to kf_times.
# NOTE(review): running this on top of the sweep loop leaves kf_times longer
# than consumer_num_list, which the plot cell cannot draw.
print_kf_times(28)

In [None]:

from matplotlib import pyplot as plt

# Draw the ShapleQ and Kafka series against the number of consumers.
# Both lists must hold exactly one value per entry of consumer_num_list.
fig, ax = plt.subplots()
ax.plot(consumer_num_list, sq_times, label='ShapleQ')
ax.plot(consumer_num_list, kf_times, label='Kafka')
ax.set_xlabel('Number of Consumers')
ax.set_ylabel('Elapsed Time(second)')
ax.legend()
plt.show()

In [None]:
# Print both result series. The names kf_time / sq_time used here before are
# never defined in this notebook (NameError); the lists filled by the
# benchmark cells are kf_times and sq_times.
print("kafka result = {} ShapleQ result = {}".format(kf_times, sq_times))