Skip to content

Commit

Permalink
NIFI-8023: Convert java.sql.Date between UTC/local time zone normaliz…
Browse files Browse the repository at this point in the history
…ed forms before/after database operations

This closes apache#4781

Signed-off-by: David Handermann <exceptionfactory@gmail.com>
  • Loading branch information
turcsanyip authored and krisztina-zsihovszki committed Jun 27, 2022
1 parent d875f86 commit ea09cee
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -149,6 +150,12 @@ private Object normalizeValue(final Object value) throws SQLException {
return null;
}

if (value instanceof java.sql.Date) {
// Date objects should be stored in records as UTC normalized dates (UTC 00:00:00)
// but they come from the driver in JVM's local time zone 00:00:00 and need to be converted.
return DataTypeUtils.convertDateToUTC((java.sql.Date) value);
}

if (value instanceof List) {
return ((List) value).toArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1085,6 +1089,32 @@ public static java.sql.Date toDate(final Object value, final Supplier<DateFormat
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName);
}

/**
* Converts a java.sql.Date object in local time zone (typically coming from a java.sql.ResultSet and having 00:00:00 time part)
* to UTC normalized form (storing the epoch corresponding to the UTC time with the same date/time as the input).
*
* @param dateLocalTZ java.sql.Date in local time zone
* @return java.sql.Date in UTC normalized form
*/
public static Date convertDateToUTC(Date dateLocalTZ) {
ZonedDateTime zdtLocalTZ = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
ZonedDateTime zdtUTC = zdtLocalTZ.withZoneSameLocal(ZoneOffset.UTC);
return new Date(zdtUTC.toInstant().toEpochMilli());
}

/**
* Converts a java.sql.Date object in UTC normalized form
* to local time zone (storing the epoch corresponding to the local time with the same date/time as the input).
*
* @param dateUTC java.sql.Date in UTC normalized form
* @return java.sql.Date in local time zone
*/
public static Date convertDateToLocalTZ(Date dateUTC) {
ZonedDateTime zdtUTC = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneOffset.UTC);
ZonedDateTime zdtLocalTZ = zdtUTC.withZoneSameLocal(ZoneId.systemDefault());
return new Date(zdtLocalTZ.toInstant().toEpochMilli());
}

