In [2]:
import pickle
import numpy as np
import pandas as pd
import copy
import sys
sys.path.append("/home/ubuntu/CE_scheme/")
from Schemas.stats.schema import gen_stats_light_schema
from Join_scheme.data_prepare import read_table_csv

In [94]:
import numpy as np
import copy
from scipy import stats
import jenkspy


class Bucket:
    """
    The class of bucketization of a key attribute
    """

    def __init__(self, name, bins=[], bin_modes=[], bin_vars=[], bin_means=[], rest_bins_remaining=None):
        self.name = name
        self.bins = bins
        self.bin_modes = bin_modes
        self.bin_vars = bin_vars
        self.bin_means = bin_means
        self.rest_bins_remaining = rest_bins_remaining
        if len(bins) != 0:
            assert len(bins) == len(bin_modes)


class Table_bucket:
    """
    The class of bucketization for all key attributes in a table.
    Supporting more than three dimensional bin modes requires simplifying the causal structure, which is left as a
    future work.
    """
    def __init__(self, table_name, id_attributes, bin_sizes):
        self.table_name = table_name
        self.id_attributes = id_attributes
        self.bin_sizes = bin_sizes
        self.oned_bin_modes = dict()
        self.twod_bin_modes = dict()


class Bucket_group:
    """
    The class of bucketization for a group of equivalent join keys
    """

    def __init__(self, buckets, start_key, sample_rate, bins=None, primary_keys=[]):
        self.buckets = buckets
        self.start_key = start_key
        self.sample_rate = sample_rate
        self.bins = bins
        self.primary_keys = primary_keys

    def bucketize(self, data):
        """
        Discretize data based on the bucket
        """
        res = dict()
        seen_remain_key = np.array([])
        cumulative_bin = copy.deepcopy(self.buckets[self.start_key].bins)
        start_means = np.asarray(self.buckets[self.start_key].bin_means)

        for key in data:
            if key in self.primary_keys:
                continue
            res[key] = copy.deepcopy(data[key])
            if key != self.start_key:
                unique_remain = np.setdiff1d(self.buckets[key].rest_bins_remaining, seen_remain_key)
                assert sum([np.sum(np.isin(unique_remain, b) == 1) for b in cumulative_bin]) == 0

                if len(unique_remain) != 0:
                    remaining_data = data[key][np.isin(data[key], unique_remain)]
                    unique_remain, count_remain = np.unique(remaining_data, return_counts=True)
                    unique_counts = np.unique(count_remain)
                    for u in unique_counts:
                        temp_idx = np.searchsorted(start_means, u)
                        if temp_idx == len(cumulative_bin):
                            idx = -1
                            if u > self.buckets[key].bin_modes[-1]:
                                self.buckets[key].bin_modes[-1] = u
                        elif temp_idx == 0:
                            idx = 0
                        else:
                            if (u - start_means[temp_idx - 1]) >= (start_means[temp_idx] - u):
                                idx = temp_idx - 1
                            else:
                                idx = temp_idx
                        temp_unique = unique_remain[count_remain == u]
                        cumulative_bin[idx] = np.concatenate((cumulative_bin[idx], temp_unique))
                        seen_remain_key = np.concatenate((seen_remain_key, temp_unique))
                        if u > self.buckets[key].bin_modes[idx]:
                            self.buckets[key].bin_modes[idx] = u
            res[key] = copy.deepcopy(data[key])
            count = 0
            for i, b in enumerate(cumulative_bin):
                count += len(data[key][np.isin(data[key], b)])
                res[key][np.isin(data[key], b)] = i
        self.bins = cumulative_bin

        for key in data:
            if key in self.primary_keys:
                res[key] = self.bucketize_PK(data[key])
                self.buckets[key] = Bucket(key)
        return res

    def bucketize_PK(self, data):
        res = copy.deepcopy(data)
        remaining_data = np.unique(data)
        for i, b in enumerate(self.bins):
            res[np.isin(data, b)] = i
            remaining_data = np.setdiff1d(remaining_data, b)
        res[np.isin(data, remaining_data)] = -1
        return res


def identify_key_values(schema):
    """
    identify all the key attributes from the schema of a DB, currently we assume all possible joins are known
    It is also easy to support unseen joins, which we left as a future work.
    :param schema: the schema of a DB
    :return: a dict of all keys, {table: [keys]};
             a dict of set, each indicating which keys on different tables are considered the same key.
    """
    all_keys = set()
    equivalent_keys = dict()
    for i, join in enumerate(schema.relationships):
        keys = join.identifier.split(" = ")
        all_keys.add(keys[0])
        all_keys.add(keys[1])
        seen = False
        for k in equivalent_keys:
            if keys[0] in equivalent_keys[k]:
                equivalent_keys[k].add(keys[1])
                seen = True
                break
            elif keys[1] in equivalent_keys[k]:
                equivalent_keys[k].add(keys[0])
                seen = True
                break
        if not seen:
            # set the keys[-1] as the identifier of this equivalent join key group for convenience.
            equivalent_keys[keys[-1]] = set(keys)

    assert len(all_keys) == sum([len(equivalent_keys[k]) for k in equivalent_keys])
    return all_keys, equivalent_keys


