From 361f546fded2ed899630b68b58d6113777a9a9f6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Tue, 30 Jan 2024 14:36:39 +0800
Subject: [PATCH 1/5] support tracking bucket changes in two level hash table
---
src/Common/HashTable/TimeBucketHashMap.h | 14 ++++
src/Common/HashTable/TimeBucketHashTable.h | 57 ++++++++++++++++
src/Common/HashTable/TwoLevelHashMap.h | 14 ++++
src/Common/HashTable/TwoLevelHashTable.h | 46 +++++++++++++
src/Common/HashTable/TwoLevelStringHashMap.h | 14 ++++
.../HashTable/TwoLevelStringHashTable.h | 66 +++++++++++++++++++
6 files changed, 211 insertions(+)
diff --git a/src/Common/HashTable/TimeBucketHashMap.h b/src/Common/HashTable/TimeBucketHashMap.h
index 172d1d1c192..685ede30af4 100644
--- a/src/Common/HashTable/TimeBucketHashMap.h
+++ b/src/Common/HashTable/TimeBucketHashMap.h
@@ -33,6 +33,20 @@ class TimeBucketHashMapTable
p.second.forEachValue(func);
}
+ template <typename Func>
+ void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
+ {
+ for (auto & p : this->impls)
+ {
+ if (this->isUpdatedBucket(p.first))
+ {
+ p.second.forEachValue(func);
+ if (reset_updated)
+ this->resetUpdated(p.first);
+ }
+ }
+ }
+
typename Cell::Mapped & ALWAYS_INLINE operator[](const Key & x)
{
LookupResult it;
diff --git a/src/Common/HashTable/TimeBucketHashTable.h b/src/Common/HashTable/TimeBucketHashTable.h
index 77c1cdbe8aa..9bff2271aa3 100644
--- a/src/Common/HashTable/TimeBucketHashTable.h
+++ b/src/Common/HashTable/TimeBucketHashTable.h
@@ -108,7 +108,9 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
using ConstLookupResult = typename Impl::ConstLookupResult;
/// FIXME, choose a better perf data structure
+ /// Usually we don't have too many time buckets
std::map<Int64, Impl> impls;
+ std::unordered_map<Int64, bool> bucket_updated_flags;
Impl sentinel;
TimeBucketHashTable() { }
@@ -263,6 +265,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
{
auto window = windowKey(key_holder);
impls[window].emplace(key_holder, it, inserted, hash_value);
+ bucket_updated_flags[window] = true; /// updated
}
LookupResult ALWAYS_INLINE find(Key x, size_t hash_value)
@@ -289,6 +292,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
{
DB::writeIntBinary(p.first, wb);
p.second.write(wb);
+ DB::writeBoolText(bucket_updated_flags[p.first], wb);
}
}
@@ -309,7 +313,12 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
/// Write key and key-value separator
DB::writeIntText(p.first, wb);
DB::writeChar(KEY_VALUE_SEPARATOR, wb);
+ ///
+ DB::writeChar('<', wb);
p.second.writeText(wb);
+ DB::writeChar(',', wb);
+ DB::writeBoolText(bucket_updated_flags[p.first], wb);
+ DB::writeChar('>', wb);
}
DB::writeChar(END_BUCKET_MARKER, wb);
}
@@ -327,6 +336,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
assert(key != 0);
assert(!impls.contains(key));
impls[key].read(rb);
+ DB::readBoolText(bucket_updated_flags[key], rb);
}
}
@@ -349,7 +359,12 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
assert(key != 0);
assert(!impls.contains(key));
+ ///
+ DB::assertChar('<', rb);
impls[key].readText(rb);
+ DB::assertChar(',', rb);
+ DB::readBoolText(bucket_updated_flags[key], rb);
+ DB::assertChar('>', rb);
}
DB::assertChar(END_BUCKET_MARKER, rb);
}
@@ -402,6 +417,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
last_removed_watermark = it->first;
++removed;
+ bucket_updated_flags.erase(it->first);
it = impls.erase(it);
}
else
@@ -438,4 +454,45 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
return buckets;
}
+
+ bool isUpdatedBucket(Int64 bucket_) const
+ {
+ auto it = bucket_updated_flags.find(bucket_);
+ if (it != bucket_updated_flags.end())
+ return it->second;
+
+ return false;
+ }
+
+ void resetUpdated(Int64 bucket_)
+ {
+ auto it = bucket_updated_flags.find(bucket_);
+ if (it != bucket_updated_flags.end())
+ it->second = false;
+ }
+
+ void writeBucketUpdatedFlags(DB::WriteBuffer & wb) const
+ {
+ DB::writeVarUInt(bucket_updated_flags.size(), wb);
+ for (const auto & [bucket, updated] : bucket_updated_flags)
+ {
+ DB::writeIntBinary(bucket, wb);
+ DB::writeBoolText(updated, wb);
+ }
+ }
+
+ void readBucketUpdatedFlags(DB::ReadBuffer & rb)
+ {
+ size_t size = 0;
+ DB::readVarUInt(size, rb);
+ bucket_updated_flags.clear();
+ Int64 bucket = 0;
+ bool updated = false;
+ for (size_t i = 0; i < size; ++i)
+ {
+ DB::readIntBinary(bucket, rb);
+ DB::readBoolText(updated, rb);
+ bucket_updated_flags.emplace(bucket, updated);
+ }
+ }
};
diff --git a/src/Common/HashTable/TwoLevelHashMap.h b/src/Common/HashTable/TwoLevelHashMap.h
index 3e618ca0a50..5c87d5e6eb0 100644
--- a/src/Common/HashTable/TwoLevelHashMap.h
+++ b/src/Common/HashTable/TwoLevelHashMap.h
@@ -38,6 +38,20 @@ class TwoLevelHashMapTable : public TwoLevelHashTableimpls[i].forEachValue(func);
}
+ template <typename Func>
+ void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
+ {
+ for (auto i = 0u; i < this->NUM_BUCKETS; ++i)
+ {
+ if (this->isUpdatedBucket(i))
+ {
+ this->impls[i].forEachValue(func);
+ if (reset_updated)
+ this->resetUpdated(i);
+ }
+ }
+ }
+
template
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
diff --git a/src/Common/HashTable/TwoLevelHashTable.h b/src/Common/HashTable/TwoLevelHashTable.h
index 7e865cb48da..4dd13e6e7e4 100644
--- a/src/Common/HashTable/TwoLevelHashTable.h
+++ b/src/Common/HashTable/TwoLevelHashTable.h
@@ -90,6 +90,7 @@ class TwoLevelHashTable :
using ConstLookupResult = typename Impl::ConstLookupResult;
Impl impls[NUM_BUCKETS];
+ bool bucket_updated_flags[NUM_BUCKETS] = {false};
TwoLevelHashTable() = default;
@@ -119,6 +120,7 @@ class TwoLevelHashTable :
size_t hash_value = cell->getHash(src);
size_t buck = getBucketFromHash(hash_value);
impls[buck].insertUniqueNonZero(cell, hash_value);
+ bucket_updated_flags[buck] = true;
}
}
@@ -271,6 +273,7 @@ class TwoLevelHashTable :
{
size_t buck = getBucketFromHash(hash_value);
impls[buck].emplace(key_holder, it, inserted, hash_value);
+ bucket_updated_flags[buck] = true;
}
LookupResult ALWAYS_INLINE find(Key x, size_t hash_value)
@@ -292,7 +295,10 @@ class TwoLevelHashTable :
void write(DB::WriteBuffer & wb) const
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
+ {
impls[i].write(wb);
+ DB::writeBoolText(bucket_updated_flags[i], wb);
+ }
}
void writeText(DB::WriteBuffer & wb) const
@@ -301,14 +307,22 @@ class TwoLevelHashTable :
{
if (i != 0)
DB::writeChar(',', wb);
+ ///
+ DB::writeChar('<', wb);
impls[i].writeText(wb);
+ DB::writeChar(',', wb);
+ DB::writeBoolText(bucket_updated_flags[i], wb);
+ DB::writeChar('>', wb);
}
}
void read(DB::ReadBuffer & rb)
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
+ {
impls[i].read(rb);
+ DB::readBoolText(bucket_updated_flags[i], rb);
+ }
}
void readText(DB::ReadBuffer & rb)
@@ -317,7 +331,13 @@ class TwoLevelHashTable :
{
if (i != 0)
DB::assertChar(',', rb);
+
+ ///
+ DB::assertChar('<', rb);
impls[i].readText(rb);
+ DB::assertChar(',', rb);
+ DB::readBoolText(bucket_updated_flags[i], rb);
+ DB::assertChar('>', rb);
}
}
@@ -365,5 +385,31 @@ class TwoLevelHashTable :
std::iota(bucket_ids.begin(), bucket_ids.end(), 0);
return bucket_ids;
}
+
+ bool isUpdatedBucket(Int64 bucket_) const
+ {
+ return bucket_updated_flags[bucket_];
+ }
+
+ void resetUpdated(Int64 bucket_)
+ {
+ bucket_updated_flags[bucket_] = false;
+ }
+
+ void writeBucketUpdatedFlags(DB::WriteBuffer & wb) const
+ {
+ DB::writeVarUInt(NUM_BUCKETS, wb);
+ for (const auto & elem : bucket_updated_flags)
+ DB::writeBoolText(elem, wb);
+ }
+
+ void readBucketUpdatedFlags(DB::ReadBuffer & rb)
+ {
+ size_t size = 0;
+ DB::readVarUInt(size, rb);
+ assert(size == NUM_BUCKETS);
+ for (auto & elem : bucket_updated_flags)
+ DB::readBoolText(elem, rb);
+ }
/// proton : ends
};
diff --git a/src/Common/HashTable/TwoLevelStringHashMap.h b/src/Common/HashTable/TwoLevelStringHashMap.h
index a351543edb0..9f2c5ba00d3 100644
--- a/src/Common/HashTable/TwoLevelStringHashMap.h
+++ b/src/Common/HashTable/TwoLevelStringHashMap.h
@@ -29,6 +29,20 @@ class TwoLevelStringHashMap : public TwoLevelStringHashTableimpls[i].forEachValue(func);
}
+ template <typename Func>
+ void ALWAYS_INLINE forEachValueOfUpdatedBuckets(Func && func, bool reset_updated = false)
+ {
+ for (auto i = 0u; i < this->NUM_BUCKETS; ++i)
+ {
+ if (this->isUpdatedBucket(i))
+ {
+ this->impls[i].forEachValue(func);
+ if (reset_updated)
+ this->resetUpdated(i);
+ }
+ }
+ }
+
template
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{
diff --git a/src/Common/HashTable/TwoLevelStringHashTable.h b/src/Common/HashTable/TwoLevelStringHashTable.h
index e0485f5aaa6..e74ae676143 100644
--- a/src/Common/HashTable/TwoLevelStringHashTable.h
+++ b/src/Common/HashTable/TwoLevelStringHashTable.h
@@ -39,6 +39,7 @@ class TwoLevelStringHashTable : private boost::noncopyable
using ConstLookupResult = typename Impl::ConstLookupResult;
Impl impls[NUM_BUCKETS];
+ bool bucket_updated_flags[NUM_BUCKETS] = {false};
TwoLevelStringHashTable() {}
@@ -53,24 +54,28 @@ class TwoLevelStringHashTable : private boost::noncopyable
size_t hash_value = v.getHash(src.m1);
size_t buck = getBucketFromHash(hash_value);
impls[buck].m1.insertUniqueNonZero(&v, hash_value);
+ bucket_updated_flags[buck] = true;
}
for (auto & v : src.m2)
{
size_t hash_value = v.getHash(src.m2);
size_t buck = getBucketFromHash(hash_value);
impls[buck].m2.insertUniqueNonZero(&v, hash_value);
+ bucket_updated_flags[buck] = true;
}
for (auto & v : src.m3)
{
size_t hash_value = v.getHash(src.m3);
size_t buck = getBucketFromHash(hash_value);
impls[buck].m3.insertUniqueNonZero(&v, hash_value);
+ bucket_updated_flags[buck] = true;
}
for (auto & v : src.ms)
{
size_t hash_value = v.getHash(src.ms);
size_t buck = getBucketFromHash(hash_value);
impls[buck].ms.insertUniqueNonZero(&v, hash_value);
+ bucket_updated_flags[buck] = true;
}
}
@@ -84,6 +89,9 @@ class TwoLevelStringHashTable : private boost::noncopyable
const size_t sz = x.size;
if (sz == 0)
{
+ if constexpr (std::is_same_v)
+ self.bucket_updated_flags[0] = true;
+
keyHolderDiscardKey(key_holder);
return func(self.impls[0].m0, VoidKey{}, 0);
}
@@ -94,6 +102,9 @@ class TwoLevelStringHashTable : private boost::noncopyable
// string keys. Put them to the generic table.
auto res = hash(x);
auto buck = getBucketFromHash(res);
+ if constexpr (std::is_same_v)
+ self.bucket_updated_flags[buck] = true;
+
return func(self.impls[buck].ms, std::forward(key_holder),
res);
}
@@ -126,6 +137,9 @@ class TwoLevelStringHashTable : private boost::noncopyable
}
auto res = hash(k8);
auto buck = getBucketFromHash(res);
+ if constexpr (std::is_same_v)
+ self.bucket_updated_flags[buck] = true;
+
keyHolderDiscardKey(key_holder);
return func(self.impls[buck].m1, k8, res);
}
@@ -137,6 +151,9 @@ class TwoLevelStringHashTable : private boost::noncopyable
n[1] >>= s;
auto res = hash(k16);
auto buck = getBucketFromHash(res);
+ if constexpr (std::is_same_v)
+ self.bucket_updated_flags[buck] = true;
+
keyHolderDiscardKey(key_holder);
return func(self.impls[buck].m2, k16, res);
}
@@ -148,6 +165,9 @@ class TwoLevelStringHashTable : private boost::noncopyable
n[2] >>= s;
auto res = hash(k24);
auto buck = getBucketFromHash(res);
+ if constexpr (std::is_same_v)
+ self.bucket_updated_flags[buck] = true;
+
keyHolderDiscardKey(key_holder);
return func(self.impls[buck].m3, k24, res);
}
@@ -155,6 +175,9 @@ class TwoLevelStringHashTable : private boost::noncopyable
{
auto res = hash(x);
auto buck = getBucketFromHash(res);
+ if constexpr (std::is_same_v)
+ self.bucket_updated_flags[buck] = true;
+
return func(self.impls[buck].ms, std::forward(key_holder), res);
}
}
@@ -179,7 +202,10 @@ class TwoLevelStringHashTable : private boost::noncopyable
void write(DB::WriteBuffer & wb) const
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
+ {
impls[i].write(wb);
+ DB::writeBoolText(bucket_updated_flags[i], wb);
+ }
}
void writeText(DB::WriteBuffer & wb) const
@@ -188,14 +214,22 @@ class TwoLevelStringHashTable : private boost::noncopyable
{
if (i != 0)
DB::writeChar(',', wb);
+ ///
+ DB::writeChar('<', wb);
impls[i].writeText(wb);
+ DB::writeChar(',', wb);
+ DB::writeBoolText(bucket_updated_flags[i], wb);
+ DB::writeChar('>', wb);
}
}
void read(DB::ReadBuffer & rb)
{
for (UInt32 i = 0; i < NUM_BUCKETS; ++i)
+ {
impls[i].read(rb);
+ DB::readBoolText(bucket_updated_flags[i], rb);
+ }
}
void readText(DB::ReadBuffer & rb)
@@ -205,6 +239,11 @@ class TwoLevelStringHashTable : private boost::noncopyable
if (i != 0)
DB::assertChar(',', rb);
+ /// Each bucket is serialized as <bucket_text,updated_flag>; see writeText()
+ DB::assertChar('<', rb);
impls[i].readText(rb);
+ DB::assertChar(',', rb);
+ DB::readBoolText(bucket_updated_flags[i], rb);
+ DB::assertChar('>', rb);
}
}
@@ -252,4 +292,30 @@ class TwoLevelStringHashTable : private boost::noncopyable
std::iota(bucket_ids.begin(), bucket_ids.end(), 0);
return bucket_ids;
}
+
+ bool isUpdatedBucket(Int64 bucket_) const
+ {
+ return bucket_updated_flags[bucket_];
+ }
+
+ void resetUpdated(Int64 bucket_)
+ {
+ bucket_updated_flags[bucket_] = false;
+ }
+
+ void writeBucketUpdatedFlags(DB::WriteBuffer & wb) const
+ {
+ DB::writeVarUInt(NUM_BUCKETS, wb);
+ for (const auto & elem : bucket_updated_flags)
+ DB::writeBoolText(elem, wb);
+ }
+
+ void readBucketUpdatedFlags(DB::ReadBuffer & rb)
+ {
+ size_t size = 0;
+ DB::readVarUInt(size, rb);
+ assert(size == NUM_BUCKETS);
+ for (auto & elem : bucket_updated_flags)
+ DB::readBoolText(elem, rb);
+ }
};
From a298a8e166c201069fa9decd3d345606ae1dd3a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Tue, 30 Jan 2024 14:44:53 +0800
Subject: [PATCH 2/5] add expanded data in aggregate state to track updated and
retracted
---
cmake/autogenerated_versions.txt | 2 +-
src/Common/HashMapsTemplate.h | 26 +-
src/Common/serde.h | 21 +
src/Interpreters/InterpreterSelectQuery.cpp | 3 +-
src/Interpreters/Streaming/AggregateDataEx.h | 124 +
.../Streaming/AggregationUtils.cpp | 113 +
src/Interpreters/Streaming/AggregationUtils.h | 27 +
src/Interpreters/Streaming/Aggregator.cpp | 2525 ++++++++---------
src/Interpreters/Streaming/Aggregator.h | 236 +-
.../Streaming/AggregatingHelper.cpp | 73 +-
.../Transforms/Streaming/AggregatingHelper.h | 11 +-
.../Streaming/AggregatingTransform.cpp | 4 +-
.../AggregatingTransformWithSubstream.cpp | 8 +-
.../Streaming/GlobalAggregatingTransform.cpp | 82 +-
.../Streaming/GlobalAggregatingTransform.h | 2 +
...lobalAggregatingTransformWithSubstream.cpp | 64 +-
.../GlobalAggregatingTransformWithSubstream.h | 2 +
17 files changed, 1798 insertions(+), 1525 deletions(-)
create mode 100644 src/Interpreters/Streaming/AggregateDataEx.h
create mode 100644 src/Interpreters/Streaming/AggregationUtils.cpp
create mode 100644 src/Interpreters/Streaming/AggregationUtils.h
diff --git a/cmake/autogenerated_versions.txt b/cmake/autogenerated_versions.txt
index 29ccf0cc41c..2f61abb85dc 100644
--- a/cmake/autogenerated_versions.txt
+++ b/cmake/autogenerated_versions.txt
@@ -2,7 +2,7 @@
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
-SET(VERSION_REVISION 2)
+SET(VERSION_REVISION 3)
SET(VERSION_MAJOR 1)
SET(VERSION_MINOR 4)
SET(VERSION_PATCH 1)
diff --git a/src/Common/HashMapsTemplate.h b/src/Common/HashMapsTemplate.h
index 8eb33d1d1d4..53df5ecd69f 100644
--- a/src/Common/HashMapsTemplate.h
+++ b/src/Common/HashMapsTemplate.h
@@ -4,7 +4,8 @@
#include
#include
#include
-#include
+#include
+#include
namespace DB
{
@@ -24,9 +25,14 @@ void serializeHashMap(const Map & map, MappedSerializer && mapped_serializer, Wr
});
}
-template
+template
void deserializeHashMap(Map & map, MappedDeserializer && mapped_deserializer, Arena & pool, ReadBuffer & rb)
{
+ using Mapped = std::decay_t