Skip to content

Commit

Permalink
Convert PartitionTransforms.ColumnTransform to record
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr authored and wendigo committed Jun 25, 2024
1 parent 2524c21 commit 98cd650
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public IcebergBucketFunction(
FieldInfo fieldInfo = nameToFieldInfo.get(fieldName);
checkArgument(fieldInfo != null, "partition field not found: %s", field);
ColumnTransform transform = getColumnTransform(field, fieldInfo.type());
return new PartitionColumn(fieldInfo.sourceChannel(), transform.getValueTransform(), transform.getType(), fieldInfo.path());
return new PartitionColumn(fieldInfo.sourceChannel(), transform.valueTransform(), transform.type(), fieldInfo.path());
})
.collect(toImmutableList());
hashCodeInvokers = partitionColumns.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ private static PartitionColumn getPartitionColumn(PartitionField field, List<Ice
List<Integer> sourceChannels = getIndexPathToField(schema, getNestedFieldIds(schema, field.sourceId()));
Type sourceType = handles.get(idChannels.get(field.sourceId())).getType();
ColumnTransform transform = getColumnTransform(field, sourceType);
return new PartitionColumn(field, sourceChannels, sourceType, transform.getType(), transform.getBlockTransform());
return new PartitionColumn(field, sourceChannels, sourceType, transform.type(), transform.blockTransform());
}

private static List<Integer> getNestedFieldIds(Types.StructType schema, Integer sourceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ private static boolean canEnforceConstraintWithPartitionField(TypeOperators type
boolean canEnforce = valueSet.getValuesProcessor().transform(
ranges -> {
MethodHandle targetTypeEqualOperator = typeOperators.getEqualOperator(
transform.getType(), InvocationConvention.simpleConvention(FAIL_ON_NULL, NEVER_NULL, NEVER_NULL));
transform.type(), InvocationConvention.simpleConvention(FAIL_ON_NULL, NEVER_NULL, NEVER_NULL));
for (Range range : ranges.getOrderedRanges()) {
if (!canEnforceRangeWithPartitioningField(field, transform, range, targetTypeEqualOperator)) {
return false;
Expand All @@ -544,7 +544,7 @@ private static boolean canEnforceConstraintWithPartitionField(TypeOperators type

private static boolean canEnforceRangeWithPartitioningField(PartitionField field, ColumnTransform transform, Range range, MethodHandle targetTypeEqualOperator)
{
if (!transform.isMonotonic()) {
if (!transform.monotonic()) {
// E.g. bucketing transform
return false;
}
Expand Down Expand Up @@ -579,8 +579,8 @@ private static boolean yieldSamePartitioningValue(
{
requireNonNull(first, "first is null");
requireNonNull(second, "second is null");
Object firstTransformed = transform.getValueTransform().apply(nativeValueToBlock(sourceType, first), 0);
Object secondTransformed = transform.getValueTransform().apply(nativeValueToBlock(sourceType, second), 0);
Object firstTransformed = transform.valueTransform().apply(nativeValueToBlock(sourceType, first), 0);
Object secondTransformed = transform.valueTransform().apply(nativeValueToBlock(sourceType, second), 0);
// The pushdown logic assumes NULLs and non-NULLs are segregated, so that we have to think about non-null values only.
verify(firstTransformed != null && secondTransformed != null, "Transform for %s returned null for non-null input", field);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,56 +752,22 @@ private interface Hasher
int hash(Block block, int position);
}

public static class ColumnTransform
{
private final Type type;
private final boolean preservesNonNull;
private final boolean monotonic;
private final boolean temporal;
private final Function<Block, Block> blockTransform;
private final ValueTransform valueTransform;

public ColumnTransform(Type type, boolean preservesNonNull, boolean monotonic, boolean temporal, Function<Block, Block> blockTransform, ValueTransform valueTransform)
{
this.type = requireNonNull(type, "type is null");
this.preservesNonNull = preservesNonNull;
this.monotonic = monotonic;
this.temporal = temporal;
this.blockTransform = requireNonNull(blockTransform, "transform is null");
this.valueTransform = requireNonNull(valueTransform, "valueTransform is null");
}

/**
* Result type.
*/
public Type getType()
{
return type;
}

public boolean preservesNonNull()
{
return preservesNonNull;
}

public boolean isMonotonic()
{
return monotonic;
}

public boolean isTemporal()
{
return temporal;
}

public Function<Block, Block> getBlockTransform()
{
return blockTransform;
}

public ValueTransform getValueTransform()
/**
* @param type Result type.
*/
public record ColumnTransform(
Type type,
boolean preservesNonNull,
boolean monotonic,
boolean temporal,
Function<Block, Block> blockTransform,
ValueTransform valueTransform)
{
public ColumnTransform
{
return valueTransform;
requireNonNull(type, "type is null");
requireNonNull(blockTransform, "transform is null");
requireNonNull(valueTransform, "valueTransform is null");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ private List<ColumnMetadata> columnsForMaterializedView(ConnectorMaterializedVie
Types.NestedField sourceField = schemaWithTimestampTzPreserved.findField(partitionField.sourceId());
Type sourceType = toTrinoType(sourceField.type(), typeManager);
ColumnTransform columnTransform = getColumnTransform(partitionField, sourceType);
if (!columnTransform.isTemporal()) {
if (!columnTransform.temporal()) {
return Stream.of();
}
return Stream.of(sourceField.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ private Integer computeTrinoBucket(Type icebergType, Object icebergValue, int bu
{
io.trino.spi.type.Type trinoType = toTrinoType(icebergType, TYPE_MANAGER);
ColumnTransform transform = PartitionTransforms.bucket(trinoType, bucketCount);
Function<Block, Block> blockTransform = transform.getBlockTransform();
Function<Block, Block> blockTransform = transform.blockTransform();

BlockBuilder blockBuilder = trinoType.createBlockBuilder(null, 1);

Expand All @@ -285,7 +285,7 @@ private Integer computeTrinoBucket(Type icebergType, Object icebergValue, int bu
verify(bucketBlock.getPositionCount() == 1);
Integer trinoBucketWithBlock = bucketBlock.isNull(0) ? null : INTEGER.getInt(bucketBlock, 0);

Long trinoBucketWithValue = (Long) transform.getValueTransform().apply(block, 0);
Long trinoBucketWithValue = (Long) transform.valueTransform().apply(block, 0);
Integer trinoBucketWithValueAsInteger = trinoBucketWithValue == null ? null : toIntExact(trinoBucketWithValue);
assertThat(trinoBucketWithValueAsInteger).isEqualTo(trinoBucketWithBlock);

Expand Down

0 comments on commit 98cd650

Please sign in to comment.