Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map PostgreSQL JSON, JSONB to Presto JSON #81

Merged
merged 1 commit into from
Feb 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.prestosql.spi.connector.ConnectorContext;
import io.prestosql.spi.connector.ConnectorFactory;
import io.prestosql.spi.connector.ConnectorHandleResolver;
import io.prestosql.spi.type.TypeManager;

import java.util.Map;

Expand Down Expand Up @@ -62,7 +63,10 @@ public Connector create(String catalogName, Map<String, String> requiredConfig,
requireNonNull(requiredConfig, "requiredConfig is null");

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
Bootstrap app = new Bootstrap(new JdbcModule(catalogName), module);
Bootstrap app = new Bootstrap(
binder -> binder.bind(TypeManager.class).toInstance(context.getTypeManager()),
new JdbcModule(catalogName),
module);

Injector injector = app
.strictConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.type.JsonType.JSON;
import static java.lang.Float.floatToRawIntBits;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -261,6 +262,9 @@ else if (DOUBLE.equals(type)) {
else if (BOOLEAN.equals(type)) {
type.writeBoolean(blockBuilder, (Boolean) value);
}
else if (JSON.equals(type)) {
type.writeSlice(blockBuilder, Slices.utf8Slice((String) value));
}
else if (type instanceof VarcharType) {
type.writeSlice(blockBuilder, Slices.utf8Slice((String) value));
}
Expand Down
21 changes: 15 additions & 6 deletions presto-postgresql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@
<artifactId>javax.inject</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>io.prestosql</groupId>
Expand Down Expand Up @@ -91,12 +106,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.prestosql</groupId>
<artifactId>presto-main</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,63 @@
*/
package io.prestosql.plugin.postgresql;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.prestosql.plugin.jdbc.BaseJdbcClient;
import io.prestosql.plugin.jdbc.BaseJdbcConfig;
import io.prestosql.plugin.jdbc.ColumnMapping;
import io.prestosql.plugin.jdbc.DriverConnectionFactory;
import io.prestosql.plugin.jdbc.JdbcConnectorId;
import io.prestosql.plugin.jdbc.JdbcOutputTableHandle;
import io.prestosql.plugin.jdbc.JdbcTypeHandle;
import io.prestosql.plugin.jdbc.SliceWriteFunction;
import io.prestosql.plugin.jdbc.WriteMapping;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.StandardTypes;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
import io.prestosql.spi.type.TypeSignature;
import org.postgresql.Driver;
import org.postgresql.util.PGobject;

import javax.inject.Inject;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
import static io.airlift.slice.Slices.utf8Slice;
import static io.prestosql.plugin.jdbc.ColumnMapping.DISABLE_PUSHDOWN;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static io.prestosql.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;

public class PostgreSqlClient
extends BaseJdbcClient
{
protected final Type jsonType;

@Inject
public PostgreSqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config)
public PostgreSqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config, TypeManager typeManager)
{
super(connectorId, config, "\"", new DriverConnectionFactory(new Driver(), config));
this.jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
}

@Override
Expand Down Expand Up @@ -90,6 +115,11 @@ protected ResultSet getTables(Connection connection, String schemaName, String t
/**
 * Maps a PostgreSQL column type to a Presto column mapping.
 * Columns whose JDBC type name is {@code json} or {@code jsonb} get the JSON
 * column mapping; every other type is delegated to the base JDBC client.
 */
@Override
public Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
    String jdbcTypeName = typeHandle.getJdbcTypeName();
    if (jdbcTypeName.equals("jsonb") || jdbcTypeName.equals("json")) {
        return Optional.of(jsonColumnMapping());
    }
    // TODO support PostgreSQL's TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE explicitly, otherwise predicate pushdown for these types may be incorrect
    return super.toPrestoType(session, typeHandle);
}
Expand All @@ -100,7 +130,57 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
if (VARBINARY.equals(type)) {
return WriteMapping.sliceMapping("bytea", varbinaryWriteFunction());
}

if (type.getTypeSignature().getBase().equals(StandardTypes.JSON)) {
return WriteMapping.sliceMapping("jsonb", jsonWriteFunction());
}
return super.toWriteMapping(session, type);
}

