In [60]:
import itertools as it
import time
from concurrent import futures
from timeit import default_timer as timer

import pandas as pd
import requests as rq
from IPython.display import display


In [61]:
# Parameters
# NOTE(review): if this cell is used as a papermill "parameters" cell, values injected for
# hash_function will NOT update the output paths derived from it at the bottom of this cell
# (the injected cell runs after this one) -- TODO confirm how this notebook is executed.

# --- Input users and recommendation service location ---
users = "../data/users.json"
host = "localhost"

# --- Partitioning: hash function label and partition id -> service port ---
hash_function = "murmur2"
partition_port = {"0": "5001", "1": "5002"}

# --- Repetition settings (not read by the cells below) ---
should_repeat_for_user = True
repeat_for_user = 10

# --- SALSA query parameters; every combination is requested ---
limits = [10]
walks = [1000]
walk_length = [100]

# --- Partition-merging strategies to run ---
union_results = True
highest_hit = True
most_interactions = True

# --- Output files, named after the hash function ---
output_latency = f"../output/{hash_function}_latency.csv"
output_recommendations = f"../data/{hash_function}_recommendations.json"
output_union_results = f"../data/{hash_function}_union_results.json"
output_highest_hit = f"../data/{hash_function}_highest_hit.json"
output_most_interactions = f"../data/{hash_function}_most_interactions.json"

In [62]:
# Load the users to query (file written with pandas' "table" orient)
user_df = pd.read_json(users, orient="table")

# One request run per combination of configuration values; each tuple is
# (hash_function, (partition, port), walks, walk_length, limit).
config_axes = ([hash_function], partition_port.items(), walks, walk_length, limits)
configurations = list(it.product(*config_axes))
configurations


[('murmur2', ('0', '5001'), 1000, 100, 10),
 ('murmur2', ('1', '5002'), 1000, 100, 10)]

In [63]:
def get_recommendations(user_df, walks, walk_length, limit, partition_port, timeout=None):
    """Fetch SALSA recommendations for every user from one partition's service.

    Sends ``GET http://{host}:{port}/recommendation/salsa/{user}?walks=..&walk_length=..&limit=..``
    for each value of ``user_df["user_id"]``. While the response status is not 200 the request
    is retried, up to 10 attempts in total; a user whose last attempt still fails gets empty
    ``recommendations`` / ``hits`` lists. Exceptions raised by ``requests`` (e.g. connection
    errors) are not caught.

    :param user_df: DataFrame with a ``user_id`` column.
    :param walks: value of the ``walks`` query parameter.
    :param walk_length: value of the ``walk_length`` query parameter.
    :param limit: value of the ``limit`` query parameter.
    :param partition_port: ``(partition, port)`` pair, e.g. an item of ``partition_port.items()``.
    :param timeout: forwarded to ``requests.get``; ``None`` (default) waits indefinitely.
    :return: DataFrame with columns hash_function, partition, user, recommendations, hits
        (one row per user; ``hash_function`` is the notebook-level parameter).
    """
    partition, port = partition_port
    print(f"gathering recommendations from port: {port} and partition {partition} hash_function: {hash_function}")

    rows = []
    # Iterate the column itself: DataFrame.iterrows() upcasts each row to a common dtype, which
    # would turn 18-digit int64 user ids into lossy floats if user_df had any float column.
    for user in user_df["user_id"]:
        url = f"http://{host}:{port}/recommendation/salsa/{user}?walks={walks}&walk_length={walk_length}&limit={limit}"
        response = rq.get(url, timeout=timeout)
        counter = 0
        # At most 9 retries (10 attempts in total) while the service answers with a non-200 status.
        while response.status_code != 200 and counter != 9:
            print(f"Got response code {response.status_code} trying again {counter + 1}/10...")
            response = rq.get(url, timeout=timeout)
            counter += 1

        response_body = response.json() if response.status_code == 200 else []
        recommendations = [res['id'] for res in response_body]
        hits = [res['hit'] for res in response_body]
        rows.append((hash_function, partition, user, recommendations, hits))

    return pd.DataFrame(rows, columns=["hash_function", "partition", "user", "recommendations", "hits"])

