Skip to content

Commit

Permalink
Fix creating empty tables in presto-memory
Browse files Browse the repository at this point in the history
Previously, the following scenario would fail:

CREATE TABLE memory.default.test (a BIGINT);
INSERT INTO memory.default.test SELECT nationkey FROM tpch.tiny.nation;

with an error message saying that table test was not found on a worker.
This is fixed by allowing a table to be "initialized" on writes (not only on
creates, as before) and by changing the sanity check that detects worker
crashes so that it asserts the expected number of rows per worker.
  • Loading branch information
pnowojski committed May 23, 2017
1 parent 563f819 commit 4a1ace6
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 61 deletions.
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.memory;

import com.facebook.presto.spi.HostAddress;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.json.JsonCodec.jsonCodec;
import static java.util.Objects.requireNonNull;

/**
 * Immutable value that pairs a {@link HostAddress} with a non-negative row count.
 * <p>
 * Instances are converted to and from JSON bytes wrapped in a {@link Slice}
 * through {@link #toSlice()} and {@link #fromSlice(Slice)}.
 */
public class MemoryDataFragment
{
    private static final JsonCodec<MemoryDataFragment> MEMORY_DATA_FRAGMENT_CODEC = jsonCodec(MemoryDataFragment.class);

    private final HostAddress hostAddress;
    private final long rows;

    /**
     * Creates a fragment; also used as the JSON deserialization entry point.
     *
     * @param hostAddress host this fragment refers to, must not be null
     * @param rows row count, must be zero or greater
     * @throws NullPointerException if {@code hostAddress} is null
     * @throws IllegalArgumentException if {@code rows} is negative
     */
    @JsonCreator
    public MemoryDataFragment(
            @JsonProperty("hostAddress") HostAddress hostAddress,
            @JsonProperty("rows") long rows)
    {
        // Validate both arguments up front (null check first), then assign.
        requireNonNull(hostAddress, "hostAddress is null");
        checkArgument(rows >= 0, "Rows number can not be negative");
        this.hostAddress = hostAddress;
        this.rows = rows;
    }

    /**
     * @return the host address given at construction
     */
    @JsonProperty
    public HostAddress getHostAddress()
    {
        return hostAddress;
    }

    /**
     * @return the row count given at construction
     */
    @JsonProperty
    public long getRows()
    {
        return rows;
    }

    /**
     * Serializes this fragment to JSON and wraps the resulting bytes in a {@link Slice}.
     *
     * @return slice backed by the JSON bytes of this fragment
     */
    public Slice toSlice()
    {
        byte[] json = MEMORY_DATA_FRAGMENT_CODEC.toJsonBytes(this);
        return Slices.wrappedBuffer(json);
    }

    /**
     * Parses a fragment from the JSON bytes held by {@code fragment}.
     *
     * @param fragment slice containing the JSON form of a fragment
     * @return the decoded fragment
     */
    public static MemoryDataFragment fromSlice(Slice fragment)
    {
        byte[] json = fragment.getBytes();
        return MEMORY_DATA_FRAGMENT_CODEC.fromJson(json);
    }

    /**
     * Combines two fragments of the same host into one whose row count is the sum of both.
     *
     * @param a first fragment
     * @param b second fragment
     * @return new fragment for the shared host carrying the summed row count
     * @throws IllegalArgumentException if the fragments have different host addresses
     */
    public static MemoryDataFragment merge(MemoryDataFragment a, MemoryDataFragment b)
    {
        HostAddress host = a.getHostAddress();
        checkArgument(host.equals(b.getHostAddress()), "Can not merge fragments from different hosts");
        long combinedRows = a.getRows() + b.getRows();
        return new MemoryDataFragment(host, combinedRows);
    }
}
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SchemaTableName;
Expand All @@ -47,7 +48,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -66,6 +66,7 @@ public class MemoryMetadata
private final AtomicLong nextTableId = new AtomicLong();
private final Map<String, Long> tableIds = new HashMap<>();
private final Map<Long, MemoryTableHandle> tables = new HashMap<>();
private final Map<Long, Map<HostAddress, MemoryDataFragment>> tableDataFragments = new HashMap<>();

@Inject
public MemoryMetadata(NodeManager nodeManager, MemoryConnectorId connectorId)
Expand Down Expand Up @@ -138,6 +139,7 @@ public synchronized void dropTable(ConnectorSession session, ConnectorTableHandl
Long tableId = tableIds.remove(handle.getTableName());
if (tableId != null) {
tables.remove(tableId);
tableDataFragments.remove(tableId);
}
}

