diff --git a/3rd/datasketches/datasketches/cpc/cpc_sketch_impl.hpp b/3rd/datasketches/datasketches/cpc/cpc_sketch_impl.hpp
index 82df38022..77f84f349 100644
--- a/3rd/datasketches/datasketches/cpc/cpc_sketch_impl.hpp
+++ b/3rd/datasketches/datasketches/cpc/cpc_sketch_impl.hpp
@@ -588,7 +588,9 @@ cpc_sketch_alloc cpc_sketch_alloc::deserialize(std::istream& is, uint64_t
template
cpc_sketch_alloc cpc_sketch_alloc::deserialize(const void* bytes, size_t size, uint64_t seed) {
+ ensure_minimum_memory(size, 8);
const char* ptr = static_cast(bytes);
+ const char* base = static_cast(bytes);
uint8_t preamble_ints;
ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
uint8_t serial_version;
@@ -606,6 +608,7 @@ cpc_sketch_alloc cpc_sketch_alloc::deserialize(const void* bytes, size_t s
const bool has_hip = flags_byte & (1 << flags::HAS_HIP);
const bool has_table = flags_byte & (1 << flags::HAS_TABLE);
const bool has_window = flags_byte & (1 << flags::HAS_WINDOW);
+ ensure_minimum_memory(size, preamble_ints << 2);
compressed_state compressed;
compressed.table_data_words = 0;
compressed.table_num_entries = 0;
@@ -614,30 +617,38 @@ cpc_sketch_alloc cpc_sketch_alloc::deserialize(const void* bytes, size_t s
double kxp = 0;
double hip_est_accum = 0;
if (has_table || has_window) {
+ check_memory_size(ptr - base + sizeof(num_coupons), size);
ptr += copy_from_mem(ptr, &num_coupons, sizeof(num_coupons));
if (has_table && has_window) {
+ check_memory_size(ptr - base + sizeof(compressed.table_num_entries), size);
ptr += copy_from_mem(ptr, &compressed.table_num_entries, sizeof(compressed.table_num_entries));
if (has_hip) {
+ check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
ptr += copy_from_mem(ptr, &kxp, sizeof(kxp));
ptr += copy_from_mem(ptr, &hip_est_accum, sizeof(hip_est_accum));
}
}
if (has_table) {
+ check_memory_size(ptr - base + sizeof(compressed.table_data_words), size);
ptr += copy_from_mem(ptr, &compressed.table_data_words, sizeof(compressed.table_data_words));
}
if (has_window) {
+ check_memory_size(ptr - base + sizeof(compressed.window_data_words), size);
ptr += copy_from_mem(ptr, &compressed.window_data_words, sizeof(compressed.window_data_words));
}
if (has_hip && !(has_table && has_window)) {
+ check_memory_size(ptr - base + sizeof(kxp) + sizeof(hip_est_accum), size);
ptr += copy_from_mem(ptr, &kxp, sizeof(kxp));
ptr += copy_from_mem(ptr, &hip_est_accum, sizeof(hip_est_accum));
}
if (has_window) {
compressed.window_data.resize(compressed.window_data_words);
+ check_memory_size(ptr - base + sizeof(compressed.window_data_words) + sizeof(uint32_t), size);
ptr += copy_from_mem(ptr, compressed.window_data.data(), compressed.window_data_words * sizeof(uint32_t));
}
if (has_table) {
compressed.table_data.resize(compressed.table_data_words);
+ check_memory_size(ptr - base + sizeof(compressed.table_data_words), size);
ptr += copy_from_mem(ptr, compressed.table_data.data(), compressed.table_data_words * sizeof(uint32_t));
}
if (!has_window) compressed.table_num_entries = num_coupons;
diff --git a/3rd/datasketches/datasketches/cpc/cpc_union_impl.hpp b/3rd/datasketches/datasketches/cpc/cpc_union_impl.hpp
index e852782cd..65d933c2d 100644
--- a/3rd/datasketches/datasketches/cpc/cpc_union_impl.hpp
+++ b/3rd/datasketches/datasketches/cpc/cpc_union_impl.hpp
@@ -28,12 +28,13 @@ template
cpc_union_alloc::cpc_union_alloc(uint8_t lg_k, uint64_t seed):
lg_k(lg_k),
seed(seed),
-accumulator(new (AllocCpc().allocate(1)) cpc_sketch_alloc(lg_k, seed)),
+accumulator(nullptr),
bit_matrix()
{
if (lg_k < CPC_MIN_LG_K || lg_k > CPC_MAX_LG_K) {
throw std::invalid_argument("lg_k must be >= " + std::to_string(CPC_MIN_LG_K) + " and <= " + std::to_string(CPC_MAX_LG_K) + ": " + std::to_string(lg_k));
}
+ accumulator = new (AllocCpc().allocate(1)) cpc_sketch_alloc(lg_k, seed);
}
template
diff --git a/3rd/datasketches/datasketches/fi/frequent_items_sketch_impl.hpp b/3rd/datasketches/datasketches/fi/frequent_items_sketch_impl.hpp
index 1b7c9e822..40a70f5b9 100644
--- a/3rd/datasketches/datasketches/fi/frequent_items_sketch_impl.hpp
+++ b/3rd/datasketches/datasketches/fi/frequent_items_sketch_impl.hpp
@@ -24,6 +24,8 @@
#include
#include
+#include "memory_operations.hpp"
+
namespace datasketches {
// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
@@ -209,6 +211,7 @@ vector_u8 frequent_items_sketch::serialize(unsigned header_
const size_t size = header_size_bytes + get_serialized_size_bytes();
vector_u8 bytes(size);
uint8_t* ptr = bytes.data() + header_size_bytes;
+ uint8_t* end_ptr = ptr + size;
const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
ptr += copy_to_mem(&preamble_longs, ptr, sizeof(uint8_t));
@@ -245,7 +248,8 @@ vector_u8 frequent_items_sketch::serialize(unsigned header_
}
ptr += copy_to_mem(weights, ptr, sizeof(W) * num_items);
AllocW().deallocate(weights, num_items);
- ptr += S().serialize(ptr, items, num_items);
+ const size_t bytes_remaining = end_ptr - ptr;
+ ptr += S().serialize(ptr, bytes_remaining, items, num_items);
for (unsigned i = 0; i < num_items; i++) items[i].~T();
A().deallocate(items, num_items);
}
@@ -276,7 +280,7 @@ frequent_items_sketch frequent_items_sketch:
check_family_id(family_id);
check_size(lg_cur_size, lg_max_size);
- frequent_items_sketch sketch(lg_cur_size, lg_max_size);
+ frequent_items_sketch sketch(lg_max_size, lg_cur_size);
if (!is_empty) {
uint32_t num_items;
is.read((char*)&num_items, sizeof(num_items));
@@ -308,7 +312,9 @@ frequent_items_sketch frequent_items_sketch:
template
frequent_items_sketch frequent_items_sketch::deserialize(const void* bytes, size_t size) {
+ ensure_minimum_memory(size, 8);
const char* ptr = static_cast(bytes);
+ const char* base = static_cast(bytes);
uint8_t preamble_longs;
ptr += copy_from_mem(ptr, &preamble_longs, sizeof(uint8_t));
uint8_t serial_version;
@@ -330,8 +336,9 @@ frequent_items_sketch frequent_items_sketch:
check_serial_version(serial_version);
check_family_id(family_id);
check_size(lg_cur_size, lg_max_size);
+ ensure_minimum_memory(size, 1 << preamble_longs);
- frequent_items_sketch sketch(lg_cur_size, lg_max_size);
+ frequent_items_sketch sketch(lg_max_size, lg_cur_size);
if (!is_empty) {
uint32_t num_items;
ptr += copy_from_mem(ptr, &num_items, sizeof(uint32_t));
@@ -345,9 +352,11 @@ frequent_items_sketch frequent_items_sketch:
// batch deserialization with intermediate array of items and weights
typedef typename std::allocator_traits::template rebind_alloc AllocW;
W* weights = AllocW().allocate(num_items);
+ ensure_minimum_memory(size, ptr - base + (sizeof(W) * num_items));
ptr += copy_from_mem(ptr, weights, sizeof(W) * num_items);
T* items = A().allocate(num_items);
- ptr += S().deserialize(ptr, items, num_items);
+ const size_t bytes_remaining = size - (ptr - base);
+ ptr += S().deserialize(ptr, bytes_remaining, items, num_items);
for (uint32_t i = 0; i < num_items; i++) {
sketch.update(std::move(items[i]), weights[i]);
items[i].~T();
@@ -436,7 +445,7 @@ void frequent_items_sketch::check_weight(WW weight) {
// version for integral unsigned type - no-op
template
template::value && std::is_unsigned::value, int>::type>
-void frequent_items_sketch::check_weight(WW weight) {}
+void frequent_items_sketch::check_weight(WW) {}
// version for floating point type
template
diff --git a/3rd/datasketches/datasketches/fi/reverse_purge_hash_map_impl.hpp b/3rd/datasketches/datasketches/fi/reverse_purge_hash_map_impl.hpp
index 628faff46..732d92341 100644
--- a/3rd/datasketches/datasketches/fi/reverse_purge_hash_map_impl.hpp
+++ b/3rd/datasketches/datasketches/fi/reverse_purge_hash_map_impl.hpp
@@ -25,6 +25,8 @@
#include
#include
+#include "MurmurHash3.h"
+
namespace datasketches {
// clang++ seems to require this declaration for CMAKE_BUILD_TYPE='Debug"
@@ -86,13 +88,18 @@ reverse_purge_hash_map::~reverse_purge_hash_map() {
const uint32_t size = 1 << lg_cur_size;
if (num_active > 0) {
for (uint32_t i = 0; i < size; i++) {
- if (is_active(i)) keys[i].~K();
- if (--num_active == 0) break;
+ if (is_active(i)) {
+ keys[i].~K();
+ if (--num_active == 0) break;
+ }
}
}
- A().deallocate(keys, size);
- AllocV().deallocate(values, size);
- AllocU16().deallocate(states, size);
+ if (keys != nullptr)
+ A().deallocate(keys, size);
+ if (values != nullptr)
+ AllocV().deallocate(values, size);
+ if (states != nullptr)
+ AllocU16().deallocate(states, size);
}
template
@@ -142,7 +149,7 @@ V reverse_purge_hash_map::adjust_or_insert(K&& key, V value) {
template
V reverse_purge_hash_map::get(const K& key) const {
const uint32_t mask = (1 << lg_cur_size) - 1;
- uint32_t probe = H()(key) & mask;
+ uint32_t probe = fmix64(H()(key)) & mask;
while (is_active(probe)) {
if (E()(keys[probe], key)) return values[probe];
probe = (probe + 1) & mask;
@@ -251,7 +258,7 @@ void reverse_purge_hash_map::hash_delete(uint32_t delete_index) {
template
uint32_t reverse_purge_hash_map::internal_adjust_or_insert(const K& key, V value) {
const uint32_t mask = (1 << lg_cur_size) - 1;
- uint32_t index = H()(key) & mask;
+ uint32_t index = fmix64(H()(key)) & mask;
uint16_t drift = 1;
while (is_active(index)) {
if (E()(keys[index], key)) {
@@ -326,7 +333,7 @@ V reverse_purge_hash_map::purge() {
}
i++;
}
- std::nth_element(&samples[0], &samples[num_samples / 2], &samples[num_samples - 1]);
+ std::nth_element(&samples[0], &samples[num_samples / 2], &samples[num_samples]);
const V median = samples[num_samples / 2];
AllocV().deallocate(samples, limit);
subtract_and_keep_positive_only(median);
diff --git a/3rd/datasketches/datasketches/kll/kll_helper.hpp b/3rd/datasketches/datasketches/kll/kll_helper.hpp
index a13e49be2..4857f51b7 100644
--- a/3rd/datasketches/datasketches/kll/kll_helper.hpp
+++ b/3rd/datasketches/datasketches/kll/kll_helper.hpp
@@ -63,7 +63,7 @@ class kll_helper {
if (std::isnan(values[i])) {
throw std::invalid_argument("Values must not be NaN");
}
- if ((i < (size - 1)) and !(C()(values[i], values[i + 1]))) {
+ if ((i < (size - 1)) && !(C()(values[i], values[i + 1]))) {
throw std::invalid_argument("Values must be unique and monotonically increasing");
}
}
@@ -77,7 +77,7 @@ class kll_helper {
static typename std::enable_if::value, void>::type
validate_values(const T* values, uint32_t size) {
for (uint32_t i = 0; i < size ; i++) {
- if ((i < (size - 1)) and !(C()(values[i], values[i + 1]))) {
+ if ((i < (size - 1)) && !(C()(values[i], values[i + 1]))) {
throw std::invalid_argument("Values must be unique and monotonically increasing");
}
}
diff --git a/3rd/datasketches/datasketches/kll/kll_sketch_impl.hpp b/3rd/datasketches/datasketches/kll/kll_sketch_impl.hpp
index 6a74fa02d..822a4a6b1 100644
--- a/3rd/datasketches/datasketches/kll/kll_sketch_impl.hpp
+++ b/3rd/datasketches/datasketches/kll/kll_sketch_impl.hpp
@@ -23,6 +23,7 @@
#include
#include
+#include "memory_operations.hpp"
#include "kll_helper.hpp"
namespace datasketches {
@@ -417,6 +418,7 @@ vector_u8 kll_sketch::serialize(unsigned header_size_bytes) const
const size_t size = header_size_bytes + get_serialized_size_bytes();
vector_u8 bytes(size);
uint8_t* ptr = bytes.data() + header_size_bytes;
+ uint8_t* end_ptr = ptr + size;
const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
@@ -440,10 +442,11 @@ vector_u8 kll_sketch::serialize(unsigned header_size_bytes) const
ptr += copy_to_mem(&num_levels_, ptr, sizeof(num_levels_));
ptr += copy_to_mem(&unused, ptr, sizeof(unused));
ptr += copy_to_mem(levels_, ptr, sizeof(levels_[0]) * num_levels_);
- ptr += S().serialize(ptr, min_value_, 1);
- ptr += S().serialize(ptr, max_value_, 1);
+ ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+ ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
}
- ptr += S().serialize(ptr, &items_[levels_[0]], get_num_retained());
+ const size_t bytes_remaining = end_ptr - ptr;
+ ptr += S().serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
}
const size_t delta = ptr - bytes.data();
if (delta != size) throw std::logic_error("serialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
@@ -478,6 +481,7 @@ kll_sketch kll_sketch::deserialize(std::istream& is) {
template
kll_sketch kll_sketch::deserialize(const void* bytes, size_t size) {
+ ensure_minimum_memory(size, 8);
const char* ptr = static_cast(bytes);
uint8_t preamble_ints;
ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
@@ -496,6 +500,7 @@ kll_sketch kll_sketch::deserialize(const void* bytes, si
check_preamble_ints(preamble_ints, flags_byte);
check_serial_version(serial_version);
check_family_id(family_id);
+ ensure_minimum_memory(size, 1 << preamble_ints);
const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
return is_empty ? kll_sketch(k) : kll_sketch(k, flags_byte, bytes, size);
@@ -562,12 +567,14 @@ kll_sketch::kll_sketch(uint16_t k, uint8_t flags_byte, std::istream&
// for deserialization
// the common part of the preamble was read and compatibility checks were done
+// we also assume we have already checked that the preamble information fits within the buffer
template
kll_sketch::kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size) {
k_ = k;
m_ = DEFAULT_M;
const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
const char* ptr = static_cast(bytes) + DATA_START_SINGLE_ITEM;
+ const char* end_ptr = static_cast(bytes) + size;
if (is_single_item) {
n_ = 1;
min_k_ = k_;
@@ -591,13 +598,13 @@ kll_sketch::kll_sketch(uint16_t k, uint8_t flags_byte, const void* b
min_value_ = A().allocate(1);
max_value_ = A().allocate(1);
if (!is_single_item) {
- ptr += S().deserialize(ptr, min_value_, 1);
- ptr += S().deserialize(ptr, max_value_, 1);
+ ptr += S().deserialize(ptr, end_ptr - ptr, min_value_, 1);
+ ptr += S().deserialize(ptr, end_ptr - ptr, max_value_, 1);
}
items_ = A().allocate(capacity);
items_size_ = capacity;
const auto num_items(levels_[num_levels_] - levels_[0]);
- ptr += S().deserialize(ptr, &items_[levels_[0]], num_items);
+ ptr += S().deserialize(ptr, end_ptr - ptr, &items_[levels_[0]], num_items);
if (is_single_item) {
new (min_value_) T(items_[levels_[0]]);
new (max_value_) T(items_[levels_[0]]);
diff --git a/3rd/datasketches/datasketches/memory_operations.hpp b/3rd/datasketches/datasketches/memory_operations.hpp
new file mode 100644
index 000000000..1cff60f85
--- /dev/null
+++ b/3rd/datasketches/datasketches/memory_operations.hpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _MEMORY_CHECKS_HPP_
+#define _MEMORY_CHECKS_HPP_
+
+#include
+#include
+#include
+
+namespace datasketches {
+
+// Verifies that a buffer is at least as large as the caller is about to rely on.
+//   bytes_available: size of the buffer, in bytes
+//   min_needed:      smallest acceptable buffer size, in bytes
+// Throws std::out_of_range when bytes_available is smaller than min_needed.
+static inline void ensure_minimum_memory(size_t bytes_available, size_t min_needed) {
+  if (bytes_available < min_needed) {
+    // format the sizes at full width: narrowing size_t to int garbles values above INT_MAX
+    throw std::out_of_range("Insufficient buffer size detected: bytes available "
+        + std::to_string(bytes_available) + ", minimum needed " + std::to_string(min_needed));
+  }
+}
+
+// Verifies that an access ending at requested_index stays inside a buffer.
+//   requested_index: one-past-the-end byte offset the caller intends to touch
+//   capacity:        size of the buffer, in bytes
+// An offset equal to capacity is accepted; anything larger throws std::out_of_range.
+static inline void check_memory_size(size_t requested_index, size_t capacity) {
+  if (requested_index > capacity) {
+    // format the sizes at full width: narrowing size_t to int garbles values above INT_MAX
+    throw std::out_of_range("Attempt to access memory beyond limits: requested index "
+        + std::to_string(requested_index) + ", capacity " + std::to_string(capacity));
+  }
+}
+
+// Copies size bytes from src into dst and returns size, so a caller can advance
+// its read cursor with `ptr += copy_from_mem(ptr, &field, sizeof(field))`.
+// No bounds checking is done here; the caller validates the buffer first.
+// note: size is in bytes, not items
+static inline size_t copy_from_mem(const void* src, void* dst, size_t size) {
+  // memcpy is undefined for null pointers even when size is 0, and an empty
+  // std::vector may return nullptr from data(), so skip the empty copy
+  if (size > 0) {
+    memcpy(dst, src, size);
+  }
+  return size;
+}
+
+// Copies size bytes from src into dst and returns size, so a caller can advance
+// its write cursor with `ptr += copy_to_mem(&field, ptr, sizeof(field))`.
+// No bounds checking is done here; the caller sizes the destination first.
+// note: size is in bytes, not items
+static inline size_t copy_to_mem(const void* src, void* dst, size_t size) {
+  // memcpy is undefined for null pointers even when size is 0, so skip the empty copy
+  if (size > 0) {
+    memcpy(dst, src, size);
+  }
+  return size;
+}
+
+} // namespace
+
+#endif // _MEMORY_CHECKS_HPP_
diff --git a/3rd/datasketches/datasketches/serde.hpp b/3rd/datasketches/datasketches/serde.hpp
index 4ed6e71cc..6e510863f 100644
--- a/3rd/datasketches/datasketches/serde.hpp
+++ b/3rd/datasketches/datasketches/serde.hpp
@@ -25,6 +25,8 @@
#include
#include
+#include "memory_operations.hpp"
+
namespace datasketches {
// serialize and deserialize
@@ -35,8 +37,8 @@ template struct serde {
// raw bytes serialization
size_t size_of_item(const T& item);
- size_t serialize(void* ptr, const T* items, unsigned num);
- size_t deserialize(const void* ptr, T* items, unsigned num); // items are not initialized
+ size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num);
+ size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num); // items are not initialized
};
// serde for all fixed-size arithmetic types (int and float of different sizes)
@@ -50,16 +52,20 @@ struct serde::value>::type> {
void deserialize(std::istream& is, T* items, unsigned num) {
is.read((char*)items, sizeof(T) * num);
}
- size_t size_of_item(T item) {
+ size_t size_of_item(const T&) {
return sizeof(T);
}
- size_t serialize(void* ptr, const T* items, unsigned num) {
- memcpy(ptr, items, sizeof(T) * num);
- return sizeof(int32_t) * num;
+ size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num) {
+ const size_t bytes_written = sizeof(T) * num;
+ check_memory_size(bytes_written, capacity);
+ memcpy(ptr, items, bytes_written);
+ return bytes_written;
}
- size_t deserialize(const void* ptr, T* items, unsigned num) {
- memcpy(items, ptr, sizeof(T) * num);
- return sizeof(T) * num;
+ size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num) {
+ const size_t bytes_read = sizeof(T) * num;
+ check_memory_size(bytes_read, capacity);
+ memcpy(items, ptr, bytes_read);
+ return bytes_read;
}
};
@@ -93,42 +99,38 @@ struct serde {
size_t size_of_item(const std::string& item) {
return sizeof(uint32_t) + item.size();
}
- size_t serialize(void* ptr, const std::string* items, unsigned num) {
- size_t size = sizeof(uint32_t) * num;
+ size_t serialize(void* ptr, size_t capacity, const std::string* items, unsigned num) {
+ size_t bytes_written = 0;
for (unsigned i = 0; i < num; i++) {
- uint32_t length = items[i].size();
+ const uint32_t length = items[i].size();
+ const size_t new_bytes = length + sizeof(length);
+ check_memory_size(bytes_written + new_bytes, capacity);
memcpy(ptr, &length, sizeof(length));
ptr = static_cast(ptr) + sizeof(uint32_t);
memcpy(ptr, items[i].c_str(), length);
ptr = static_cast(ptr) + length;
- size += length;
+ bytes_written += new_bytes;
}
- return size;
+ return bytes_written;
}
- size_t deserialize(const void* ptr, std::string* items, unsigned num) {
- size_t size = sizeof(uint32_t) * num;
+ size_t deserialize(const void* ptr, size_t capacity, std::string* items, unsigned num) {
+ size_t bytes_read = 0;
for (unsigned i = 0; i < num; i++) {
uint32_t length;
+ check_memory_size(bytes_read + sizeof(length), capacity);
memcpy(&length, ptr, sizeof(length));
ptr = static_cast(ptr) + sizeof(uint32_t);
+ bytes_read += sizeof(length);
+
+ check_memory_size(bytes_read + length, capacity);
new (&items[i]) std::string(static_cast(ptr), length);
ptr = static_cast(ptr) + length;
- size += length;
+ bytes_read += length;
}
- return size;
+ return bytes_read;
}
};
-static inline size_t copy_from_mem(const void* src, void* dst, size_t size) {
- memcpy(dst, src, size);
- return size;
-}
-
-static inline size_t copy_to_mem(const void* src, void* dst, size_t size) {
- memcpy(dst, src, size);
- return size;
-}
-
} /* namespace datasketches */
# endif
diff --git a/3rd/rng/randutils.hpp b/3rd/rng/randutils.hpp
index d94122f5f..19a6db846 100644
--- a/3rd/rng/randutils.hpp
+++ b/3rd/rng/randutils.hpp
@@ -486,7 +486,7 @@ class auto_seeded : public SeedSeq {
// The address of the time function. It should hopefully be in
// a system library that hopefully isn't always in the same place
// (might not change until system is rebooted though)
- auto time_func = hash(&std::chrono::high_resolution_clock::now);
+ //auto time_func = hash(&std::chrono::high_resolution_clock::now);
// The address of the exit function. It should hopefully be in
// a system library that hopefully isn't always in the same place
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7144a6a6d..404344130 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4,7 +4,7 @@ project(pktvisor3)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake")
# this is the source of truth for version, which will be written to config.h include file.
-project(pktvisord VERSION 3.0.5)
+project(pktvisord VERSION 3.0.6)
set(PKTVISOR_VERSION_NUM ${PROJECT_VERSION})
set(PKTVISOR_VERSION "pktvisor ${PROJECT_VERSION}")
@@ -90,6 +90,7 @@ add_executable(unit-tests
tests/test_parse_pcap.cpp
tests/test_sketches.cpp
tests/test_utils.cpp
+ tests/test_metrics.cpp
)
if (MMDB_ENABLE)
diff --git a/Dockerfile b/Dockerfile
index 7cbc09d47..ba9150a18 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -29,6 +29,7 @@ RUN \
cd /tmp/build && \
go get github.com/pkg/errors && \
go get github.com/jroimartin/gocui && \
+ go get github.com/docopt/docopt-go && \
go build /src/cmd/pktvisor/pktvisor.go
FROM ubuntu:disco AS runtime
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 000000000..f49a4e16e
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
\ No newline at end of file
diff --git a/README.md b/README.md
index 5cd54fc76..cf0829c06 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@
pktvisord (-h | --help)
pktvisord --version
- pktvisord will summarize your packet streams.
+ pktvisord summarizes your data streams.
TARGET is either a network interface, an IP address (4 or 6) or a pcap file (ending in .pcap or .cap)
@@ -30,11 +30,12 @@
```
-Running the server:
+pktvisor summarizes input data streams, and provides a clean, time-windowed HTTP interface to the results.
-`docker run --rm --net=host -d nsone/pktvisor3:latest pktvisord -b 'port 53' enp3s0`
-Running the console UI:
+Running the server from Docker:
+`docker run --rm --net=host -d --mount type=bind,source=/opt/geo,target=/geo --entrypoint '/usr/local/sbin/pktvisord' ns1/pktvisor:latest --geo-city /geo/GeoIP2-City.mmdb --geo-asn /geo/GeoIP2-ISP.mmdb -H 192.168.0.54/32,127.0.0.1/32 any`
-`docker -D run --rm --net=host -ti nsone/pktvisor3:latest pktvisor`
+Running the console UI from Docker:
+`docker run -it --rm --net=host --entrypoint '/bin/bash' ns1/pktvisor:latest -c "sleep 1;pktvisor"`
diff --git a/cmd/pktvisor/pktvisor.go b/cmd/pktvisor/pktvisor.go
index 2ef3a39c4..40a629d59 100644
--- a/cmd/pktvisor/pktvisor.go
+++ b/cmd/pktvisor/pktvisor.go
@@ -3,13 +3,13 @@ package main
import (
"encoding/json"
"fmt"
+ "github.com/docopt/docopt-go"
"github.com/pkg/errors"
"io/ioutil"
"log"
"strconv"
"net/http"
- //"sort"
"time"
"github.com/jroimartin/gocui"
@@ -105,15 +105,16 @@ type StatSnapshot struct {
DstIpsOut int64 `json:"dst_ips_out"`
SrcIpsIn int64 `json:"src_ips_in"`
} `json:"cardinality"`
- Ipv4 int64 `json:"ipv4"`
- Ipv6 int64 `json:"ipv6"`
- Tcp int64 `json:"tcp"`
- Total int64 `json:"total"`
- Udp int64 `json:"udp"`
- In int64 `json:"in"`
- Out int64 `json:"out"`
- Other_L4 int64 `json:"other_l4"`
- Rates struct {
+ Ipv4 int64 `json:"ipv4"`
+ Ipv6 int64 `json:"ipv6"`
+ Tcp int64 `json:"tcp"`
+ Total int64 `json:"total"`
+ Udp int64 `json:"udp"`
+ In int64 `json:"in"`
+ Out int64 `json:"out"`
+ OtherL4 int64 `json:"other_l4"`
+ DeepSamples int64 `json:"deep_samples"`
+ Rates struct {
Pps_in struct {
P50 int64 `json:"p50"`
P90 int64 `json:"p90"`
@@ -139,8 +140,20 @@ type StatSnapshot struct {
}
func main() {
- // TODO get port from command line
- // statPort =
+ usage := `pktvisor v3 UI
+
+ Usage:
+ pktvisor [-p PORT] [-H HOST]
+ pktvisor (-h | --help)
+
+ Options:
+ -p PORT Query pktvisord metrics webserver on the given port [default: 10853]
+ -H HOST Query pktvisord metrics webserver on the given host [default: localhost]
+ -h --help Show this screen
+`
+ opts, err := docopt.ParseDoc(usage)
+ statPort, _ = opts.Int("-p")
+ statHost, _ = opts.String("-H")
g, err := gocui.NewGui(gocui.OutputNormal)
if err != nil {
log.Panicln(err)
@@ -166,14 +179,14 @@ func updateHeader(v *gocui.View, rates *InstantRates, stats *StatSnapshot) {
pcounts := stats.Packets
// there may be some unknown
inOutDiff := pcounts.Total - (pcounts.In + pcounts.Out)
- _, _ = fmt.Fprintf(v, "Pkts %d | UDP %d (%3.1f%%) | TCP %d (%3.1f%%) | Other %d (%3.1f%%) | IPv4 %d (%3.1f%%) | IPv6 %d (%3.1f%%) | In %d (%3.1f%%) | Out %d (%3.1f%%)\n",
+ _, _ = fmt.Fprintf(v, "Pkts %d | UDP %d (%3.1f%%) | TCP %d (%3.1f%%) | Other %d (%3.1f%%) | IPv4 %d (%3.1f%%) | IPv6 %d (%3.1f%%) | In %d (%3.1f%%) | Out %d (%3.1f%%) | Deep Samples %d (%3.1f%%)\n",
pcounts.Total,
pcounts.Udp,
(float64(pcounts.Udp)/float64(pcounts.Total))*100,
pcounts.Tcp,
(float64(pcounts.Tcp)/float64(pcounts.Total))*100,
- pcounts.Other_L4,
- (float64(pcounts.Other_L4)/float64(pcounts.Total))*100,
+ pcounts.OtherL4,
+ (float64(pcounts.OtherL4)/float64(pcounts.Total))*100,
pcounts.Ipv4,
(float64(pcounts.Ipv4)/float64(pcounts.Total))*100,
pcounts.Ipv6,
@@ -182,6 +195,8 @@ func updateHeader(v *gocui.View, rates *InstantRates, stats *StatSnapshot) {
(float64(pcounts.In)/float64(pcounts.Total-inOutDiff))*100,
pcounts.Out,
(float64(pcounts.Out)/float64(pcounts.Total-inOutDiff))*100,
+ pcounts.DeepSamples,
+ (float64(pcounts.DeepSamples)/float64(pcounts.Total))*100,
)
_, _ = fmt.Fprintf(v, "Pkt Rates In %d/s %d/%d/%d/%d pps | Out %d/s %d/%d/%d/%d pps | IP Card. In: %d | Out: %d\n\n",
rates.Packets.In,
@@ -254,16 +269,13 @@ func updateTable(data []NameCount, v *gocui.View, baseNumber int64) {
top3 := 0
for _, stat := range data {
w, _ := v.Size()
- w = w - 7
- fmtstr := ""
+ numStr := ""
if baseNumber > 0 && top3 < 3 {
- w = w - 8
- fmtstr = "%-" + strconv.Itoa(w) + "." + strconv.Itoa(w) + "s %5d (%4.1f%%)\n"
- fmt.Fprintf(v, fmtstr, stat.Name, stat.Estimate, float64(stat.Estimate)/float64(baseNumber)*100)
+ numStr = fmt.Sprintf("%d (%4.1f%%)", stat.Estimate, float64(stat.Estimate)/float64(baseNumber)*100)
} else {
- fmtstr = "%-" + strconv.Itoa(w) + "." + strconv.Itoa(w) + "s %5d\n"
- fmt.Fprintf(v, fmtstr, stat.Name, stat.Estimate)
+ numStr = fmt.Sprintf("%d", stat.Estimate)
}
+ fmt.Fprintf(v, "%-" + strconv.Itoa(w - len(numStr) - 1) + "s %s\n", stat.Name, numStr)
top3++
}
}
@@ -594,75 +606,80 @@ func updateViews(g *gocui.Gui) {
if err != nil {
return err
}
- updateTable(stats.Packets.TopIpv4, v, stats.Packets.Total)
+ updateTable(stats.Packets.TopIpv4, v, stats.Packets.DeepSamples)
v, err = g.View("top_ipv6")
if err != nil {
return err
}
- updateTable(stats.Packets.TopIpv6, v, stats.Packets.Total)
+ updateTable(stats.Packets.TopIpv6, v, stats.Packets.DeepSamples)
v, err = g.View("top_geo")
if err != nil {
return err
}
- updateTable(stats.Packets.TopGeoLoc, v, stats.Packets.Total)
+ updateTable(stats.Packets.TopGeoLoc, v, stats.Packets.DeepSamples)
v, err = g.View("top_asn")
if err != nil {
return err
}
- updateTable(stats.Packets.TopASN, v, stats.Packets.Total)
+ updateTable(stats.Packets.TopASN, v, stats.Packets.DeepSamples)
}
currentView = "dns"
if currentView == "dns" {
+ // we need to figure in the current sampling rate
+ sampleRate := float64(stats.Packets.DeepSamples) / float64(stats.Packets.Total)
+ wireSample := int64(float64(stats.DNS.WirePackets.Total) * sampleRate)
+ replySample := int64(float64(stats.DNS.WirePackets.Replies) * sampleRate)
+ xactSample := int64(float64(stats.DNS.Xact.Counts.Total) * sampleRate)
v, err = g.View("qname2")
if err != nil {
return err
}
- updateTable(stats.DNS.TopQname2, v, stats.DNS.WirePackets.Total)
+ updateTable(stats.DNS.TopQname2, v, wireSample)
v, err = g.View("qname3")
if err != nil {
return err
}
- updateTable(stats.DNS.TopQname3, v, stats.DNS.WirePackets.Total)
+ updateTable(stats.DNS.TopQname3, v, wireSample)
v, err = g.View("nx")
if err != nil {
return err
}
- updateTable(stats.DNS.TopNX, v, stats.DNS.WirePackets.Replies)
+ updateTable(stats.DNS.TopNX, v, replySample)
v, err = g.View("rcode")
if err != nil {
return err
}
- updateTable(stats.DNS.TopRcode, v, stats.DNS.WirePackets.Replies)
+ updateTable(stats.DNS.TopRcode, v, replySample)
v, err = g.View("srvfail")
if err != nil {
return err
}
- updateTable(stats.DNS.TopSRVFAIL, v, stats.DNS.WirePackets.Replies)
+ updateTable(stats.DNS.TopSRVFAIL, v, replySample)
v, err = g.View("refused")
if err != nil {
return err
}
- updateTable(stats.DNS.TopREFUSED, v, stats.DNS.WirePackets.Replies)
+ updateTable(stats.DNS.TopREFUSED, v, replySample)
v, err = g.View("qtype")
if err != nil {
return err
}
- updateTable(stats.DNS.TopQtype, v, stats.DNS.WirePackets.Total)
+ updateTable(stats.DNS.TopQtype, v, wireSample)
v, err = g.View("top_udp_ports")
if err != nil {
return err
}
- updateTable(stats.DNS.TopUDPPorts, v, stats.DNS.WirePackets.Total)
+ updateTable(stats.DNS.TopUDPPorts, v, wireSample)
v, err = g.View("slow_in")
if err != nil {
return err
}
- updateTable(stats.DNS.Xact.In.TopSlow, v, stats.DNS.Xact.Counts.Total)
+ updateTable(stats.DNS.Xact.In.TopSlow, v, xactSample)
v, err = g.View("slow_out")
if err != nil {
return err
}
- updateTable(stats.DNS.Xact.Out.TopSlow, v, stats.DNS.Xact.Counts.Total)
+ updateTable(stats.DNS.Xact.Out.TopSlow, v, xactSample)
}
return nil
})
diff --git a/src/main.cpp b/src/main.cpp
index c37cf7c3d..781a157c6 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -28,7 +28,7 @@ static const char USAGE[] =
pktvisord (-h | --help)
pktvisord --version
- pktvisord will summarize your packet streams.
+ pktvisord summarizes your data streams.
TARGET is either a network interface, an IP address (4 or 6) or a pcap file (ending in .pcap or .cap)
diff --git a/src/metrics.h b/src/metrics.h
index fc474f45a..cd45cbd7d 100644
--- a/src/metrics.h
+++ b/src/metrics.h
@@ -108,19 +108,6 @@ class Rate
};
-struct IPv4Hash {
- size_t operator()(long long key) {
- // This hash function is taken from the internals of Austin Appleby's MurmurHash3 algorithm
- // see https://github.com/apache/incubator-datasketches-characterization/blob/master/cpp/src/frequent_items_sketch_timing_profile.cpp
- key ^= key >> 33;
- key *= 0xff51afd7ed558ccdL;
- key ^= key >> 33;
- key *= 0xc4ceb9fe1a85ec53L;
- key ^= key >> 33;
- return key;
- }
-};
-
struct Sketches {
//
@@ -147,7 +134,7 @@ struct Sketches {
datasketches::frequent_items_sketch _dns_topREFUSED;
datasketches::frequent_items_sketch _dns_topSRVFAIL;
datasketches::frequent_items_sketch _dns_topUDPPort;
- datasketches::frequent_items_sketch _net_topIPv4;
+ datasketches::frequent_items_sketch _net_topIPv4;
datasketches::frequent_items_sketch _net_topIPv6; // TODO not very efficient, should switch to 16 byte uint
datasketches::frequent_items_sketch _dns_topQType;
datasketches::frequent_items_sketch _dns_topRCode;
diff --git a/tests/test_metrics.cpp b/tests/test_metrics.cpp
new file mode 100644
index 000000000..beb900982
--- /dev/null
+++ b/tests/test_metrics.cpp
@@ -0,0 +1,27 @@
+#include
+#include
+#include
+#include
+
+#include "metrics.h"
+
+TEST_CASE("metrics", "[metrics]") // Smoke test: feeds generated DNS queries into a MetricsMgr; it makes no assertions.
+{
+
+ SECTION("random qnames")
+ {
+ pktvisor::MetricsMgr mm(false, 5, 100); // NOTE(review): meaning of (false, 5, 100) is not visible here — confirm against MetricsMgr's constructor
+ mm.setInitialShiftTS();
+
+ for (int i = 0; i < 9; i++) { // i = 0..8, becomes the second label of the name
+ for (int k = 0; k < 11; k++) { // k = 0..10, becomes the first label of the name
+ pcpp::DnsLayer dns; // a fresh DNS layer per iteration
+ dns.getDnsHeader()->queryOrResponse = pktvisor::query; // flag the message as a query
+ std::stringstream name;
+ name << "0000" << k << ".0000" << i << ".com"; // "0000<k>.0000<i>.com": 9 * 11 = 99 distinct names in total
+ dns.addQuery(name.str(), pcpp::DnsType::DNS_TYPE_A, pcpp::DnsClass::DNS_CLASS_IN); // one A/IN question per message
+ mm.newDNSPacket(&dns, pktvisor::Direction::toHost, pcpp::IPv4, pcpp::UDP); // recorded with direction toHost, IPv4, UDP
+ }
+ }
+ }
+}