Skip to content

Commit

Permalink
Testing universal sketch adding layers behavior
Browse files Browse the repository at this point in the history
* Increasing the number of layers only helps up to a point, after which query accuracy decreases (although it still seems to stay within the guaranteed bounds under the current tests).

* Minor changes.

* Removed class templating.

* Commented out univmon dependency.
  • Loading branch information
ujvl committed Dec 20, 2018
1 parent 9dac9b3 commit f05c0f2
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 75 deletions.
42 changes: 38 additions & 4 deletions libconfluo/CMakeLists.txt
@@ -1,4 +1,4 @@
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-parameter")
# Headers of the external UnivMon simulator are optional. Supply their location with
# -DUNIVMON_INCLUDE_DIR=<path> rather than hard-coding a developer-specific absolute path,
# so the project still configures and builds on other machines.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-parameter")
if (UNIVMON_INCLUDE_DIR)
  include_directories(${UNIVMON_INCLUDE_DIR})
endif ()
if (NOT APPLE AND UNIX)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-as-needed -ldl")
endif ()
Expand All @@ -8,6 +8,32 @@ set(UTILS_INCLUDE ${CMAKE_CURRENT_SOURCE_DIR}/../libutils/utils)

include_directories(${GTEST_INCLUDE_DIR} ${CONFLUO_INCLUDE} ${UTILS_INCLUDE} ${Boost_INCLUDE_DIRS})

#SET(ALAN_UNIVMON_DIR "/Users/Ujval/dev/research/univmon_extension/simulator/V0.3/")
#INCLUDE_DIRECTORIES(${ALAN_UNIVMON_DIR})

#ADD_LIBRARY(
# alan_univmon
# ${ALAN_UNIVMON_DIR}/common.h
# ${ALAN_UNIVMON_DIR}/countSketch.h
# ${ALAN_UNIVMON_DIR}/counterInfo.h
# ${ALAN_UNIVMON_DIR}/data_plane.h
# ${ALAN_UNIVMON_DIR}/hash.h
# ${ALAN_UNIVMON_DIR}/hashInfo.h
# ${ALAN_UNIVMON_DIR}/os_galoisField.h
# ${ALAN_UNIVMON_DIR}/os_mangler.h
# ${ALAN_UNIVMON_DIR}/packet.h
# ${ALAN_UNIVMON_DIR}/pickdrop.h
# ${ALAN_UNIVMON_DIR}/topK.h
# ${ALAN_UNIVMON_DIR}/countSketch.cpp
# ${ALAN_UNIVMON_DIR}/data_plane.h
# ${ALAN_UNIVMON_DIR}/testUniversalSketch.cpp
# ${ALAN_UNIVMON_DIR}/testCountSketch.cpp
# ${ALAN_UNIVMON_DIR}/hash.cpp
# ${ALAN_UNIVMON_DIR}/topK.cpp
# test/container/sketch/universal_sketch_test.h confluo/container/sketch/priority_queue.h)

#TARGET_LINK_LIBRARIES(MYProgram svm_lib_light)

add_library(confluo STATIC
confluo/read_tail.h
confluo/atomic_multilog.h
Expand Down Expand Up @@ -95,6 +121,12 @@ add_library(confluo STATIC
confluo/container/monolog/monolog_linear.h
confluo/container/monolog/monolog_linear_bucket.h
confluo/container/radix_tree.h
confluo/container/sketch
confluo/container/sketch/count_sketch.h
confluo/container/sketch/hash_manager.h
confluo/container/sketch/priority_queue.h
confluo/container/sketch/sketch_utils.h
confluo/container/sketch/universal_sketch.h
confluo/schema/field.h
confluo/schema/record.h
confluo/schema/record_batch.h
Expand Down Expand Up @@ -185,7 +217,7 @@ add_library(confluo STATIC
src/types/raw_data.cc
src/types/numeric.cc
src/types/type_properties.cc
src/types/type_manager.cc)
src/types/type_manager.cc confluo/container/sketch/confluo_universal_sketch.h confluo/univ_sketch_log.h confluo/container/sketch/confluo_count_sketch.h)
target_link_libraries(confluo confluoutils ${CMAKE_THREAD_LIBS_INIT} ${lz4_STATIC_LIB})
add_dependencies(confluo lz4)

Expand Down Expand Up @@ -222,6 +254,8 @@ if (BUILD_TESTS)
test/container/cursor/batched_cursor_test.h
test/container/bitmap/bitmap_test.h
test/container/bitmap/bitmap_array_test.h
test/container/sketch/count_sketch_test.h
test/container/sketch/universal_sketch_test.h
test/container/bitmap/delta_encoded_array_test.h
test/container/stream_test.h
test/container/string_map_test.h
Expand All @@ -238,8 +272,8 @@ if (BUILD_TESTS)
test/compression/lz4_encode_test.h
test/compression/delta_encode_test.h
test/aggregated_reflog_test.h
test/confluo_store_test.h)
target_link_libraries(ctest confluo gtest gtest_main)
test/confluo_store_test.h test/container/sketch/priority_queue_test.h)
target_link_libraries(ctest confluo gtest gtest_main) #alan_univmon)
add_dependencies(ctest googletest)