public static boolean isDateTypeCompatible(final Object value, final String format) {
if (value == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -27,35 +26,61 @@
import org.mockito.junit.MockitoJUnitRunner;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ResultSetRecordSetTest {

private static final String COLUMN_NAME_VARCHAR = "varchar";
private static final String COLUMN_NAME_BIGINT = "bigint";
private static final String COLUMN_NAME_ROWID = "rowid";
private static final String COLUMN_NAME_BIT = "bit";
private static final String COLUMN_NAME_BOOLEAN = "boolean";
private static final String COLUMN_NAME_CHAR = "char";
private static final String COLUMN_NAME_DATE = "date";
private static final String COLUMN_NAME_INTEGER = "integer";
private static final String COLUMN_NAME_DOUBLE = "double";
private static final String COLUMN_NAME_REAL = "real";
private static final String COLUMN_NAME_FLOAT = "float";
private static final String COLUMN_NAME_SMALLINT = "smallint";
private static final String COLUMN_NAME_TINYINT = "tinyint";
private static final String COLUMN_NAME_BIG_DECIMAL_1 = "bigDecimal1";
private static final String COLUMN_NAME_BIG_DECIMAL_2 = "bigDecimal2";
private static final String COLUMN_NAME_BIG_DECIMAL_3 = "bigDecimal3";
private static final String COLUMN_NAME_BIG_DECIMAL_4 = "bigDecimal4";

private static final Object[][] COLUMNS = new Object[][] {
// column number; column label / name / schema field; column type; schema data type;
{1, "varchar", Types.VARCHAR, RecordFieldType.STRING.getDataType()},
{2, "bigint", Types.BIGINT, RecordFieldType.LONG.getDataType()},
{3, "rowid", Types.ROWID, RecordFieldType.LONG.getDataType()},
{4, "bit", Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
{5, "boolean", Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
{6, "char", Types.CHAR, RecordFieldType.CHAR.getDataType()},
{7, "date", Types.DATE, RecordFieldType.DATE.getDataType()},
{8, "integer", Types.INTEGER, RecordFieldType.INT.getDataType()},
{9, "double", Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
{10, "real", Types.REAL, RecordFieldType.DOUBLE.getDataType()},
{11, "float", Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
{12, "smallint", Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
{13, "tinyint", Types.TINYINT, RecordFieldType.BYTE.getDataType()},
{14, "bigDecimal1", Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
{15, "bigDecimal2", Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
{16, "bigDecimal3", Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
{17, "bigDecimal4", Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)},
{1, COLUMN_NAME_VARCHAR, Types.VARCHAR, RecordFieldType.STRING.getDataType()},
{2, COLUMN_NAME_BIGINT, Types.BIGINT, RecordFieldType.LONG.getDataType()},
{3, COLUMN_NAME_ROWID, Types.ROWID, RecordFieldType.LONG.getDataType()},
{4, COLUMN_NAME_BIT, Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
{5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
{6, COLUMN_NAME_CHAR, Types.CHAR, RecordFieldType.CHAR.getDataType()},
{7, COLUMN_NAME_DATE, Types.DATE, RecordFieldType.DATE.getDataType()},
{8, COLUMN_NAME_INTEGER, Types.INTEGER, RecordFieldType.INT.getDataType()},
{9, COLUMN_NAME_DOUBLE, Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
{10, COLUMN_NAME_REAL, Types.REAL, RecordFieldType.DOUBLE.getDataType()},
{11, COLUMN_NAME_FLOAT, Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
{12, COLUMN_NAME_SMALLINT, Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
{13, COLUMN_NAME_TINYINT, Types.TINYINT, RecordFieldType.BYTE.getDataType()},
{14, COLUMN_NAME_BIG_DECIMAL_1, Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
{15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
{16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
{17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)},
};

@Mock
Expand All @@ -66,26 +91,26 @@ public class ResultSetRecordSetTest {

@Before
public void setUp() throws SQLException {
Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);

for (final Object[] column : COLUMNS) {
Mockito.when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((column[1]) + "Col");
Mockito.when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
Mockito.when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);
when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((String) (column[1]));
when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);

if(column[3] instanceof DecimalDataType) {
DecimalDataType ddt = (DecimalDataType)column[3];
Mockito.when(resultSetMetaData.getPrecision((Integer) column[0])).thenReturn(ddt.getPrecision());
Mockito.when(resultSetMetaData.getScale((Integer) column[0])).thenReturn(ddt.getScale());
when(resultSetMetaData.getPrecision((Integer) column[0])).thenReturn(ddt.getPrecision());
when(resultSetMetaData.getScale((Integer) column[0])).thenReturn(ddt.getScale());
}
}

// Big decimal values are necessary in order to determine precision and scale
Mockito.when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));

// This will be handled by a dedicated branch for Java Objects, needs some further details
Mockito.when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
}

@Test
Expand Down Expand Up @@ -124,7 +149,7 @@ public void testCreateSchemaWhenOtherType() throws SQLException {
final RecordSchema resultSchema = testSubject.getSchema();

// then
Assert.assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
}

@Test
Expand All @@ -137,17 +162,88 @@ public void testCreateSchemaWhenOtherTypeWithoutSchema() throws SQLException {
final RecordSchema resultSchema = testSubject.getSchema();

// then
Assert.assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
}

@Test
public void testCreateRecord() throws SQLException {
// given
final RecordSchema recordSchema = givenRecordSchema();

LocalDate testDate = LocalDate.of(2021, 1, 26);

final String varcharValue = "varchar";
final Long bigintValue = 1234567890123456789L;
final Long rowidValue = 11111111L;
final Boolean bitValue = Boolean.FALSE;
final Boolean booleanValue = Boolean.TRUE;
final Character charValue = 'c';
final Date dateValue = Date.valueOf(testDate);
final Integer integerValue = 1234567890;
final Double doubleValue = 0.12;
final Double realValue = 3.45;
final Float floatValue = 6.78F;
final Short smallintValue = 12345;
final Byte tinyintValue = 123;
final BigDecimal bigDecimal1Value = new BigDecimal("1234.567");
final BigDecimal bigDecimal2Value = new BigDecimal("1234");
final BigDecimal bigDecimal3Value = new BigDecimal("1234567890.1");
final BigDecimal bigDecimal4Value = new BigDecimal("1234567.089");

when(resultSet.getObject(COLUMN_NAME_VARCHAR)).thenReturn(varcharValue);
when(resultSet.getObject(COLUMN_NAME_BIGINT)).thenReturn(bigintValue);
when(resultSet.getObject(COLUMN_NAME_ROWID)).thenReturn(rowidValue);
when(resultSet.getObject(COLUMN_NAME_BIT)).thenReturn(bitValue);
when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue);
when(resultSet.getObject(COLUMN_NAME_FLOAT)).thenReturn(floatValue);
when(resultSet.getObject(COLUMN_NAME_SMALLINT)).thenReturn(smallintValue);
when(resultSet.getObject(COLUMN_NAME_TINYINT)).thenReturn(tinyintValue);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_1)).thenReturn(bigDecimal1Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_2)).thenReturn(bigDecimal2Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_3)).thenReturn(bigDecimal3Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_4)).thenReturn(bigDecimal4Value);

// when
ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
Record record = testSubject.createRecord(resultSet);

// then
assertEquals(varcharValue, record.getAsString(COLUMN_NAME_VARCHAR));
assertEquals(bigintValue, record.getAsLong(COLUMN_NAME_BIGINT));
assertEquals(rowidValue, record.getAsLong(COLUMN_NAME_ROWID));
assertEquals(bitValue, record.getAsBoolean(COLUMN_NAME_BIT));
assertEquals(booleanValue, record.getAsBoolean(COLUMN_NAME_BOOLEAN));
assertEquals(charValue, record.getValue(COLUMN_NAME_CHAR));

// Date is expected in UTC normalized form
Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null));

assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER));
assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE));
assertEquals(realValue, record.getAsDouble(COLUMN_NAME_REAL));
assertEquals(floatValue, record.getAsFloat(COLUMN_NAME_FLOAT));
assertEquals(smallintValue.shortValue(), record.getAsInt(COLUMN_NAME_SMALLINT).shortValue());
assertEquals(tinyintValue.byteValue(), record.getAsInt(COLUMN_NAME_TINYINT).byteValue());
assertEquals(bigDecimal1Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_1));
assertEquals(bigDecimal2Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_2));
assertEquals(bigDecimal3Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_3));
assertEquals(bigDecimal4Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_4));
}

