Skip to content

Commit

Permalink
[oap] Pass timezone into parquet writer
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and zhztheplayer committed Jul 26, 2024
1 parent 1b9e0e0 commit 6bfbcd2
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 18 deletions.
2 changes: 2 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
std::dynamic_pointer_cast<::arrow::TimestampType>(
arrowSchema->field(0)->type());
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
ASSERT_EQ(tsType->timezone(), "America/Los_Angeles");
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.parquetWriteTimestampUnit = TimestampUnit::kMicro;
writerOptions.parquetWriteTimestampTimeZone = "America/Los_Angeles";

// Create an in-memory writer.
auto sink = std::make_unique<MemorySink>(
Expand Down
22 changes: 22 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include "velox/common/testutil/TestValue.h"
#include "velox/core/QueryConfig.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Writer.h"
#include "velox/exec/MemoryReclaimer.h"
Expand Down Expand Up @@ -237,6 +238,8 @@ Writer::Writer(
}
options_.timestampUnit =
options.parquetWriteTimestampUnit.value_or(TimestampUnit::kNano);
options_.timestampTimeZone =
options.parquetWriteTimestampTimeZone.value_or("");
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
Expand Down Expand Up @@ -421,20 +424,39 @@ std::optional<TimestampUnit> getTimestampUnit(
return std::nullopt;
}

std::optional<std::string> getTimestampTimeZone(
const Config& config,
const char* configKey) {
if (const auto timezone = config.get<std::string>(configKey)) {
return std::optional(static_cast<std::string>(timezone.value()));
}
return std::nullopt;
}

} // namespace

void WriterOptions::processSessionConfigs(const Config& config) {
if (!parquetWriteTimestampUnit) {
parquetWriteTimestampUnit =
getTimestampUnit(config, kParquetSessionWriteTimestampUnit);
}

if (!parquetWriteTimestampTimeZone) {
parquetWriteTimestampTimeZone =
getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone);
}
}

void WriterOptions::processHiveConnectorConfigs(const Config& config) {
if (!parquetWriteTimestampUnit) {
parquetWriteTimestampUnit =
getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit);
}

if (!parquetWriteTimestampTimeZone) {
parquetWriteTimestampTimeZone =
getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone);
}
}

std::unique_ptr<dwio::common::Writer> ParquetWriterFactory::createWriter(
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ struct WriterOptions : public dwio::common::WriterOptions {
/// Timestamp unit for Parquet write through Arrow bridge.
/// Default if not specified: TimestampUnit::kNano (9).
std::optional<TimestampUnit> parquetWriteTimestampUnit;
bool writeInt96AsTimestamp = false;
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = true;

// Parsing session and hive configs.

Expand Down
38 changes: 25 additions & 13 deletions velox/vector/arrow/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ struct VeloxToArrowSchemaBridgeHolder {

std::unique_ptr<ArrowSchema> dictionary;

// Buffer required to generate a decimal format.
// Buffer required to generate a decimal format or timestamp with timezone
// format.
std::string formatBuffer;

void setChildAtIndex(
Expand Down Expand Up @@ -212,6 +213,28 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) {
arrowSchema->private_data = nullptr;
}

const char* exportArrowFormatTimestampStr(
const ArrowOptions& options,
std::string& formatBuffer) {
switch (options.timestampUnit) {
case TimestampUnit::kSecond:
formatBuffer = fmt::format("tss:{}", options.timestampTimeZone);
break;
case TimestampUnit::kMilli:
formatBuffer = fmt::format("tsm:{}", options.timestampTimeZone);
break;
case TimestampUnit::kMicro:
formatBuffer = fmt::format("tsu:{}", options.timestampTimeZone);
break;
case TimestampUnit::kNano:
formatBuffer = fmt::format("tsn:{}", options.timestampTimeZone);
break;
default:
VELOX_UNREACHABLE();
}
return formatBuffer.c_str();
}

// Returns the Arrow C data interface format type for a given Velox type.
const char* exportArrowFormatStr(
const TypePtr& type,
Expand Down Expand Up @@ -255,18 +278,7 @@ const char* exportArrowFormatStr(
case TypeKind::UNKNOWN:
return "n"; // NullType
case TypeKind::TIMESTAMP:
switch (options.timestampUnit) {
case TimestampUnit::kSecond:
return "tss:";
case TimestampUnit::kMilli:
return "tsm:";
case TimestampUnit::kMicro:
return "tsu:";
case TimestampUnit::kNano:
return "tsn:";
default:
VELOX_UNREACHABLE();
}
return exportArrowFormatTimestampStr(options, formatBuffer);
// Complex/nested types.
case TypeKind::ARRAY:
static_assert(sizeof(vector_size_t) == 4);
Expand Down
1 change: 1 addition & 0 deletions velox/vector/arrow/Bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct ArrowOptions {
bool flattenDictionary{false};
bool flattenConstant{false};
TimestampUnit timestampUnit = TimestampUnit::kNano;
std::string timestampTimeZone = "";
};

namespace facebook::velox {
Expand Down
9 changes: 5 additions & 4 deletions velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) {
testScalarType(VARCHAR(), "u");
testScalarType(VARBINARY(), "z");

options_.timestampTimeZone = "America/Los_Angeles";
options_.timestampUnit = TimestampUnit::kSecond;
testScalarType(TIMESTAMP(), "tss:");
testScalarType(TIMESTAMP(), "tss:America/Los_Angeles");
options_.timestampUnit = TimestampUnit::kMilli;
testScalarType(TIMESTAMP(), "tsm:");
testScalarType(TIMESTAMP(), "tsm:America/Los_Angeles");
options_.timestampUnit = TimestampUnit::kMicro;
testScalarType(TIMESTAMP(), "tsu:");
testScalarType(TIMESTAMP(), "tsu:America/Los_Angeles");
options_.timestampUnit = TimestampUnit::kNano;
testScalarType(TIMESTAMP(), "tsn:");
testScalarType(TIMESTAMP(), "tsn:America/Los_Angeles");

testScalarType(DATE(), "tdD");
testScalarType(INTERVAL_YEAR_MONTH(), "tiM");
Expand Down

0 comments on commit 6bfbcd2

Please sign in to comment.