file(GLOB_RECURSE test_sources test/*.cc)
Expand Down
76 changes: 37 additions & 39 deletions libconfluo/confluo/container/sketch/count_sketch.h
Expand Up @@ -27,26 +27,26 @@ class count_sketch {

/**
* Constructor.
* @param num_estimates number of estimates to track per update (depth)
* @param num_buckets number of buckets (width)
* @param t number of estimates per update (depth)
* @param b number of buckets (width)
*/
count_sketch(size_t num_estimates, size_t num_buckets)
: num_estimates_(num_estimates),
num_buckets_(num_buckets),
counters_(num_estimates_ * num_buckets_),
count_sketch(size_t t, size_t b)
: depth_(t),
width_(b),
counters_(depth_ * width_),
bucket_hash_manager_(),
sign_hash_manager_() {
for (size_t i = 0; i < counters_.size(); i++) {
atomic::store(&counters_[i], counter_t());
}
bucket_hash_manager_.guarantee_initialized(num_estimates_);
sign_hash_manager_.guarantee_initialized(num_estimates_);
bucket_hash_manager_.guarantee_initialized(depth_);
sign_hash_manager_.guarantee_initialized(depth_);
}

count_sketch(const count_sketch& other)
: num_estimates_(other.num_estimates_),
num_buckets_(other.num_buckets_),
counters_(num_estimates_ * num_buckets_),
: depth_(other.depth_),
width_(other.width_),
counters_(depth_ * width_),
bucket_hash_manager_(other.bucket_hash_manager_),
sign_hash_manager_(other.sign_hash_manager_) {
for (size_t i = 0; i < counters_.size(); i++) {
Expand All @@ -55,9 +55,9 @@ class count_sketch {
}

count_sketch& operator=(const count_sketch& other) {
num_estimates_ = other.num_estimates_;
num_buckets_ = other.num_buckets_;
counters_ = std::vector<atomic_counter_t>(num_estimates_ * num_buckets_);
depth_ = other.depth_;
width_ = other.width_;
counters_ = std::vector<atomic_counter_t>(depth_ * width_);
bucket_hash_manager_ = other.bucket_hash_manager_;
sign_hash_manager_ = other.sign_hash_manager_;
for (size_t i = 0; i < counters_.size(); i++) {
Expand All @@ -71,10 +71,10 @@ class count_sketch {
* @param key key
*/
void update(T key) {
for (size_t i = 0; i < num_estimates_; i++) {
size_t bucket_idx = bucket_hash_manager_.hash(i, key) % num_buckets_;
for (size_t i = 0; i < depth_; i++) {
size_t bucket_idx = bucket_hash_manager_.hash(i, key) % width_;
counter_t sign = to_sign(sign_hash_manager_.hash(i, key));
atomic::faa<counter_t>(&counters_[num_buckets_ * i + bucket_idx], sign);
atomic::faa<counter_t>(&counters_[width_ * i + bucket_idx], sign);
}
}

Expand All @@ -84,11 +84,11 @@ class count_sketch {
* @return estimated count
*/
counter_t estimate(T key) const {
std::vector<counter_t> median_buf(num_estimates_);
for (size_t i = 0; i < num_estimates_; i++) {
size_t bucket_idx = bucket_hash_manager_.hash(i, key) % num_buckets_;
std::vector<counter_t> median_buf(depth_);
for (size_t i = 0; i < depth_; i++) {
size_t bucket_idx = bucket_hash_manager_.hash(i, key) % width_;
counter_t sign = to_sign(sign_hash_manager_.hash(i, key));
median_buf[i] = sign * atomic::load(&counters_[num_buckets_ * i + bucket_idx]);
median_buf[i] = sign * atomic::load(&counters_[width_ * i + bucket_idx]);
}
return median(median_buf);
}
Expand All @@ -99,53 +99,51 @@ class count_sketch {
* @return old estimated count
*/
counter_t update_and_estimate(T key) {
std::vector<counter_t> median_buf(num_estimates_);
for (size_t i = 0; i < num_estimates_; i++) {
size_t bucket_idx = bucket_hash_manager_.hash(i, key) % num_buckets_;
std::vector<counter_t> median_buf(depth_);
for (size_t i = 0; i < depth_; i++) {
size_t bucket_idx = bucket_hash_manager_.hash(i, key) % width_;
counter_t sign = to_sign(sign_hash_manager_.hash(i, key));
counter_t old_count = atomic::faa<counter_t>(&counters_[num_buckets_ * i + bucket_idx], sign);
counter_t old_count = atomic::faa<counter_t>(&counters_[width_ * i + bucket_idx], sign);
median_buf[i] = sign * old_count;
}
return median(median_buf);
}

size_t depth() {
return num_estimates_;
return depth_;
}

size_t width() {
return num_buckets_;
return width_;
}

/**
* @return storage size of data structure in bytes
*/
size_t storage_size() {
size_t counters_size_bytes = sizeof(atomic_counter_t) * (num_estimates_ * num_buckets_);
size_t counters_size_bytes = sizeof(atomic_counter_t) * (depth_ * width_);
// TODO account for hashes (O(n) increase)
return counters_size_bytes;
}

/**
* Create a cont_min_sketch with desired accuracy guarantees. TODO describe
* @param gamma desired probability of error (0 < gamma < 1)
* Create a count_sketch with desired accuracy guarantees.
* @param epsilon desired margin of error (0 < epsilon < 1)
* @param gamma desired probability of error (0 < gamma < 1)
* @param manager hash manager
* @return count sketch with accuracy guarantees
*/
// TODO rename func
static count_sketch create_parameterized(double gamma, double epsilon) {
return count_sketch(count_sketch<T>::perror_to_num_estimates(gamma),
count_sketch<T>::error_margin_to_num_buckets(epsilon));
static count_sketch create_parameterized(double epsilon, double gamma) {
return count_sketch(count_sketch<T>::perror_to_depth(gamma),
count_sketch<T>::error_margin_to_width(epsilon));
}

// TODO move
/**
* Number of estimates per update computed from probability of error
* @param gamma desired probability of error
* @return number of estimates
*/
static size_t perror_to_num_estimates(double gamma) {
static size_t perror_to_depth(double gamma) {
  // gamma is a probability of error, so it must lie strictly inside (0, 1).
  assert(gamma > 0.0 && gamma < 1.0);
  // log2(N / gamma) = log2(N) - log2(gamma); sizeof(T) * 8 (the bit width of T)
  // stands in for log2(N).
  const auto depth = std::ceil(sizeof(T) * 8 - std::log2(gamma));
  return size_t(depth);
}
Expand All @@ -155,7 +153,7 @@ class count_sketch {
* @param epsilon desired error margin
* @return number of buckets
*/
static size_t error_margin_to_num_buckets(double epsilon) {
static size_t error_margin_to_width(double epsilon) {
  // The error margin must lie strictly inside (0, 1).
  assert(epsilon > 0.0 && epsilon < 1.0);
  // width = ceil(e / epsilon^2)
  const double epsilon_squared = epsilon * epsilon;
  const double width = std::ceil(std::exp(1) / epsilon_squared);
  return size_t(width);
}
Expand All @@ -165,8 +163,8 @@ class count_sketch {
return num % 2 == 1 ? 1 : -1;
}

size_t num_estimates_; // depth
size_t num_buckets_; // width
size_t depth_; // number of estimates
size_t width_; // number of buckets

std::vector<atomic_counter_t> counters_;
hash_manager<T> bucket_hash_manager_;
Expand Down
23 changes: 10 additions & 13 deletions libconfluo/confluo/container/sketch/hash_manager.h
Expand Up @@ -10,38 +10,35 @@ namespace sketch {
/**
* Pairwise-independent hash
*/
template<typename T>
class pairwise_indep_hash {

public:
static const size_t PRIME = 39916801UL;
static constexpr size_t PRIME = 39916801UL;

pairwise_indep_hash()
: pairwise_indep_hash(0, 0) {
}

pairwise_indep_hash(size_t a, size_t b)
: a_(a),
b_(b),
hash_() {
b_(b) {
}

template<typename T>
size_t apply(T elem) const {
return (a_ * hash_(elem) + b_) % PRIME;
static std::hash<T> hash;
return (a_ * hash(elem) + b_) % PRIME;
}

static pairwise_indep_hash<T> generate_random() {
return pairwise_indep_hash<T>(utils::rand_utils::rand_uint64(PRIME), utils::rand_utils::rand_uint64(PRIME));
static pairwise_indep_hash generate_random() {
return { utils::rand_utils::rand_uint64(PRIME), utils::rand_utils::rand_uint64(PRIME) };
}

private:
size_t a_, b_;
std::hash<T> hash_;

};

template<typename T> const size_t pairwise_indep_hash<T>::PRIME;

template<typename T>
class hash_manager {
public:
Expand All @@ -62,7 +59,7 @@ class hash_manager {
size_t cur_size = hashes_.size();
size_t num_new_hashes = num_hashes > cur_size ? num_hashes - cur_size : 0;
for (size_t i = 0; i < num_new_hashes; i++) {
hashes_.push_back(pairwise_indep_hash<T>::generate_random());
hashes_.push_back(pairwise_indep_hash::generate_random());
}
}

Expand All @@ -73,11 +70,11 @@ class hash_manager {
* @return hashed value
*/
size_t hash(size_t hash_id, T elem) const {
return hashes_[hash_id].apply(elem);
return hashes_[hash_id].template apply<T>(elem);
}

private:
std::vector<pairwise_indep_hash<T>> hashes_;
std::vector<pairwise_indep_hash> hashes_;

};

Expand Down
18 changes: 8 additions & 10 deletions libconfluo/confluo/container/sketch/universal_sketch.h
Expand Up @@ -36,7 +36,7 @@ class substream_summary {
sketch_(t, b),
heavy_hitters_(k),
hhs_precise_(),
hh_hash_(pairwise_indep_hash<T>::generate_random()),
hh_hash_(pairwise_indep_hash::generate_random()),
use_precise_hh_(precise) {
}

Expand Down Expand Up @@ -152,7 +152,7 @@ class substream_summary {
}
bool done = false;
while (!done) {
size_t idx = hh_hash_.apply(key) % heavy_hitters_.size();
size_t idx = hh_hash_.apply<T>(key) % heavy_hitters_.size();
T prev = atomic::load(&heavy_hitters_[idx]);
if (prev == key)
return;
Expand All @@ -176,7 +176,7 @@ class substream_summary {
sketch sketch_;
atomic_vector_t heavy_hitters_;
heavy_hitter_set<T, counter_t> hhs_precise_;
pairwise_indep_hash<T> hh_hash_;
pairwise_indep_hash hh_hash_;

bool use_precise_hh_;

Expand All @@ -198,7 +198,7 @@ class universal_sketch {
* @param precise track exact heavy hitters
*/
universal_sketch(size_t t, size_t b, size_t k, double a)
: universal_sketch(8 * sizeof(T), t, b, a, k) {
: universal_sketch(8 * sizeof(T), t, b, k, a) {
}

/**
Expand Down Expand Up @@ -235,7 +235,7 @@ class universal_sketch {

/**
* Update universal sketch with an element.
* @param elem element
* @param key key
*/
void update(T key) {
substream_summaries_[0].update(key);
Expand Down Expand Up @@ -350,11 +350,9 @@ class universal_sketch {
return total_size;
}

static universal_sketch<T, counter_t> create_parameterized(double gamma, double epsilon,
double hh_threshold, size_t num_heavy_hitters) {
return universal_sketch<T, counter_t>(count_sketch<T, counter_t>::perror_to_num_estimates(gamma),
count_sketch<T, counter_t>::error_margin_to_num_buckets(epsilon),
hh_threshold, num_heavy_hitters);
/**
 * Create a universal sketch whose dimensions are derived from the desired accuracy guarantees.
 * @param gamma desired probability of error (0 < gamma < 1); determines the depth
 * @param epsilon desired margin of error (0 < epsilon < 1); determines the width
 * @param a forwarded unchanged as the constructor's `a` argument
 * @param k forwarded unchanged as the constructor's `k` argument
 * @return universal sketch constructed with the derived depth and width
 */
static universal_sketch<T, counter_t> create_parameterized(double gamma, double epsilon, double a, size_t k) {
  // The (t, b, k, a) constructor follows count_sketch(t, b): depth (t) comes before width (b).
  // Passing the width first would build a sketch with its dimensions swapped.
  const size_t depth = count_sketch<T, counter_t>::perror_to_depth(gamma);
  const size_t width = count_sketch<T, counter_t>::error_margin_to_width(epsilon);
  return universal_sketch<T, counter_t>(depth, width, k, a);
}

private:
Expand Down

0 comments on commit f05c0f2

Please sign in to comment.