diff --git a/ydb/core/viewer/json_handlers.h b/ydb/core/viewer/json_handlers.h index 526c671d0136..20f5bb986381 100644 --- a/ydb/core/viewer/json_handlers.h +++ b/ydb/core/viewer/json_handlers.h @@ -74,7 +74,7 @@ struct TJsonHandlers { json << ','; } TString name = itJson->first; - json << "\"/" << name << '"' << ":{"; + json << "\"/viewer" << name << '"' << ":{"; json << "\"get\":{"; json << "\"tags\":[\"" << TTagInfo::TagName << "\"],"; json << "\"produces\":[\"application/json\"],"; diff --git a/ydb/core/viewer/json_whoami.h b/ydb/core/viewer/json_whoami.h index c0033c94b374..2720108b1e2e 100644 --- a/ydb/core/viewer/json_whoami.h +++ b/ydb/core/viewer/json_whoami.h @@ -1,6 +1,8 @@ #pragma once #include #include +#include +#include #include #include #include @@ -14,7 +16,6 @@ using namespace NActors; class TJsonWhoAmI : public TActorBootstrapped { IViewer* Viewer; - TJsonSettings JsonSettings; NMon::TEvHttpInfo::TPtr Event; public: @@ -28,18 +29,48 @@ class TJsonWhoAmI : public TActorBootstrapped { {} void Bootstrap(const TActorContext& ctx) { - const auto& params(Event->Get()->Request.GetParams()); - JsonSettings.EnumAsNumbers = !FromStringWithDefault(params.Get("enums"), false); - JsonSettings.UI64AsString = !FromStringWithDefault(params.Get("ui64"), false); ReplyAndDie(ctx); } + bool CheckGroupMembership(std::unique_ptr& token, const NProtoBuf::RepeatedPtrField& sids) { + if (sids.empty()) { + return true; + } + for (const auto& sid : sids) { + if (token->IsExist(sid)) { + return true; + } + } + return false; + } + void ReplyAndDie(const TActorContext &ctx) { NACLibProto::TUserToken userToken; Y_PROTOBUF_SUPPRESS_NODISCARD userToken.ParseFromString(Event->Get()->UserToken); - TStringStream json; - TProtoToJson::ProtoToJson(json, userToken, JsonSettings); - ctx.Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + json.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + NJson::TJsonValue json(NJson::JSON_MAP); + if 
(userToken.HasUserSID()) { + json["UserSID"] = userToken.GetUserSID(); + } + if (userToken.HasGroupSIDs() && userToken.GetGroupSIDs().BucketsSize() > 0) { + NJson::TJsonValue& groupSIDs(json["GroupSIDs"]); + groupSIDs.SetType(NJson::JSON_ARRAY); + for (const auto& buckets : userToken.GetGroupSIDs().GetBuckets()) { + for (const auto& group : buckets.GetValues()) { + groupSIDs.AppendValue(group); + } + } + } + if (userToken.HasOriginalUserToken()) { + json["OriginalUserToken"] = userToken.GetOriginalUserToken(); + } + if (userToken.HasAuthType()) { + json["AuthType"] = userToken.GetAuthType(); + } + auto token = std::make_unique(userToken); + json["IsViewerAllowed"] = CheckGroupMembership(token, AppData()->DomainsConfig.GetSecurityConfig().GetViewerAllowedSIDs()); + json["IsMonitoringAllowed"] = CheckGroupMembership(token, AppData()->DomainsConfig.GetSecurityConfig().GetMonitoringAllowedSIDs()); + json["IsAdministrationAllowed"] = CheckGroupMembership(token, AppData()->DomainsConfig.GetSecurityConfig().GetAdministrationAllowedSIDs()); + ctx.Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get()) + NJson::WriteJson(json, false), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); Die(ctx); } @@ -52,17 +83,52 @@ class TJsonWhoAmI : public TActorBootstrapped { template <> struct TJsonRequestSchema { static TString GetSchema() { - TStringStream stream; - TProtoToJson::ProtoToJsonSchema(stream); - return stream.Str(); + return R"___( + { + "type": "object", + "title": "WhoAmI", + "properties": { + "UserSID": { + "type": "string", + "description": "User ID / name" + }, + "GroupSID": { + "type": "array", + "items": { + "type": "string" + }, + "description": "User groups" + }, + "OriginalUserToken": { + "type": "string", + "description": "User's token used to authenticate" + }, + "AuthType": { + "type": "string", + "description": "Authentication type" + }, + "IsViewerAllowed": { + "type": "boolean", + "description": "Is user allowed to view data" + }, + 
"IsMonitoringAllowed": { + "type": "boolean", + "description": "Is user allowed to view deeper and make simple changes" + }, + "IsAdministrationAllowed": { + "type": "boolean", + "description": "Is user allowed to do unrestricted changes in the system" + } + } + } + )___"; } }; template <> struct TJsonRequestParameters { static TString GetParameters() { - return R"___([{"name":"enums","in":"query","description":"convert enums to strings","required":false,"type":"boolean"}, - {"name":"ui64","in":"query","description":"return ui64 as numbers","required":false,"type":"boolean"}])___"; + return "[]"; } }; diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp index e417fcf535de..ee2864cc231f 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp @@ -195,6 +195,37 @@ class TTupleBlockItemConverter : public IBlockItemConverter { mutable TVector Items; }; +template +class TTzDateBlockItemConverter : public IBlockItemConverter { +public: + using TLayout = typename NYql::NUdf::TDataType::TLayout; + + NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { + Y_UNUSED(holderFactory); + if constexpr (Nullable) { + if (!item) { + return {}; + } + } + + NUdf::TUnboxedValuePod value {item.Get()}; + value.SetTimezoneId(item.GetTimezoneId()); + return value; + } + + TBlockItem MakeItem(const NUdf::TUnboxedValuePod& value) const final { + if constexpr (Nullable) { + if (!value) { + return {}; + } + } + + TBlockItem item {value.Get()}; + item.SetTimezoneId(value.GetTimezoneId()); + return item; + } +}; + class TExternalOptionalBlockItemConverter : public IBlockItemConverter { public: TExternalOptionalBlockItemConverter(std::unique_ptr&& inner) @@ -229,6 +260,8 @@ struct TConverterTraits { template using TStrings = TStringBlockItemConverter; using TExtOptional = 
TExternalOptionalBlockItemConverter; + template + using TTzDateConverter = TTzDateBlockItemConverter; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { if (desc.PassByValue) { @@ -258,6 +291,15 @@ struct TConverterTraits { return std::make_unique>(); } } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } }; } // namespace diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index cd7a9a60cf01..e9700ed43995 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -150,7 +150,7 @@ class TBlockDeserializerBase : public IBlockDeserializer { DoResetMetadata(); } - std::shared_ptr MakeDefaultValue(ui64 blockLen, ui64 offset) { + std::shared_ptr MakeDefaultValue(ui64 blockLen, ui64 offset) const { std::shared_ptr nulls; i64 nullsCount = 0; if (IsNullable()) { @@ -176,7 +176,7 @@ template class TFixedSizeBlockSerializer final : public IBlockSerializer { public: TFixedSizeBlockSerializer() = default; -private: + size_t ArrayMetadataCount() const final { return Nullable ? 
3 : 1; } @@ -389,16 +389,10 @@ class TExtOptionalBlockDeserializer final : public TBlockDeserializerBase { const std::unique_ptr Inner_; }; -template -class TTupleBlockSerializer final : public IBlockSerializer { -public: - explicit TTupleBlockSerializer(TVector>&& children) - : Children_(std::move(children)) - { - } -private: +template +class TTupleBlockSerializerBase : public IBlockSerializer { size_t ArrayMetadataCount() const final { - size_t result = GetChildMetaCount(); + size_t result = static_cast(this)->GetChildrenMetaCount(); if constexpr (Nullable) { result += 2; } @@ -410,14 +404,12 @@ class TTupleBlockSerializer final : public IBlockSerializer { StoreNullsSizes(data, metaSink); } if (data.GetNullCount() == data.length) { - auto childCount = GetChildMetaCount(); + auto childCount = static_cast(this)->GetChildrenMetaCount(); for (size_t i = 0; i < childCount; ++i) { metaSink(0); } } else { - for (size_t i = 0; i < Children_.size(); ++i) { - Children_[i]->StoreMetadata(*data.child_data[i], metaSink); - } + static_cast(this)->StoreChildrenMetadata(data.child_data, metaSink); } } @@ -426,13 +418,19 @@ class TTupleBlockSerializer final : public IBlockSerializer { StoreNulls(data, dst); } if (data.GetNullCount() != data.length) { - for (size_t i = 0; i < Children_.size(); ++i) { - Children_[i]->StoreArray(*data.child_data[i], dst); - } + static_cast(this)->StoreChildrenArrays(data.child_data, dst); } } +}; + +template +class TTupleBlockSerializer final : public TTupleBlockSerializerBase> { +public: + TTupleBlockSerializer(TVector>&& children) + : Children_(std::move(children)) + {} - size_t GetChildMetaCount() const { + size_t GetChildrenMetaCount() const { size_t result = 0; for (const auto& child : Children_) { result += child->ArrayMetadataCount(); @@ -440,9 +438,51 @@ class TTupleBlockSerializer final : public IBlockSerializer { return result; } + void StoreChildrenMetadata(const std::vector>& child_data, + const IBlockSerializer::TMetadataSink& metaSink) 
const { + + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->StoreMetadata(*child_data[i], metaSink); + } + } + + void StoreChildrenArrays(const std::vector>& child_data, TRope& dst) const { + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->StoreArray(*child_data[i], dst); + } + } + +private: const TVector> Children_; }; +template +class TTzDateBlockSerializer final : public TTupleBlockSerializerBase> { +public: + TTzDateBlockSerializer() = default; + + size_t GetChildrenMetaCount() const { + return DateSerialiser_.ArrayMetadataCount() + TzSerialiser_.ArrayMetadataCount(); + } + + void StoreChildrenMetadata(const std::vector>& child_data, + const IBlockSerializer::TMetadataSink& metaSink) const { + DateSerialiser_.StoreMetadata(*child_data[0], metaSink); + TzSerialiser_.StoreMetadata(*child_data[1], metaSink); + } + + void StoreChildrenArrays(const std::vector>& child_data, TRope& dst) const { + DateSerialiser_.StoreArray(*child_data[0], dst); + TzSerialiser_.StoreArray(*child_data[1], dst); + } + +private: + using TDateLayout = typename NUdf::TDataType::TLayout; + + TFixedSizeBlockSerializer DateSerialiser_; + TFixedSizeBlockSerializer TzSerialiser_; +}; + template class TTupleBlockDeserializer final : public TBlockDeserializerBase { public: @@ -494,6 +534,53 @@ class TTupleBlockDeserializer final : public TBlockDeserializerBase { const TVector> Children_; }; +template +class TTzDateBlockDeserializer final : public TBlockDeserializerBase { +public: + TTzDateBlockDeserializer() = default; + +private: + void DoLoadMetadata(const TMetadataSource& metaSource) final { + DateDeserialiser_.LoadMetadata(metaSource); + TzDeserialiser_.LoadMetadata(metaSource); + } + + std::shared_ptr DoMakeDefaultValue(const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + std::vector> childData; + childData.emplace_back(DateDeserialiser_.MakeDefaultValue(blockLen, offset)); + 
childData.emplace_back(TzDeserialiser_.MakeDefaultValue(blockLen, offset)); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + } + + std::shared_ptr DoLoadArray(TRope& src, const std::shared_ptr& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::vector> childData; + childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset)); + childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset)); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + } + + void DoResetMetadata() final { + DateDeserialiser_.ResetMetadata(); + TzDeserialiser_.ResetMetadata(); + } + + bool IsNullable() const final { + return Nullable; + } + + void SetArrowType(const std::shared_ptr& type) final { + YQL_ENSURE(type->fields().size() == 2); + ArrowType_ = type; + DateDeserialiser_.SetArrowType(type->field(0)->type()); + TzDeserialiser_.SetArrowType(type->field(1)->type()); + } + + using TDateLayout = typename NUdf::TDataType::TLayout; + + TFixedSizeBlockDeserializer DateDeserialiser_; + TFixedSizeBlockDeserializer TzDeserialiser_; +}; + struct TSerializerTraits { using TResult = IBlockSerializer; template @@ -503,6 +590,9 @@ struct TSerializerTraits { template using TStrings = TStringBlockSerializer; using TExtOptional = TExtOptionalBlockSerializer; + template + using TTzDate = TTzDateBlockSerializer; + static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -516,6 +606,16 @@ struct TSerializerTraits { Y_UNUSED(isOptional); ythrow yexception() << "Serializer not implemented for block resources"; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } + else { + return std::make_unique>(); + } + } }; struct TDeserializerTraits { @@ -527,6 +627,8 @@ struct TDeserializerTraits { template using TStrings = 
TStringBlockDeserializer; using TExtOptional = TExtOptionalBlockDeserializer; + template + using TTzDate = TTzDateBlockDeserializer; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -540,6 +642,16 @@ struct TDeserializerTraits { Y_UNUSED(isOptional); ythrow yexception() << "Deserializer not implemented for block resources"; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } + else { + return std::make_unique>(); + } + } }; } // namespace diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp index 3e0e7117c41f..ea4802cd6768 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -665,6 +665,9 @@ class TMiniKQLComputationNodePackTest: public TTestBase { auto scalarOptStrType = PgmBuilder.NewBlockType(optStrType, TBlockType::EShape::Scalar); auto blockOptTupleOptUi32StrType = PgmBuilder.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many); auto scalarUi64Type = PgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + + auto tzDateType = PgmBuilder.NewDataType(NUdf::EDataSlot::TzDate); + auto blockTzDateType = PgmBuilder.NewBlockType(tzDateType, TBlockType::EShape::Many); auto rowType = legacyStruct @@ -674,10 +677,11 @@ class TMiniKQLComputationNodePackTest: public TTestBase { {"_yql_block_length", scalarUi64Type}, {"a", scalarOptStrType}, {"b", blockOptTupleOptUi32StrType}, + {"c", blockTzDateType} }) : PgmBuilder.NewMultiType( {blockUi32Type, blockOptStrType, scalarOptStrType, - blockOptTupleOptUi32StrType, scalarUi64Type}); + blockOptTupleOptUi32StrType, blockTzDateType, scalarUi64Type}); ui64 blockLen = 1000; UNIT_ASSERT_LE(offset + len, blockLen); @@ -685,6 +689,7 @@ class 
TMiniKQLComputationNodePackTest: public TTestBase { auto builder1 = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr); auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr); auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr); + auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr); for (ui32 i = 0; i < blockLen; ++i) { TBlockItem b1(i); @@ -697,6 +702,10 @@ class TMiniKQLComputationNodePackTest: public TTestBase { TBlockItem b3items[] = { (i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a) }; TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem(); builder3->Add(b3); + + TBlockItem tzDate {i}; + tzDate.SetTimezoneId(i % 100); + builder4->Add(tzDate); } std::string_view testScalarString = "foobar"; @@ -709,11 +718,13 @@ class TMiniKQLComputationNodePackTest: public TTestBase { datums.emplace_back(arrow::Datum(std::make_shared(blockLen))); datums.emplace_back(arrow::Datum(std::make_shared(strbuf))); datums.emplace_back(builder3->Build(true)); + datums.emplace_back(builder4->Build(true)); } else { datums.emplace_back(builder1->Build(true)); datums.emplace_back(builder2->Build(true)); datums.emplace_back(arrow::Datum(std::make_shared(strbuf))); datums.emplace_back(builder3->Build(true)); + datums.emplace_back(builder4->Build(true)); datums.emplace_back(arrow::Datum(std::make_shared(blockLen))); } @@ -767,6 +778,7 @@ class TMiniKQLComputationNodePackTest: public TTestBase { auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type); auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType); auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType); + auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType); for 
(ui32 i = offset; i < len; ++i) { TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset); @@ -792,6 +804,10 @@ class TMiniKQLComputationNodePackTest: public TTestBase { } else { UNIT_ASSERT(!b3); } + + TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset); + UNIT_ASSERT(b4.Get() == i); + UNIT_ASSERT(b4.GetTimezoneId() == (i % 100)); } } } diff --git a/ydb/library/yql/minikql/datetime/datetime.h b/ydb/library/yql/minikql/datetime/datetime.h index 71363b27cdf3..a86f28060cf0 100644 --- a/ydb/library/yql/minikql/datetime/datetime.h +++ b/ydb/library/yql/minikql/datetime/datetime.h @@ -21,6 +21,7 @@ struct TTMStorage { unsigned int Second : 6; unsigned int Microsecond : 20; unsigned int TimezoneId : 16; + ui8 Reserved[2]; TTMStorage() { Zero(*this); diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index dc946c432278..38e536688e15 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1455,9 +1455,28 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr& ty case NUdf::EDataSlot::Json: type = arrow::utf8(); return true; - default: + case NUdf::EDataSlot::TzDate: { + type = MakeTzDateArrowType(); + return true; + } + case NUdf::EDataSlot::TzDatetime: { + type = MakeTzDateArrowType(); + return true; + } + case NUdf::EDataSlot::TzTimestamp: { + type = MakeTzDateArrowType(); + return true; + } + case NUdf::EDataSlot::Uuid: { return false; } + case NUdf::EDataSlot::Decimal: { + return false; + } + case NUdf::EDataSlot::DyNumber: { + return false; + } + } } bool ConvertArrowType(TType* itemType, std::shared_ptr& type) { @@ -2415,9 +2434,22 @@ size_t CalcMaxBlockItemSize(const TType* type) { case NUdf::EDataSlot::Json: // size of offset part return sizeof(arrow::StringType::offset_type); - default: + case NUdf::EDataSlot::TzDate: + return 
sizeof(typename NUdf::TDataType::TLayout) + sizeof(NYql::NUdf::TTimezoneId); + case NUdf::EDataSlot::TzDatetime: + return sizeof(typename NUdf::TDataType::TLayout) + sizeof(NYql::NUdf::TTimezoneId); + case NUdf::EDataSlot::TzTimestamp: + return sizeof(typename NUdf::TDataType::TLayout) + sizeof(NYql::NUdf::TTimezoneId); + case NUdf::EDataSlot::Uuid: { MKQL_ENSURE(false, "Unsupported data slot: " << slot); } + case NUdf::EDataSlot::Decimal: { + MKQL_ENSURE(false, "Unsupported data slot: " << slot); + } + case NUdf::EDataSlot::DyNumber: { + MKQL_ENSURE(false, "Unsupported data slot: " << slot); + } + } } MKQL_ENSURE(false, "Unsupported type: " << *type); @@ -2432,6 +2464,8 @@ struct TComparatorTraits { template using TStrings = NUdf::TStringBlockItemComparator; using TExtOptional = NUdf::TExternalOptionalBlockItemComparator; + template + using TTzDateComparator = NUdf::TTzDateBlockItemComparator; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -2442,6 +2476,15 @@ struct TComparatorTraits { Y_UNUSED(isOptional); ythrow yexception() << "Comparator not implemented for block resources: "; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } }; struct THasherTraits { @@ -2453,6 +2496,8 @@ struct THasherTraits { template using TStrings = NUdf::TStringBlockItemHasher; using TExtOptional = NUdf::TExternalOptionalBlockItemHasher; + template + using TTzDateHasher = NYql::NUdf::TTzDateBlockItemHasher; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -2463,6 +2508,15 @@ struct THasherTraits { Y_UNUSED(isOptional); ythrow yexception() << "Hasher not implemented for block resources"; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } 
else { + return std::make_unique>(); + } + } }; NUdf::IBlockItemComparator::TPtr TBlockTypeHelper::MakeComparator(NUdf::TType* type) const { diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h index 2b234d7ee757..7199bc836057 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.h +++ b/ydb/library/yql/minikql/mkql_type_builder.h @@ -33,6 +33,31 @@ inline size_t CalcBlockLen(size_t maxBlockItemSize) { bool ConvertArrowType(TType* itemType, std::shared_ptr& type); bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr& type); +template +std::shared_ptr MakeTzLayoutArrowType() { + static_assert(slot == NUdf::EDataSlot::TzDate || slot == NUdf::EDataSlot::TzDatetime || slot == NUdf::EDataSlot::TzTimestamp, + "Expected tz date type slot"); + + if constexpr (slot == NUdf::EDataSlot::TzDate) { + return arrow::uint16(); + } + if constexpr (slot == NUdf::EDataSlot::TzDatetime) { + return arrow::uint32(); + } + if constexpr (slot == NUdf::EDataSlot::TzTimestamp) { + return arrow::uint64(); + } +} + +template +std::shared_ptr MakeTzDateArrowType() { + std::vector> fields { + std::make_shared("datetime", MakeTzLayoutArrowType()), + std::make_shared("timezoneId", arrow::uint16()), + }; + return std::make_shared(fields); +} + class TArrowType : public NUdf::IArrowType { public: TArrowType(const std::shared_ptr& type) diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp index c273fc0eac49..f1b7e0384754 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -155,6 +156,10 @@ class TYsonReaderDetails { const char* Data() { return Data_; } + + size_t Available() const { + return Available_; + } private: const char* Data_; size_t Available_; @@ -314,9 +319,41 @@ class 
TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag> Children_; - TVector Items_; +}; + +template +class TYsonTzDateBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + using TLayout = typename NUdf::TDataType::TLayout; + size_t length = sizeof(TLayout) + sizeof(NUdf::TTimezoneId); + Y_ASSERT(buf.Available() == length); + + TLayout date; + NUdf::TTimezoneId tz; + + if constexpr (std::is_same_v) { + DeserializeTzDate({buf.Data(), length}, date, tz); + } else if constexpr (std::is_same_v) { + DeserializeTzDatetime({buf.Data(), length}, date, tz); + } else if constexpr (std::is_same_v) { + DeserializeTzTimestamp({buf.Data(), length}, date, tz); + } else { + static_assert(sizeof(T) == 0, "Unsupported tz date type"); + } + + buf.Skip(length); + NUdf::TBlockItem res {date}; + res.SetTimezoneId(tz); + return res; + } }; namespace { @@ -374,9 +411,6 @@ class TYsonFixedSizeBlockReader final : public IYsonBlockReaderWithNativeFlag> Children_; - TVector Items_; }; template @@ -439,6 +473,18 @@ struct TYsonBlockReaderTraits { Y_UNUSED(isOptional); ythrow yexception() << "Yson reader not implemented for block resources"; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + Y_UNUSED(isOptional); + if (isOptional) { + using TTzDateReader = TYsonTzDateBlockReader; + return std::make_unique(); + } else { + using TTzDateReader = TYsonTzDateBlockReader; + return std::make_unique(); + } + } }; template diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index 7c86083ec9d0..10fa0a6f35db 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -8,6 +8,7 @@ #include #include 
#include +#include #include #include @@ -516,15 +517,16 @@ class TFixedSizeArrayBuilderBase : public TArrayBuilderBase { template class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBase> { - using TDerived = TFixedSizeArrayBuilder; + using TSelf = TFixedSizeArrayBuilder; + using TBase = TFixedSizeArrayBuilderBase; public: TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) - : TFixedSizeArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated) + : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated) {} TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) - : TFixedSizeArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated) + : TBase(typeInfoHelper, type, pool, maxLen, totalAllocated) {} void DoAddNotNull(TUnboxedValuePod value) { @@ -936,13 +938,11 @@ class TStringArrayBuilder final : public TArrayBuilderBase { i32 TypeLen = 0; }; -template -class TTupleArrayBuilder final : public TArrayBuilderBase { +template +class TTupleArrayBuilderBase : public TArrayBuilderBase { public: - TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, - TVector&& children, size_t* totalAllocated) + TTupleArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated) - , Children(std::move(children)) { Reserve(); } @@ -951,70 +951,50 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { if constexpr (Nullable) { if (!value) { NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast(this)->AddToChildrenDefault(); 
return; } NullBuilder->UnsafeAppend(1); } - auto elements = value.GetElements(); - if (elements) { - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(elements[i]); - } - } else { - for (ui32 i = 0; i < Children.size(); ++i) { - auto element = value.GetElement(i); - Children[i]->Add(element); - } - } + static_cast(this)->AddToChildren(value); } void DoAdd(TBlockItem value) final { if constexpr (Nullable) { if (!value) { NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast(this)->AddToChildrenDefault(); return; } NullBuilder->UnsafeAppend(1); } - auto elements = value.AsTuple(); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(elements[i]); - } + static_cast(this)->AddToChildren(value); } void DoAdd(TInputBuffer& input) final { if constexpr (Nullable) { if (!input.PopChar()) { - return DoAdd(TBlockItem{}); + NullBuilder->UnsafeAppend(0); + static_cast(this)->AddToChildrenDefault(); + return; } NullBuilder->UnsafeAppend(1); } - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(input); - } + static_cast(this)->AddToChildren(input); } void DoAddDefault() final { if constexpr (Nullable) { NullBuilder->UnsafeAppend(1); } - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast(this)->AddToChildrenDefault(); } void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { if (array.buffers.front()) { @@ -1026,14 +1006,11 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); - } + static_cast(this)->AddManyToChildren(array, sparseBitmap, popCount); } void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t 
count) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { for (ui64 i = beginIndex; i < beginIndex + count; ++i) { @@ -1041,14 +1018,11 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], beginIndex, count); - } + static_cast(this)->AddManyToChildren(array, beginIndex, count); } void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { for (size_t i = 0; i < count; ++i) { @@ -1056,9 +1030,7 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], indexes, count); - } + static_cast(this)->AddManyToChildren(array, indexes, count); } TBlockArrayTree::Ptr DoBuildTree(bool finish) final { @@ -1074,10 +1046,7 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { Y_ABORT_UNLESS(length); result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap })); - result->Children.reserve(Children.size()); - for (ui32 i = 0; i < Children.size(); ++i) { - result->Children.emplace_back(Children[i]->BuildTree(finish)); - } + static_cast(this)->BuildChildrenTree(finish, result->Children); if (!finish) { Reserve(); @@ -1104,10 +1073,145 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } private: - TVector> Children; std::unique_ptr> NullBuilder; }; +template +class TTupleArrayBuilder final : public TTupleArrayBuilderBase> { +public: + + TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, + TVector&& children, size_t* totalAllocated = nullptr) + : TTupleArrayBuilderBase>(typeInfoHelper, type, pool, maxLen, totalAllocated) + , 
Children_(std::move(children)) {} + + void AddToChildrenDefault() { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->AddDefault(); + } + } + + void AddToChildren(NUdf::TUnboxedValuePod value) { + auto elements = value.GetElements(); + if (elements) { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(elements[i]); + } + } else { + for (ui32 i = 0; i < Children_.size(); ++i) { + auto element = value.GetElement(i); + Children_[i]->Add(element); + } + } + } + + void AddToChildren(TBlockItem value) { + auto elements = value.AsTuple(); + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(elements[i]); + } + } + + void AddToChildren(TInputBuffer& input) { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(input); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], beginIndex, count); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], indexes, count); + } + } + + void BuildChildrenTree(bool finish, std::vector& resultChildren) { + resultChildren.reserve(Children_.size()); + for (ui32 i = 0; i < Children_.size(); ++i) { + resultChildren.emplace_back(Children_[i]->BuildTree(finish)); + } + } + +private: +TVector> Children_; +}; + +template +class TTzDateArrayBuilder final : public 
TTupleArrayBuilderBase> { + using TDateLayout = typename TDataType::TLayout; + static constexpr auto DataSlot = TDataType::Slot; + +public: + TTzDateArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) + : TTupleArrayBuilderBase>(typeInfoHelper, type, pool, maxLen, totalAllocated) + , DateBuilder_(typeInfoHelper, NKikimr::NMiniKQL::MakeTzLayoutArrowType(), pool, maxLen) + , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen) + { + } + + void AddToChildrenDefault() { + DateBuilder_.AddDefault(); + TimezoneBuilder_.AddDefault(); + } + + void AddToChildren(NUdf::TUnboxedValuePod value) { + DateBuilder_.Add(value); + TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId())); + } + + void AddToChildren(TBlockItem value) { + DateBuilder_.Add(value); + TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId())); + } + + void AddToChildren(TInputBuffer& input) { + DateBuilder_.Add(input); + TimezoneBuilder_.Add(input); + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], popCount, sparseBitmap, array.length); + TimezoneBuilder_.AddMany(*array.child_data[1], popCount, sparseBitmap, array.length); + } + + void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], beginIndex, count); + TimezoneBuilder_.AddMany(*array.child_data[1], beginIndex, count); + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], indexes, count); + TimezoneBuilder_.AddMany(*array.child_data[1], indexes, count); + } + + void BuildChildrenTree(bool finish, std::vector& resultChildren) { + 
resultChildren.emplace_back(DateBuilder_.BuildTree(finish)); + resultChildren.emplace_back(TimezoneBuilder_.BuildTree(finish)); + } + +private: + TFixedSizeArrayBuilder DateBuilder_; + TFixedSizeArrayBuilder TimezoneBuilder_; +}; + + class TExternalOptionalArrayBuilder final : public TArrayBuilderBase { public: TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, @@ -1306,7 +1410,13 @@ inline std::unique_ptr MakeArrayBuilderImpl( return std::make_unique>(typeInfoHelper, type, pool, maxLen, totalAllocated); case NUdf::EDataSlot::Utf8: case NUdf::EDataSlot::Json: - return std::make_unique>(typeInfoHelper, type, pool, maxLen, totalAllocated); + return std::make_unique>(typeInfoHelper, type, pool, maxLen, totalAllocated); /* keep forwarding totalAllocated, exactly as the replaced line and the preceding case do */ + case NUdf::EDataSlot::TzDate: + return std::make_unique>(typeInfoHelper, type, pool, maxLen, totalAllocated); /* the Tz builder constructor defaults totalAllocated to nullptr when it is omitted */ + case NUdf::EDataSlot::TzDatetime: + return std::make_unique>(typeInfoHelper, type, pool, maxLen, totalAllocated); + case NUdf::EDataSlot::TzTimestamp: + return std::make_unique>(typeInfoHelper, type, pool, maxLen, totalAllocated); default: Y_ENSURE(false, "Unsupported data slot"); } diff --git a/ydb/library/yql/public/udf/arrow/block_item.h b/ydb/library/yql/public/udf/arrow/block_item.h index 15edabe27ae1..f04a25666b12 100644 --- a/ydb/library/yql/public/udf/arrow/block_item.h +++ b/ydb/library/yql/public/udf/arrow/block_item.h @@ -155,6 +155,15 @@ class TBlockItem { bool IsBoxed() const { return EMarkers::Boxed == GetMarkers(); } bool IsEmbedded() const { return EMarkers::Embedded == GetMarkers(); } + inline void SetTimezoneId(ui16 id) { + UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime"); + Raw.Simple.TimezoneId = id; + } + + inline ui16 GetTimezoneId() const { + UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime"); + return Raw.Simple.TimezoneId; + } private: union TRaw { @@ -180,7 +189,8 @@ class TBlockItem { union { ui64 FullMeta; struct { - ui8 Reserved[7]; + TTimezoneId 
TimezoneId; + ui8 Reserved[5]; ui8 Meta; }; }; diff --git a/ydb/library/yql/public/udf/arrow/block_item_comparator.h b/ydb/library/yql/public/udf/arrow/block_item_comparator.h index 98ca57d7f94d..3431932d9250 100644 --- a/ydb/library/yql/public/udf/arrow/block_item_comparator.h +++ b/ydb/library/yql/public/udf/arrow/block_item_comparator.h @@ -3,6 +3,7 @@ #include "block_item.h" #include "block_reader.h" + #include #include #include @@ -148,6 +149,39 @@ class TStringBlockItemComparator : public TBlockItemComparatorBase +class TTzDateBlockItemComparator : public TBlockItemComparatorBase, Nullable> { + using TLayout = typename TDataType::TLayout; + +public: + i64 DoCompare(TBlockItem lhs, TBlockItem rhs) const { /* three-way compare: date value first, then timezone id; yields -1/0/1, so the return type must be signed (a bool would turn -1 into true) */ + const auto x = lhs.Get(); + const auto y = rhs.Get(); + + if (x == y) { + const auto tx = lhs.GetTimezoneId(); + const auto ty = rhs.GetTimezoneId(); + return (tx == ty) ? 0 : (tx < ty ? -1 : 1); + } + + if (x < y) { + return -1; + } + + return 1; + } + + bool DoEquals(TBlockItem lhs, TBlockItem rhs) const { + return lhs.Get() == rhs.Get() && lhs.GetTimezoneId() == rhs.GetTimezoneId(); + } + + + bool DoLess(TBlockItem lhs, TBlockItem rhs) const { + return std::forward_as_tuple(lhs.Get(), lhs.GetTimezoneId()) < std::forward_as_tuple(rhs.Get(), rhs.GetTimezoneId()); + } +}; + + template class TTupleBlockItemComparator : public TBlockItemComparatorBase, Nullable> { public: diff --git a/ydb/library/yql/public/udf/arrow/block_item_hasher.h b/ydb/library/yql/public/udf/arrow/block_item_hasher.h index 4c3d89e998e8..a173e7940f24 100644 --- a/ydb/library/yql/public/udf/arrow/block_item_hasher.h +++ b/ydb/library/yql/public/udf/arrow/block_item_hasher.h @@ -49,6 +49,17 @@ class TFixedSizeBlockItemHasher : public TBlockItemHasherBase +class TTzDateBlockItemHasher : public TBlockItemHasherBase, Nullable> { +public: + ui64 DoHash(TBlockItem value) const { + using TLayout = typename TDataType::TLayout; + TUnboxedValuePod uv {value.Get()}; + 
uv.SetTimezoneId(value.GetTimezoneId()); + return GetValueHash::Slot>(uv); + } +}; + template class TStringBlockItemHasher : public TBlockItemHasherBase, Nullable> { public: diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h index d42c9f16c7e0..3eaeb62f73ec 100644 --- a/ydb/library/yql/public/udf/arrow/block_reader.h +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -203,26 +203,16 @@ class TStringBlockReader final : public IBlockReader { } }; -template -class TTupleBlockReader final : public IBlockReader { +template +class TTupleBlockReaderBase : public IBlockReader { public: - TTupleBlockReader(TVector>&& children) - : Children(std::move(children)) - , Items(Children.size()) - {} - TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { if constexpr (Nullable) { if (IsNull(data, index)) { return {}; } } - - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetItem(*data.child_data[i], index); - } - - return TBlockItem(Items.data()); + return static_cast(this)->GetChildrenItems(data, index); } TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { @@ -233,33 +223,87 @@ class TTupleBlockReader final : public IBlockReader { } const auto& structScalar = arrow::internal::checked_cast(scalar); + return static_cast(this)->GetChildrenScalarItems(structScalar); + } - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 size = 0; + if constexpr (Nullable) { + size += data.length; } - return TBlockItem(Items.data()); + size += static_cast(this)->GetChildrenDataWeight(data); + return size; } - ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 GetDataWeight(TBlockItem item) const final { + return static_cast(this)->GetDataWeightImpl(item); + } + + ui64 GetDefaultValueWeight() const final { ui64 size = 0; if constexpr 
(Nullable) { - size += data.length; + size = 1; + } + size += static_cast(this)->GetChildrenDefaultDataWeight(); + return size; + } + + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { + if constexpr (Nullable) { + if (IsNull(data, index)) { + return out.PushChar(0); + } + out.PushChar(1); } + static_cast(this)->SaveChildrenItems(data, index, out); + } + + void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final { + if constexpr (Nullable) { + if (!scalar.is_valid) { + return out.PushChar(0); + } + out.PushChar(1); + } + + const auto& structScalar = arrow::internal::checked_cast(scalar); + + static_cast(this)->SaveChildrenScalarItems(structScalar, out); + } +}; + +template +class TTupleBlockReader final : public TTupleBlockReaderBase> { +public: + TTupleBlockReader(TVector>&& children) + : Children(std::move(children)) + , Items(Children.size()) + {} + + TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) { for (ui32 i = 0; i < Children.size(); ++i) { - size += Children[i]->GetDataWeight(*data.child_data[i]); + Items[i] = Children[i]->GetItem(*data.child_data[i], index); } - return size; + return TBlockItem(Items.data()); } - ui64 GetDataWeight(TBlockItem item) const final { + TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) { + for (ui32 i = 0; i < Children.size(); ++i) { + Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + } + + return TBlockItem(Items.data()); + } + + size_t GetDataWeightImpl(const TBlockItem& item) const { const TBlockItem* items = nullptr; ui64 size = 0; if constexpr (Nullable) { if (!item) { - return GetDefaultValueWeight(); + return this->GetDefaultValueWeight(); } size = 1; items = item.GetOptionalValue().GetElements(); @@ -274,40 +318,30 @@ class TTupleBlockReader final : public IBlockReader { return size; } - ui64 GetDefaultValueWeight() const final { - ui64 size = 0; - if constexpr (Nullable) { - size = 1; - } + 
size_t GetChildrenDataWeight(const arrow::ArrayData& data) const { + size_t size = 0; for (ui32 i = 0; i < Children.size(); ++i) { - size += Children[i]->GetDefaultValueWeight(); + size += Children[i]->GetDataWeight(*data.child_data[i]); } + return size; } - void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { - if constexpr (Nullable) { - if (IsNull(data, index)) { - return out.PushChar(0); - } - out.PushChar(1); + size_t GetChildrenDefaultDataWeight() const { + size_t size = 0; + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetDefaultValueWeight(); } + return size; + } + void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveItem(*data.child_data[i], index, out); } } - - void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final { - if constexpr (Nullable) { - if (!scalar.is_valid) { - return out.PushChar(0); - } - out.PushChar(1); - } - - const auto& structScalar = arrow::internal::checked_cast(scalar); - + + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveScalarItem(*structScalar.value[i], out); } @@ -318,6 +352,65 @@ class TTupleBlockReader final : public IBlockReader { TVector Items; }; +template +class TTzDateBlockReader final : public TTupleBlockReaderBase> { +public: + TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) { + Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2); + + TBlockItem item {DateReader_.GetItem(*data.child_data[0], index)}; + item.SetTimezoneId(TimezoneReader_.GetItem(*data.child_data[1], index).Get()); + return item; + } + + TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) { + Y_DEBUG_ABORT_UNLESS(structScalar.value.size() == 2); + + TBlockItem item 
{DateReader_.GetScalarItem(*structScalar.value[0])}; + item.SetTimezoneId(TimezoneReader_.GetScalarItem(*structScalar.value[1]).Get()); + return item; + } + + size_t GetChildrenDataWeight(const arrow::ArrayData& data) const { + Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2); + + size_t size = 0; + size += DateReader_.GetDataWeight(*data.child_data[0]); + size += TimezoneReader_.GetDataWeight(*data.child_data[1]); + return size; + } + + size_t GetDataWeightImpl(const TBlockItem& item) const { + Y_UNUSED(item); + return GetChildrenDefaultDataWeight(); + } + + size_t GetChildrenDefaultDataWeight() const { + ui64 size = 0; + if constexpr (Nullable) { + size = 1; + } + + size += DateReader_.GetDefaultValueWeight(); + size += TimezoneReader_.GetDefaultValueWeight(); + return size; + } + + void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const { + DateReader_.SaveItem(*data.child_data[0], index, out); + TimezoneReader_.SaveItem(*data.child_data[1], index, out); + } + + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { + DateReader_.SaveScalarItem(*structScalar.value[0], out); + TimezoneReader_.SaveScalarItem(*structScalar.value[1], out); + } + +private: + TFixedSizeBlockReader::TLayout, /* Nullable */false> DateReader_; + TFixedSizeBlockReader TimezoneReader_; +}; + class TExternalOptionalBlockReader final : public IBlockReader { public: TExternalOptionalBlockReader(std::unique_ptr&& inner) @@ -390,6 +483,8 @@ struct TReaderTraits { using TExtOptional = TExternalOptionalBlockReader; template using TResource = TResourceBlockReader; + template + using TTzDateReader = TTzDateBlockReader; static std::unique_ptr MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -407,6 +502,15 @@ struct TReaderTraits { return std::make_unique>(); } } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return 
std::make_unique>(); + } else { + return std::make_unique>(); + } + } }; template @@ -544,7 +648,15 @@ std::unique_ptr MakeBlockReaderImpl(const ITypeInfoHe return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Json: return MakeStringBlockReaderImpl(isOptional); - default: + case NUdf::EDataSlot::TzDate: + return TTraits::template MakeTzDate(isOptional); + case NUdf::EDataSlot::TzDatetime: + return TTraits::template MakeTzDate(isOptional); + case NUdf::EDataSlot::TzTimestamp: + return TTraits::template MakeTzDate(isOptional); + case NUdf::EDataSlot::Uuid: + case NUdf::EDataSlot::Decimal: + case NUdf::EDataSlot::DyNumber: Y_ENSURE(false, "Unsupported data slot"); } } @@ -606,7 +718,10 @@ inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, if (dataTypeInfo.Features & StringType) { props.MaxSize = {}; props.IsFixed = false; - } else { + } else if (dataTypeInfo.Features & TzDateType) { + *props.MaxSize += dataTypeInfo.FixedSize + sizeof(TTimezoneId); + } + else { *props.MaxSize += dataTypeInfo.FixedSize; } return; diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h index 669e6386b2e0..2231ff239e92 100644 --- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -533,6 +533,25 @@ arrow::Status UnaryPreallocatedExecImpl(arrow::compute::KernelContext* ctx, cons return arrow::Status::OK(); } + +template +arrow::Status UnaryPreallocatedReaderExecImpl(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + Y_UNUSED(ctx); + static_assert(std::is_base_of_v); + TReader reader; + + auto& inArray = batch.values[0].array(); + auto& outArray = res->array(); + TOutput* outValues = outArray->GetMutableValues(1); + auto length = inArray->length; + for (int64_t i = 0; i < length; ++i) { + auto item = reader.GetItem(*inArray, i); + outValues[i] = Core(item); + } + + return 
arrow::Status::OK(); +} + template Core(TInput)> struct TUnaryUnsafeFixedSizeFilterKernel { static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { diff --git a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp index fcaa2815846a..a915bb7814a4 100644 --- a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp +++ b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp @@ -175,6 +175,30 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { UNIT_ASSERT_VALUES_EQUAL(resource2->GetResourceTag(), ResourceName); } + Y_UNIT_TEST(TestTzDateBuilder_Layout) { + TArrayBuilderTestData data; + const auto tzDateType = data.PgmBuilder.NewDataType(EDataSlot::TzDate); + const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType, + *data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */ nullptr); + + auto makeTzDate = [] (ui16 val, ui16 tz) { + TUnboxedValuePod tzDate {val}; + tzDate.SetTimezoneId(tz); + return tzDate; + }; + + TVector dates{makeTzDate(1234, 1), makeTzDate(1234, 2), makeTzDate(45678, 333)}; + for (auto date: dates) { + arrayBuilder->Add(date); + } + + const auto datum = arrayBuilder->Build(true); + UNIT_ASSERT(datum.is_array()); + UNIT_ASSERT_VALUES_EQUAL(datum.length(), dates.size()); + const auto childData = datum.array()->child_data; + UNIT_ASSERT_VALUES_EQUAL_C(childData.size(), 2, "Expected date and timezone children"); + } + Y_UNIT_TEST(TestResourceStringValueBuilderReader) { TArrayBuilderTestData data; const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName); diff --git a/ydb/library/yql/public/udf/udf_data_type.h b/ydb/library/yql/public/udf/udf_data_type.h index 7802f180fd73..476c3c313eb2 100644 --- a/ydb/library/yql/public/udf/udf_data_type.h +++ b/ydb/library/yql/public/udf/udf_data_type.h @@ -13,6 +13,8 @@ namespace NUdf { using TDataTypeId = ui16; +using TTimezoneId = ui16; + enum EDataTypeFeatures 
: ui32 { CanCompare = 1u << 0, HasDeterministicCompare = 1u << 1, diff --git a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp index d587c3a6fccd..311168f300ac 100644 --- a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp +++ b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp @@ -38,6 +38,24 @@ class TToUnits { static TResult DateCore(ui16 value) { return value * ui32(86400) * TResult(ScaleAfterSeconds); } + + template + static TResult TzBlockCore(TBlockItem tzDate); + + template<> + static TResult TzBlockCore(TBlockItem tzDate) { + return DateCore(tzDate.Get()); + } + + template<> + static TResult TzBlockCore(TBlockItem tzDate) { + return DatetimeCore(tzDate.Get()); + } + + template<> + static TResult TzBlockCore(TBlockItem tzDate) { + return TimestampCore(tzDate.Get()); + } static TResult DatetimeCore(ui32 value) { return value * TResult(ScaleAfterSeconds); @@ -55,6 +73,12 @@ class TToUnits { static auto name = TStringRef(TFuncName, std::strlen(TFuncName)); return name; } + + template + static auto MakeTzBlockExec() { + using TReader = TTzDateBlockReader; + return UnaryPreallocatedReaderExecImpl>; + } static bool DeclareSignature( const TStringRef& name, @@ -135,8 +159,12 @@ class TToUnits { if (!typesOnly) { if (typeId == TDataType::Id || typeId == TDataType::Id) { if (block) { + const auto exec = (typeId == TDataType::Id) + ? MakeTzBlockExec() + : UnaryPreallocatedExecImpl; + builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(), - UnaryPreallocatedExecImpl, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); + exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); } else { builder.Implementation(new TUnaryOverOptionalImpl()); } @@ -144,8 +172,12 @@ class TToUnits { if (typeId == TDataType::Id || typeId == TDataType::Id) { if (block) { + const auto exec = (typeId == TDataType::Id) + ? 
MakeTzBlockExec() + : UnaryPreallocatedExecImpl; + builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(), - UnaryPreallocatedExecImpl, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); + exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); } else { builder.Implementation(new TUnaryOverOptionalImpl()); } @@ -153,8 +185,12 @@ class TToUnits { if (typeId == TDataType::Id || typeId == TDataType::Id) { if (block) { + const auto exec = (typeId == TDataType::Id) + ? MakeTzBlockExec() + : UnaryPreallocatedExecImpl; + builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(), - UnaryPreallocatedExecImpl, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); + exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); } else { builder.Implementation(new TUnaryOverOptionalImpl()); }