def equal_freq_binning(name, data, n_bins, data_len, return_bucket=True):
    uniques, counts = data
    unique_counts, count_counts = np.unique(counts, return_counts=True)
    idx = np.argsort(unique_counts)
    unique_counts = unique_counts[idx]
    count_counts = count_counts[idx]

    bins = []
    bin_modes = []
    bin_vars = []
    bin_means = []

    bin_freq = data_len / n_bins
    cur_freq = 0
    cur_bin = []
    cur_bin_count = []
    for i, uni_c in enumerate(unique_counts):
        cur_freq += count_counts[i] * uni_c
        cur_bin.append(uniques[np.where(counts == uni_c)[0]])
        cur_bin_count.extend([uni_c] * count_counts[i])
        if (cur_freq > bin_freq) or (i == (len(unique_counts) - 1)):
            bins.append(np.concatenate(cur_bin))
            cur_bin_count = np.asarray(cur_bin_count)
            bin_modes.append(uni_c)
            bin_means.append(np.mean(cur_bin_count))
            bin_vars.append(np.var(cur_bin_count))
            cur_freq = 0
            cur_bin = []
            cur_bin_count = []
    assert len(uniques) == sum([len(b) for b in bins]), f"some unique values missed or duplicated"
    if return_bucket:
        return Bucket(name, bins, bin_modes, bin_vars, bin_means)
    else:
        return bins, bin_means


def apply_binning_to_data(bins, bin_means, data, start_key_data, n_bins, uniques, counts):
    # apply one greedy binning step based on existing bins
    unique_remains = np.setdiff1d(uniques, np.concatenate(bins))
    if len(unique_remains) != 0:
        remaining_data = data[np.isin(data, unique_remains)]
        unique_remain, count_remain = np.unique(remaining_data, return_counts=True)
        unique_counts = np.unique(count_remain)
        for u in unique_counts:
            temp_idx = np.searchsorted(bin_means, u)
            if temp_idx == len(bins):
                idx = -1
            elif temp_idx == 0:
                idx = 0
            else:
                if (u - bin_means[temp_idx - 1]) >= (bin_means[temp_idx] - u):
                    idx = temp_idx - 1
                else:
                    idx = temp_idx
            temp_unique = unique_remain[count_remain == u]
            bins[idx] = np.concatenate((bins[idx], temp_unique))   #modifying bins in place

    bin_vars = []
    temp_bin_means = []
    for i, bin in enumerate(bins):
        idx = np.where(np.isin(uniques, bin) == 1)[0]
        if len(idx) != 0:
            bin_vars.append(np.var(counts[idx]))
            temp_bin_means.append(np.mean(counts[idx]))
        else:
            bin_vars.append(0)
            temp_bin_means.append(1)

    assign_nbins = assign_bins_by_var(n_bins, bin_vars, temp_bin_means)
    
    new_bins = []
    new_bin_means = []
    for i, bin in enumerate(bins):
        if assign_nbins[i] == 0:
            new_bins.append(bin)
            new_bin_means.append(bin_means[i])
        else:
            curr_bin_data = data[np.isin(data, bin)]
            curr_start_key_data = start_key_data[np.isin(start_key_data, bin)]
            curr_bins, curr_bin_means = divide_bin(bin, curr_bin_data, assign_nbins[i]+1, curr_start_key_data)
            new_bins.extend(curr_bins)
            new_bin_means.extend(curr_bin_means)

    return new_bins, new_bin_means


def assign_bins_by_var(n_bins, bin_vars, bin_means, small_threshold=0.2, large_threshold=2):
    assign_nbins = np.zeros(len(bin_vars))
    remaining_nbins = n_bins
    idx = np.argsort(bin_vars)[::-1]
    if bin_vars[idx[0]]/bin_means[idx[0]] <= small_threshold:
        return assign_nbins

    while remaining_nbins > 0:
        for i in range(len(assign_nbins)):
            normalized_var = bin_vars[idx[i]]/bin_means[idx[i]]
            if normalized_var >= large_threshold:
                assign_nbins[i] += min(remaining_nbins, 2)
                remaining_nbins -= min(remaining_nbins, 2)
            elif normalized_var > small_threshold:
                assign_nbins[i] += 1
                remaining_nbins -= 1
            if remaining_nbins <= 0:
                break
    return assign_nbins


