Skip to content

Commit

Permalink
Fix Kafka CSV, JSON and RAW encoders to use mappings
Browse files Browse the repository at this point in the history
See #5791.
  • Loading branch information
hashhar authored and losipiuk committed Nov 21, 2020
1 parent ffcdd48 commit e42f44a
Show file tree
Hide file tree
Showing 10 changed files with 632 additions and 18 deletions.
Expand Up @@ -126,100 +126,100 @@ private DateTimeFormat parseDataFormat(String dataFormat, String columnName)
}
}

private String currentColumnName()
private String currentColumnMapping()
{
return columnHandles.get(currentColumnIndex).getName();
return columnHandles.get(currentColumnIndex).getMapping();
}

@Override
protected void appendNullValue()
{
node.putNull(currentColumnName());
node.putNull(currentColumnMapping());
}

@Override
protected void appendLong(long value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendInt(int value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendShort(short value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendByte(byte value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendDouble(double value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendFloat(float value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendBoolean(boolean value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendString(String value)
{
node.put(currentColumnName(), value);
node.put(currentColumnMapping(), value);
}

@Override
protected void appendByteBuffer(ByteBuffer value)
{
node.put(currentColumnName(), value.array());
node.put(currentColumnMapping(), value.array());
}

@Override
protected void appendSqlDate(SqlDate value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatDate(value));
node.put(currentColumnMapping(), dateTimeFormatters.get(currentColumnIndex).formatDate(value));
}

@Override
protected void appendSqlTime(SqlTime value)
{
int precision = ((TimeType) columnHandles.get(currentColumnIndex).getType()).getPrecision();
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTime(value, precision));
node.put(currentColumnMapping(), dateTimeFormatters.get(currentColumnIndex).formatTime(value, precision));
}

@Override
protected void appendSqlTimeWithTimeZone(SqlTimeWithTimeZone value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimeWithZone(value));
node.put(currentColumnMapping(), dateTimeFormatters.get(currentColumnIndex).formatTimeWithZone(value));
}

@Override
protected void appendSqlTimestamp(SqlTimestamp value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimestamp(value));
node.put(currentColumnMapping(), dateTimeFormatters.get(currentColumnIndex).formatTimestamp(value));
}

@Override
protected void appendSqlTimestampWithTimeZone(SqlTimestampWithTimeZone value)
{
node.put(currentColumnName(), dateTimeFormatters.get(currentColumnIndex).formatTimestampWithZone(value));
node.put(currentColumnMapping(), dateTimeFormatters.get(currentColumnIndex).formatTimestampWithZone(value));
}

@Override
Expand Down
Expand Up @@ -26,6 +26,9 @@ public class MillisecondsSinceEpochFormatter
{
public static boolean isSupportedType(Type type)
{
// milliseconds-since-epoch cannot encode a timezone hence writing TIMESTAMP WITH TIME ZONE
// is not supported to avoid losing the irrecoverable timezone information after write.
// TODO allow TIMESTAMP_TZ_MILLIS to be inserted too https://github.com/prestosql/presto/issues/5955
return type.equals(TIME_MILLIS) ||
type.equals(TIMESTAMP_MILLIS);
}
Expand Down
Expand Up @@ -27,6 +27,9 @@ public class SecondsSinceEpochFormatter
{
public static boolean isSupportedType(Type type)
{
// seconds-since-epoch cannot encode a timezone hence writing TIMESTAMP WITH TIME ZONE
// is not supported to avoid losing the irrecoverable timezone information after write.
// TODO allow TIMESTAMP_TZ_MILLIS to be inserted too https://github.com/prestosql/presto/issues/5955
return type.equals(TIME_MILLIS) ||
type.equals(TIMESTAMP_MILLIS);
}
Expand Down
Expand Up @@ -168,7 +168,7 @@ private static int parseOffset(String group, String offsetName, String columnNam
private static FieldType parseFieldType(String dataFormat, String columnName)
{
try {
if (!dataFormat.equals("")) {
if (dataFormat != null && !dataFormat.equals("")) {
return FieldType.valueOf(dataFormat.toUpperCase(Locale.ENGLISH));
}
return FieldType.BYTE;
Expand Down
Expand Up @@ -6,6 +6,10 @@ kafka.table-names=product_tests.read_simple_key_and_value,\
product_tests.read_all_datatypes_avro,\
product_tests.read_all_null_avro,\
product_tests.read_structural_datatype_avro,\
product_tests.write_simple_key_and_value,\
product_tests.write_all_datatypes_raw,\
product_tests.write_all_datatypes_csv,\
product_tests.write_all_datatypes_json,\
product_tests.write_all_datatypes_avro,\
product_tests.write_structural_datatype_avro
kafka.nodes=kafka:9092
Expand Down
@@ -0,0 +1,49 @@
{
"tableName": "write_all_datatypes_csv",
"schemaName": "product_tests",
"topicName": "write_all_datatypes_csv",
"message": {
"dataFormat": "csv",
"fields": [
{
"name": "c_varchar",
"type": "VARCHAR",
"mapping": "0"
},
{
"name": "c_bigint",
"type": "BIGINT",
"mapping": "1"
},
{
"name": "c_integer",
"type": "INTEGER",
"mapping": "2"
},
{
"name": "c_smallint",
"type": "SMALLINT",
"mapping": "3"
},
{
"name": "c_tinyint",
"type": "TINYINT",
"mapping": "4"
},
{
"name": "c_double",
"type": "DOUBLE",
"mapping": "5"
},
{
"name": "c_boolean",
"type": "BOOLEAN",
"mapping": "6"
}
]
},
"key": {
"dataFormat": "raw",
"fields": []
}
}
@@ -0,0 +1,157 @@
{
"tableName": "write_all_datatypes_json",
"schemaName": "product_tests",
"topicName": "write_all_datatypes_json",
"message": {
"dataFormat": "json",
"fields": [
{
"name": "c_varchar",
"type": "VARCHAR",
"mapping": "j_varchar"
},
{
"name": "c_bigint",
"type": "BIGINT",
"mapping": "j_bigint"
},
{
"name": "c_integer",
"type": "INTEGER",
"mapping": "j_integer"
},
{
"name": "c_smallint",
"type": "SMALLINT",
"mapping": "j_smallint"
},
{
"name": "c_tinyint",
"type": "TINYINT",
"mapping": "j_tinyint"
},
{
"name": "c_double",
"type": "DOUBLE",
"mapping": "j_double"
},
{
"name": "c_boolean",
"type": "BOOLEAN",
"mapping": "j_boolean"
},
{
"name": "c_timestamp_milliseconds_since_epoch",
"type": "TIMESTAMP",
"mapping": "j_timestamp_milliseconds_since_epoch",
"dataFormat": "milliseconds-since-epoch"
},
{
"name": "c_timestamp_seconds_since_epoch",
"type": "TIMESTAMP",
"mapping": "j_timestamp_seconds_since_epoch",
"dataFormat": "seconds-since-epoch"
},
{
"name": "c_timestamp_iso8601",
"type": "TIMESTAMP",
"mapping": "j_timestamp_iso8601",
"dataFormat": "iso8601"
},
{
"name": "c_timestamp_rfc2822",
"type": "TIMESTAMP",
"mapping": "j_timestamp_rfc2822",
"dataFormat": "rfc2822"
},
{
"name": "c_timestamp_custom",
"type": "TIMESTAMP",
"mapping": "j_timestamp_custom",
"dataFormat": "custom-date-time",
"formatHint": "MM/yyyy/dd H:m:s"
},
{
"name": "c_date_iso8601",
"type": "DATE",
"mapping": "j_date_iso8601",
"dataFormat": "iso8601"
},
{
"name": "c_date_custom",
"type": "DATE",
"mapping": "j_date_custom",
"dataFormat": "custom-date-time",
"formatHint": "yyyy/dd/MM"
},
{
"name": "c_time_milliseconds_since_epoch",
"type": "TIME",
"mapping": "j_time_milliseconds_since_epoch",
"dataFormat": "milliseconds-since-epoch"
},
{
"name": "c_time_seconds_since_epoch",
"type": "TIME",
"mapping": "j_time_seconds_since_epoch",
"dataFormat": "seconds-since-epoch"
},
{
"name": "c_time_iso8601",
"type": "TIME",
"mapping": "j_time_iso8601",
"dataFormat": "iso8601"
},
{
"name": "c_time_custom",
"type": "TIME",
"mapping": "j_time_custom",
"dataFormat": "custom-date-time",
"formatHint": "mm:HH:ss"
},
{
"name": "c_timestamptz_iso8601",
"type": "TIMESTAMP WITH TIME ZONE",
"mapping": "j_timestamptz_iso8601",
"dataFormat": "iso8601"
},
{
"name": "c_timestamptz_rfc2822",
"type": "TIMESTAMP WITH TIME ZONE",
"mapping": "j_timestamptz_rfc2822",
"dataFormat": "rfc2822"
},
{
"name": "c_timestamptz_custom",
"type": "TIMESTAMP WITH TIME ZONE",
"mapping": "j_timestamptz_custom",
"dataFormat": "custom-date-time",
"formatHint": "MM/yyyy/dd H:m:s ZZZ"
},
{
"name": "c_timestamptz_custom_with_zone",
"type": "TIMESTAMP WITH TIME ZONE",
"mapping": "j_timestamptz_custom_with_zone",
"dataFormat": "custom-date-time",
"formatHint": "MM/yyyy/dd H:m:s ZZZ"
},
{
"name": "c_timetz_iso8601",
"type": "TIME WITH TIME ZONE",
"mapping": "j_timetz_iso8601",
"dataFormat": "iso8601"
},
{
"name": "c_timetz_custom",
"type": "TIME WITH TIME ZONE",
"mapping": "j_timetz_custom",
"dataFormat": "custom-date-time",
"formatHint": "mm:HH:ss ZZ"
}
]
},
"key": {
"dataFormat": "raw",
"fields": []
}
}

0 comments on commit e42f44a

Please sign in to comment.