Skip to content

Commit 12521a1

Browse files
authored
feat: Add timestamp type support for kv-ir splits. (#30)
1 parent 6c826f7 commit 12521a1

File tree

15 files changed

+279
-204
lines changed

15 files changed

+279
-204
lines changed

velox/connectors/clp/search_lib/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ velox_add_library(
1919
ClpPackageS3AuthProvider.cpp
2020
ClpPackageS3AuthProvider.h
2121
ClpS3AuthProviderBase.cpp
22-
ClpS3AuthProviderBase.h)
22+
ClpS3AuthProviderBase.h
23+
ClpTimestampsUtils.h)
2324

2425
add_subdirectory(archive)
2526
add_subdirectory(ir)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <cmath>
#include <cstdint>

#include "velox/type/Timestamp.h"
20+
21+
namespace facebook::velox::connector::clp::search_lib {
22+
23+
/// Precision (unit) that an input epoch timestamp is assumed to be expressed
/// in.
enum class InputTimestampPrecision : uint8_t {
  Seconds,
  Milliseconds,
  Microseconds,
  Nanoseconds
};

/// Estimates the precision of an epoch timestamp as seconds, milliseconds,
/// microseconds, or nanoseconds.
///
/// This heuristic relies on the fact that 1 year of epoch nanoseconds is
/// approximately 1000 years of epoch microseconds and so on. This heuristic
/// can be unreliable for timestamps sufficiently close to the epoch, but
/// should otherwise be accurate for the next 1000 years.
///
/// The classification only looks at the magnitude of the value, so a negative
/// (pre-epoch) timestamp is classified the same way as its positive
/// counterpart. A value whose magnitude is exactly equal to a threshold falls
/// into the coarser precision.
///
/// Note: Future versions of the clp-s archive format will adopt a
/// nanosecond-precision integer timestamp format (as opposed to the current
/// format which allows other precisions), at which point we can remove this
/// heuristic.
///
/// @tparam T arithmetic type of the timestamp (this header calls it with
/// `int64_t` and `double`)
/// @param timestamp the raw epoch timestamp in an unknown unit
/// @return the estimated timestamp precision
template <typename T>
auto estimatePrecision(T timestamp) -> InputTimestampPrecision {
  // Each threshold is one 365-day year (31'536'000 seconds) past the epoch,
  // expressed in the unit named by the constant.
  constexpr int64_t kEpochMilliseconds1971{31536000000};
  constexpr int64_t kEpochMicroseconds1971{31536000000000};
  constexpr int64_t kEpochNanoseconds1971{31536000000000000};

  // Equivalent to `abs(timestamp) > threshold`, but written without negating
  // `timestamp`: negating the most negative int64_t value overflows, which is
  // undefined behaviour and previously misclassified that input.
  auto const magnitudeExceeds = [timestamp](int64_t threshold) {
    return timestamp > threshold || timestamp < -threshold;
  };

  if (magnitudeExceeds(kEpochNanoseconds1971)) {
    return InputTimestampPrecision::Nanoseconds;
  }
  if (magnitudeExceeds(kEpochMicroseconds1971)) {
    return InputTimestampPrecision::Microseconds;
  }
  if (magnitudeExceeds(kEpochMilliseconds1971)) {
    return InputTimestampPrecision::Milliseconds;
  }
  return InputTimestampPrecision::Seconds;
}
62+
63+
/// Converts a double value into a Velox timestamp.
64+
///
65+
/// @param timestamp the input timestamp as a double
66+
/// @return the corresponding Velox timestamp
67+
inline auto convertToVeloxTimestamp(double timestamp) -> Timestamp {
68+
switch (estimatePrecision(timestamp)) {
69+
case InputTimestampPrecision::Nanoseconds:
70+
timestamp /= Timestamp::kNanosInSecond;
71+
break;
72+
case InputTimestampPrecision::Microseconds:
73+
timestamp /= Timestamp::kMicrosecondsInSecond;
74+
break;
75+
case InputTimestampPrecision::Milliseconds:
76+
timestamp /= Timestamp::kMillisecondsInSecond;
77+
break;
78+
case InputTimestampPrecision::Seconds:
79+
break;
80+
}
81+
double seconds{std::floor(timestamp)};
82+
double nanoseconds{(timestamp - seconds) * Timestamp::kNanosInSecond};
83+
return Timestamp(
84+
static_cast<int64_t>(seconds), static_cast<uint64_t>(nanoseconds));
85+
}
86+
87+
/// Converts an integer value into a Velox timestamp.
88+
///
89+
/// @param timestamp the input timestamp as an integer
90+
/// @return the corresponding Velox timestamp
91+
inline auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp {
92+
int64_t precisionDifference{Timestamp::kNanosInSecond};
93+
switch (estimatePrecision(timestamp)) {
94+
case InputTimestampPrecision::Nanoseconds:
95+
break;
96+
case InputTimestampPrecision::Microseconds:
97+
precisionDifference =
98+
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMicrosecond;
99+
break;
100+
case InputTimestampPrecision::Milliseconds:
101+
precisionDifference =
102+
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMillisecond;
103+
break;
104+
case InputTimestampPrecision::Seconds:
105+
precisionDifference =
106+
Timestamp::kNanosInSecond / Timestamp::kNanosInSecond;
107+
break;
108+
}
109+
int64_t seconds{timestamp / precisionDifference};
110+
int64_t nanoseconds{
111+
(timestamp % precisionDifference) *
112+
(Timestamp::kNanosInSecond / precisionDifference)};
113+
if (nanoseconds < 0) {
114+
seconds -= 1;
115+
nanoseconds += Timestamp::kNanosInSecond;
116+
}
117+
return Timestamp(seconds, static_cast<uint64_t>(nanoseconds));
118+
}
119+
120+
} // namespace facebook::velox::connector::clp::search_lib

velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp

Lines changed: 1 addition & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -21,106 +21,14 @@
2121
#include "clp_s/ColumnReader.hpp"
2222
#include "clp_s/SchemaTree.hpp"
2323
#include "velox/connectors/clp/search_lib/BaseClpCursor.h"
24+
#include "velox/connectors/clp/search_lib/ClpTimestampsUtils.h"
2425
#include "velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.h"
2526
#include "velox/type/Timestamp.h"
2627
#include "velox/vector/ComplexVector.h"
2728
#include "velox/vector/FlatVector.h"
2829

2930
namespace facebook::velox::connector::clp::search_lib {
3031

31-
namespace {
32-
33-
enum class TimestampPrecision : uint8_t {
34-
Seconds,
35-
Milliseconds,
36-
Microseconds,
37-
Nanoseconds
38-
};
39-
40-
/// Estimates the precision of an epoch timestamp as seconds, milliseconds,
41-
/// microseconds, or nanoseconds.
42-
///
43-
/// This heuristic relies on the fact that 1 year of epoch nanoseconds is
44-
/// approximately 1000 years of epoch microseconds and so on. This heuristic
45-
/// can be unreliable for timestamps sufficiently close to the epoch, but
46-
/// should otherwise be accurate for the next 1000 years.
47-
///
48-
/// Note: Future versions of the clp-s archive format will adopt a
49-
/// nanosecond-precision integer timestamp format (as opposed to the current
50-
/// format which allows other precisions), at which point we can remove this
51-
/// heuristic.
52-
///
53-
/// @param timestamp
54-
/// @return the estimated timestamp precision
55-
template <typename T>
56-
auto estimatePrecision(T timestamp) -> TimestampPrecision {
57-
constexpr int64_t kEpochMilliseconds1971{31536000000};
58-
constexpr int64_t kEpochMicroseconds1971{31536000000000};
59-
constexpr int64_t kEpochNanoseconds1971{31536000000000000};
60-
auto absTimestamp = timestamp >= 0 ? timestamp : -timestamp;
61-
62-
if (absTimestamp > kEpochNanoseconds1971) {
63-
return TimestampPrecision::Nanoseconds;
64-
} else if (absTimestamp > kEpochMicroseconds1971) {
65-
return TimestampPrecision::Microseconds;
66-
} else if (absTimestamp > kEpochMilliseconds1971) {
67-
return TimestampPrecision::Milliseconds;
68-
} else {
69-
return TimestampPrecision::Seconds;
70-
}
71-
}
72-
73-
auto convertToVeloxTimestamp(double timestamp) -> Timestamp {
74-
switch (estimatePrecision(timestamp)) {
75-
case TimestampPrecision::Nanoseconds:
76-
timestamp /= Timestamp::kNanosInSecond;
77-
break;
78-
case TimestampPrecision::Microseconds:
79-
timestamp /= Timestamp::kMicrosecondsInSecond;
80-
break;
81-
case TimestampPrecision::Milliseconds:
82-
timestamp /= Timestamp::kMillisecondsInSecond;
83-
break;
84-
case TimestampPrecision::Seconds:
85-
break;
86-
}
87-
double seconds{std::floor(timestamp)};
88-
double nanoseconds{(timestamp - seconds) * Timestamp::kNanosInSecond};
89-
return Timestamp(
90-
static_cast<int64_t>(seconds), static_cast<uint64_t>(nanoseconds));
91-
}
92-
93-
auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp {
94-
int64_t precisionDifference{Timestamp::kNanosInSecond};
95-
switch (estimatePrecision(timestamp)) {
96-
case TimestampPrecision::Nanoseconds:
97-
break;
98-
case TimestampPrecision::Microseconds:
99-
precisionDifference =
100-
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMicrosecond;
101-
break;
102-
case TimestampPrecision::Milliseconds:
103-
precisionDifference =
104-
Timestamp::kNanosInSecond / Timestamp::kNanosecondsInMillisecond;
105-
break;
106-
case TimestampPrecision::Seconds:
107-
precisionDifference =
108-
Timestamp::kNanosInSecond / Timestamp::kNanosInSecond;
109-
break;
110-
}
111-
int64_t seconds{timestamp / precisionDifference};
112-
int64_t nanoseconds{
113-
(timestamp % precisionDifference) *
114-
(Timestamp::kNanosInSecond / precisionDifference)};
115-
if (nanoseconds < 0) {
116-
seconds -= 1;
117-
nanoseconds += Timestamp::kNanosInSecond;
118-
}
119-
return Timestamp(seconds, static_cast<uint64_t>(nanoseconds));
120-
}
121-
122-
} // namespace
123-
12432
ClpArchiveVectorLoader::ClpArchiveVectorLoader(
12533
clp_s::BaseColumnReader* columnReader,
12634
ColumnType nodeType,

velox/connectors/clp/search_lib/ir/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ velox_link_libraries(
2929
clp_s::clp_dependencies
3030
clp_s::io
3131
clp_s::search
32+
clp_s::search::ast
3233
clp_s::search::kql
3334
velox_vector)

velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ VectorPtr ClpIrCursor::createVector(
5959
const TypePtr& vectorType,
6060
size_t vectorSize) {
6161
VELOX_CHECK_EQ(
62-
projectedColumnIdxNodeIdMap_.size(),
62+
projectedColumnIdxNodeIdsMap_.size(),
6363
outputColumns_.size(),
64-
"Projected columns size {} does not match fields size {}",
65-
projectedColumnIdxNodeIdMap_.size(),
64+
"Resolved node-id map size ({}) must not exceed projected columns ({})",
65+
projectedColumnIdxNodeIdsMap_.size(),
6666
outputColumns_.size());
6767
return createVectorHelper(pool, vectorType, vectorSize);
6868
}
@@ -133,9 +133,8 @@ ClpIrCursor::splitFieldsToNamesAndTypes() const {
133133
search::ast::LiteralType::ClpStringT;
134134
break;
135135
case ColumnType::Timestamp:
136-
// TODO: IR timestamp support pending; constrain to Unknown to avoid
137-
// mismatched projections.
138-
literalType = search::ast::LiteralType::EpochDateT;
136+
literalType = search::ast::LiteralType::FloatT |
137+
search::ast::LiteralType::IntegerT;
139138
break;
140139
default:
141140
literalType = search::ast::LiteralType::UnknownT;
@@ -189,22 +188,24 @@ VectorPtr ClpIrCursor::createVectorHelper(
189188
readerIndex_, outputColumns_.size(), "Reader index out of bounds");
190189
auto projectedColumn = outputColumns_[readerIndex_];
191190
auto projectedColumnType = projectedColumn.type;
192-
auto it = projectedColumnIdxNodeIdMap_.find(readerIndex_);
193-
bool isResolved = it != projectedColumnIdxNodeIdMap_.end();
194-
::clp::ffi::SchemaTree::Node::id_t projectedColumnNodeId;
191+
auto it = projectedColumnIdxNodeIdsMap_.find(readerIndex_);
192+
std::vector<::clp::ffi::SchemaTree::Node::id_t> projectedColumnNodeIds{};
193+
bool isResolved =
194+
it != projectedColumnIdxNodeIdsMap_.end() && !it->second.empty();
195195
if (isResolved) {
196-
projectedColumnNodeId = it->second;
196+
projectedColumnNodeIds = it->second;
197197
}
198198
readerIndex_++;
199199
return std::make_shared<LazyVector>(
200200
pool,
201201
vectorType,
202202
vectorSize,
203203
std::make_unique<ClpIrVectorLoader>(
204+
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(),
204205
isResolved,
205-
projectedColumnType,
206-
projectedColumnNodeId,
207-
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()),
206+
std::move(projectedColumnNodeIds),
207+
projectedColumn.name,
208+
projectedColumnType),
208209
std::move(vector));
209210
}
210211

velox/connectors/clp/search_lib/ir/ClpIrCursor.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,15 @@ class ClpIrCursor final : public BaseClpCursor {
5959
[[maybe_unused]] std::pair<std::string_view, size_t>
6060
projected_key_and_index)
6161
-> ystdlib::error_handling::Result<void> {
62-
projectedColumnIdxNodeIdMap_.insert(
63-
{projected_key_and_index.second, nodeId});
62+
auto it =
63+
projectedColumnIdxNodeIdsMap_.find(projected_key_and_index.second);
64+
if (it == projectedColumnIdxNodeIdsMap_.end()) {
65+
projectedColumnIdxNodeIdsMap_.insert(
66+
{projected_key_and_index.second,
67+
std::vector<::clp::ffi::SchemaTree::Node::id_t>{nodeId}});
68+
return ystdlib::error_handling::success();
69+
}
70+
it->second.emplace_back(nodeId);
6471
return ystdlib::error_handling::success();
6572
};
6673
using QueryHandlerType = ::clp::ffi::ir_stream::search::QueryHandler<
@@ -70,8 +77,8 @@ class ClpIrCursor final : public BaseClpCursor {
7077
::clp::ffi::ir_stream::Deserializer<ClpIrUnitHandler, QueryHandlerType>>
7178
irDeserializer_;
7279
std::shared_ptr<::clp::ReaderInterface> irReader_{nullptr};
73-
std::unordered_map<size_t, ::clp::ffi::SchemaTree::Node::id_t>
74-
projectedColumnIdxNodeIdMap_;
80+
std::unordered_map<size_t, std::vector<::clp::ffi::SchemaTree::Node::id_t>>
81+
projectedColumnIdxNodeIdsMap_;
7582
size_t readerIndex_{0};
7683

7784
std::vector<

velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616

1717
#include "velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.h"
18+
1819
#include "velox/connectors/clp/search_lib/BaseClpCursor.h"
20+
#include "velox/connectors/clp/search_lib/ClpTimestampsUtils.h"
1921

2022
namespace facebook::velox::connector::clp::search_lib {
2123

@@ -33,12 +35,20 @@ void ClpIrVectorLoader::loadInternal(
3335
auto& logEvent = filteredLogEvents_->at(vectorIndex);
3436
// TODO: also need to support auto-generated keys
3537
auto userGenNodeIdValueMap = logEvent->get_user_gen_node_id_value_pairs();
36-
auto const value_it{userGenNodeIdValueMap.find(nodeId_)};
37-
if (userGenNodeIdValueMap.end() == value_it ||
38-
false == value_it->second.has_value()) {
38+
auto valueIt = userGenNodeIdValueMap.end();
39+
::clp::ffi::SchemaTree::Node::id_t nodeId{};
40+
for (auto const candidateNodeId : nodeIds_) {
41+
valueIt = userGenNodeIdValueMap.find(candidateNodeId);
42+
if (valueIt != userGenNodeIdValueMap.end()) {
43+
nodeId = candidateNodeId;
44+
break;
45+
}
46+
}
47+
if (userGenNodeIdValueMap.end() == valueIt ||
48+
false == valueIt->second.has_value()) {
3949
continue;
4050
}
41-
auto const& value{value_it->second};
51+
auto const& value{valueIt->second};
4252
switch (nodeType_) {
4353
case ColumnType::String: {
4454
auto stringVector = vector->asFlatVector<StringView>();
@@ -89,6 +99,21 @@ void ClpIrVectorLoader::loadInternal(
8999
vector->setNull(vectorIndex, false);
90100
break;
91101
}
102+
case ColumnType::Timestamp: {
103+
auto timestampVector = vector->asFlatVector<Timestamp>();
104+
if (value->is<double>()) {
105+
timestampVector->set(
106+
vectorIndex,
107+
convertToVeloxTimestamp(value->get_immutable_view<double>()));
108+
} else if (value->is<int64_t>()) {
109+
timestampVector->set(
110+
vectorIndex,
111+
convertToVeloxTimestamp(value->get_immutable_view<int64_t>()));
112+
} else {
113+
VELOX_FAIL("Unsupported timestamp type");
114+
}
115+
break;
116+
}
92117
case ColumnType::Array: {
93118
auto arrayVector = std::dynamic_pointer_cast<ArrayVector>(vector);
94119
std::string jsonString;

0 commit comments

Comments
 (0)