/**
 * Builds the slice-based column mapping used for json/jsonb columns: values are
 * read as strings and passed through {@code jsonParse}, written with
 * {@code jsonWriteFunction()}, and predicate pushdown is disabled.
 */
private ColumnMapping jsonColumnMapping()
{
    SliceWriteFunction writeFunction = jsonWriteFunction();
    return ColumnMapping.sliceMapping(
            jsonType,
            (resultSet, columnIndex) -> {
                String rawJson = resultSet.getString(columnIndex);
                return jsonParse(utf8Slice(rawJson));
            },
            writeFunction,
            DISABLE_PUSHDOWN);
}

/**
 * Returns a write function that binds a slice to a statement parameter as a
 * PostgreSQL object. The slice is decoded as UTF-8 text and the parameter is
 * always tagged with the {@code json} type name.
 */
private static SliceWriteFunction jsonWriteFunction()
{
    return (statement, index, value) -> {
        String jsonText = value.toStringUtf8();
        PGobject jsonParameter = new PGobject();
        jsonParameter.setType("json");
        jsonParameter.setValue(jsonText);
        statement.setObject(index, jsonParameter);
    };
}

// Shared factory for the parsers created in jsonParse(); field-name canonicalization is switched off.
private static final JsonFactory JSON_FACTORY = new JsonFactory()
.disable(CANONICALIZE_FIELD_NAMES);

// Shared mapper used by jsonParse() to read and re-serialize values; configured to order map entries by key.
private static final ObjectMapper SORTED_MAPPER = new ObjectMapperProvider().get().configure(ORDER_MAP_ENTRIES_BY_KEYS, true);

/**
 * Parses UTF-8 encoded JSON text and re-serializes it with {@code SORTED_MAPPER},
 * so that object entries are written ordered by key.
 *
 * @param slice the JSON text to parse
 * @return the re-serialized JSON
 * @throws PrestoException with {@code INVALID_FUNCTION_ARGUMENT} when the input is not
 *         exactly one valid JSON value; the underlying failure is attached as the cause
 */
public static Slice jsonParse(Slice slice)
{
    try (JsonParser parser = createJsonParser(JSON_FACTORY, slice)) {
        // Use the input length as the size estimate; no need to copy the bytes just to measure them.
        SliceOutput dynamicSliceOutput = new DynamicSliceOutput(slice.length());
        SORTED_MAPPER.writeValue((OutputStream) dynamicSliceOutput, SORTED_MAPPER.readValue(parser, Object.class));
        // At this point the end of input should have been reached. nextToken() can:
        // - return null, if the end of the input was reached
        // - return a token, if another valid JSON token follows (e.g. '{', 'null', '1')
        // - throw, if the remaining characters do not form a valid JSON token (e.g. 'abc')
        // Only the first outcome means the input was a single JSON value.
        if (parser.nextToken() != null) {
            throw new IllegalStateException("Found characters after the expected end of input");
        }
        return dynamicSliceOutput.slice();
    }
    catch (Exception e) {
        // Keep the original failure as the cause so the parse error is not lost.
        throw new PrestoException(INVALID_FUNCTION_ARGUMENT, format("Cannot convert '%s' to JSON", slice.toStringUtf8()), e);
    }
}

/**
 * Creates a Jackson parser over the contents of the given slice, decoded as UTF-8.
 *
 * @param factory the factory used to create the parser
 * @param json the JSON text to read
 * @return a parser positioned before the first token
 * @throws IOException if the parser cannot be created
 */