In [64]:
def timed(func):
    """Wrap ``func`` so that each call returns ``(elapsed_seconds, result)``.

    Elapsed time is measured with ``time.perf_counter()``, a monotonic high-resolution clock
    (the same clock as ``timeit.default_timer``, used by the other latency measurements in
    this notebook). Unlike differences of ``time.time()``, it is not affected by system clock
    adjustments.
    """
    from functools import wraps

    @wraps(func)  # keep func's name/docstring on the wrapper
    def _w(*a, **k):
        then = time.perf_counter()
        res = func(*a, **k)
        elapsed = time.perf_counter() - then
        return elapsed, res

    return _w

In [65]:
# Perform API requests
result_dfs = []
latency_rows = []
list_future = []

# A single thread pool; each configuration is submitted as its own task, so all partitions are
# queried concurrently. The loop variables are deliberately NOT named after the parameters
# (hash_function, partition_port, walks, ...): rebinding those globals here would clobber the
# configuration (e.g. turn the partition_port dict into a tuple) and would race with the worker
# threads, which read the global hash_function.
with futures.ThreadPoolExecutor() as executor:
    for cfg_hash, cfg_partition_port, cfg_walks, cfg_walk_length, cfg_limit in configurations:
        future = executor.submit(timed(get_recommendations), user_df, cfg_walks, cfg_walk_length, cfg_limit,
                                 cfg_partition_port)
        # Remember the labels with the future, so latency rows don't depend on the result being non-empty
        list_future.append((cfg_hash, cfg_partition_port[0], future))

# Gather the results in submission order (result_df rows follow the order of `configurations`)
for cfg_hash, cfg_partition, future in list_future:
    latency, partition_df = future.result()
    result_dfs.append(partition_df)
    latency_rows.append((cfg_hash, cfg_partition, "recommendation fetch", latency))


result_df = pd.concat(result_dfs)
display(result_df[["user", "recommendations", "hits", "partition"]])
result_df.to_json(output_recommendations, index=False, orient="table")

gathering recommendations from port: 5001 and partition 0 hash_function: murmur2
gathering recommendations from port: 5002 and partition 1 hash_function: murmur2
[('murmur2', '0', 'recommendation fetch', 5.585102319717407), ('murmur2', '1', 'recommendation fetch', 4.964040279388428)]


