Skip to content

Commit cf89596

Browse files
authored
fix: support ZSTD-compressed IR and fix filteredLogEvents_ lifecycle. (#36)
1 parent ab9de32 commit cf89596

File tree

11 files changed

+43
-30
lines changed

11 files changed

+43
-30
lines changed

velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ uint64_t ClpIrCursor::fetchNext(uint64_t numRows) {
5151
}
5252

5353
size_t ClpIrCursor::getNumFilteredRows() const {
54-
return irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()->size();
54+
return filteredLogEvents_->size();
5555
}
5656

5757
VectorPtr ClpIrCursor::createVector(
@@ -72,8 +72,6 @@ ErrorCode ClpIrCursor::loadSplit() {
7272
? NetworkAuthOption{.method = AuthMethod::None}
7373
: NetworkAuthOption{.method = AuthMethod::S3PresignedUrlV4};
7474

75-
auto irHandler = ClpIrUnitHandler{};
76-
7775
auto projections = splitFieldsToNamesAndTypes();
7876
auto queryHandlerResult{QueryHandlerType::create(
7977
projectionResolutionCallback_,
@@ -89,15 +87,28 @@ ErrorCode ClpIrCursor::loadSplit() {
8987
auto irPath = Path{.source = inputSource_, .path = splitPath_};
9088
irReader_ = try_create_reader(irPath, networkAuthOption);
9189
if (nullptr == irReader_) {
90+
VLOG(2) << "Failed to create IR reader";
91+
return ErrorCode::InternalError;
92+
}
93+
irReaderZstdWrapper_ =
94+
std::make_shared<::clp::streaming_compression::zstd::Decompressor>();
95+
constexpr size_t cReaderBufferSize{64L * 1024L};
96+
if (nullptr == irReaderZstdWrapper_) {
9297
VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_
9398
<< "\" for reading.";
9499
return ErrorCode::InternalError;
95100
}
101+
irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
96102

103+
filteredLogEvents_ = std::make_shared<
104+
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>();
105+
auto irHandler = ClpIrUnitHandler{filteredLogEvents_};
97106
auto deserializerResult = ::clp::ffi::ir_stream::make_deserializer(
98-
*irReader_, std::move(irHandler), std::move(queryHandler));
99-
if (!deserializerResult) {
100-
VLOG(2) << "Failed to create deserializer for deserialization.";
107+
*irReaderZstdWrapper_, std::move(irHandler), std::move(queryHandler));
108+
if (deserializerResult.has_error()) {
109+
auto error = deserializerResult.error();
110+
VLOG(2) << "Failed to create deserializer for deserialization, error: "
111+
<< error.message();
101112
return ErrorCode::InternalError;
102113
}
103114
irDeserializer_ = std::make_shared<
@@ -147,11 +158,11 @@ ClpIrCursor::splitFieldsToNamesAndTypes() const {
147158

148159
ystdlib::error_handling::Result<void> ClpIrCursor::deserialize(
149160
uint64_t numRows) {
150-
irDeserializer_->get_ir_unit_handler().clearFilteredLogEvents();
161+
filteredLogEvents_->clear();
151162
uint64_t cnt{0};
152163
while (cnt < numRows) {
153164
auto deserializeResult =
154-
irDeserializer_->deserialize_next_ir_unit(*irReader_);
165+
irDeserializer_->deserialize_next_ir_unit(*irReaderZstdWrapper_);
155166
if (deserializeResult.has_error()) {
156167
auto error = deserializeResult.error();
157168
if (std::errc::result_out_of_range == error ||
@@ -201,7 +212,7 @@ VectorPtr ClpIrCursor::createVectorHelper(
201212
vectorType,
202213
vectorSize,
203214
std::make_unique<ClpIrVectorLoader>(
204-
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(),
215+
filteredLogEvents_,
205216
isResolved,
206217
std::move(projectedColumnNodeIds),
207218
projectedColumn.name,

velox/connectors/clp/search_lib/ir/ClpIrCursor.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include "clp/streaming_compression/zstd/Decompressor.hpp"
1920
#include "ffi/ir_stream/Deserializer.hpp"
2021
#include "velox/connectors/clp/search_lib/BaseClpCursor.h"
2122
#include "velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h"
@@ -72,11 +73,16 @@ class ClpIrCursor final : public BaseClpCursor {
7273
};
7374
using QueryHandlerType = ::clp::ffi::ir_stream::search::QueryHandler<
7475
decltype(projectionResolutionCallback_)>;
76+
std::shared_ptr<
77+
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>
78+
filteredLogEvents_{nullptr};
7579
bool ignoreCase_;
7680
std::shared_ptr<
7781
::clp::ffi::ir_stream::Deserializer<ClpIrUnitHandler, QueryHandlerType>>
78-
irDeserializer_;
82+
irDeserializer_{nullptr};
7983
std::shared_ptr<::clp::ReaderInterface> irReader_{nullptr};
84+
std::shared_ptr<::clp::streaming_compression::zstd::Decompressor>
85+
irReaderZstdWrapper_{nullptr};
8086
std::unordered_map<size_t, std::vector<::clp::ffi::SchemaTree::Node::id_t>>
8187
projectedColumnIdxNodeIdsMap_;
8288
size_t readerIndex_{0};

velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ namespace facebook::velox::connector::clp::search_lib {
2828

2929
class ClpIrUnitHandler {
3030
public:
31-
ClpIrUnitHandler() {
32-
filteredLogEvents_ = std::make_shared<
33-
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>();
31+
ClpIrUnitHandler(
32+
std::shared_ptr<
33+
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>
34+
filteredLogEvents)
35+
: filteredLogEvents_(filteredLogEvents) {
36+
VELOX_CHECK_NOT_NULL(
37+
filteredLogEvents_, "filteredLogEvents cannot be null");
3438
}
3539

3640
// Destructor
@@ -67,16 +71,6 @@ class ClpIrUnitHandler {
6771
return ::clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success;
6872
}
6973

70-
std::shared_ptr<
71-
const std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>
72-
getFilteredLogEvents() const {
73-
return filteredLogEvents_;
74-
}
75-
76-
void clearFilteredLogEvents() {
77-
filteredLogEvents_->clear();
78-
}
79-
8074
private:
8175
std::shared_ptr<
8276
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>

velox/connectors/clp/tests/ClpConnectorTest.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ TEST_F(ClpConnectorTest, test1NoPushdown) {
153153
auto irOutput = getResults(
154154
plan,
155155
{makeClpSplit(
156-
getExampleFilePath("test_1_ir.clps"),
156+
getExampleFilePath("test_1_ir.clp.zst"),
157157
ClpConnectorSplit::SplitType::kIr,
158158
kqlQuery)});
159159
test::assertEqualVectors(irExpected, irOutput);
@@ -201,7 +201,7 @@ TEST_F(ClpConnectorTest, test1Pushdown) {
201201
auto irOutput = getResults(
202202
plan,
203203
{makeClpSplit(
204-
getExampleFilePath("test_1_ir.clps"),
204+
getExampleFilePath("test_1_ir.clp.zst"),
205205
ClpConnectorSplit::SplitType::kIr,
206206
kqlQuery)});
207207
test::assertEqualVectors(expected, irOutput);
@@ -260,7 +260,7 @@ TEST_F(ClpConnectorTest, test2NoPushdown) {
260260
auto irOutput = getResults(
261261
plan,
262262
{makeClpSplit(
263-
getExampleFilePath("test_2_ir.clps"),
263+
getExampleFilePath("test_2_ir.clp.zst"),
264264
ClpConnectorSplit::SplitType::kIr,
265265
kqlQuery)});
266266
test::assertEqualVectors(expected, irOutput);
@@ -317,7 +317,7 @@ TEST_F(ClpConnectorTest, test2Pushdown) {
317317
auto irOutput = getResults(
318318
plan,
319319
{makeClpSplit(
320-
getExampleFilePath("test_2_ir.clps"),
320+
getExampleFilePath("test_2_ir.clp.zst"),
321321
ClpConnectorSplit::SplitType::kIr,
322322
kqlQuery)});
323323
test::assertEqualVectors(expected, irOutput);
@@ -378,7 +378,7 @@ TEST_F(ClpConnectorTest, test2Hybrid) {
378378
auto irOutput = getResults(
379379
plan,
380380
{makeClpSplit(
381-
getExampleFilePath("test_2_ir.clps"),
381+
getExampleFilePath("test_2_ir.clp.zst"),
382382
ClpConnectorSplit::SplitType::kIr,
383383
kqlQuery)});
384384
test::assertEqualVectors(expected, irOutput);
@@ -433,7 +433,7 @@ TEST_F(ClpConnectorTest, test4IrTimestampNoPushdown) {
433433
auto output = getResults(
434434
plan,
435435
{makeClpSplit(
436-
getExampleFilePath("test_4_ir.clps"),
436+
getExampleFilePath("test_4_ir.clp.zst"),
437437
ClpConnectorSplit::SplitType::kIr,
438438
kqlQuery)});
439439
auto expected = makeRowVector({
@@ -445,6 +445,8 @@ TEST_F(ClpConnectorTest, test4IrTimestampNoPushdown) {
445445
}
446446

447447
TEST_F(ClpConnectorTest, test4IrTimestampPushdown) {
448+
// Only the second event meet the condition, the first event is a date string
449+
// which is not supported yet so the value will be NULL.
448450
const std::shared_ptr<std::string> kqlQuery =
449451
std::make_shared<std::string>("(timestamp < 1756003005000000)");
450452
auto plan = PlanBuilder(pool_.get())
@@ -462,7 +464,7 @@ TEST_F(ClpConnectorTest, test4IrTimestampPushdown) {
462464
auto output = getResults(
463465
plan,
464466
{makeClpSplit(
465-
getExampleFilePath("test_4_ir.clps"),
467+
getExampleFilePath("test_4_ir.clp.zst"),
466468
ClpConnectorSplit::SplitType::kIr,
467469
kqlQuery)});
468470
auto expected = makeRowVector({
532 Bytes
Binary file not shown.
-898 Bytes
Binary file not shown.
1.09 KB
Binary file not shown.
-1.64 KB
Binary file not shown.
-261 Bytes
Binary file not shown.
207 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)