In [1]:
from utils import *
import duckdb
import copy
import re
import os
import json
import random
from collections import Counter
import time
from collections import Counter, defaultdict
from scipy.sparse import csr_matrix
from sklearn.neighbors import BallTree
import vptree
import nmslib
import threading
import bktree

In [2]:
# Toggle for the data source: True uses a small synthetic Redset workload,
# False loads the scansets via get_redset_scansets().
experiment = True

In [3]:
if experiment:
    # Synthetic workload: nine two-table scansets (1, 2), (2, 3), ..., (9, 10).
    redset_scansets = [(i, i + 1) for i in range(1, 10)]
else:
    redset_scansets = get_redset_scansets()
# The largest table id is used as the number of Redset tables.
# Empty scansets are skipped so that the inner max() never sees an empty sequence.
num_redset_tables = max(max(scanset) for scanset in redset_scansets if scanset)

In [4]:
# Load the TPC-DS scansets; the largest table id found in any non-empty scanset
# is used as the number of TPC-DS tables.
tpcds_scansets = get_tpcds_scansets()
num_tpcds_tables = max(max(tables) for tables in tpcds_scansets if tables)

In [5]:
# Show the table counts derived from the two workloads.
num_redset_tables, num_tpcds_tables

(10, 24)

## Python + Index in C++

In [6]:
# Number of parallel workers: used as the process-pool size in the Python search
# and passed as the first argument to bktree.find_optimal_bijection.
N_THREADS = 64
# Number of random restarts each worker performs.
N_ITERATIONS_PER_THREAD = 8

In [11]:
import bktree

# NOTE(review): ITERATIONS is not referenced anywhere in the visible notebook.
ITERATIONS = 10

# Validate the inputs before anything below consumes them.
assert isinstance(redset_scansets, list), "redset_scansets should be a list of scansets"
assert isinstance(tpcds_scansets, list), "tpcds_scansets should be a list of scansets"

# How often each distinct Redset scanset occurs in the workload.
# Counter needs hashable elements, so the scansets must be e.g. tuples.
redset_scanset_counters = Counter(redset_scansets)

def get_random_assignment(n, k, seed=None):
    """Return a random mapping from each table id in 1..n to a table id in 1..k.

    Args:
        n: Number of source tables; the keys of the result are 1..n.
        k: Number of target tables; each value is drawn from 1..k (inclusive).
        seed: Optional seed. When given, values come from a private generator
            seeded with it, so the result is reproducible and the module-level
            `random` state is left untouched (important when several workers
            draw assignments concurrently). When None, the module-level
            generator is used.

    Returns:
        dict mapping every integer in 1..n to an integer in 1..k.
    """
    # random.Random(seed) yields the same sequence as random.seed(seed) followed
    # by module-level calls, but without clobbering the shared global state.
    rng = random if seed is None else random.Random(seed)
    return {table: rng.randint(1, k) for table in range(1, n + 1)}

def get_distance(scanset1, scanset2):
    """Return the multiset distance between two scansets.

    The distance is the sum, over all tables, of the absolute difference
    between the number of times the table occurs in each scanset.

    Args:
        scanset1: Iterable of tables, or a Counter / mapping of table -> count.
        scanset2: Iterable of tables, or a Counter / mapping of table -> count.

    Returns:
        Non-negative total of per-table count differences (0 for equal multisets).
    """
    # Always work on a fresh Counter: subtract() mutates in place, and the
    # caller's Counter must not be modified when one is passed in directly.
    difference = Counter(scanset1)
    difference.subtract(scanset2)
    return sum(abs(count) for count in difference.values())

def convert_scanset_for_distance(scanset, assignment):
    """Translate a Redset scanset into TPC-DS table ids, one source table per target.

    Every table in `scanset` is looked up in `assignment`. When several distinct
    Redset tables of the scanset are assigned to the same TPC-DS table, only the
    one that occurs most often in the scanset keeps its mapping; all occurrences
    of the others are replaced by -1.

    Args:
        scanset: Sequence of Redset table ids (repetitions allowed).
        assignment: Mapping from Redset table id to TPC-DS table id.

    Returns:
        List with one entry per element of `scanset`, in the same order: the
        assigned TPC-DS table id, or -1 for tables that lost a collision.

    Raises:
        KeyError: If a table of `scanset` is missing from `assignment`.
    """
    occurrences = Counter(scanset)
    # TPC-DS table -> set of Redset tables from this scanset that are mapped onto it
    collisions = dict()
    for r_table in scanset:
        collisions.setdefault(assignment[r_table], set()).add(r_table)
    # Per TPC-DS table keep the Redset table with the most occurrences. max()
    # returns the first maximal element, so ties resolve to whichever table the
    # set yields first.
    chosen_r_tables = {
        max(r_tables, key=occurrences.__getitem__)
        for r_tables in collisions.values()
    }
    return [assignment[table] if table in chosen_r_tables else -1 for table in scanset]