def divide_bin(bin, curr_bin_data, n_bins, start_key_data):
    # divide one bin into multiple bins to minimize the variance of curr_bin_data
    uniques, counts = np.unique(curr_bin_data, return_counts=True)
    if len(uniques) == 0:
        return [], []
        
    if len(uniques) <= n_bins:
        new_bins = []
        bin_means = []
        remaining_values = bin

        for i, uni in enumerate(uniques):
            new_bins.append([uni])
            remaining_values = np.setdiff1d(remaining_values, np.asarray([uni]))

        # randomly assign the remaining index to some bins
        if len(remaining_values) > 0:
            assign_idx = np.random.randint(0, len(new_bins), size=len(remaining_values))
            for i in range(len(new_bins)):
                new_bins[i].extend(list(remaining_values[assign_idx == i]))
                new_bins[i] = np.asarray(new_bins[i])
        
        for bin in new_bins:
            curr_bin_data = start_key_data[np.isin(start_key_data, bin)]
            if len(curr_bin_data) == 0:
                bin_means.append(0)
            else:
                _, count = np.unique(curr_bin_data, return_counts=True)
                bin_means.append(np.mean(count))
        return new_bins, bin_means

    idx = np.argsort(counts)
    counts = counts[idx]
    uniques = uniques[idx]

    # Natural breaks optimization using Fisher-Jenks Algorithms
    breaks = jenkspy.jenks_breaks(counts, nb_class=n_bins)
    breaks[-1] += 0.01
    new_bins = []
    bin_means = []
    remaining_values = np.asarray(bin)
    for i in range(1, len(breaks)):
        idx = np.where((breaks[i-1] <= counts) & (counts < breaks[i]))[0]
        new_bins.append(uniques[idx])
        remaining_values = np.setdiff1d(remaining_values, uniques[idx])
    
    if len(remaining_values) > 0:
        assign_idx = np.random.randint(0, len(new_bins), size=len(remaining_values))
        for i in range(len(new_bins)):
            new_bins[i] = np.concatenate((new_bins[i], remaining_values[assign_idx == i]))
            
    for bin in new_bins:
        curr_bin_data = start_key_data[np.isin(start_key_data, bin)]
        if len(curr_bin_data) == 0:
            bin_means.append(0)
        else:
            _, count = np.unique(curr_bin_data, return_counts=True)
            bin_means.append(np.mean(count))
    return new_bins, bin_means


def compute_variance_score(buckets):
    """
    compute the variance of products of random variables
    """
    all_mean = np.asarray([buckets[k].bin_means for k in buckets])
    all_var = np.asarray([buckets[k].bin_vars for k in buckets])
    return np.sum(np.prod(all_var + all_mean ** 2, axis=0) - np.prod(all_mean, axis=0) ** 2)


def greedy_bucketize(data, sample_rate, n_bins=30, primary_keys=[], return_data=False):
    """
    Perform sub-optimal bucketization on a group of equivalent join keys.
    A greedy algorithm that assigns half of the bins to one key at a time.
    :param data: a dict of (potentially sampled) table data of the keys
                 the keys of this dict are one group of equivalent join keys
    :param sample_rate: the sampling rate the data, could be all 1 if no sampling is performed
    :param n_bins: how many bins can we allocate
    :param primary_keys: the primary keys in the equivalent group since we don't need to bucketize PK.
    :return: new data, where the keys are bucketized
             the mode of each bucket
    """
    unique_values = dict()
    key_orders = []
    data_lens = []
    curr_pk = []
    for key in data:
        if key not in primary_keys:
            unique_values[key] = np.unique(data[key], return_counts=True)
            key_orders.append(key)
            data_lens.append(len(data[key]))
        else:
            curr_pk.append(key)
    key_orders = [key_orders[i] for i in np.argsort(data_lens)[::-1]]
    print(key_orders)
    print(curr_pk)
    remaining_bins = n_bins
    start_key = key_orders[0]
    curr_bins = None
    curr_bin_means = None
    for key in key_orders:
        print(key)
        if key == key_orders[-1]:
            # least key value use up all remaining bins, otherwise use half of it
            assign_bins = remaining_bins
        else:
            assign_bins = remaining_bins // 2
        if key == start_key:
            curr_bins, curr_bin_means = equal_freq_binning(key, unique_values[key], assign_bins, len(data[key]), False)
        else:
            curr_bins, curr_bin_means = apply_binning_to_data(curr_bins, curr_bin_means, data[key],
                                                              data[start_key], assign_bins,
                                                              unique_values[key][0], unique_values[key][1])
        print(len(curr_bins), len(curr_bin_means))
        remaining_bins = n_bins - len(curr_bins)

    new_data, best_buckets, curr_bins = bin_all_data_with_existing_binning(curr_bins, data, sample_rate, curr_pk, return_data)
    best_buckets = Bucket_group(best_buckets, start_key, sample_rate, curr_bins, primary_keys=curr_pk)
    return new_data, best_buckets


