Skip to content

Commit

Permalink
Minor code cleanup for Redis connector
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Dec 5, 2015
1 parent 1388116 commit ed14763
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 111 deletions.
Expand Up @@ -15,11 +15,8 @@


import com.facebook.presto.spi.Connector; import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata; import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorPageSourceProvider;
import com.facebook.presto.spi.ConnectorRecordSetProvider; import com.facebook.presto.spi.ConnectorRecordSetProvider;
import com.facebook.presto.spi.ConnectorRecordSinkProvider;
import com.facebook.presto.spi.ConnectorSplitManager; import com.facebook.presto.spi.ConnectorSplitManager;


import javax.inject.Inject; import javax.inject.Inject;
Expand Down Expand Up @@ -69,27 +66,9 @@ public ConnectorSplitManager getSplitManager()
return splitManager; return splitManager;
} }


@Override
public ConnectorPageSourceProvider getPageSourceProvider()
{
throw new UnsupportedOperationException();
}

@Override @Override
public ConnectorRecordSetProvider getRecordSetProvider() public ConnectorRecordSetProvider getRecordSetProvider()
{ {
return recordSetProvider; return recordSetProvider;
} }

@Override
public ConnectorRecordSinkProvider getRecordSinkProvider()
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorIndexResolver getIndexResolver()
{
throw new UnsupportedOperationException();
}
} }
Expand Up @@ -15,9 +15,6 @@


import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle;
Expand All @@ -28,7 +25,7 @@
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


/** /**
* Redis specific {@link com.facebook.presto.spi.ConnectorHandleResolver} implementation. * Redis specific {@link ConnectorHandleResolver} implementation.
*/ */
public class RedisHandleResolver public class RedisHandleResolver
implements ConnectorHandleResolver implements ConnectorHandleResolver
Expand Down Expand Up @@ -66,24 +63,6 @@ public boolean canHandle(ConnectorTableLayoutHandle handle)
return handle instanceof RedisTableLayoutHandle && ((RedisTableLayoutHandle) handle).getConnectorId().equals(connectorId); return handle instanceof RedisTableLayoutHandle && ((RedisTableLayoutHandle) handle).getConnectorId().equals(connectorId);
} }


@Override
public boolean canHandle(ConnectorIndexHandle indexHandle)
{
return false;
}

@Override
public boolean canHandle(ConnectorOutputTableHandle tableHandle)
{
return false;
}

@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
}

@Override @Override
public Class<? extends ConnectorTableHandle> getTableHandleClass() public Class<? extends ConnectorTableHandle> getTableHandleClass()
{ {
Expand All @@ -102,30 +81,12 @@ public Class<? extends ConnectorSplit> getSplitClass()
return RedisSplit.class; return RedisSplit.class;
} }


@Override
public Class<? extends ConnectorIndexHandle> getIndexHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override @Override
public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
{ {
return RedisTableLayoutHandle.class; return RedisTableLayoutHandle.class;
} }


@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
}

RedisTableHandle convertTableHandle(ConnectorTableHandle tableHandle) RedisTableHandle convertTableHandle(ConnectorTableHandle tableHandle)
{ {
requireNonNull(tableHandle, "tableHandle is null"); requireNonNull(tableHandle, "tableHandle is null");
Expand Down
Expand Up @@ -116,7 +116,7 @@ RedisColumnHandle getColumnHandle(String connectorId, int index, boolean hidden)
true); true);
} }


ColumnMetadata getColumnMetadata(int index, boolean hidden) ColumnMetadata getColumnMetadata(boolean hidden)
{ {
return new ColumnMetadata(name, type, false, comment, hidden); return new ColumnMetadata(name, type, false, comment, hidden);
} }
Expand Down
Expand Up @@ -56,8 +56,8 @@ public class RedisMetadata
private static final Logger log = Logger.get(RedisMetadata.class); private static final Logger log = Logger.get(RedisMetadata.class);


private final String connectorId; private final String connectorId;
private final RedisConnectorConfig redisConnectorConfig;
private final RedisHandleResolver handleResolver; private final RedisHandleResolver handleResolver;
private final boolean hideInternalColumns;


private final Supplier<Map<SchemaTableName, RedisTableDescription>> redisTableDescriptionSupplier; private final Supplier<Map<SchemaTableName, RedisTableDescription>> redisTableDescriptionSupplier;
private final Set<RedisInternalFieldDescription> internalFieldDescriptions; private final Set<RedisInternalFieldDescription> internalFieldDescriptions;
Expand All @@ -71,9 +71,11 @@ public class RedisMetadata
Set<RedisInternalFieldDescription> internalFieldDescriptions) Set<RedisInternalFieldDescription> internalFieldDescriptions)
{ {
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.redisConnectorConfig = requireNonNull(redisConnectorConfig, "redisConfig is null");
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null"); this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");


requireNonNull(redisConnectorConfig, "redisConfig is null");
hideInternalColumns = redisConnectorConfig.isHideInternalColumns();

log.debug("Loading redis table definitions from %s", redisConnectorConfig.getTableDescriptionDir().getAbsolutePath()); log.debug("Loading redis table definitions from %s", redisConnectorConfig.getTableDescriptionDir().getAbsolutePath());


this.redisTableDescriptionSupplier = Suppliers.memoize(redisTableDescriptionSupplier); this.redisTableDescriptionSupplier = Suppliers.memoize(redisTableDescriptionSupplier);
Expand Down Expand Up @@ -164,7 +166,6 @@ public List<SchemaTableName> listTables(ConnectorSession session, String schemaN
return builder.build(); return builder.build();
} }