Unnamed: 0,user,recommendations,hits,partition
0,828645931927142400,"[928503178039054337, 1363489994279682048, 1223825387748298754, 1348988567687745539, 1367916280335003651, 1364232244743831552, 1367128736513064969, 1327222316561035265, 1353414851352268800, 1336243448429617156]","[139, 135, 134, 121, 94, 87, 83, 82, 78, 78]",0
1,890602645500702720,"[1365678076504338434, 1365896763513597957, 1364182175512358917, 1366048808220639234, 1368550254149206016, 1368146840764506113, 1366063562146390019, 1367144566651035654, 1364228528993050625, 1366664488083193856]","[622, 587, 579, 568, 559, 543, 298, 248, 233, 229]",0
2,433809972,"[1368126333222416388, 1365086675739705346, 1366463519621738498, 1368012208278433794, 1365131974126362626, 1367656109264273409, 1366573479378571265, 1367746708566929409, 1367127626381357056, 1366886099969318917]","[135, 122, 122, 109, 107, 103, 96, 94, 93, 89]",0
3,48459797,"[1365308369746219012, 1367500402606899213, 1367470576256897031, 1336406499174838279, 1365032706623463429, 1335871711758200832, 1367500049010331648, 1367348624208912387, 1367598381460316162, 1367606435513786368]","[93, 86, 54, 52, 50, 48, 48, 48, 42, 40]",0
0,828645931927142400,"[978113793405407233, 1362770535436886021, 927003837407297536, 1367197975043215361, 1367308413823184898, 1359956852520325120, 1367191079829663744, 1367942211028070402, 1368439342654316544, 1360470410479943684]","[348, 104, 84, 79, 73, 71, 65, 63, 62, 62]",1
1,890602645500702720,"[1368079395886358529, 1368078894750965762, 1368079143989084162, 1363429352893865987, 1296009054536400900, 1368516777781518339, 314075222078808064, 1367203124109324289, 1367501934479966209, 1363558587092639750]","[867, 822, 820, 782, 776, 745, 518, 404, 374, 266]",1
2,433809972,"[1368033599044411392, 1367384317723172866, 1367293721755193344, 1366916230989438978, 1366916252325666820, 1368140045610143746, 1367777657769791501, 1367278763097006080, 1368003401095778304, 1367429617422798849]","[142, 126, 117, 106, 105, 99, 92, 89, 88, 87]",1
3,48459797,"[1367562666529738760, 1362101514349731842, 1365064792277471232, 1367159111473856515, 1325112999141584897, 1358631201075003393, 1367692856127848448, 1327889016990048256, 1367484300271226880, 1367320834264694787]","[74, 61, 52, 51, 43, 43, 42, 41, 39, 38]",1


In [54]:
def merge_sort_rec_hit(row, limit=10):
    """Reduce one user's concatenated per-partition results to a top-``limit`` list.

    ``row["recommendations"]`` and ``row["hits"]`` are parallel lists holding the partitions'
    results one after another. Hit counts of an id returned by several partitions are summed
    (the old dict-based merge kept only the last partition's count). Ids are ordered by
    descending total hits; ties keep first-seen order. The result is truncated to ``limit``.
    Results are always re-sorted: a concatenation that happens to have exactly ``limit`` items
    (e.g. 6 + 4 from two partitions) is not sorted.

    :param row: pandas row (or mapping) with ``recommendations`` and ``hits`` list entries.
    :param limit: maximum number of recommendations to keep (default 10, the notebook's limit).
    :return: ``(recommendation_ids, hit_counts)`` as two lists.
    """
    from collections import Counter

    totals = Counter()
    for rec, hit in zip(row["recommendations"], row["hits"]):
        totals[rec] += hit
    # most_common(n) is a stable descending sort by count, truncated to n
    top = totals.most_common(limit)
    return [rec for rec, _ in top], [hit for _, hit in top]

In [55]:
# Merge partition results: concatenate each user's per-partition lists, then keep the top ids
if union_results:
    # Display option set before the timer starts so it is not counted in the measured latency
    pd.set_option('display.max_colwidth', None)
    start = timer()
    # 'sum' on these list-valued columns concatenates the lists (in result_df row order)
    merge_df = result_df.groupby("user", as_index=False).agg({'recommendations': 'sum', 'hits': 'sum'})
    merge_df["hash_function"] = hash_function
    merge_df["partition"] = "union results"
    merge_df["len_rec"] = merge_df["recommendations"].map(len)
    merge_df[["recommendations", "hits"]] = merge_df.apply(merge_sort_rec_hit, axis=1, result_type="expand")

    end = timer()
    latency = end - start
    latency_rows.append((hash_function, -1, "union results", latency))
    # Save results
    merge_df.to_json(output_union_results, index=False, orient="table")