private ResultSet givenResultSetForOther() throws SQLException {
final ResultSet resultSet = Mockito.mock(ResultSet.class);
final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(1);
Mockito.when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
Mockito.when(resultSetMetaData.getColumnName(1)).thenReturn("column");
Mockito.when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
when(resultSetMetaData.getColumnCount()).thenReturn(1);
when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
when(resultSetMetaData.getColumnName(1)).thenReturn("column");
when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
return resultSet;
}

Expand All @@ -162,10 +258,10 @@ private RecordSchema givenRecordSchema() {
}

private void thenAllColumnDataTypesAreCorrect(final RecordSchema resultSchema) {
Assert.assertNotNull(resultSchema);
assertNotNull(resultSchema);

for (final Object[] column : COLUMNS) {
Assert.assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -874,4 +879,44 @@ public void testIsFittingNumberType() {
assertTrue(DataTypeUtils.isFittingNumberType(9D, RecordFieldType.DOUBLE));
assertFalse(DataTypeUtils.isFittingNumberType(9, RecordFieldType.DOUBLE));
}

@Test
public void testConvertDateToUTC() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;

Date dateLocalTZ = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.systemDefault()).toInstant().toEpochMilli());

Date dateUTC = DataTypeUtils.convertDateToUTC(dateLocalTZ);

ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneId.of("UTC"));
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}

@Test
public void testConvertDateToLocalTZ() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;

Date dateUTC = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.of("UTC")).toInstant().toEpochMilli());

Date dateLocalTZ = DataTypeUtils.convertDateToLocalTZ(dateUTC);

ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}
}

0 comments on commit ea09cee

Please sign in to comment.