@SuppressWarnings("ValueOfIncrementOrDecrementUsed")
@Override @Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{ {
Expand All @@ -182,8 +183,9 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
if (key != null) { if (key != null) {
List<RedisTableFieldDescription> fields = key.getFields(); List<RedisTableFieldDescription> fields = key.getFields();
if (fields != null) { if (fields != null) {
for (RedisTableFieldDescription redisTableFieldDescription : fields) { for (RedisTableFieldDescription field : fields) {
columnHandles.put(redisTableFieldDescription.getName(), redisTableFieldDescription.getColumnHandle(connectorId, true, index++)); columnHandles.put(field.getName(), field.getColumnHandle(connectorId, true, index));
index++;
} }
} }
} }
Expand All @@ -192,15 +194,16 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
if (value != null) { if (value != null) {
List<RedisTableFieldDescription> fields = value.getFields(); List<RedisTableFieldDescription> fields = value.getFields();
if (fields != null) { if (fields != null) {
for (RedisTableFieldDescription redisTableFieldDescription : fields) { for (RedisTableFieldDescription field : fields) {
columnHandles.put(redisTableFieldDescription.getName(), redisTableFieldDescription.getColumnHandle(connectorId, false, index++)); columnHandles.put(field.getName(), field.getColumnHandle(connectorId, false, index));
index++;
} }
} }
} }


for (RedisInternalFieldDescription redisInternalFieldDescription : internalFieldDescriptions) { for (RedisInternalFieldDescription field : internalFieldDescriptions) {
RedisColumnHandle columnHandle = redisInternalFieldDescription.getColumnHandle(connectorId, index++, redisConnectorConfig.isHideInternalColumns()); columnHandles.put(field.getName(), field.getColumnHandle(connectorId, index, hideInternalColumns));
columnHandles.put(redisInternalFieldDescription.getName(), columnHandle); index++;
} }


return columnHandles.build(); return columnHandles.build();
Expand Down Expand Up @@ -255,32 +258,26 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
} }


ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder(); ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
int index = 0;


RedisTableFieldGroup key = table.getKey(); appendFields(builder, table.getKey());
if (key != null) { appendFields(builder, table.getValue());
List<RedisTableFieldDescription> fields = key.getFields();
if (fields != null) { for (RedisInternalFieldDescription fieldDescription : internalFieldDescriptions) {
for (RedisTableFieldDescription fieldDescription : fields) { builder.add(fieldDescription.getColumnMetadata(hideInternalColumns));
builder.add(fieldDescription.getColumnMetadata(index++));
}
}
} }


RedisTableFieldGroup value = table.getValue(); return new ConnectorTableMetadata(schemaTableName, builder.build());
if (value != null) { }
List<RedisTableFieldDescription> fields = value.getFields();
private static void appendFields(ImmutableList.Builder<ColumnMetadata> builder, RedisTableFieldGroup group)
{
if (group != null) {
List<RedisTableFieldDescription> fields = group.getFields();
if (fields != null) { if (fields != null) {
for (RedisTableFieldDescription fieldDescription : fields) { for (RedisTableFieldDescription fieldDescription : fields) {
builder.add(fieldDescription.getColumnMetadata(index++)); builder.add(fieldDescription.getColumnMetadata());
} }
} }
} }

for (RedisInternalFieldDescription fieldDescription : internalFieldDescriptions) {
builder.add(fieldDescription.getColumnMetadata(index++, redisConnectorConfig.isHideInternalColumns()));
}

