Skip to content

Commit

Permalink
* address remaining comments from apache#15908
Browse files Browse the repository at this point in the history
  • Loading branch information
zachjsh committed Feb 27, 2024
1 parent e6c3b3b commit d30a729
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ private void createTableMetadata(TableMetadata table)
}

/**
* If the segment grain is given in the catalog then use this value is used.
* If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
* value from the catalog.
*/
@Test
public void testInsertHourGrain()
public void testInsertHourGrainPartitonedByFromCatalog()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
Expand All @@ -152,7 +153,7 @@ public void testInsertHourGrain()
* the query value is used.
*/
@Test
public void testInsertHourGrainWithDay()
public void testInsertHourGrainWithDayPartitonedByFromQuery()
{
testIngestionQuery()
.sql("INSERT INTO hourDs\n" +
Expand All @@ -171,4 +172,53 @@ public void testInsertHourGrainWithDay()
)
.verify();
}

/**
* If the segment grain is given in the catalog and absent in the PARTITIONED BY clause in the query, then use the
* value from the catalog.
*/
@Test
public void testReplaceHourGrainPartitonedByFromCatalog()
{
testIngestionQuery()
.sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
"SELECT * FROM foo")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2")
.context(queryContextWithGranularity(Granularities.HOUR))
.build()
)
.verify();
}

/**
* If the segment grain is given in the catalog, and also by PARTITIONED BY, then
* the query value is used.
*/
@Test
public void testReplaceHourGrainWithDayPartitonedByFromQuery()
{
testIngestionQuery()
.sql("REPLACE INTO hourDs OVERWRITE ALL\n" +
"SELECT * FROM foo\n" +
"PARTITIONED BY day")
.authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
.expectTarget("hourDs", FOO_SIGNATURE)
.expectResources(dataSourceWrite("hourDs"), dataSourceRead("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "extra1", "extra2", "extra3", "m1", "m2")
.context(queryContextWithGranularity(Granularities.DAY))
.build()
)
.verify();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.sql.calcite.planner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.prepare.BaseDruidSqlValidator;
import org.apache.calcite.prepare.CalciteCatalogReader;
Expand All @@ -34,6 +33,7 @@
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlUtil;
Expand Down Expand Up @@ -64,6 +64,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

/**
Expand All @@ -77,14 +78,6 @@ class DruidSqlValidator extends BaseDruidSqlValidator
// Copied here from MSQE since that extension is not visible here.
public static final String CTX_ROWS_PER_SEGMENT = "msqRowsPerSegment";

public interface ValidatorContext
{
Map<String, Object> queryContextMap();
CatalogResolver catalog();
String druidSchemaName();
ObjectMapper jsonMapper();
}

private final PlannerContext plannerContext;

protected DruidSqlValidator(
Expand Down Expand Up @@ -153,6 +146,12 @@ public void validateWindow(SqlNode windowOrId, SqlValidatorScope scope, @Nullabl
super.validateWindow(windowOrId, scope, call);
}

/**
* Most of the implementation here is copied over from {@link org.apache.calcite.sql.validate.SqlValidator#validateInsert(SqlInsert)}
* we've extended, refactored, and extracted methods, to fit out needs, and added comments where appropriate.
*
* @param insert INSERT statement
*/
@Override
public void validateInsert(final SqlInsert insert)
{
Expand All @@ -173,7 +172,10 @@ public void validateInsert(final SqlInsert insert)
}

// The target namespace is both the target table ID and the row type for that table.
final SqlValidatorNamespace targetNamespace = getNamespace(insert);
final SqlValidatorNamespace targetNamespace = Objects.requireNonNull(
getNamespace(insert),
() -> "namespace for " + insert
);
final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
// The target is a new or existing datasource.
final DatasourceTable table = validateInsertTarget(targetNamespace, insertNs, operationName);
Expand Down Expand Up @@ -379,10 +381,11 @@ private RelDataType validateTargetType(
for (final RelDataTypeField sourceField : sourceFields) {
// Check that there are no unnamed columns in the insert.
if (UNNAMED_COLUMN_PATTERN.matcher(sourceField.getName()).matches()) {
throw InvalidSqlInput.exception(
throw buildCalciteContextException(
"Insertion requires columns to be named, but at least one of the columns was unnamed. This is usually "
+ "the result of applying a function without having an AS clause, please ensure that all function calls"
+ "are named with an AS clause as in \"func(X) as myColumn\"."
+ "are named with an AS clause as in \"func(X) as myColumn\".",
getSqlNodeFor(insert, sourceFields.indexOf(sourceField))
);
}
}
Expand Down Expand Up @@ -523,4 +526,17 @@ private CalciteContextException buildCalciteContextException(String message, Sql
pos.getEndLineNum(),
pos.getEndColumnNum());
}

private SqlNode getSqlNodeFor(SqlInsert insert, int idx)
{
SqlNode src = insert.getSource();
if (src instanceof SqlSelect) {
SqlSelect sqlSelect = (SqlSelect) src;
SqlNodeList selectList = sqlSelect.getSelectList();
if (idx < selectList.size()) {
return selectList.get(idx);
}
}
return src;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,15 @@ public void testInsertWithInvalidColumnNameInIngest()
.verify();
}

@Test
public void testInsertWithInvalidColumnName2InIngest()
{
testIngestionQuery()
.sql("INSERT INTO t SELECT __time, 1+1 FROM foo PARTITIONED BY ALL")
.expectValidationError(invalidSqlContains("Insertion requires columns to be named"))
.verify();
}

@Test
public void testInsertWithUnnamedColumnInNestedSelectStatement()
{
Expand Down

0 comments on commit d30a729

Please sign in to comment.