Skip to content

Commit

Permalink
[mirroring client] Catch-up handover review with missing features and…
Browse files Browse the repository at this point in the history
… review comments (googleapis#3347)

* chore: revert review comments

* feat: add MirroringOperationException exception markers (googleapis#125)

* feat: concurrent writes in MirroringBufferedMutator (googleapis#80)

* refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (googleapis#75)

* feat: implement MirroringAsyncTable#getName() (googleapis#132)

* feat: use Logger rather than stdout in DefaultMismatchDetector (googleapis#128)

* feat: synchronous writes (googleapis#88)

* fix: implement heapSize method for RowCell (googleapis#111)

* feat: FlowController accounts for memory usage (googleapis#137)

* refactor: remove Configuration as a base of MirroringConfiguration (googleapis#127)

* feat: MirroringAsyncBufferedMutator (googleapis#81)

* refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (googleapis#138)

* fix: BufferedMutator close() waits for all secondary flushes to finish (googleapis#110)

* feat: 2.x reads sampling (googleapis#114)

* refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (googleapis#134)

* ConcurrentBufferedMutator integration tests (googleapis#135)

* feat: add synchronous MirroringConnection to 2.x (googleapis#109)

* fix: MirroringConnection in 2.x failed to compile (googleapis#139)

* fix: fix BufferedMutator ITs (googleapis#140)

* feat: run 1.x integration tests on MirroringConnection etc. from 2.x (googleapis#108)

* feat: 2.x - rewrite Increment and Append as Put in batch (googleapis#116)

* fix: fix build (googleapis#142)

* refactor: minor fixes after review (googleapis#117)

* feat: MirroringAsyncTable#getScanner() (googleapis#58)

* test: 2.x integration tests (googleapis#112)

* feat: implement MirroringAsyncBufferedMutatorBuilder (googleapis#144)

* feat: log rows and values in DefaultMismatchDetector (googleapis#129)

* fix: ITs - add expected parameter to MismatchDetectors (googleapis#153)

* fix: force Append and Increment to return results and discard that result before returning it to user (googleapis#136)

* fix: review fixes in utils

* fix: review fixes in BufferedMutator

* fix: review fixes in Faillog

* fix: fixed reference counting

* fix: review fixes in FlowController

* fix: review fixes in metrics

* fix: review fixes in verification

* fix: Review fixes in MirroringTable

* fix: review fixes in HBase 2.x client

* fix: fixes in ITs

* feat: MirrorinAsyncTable: scan(), scanAll() (googleapis#131)

* fix: review fixes in tests

* feat: MirroringConnection: timeout in close() and abort() (googleapis#133)

* feat: better mismatch detection of scan results (googleapis#130)

* feat: quickstart (googleapis#105)

* fix: 2.x scan ITs (googleapis#158)

* fix: DefaultMismatchDetector tests (googleapis#157)

* fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (googleapis#161)

* fix: additional minor fixes after review (googleapis#163)

* fix: BufferedMutator review fixes (googleapis#164)

- Simplify #flush().
- Add javadocs.
- (sequential) Fix flush() exception handling.
- (sequential) Move error handling to a separate inner class.

* fix: PR fixes

* fix: report zeroed error metrics after successful operations

* fix: prepend MismatchDetectorCounter with Test to better reflect its purpose

* feat: Client-side timestamping (googleapis#165)

* fix: reduce timeout in TestBlocking to make the tests run faster

* fix: asyncClose -> closePrimaryAndScheduleSecondaryClose

* fix: remove unused Batcher#throwBatchDataExceptionIfPresent

* fix: remove unused Comparators#compareRows

* fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion

* feat: remove unused MirroringTracer from FailedMutationLogger

* fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer

* fix: TestMirroringAsyncTableInputModification typo fix

* fix: describe user flush() in Buffered Mutator in quickstart

* fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer

* refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception>

* BufferedMutator - add close timeout

* AsyncBufferedMutator - add close timeout

* fix: remove stale addSecondaryMutation comment

* fix: add a comment that addSecondaryMutation handles failed writes

* fix: unify implementations of flushBufferedMutatorBeforeClosing

* fix: BufferedMutator - throw exceptions on close

* fix: BufferedMutator - add comment explaining that chain of flush operations is created

* fix: BufferedMutator - clarify  comments

* fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable

* fix: explain why flush is called in Sequential BufferedMutator test

* fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit

* refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush()

* refactor: make FlushSerializer non-static

* fix: BufferedMutator - use HierarchicalReferenceCounter

* feat: Add MirroringConnection constructor taking MirroringConfiguration

* refactor: move releaseReservations to finally

* fix: use less convoluted example in lastFlushFutures description

* fix: merge small Timeestamper files into a single file

* fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator

* fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered

* fix: add comment explaining why batch is complicated

* fix: add a TODO to implement point writes without batch

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
  • Loading branch information
4 people authored and vermas2012 committed May 11, 2022
1 parent d9eccab commit 4aecdc2
Show file tree
Hide file tree
Showing 150 changed files with 15,829 additions and 4,267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;

/**
* RowCell is an alternative implementation of {@link KeyValue}. Unlike KeyValue, RowCell stores
Expand Down Expand Up @@ -277,4 +278,22 @@ public String toString() {
+ "/"
+ Type.codeToType(getTypeByte());
}

public long heapSize() {
long labelSize = ClassSize.ARRAYLIST;
for (String label : labels) {
labelSize += ClassSize.STRING + ClassSize.align(label.length());
}
return ClassSize.align(rowArray.length)
+ ClassSize.ARRAY
+ ClassSize.align(familyArray.length)
+ ClassSize.ARRAY
+ ClassSize.align(qualifierArray.length)
+ ClassSize.ARRAY
+ 8 // timestamp
+ ClassSize.align(valueArray.length)
+ ClassSize.ARRAY
+ labelSize
+ ClassSize.OBJECT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ limitations under the License.

<systemPropertyVariables>
<integration-tests-config-file-name>hbase-to-bigtable-local-configuration.xml</integration-tests-config-file-name>
<integrations.compat.table-creator-impl>com.google.cloud.bigtable.hbase.mirroring.utils.compat.TableCreator1x</integrations.compat.table-creator-impl>
</systemPropertyVariables>

<environmentVariables>
Expand Down Expand Up @@ -143,6 +144,7 @@ limitations under the License.

<systemPropertyVariables>
<integration-tests-config-file-name>bigtable-to-hbase-local-configuration.xml</integration-tests-config-file-name>
<integrations.compat.table-creator-impl>com.google.cloud.bigtable.hbase.mirroring.utils.compat.TableCreator1x</integrations.compat.table-creator-impl>
</systemPropertyVariables>

<environmentVariables>
Expand Down Expand Up @@ -317,6 +319,12 @@ limitations under the License.
<version>1.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
Expand All @@ -329,6 +337,19 @@ limitations under the License.
<version>2.14.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-mirroring-client-1.x</artifactId>
<version>2.0.0-alpha2-SNAPSHOT</version> <!-- {x-version-update:bigtable-client-parent:current} -->
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>org.apache.hbase</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,33 @@
*/
package com.google.cloud.bigtable.hbase.mirroring;

import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_MISMATCH_DETECTOR_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONNECTION_CONNECTION_TERMINATION_TIMEOUT;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_STRATEGY_FACTORY_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_MISMATCH_DETECTOR_FACTORY_CLASS;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

import com.google.cloud.bigtable.hbase.mirroring.utils.BlockingFlowControllerStrategy;
import com.google.cloud.bigtable.hbase.mirroring.utils.BlockingMismatchDetector;
import com.google.cloud.bigtable.hbase.mirroring.utils.ConfigurationHelper;
import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule;
import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers;
import com.google.cloud.bigtable.hbase.mirroring.utils.ExecutorServiceRule;
import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers;
import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounter;
import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule;
import com.google.cloud.bigtable.hbase.mirroring.utils.SlowMismatchDetector;
import com.google.cloud.bigtable.hbase.mirroring.utils.ZipkinTracingRule;
import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.ExecutorServiceRule;
import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -42,110 +50,198 @@

@RunWith(JUnit4.class)
public class TestBlocking {
static final byte[] columnFamily1 = "cf1".getBytes();
static final byte[] qualifier1 = "q1".getBytes();
@ClassRule public static ConnectionRule connectionRule = new ConnectionRule();
@ClassRule public static ZipkinTracingRule zipkinTracingRule = new ZipkinTracingRule();
@Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
public DatabaseHelpers databaseHelpers = new DatabaseHelpers(connectionRule, executorServiceRule);
@Rule public ExecutorServiceRule executorServiceRule = ExecutorServiceRule.cachedPoolExecutor();
private DatabaseHelpers databaseHelpers =
new DatabaseHelpers(connectionRule, executorServiceRule);

@Rule
public MismatchDetectorCounterRule mismatchDetectorCounterRule =
new MismatchDetectorCounterRule();

@Test
public void testConnectionCloseBlocksUntilAllRequestsHaveBeenVerified()
throws IOException, InterruptedException {
long beforeTableClose;
long afterTableClose;
long afterConnectionClose;
private static final byte[] columnFamily1 = "cf1".getBytes();
private static final byte[] qualifier1 = "q1".getBytes();

private TableName tableName;

@Before
public void setUp() throws IOException {
this.tableName = connectionRule.createTable(columnFamily1);
}

@Test(timeout = 10000)
public void testConnectionCloseBlocksUntilAllRequestsHaveBeenVerified()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
Configuration config = ConfigurationHelper.newConfiguration();
config.set(MIRRORING_MISMATCH_DETECTOR_CLASS, SlowMismatchDetector.class.getCanonicalName());
SlowMismatchDetector.sleepTime = 1000;
config.set(
MIRRORING_MISMATCH_DETECTOR_FACTORY_CLASS,
BlockingMismatchDetector.Factory.class.getName());
BlockingMismatchDetector.reset();

TableName tableName;
try (MirroringConnection connection = databaseHelpers.createConnection(config)) {
tableName = connectionRule.createTable(connection, columnFamily1);
try (Table t = connection.getTable(tableName)) {
for (int i = 0; i < 10; i++) {
Get get = new Get("1".getBytes());
get.addColumn(columnFamily1, qualifier1);
t.get(get);
}
beforeTableClose = System.currentTimeMillis();
final MirroringConnection connection = databaseHelpers.createConnection(config);
tableName = connectionRule.createTable(connection, columnFamily1);
try (Table t = connection.getTable(tableName)) {
for (int i = 0; i < 10; i++) {
Get get = new Get("1".getBytes());
get.addColumn(columnFamily1, qualifier1);
t.get(get);
}
afterTableClose = System.currentTimeMillis();
} // There are in-flight requests but closing a Table object shouldn't block.

final SettableFuture<Void> closingThreadStarted = SettableFuture.create();
final SettableFuture<Void> closingThreadEnded = SettableFuture.create();

Thread closingThread =
new Thread() {
@Override
public void run() {
try {
closingThreadStarted.set(null);
connection.close();
closingThreadEnded.set(null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
closingThread.start();

// Wait until closing thread starts.
closingThreadStarted.get(1, TimeUnit.SECONDS);

// And give it some time to run, to verify that is has blocked. It will block until timeout is
// encountered or all async operations are finished. We will hit the second case here, because
// we will unblock the mismatch detector.
try {
closingThreadEnded.get(1, TimeUnit.SECONDS);
fail("should throw");
} catch (TimeoutException ignored) {
// expected
}
afterConnectionClose = System.currentTimeMillis();
long tableCloseDuration = afterTableClose - beforeTableClose;
long connectionCloseDuration = afterConnectionClose - afterTableClose;
assertThat(tableCloseDuration).isLessThan(100);
assertThat(connectionCloseDuration).isGreaterThan(900);
assertThat(MismatchDetectorCounter.getInstance().getVerificationsStartedCounter())
.isEqualTo(10);
assertThat(MismatchDetectorCounter.getInstance().getVerificationsFinishedCounter())

// Finish running verifications
BlockingMismatchDetector.unblock();

// And now Connection#close() should unblock.
closingThreadEnded.get(1, TimeUnit.SECONDS);

// And all verification should have finished.
assertThat(TestMismatchDetectorCounter.getInstance().getVerificationsFinishedCounter())
.isEqualTo(10);
}

@Test
public void testSlowSecondaryConnection() throws IOException {
@Test(timeout = 10000)
public void flowControllerBlocksScheduling()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
Configuration config = ConfigurationHelper.newConfiguration();
config.set(MIRRORING_MISMATCH_DETECTOR_CLASS, SlowMismatchDetector.class.getCanonicalName());
SlowMismatchDetector.sleepTime = 100;
config.set(MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS, "10");
TableName tableName;
byte[] row = "1".getBytes();
try (MirroringConnection connection = databaseHelpers.createConnection(config)) {
tableName = connectionRule.createTable(connection, columnFamily1);
try (Table table = connection.getTable(tableName)) {
table.put(Helpers.createPut(row, columnFamily1, qualifier1, "1".getBytes()));
}
}
config.set(
MIRRORING_FLOW_CONTROLLER_STRATEGY_FACTORY_CLASS,
BlockingFlowControllerStrategy.Factory.class.getName());
BlockingFlowControllerStrategy.reset();

long startTime;
long endTime;
long duration;
final byte[] row = "1".getBytes();
final SettableFuture<Void> closingThreadStarted = SettableFuture.create();
final SettableFuture<Void> closingThreadEnded = SettableFuture.create();

try (MirroringConnection connection = databaseHelpers.createConnection(config)) {
startTime = System.currentTimeMillis();
try (Table table = connection.getTable(tableName)) {
for (int i = 0; i < 1000; i++) {
table.get(Helpers.createGet(row, columnFamily1, qualifier1));
}
}
}
endTime = System.currentTimeMillis();
duration = endTime - startTime;
// 1000 requests * 100 ms / 10 concurrent requests
assertThat(duration).isGreaterThan(10000);
Thread t =
new Thread() {
@Override
public void run() {
closingThreadStarted.set(null);
try {
table.put(Helpers.createPut(row, columnFamily1, qualifier1, "1".getBytes()));
closingThreadEnded.set(null);
} catch (IOException e) {
closingThreadEnded.setException(e);
throw new RuntimeException(e);
}
}
};
t.start();

config.set(MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS, "50");
try (MirroringConnection connection = databaseHelpers.createConnection(config)) {
startTime = System.currentTimeMillis();
try (Table table = connection.getTable(tableName)) {
for (int i = 0; i < 1000; i++) {
table.get(Helpers.createGet(row, columnFamily1, qualifier1));
}
}
}
endTime = System.currentTimeMillis();
duration = endTime - startTime;
// 1000 requests * 100 ms / 50 concurrent requests
assertThat(duration).isGreaterThan(2000);
// Wait until thread starts.
closingThreadStarted.get(1, TimeUnit.SECONDS);

config.set(MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS, "1000");
try (MirroringConnection connection = databaseHelpers.createConnection(config)) {
startTime = System.currentTimeMillis();
try (Table table = connection.getTable(tableName)) {
for (int i = 0; i < 1000; i++) {
table.get(Helpers.createGet(row, columnFamily1, qualifier1));
// Give it some time to run, to verify that is has blocked. We are expecting that this
// operation will timeout because it is waiting for the FlowController to admit resources
// for the `put` operation.
try {
closingThreadEnded.get(1, TimeUnit.SECONDS);
fail("should throw");
} catch (TimeoutException ignored) {
// expected
}
// Unlock flow controller.
BlockingFlowControllerStrategy.unblock();
// And verify that it has unblocked.
closingThreadEnded.get(1, TimeUnit.SECONDS);
}
}
endTime = System.currentTimeMillis();
duration = endTime - startTime;
// 1000 requests * 100 ms / 1000 concurrent requests
assertThat(duration).isLessThan(1000);
}

@Test(timeout = 10000)
public void testMirroringConnectionCloseTimeout()
throws IOException, InterruptedException, ExecutionException, TimeoutException {
long timeoutMillis = 1000;

final Configuration config = ConfigurationHelper.newConfiguration();
config.set(MIRRORING_CONNECTION_CONNECTION_TERMINATION_TIMEOUT, String.valueOf(timeoutMillis));
config.set(
MIRRORING_MISMATCH_DETECTOR_FACTORY_CLASS,
BlockingMismatchDetector.Factory.class.getName());
BlockingMismatchDetector.reset();

final byte[] row = "1".getBytes();

final TableName tableName = connectionRule.createTable(columnFamily1);
final SettableFuture<MirroringConnection> closingThreadStartedFuture = SettableFuture.create();
final SettableFuture<Long> closingThreadFinishedFuture = SettableFuture.create();

Thread t =
new Thread() {
@Override
public void run() {
try {
// Not in try-with-resources, we are calling close() explicitly.
MirroringConnection connection = databaseHelpers.createConnection(config);
Table table = connection.getTable(tableName);
table.get(Helpers.createGet(row, columnFamily1, qualifier1));
table.close();

closingThreadStartedFuture.set(connection);
Stopwatch stopwatch = Stopwatch.createStarted();
connection.close();
stopwatch.stop();
closingThreadFinishedFuture.set(stopwatch.elapsed(TimeUnit.MILLISECONDS));
} catch (IOException e) {
closingThreadFinishedFuture.setException(e);
}
}
};

t.start();

// Wait until the thread starts.
MirroringConnection c = closingThreadStartedFuture.get(1, TimeUnit.SECONDS);
// And wait for it to finish. It should time-out after 1 second.
long closeDuration = closingThreadFinishedFuture.get(3, TimeUnit.SECONDS);
// The closingThreadFinishedFuture did not timeout, thus we know that closing the connection
// lasted no longer than 3 seconds.
// We also need to check that it waited at least `timeoutMillis`.
// `closeDuration` is strictly greater than timeout because it includes some overhead,
// but `timeoutMillis` >> expected overhead, thus false-positives are unlikely.
assertThat(closeDuration).isAtLeast(timeoutMillis);
assertThat(c.getPrimaryConnection().isClosed()).isTrue();
assertThat(c.getSecondaryConnection().isClosed()).isFalse();

// Finish asynchronous operation.
BlockingMismatchDetector.unblock();
// Give it a second to run.
Thread.sleep(1000);
assertThat(c.getPrimaryConnection().isClosed()).isTrue();
assertThat(c.getSecondaryConnection().isClosed()).isTrue();
}
}
Loading

0 comments on commit 4aecdc2

Please sign in to comment.