def sub_optimal_bucketize(data, sample_rate, n_bins=30, primary_keys=[]):
    """
    Perform sub-optimal bucketization on a group of equivalent join keys.
    :param data: a dict of (potentially sampled) table data of the keys
                 the keys of this dict are one group of equivalent join keys
    :param sample_rate: the sampling rate the data, could be all 1 if no sampling is performed
    :param n_bins: how many bins can we allocate
    :param primary_keys: the primary keys in the equivalent group since we don't need to bucketize PK.
    :return: new data, where the keys are bucketized
             the mode of each bucket
    """
    unique_values = dict()
    for key in data:
        if key not in primary_keys:
            unique_values[key] = np.unique(data[key], return_counts=True)

    best_variance_score = np.infty
    best_bin_len = 0
    best_start_key = None
    best_buckets = None
    for start_key in data:
        if start_key in primary_keys:
            continue
        start_bucket = equal_freq_binning(start_key, unique_values[start_key], n_bins, len(data[start_key]), True)
        rest_buckets = dict()
        for key in data:
            if key == start_key or key in primary_keys:
                continue
            uniques = unique_values[key][0]
            counts = unique_values[key][1]
            rest_buckets[key] = Bucket(key, [], [0] * len(start_bucket.bins), [0] * len(start_bucket.bins),
                                       [0] * len(start_bucket.bins), uniques)
            for i, bin in enumerate(start_bucket.bins):
                idx = np.where(np.isin(uniques, bin) == 1)[0]
                if len(idx) != 0:
                    bin_count = counts[idx]
                    unique_bin_keys = uniques[idx]
                    # unique_bin_count = np.unique(bin_count)
                    # bin_count = np.concatenate([counts[counts == j] for j in unique_bin_count])
                    # unique_bin_keys = np.concatenate([uniques[counts == j] for j in unique_bin_count])
                    rest_buckets[key].rest_bins_remaining = np.setdiff1d(rest_buckets[key].rest_bins_remaining,
                                                                         unique_bin_keys)
                    rest_buckets[key].bin_modes[i] = np.max(bin_count)
                    rest_buckets[key].bin_vars[i] = np.var(bin_count)
                    rest_buckets[key].bin_means[i] = np.mean(bin_count)

        rest_buckets[start_key] = start_bucket
        var_score = compute_variance_score(rest_buckets)
        if len(start_bucket.bins) >= best_bin_len * 1.1:
            best_variance_score = var_score
            best_start_key = start_key
            best_buckets = rest_buckets
            best_bin_len = len(start_bucket.bins)
        elif len(start_bucket.bins) >= best_bin_len * 0.9 and var_score < best_variance_score:
            best_variance_score = var_score
            best_start_key = start_key
            best_buckets = rest_buckets
            best_bin_len = len(start_bucket.bins)

    best_buckets = Bucket_group(best_buckets, best_start_key, sample_rate, primary_keys=primary_keys)
    new_data = best_buckets.bucketize(data)
    return new_data, best_buckets


def fixed_start_key_bucketize(start_key, data, sample_rate, n_bins=30, primary_keys=[]):
    """
    Perform sub-optimal bucketization on a group of equivalent join keys based on the pre-defined start_key.
    :param data: a dict of (potentially sampled) table data of the keys
                 the keys of this dict are one group of equivalent join keys
    :param sample_rate: the sampling rate the data, could be all 1 if no sampling is performed
    :param n_bins: how many bins can we allocate
    :param primary_keys: the primary keys in the equivalent group since we don't need to bucketize PK.
    :return: new data, where the keys are bucketized
             the mode of each bucket
    """
    unique_values = dict()
    for key in data:
        if key not in primary_keys:
            unique_values[key] = np.unique(data[key], return_counts=True)

    start_bucket = equal_freq_binning(start_key, unique_values[start_key], n_bins, len(data[start_key]), True)
    rest_buckets = dict()
    for key in data:
        if key == start_key or key in primary_keys:
            continue
        uniques = unique_values[key][0]
        counts = unique_values[key][1]
        rest_buckets[key] = Bucket(key, [], [0] * len(start_bucket.bins), [0] * len(start_bucket.bins),
                                   [0] * len(start_bucket.bins), uniques)
        for i, bin in enumerate(start_bucket.bins):
            idx = np.where(np.isin(uniques, bin) == 1)[0]
            if len(idx) != 0:
                bin_count = counts[idx]
                unique_bin_keys = uniques[idx]
                rest_buckets[key].rest_bins_remaining = np.setdiff1d(rest_buckets[key].rest_bins_remaining,
                                                                     unique_bin_keys)
                rest_buckets[key].bin_modes[i] = np.max(bin_count)
                rest_buckets[key].bin_means[i] = np.mean(bin_count)

    best_buckets = Bucket_group(rest_buckets, start_key, sample_rate, primary_keys=primary_keys)
    new_data = best_buckets.bucketize(data)
    return new_data, best_buckets