return new ConnectorTableMetadata(schemaTableName, builder.build());
} }
} }
Expand Up @@ -49,7 +49,7 @@
public class RedisRecordCursor public class RedisRecordCursor
implements RecordCursor implements RecordCursor
{ {
private static final Logger log = Logger.get(RedisRecordSet.class); private static final Logger log = Logger.get(RedisRecordCursor.class);
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];


private final RowDecoder keyDecoder; private final RowDecoder keyDecoder;
Expand Down Expand Up @@ -320,11 +320,10 @@ private boolean fetchKeys()
keysIterator = keys.iterator(); keysIterator = keys.iterator();
} }
break; break;
case ZSET: { case ZSET:
Set<String> keys = jedis.zrange(split.getKeyName(), split.getStart(), split.getEnd()); Set<String> keys = jedis.zrange(split.getKeyName(), split.getStart(), split.getEnd());
keysIterator = keys.iterator(); keysIterator = keys.iterator();
} break;
break;
default: default:
log.debug("Redis type of key %s is unsupported", split.getKeyDataFormat()); log.debug("Redis type of key %s is unsupported", split.getKeyDataFormat());
return false; return false;
Expand All @@ -347,22 +346,20 @@ private boolean fetchData(String keyString)
// whereas for the STRING type decoders are optional // whereas for the STRING type decoders are optional
try (Jedis jedis = jedisPool.getResource()) { try (Jedis jedis = jedisPool.getResource()) {
switch (split.getValueDataType()) { switch (split.getValueDataType()) {
case STRING: { case STRING:
valueString = jedis.get(keyString); valueString = jedis.get(keyString);
if (valueString == null) { if (valueString == null) {
log.warn("Redis data modified while query was running, string value at key %s deleted", keyString); log.warn("Redis data modified while query was running, string value at key %s deleted", keyString);
return false; return false;
} }
} break;
break; case HASH:
case HASH: {
valueMap = jedis.hgetAll(keyString); valueMap = jedis.hgetAll(keyString);
if (valueMap == null) { if (valueMap == null) {
log.warn("Redis data modified while query was running, hash value at key %s deleted", keyString); log.warn("Redis data modified while query was running, hash value at key %s deleted", keyString);
return false; return false;
} }
} break;
break;
default: default:
log.debug("Redis type for key %s is unsupported", keyString); log.debug("Redis type for key %s is unsupported", keyString);
return false; return false;
Expand Down
Expand Up @@ -89,7 +89,7 @@ public ConnectorSplitSource getSplits(ConnectorSession session, ConnectorTableLa
stride = numberOfKeys / REDIS_MAX_SPLITS; stride = numberOfKeys / REDIS_MAX_SPLITS;
} }


for (long startIndex = 0; startIndex < numberOfKeys; startIndex = startIndex + stride) { for (long startIndex = 0; startIndex < numberOfKeys; startIndex += stride) {
long endIndex = startIndex + stride - 1; long endIndex = startIndex + stride - 1;
if (endIndex >= numberOfKeys) { if (endIndex >= numberOfKeys) {
endIndex = -1; endIndex = -1;
Expand Down
Expand Up @@ -115,7 +115,7 @@ RedisColumnHandle getColumnHandle(String connectorId, boolean keyDecoder, int in
false); false);
} }


ColumnMetadata getColumnMetadata(int index) ColumnMetadata getColumnMetadata()
{ {
return new ColumnMetadata(getName(), getType(), false, getComment(), isHidden()); return new ColumnMetadata(getName(), getType(), false, getComment(), isHidden());
} }
Expand Down
Expand Up @@ -40,7 +40,7 @@ public static EmbeddedRedis createEmbeddedRedis()
} }


public void start() public void start()
throws InterruptedException, IOException throws IOException
{ {
redisServer.start(); redisServer.start();
jedisPool = new JedisPool(new JedisPoolConfig(), getConnectString(), getPort()); jedisPool = new JedisPool(new JedisPoolConfig(), getConnectString(), getPort());
Expand Down
Expand Up @@ -108,7 +108,7 @@ public void addResults(QueryResults results)


try (Jedis jedis = jedisPool.getResource()) { try (Jedis jedis = jedisPool.getResource()) {
switch (dataFormat) { switch (dataFormat) {
case "string": { case "string":
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
for (int i = 0; i < fields.size(); i++) { for (int i = 0; i < fields.size(); i++) {
Type type = types.get().get(i); Type type = types.get().get(i);
Expand All @@ -118,18 +118,16 @@ public void addResults(QueryResults results)
} }
} }
jedis.set(redisKey, jsonEncoder.toString(builder.build())); jedis.set(redisKey, jsonEncoder.toString(builder.build()));
} break;
break; case "hash":
case "hash": {
// add keys to zset // add keys to zset
String redisZset = "keyset:" + tableName; String redisZset = "keyset:" + tableName;
jedis.zadd(redisZset, count.get(), redisKey); jedis.zadd(redisZset, count.get(), redisKey);
// add values to Hash // add values to Hash
for (int i = 0; i < fields.size(); i++) { for (int i = 0; i < fields.size(); i++) {
jedis.hset(redisKey, columns.get(i).getName(), fields.get(i).toString()); jedis.hset(redisKey, columns.get(i).getName(), fields.get(i).toString());
} }
} break;
break;
default: default:
throw new AssertionError("unhandled value type: " + dataFormat); throw new AssertionError("unhandled value type: " + dataFormat);
} }
Expand Down
Expand Up @@ -64,7 +64,7 @@ public static Map.Entry<SchemaTableName, RedisTableDescription> loadTpchTableDes
throws IOException throws IOException
{ {
RedisTableDescription tpchTemplate; RedisTableDescription tpchTemplate;
try (InputStream data = RedisTestUtils.class.getResourceAsStream(format("/tpch/" + dataFormat + "/%s.json", schemaTableName.getTableName()))) { try (InputStream data = RedisTestUtils.class.getResourceAsStream(format("/tpch/%s/%s.json", dataFormat, schemaTableName.getTableName()))) {
tpchTemplate = tableDescriptionJsonCodec.fromJson(ByteStreams.toByteArray(data)); tpchTemplate = tableDescriptionJsonCodec.fromJson(ByteStreams.toByteArray(data));
} }


Expand Down

0 comments on commit ed14763

Please sign in to comment.