Skip to content

Commit

Permalink
Rename ConnectorPageSink commit/rollback to finish/abort
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Dec 28, 2015
1 parent a3c977b commit 011e763
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 25 deletions.
Expand Up @@ -50,13 +50,13 @@ public void appendPage(Page page, Block sampleWeightBlock)
} }


@Override @Override
public Collection<Slice> commit() public Collection<Slice> finish()
{ {
return ImmutableList.of(); return ImmutableList.of();
} }


@Override @Override
public void rollback() public void abort()
{ {
} }
} }
Expand Down
Expand Up @@ -224,7 +224,7 @@ public HivePageSink(
} }


@Override @Override
public Collection<Slice> commit() public Collection<Slice> finish()
{ {
ImmutableList.Builder<Slice> partitionUpdates = ImmutableList.builder(); ImmutableList.Builder<Slice> partitionUpdates = ImmutableList.builder();
for (HiveRecordWriter writer : writers) { for (HiveRecordWriter writer : writers) {
Expand All @@ -238,7 +238,7 @@ public Collection<Slice> commit()
} }


@Override @Override
public void rollback() public void abort()
{ {
for (HiveRecordWriter writer : writers) { for (HiveRecordWriter writer : writers) {
if (writer != null) { if (writer != null) {
Expand Down
Expand Up @@ -1310,7 +1310,7 @@ public void testTableCreationRollback()
// write the data // write the data
ConnectorPageSink sink = pageSinkProvider.createPageSink(session, outputHandle); ConnectorPageSink sink = pageSinkProvider.createPageSink(session, outputHandle);
sink.appendPage(CREATE_TABLE_DATA.toPage(), null); sink.appendPage(CREATE_TABLE_DATA.toPage(), null);
sink.commit(); sink.finish();


// verify we have data files // verify we have data files
assertFalse(listAllDataFiles(outputHandle).isEmpty()); assertFalse(listAllDataFiles(outputHandle).isEmpty());
Expand Down Expand Up @@ -1531,7 +1531,7 @@ protected void doCreateSampledTable(SchemaTableName tableName)


sink.appendPage(new Page(dataBlockBuilder.build()), sampleBlockBuilder.build()); sink.appendPage(new Page(dataBlockBuilder.build()), sampleBlockBuilder.build());


Collection<Slice> fragments = sink.commit(); Collection<Slice> fragments = sink.finish();


// commit the table // commit the table
metadata.commitCreateTable(session, outputHandle, fragments); metadata.commitCreateTable(session, outputHandle, fragments);
Expand Down Expand Up @@ -1594,7 +1594,7 @@ protected void doCreateTable(SchemaTableName tableName, HiveStorageFormat storag
// write the data // write the data
ConnectorPageSink sink = pageSinkProvider.createPageSink(session, outputHandle); ConnectorPageSink sink = pageSinkProvider.createPageSink(session, outputHandle);
sink.appendPage(CREATE_TABLE_DATA.toPage(), null); sink.appendPage(CREATE_TABLE_DATA.toPage(), null);
Collection<Slice> fragments = sink.commit(); Collection<Slice> fragments = sink.finish();


// verify all new files start with the unique prefix // verify all new files start with the unique prefix
for (String filePath : listAllDataFiles(outputHandle)) { for (String filePath : listAllDataFiles(outputHandle)) {
Expand Down Expand Up @@ -1669,7 +1669,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName


// write data // write data
sink.appendPage(CREATE_TABLE_DATA.toPage(), null); sink.appendPage(CREATE_TABLE_DATA.toPage(), null);
Collection<Slice> fragments = sink.commit(); Collection<Slice> fragments = sink.finish();


// commit the insert // commit the insert
metadata.commitInsert(session, insertTableHandle, fragments); metadata.commitInsert(session, insertTableHandle, fragments);
Expand Down Expand Up @@ -1701,7 +1701,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName
ConnectorPageSink sink = pageSinkProvider.createPageSink(session, insertTableHandle); ConnectorPageSink sink = pageSinkProvider.createPageSink(session, insertTableHandle);
sink.appendPage(CREATE_TABLE_DATA.toPage(), null); sink.appendPage(CREATE_TABLE_DATA.toPage(), null);
sink.appendPage(CREATE_TABLE_DATA.toPage(), null); sink.appendPage(CREATE_TABLE_DATA.toPage(), null);
sink.commit(); sink.finish();


// verify we did not modify the table directory // verify we did not modify the table directory
assertEquals(listAllDataFiles(tableName.getSchemaName(), tableName.getTableName()), existingFiles); assertEquals(listAllDataFiles(tableName.getSchemaName(), tableName.getTableName()), existingFiles);
Expand Down Expand Up @@ -1840,7 +1840,7 @@ private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTab
ConnectorInsertTableHandle insertTableHandle = metadata.beginInsert(session, tableHandle); ConnectorInsertTableHandle insertTableHandle = metadata.beginInsert(session, tableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(session, insertTableHandle); ConnectorPageSink sink = pageSinkProvider.createPageSink(session, insertTableHandle);
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA_2ND.toPage(), null); sink.appendPage(CREATE_TABLE_PARTITIONED_DATA_2ND.toPage(), null);
sink.commit(); sink.finish();


// verify we did not modify the table directory // verify we did not modify the table directory
assertEquals(listAllDataFiles(tableName.getSchemaName(), tableName.getTableName()), existingFiles); assertEquals(listAllDataFiles(tableName.getSchemaName(), tableName.getTableName()), existingFiles);
Expand Down Expand Up @@ -1909,7 +1909,7 @@ private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, Sche
ConnectorPageSink sink = pageSinkProvider.createPageSink(session, insertTableHandle); ConnectorPageSink sink = pageSinkProvider.createPageSink(session, insertTableHandle);
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage(), null); sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage(), null);
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage(), null); sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage(), null);
sink.commit(); sink.finish();


// verify we did not modify the table directory // verify we did not modify the table directory
assertEquals(listAllDataFiles(tableName.getSchemaName(), tableName.getTableName()), existingFiles); assertEquals(listAllDataFiles(tableName.getSchemaName(), tableName.getTableName()), existingFiles);
Expand Down Expand Up @@ -1943,7 +1943,7 @@ private void insertData(ConnectorTableHandle tableHandle, MaterializedResult dat


// write data // write data
sink.appendPage(data.toPage(), null); sink.appendPage(data.toPage(), null);
Collection<Slice> fragments = sink.commit(); Collection<Slice> fragments = sink.finish();


// commit the insert // commit the insert
metadata.commitInsert(session, insertTableHandle, fragments); metadata.commitInsert(session, insertTableHandle, fragments);
Expand Down
Expand Up @@ -319,7 +319,7 @@ private void doCreateTable(SchemaTableName tableName, HiveStorageFormat storageF
// write the records // write the records
ConnectorPageSink sink = pageSinkProvider.createPageSink(SESSION, outputHandle); ConnectorPageSink sink = pageSinkProvider.createPageSink(SESSION, outputHandle);
sink.appendPage(data.toPage(), null); sink.appendPage(data.toPage(), null);
Collection<Slice> fragments = sink.commit(); Collection<Slice> fragments = sink.finish();


// commit the table // commit the table
metadata.commitCreateTable(SESSION, outputHandle, fragments); metadata.commitCreateTable(SESSION, outputHandle, fragments);
Expand Down
Expand Up @@ -191,7 +191,7 @@ public Page getOutput()
} }
state = State.FINISHED; state = State.FINISHED;


Collection<Slice> fragments = pageSink.commit(); Collection<Slice> fragments = pageSink.finish();
committed = true; committed = true;


PageBuilder page = new PageBuilder(TYPES); PageBuilder page = new PageBuilder(TYPES);
Expand Down Expand Up @@ -220,7 +220,7 @@ public void close()
if (!closed) { if (!closed) {
closed = true; closed = true;
if (!committed) { if (!committed) {
pageSink.rollback(); pageSink.abort();
} }
} }
} }
Expand Down
Expand Up @@ -99,7 +99,7 @@ public void appendPage(Page page, Block sampleWeightBlock)
} }


@Override @Override
public Collection<Slice> commit() public Collection<Slice> finish()
{ {
flushPages(pageBuffer.getPages()); flushPages(pageBuffer.getPages());
List<ShardInfo> shards = storagePageSink.commit(); List<ShardInfo> shards = storagePageSink.commit();
Expand All @@ -112,7 +112,7 @@ public Collection<Slice> commit()
} }


@Override @Override
public void rollback() public void abort()
{ {
storagePageSink.rollback(); storagePageSink.rollback();
} }
Expand Down
Expand Up @@ -22,7 +22,7 @@ public interface ConnectorPageSink
{ {
void appendPage(Page page, Block sampleWeightBlock); void appendPage(Page page, Block sampleWeightBlock);


Collection<Slice> commit(); Collection<Slice> finish();


void rollback(); void abort();
} }
Expand Up @@ -34,13 +34,13 @@ public RecordPageSink(RecordSink recordSink)
} }


@Override @Override
public Collection<Slice> commit() public Collection<Slice> finish()
{ {
return recordSink.commit(); return recordSink.commit();
} }


@Override @Override
public void rollback() public void abort()
{ {
recordSink.rollback(); recordSink.rollback();
} }
Expand Down
Expand Up @@ -43,18 +43,18 @@ public void appendPage(Page page, Block sampleWeightBlock)
} }


@Override @Override
public Collection<Slice> commit() public Collection<Slice> finish()
{ {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.commit(); return delegate.finish();
} }
} }


@Override @Override
public void rollback() public void abort()
{ {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.rollback(); delegate.abort();
} }
} }
} }

0 comments on commit 011e763

Please sign in to comment.