Expand All @@ -150,8 +152,7 @@ public synchronized void renameTable(ConnectorSession session, ConnectorTableHan
oldTableHandle.getSchemaName(),
newTableName.getTableName(),
oldTableHandle.getTableId(),
oldTableHandle.getColumnHandles(),
oldTableHandle.getHosts());
oldTableHandle.getColumnHandles());
tableIds.remove(oldTableHandle.getTableName());
tableIds.put(newTableName.getTableName(), oldTableHandle.getTableId());
tables.remove(oldTableHandle.getTableId());
Expand All @@ -176,16 +177,20 @@ public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession se
MemoryTableHandle table = new MemoryTableHandle(
connectorId,
nextId,
tableMetadata,
nodes.stream().map(Node::getHostAndPort).collect(Collectors.toList()));
tableMetadata);
tables.put(table.getTableId(), table);
tableDataFragments.put(table.getTableId(), new HashMap<>());

return new MemoryOutputTableHandle(table, ImmutableSet.copyOf(tableIds.values()));
}

/**
 * Completes a table creation by passing the table of the output handle, together with
 * the received fragments, to {@code updateRowsOnHosts}.
 *
 * @param session not used by this method
 * @param tableHandle output handle; must be a non-null {@code MemoryOutputTableHandle}
 * @param fragments serialized fragments to record for the table
 * @return always {@link Optional#empty()}
 * @throws NullPointerException if {@code tableHandle} is null
 */
@Override
public synchronized Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
{
    // Null check and downcast in one step; requireNonNull returns its argument.
    MemoryOutputTableHandle createHandle = (MemoryOutputTableHandle) requireNonNull(tableHandle, "tableHandle is null");
    updateRowsOnHosts(createHandle.getTable(), fragments);
    return Optional.empty();
}