def bin_all_data_with_existing_binning(bins, data, sample_rate, curr_pk, return_data):
    buckets = dict()
    new_data = dict()
    if return_data:
        new_data = copy.deepcopy(data)

    for key in curr_pk:
        bin_modes = [1 for i in range(len(bins))]
        remaining_data = np.unique(data[key])
        for i, bin in enumerate(bins):
            if return_data:
                new_data[key][np.isin(data[key], bin)] = i
            remaining_data = np.setdiff1d(remaining_data, bin)
        if len(remaining_data) != 0:
            if return_data:
                # assigning all remaining key values to the first bin
                new_data[key][np.isin(data[key], remaining_data)] = 0
            bins[0] = np.concatenate((bins[0], remaining_data))
        buckets[key] = Bucket(key, bin_modes=bin_modes)

    for key in data:
        bin_modes = []
        for i, bin in enumerate(bins):
            curr_data = data[key][np.isin(data[key], bin)]
            if len(curr_data) == 0:
                bin_modes.append(0)
            else:
                bin_mode = stats.mode(curr_data).count[0]
                if bin_mode > 1:
                    bin_mode /= sample_rate[key]
                bin_modes.append(bin_mode)
                if return_data:
                    new_data[key][np.isin(data[key], bin)] = i
        buckets[key] = Bucket(key, bin_modes=bin_modes)

    return new_data, buckets, bins



def apply_binning_to_data_value_count(bins, data):
    res = np.zeros(len(bins))
    unique_remain = np.unique(data)
    for i, bin in enumerate(bins):
        res[i] = np.sum(np.isin(data, bin))
        unique_remain = np.setdiff1d(unique_remain, bin)

    res[0] += np.sum(np.isin(data, unique_remain))
    return res

In [4]:
data_path = "/home/ubuntu/End-to-End-CardEst-Benchmark/datasets/stats_simplified/{}.csv"
schema = gen_stats_light_schema(data_path)

In [5]:
all_keys, equivalent_keys = identify_key_values(schema)
print(equivalent_keys)

{'posts.Id': {'posts.Id', 'votes.PostId', 'postLinks.PostId', 'comments.PostId', 'tags.ExcerptPostId', 'postHistory.PostId', 'postLinks.RelatedPostId'}, 'users.Id': {'users.Id', 'comments.UserId', 'postHistory.UserId', 'badges.UserId', 'votes.UserId', 'posts.OwnerUserId'}}


In [35]:
data = dict()
sample_rate = dict()
primary_keys = []
for table_obj in schema.tables:
    df_rows = pd.read_csv(table_obj.csv_file_location)
    
    df_rows.columns = [table_obj.table_name + '.' + attr for attr in table_obj.attributes]

    for attribute in table_obj.irrelevant_attributes:
        df_rows = df_rows.drop(table_obj.table_name + '.' + attribute, axis=1)

    df_rows.apply(pd.to_numeric, errors="ignore")
    for attr in df_rows.columns:
        if attr in all_keys:
            print(attr)
            print(np.sum(np.isnan(df_rows[attr].values)))
            data[attr] = df_rows[attr].values
            data[attr][np.isnan(data[attr])] = -1
            data[attr][data[attr] < 0] = -1
            print(len(data[attr]), len(df_rows[attr].values))
            data[attr] = copy.deepcopy(data[attr])[data[attr]>=0]
            print(len(data[attr]), len(df_rows[attr].values))
            if len(np.unique(data[attr])) >= len(data[attr]) - 10:
                primary_keys.append(attr)
            print(np.sum(np.isnan(df_rows[attr].values)))

badges.UserId
0
79851 79851
79851 79851
0
votes.PostId
0
328064 328064
328064 328064
0
votes.UserId
293275
328064 328064
34773 328064
0
postHistory.PostId
0
303187 303187
303187 303187
0
postHistory.UserId
21328
303187 303187
277348 303187
0
posts.Id
0
91976 91976
91976 91976
0
posts.OwnerUserId
1392
91976 91976
90373 91976
0
users.Id
0
40325 40325
40324 40325
0
comments.PostId
0
174305 174305
174305 174305
0
comments.UserId
2835
174305 174305
171470 174305
0
postLinks.PostId
0
11102 11102
11102 11102
0
postLinks.RelatedPostId
0
11102 11102
11102 11102
0
tags.ExcerptPostId
436
1032 1032
596 1032
0


In [37]:
for k in data:
    sample_rate[k] = 1.0
    print(k, len(data[k]), np.sum(np.isnan(data[k])), len(data[k][data[k]<0]))
print(primary_keys)


