Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@
* @author hank
* @create 2022-06-28
*/
public class Input
public abstract class Input
{
/**
* The unique id of the query.
*/
private long queryId;

public Input(long queryId)
{
this.queryId = queryId;
}

public long getQueryId()
{
return queryId;
}

public void setQueryId(long queryId)
{
this.queryId = queryId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @author hank
* @create 2022-06-28
*/
public class Output
public abstract class Output
{
private String requestId;
private boolean successful;
Expand Down
17 changes: 14 additions & 3 deletions pixels-common/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,21 @@ join.partition.size.rows=20480000
aggr.partition.size.rows=1280000

### pixels-turbo - query execution ###
executor.input.storage=s3
executor.intermediate.storage=s3
executor.stage.completion.ratio=0.6
executor.input.storage.scheme=s3
executor.input.storage.endpoint=input-endpoint-dummy
executor.input.storage.access.key=input-ak-dummy
executor.input.storage.secret.key=input-sk-dummy
executor.intermediate.storage.scheme=s3
executor.intermediate.storage.endpoint=intermediate-endpoint-dummy
executor.intermediate.storage.access.key=intermediate-ak-dummy
executor.intermediate.storage.secret.key=intermediate-sk-dummy
executor.intermediate.folder=/pixels-lambda-test/
executor.output.storage.scheme=s3
executor.output.storage.endpoint=output-endpoint-dummy
executor.output.storage.access.key=output-ak-dummy
executor.output.storage.secret.key=output-sk-dummy
executor.output.folder=/pixels-lambda-test/
executor.stage.completion.ratio=0.6
executor.selectivity.enabled=true
# the number of threads used in each worker
executor.intra.worker.parallelism=8
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* The optimizer for serverless query plan.
*
* @author hank
* @date 6/20/22
* @create 2022-06-20
*/
public class PlanOptimizer
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

/**
* @author hank
* @date 05/07/2022
* @create 2022-07-05
*/
public class AggregationOperator extends Operator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* @author hank
* @date 05/06/2022
* @create 2022-06-05
*/
public abstract class JoinOperator extends Operator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* @author hank
* @date 05/07/2022
* @create 2022-07-05
*/
public abstract class Operator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* The executor of a partitioned join.
*
* @author hank
* @date 04/06/2022
* @create 2022-06-04
*/
public class PartitionedJoinOperator extends SingleStageJoinOperator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* The executor of a single-stage join.
*
* @author hank
* @date 04/06/2022
* @create 2022-06-04
*/
public class SingleStageJoinOperator extends JoinOperator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

/**
* @author hank
* @date 05/07/2022
* @create 2022-07-05
*/
public class StarlingAggregationOperator extends Operator
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2023 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.planner.plan.physical.domain;

import io.pixelsdb.pixels.planner.plan.physical.input.AggregationInput;

import java.util.List;

/**
* The partial aggregates are seen as a table (i.e., materialized view). This is the
* information of the partial aggregates.
* @author hank
* @create 2023-04-29 (move fields from {@link AggregationInput} to here)
*/
public class AggregatedTableInfo extends TableInfo
{
/**
* The paths of the partial aggregated files.
*/
private List<String> inputFiles;
/**
* The number of threads to scan and aggregate the input files.
*/
private int parallelism;

public AggregatedTableInfo() { }

public AggregatedTableInfo(String tableName, boolean base, String[] columnsToRead,
StorageInfo storageInfo, List<String> inputFiles, int parallelism)
{
super(tableName, base, columnsToRead, storageInfo);
this.inputFiles = inputFiles;
this.parallelism = parallelism;
}

public List<String> getInputFiles()
{
return inputFiles;
}

public void setInputFiles(List<String> inputFiles)
{
this.inputFiles = inputFiles;
}

public int getParallelism()
{
return parallelism;
}

public void setParallelism(int parallelism)
{
this.parallelism = parallelism;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2023 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.planner.plan.physical.domain;

import io.pixelsdb.pixels.executor.aggregation.FunctionType;
import io.pixelsdb.pixels.planner.plan.physical.input.AggregationInput;

import java.util.List;

/**
* The information of the final aggregation.
* @author hank
* @create 2023-04-29 (move fields from {@link AggregationInput} to here)
*/
public class AggregationInfo
{
/**
* Whether the input files are partitioned.
*/
private boolean inputPartitioned;
/**
* The hash values to be processed by this aggregation worker.
*/
private List<Integer> hashValues;
/**
* The number of partitions in the input files.
*/
private int numPartition;
/**
* The column ids of the group-key columns in columnsToRead.
*/
private int[] groupKeyColumnIds;
/**
* The column ids of the aggregate columns in columnsToRead.
*/
private int[] aggregateColumnIds;
/**
* The column names of the group-key columns in the aggregation result.
*/
private String[] groupKeyColumnNames;
/**
* If a group-key column appears in the aggregation result,
* the corresponding element in this array should be true, and vice versa.
*/
private boolean[] groupKeyColumnProjection;
/**
* The column names of the aggregated columns in the aggregation result.
*/
private String[] resultColumnNames;
/**
* The display name of the data types of the result columns.
* They should be parsed by the TypeDescription in Pixels.
*/
private String[] resultColumnTypes;
/**
* The aggregation functions, in the same order of resultColumnNames.
*/
private FunctionType[] functionTypes;

public AggregationInfo() { }

public AggregationInfo(boolean inputPartitioned, List<Integer> hashValues, int numPartition,
int[] groupKeyColumnIds, int[] aggregateColumnIds, String[] groupKeyColumnNames,
boolean[] groupKeyColumnProjection, String[] resultColumnNames,
String[] resultColumnTypes, FunctionType[] functionTypes)
{
this.inputPartitioned = inputPartitioned;
this.hashValues = hashValues;
this.numPartition = numPartition;
this.groupKeyColumnIds = groupKeyColumnIds;
this.aggregateColumnIds = aggregateColumnIds;
this.groupKeyColumnNames = groupKeyColumnNames;
this.groupKeyColumnProjection = groupKeyColumnProjection;
this.resultColumnNames = resultColumnNames;
this.resultColumnTypes = resultColumnTypes;
this.functionTypes = functionTypes;
}

public boolean isInputPartitioned()
{
return inputPartitioned;
}

public void setInputPartitioned(boolean inputPartitioned)
{
this.inputPartitioned = inputPartitioned;
}

public List<Integer> getHashValues()
{
return hashValues;
}

public void setHashValues(List<Integer> hashValues)
{
this.hashValues = hashValues;
}

public int getNumPartition()
{
return numPartition;
}

public void setNumPartition(int numPartition)
{
this.numPartition = numPartition;
}

public int[] getGroupKeyColumnIds()
{
return groupKeyColumnIds;
}

public void setGroupKeyColumnIds(int[] groupKeyColumnIds)
{
this.groupKeyColumnIds = groupKeyColumnIds;
}

public int[] getAggregateColumnIds()
{
return aggregateColumnIds;
}

public void setAggregateColumnIds(int[] aggregateColumnIds)
{
this.aggregateColumnIds = aggregateColumnIds;
}

public String[] getGroupKeyColumnNames()
{
return groupKeyColumnNames;
}

public void setGroupKeyColumnNames(String[] groupKeyColumnNames)
{
this.groupKeyColumnNames = groupKeyColumnNames;
}

public boolean[] getGroupKeyColumnProjection()
{
return groupKeyColumnProjection;
}

public void setGroupKeyColumnProjection(boolean[] groupKeyColumnProjection)
{
this.groupKeyColumnProjection = groupKeyColumnProjection;
}

public String[] getResultColumnNames()
{
return resultColumnNames;
}

public void setResultColumnNames(String[] resultColumnNames)
{
this.resultColumnNames = resultColumnNames;
}

public String[] getResultColumnTypes()
{
return resultColumnTypes;
}

public void setResultColumnTypes(String[] resultColumnTypes)
{
this.resultColumnTypes = resultColumnTypes;
}

public FunctionType[] getFunctionTypes()
{
return functionTypes;
}

public void setFunctionTypes(FunctionType[] functionTypes)
{
this.functionTypes = functionTypes;
}
}
Loading