# Search structure built from the TPC-DS scansets; get_assignment_distance
# queries it through .nearest().
bk_tree = bktree.BKTree(tpcds_scansets)

# Redset table -> indices (into redset_scansets) of the scansets that contain it.
# Each scanset index is recorded once per table even when the table occurs
# several times in that scanset; otherwise the incremental delta computed for a
# reassigned table would count the same scanset more than once.
r_table_to_scansets = defaultdict(list)
for idx, scanset in enumerate(redset_scansets):
    for r_table in set(scanset):
        r_table_to_scansets[r_table].append(idx)

def get_assignment_distance(assignment, affected_r_scansets, bk_tree, per_rs_distance, compute_per_rs_distance, update_per_rs_distance=False):
    """Score `assignment` on the given Redset scansets, either in full or as a delta.

    For every index in `affected_r_scansets` the scanset is translated with
    convert_scanset_for_distance, looked up with `bk_tree.nearest`, and element
    [1] of that result (taken to be the distance) is multiplied by the number of
    times the scanset occurs in redset_scansets.

    Two modes:
      * compute_per_rs_distance=True: each weighted distance is appended to
        `per_rs_distance` and their sum is returned. The appended values line up
        with scanset indices only if `per_rs_distance` starts empty and
        `affected_r_scansets` enumerates all indices in order.
      * compute_per_rs_distance=False: returns the sum of (new - stored) weighted
        distances for the affected scansets. With update_per_rs_distance=True the
        stored values in `per_rs_distance` are overwritten with the new ones.

    NOTE(review): the weight is the scanset's occurrence count, yet the loop runs
    over list indices, so a scanset that appears several times in redset_scansets
    contributes its count once per duplicate index — confirm this is intended.
    """
    accumulated = 0
    for scanset_idx in affected_r_scansets:
        scanset = redset_scansets[scanset_idx]
        # TODO (kept from the original): make sure the traversal order of the keys in the dict is constant
        weight = redset_scanset_counters[scanset]

        translated = convert_scanset_for_distance(scanset, assignment)
        weighted_distance = bk_tree.nearest(translated)[1] * weight

        if compute_per_rs_distance:
            # Full evaluation: record the per-scanset value and add it to the total.
            per_rs_distance.append(weighted_distance)
            accumulated += weighted_distance
            continue

        # Delta evaluation: compare against the value stored for this scanset.
        accumulated += weighted_distance - per_rs_distance[scanset_idx]
        if update_per_rs_distance:
            per_rs_distance[scanset_idx] = weighted_distance
    return accumulated

