Skip to content

Commit

Permalink
ARROW-17629: [Java] Bind DB column to Arrow Map type in JdbcToArrowUt…
Browse files Browse the repository at this point in the history
…ils (apache#14134)

This pull request adds support for mapping the map type (hstore translated by the JDBC driver to java.util.Map, or JSON text/varchar). The mapping for MapConsumer should be expressed manually via new JdbcToArrowConfigBuilder(...).setJdbcToArrowTypeConverter(USER_CUSTOM_jdbcToArrowTypeConverter). This is now possible as part of [ARROW-17630](https://issues.apache.org/jira/browse/ARROW-17630), which allows the user to distinguish columns by number and related external metadata.

Authored-by: igor.suhorukov <igor.suhorukov@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
igor-suhorukov authored and zagto committed Oct 7, 2022
1 parent bc8c53d commit f44bd94
Show file tree
Hide file tree
Showing 20 changed files with 452 additions and 110 deletions.
2 changes: 0 additions & 2 deletions java/adapter/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* </ul>
*/
public class JdbcFieldInfo {
private final int column;
private final int jdbcType;
private final int nullability;
private final int precision;
Expand All @@ -53,6 +54,7 @@ public JdbcFieldInfo(int jdbcType) {
(jdbcType != Types.DECIMAL && jdbcType != Types.NUMERIC),
"DECIMAL and NUMERIC types require a precision and scale; please use another constructor.");

this.column = 0;
this.jdbcType = jdbcType;
this.nullability = ResultSetMetaData.columnNullableUnknown;
this.precision = 0;
Expand All @@ -68,6 +70,7 @@ public JdbcFieldInfo(int jdbcType) {
* @param scale The field's numeric scale.
*/
public JdbcFieldInfo(int jdbcType, int precision, int scale) {
this.column = 0;
this.jdbcType = jdbcType;
this.nullability = ResultSetMetaData.columnNullableUnknown;
this.precision = precision;
Expand All @@ -84,6 +87,7 @@ public JdbcFieldInfo(int jdbcType, int precision, int scale) {
* @param scale The field's numeric scale.
*/
public JdbcFieldInfo(int jdbcType, int nullability, int precision, int scale) {
this.column = 0;
this.jdbcType = jdbcType;
this.nullability = nullability;
this.precision = precision;
Expand All @@ -106,6 +110,7 @@ public JdbcFieldInfo(ResultSetMetaData rsmd, int column) throws SQLException {
column <= rsmd.getColumnCount(),
"The index must be within the number of columns (1 to %s, inclusive)", rsmd.getColumnCount());

this.column = column;
this.jdbcType = rsmd.getColumnType(column);
this.nullability = rsmd.isNullable(column);
this.precision = rsmd.getPrecision(column);
Expand Down Expand Up @@ -139,4 +144,11 @@ public int getPrecision() {
public int getScale() {
return scale;
}

/**
* The column index for query column.
*/
public int getColumn() {
return column;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public final class JdbcToArrowConfig {

// set up type converter
this.jdbcToArrowTypeConverter = jdbcToArrowTypeConverter != null ? jdbcToArrowTypeConverter :
jdbcFieldInfo -> JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);
(jdbcFieldInfo) -> JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
Expand All @@ -49,6 +50,7 @@
import org.apache.arrow.adapter.jdbc.consumer.FloatConsumer;
import org.apache.arrow.adapter.jdbc.consumer.IntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.MapConsumer;
import org.apache.arrow.adapter.jdbc.consumer.NullConsumer;
import org.apache.arrow.adapter.jdbc.consumer.SmallIntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.TimeConsumer;
Expand Down Expand Up @@ -76,6 +78,7 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand Down Expand Up @@ -279,6 +282,14 @@ public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, JdbcToArrowConfig
children = new ArrayList<Field>();
final ArrowType childType = config.getJdbcToArrowTypeConverter().apply(arrayFieldInfo);
children.add(new Field("child", FieldType.nullable(childType), null));
} else if (arrowType.getTypeID() == ArrowType.ArrowTypeID.Map) {
FieldType mapType = new FieldType(false, ArrowType.Struct.INSTANCE, null, null);
FieldType keyType = new FieldType(false, new ArrowType.Utf8(), null, null);
FieldType valueType = new FieldType(false, new ArrowType.Utf8(), null, null);
children = new ArrayList<>();
children.add(new Field("child", mapType,
Arrays.asList(new Field(MapVector.KEY_NAME, keyType, null),
new Field(MapVector.VALUE_NAME, valueType, null))));
}

fields.add(new Field(columnName, fieldType, children));
Expand Down Expand Up @@ -471,6 +482,8 @@ static JdbcConsumer getConsumer(ArrowType arrowType, int columnIndex, boolean nu
JdbcConsumer delegate = getConsumer(childVector.getField().getType(), JDBC_ARRAY_VALUE_COLUMN,
childVector.getField().isNullable(), childVector, config);
return ArrayConsumer.createConsumer((ListVector) vector, delegate, columnIndex, nullable);
case Map:
return MapConsumer.createConsumer((MapVector) vector, columnIndex, nullable);
case Null:
return new NullConsumer((NullVector) vector);
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.adapter.jdbc.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
import org.apache.arrow.vector.util.ObjectMapperFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Consumer which consume map type values from {@link ResultSet}.
* Write the data into {@link org.apache.arrow.vector.complex.MapVector}.
*/
public class MapConsumer extends BaseConsumer<MapVector> {


private final UnionMapWriter writer;
private final ObjectMapper objectMapper = ObjectMapperFactory.newObjectMapper();
private final TypeReference<Map<String, String>> typeReference = new TypeReference<Map<String, String>>() {};
private int currentRow;

/**
* Creates a consumer for {@link MapVector}.
*/
public static MapConsumer createConsumer(MapVector mapVector, int index, boolean nullable) {
return new MapConsumer(mapVector, index);
}

/**
* Instantiate a MapConsumer.
*/
public MapConsumer(MapVector vector, int index) {
super(vector, index);
writer = vector.getWriter();
}

@Override
public void consume(ResultSet resultSet) throws SQLException, IOException {
Object map = resultSet.getObject(columnIndexInResultSet);
writer.setPosition(currentRow++);
if (map != null) {
if (map instanceof String) {
writeJavaMapIntoVector(objectMapper.readValue((String) map, typeReference));
} else if (map instanceof Map) {
writeJavaMapIntoVector((Map<String, String>) map);
} else {
throw new IllegalArgumentException("Unknown type of map type column from JDBC " + map.getClass().getName());
}
} else {
writer.writeNull();
}
}

private void writeJavaMapIntoVector(Map<String, String> map) {
BufferAllocator allocator = vector.getAllocator();
writer.startMap();
map.forEach((key, value) -> {
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
byte[] valueBytes = value != null ? value.getBytes(StandardCharsets.UTF_8) : null;
try (
ArrowBuf keyBuf = allocator.buffer(keyBytes.length);
ArrowBuf valueBuf = valueBytes != null ? allocator.buffer(valueBytes.length) : null;
) {
writer.startEntry();
keyBuf.writeBytes(keyBytes);
writer.key().varChar().writeVarChar(0, keyBytes.length, keyBuf);
if (valueBytes != null) {
valueBuf.writeBytes(valueBytes);
writer.value().varChar().writeVarChar(0, valueBytes.length, valueBuf);
} else {
writer.value().varChar().writeNull();
}
writer.endEntry();
}
});
writer.endMap();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Function;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.util.ValueVectorUtility;
import org.junit.After;
import org.junit.Before;
Expand All @@ -58,6 +61,7 @@ public abstract class AbstractJdbcToArrowTest {
protected static final String DOUBLE = "DOUBLE_FIELD7";
protected static final String INT = "INT_FIELD1";
protected static final String LIST = "LIST_FIELD19";
protected static final String MAP = "MAP_FIELD20";
protected static final String REAL = "REAL_FIELD8";
protected static final String SMALLINT = "SMALLINT_FIELD4";
protected static final String TIME = "TIME_FIELD9";
Expand Down Expand Up @@ -155,8 +159,10 @@ public static Object[][] prepareTestData(String[] testFiles, @SuppressWarnings("
* Abstract method to implement logic to assert test various datatype values.
*
* @param root VectorSchemaRoot for test
* @param isIncludeMapVector whether this dataset check includes a map column.
*        The JDBC-type-to-map mapping must be declared manually in the configuration.
*/
public abstract void testDataSets(VectorSchemaRoot root);
public abstract void testDataSets(VectorSchemaRoot root, boolean isIncludeMapVector);

/**
* For the given SQL query, execute and fetch the data from Relational DB and convert it to Arrow objects.
Expand Down Expand Up @@ -342,4 +348,34 @@ public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, JdbcToArrowConfig
return root;
}

/**
* Register MAP_FIELD20 as ArrowType.Map
* @param calendar Calendar instance to use for Date, Time and Timestamp datasets, or <code>null</code> if none.
* @param rsmd ResultSetMetaData to lookup column name from result set metadata
* @return typeConverter instance with mapping column to Map type
*/
protected Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter(
Calendar calendar, ResultSetMetaData rsmd) {
return (jdbcFieldInfo) -> {
String columnLabel = null;
try {
int columnIndex = jdbcFieldInfo.getColumn();
if (columnIndex != 0) {
columnLabel = rsmd.getColumnLabel(columnIndex);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
if (MAP.equals(columnLabel)) {
return new ArrowType.Map(false);
} else {
return JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);
}
};
}

protected ResultSetMetaData getQueryMetaData(String query) throws SQLException {
return conn.createStatement().executeQuery(query).getMetaData();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.nio.charset.Charset;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -47,8 +49,17 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.JsonStringHashMap;
import org.apache.arrow.vector.util.ObjectMapperFactory;
import org.apache.arrow.vector.util.Text;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* This is a Helper class which has functionalities to read and assert the values from the given FieldVector object.
Expand Down Expand Up @@ -240,6 +251,45 @@ public static void assertListVectorValues(ListVector listVector, int rowCount, I
}
}

public static void assertMapVectorValues(MapVector mapVector, int rowCount, Map<String, String>[] values) {
assertEquals(rowCount, mapVector.getValueCount());

for (int j = 0; j < mapVector.getValueCount(); j++) {
if (values[j] == null) {
assertTrue(mapVector.isNull(j));
} else {
JsonStringArrayList<JsonStringHashMap<String, Text>> actualSource =
(JsonStringArrayList<JsonStringHashMap<String, Text>>) mapVector.getObject(j);
Map<String, String> actualMap = null;
if (actualSource != null && !actualSource.isEmpty()) {
actualMap = actualSource.stream().map(entry ->
new AbstractMap.SimpleEntry<>(entry.get("key").toString(),
entry.get("value") != null ? entry.get("value").toString() : null))
.collect(HashMap::new, (collector, val) -> collector.put(val.getKey(), val.getValue()), HashMap::putAll);
}
assertEquals(values[j], actualMap);
}
}
}

public static Map<String, String>[] getMapValues(String[] values, String dataType) {
String[] dataArr = getValues(values, dataType);
Map<String, String>[] maps = new Map[dataArr.length];
ObjectMapper objectMapper = ObjectMapperFactory.newObjectMapper();
TypeReference<Map<String, String>> typeReference = new TypeReference<Map<String, String>>() {};
for (int idx = 0; idx < dataArr.length; idx++) {
String jsonString = dataArr[idx].replace("|", ",");
if (!jsonString.isEmpty()) {
try {
maps[idx] = objectMapper.readValue(jsonString, typeReference);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
return maps;
}

public static void assertNullValues(BaseValueVector vector, int rowCount) {
assertEquals(rowCount, vector.getValueCount());

Expand Down
Loading

0 comments on commit f44bd94

Please sign in to comment.