public static JsonParser createJsonParser(JsonFactory factory, Slice json)
        throws IOException
{
    // Given a raw InputStream, Jackson tries to detect the character encoding on its own,
    // so wrap the input in a Reader with an explicit UTF-8 charset.
    InputStreamReader utf8Reader = new InputStreamReader(json.getInput(), UTF_8);
    return factory.createParser(utf8Reader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.time.ZoneId;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.io.BaseEncoding.base16;
Expand All @@ -42,6 +43,7 @@
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.tests.datatype.DataType.bigintDataType;
import static io.prestosql.tests.datatype.DataType.booleanDataType;
import static io.prestosql.tests.datatype.DataType.dataType;
import static io.prestosql.tests.datatype.DataType.dateDataType;
import static io.prestosql.tests.datatype.DataType.decimalDataType;
import static io.prestosql.tests.datatype.DataType.doubleDataType;
Expand All @@ -50,10 +52,12 @@
import static io.prestosql.tests.datatype.DataType.smallintDataType;
import static io.prestosql.tests.datatype.DataType.varbinaryDataType;
import static io.prestosql.tests.datatype.DataType.varcharDataType;
import static io.prestosql.type.JsonType.JSON;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_16LE;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyList;
import static java.util.function.Function.identity;

@Test
public class TestPostgreSqlTypeMapping
Expand Down Expand Up @@ -268,13 +272,50 @@ public void testTimestamp()
// testing this is hard because of https://github.com/prestodb/presto/issues/7122
}

/**
 * Round-trips a set of JSON values (empty object, SQL NULL, JSON null, number,
 * strings, flat and nested objects, empty array) through a table created by
 * Presto and through a table created and populated in PostgreSQL.
 */
@Test
public void jsonDataTypeTest()
{
    DataType<String> json = jsonDataType();
    DataTypeTest roundTrips = DataTypeTest.create()
            .addRoundTrip(json, "{}")
            .addRoundTrip(json, null)
            .addRoundTrip(json, "null")
            .addRoundTrip(json, "123.4")
            .addRoundTrip(json, "\"abc\"")
            .addRoundTrip(json, "\"\"")
            .addRoundTrip(json, "{\"a\":1,\"b\":2}")
            .addRoundTrip(json, "{\"a\":[1,2,3],\"b\":{\"aa\":11,\"bb\":[{\"a\":1,\"b\":2},{\"a\":0}]}}")
            .addRoundTrip(json, "[]");

    roundTrips.execute(getQueryRunner(), prestoCreateAsSelect("presto_test_json"));
    roundTrips.execute(getQueryRunner(), postgresCreateAndInsert("tpch.postgresql_test_json"));
}

// Verifies that PostgreSQL json and jsonb columns are exposed with data_type 'json'
// and that rows can be inserted into and selected from such a table.
@Test
public void testJson()
findepi marked this conversation as resolved.
Show resolved Hide resolved
{
// The table is created through a direct JDBC connection to PostgreSQL, with one json and one jsonb column.
JdbcSqlExecutor jdbcSqlExecutor = new JdbcSqlExecutor(postgreSqlServer.getJdbcUrl());
jdbcSqlExecutor.execute("CREATE TABLE tpch.test_json(key varchar(5), json_column json, jsonb_column jsonb)");
try {
findepi marked this conversation as resolved.
Show resolved Hide resolved
// Both the json and the jsonb column are expected to be reported as 'json'.
assertQuery(
"SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = 'tpch' AND table_name = 'test_json'",
"VALUES ('key','varchar(5)'),('json_column','json'),('jsonb_column','json')");
assertUpdate("INSERT INTO tpch.test_json VALUES ('1', json'{\"x\":123}',json'{\"x\": 123}' )", 1);
assertQuery("SELECT * FROM tpch.test_json", "SELECT '1' \"key\", '{\"x\": 123}' json_column, '{\"x\":123}' jsonb_column");
// NOTE(review): the next two statements name test_json without the tpch schema prefix;
// presumably they rely on the session's default schema — confirm.
assertQuery("SELECT * FROM test_json WHERE json_column = json'{\"x\":123}'", "SELECT '1' \"key\", '{\"x\": 123}' json_column, '{\"x\":123}' jsonb_column");
assertUpdate("INSERT INTO test_json VALUES ('1', json'{\"x\":123}',json'{\"x\": 123}' )", 1);
}
finally {
// Drop the table whether or not the assertions passed.
jdbcSqlExecutor.execute("DROP TABLE tpch.test_json");
}
}

private void testUnsupportedDataType(String databaseDataType)
{
JdbcSqlExecutor jdbcSqlExecutor = new JdbcSqlExecutor(postgreSqlServer.getJdbcUrl());
jdbcSqlExecutor.execute(format("CREATE TABLE tpch.test_unsupported_data_type(key varchar(5), unsupported_column %s)", databaseDataType));
try {
assertQuery(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'tpch' AND TABLE_NAME = 'test_unsupported_data_type'",
"SELECT column_name FROM information_schema.columns WHERE table_schema = 'tpch' AND table_name = 'test_unsupported_data_type'",
"VALUES 'key'"); // no 'unsupported_column'
}
finally {
Expand All @@ -284,11 +325,23 @@ private void testUnsupportedDataType(String databaseDataType)

private static DataType<byte[]> byteaDataType()
{
return DataType.dataType(
return dataType(
"bytea",
VARBINARY,
bytes -> format("bytea E'\\\\x%s'", base16().encode(bytes)),
Function.identity());
identity());
}

/**
 * Describes the {@code json} data type for round-trip tests: values are rendered
 * as {@code JSON '...'} literals and expected back unchanged. Values containing
 * a single quote are rejected because they are not escaped in the literal.
 */
private static DataType<String> jsonDataType()
{
    return dataType(
            "json",
            JSON,
            jsonText -> {
                boolean containsSingleQuote = jsonText.contains("'");
                checkArgument(!containsSingleQuote);
                return format("JSON '%s'", jsonText);
            },
            identity());
}

private DataSetup prestoCreateAsSelect(String tableNamePrefix)
Expand Down
12 changes: 12 additions & 0 deletions presto-tests/src/main/java/io/prestosql/tests/H2QueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@
import static com.google.common.base.Strings.padEnd;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Lists.newArrayList;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.NATION;
import static io.airlift.tpch.TpchTable.ORDERS;
import static io.airlift.tpch.TpchTable.PART;
import static io.airlift.tpch.TpchTable.REGION;
import static io.prestosql.operator.scalar.JsonFunctions.jsonParse;
import static io.prestosql.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.prestosql.plugin.tpch.TpchRecordSet.createTpchRecordSet;
import static io.prestosql.spi.type.BigintType.BIGINT;
Expand All @@ -81,6 +83,7 @@
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.Varchars.isVarcharType;
import static io.prestosql.type.JsonType.JSON;
import static io.prestosql.type.UnknownType.UNKNOWN;
import static java.lang.String.format;
import static java.util.Collections.nCopies;
Expand Down Expand Up @@ -260,6 +263,15 @@ else if (DOUBLE.equals(type)) {
row.add(doubleValue);
}
}
else if (JSON.equals(type)) {
String stringValue = resultSet.getString(i);
if (resultSet.wasNull()) {
row.add(null);
}
else {
row.add(jsonParse(utf8Slice(stringValue)).toStringUtf8());
}
}
else if (isVarcharType(type)) {
String stringValue = resultSet.getString(i);
if (resultSet.wasNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static io.prestosql.testing.MaterializedResult.DEFAULT_PRECISION;
import static io.prestosql.type.IntervalDayTimeType.INTERVAL_DAY_TIME;
import static io.prestosql.type.IntervalYearMonthType.INTERVAL_YEAR_MONTH;
import static io.prestosql.type.JsonType.JSON;
import static java.util.stream.Collectors.toList;

public class TestingPrestoClient
Expand Down Expand Up @@ -235,6 +236,9 @@ else if (type instanceof DecimalType) {
else if (type.getTypeSignature().getBase().equals("ObjectId")) {
return value;
}
else if (JSON.equals(type)) {
return value;
}
else {
throw new AssertionError("unhandled type: " + type);
}
Expand Down