Expand All @@ -199,9 +204,28 @@ public synchronized MemoryInsertTableHandle beginInsert(ConnectorSession session
/**
 * Completes an insert by passing the table of the insert handle, together with
 * the received fragments, to {@code updateRowsOnHosts}.
 *
 * @param session not used by this method
 * @param insertHandle insert handle; must be a non-null {@code MemoryInsertTableHandle}
 * @param fragments serialized fragments to record for the table
 * @return always {@link Optional#empty()}
 * @throws NullPointerException if {@code insertHandle} is null
 */
@Override
public synchronized Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
{
    // Null check and downcast in one step; requireNonNull returns its argument.
    MemoryInsertTableHandle writeHandle = (MemoryInsertTableHandle) requireNonNull(insertHandle, "insertHandle is null");
    updateRowsOnHosts(writeHandle.getTable(), fragments);
    return Optional.empty();
}

/**
 * Decodes each serialized fragment and folds it into the per-host fragment map kept
 * for {@code table}. A fragment for a host that already has an entry is combined with
 * it through {@link MemoryDataFragment#merge}; otherwise it is stored as-is.
 *
 * @param table table whose fragment map is updated
 * @param fragments slices, each holding one serialized {@link MemoryDataFragment}
 * @throws IllegalStateException if no fragment map is registered for the table's id
 */
private void updateRowsOnHosts(MemoryTableHandle table, Collection<Slice> fragments)
{
    long tableId = table.getTableId();
    checkState(
            tableDataFragments.containsKey(tableId),
            "Uninitialized table [%s.%s]",
            table.getSchemaName(),
            table.getTableName());
    Map<HostAddress, MemoryDataFragment> perHost = tableDataFragments.get(tableId);

    fragments.forEach(serialized -> {
        MemoryDataFragment parsed = MemoryDataFragment.fromSlice(serialized);
        perHost.merge(parsed.getHostAddress(), parsed, MemoryDataFragment::merge);
    });
}

@Override
public synchronized List<ConnectorTableLayoutResult> getTableLayouts(
ConnectorSession session,
Expand All @@ -211,8 +235,17 @@ public synchronized List<ConnectorTableLayoutResult> getTableLayouts(
{
requireNonNull(handle, "handle is null");
checkArgument(handle instanceof MemoryTableHandle);
MemoryTableHandle memoryTableHandle = (MemoryTableHandle) handle;
checkState(
tableDataFragments.containsKey(memoryTableHandle.getTableId()),
"Inconsistent state for the table [%s.%s]",
memoryTableHandle.getSchemaName(),
memoryTableHandle.getTableName());

List<MemoryDataFragment> expectedFragments = ImmutableList.copyOf(
tableDataFragments.get(memoryTableHandle.getTableId()).values());

MemoryTableLayoutHandle layoutHandle = new MemoryTableLayoutHandle((MemoryTableHandle) handle);
MemoryTableLayoutHandle layoutHandle = new MemoryTableLayoutHandle(memoryTableHandle, expectedFragments);
return ImmutableList.of(new ConnectorTableLayoutResult(getTableLayout(session, layoutHandle), constraint.getSummary()));
}

Expand Down
Expand Up @@ -17,9 +17,12 @@
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;

Expand All @@ -36,11 +39,19 @@ public class MemoryPageSinkProvider
implements ConnectorPageSinkProvider
{
private final MemoryPagesStore pagesStore;
private final HostAddress currentHostAddress;

@Inject
public MemoryPageSinkProvider(MemoryPagesStore pagesStore)
public MemoryPageSinkProvider(MemoryPagesStore pagesStore, NodeManager nodeManager)
{
this(pagesStore, requireNonNull(nodeManager, "nodeManager is null").getCurrentNode().getHostAndPort());
}

@VisibleForTesting
public MemoryPageSinkProvider(MemoryPagesStore pagesStore, HostAddress currentHostAddress)
{
this.pagesStore = requireNonNull(pagesStore, "pagesStore is null");
this.currentHostAddress = requireNonNull(currentHostAddress, "currentHostAddress is null");
}

@Override
Expand All @@ -53,7 +64,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa

pagesStore.cleanUp(memoryOutputTableHandle.getActiveTableIds());
pagesStore.initialize(tableId);
return new MemoryPageSink(pagesStore, tableId);
return new MemoryPageSink(pagesStore, currentHostAddress, tableId);
}

@Override
Expand All @@ -65,32 +76,37 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
checkState(memoryInsertTableHandle.getActiveTableIds().contains(tableId));

pagesStore.cleanUp(memoryInsertTableHandle.getActiveTableIds());
return new MemoryPageSink(pagesStore, tableId);
pagesStore.initialize(tableId);
return new MemoryPageSink(pagesStore, currentHostAddress, tableId);
}

private static class MemoryPageSink
implements ConnectorPageSink
{
private final MemoryPagesStore pagesStore;
private final HostAddress currentHostAddress;
private final long tableId;
private long addedRows;

public MemoryPageSink(MemoryPagesStore pagesStore, long tableId)
public MemoryPageSink(MemoryPagesStore pagesStore, HostAddress currentHostAddress, long tableId)
{
this.pagesStore = requireNonNull(pagesStore, "pagesStore is null");
this.currentHostAddress = requireNonNull(currentHostAddress, "currentHostAddress is null");
this.tableId = tableId;
}

/**
 * Adds the page to the pages store under this sink's table id and increases the
 * running count of added rows by the page's position count.
 *
 * @param page page to store
 * @return {@code NOT_BLOCKED}
 */
@Override
public CompletableFuture<?> appendPage(Page page)
{
    // Store first; the counter is only advanced once the add has returned.
    pagesStore.add(tableId, page);
    addedRows = addedRows + page.getPositionCount();
    return NOT_BLOCKED;
}

@Override
public CompletableFuture<Collection<Slice>> finish()
{
return completedFuture(ImmutableList.of());
return completedFuture(ImmutableList.of(new MemoryDataFragment(currentHostAddress, addedRows).toSlice()));
}

@Override
Expand Down
Expand Up @@ -51,11 +51,17 @@ public ConnectorPageSource createPageSource(
long tableId = memorySplit.getTableHandle().getTableId();
int partNumber = memorySplit.getPartNumber();
int totalParts = memorySplit.getTotalPartsPerWorker();
long expectedRows = memorySplit.getExpectedRows();

List<Integer> columnIndexes = columns.stream()
.map(MemoryColumnHandle.class::cast)
.map(MemoryColumnHandle::getColumnIndex).collect(toList());
List<Page> pages = pagesStore.getPages(tableId, partNumber, totalParts, columnIndexes);
List<Page> pages = pagesStore.getPages(
tableId,
partNumber,
totalParts,
columnIndexes,
expectedRows);

return new FixedPageSource(pages);
}
Expand Down

0 comments on commit 4a1ace6

Please sign in to comment.