badges.UserId 79851 0 0
votes.PostId 328064 0 0
votes.UserId 34773 0 0
postHistory.PostId 303187 0 0
postHistory.UserId 277348 0 0
posts.Id 91976 0 0
posts.OwnerUserId 90373 0 0
users.Id 40324 0 0
comments.PostId 174305 0 0
comments.UserId 171470 0 0
postLinks.PostId 11102 0 0
postLinks.RelatedPostId 11102 0 0
tags.ExcerptPostId 596 0 0
['posts.Id', 'users.Id', 'tags.ExcerptPostId']


In [49]:
temp = dict()
for PK in equivalent_keys:
    print(equivalent_keys[PK])
    group_data = {}
    group_sample_rate = {}
    for K in equivalent_keys[PK]:
        group_data[K] = data[K]
        group_sample_rate[K] = sample_rate[K]
    d, bucket = greedy_bucketize(group_data, sample_rate, n_bins=300, primary_keys=primary_keys, return_data=True)
    temp[PK] = bucket

{'posts.Id', 'votes.PostId', 'postLinks.PostId', 'comments.PostId', 'tags.ExcerptPostId', 'postHistory.PostId', 'postLinks.RelatedPostId'}
['votes.PostId', 'postHistory.PostId', 'comments.PostId', 'postLinks.RelatedPostId', 'postLinks.PostId']
['posts.Id', 'tags.ExcerptPostId']
votes.PostId
42 42
postHistory.PostId
171 171
comments.PostId
223 223
postLinks.RelatedPostId
240 240
postLinks.PostId
284 284
{'users.Id', 'comments.UserId', 'postHistory.UserId', 'badges.UserId', 'votes.UserId', 'posts.OwnerUserId'}
['postHistory.UserId', 'comments.UserId', 'posts.OwnerUserId', 'badges.UserId', 'votes.UserId']
['users.Id']
postHistory.UserId
88 88
comments.UserId
194 194
posts.OwnerUserId
247 247
badges.UserId
272 272
votes.UserId
284 284


In [44]:
print(len(bucket.buckets[bucket.start_key].bins))
for k in d:
    print("     ==      ")
    print(k, len(data[k]), np.sum(np.isnan(data[k])), len(data[k][data[k]<0]))
    print(len(d[k]), len(np.unique(d[k])))

0


In [48]:
bucket = temp["posts.Id"]
for k in bucket.buckets:
    print("==============================================================")
    print(k, len(bucket.buckets[k].bin_modes))
    print(bucket.buckets[k].bin_modes)
    print([len(b) for b in bucket.buckets[k].bins])
    print(bucket.buckets[k].bin_means)
    print(bucket.buckets[k].bin_vars)

posts.Id 284
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[]
[]
[]
votes.PostId 284
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,

In [98]:
import copy
import logging
import pickle
import numpy as np
import pandas as pd
import time
import sys
import os

sys.path.append("/Users/ziniuw/Desktop/research/Learned_QO/CE_scheme/")

from Schemas.stats.schema import gen_stats_light_schema
from Join_scheme.binning import identify_key_values, sub_optimal_bucketize, greedy_bucketize, Table_bucket

logger = logging.getLogger(__name__)


def timestamp_transorform(time_string, start_date="2010-07-19 00:00:00"):
    start_date_int = time.strptime(start_date, "%Y-%m-%d %H:%M:%S")
    time_array = time.strptime(time_string, "%Y-%m-%d %H:%M:%S")
    return int(time.mktime(time_array)) - int(time.mktime(start_date_int))


def read_table_hdf(table_obj):
    """
    Reads hdf from path, renames columns and drops unnecessary columns
    """
    df_rows = pd.read_hdf(table_obj.csv_file_location)
    df_rows.columns = [table_obj.table_name + '.' + attr for attr in table_obj.attributes]

    for attribute in table_obj.irrelevant_attributes:
        df_rows = df_rows.drop(table_obj.table_name + '.' + attribute, axis=1)

    return df_rows.apply(pd.to_numeric, errors="ignore")


def convert_time_to_int(data_folder):
    for file in os.listdir(data_folder):
        if file.endswith(".csv"):
            csv_file_location = data_folder + file
            df_rows = pd.read_csv(csv_file_location)
            for attribute in df_rows.columns:
                if "Date" in attribute:
                    if df_rows[attribute].values.dtype == 'object':
                        new_value = []
                        for value in df_rows[attribute].values:
                            new_value.append(timestamp_transorform(value))
                        df_rows[attribute] = new_value
            df_rows.to_csv(csv_file_location, index=False)


