Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions 3rd/datasketches/datasketches/cpc/cpc_sketch_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,9 @@ cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(std::istream& is, uint64_t

template<typename A>
cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(const void* bytes, size_t size, uint64_t seed) {
ensure_minimum_memory(size, 8);
const char* ptr = static_cast<const char*>(bytes);
const char* base = static_cast<const char*>(bytes);
uint8_t preamble_ints;
ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
uint8_t serial_version;
Expand All @@ -606,6 +608,7 @@ cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(const void* bytes, size_t s
const bool has_hip = flags_byte & (1 << flags::HAS_HIP);
const bool has_table = flags_byte & (1 << flags::HAS_TABLE);
const bool has_window = flags_byte & (1 << flags::HAS_WINDOW);
ensure_minimum_memory(size, preamble_ints << 2);
compressed_state<A> compressed;
compressed.table_data_words = 0;
compressed.table_num_entries = 0;
Expand All @@ -614,30 +617,38 @@ cpc_sketch_alloc<A> cpc_sketch_alloc<A>::deserialize(const void* bytes, size_t s
double kxp = 0;
double hip_est_accum = 0;
if (has_table || has_window) {
check_memory_size(ptr - base + sizeof(num_coupons), size);
ptr += copy_from_mem(ptr, &num_coupons, sizeof(num_coupons));
if (has_table && has_window) {
check_memory_size(ptr - base + sizeof(compressed.table_num_entries), size);
ptr += copy_from_mem(ptr, &compressed.table_num_entries, sizeof(compressed.table_num_entries));
if (has_hip) {
check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
ptr += copy_from_mem(ptr, &kxp, sizeof(kxp));
ptr += copy_from_mem(ptr, &hip_est_accum, sizeof(hip_est_accum));
}
}
if (has_table) {
check_memory_size(ptr - base + sizeof(compressed.table_data_words), size);
ptr += copy_from_mem(ptr, &compressed.table_data_words, sizeof(compressed.table_data_words));
}
if (has_window) {
check_memory_size(ptr - base + sizeof(compressed.window_data_words), size);
ptr += copy_from_mem(ptr, &compressed.window_data_words, sizeof(compressed.window_data_words));
}
if (has_hip && !(has_table && has_window)) {
check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
ptr += copy_from_mem(ptr, &kxp, sizeof(kxp));
ptr += copy_from_mem(ptr, &hip_est_accum, sizeof(hip_est_accum));
}
if (has_window) {
compressed.window_data.resize(compressed.window_data_words);
check_memory_size(ptr - base + sizeof(compressed.window_data_words) + sizeof(uint32_t), size);
ptr += copy_from_mem(ptr, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
}
if (has_table) {
compressed.table_data.resize(compressed.table_data_words);
check_memory_size(ptr - base + sizeof(compressed.table_data_words), size);
ptr += copy_from_mem(ptr, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
}
if (!has_window) compressed.table_num_entries = num_coupons;
Expand Down
3 changes: 2 additions & 1 deletion 3rd/datasketches/datasketches/cpc/cpc_union_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ template<typename A>
cpc_union_alloc<A>::cpc_union_alloc(uint8_t lg_k, uint64_t seed):
lg_k(lg_k),
seed(seed),
accumulator(new (AllocCpc().allocate(1)) cpc_sketch_alloc<A>(lg_k, seed)),
accumulator(nullptr),
bit_matrix()
{
if (lg_k < CPC_MIN_LG_K || lg_k > CPC_MAX_LG_K) {
throw std::invalid_argument("lg_k must be >= " + std::to_string(CPC_MIN_LG_K) + " and <= " + std::to_string(CPC_MAX_LG_K) + ": " + std::to_string(lg_k));
}
accumulator = new (AllocCpc().allocate(1)) cpc_sketch_alloc<A>(lg_k, seed);
}

template<typename A>
Expand Down
19 changes: 14 additions & 5 deletions 3rd/datasketches/datasketches/fi/frequent_items_sketch_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <cstring>
#include <limits>

#include "memory_operations.hpp"

namespace datasketches {

// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
Expand Down Expand Up @@ -209,6 +211,7 @@ vector_u8<A> frequent_items_sketch<T, W, H, E, S, A>::serialize(unsigned header_
const size_t size = header_size_bytes + get_serialized_size_bytes();
vector_u8<A> bytes(size);
uint8_t* ptr = bytes.data() + header_size_bytes;
uint8_t* end_ptr = ptr + size;

const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
ptr += copy_to_mem(&preamble_longs, ptr, sizeof(uint8_t));
Expand Down Expand Up @@ -245,7 +248,8 @@ vector_u8<A> frequent_items_sketch<T, W, H, E, S, A>::serialize(unsigned header_
}
ptr += copy_to_mem(weights, ptr, sizeof(W) * num_items);
AllocW().deallocate(weights, num_items);
ptr += S().serialize(ptr, items, num_items);
const size_t bytes_remaining = end_ptr - ptr;
ptr += S().serialize(ptr, bytes_remaining, items, num_items);
for (unsigned i = 0; i < num_items; i++) items[i].~T();
A().deallocate(items, num_items);
}
Expand Down Expand Up @@ -276,7 +280,7 @@ frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>:
check_family_id(family_id);
check_size(lg_cur_size, lg_max_size);

frequent_items_sketch<T, W, H, E, S, A> sketch(lg_cur_size, lg_max_size);
frequent_items_sketch<T, W, H, E, S, A> sketch(lg_max_size, lg_cur_size);
if (!is_empty) {
uint32_t num_items;
is.read((char*)&num_items, sizeof(num_items));
Expand Down Expand Up @@ -308,7 +312,9 @@ frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>:

template<typename T, typename W, typename H, typename E, typename S, typename A>
frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>::deserialize(const void* bytes, size_t size) {
ensure_minimum_memory(size, 8);
const char* ptr = static_cast<const char*>(bytes);
const char* base = static_cast<const char*>(bytes);
uint8_t preamble_longs;
ptr += copy_from_mem(ptr, &preamble_longs, sizeof(uint8_t));
uint8_t serial_version;
Expand All @@ -330,8 +336,9 @@ frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>:
check_serial_version(serial_version);
check_family_id(family_id);
check_size(lg_cur_size, lg_max_size);
ensure_minimum_memory(size, 1 << preamble_longs);

frequent_items_sketch<T, W, H, E, S, A> sketch(lg_cur_size, lg_max_size);
frequent_items_sketch<T, W, H, E, S, A> sketch(lg_max_size, lg_cur_size);
if (!is_empty) {
uint32_t num_items;
ptr += copy_from_mem(ptr, &num_items, sizeof(uint32_t));
Expand All @@ -345,9 +352,11 @@ frequent_items_sketch<T, W, H, E, S, A> frequent_items_sketch<T, W, H, E, S, A>:
// batch deserialization with intermediate array of items and weights
typedef typename std::allocator_traits<A>::template rebind_alloc<W> AllocW;
W* weights = AllocW().allocate(num_items);
ensure_minimum_memory(size, ptr - base + (sizeof(W) * num_items));
ptr += copy_from_mem(ptr, weights, sizeof(W) * num_items);
T* items = A().allocate(num_items);
ptr += S().deserialize(ptr, items, num_items);
const size_t bytes_remaining = size - (ptr - base);
ptr += S().deserialize(ptr, bytes_remaining, items, num_items);
for (uint32_t i = 0; i < num_items; i++) {
sketch.update(std::move(items[i]), weights[i]);
items[i].~T();
Expand Down Expand Up @@ -436,7 +445,7 @@ void frequent_items_sketch<T, W, H, E, S, A>::check_weight(WW weight) {
// version for integral unsigned type - no-op
template<typename T, typename W, typename H, typename E, typename S, typename A>
template<typename WW, typename std::enable_if<std::is_integral<WW>::value && std::is_unsigned<WW>::value, int>::type>
void frequent_items_sketch<T, W, H, E, S, A>::check_weight(WW weight) {}
void frequent_items_sketch<T, W, H, E, S, A>::check_weight(WW) {}

// version for floating point type
template<typename T, typename W, typename H, typename E, typename S, typename A>
Expand Down
23 changes: 15 additions & 8 deletions 3rd/datasketches/datasketches/fi/reverse_purge_hash_map_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <iterator>
#include <cmath>

#include "MurmurHash3.h"

namespace datasketches {

// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
Expand Down Expand Up @@ -86,13 +88,18 @@ reverse_purge_hash_map<K, V, H, E, A>::~reverse_purge_hash_map() {
const uint32_t size = 1 << lg_cur_size;
if (num_active > 0) {
for (uint32_t i = 0; i < size; i++) {
if (is_active(i)) keys[i].~K();
if (--num_active == 0) break;
if (is_active(i)) {
keys[i].~K();
if (--num_active == 0) break;
}
}
}
A().deallocate(keys, size);
AllocV().deallocate(values, size);
AllocU16().deallocate(states, size);
if (keys != nullptr)
A().deallocate(keys, size);
if (values != nullptr)
AllocV().deallocate(values, size);
if (states != nullptr)
AllocU16().deallocate(states, size);
}

template<typename K, typename V, typename H, typename E, typename A>
Expand Down Expand Up @@ -142,7 +149,7 @@ V reverse_purge_hash_map<K, V, H, E, A>::adjust_or_insert(K&& key, V value) {
template<typename K, typename V, typename H, typename E, typename A>
V reverse_purge_hash_map<K, V, H, E, A>::get(const K& key) const {
const uint32_t mask = (1 << lg_cur_size) - 1;
uint32_t probe = H()(key) & mask;
uint32_t probe = fmix64(H()(key)) & mask;
while (is_active(probe)) {
if (E()(keys[probe], key)) return values[probe];
probe = (probe + 1) & mask;
Expand Down Expand Up @@ -251,7 +258,7 @@ void reverse_purge_hash_map<K, V, H, E, A>::hash_delete(uint32_t delete_index) {
template<typename K, typename V, typename H, typename E, typename A>
uint32_t reverse_purge_hash_map<K, V, H, E, A>::internal_adjust_or_insert(const K& key, V value) {
const uint32_t mask = (1 << lg_cur_size) - 1;
uint32_t index = H()(key) & mask;
uint32_t index = fmix64(H()(key)) & mask;
uint16_t drift = 1;
while (is_active(index)) {
if (E()(keys[index], key)) {
Expand Down Expand Up @@ -326,7 +333,7 @@ V reverse_purge_hash_map<K, V, H, E, A>::purge() {
}
i++;
}
std::nth_element(&samples[0], &samples[num_samples / 2], &samples[num_samples - 1]);
std::nth_element(&samples[0], &samples[num_samples / 2], &samples[num_samples]);
const V median = samples[num_samples / 2];
AllocV().deallocate(samples, limit);
subtract_and_keep_positive_only(median);
Expand Down
4 changes: 2 additions & 2 deletions 3rd/datasketches/datasketches/kll/kll_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class kll_helper {
if (std::isnan(values[i])) {
throw std::invalid_argument("Values must not be NaN");
}
if ((i < (size - 1)) and !(C()(values[i], values[i + 1]))) {
if ((i < (size - 1)) && !(C()(values[i], values[i + 1]))) {
throw std::invalid_argument("Values must be unique and monotonically increasing");
}
}
Expand All @@ -77,7 +77,7 @@ class kll_helper {
static typename std::enable_if<!std::is_floating_point<T>::value, void>::type
validate_values(const T* values, uint32_t size) {
for (uint32_t i = 0; i < size ; i++) {
if ((i < (size - 1)) and !(C()(values[i], values[i + 1]))) {
if ((i < (size - 1)) && !(C()(values[i], values[i + 1]))) {
throw std::invalid_argument("Values must be unique and monotonically increasing");
}
}
Expand Down
19 changes: 13 additions & 6 deletions 3rd/datasketches/datasketches/kll/kll_sketch_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <iostream>
#include <iomanip>

#include "memory_operations.hpp"
#include "kll_helper.hpp"

namespace datasketches {
Expand Down Expand Up @@ -417,6 +418,7 @@ vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const
const size_t size = header_size_bytes + get_serialized_size_bytes();
vector_u8<A> bytes(size);
uint8_t* ptr = bytes.data() + header_size_bytes;
uint8_t* end_ptr = ptr + size;
const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
Expand All @@ -440,10 +442,11 @@ vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const
ptr += copy_to_mem(&num_levels_, ptr, sizeof(num_levels_));
ptr += copy_to_mem(&unused, ptr, sizeof(unused));
ptr += copy_to_mem(levels_, ptr, sizeof(levels_[0]) * num_levels_);
ptr += S().serialize(ptr, min_value_, 1);
ptr += S().serialize(ptr, max_value_, 1);
ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
}
ptr += S().serialize(ptr, &items_[levels_[0]], get_num_retained());
const size_t bytes_remaining = end_ptr - ptr;
ptr += S().serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
}
const size_t delta = ptr - bytes.data();
if (delta != size) throw std::logic_error("serialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
Expand Down Expand Up @@ -478,6 +481,7 @@ kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is) {

template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size) {
ensure_minimum_memory(size, 8);
const char* ptr = static_cast<const char*>(bytes);
uint8_t preamble_ints;
ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
Expand All @@ -496,6 +500,7 @@ kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, si
check_preamble_ints(preamble_ints, flags_byte);
check_serial_version(serial_version);
check_family_id(family_id);
ensure_minimum_memory(size, 1 << preamble_ints);

const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
return is_empty ? kll_sketch<T, C, S, A>(k) : kll_sketch<T, C, S, A>(k, flags_byte, bytes, size);
Expand Down Expand Up @@ -562,12 +567,14 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, std::istream&

// for deserialization
// the common part of the preamble was read and compatibility checks were done
// we also assume we have already checked that the preamble information fits within the buffer
template<typename T, typename C, typename S, typename A>
kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size) {
k_ = k;
m_ = DEFAULT_M;
const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
const char* ptr = static_cast<const char*>(bytes) + DATA_START_SINGLE_ITEM;
const char* end_ptr = static_cast<const char*>(bytes) + size;
if (is_single_item) {
n_ = 1;
min_k_ = k_;
Expand All @@ -591,13 +598,13 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, const void* b
min_value_ = A().allocate(1);
max_value_ = A().allocate(1);
if (!is_single_item) {
ptr += S().deserialize(ptr, min_value_, 1);
ptr += S().deserialize(ptr, max_value_, 1);
ptr += S().deserialize(ptr, end_ptr - ptr, min_value_, 1);
ptr += S().deserialize(ptr, end_ptr - ptr, max_value_, 1);
}
items_ = A().allocate(capacity);
items_size_ = capacity;
const auto num_items(levels_[num_levels_] - levels_[0]);
ptr += S().deserialize(ptr, &items_[levels_[0]], num_items);
ptr += S().deserialize(ptr, end_ptr - ptr, &items_[levels_[0]], num_items);
if (is_single_item) {
new (min_value_) T(items_[levels_[0]]);
new (max_value_) T(items_[levels_[0]]);
Expand Down
57 changes: 57 additions & 0 deletions 3rd/datasketches/datasketches/memory_operations.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef _MEMORY_CHECKS_HPP_
#define _MEMORY_CHECKS_HPP_

#include <memory>
#include <exception>
#include <iostream>

namespace datasketches {

static inline void ensure_minimum_memory(size_t bytes_available, size_t min_needed) {
if (bytes_available < min_needed) {
throw std::out_of_range("Insufficient buffer size detected: bytes available "
+ std::to_string((int) bytes_available) + ", minimum needed " + std::to_string((int) min_needed));
}
}

static inline void check_memory_size(size_t requested_index, size_t capacity) {
if (requested_index > capacity) {
throw std::out_of_range("Attempt to access memory beyond limits: requested index "
+ std::to_string((int) requested_index) + ", capacity " + std::to_string((int) capacity));
}
}

// note: size is in bytes, not items
static inline size_t copy_from_mem(const void* src, void* dst, size_t size) {
memcpy(dst, src, size);
return size;
}

// note: size is in bytes, not items
static inline size_t copy_to_mem(const void* src, void* dst, size_t size) {
memcpy(dst, src, size);
return size;
}

} // namespace

#endif // _MEMORY_CHECKS_HPP_
Loading