In [56]:
# For each user keep the rows of the partition whose top recommendation has the most hits
if highest_hit:
    start = timer()
    selected = []
    for _user, df_group in result_df.groupby("user"):
        # Default to the user's first partition when no partition returned any hits.
        # Partition labels are strings ("0", "1", ...); the former int default 0 matched
        # no row and silently dropped such users from the output.
        partition_to_take = df_group["partition"].iloc[0]
        highest_hit_count = 0
        for partition, hits in zip(df_group["partition"], df_group["hits"]):
            # hits[0] is used as this partition's top hit count; empty results are skipped
            if len(hits) > 0 and hits[0] > highest_hit_count:
                highest_hit_count = hits[0]
                partition_to_take = partition
        selected.append(df_group[df_group["partition"] == partition_to_take])

    best_partition_df = pd.concat(selected, ignore_index=True)
    best_partition_df["partition"] = "highest hit"
    end = timer()
    latency = end - start
    latency_rows.append((hash_function, -1, "highest hit", latency))
    best_partition_df.to_json(output_highest_hit, index=False, orient="table")


In [57]:
def get_degree(user_df, port, timeout=None):
    """Fetch, for every user, the value of ``/status/degree/left-index/{user}`` on ``port``.

    While the response status is not 200 the request is retried, up to 10 attempts in total.
    A user whose last attempt still fails gets degree ``0``. Exceptions raised by ``requests``
    (e.g. connection errors) are not caught.

    :param user_df: DataFrame with a ``user_id`` column.
    :param port: port of the service to query (on the notebook-level ``host``).
    :param timeout: forwarded to ``requests.get``; ``None`` (default) waits indefinitely.
    :return: list of decoded JSON responses (degrees), in ``user_df`` row order.
    """
    degrees = []
    # Iterate the column itself: DataFrame.iterrows() upcasts each row to a common dtype, which
    # would turn 18-digit int64 user ids into lossy floats if user_df had any float column.
    for user in user_df["user_id"]:
        url = f"http://{host}:{port}/status/degree/left-index/{user}"
        response = rq.get(url, timeout=timeout)
        counter = 0
        # At most 9 retries (10 attempts in total) while the service answers with a non-200 status.
        while response.status_code != 200 and counter != 9:
            print(f"Got response code {response.status_code} trying again {counter + 1}/10...")
            response = rq.get(url, timeout=timeout)
            counter += 1
        degrees.append(response.json() if response.status_code == 200 else 0)
    return degrees

In [58]:
# For each user keep the rows of the partition where the user has the highest degree
if most_interactions:
    start = timer()
    # Degrees are fetched per configuration, in `configurations` order, and per user, in
    # user_df order: the same row order as result_df. So the flat list lines up positionally.
    # The loop variable is not named after the parameters, so globals such as the
    # partition_port dict are not clobbered.
    degrees = []
    for config in configurations:
        cfg_port = config[1][1]  # config = (hash_function, (partition, port), walks, walk_length, limit)
        degrees.extend(get_degree(user_df, cfg_port))
    result_df["degree"] = degrees
    result_df.to_json(output_recommendations, index=False, orient="table")

    selected = []
    for _user, df_group in result_df.groupby("user"):
        # Default to the user's first partition when no partition reports a positive degree.
        # Partition labels are strings ("0", "1", ...); the former int default 0 matched
        # no row and silently dropped such users from the output.
        partition_to_take = df_group["partition"].iloc[0]
        highest_degree = 0
        for partition, degree in zip(df_group["partition"], df_group["degree"]):
            if degree > highest_degree:
                highest_degree = degree
                partition_to_take = partition
        selected.append(df_group[df_group["partition"] == partition_to_take])

    best_partition_df = pd.concat(selected, ignore_index=True)
    best_partition_df["partition"] = "most interactions"
    end = timer()
    latency = end - start
    latency_rows.append((hash_function, -1, "most interactions", latency))
    best_partition_df.to_json(output_most_interactions, index=False, orient="table")


In [59]:
# Write one CSV row per timed step: hash function, partition (-1 for merge steps),
# step name and latency in seconds
latency_columns = ["hash_function", "partition", "functionality", "latency"]
latency_df = pd.DataFrame(latency_rows, columns=latency_columns)
latency_df.to_csv(output_latency, index=False)