def read_table_csv(table_obj, csv_seperator=',', stats=True):
    """
    Reads csv from path, renames columns and drops unnecessary columns
    """
    if stats:
        df_rows = pd.read_csv(table_obj.csv_file_location)
    else:
        df_rows = pd.read_csv(table_obj.csv_file_location, header=None, escapechar='\\', encoding='utf-8',
                              quotechar='"',
                              sep=csv_seperator)
    df_rows.columns = [table_obj.table_name + '.' + attr for attr in table_obj.attributes]

    for attribute in table_obj.irrelevant_attributes:
        df_rows = df_rows.drop(table_obj.table_name + '.' + attribute, axis=1)

    return df_rows.apply(pd.to_numeric, errors="ignore")


def generate_table_buckets(data, key_attrs, bin_sizes, bin_modes, optimal_buckets):
    table_buckets = dict()
    for table in data:
        table_data = data[table]
        table_bucket = Table_bucket(table, key_attrs[table], bin_sizes[table])
        for key in key_attrs[table]:
            if key in bin_modes and len(bin_modes[key]) != 0:
                table_bucket.oned_bin_modes[key] = bin_modes[key]
            else:
                # this is a primary key
                table_bucket.oned_bin_modes[key] = np.ones(table_bucket.bin_sizes[key])
        # getting mode for 2D bins
        if len(key_attrs[table]) == 2:
            key1 = key_attrs[table][0]
            key2 = key_attrs[table][1]
            res1 = np.zeros((table_bucket.bin_sizes[key1], table_bucket.bin_sizes[key2]))
            res2 = np.zeros((table_bucket.bin_sizes[key1], table_bucket.bin_sizes[key2]))
            key_data = np.stack((table_data[key1].values, table_data[key2].values), axis=1)
            assert table_bucket.bin_sizes[key1] == len(optimal_buckets[key1].bins)
            assert table_bucket.bin_sizes[key2] == len(optimal_buckets[key2].bins)
            for v1, b1 in enumerate(optimal_buckets[key1].bins):
                temp_data = key_data[np.isin(key_data[:, 0], b1)]
                if len(temp_data) == 0:
                    continue
                assert np.max(np.unique(temp_data[:, 0], return_counts=True)[-1]) == table_bucket.oned_bin_modes[key1][
                    v1], f"{key1} data error at {v1}, with " \
                         f"{np.max(np.unique(temp_data[:, 0], return_counts=True)[-1])} and " \
                         f"{table_bucket.oned_bin_modes[key1][v1]}."
                for v2, b2 in enumerate(optimal_buckets[key2].bins):
                    temp_data2 = copy.deepcopy(temp_data[np.isin(temp_data[:, 1], b2)])
                    if len(temp_data2) == 0:
                        continue
                    res1[v1, v2] = np.max(np.unique(temp_data2[:, 0], return_counts=True)[-1])
                    res2[v1, v2] = np.max(np.unique(temp_data2[:, 1], return_counts=True)[-1])
            table_bucket.twod_bin_modes[key1] = res1
            table_bucket.twod_bin_modes[key2] = res2
        table_buckets[table] = table_bucket

    return table_buckets


def process_stats_data(data_path, model_folder, n_bins=500, bucket_method="greedy", save_bucket_bins=False):
    """
    Preprocessing stats data and generate optimal bucket
    :param data_path: path to stats data folder
    :param n_bins: number of bins (the actually number of bins returned will be smaller than this)
    :param bucket_method: choose between "sub_optimal" and "greedy". Please refer to binning.py for details.
    :param save_bucket_bins: Set to true for dynamic environment, the default is False for static environment
    :return:
    """
    if not data_path.endswith(".csv"):
        data_path += "/{}.csv"
    schema = gen_stats_light_schema(data_path)
    all_keys, equivalent_keys = identify_key_values(schema)
    data = dict()
    key_data = dict()  # store the columns of all keys
    sample_rate = dict()
    primary_keys = []
    null_values = dict()
    key_attrs = dict()
    for table_obj in schema.tables:
        table_name = table_obj.table_name
        null_values[table_name] = dict()
        key_attrs[table_name] = []
        df_rows = read_table_csv(table_obj, stats=True)
        for attr in df_rows.columns:
            if attr in all_keys:
                key_data[attr] = df_rows[attr].values
                # the nan value of id are set to -1, this is hardcoded.
                key_data[attr][np.isnan(key_data[attr])] = -1
                key_data[attr][key_data[attr] < 0] = -1
                null_values[table_name][attr] = -1
                key_data[attr] = copy.deepcopy(key_data[attr])[key_data[attr] >= 0]
                # if the all keys have exactly one appearance, we consider them primary keys
                # we set a error margin of 0.01 in case of data mis-write.
                if len(np.unique(key_data[attr])) >= len(key_data[attr]) * 0.99:
                    primary_keys.append(attr)
                sample_rate[attr] = 1.0
                key_attrs[table_name].append(attr)
            else:
                temp = df_rows[attr].values
                null_values[table_name][attr] = np.nanmin(temp) - 100
                temp[np.isnan(temp)] = null_values[table_name][attr]
        data[table_name] = df_rows

    all_bin_modes = dict()
    bin_size = dict()
    binned_data = dict()
    optimal_buckets = dict()
    for PK in equivalent_keys:
        print(f"bucketizing equivalent key group:", equivalent_keys[PK])
        group_data = {}
        group_sample_rate = {}
        for K in equivalent_keys[PK]:
            group_data[K] = key_data[K]
            group_sample_rate[K] = sample_rate[K]
        if bucket_method == "greedy":
            temp_data, optimal_bucket = greedy_bucketize(group_data, sample_rate, n_bins, primary_keys, True)
        elif bucket_method == "sub_optimal":
            temp_data, optimal_bucket = sub_optimal_bucketize(group_data, sample_rate, n_bins, primary_keys, True)
        else:
            assert False, f"unrecognized bucketization method: {bucket_method}"

        binned_data.update(temp_data)
        for K in equivalent_keys[PK]:
            optimal_buckets[K] = optimal_bucket
            temp_table_name = K.split(".")[0]
            if temp_table_name not in bin_size:
                bin_size[temp_table_name] = dict()
            bin_size[temp_table_name][K] = len(optimal_bucket.bins)
            all_bin_modes[K] = np.asarray(optimal_bucket.buckets[K].bin_modes)

    table_buckets = generate_table_buckets(data, key_attrs, bin_size, all_bin_modes, optimal_buckets)

    for K in binned_data:
        temp_table_name = K.split(".")[0]
        temp = data[temp_table_name][K].values
        temp[temp >= 0] = binned_data[K]

    if save_bucket_bins:
        with open(model_folder + f"/buckets.pkl") as f:
            pickle.dump(optimal_buckets, f, pickle.HIGHEST_PROTOCOL)

    return data, null_values, key_attrs, table_buckets, equivalent_keys, schema, bin_size