# Wall-clock reference point for the elapsed time reported after the multi-process search.
start_time = time.time()
def thread_work(n_iterations, thread_idx):
    """Run `n_iterations` random-restart hill climbs and return the best result found.

    Each restart draws a seeded random Redset -> TPC-DS table assignment and then
    repeatedly applies the single-table reassignment that lowers the total
    distance the most, until no reassignment improves it (a local optimum).

    Args:
        n_iterations: Number of random restarts to perform.
        thread_idx: Index of this worker; combined with the restart number to
            derive the seed, so every (worker, restart) pair is reproducible.

    Returns:
        Tuple (best_distance, best_assignment). For n_iterations == 0 this is
        (float('inf'), None).
    """
    best_distance = float('inf')
    best_assignment = None  # stays None when no restart is executed
    all_scanset_indices = range(len(redset_scansets))
    for itr in range(n_iterations):
        # Seeds are distinct across workers as long as n_iterations <= 1000.
        assignment = get_random_assignment(num_redset_tables, num_tpcds_tables, seed=thread_idx*1000+itr)
        current_per_rs_distance = []
        current_distance = get_assignment_distance(assignment, all_scanset_indices, bk_tree, current_per_rs_distance, True)
        while True:
            best_new_distance = float('inf')
            best_new_assignment = None
            for rt in range(1, num_redset_tables + 1):  # What if we reassign this table?
                old_assignment = assignment[rt]
                for bt in range(1, num_tpcds_tables + 1):  # Iterate over all possible assignments
                    if bt == old_assignment:  # This is the current assignment
                        continue
                    assignment[rt] = bt
                    # Only scansets containing rt can change, so evaluate the delta on those.
                    new_distance = current_distance + get_assignment_distance(assignment, r_table_to_scansets[rt], bk_tree, current_per_rs_distance, False)
                    if new_distance < best_new_distance:
                        best_new_distance = new_distance
                        best_new_assignment = rt, bt
                assignment[rt] = old_assignment  # Revert the assignment
            if best_new_distance >= current_distance:
                break  # No more improvement possible. This is a (local) optimum.
            best_rt, best_bt = best_new_assignment
            assignment[best_rt] = best_bt  # Apply the best single-table move
            # Refresh the stored per-scanset distances. Scansets that do not contain
            # best_rt translate exactly as before, so only the affected ones are redone.
            get_assignment_distance(assignment, r_table_to_scansets[best_rt], bk_tree, current_per_rs_distance, False, True)
            current_distance = best_new_distance
        if current_distance < best_distance:
            best_distance = current_distance
            # The assignment is a flat dict of ints, so a shallow copy is a full snapshot.
            best_assignment = dict(assignment)
    return best_distance, best_assignment

def multi_processed():
    """Run thread_work in a pool of N_THREADS processes and return the best result.

    Each worker receives (N_ITERATIONS_PER_THREAD, worker index). The returned
    value is the worker result with the smallest distance (element 0); on ties
    the result of the lowest worker index wins.
    """
    from multiprocessing import Pool

    jobs = []
    for worker_idx in range(N_THREADS):
        jobs.append((N_ITERATIONS_PER_THREAD, worker_idx))

    with Pool(processes=N_THREADS) as worker_pool:
        outcomes = worker_pool.starmap(thread_work, jobs)

    return min(outcomes, key=lambda outcome: outcome[0])

# Run the search across all worker processes and keep the overall best result.
best_distance, best_assignment = multi_processed()

# Display: seconds elapsed since start_time, the best total distance, and the assignment that achieved it.
time.time() - start_time, best_distance, best_assignment

(10.608015060424805,
 9,
 {1: 17, 2: 14, 3: 17, 4: 14, 5: 18, 6: 16, 7: 8, 8: 18, 9: 8, 10: 12})

## Pure C++

In [7]:
# Redset table -> indices (into redset_scansets) of the scansets that contain it.
# NOTE(review): a table repeated within one scanset adds that scanset's index
# repeatedly; confirm that bktree.find_optimal_bijection tolerates duplicates.
r_table_to_scansets = defaultdict(list)
for idx, scanset in enumerate(redset_scansets):
    for r_table in scanset:
        r_table_to_scansets[r_table].append(idx)

# Dense, table-id-indexed form of the mapping above for the C++ entry point.
# Every slot holds a list: tables that occur in no scanset get an empty list
# instead of None, and slot 0 is forced to an empty list as before.
c_r_table_to_scansets = [[] for _ in range(num_redset_tables + 1)]
for r_table, scansets in r_table_to_scansets.items():
    c_r_table_to_scansets[r_table] = scansets
c_r_table_to_scansets[0] = []

In [8]:
# Time the search implemented in the bktree extension on the same inputs.
# The third argument is a list of ones, one per scanset (presumably per-scanset
# weights — confirm against the bktree binding).
start = time.time()
print(bktree.find_optimal_bijection(
    N_THREADS,
    N_ITERATIONS_PER_THREAD,
    [1] * len(redset_scansets),
    redset_scansets,
    tpcds_scansets,
    c_r_table_to_scansets,
    num_redset_tables,
    num_tpcds_tables
))
# Elapsed wall-clock seconds, shown as the cell result.
time.time() - start

(9.0, {10: 8, 9: 4, 8: 12, 7: 8, 6: 4, 5: 8, 4: 18, 3: 16, 2: 18, 1: 8})


0.9729795455932617