Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ struct TEvaluationGraphInfo {
NActors::TActorId ResultId;
NThreading::TPromise<NYql::IDqGateway::TResult> Result;
ui64 Index = 0;
TMaybe<NCommon::TResultFormatSettings> ResultFormatSettings;
};

class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
Expand Down Expand Up @@ -1279,7 +1280,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {

LOG_D("Query evaluation " << NYql::NDqProto::StatusIds_StatusCode_Name(QueryEvalStatusCode)
<< ". " << it->second.Index << " response. Issues count: " << result.IssuesSize()
<< ". Rows count: " << result.GetRowsCount());
<< ". Rows count: " << result.GetRowsCount() << ", Sample count: " << result.SampleSize() << ", Truncated: " << result.GetTruncated());

TVector<NDq::TDqSerializedBatch> rows;
for (const auto& s : result.GetSample()) {
Expand All @@ -1288,11 +1289,12 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
rows.emplace_back(std::move(batch));
}

TProtoBuilder protoBuilder(ResultFormatSettings->ResultType, ResultFormatSettings->Columns);
const auto& resultFormatSettings = it->second.ResultFormatSettings;
TProtoBuilder protoBuilder(resultFormatSettings->ResultType, resultFormatSettings->Columns);

bool ysonTruncated = false;
queryResult.Data = protoBuilder.BuildYson(std::move(rows), ResultFormatSettings->SizeLimit.GetOrElse(Max<ui64>()),
ResultFormatSettings->RowsLimit.GetOrElse(Max<ui64>()), &ysonTruncated);
queryResult.Data = protoBuilder.BuildYson(std::move(rows), resultFormatSettings->SizeLimit.GetOrElse(Max<ui64>()),
resultFormatSettings->RowsLimit.GetOrElse(Max<ui64>()), &ysonTruncated);

queryResult.RowsCount = result.GetRowsCount();
queryResult.Truncated = result.GetTruncated() || ysonTruncated;
Expand Down Expand Up @@ -1546,7 +1548,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
*request.MutableSettings() = dqGraphParams.GetSettings();
*request.MutableSecureParams() = dqGraphParams.GetSecureParams();
*request.MutableColumns() = dqGraphParams.GetColumns();
PrepareResultFormatSettings(dqGraphParams, *dqConfiguration);
PrepareResultFormatSettings(info.ResultFormatSettings, dqGraphParams, *dqConfiguration);
NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram());
Send(info.ExecuterId, new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId));
LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId);
Expand Down Expand Up @@ -1585,7 +1587,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
ExecuterId, dqGraphParams.GetResultType(),
writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit));

PrepareResultFormatSettings(dqGraphParams, *dqConfiguration);
PrepareResultFormatSettings(ResultFormatSettings, dqGraphParams, *dqConfiguration);
} else {
LOG_D("ResultWriter was NOT CREATED since ResultType is empty");
resultId = ExecuterId;
Expand Down Expand Up @@ -1652,15 +1654,15 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId);
}

void PrepareResultFormatSettings(NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) {
ResultFormatSettings.ConstructInPlace();
void PrepareResultFormatSettings(TMaybe<NCommon::TResultFormatSettings>& resultFormatSettings, NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) {
resultFormatSettings.ConstructInPlace();
for (const auto& c : dqGraphParams.GetColumns()) {
ResultFormatSettings->Columns.push_back(c);
resultFormatSettings->Columns.push_back(c);
}

ResultFormatSettings->ResultType = dqGraphParams.GetResultType();
ResultFormatSettings->SizeLimit = dqConfiguration._AllResultsBytesLimit.Get();
ResultFormatSettings->RowsLimit = dqConfiguration._RowsLimitPerWrite.Get();
resultFormatSettings->ResultType = dqGraphParams.GetResultType();
resultFormatSettings->SizeLimit = dqConfiguration._AllResultsBytesLimit.Get();
resultFormatSettings->RowsLimit = dqConfiguration._RowsLimitPerWrite.Get();
}

void ClearResultFormatSettings() {
Expand Down
76 changes: 76 additions & 0 deletions ydb/tests/fq/s3/test_s3_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,3 +1167,79 @@ def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefi
issues = str(client.describe_query(query_id).result.query.issue)

assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues

@yq_v1
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_precompute_with_different_result_types(self, kikimr, s3, client, unique_prefix):
    """Run an analytics query with two differently shaped aggregated sub-selects.

    Two CSV objects are uploaded to the ``fbucket`` bucket, a storage
    connection is created for it, and a query is submitted in which
    ``$input1`` aggregates one-field structs while ``$input2`` aggregates
    two-field structs. The test only waits for the query to reach
    ``COMPLETED``; it does not inspect the returned rows.
    """
    bucket_name = "fbucket"

    def open_s3(factory):
        # Both boto3 entry points are configured with the same endpoint and keys.
        return factory(
            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
        )

    # Start from an empty, publicly readable bucket.
    fruit_bucket = open_s3(boto3.resource).Bucket(bucket_name)
    fruit_bucket.create(ACL='public-read')
    fruit_bucket.objects.all().delete()

    s3_client = open_s3(boto3.client)

    # (object key, CSV payload) pairs, uploaded in this order.
    csv_objects = [
        ('file1.csv', '''f1,f2,f3
Banana,3,100'''),
        ('file2.csv', '''f3,f4
Banana,3'''),
    ]
    for object_key, payload in csv_objects:
        s3_client.put_object(Body=payload, Bucket=bucket_name, Key=object_key, ContentType='text/plain')

    kikimr.control_plane.wait_bootstrap(1)
    storage_connection_name = unique_prefix + "fruitbucket"
    client.create_storage_connection(storage_connection_name, bucket_name)

    # Doubled braces are f-string escapes for the literal lambda bodies.
    sql = f'''
$input1 =
SELECT AGGREGATE_LIST( AsStruct(f1 AS dns_mining_pool))
FROM `{storage_connection_name}`.`file1.csv`
WITH (format=csv_with_names, SCHEMA (
f1 String NOT NULL,
f2 Int NOT NULL,
f3 Int NOT NULL
));

$input2 =
SELECT AGGREGATE_LIST( AsStruct(f3 AS dns_f_query, f4 AS dns_query1wewqwer) )
FROM `{storage_connection_name}`.`file2.csv`
WITH (format=csv_with_names, SCHEMA (
f3 String NOT NULL,
f4 Int NOT NULL
));

$f1 = () -> {{
RETURN ListHead(ListMap(
$input1,
($r) -> (
AsStruct("1" AS dns_mining_pool)
)
))
}};

$f2 = () -> {{
RETURN ListHead(ListMap(
$input2,
($r) -> (
AsStruct("2" AS dns_f_query)
)
))
}};

$parsed = SELECT $f1() AS f1, $f2() AS f2;

$parsed =
SELECT
p.f1.dns_mining_pool AS f1, p.f2.dns_f_query AS f2
FROM $parsed AS p;

SELECT SOME("MinersPoolsViaDNS") AS event_class FROM $parsed
WHERE (f1 == f2 )
'''

    created = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS)
    query_id = created.result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
Loading