In [99]:
data_path = "/home/ubuntu/End-to-End-CardEst-Benchmark/datasets/stats_simplified/{}.csv"
model_folder = "/home/ubuntu/data_CE/saved_models"
data, null_values, key_attrs, table_buckets, equivalent_keys, schema, bin_size = process_stats_data(data_path,
                                                                       model_folder, 200, "greedy", False)

bucketizing equivalent key group: {'posts.Id', 'votes.PostId', 'postLinks.PostId', 'comments.PostId', 'tags.ExcerptPostId', 'postHistory.PostId', 'postLinks.RelatedPostId'}
['votes.PostId', 'postHistory.PostId', 'comments.PostId', 'postLinks.RelatedPostId', 'postLinks.PostId']
['posts.Id', 'tags.ExcerptPostId']
votes.PostId
34 34
postHistory.PostId
117 117
comments.PostId
155 155
postLinks.RelatedPostId
168 168
postLinks.PostId
190 190
bucketizing equivalent key group: {'users.Id', 'comments.UserId', 'postHistory.UserId', 'badges.UserId', 'votes.UserId', 'posts.OwnerUserId'}
['postHistory.UserId', 'comments.UserId', 'posts.OwnerUserId', 'badges.UserId', 'votes.UserId']
['users.Id']
postHistory.UserId
66 66
comments.UserId
133 133
posts.OwnerUserId
166 166
badges.UserId
183 183
votes.UserId
197 197


In [71]:
df_rows = pd.read_csv(data_path.format("badges"))
df_rows.head(5)

Unnamed: 0.1,Unnamed: 0,Id,UserId,Date
0,0,1,5,70747
1,1,2,6,70747
2,2,3,8,70747
3,3,4,23,70747
4,4,5,36,70747


In [None]:
table = "votes"
bucket = table_buckets[table]
for attr in bucket.id_attributes:
    print(attr)
    print(np.sum(bucket.oned_bin_modes[attr]), bucket.oned_bin_modes[attr].shape)
    if len(bucket.twod_bin_modes[attr]) != 0:
        print(np.sum(bucket.twod_bin_modes[attr]), bucket.twod_bin_modes[attr].shape)

In [None]:
table = "postLinks"
bucket = table_buckets[table]
for attr in bucket.id_attributes:
    print(attr)
    print(np.sum(bucket.oned_bin_modes[attr]), bucket.oned_bin_modes[attr].shape)
    if len(bucket.twod_bin_modes[attr]) != 0:
        print(np.sum(bucket.twod_bin_modes[attr]), bucket.twod_bin_modes[attr].shape)

In [None]:
print(bucket.oned_bin_modes[bucket.id_attributes[0]])
print(np.sum(bucket.twod_bin_modes[bucket.id_attributes[0]], axis=1))
print(bucket.oned_bin_modes[bucket.id_attributes[1]])
print(np.sum(bucket.twod_bin_modes[bucket.id_attributes[1]], axis=0))