Skip to content

Commit

Permalink
Add initial INSERT support
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamtagra authored and electrum committed Aug 8, 2014
1 parent 6c37d8c commit 787585c
Show file tree
Hide file tree
Showing 43 changed files with 701 additions and 1 deletion.
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.cassandra;

import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorRecordSinkProvider;
import com.facebook.presto.spi.RecordSink;
Expand Down Expand Up @@ -42,4 +43,10 @@ public RecordSink getRecordSink(ConnectorOutputTableHandle tableHandle)

return new CassandraRecordSink(handle, cassandraSession);
}

@Override
public RecordSink getRecordSink(ConnectorInsertTableHandle tableHandle)
{
throw new UnsupportedOperationException();
}
}
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
Expand Down Expand Up @@ -66,6 +67,12 @@ public boolean canHandle(ConnectorOutputTableHandle tableHandle)
return (tableHandle instanceof CassandraOutputTableHandle) && ((CassandraOutputTableHandle) tableHandle).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
Expand Down Expand Up @@ -96,6 +103,12 @@ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
return CassandraOutputTableHandle.class;
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.cassandra.util.CassandraCqlUtils;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSession;
Expand Down Expand Up @@ -306,6 +307,18 @@ public void commitCreateTable(ConnectorOutputTableHandle tableHandle, Collection
schemaProvider.flushTable(new SchemaTableName(outputTableHandle.getSchemaName(), outputTableHandle.getTableName()));
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new UnsupportedOperationException();
}

@Override
public void commitInsert(ConnectorInsertTableHandle insertHandle, Collection<String> fragments)
{
throw new UnsupportedOperationException();
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, String viewData, boolean replace)
{
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
Expand Down Expand Up @@ -65,6 +66,12 @@ public boolean canHandle(ConnectorOutputTableHandle tableHandle)
return false;
}

@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
Expand Down Expand Up @@ -94,4 +101,10 @@ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
}
}
31 changes: 31 additions & 0 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveClient.java
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPartition;
Expand Down Expand Up @@ -616,6 +617,12 @@ public RecordSink getRecordSink(ConnectorOutputTableHandle tableHandle)
return new HiveRecordSink(handle, target, conf);
}

@Override
public RecordSink getRecordSink(ConnectorInsertTableHandle tableHandle)
{
throw new UnsupportedOperationException();
}

private Database getDatabase(String database)
{
try {
Expand Down Expand Up @@ -780,6 +787,18 @@ public Map<SchemaTableName, String> getViews(ConnectorSession session, SchemaTab
return views.build();
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new UnsupportedOperationException();
}

@Override
public void commitInsert(ConnectorInsertTableHandle insertHandle, Collection<String> fragments)
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle, TupleDomain<ConnectorColumnHandle> tupleDomain)
{
Expand Down Expand Up @@ -1010,6 +1029,12 @@ public boolean canHandle(ConnectorOutputTableHandle handle)
return (handle instanceof HiveOutputTableHandle) && ((HiveOutputTableHandle) handle).getClientId().equals(connectorId);
}

@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
}

@Override
public boolean canHandle(ConnectorIndexHandle indexHandle)
{
Expand Down Expand Up @@ -1046,6 +1071,12 @@ public Class<? extends ConnectorIndexHandle> getIndexHandleClass()
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
Expand Down Expand Up @@ -53,6 +54,12 @@ public boolean canHandle(ConnectorOutputTableHandle tableHandle)
return false;
}

@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
Expand Down Expand Up @@ -82,4 +89,10 @@ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
}
}
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
Expand Down Expand Up @@ -53,6 +54,12 @@ public boolean canHandle(ConnectorOutputTableHandle tableHandle)
return false;
}

@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
Expand Down Expand Up @@ -82,4 +89,10 @@ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
}
}
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
Expand Down Expand Up @@ -53,6 +54,12 @@ public boolean canHandle(ConnectorOutputTableHandle tableHandle)
return false;
}

@Override
public boolean canHandle(ConnectorInsertTableHandle tableHandle)
{
return false;
}

@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass()
{
Expand Down Expand Up @@ -82,4 +89,10 @@ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
{
throw new UnsupportedOperationException();
}
}
Expand Up @@ -33,6 +33,7 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(ColumnHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);

binder.bind(HandleResolver.class).in(Scopes.SINGLETON);
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorTableHandle;
Expand Down Expand Up @@ -100,6 +101,16 @@ public String getId(ConnectorOutputTableHandle outputHandle)
throw new IllegalArgumentException("No connector for output table handle: " + outputHandle);
}

public String getId(ConnectorInsertTableHandle insertHandle)
{
for (Entry<String, ConnectorHandleResolver> entry : handleIdResolvers.entrySet()) {
if (entry.getValue().canHandle(insertHandle)) {
return entry.getKey();
}
}
throw new IllegalArgumentException("No connector for insert table handle: " + insertHandle);
}

public Class<? extends ConnectorTableHandle> getTableHandleClass(String id)
{
return resolverFor(id).getTableHandleClass();
Expand All @@ -125,6 +136,11 @@ public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass(Str
return resolverFor(id).getOutputTableHandleClass();
}

public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass(String id)
{
return resolverFor(id).getInsertTableHandleClass();
}

public ConnectorHandleResolver resolverFor(String id)
{
ConnectorHandleResolver resolver = handleIdResolvers.get(id);
Expand Down
@@ -0,0 +1,75 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static com.google.common.base.Preconditions.checkNotNull;

public final class InsertTableHandle
{
private final String connectorId;
private final ConnectorInsertTableHandle connectorHandle;

@JsonCreator
public InsertTableHandle(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("connectorHandle") ConnectorInsertTableHandle connectorHandle)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null");
this.connectorHandle = checkNotNull(connectorHandle, "connectorHandle is null");
}

@JsonProperty
public String getConnectorId()
{
return connectorId;
}

@JsonProperty
public ConnectorInsertTableHandle getConnectorHandle()
{
return connectorHandle;
}

@Override
public int hashCode()
{
return Objects.hash(connectorId, connectorHandle);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
InsertTableHandle o = (InsertTableHandle) obj;
return Objects.equals(this.connectorId, o.connectorId) &&
Objects.equals(this.connectorHandle, o.connectorHandle);
}

@Override
public String toString()
{
return connectorId + ":" + connectorHandle;
}
}

0 comments on commit 787585c

Please sign in to comment.