Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
DL-32: Fix Findbug warnings
Browse files Browse the repository at this point in the history
- Bump the version to 3.0.3
- Fix all the findbug warnings
- Enable findbugs:check on travis ci

Author: Jon Derrick <jonathan.derrickk@gmail.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #19 from jderrickk/jd/fix_findbugs_error and squashes the following commits:

c48c89c [Jon Derrick] Merge branch 'master' into jd/fix_findbugs_error
d9b0425 [Jon Derrick] verify findbugs on travis ci
985501b [Jon Derrick] Fix findbug errors on all modules
18e8267 [Jon Derrick] Remove distributedlog-example
ffa8361 [Jon Derrick] Fix findbugs in distributedlog-protocol
  • Loading branch information
jderrickk authored and Sijie Guo committed Oct 13, 2016
1 parent 93bdad0 commit bb6990d
Show file tree
Hide file tree
Showing 48 changed files with 184 additions and 964 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ install:

script:
- travis_retry mvn clean apache-rat:check
- travis_wait 60 mvn package
- travis_wait 60 mvn package findbugs:check

cache:
directories:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,8 @@ protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress>
MapDifference<Integer, SocketAddress> difference =
Maps.difference(shardId2Address, newMap);
left = difference.entriesOnlyOnLeft();
for (Integer shard : left.keySet()) {
for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) {
int shard = shardEntry.getKey();
if (shard >= 0) {
SocketAddress host = shardId2Address.get(shard);
if (null != host) {
Expand All @@ -452,19 +453,19 @@ protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress>
} else {
// shard id is negative - they are resolved from finagle name, which instances don't have shard id
// in this case, if they are removed from serverset, we removed them directly
SocketAddress host = left.get(shard);
SocketAddress host = shardEntry.getValue();
if (null != host) {
removeHostInternal(host, Optional.<Throwable>absent());
removedList.add(host);
}
}
}
// we need to find if any shards are replacing old shards
for (Integer shard : newMap.keySet()) {
SocketAddress oldHost = shardId2Address.get(shard);
SocketAddress newHost = newMap.get(shard);
for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) {
SocketAddress oldHost = shardId2Address.get(shard.getKey());
SocketAddress newHost = shard.getValue();
if (!newHost.equals(oldHost)) {
join(shard, newHost, removedList);
join(shard.getKey(), newHost, removedList);
joinedList.add(newHost);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,10 @@ private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) {

// skip scheduling if there is task that's already running
//
if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
synchronized (this) {
if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) {
private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
private final ScheduledExecutorService executorService;
private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
private final Object scheduleLock = new Object();
private final AtomicLong scheduleCount = new AtomicLong(0);
final private Stopwatch scheduleDelayStopwatch;
final private Stopwatch readNextDelayStopwatch;
Expand All @@ -112,7 +113,7 @@ public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) {
private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() {
@Override
public void run() {
synchronized (scheduleCount) {
synchronized (scheduleLock) {
backgroundScheduleTask = null;
}
scheduleBackgroundRead();
Expand Down Expand Up @@ -485,7 +486,7 @@ public Future<Void> asyncClose() {
LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName());
}

synchronized (scheduleCount) {
synchronized (scheduleLock) {
if (null != backgroundScheduleTask) {
backgroundScheduleTask.cancel(true);
}
Expand All @@ -508,7 +509,7 @@ private void cancelAllPendingReads(Throwable throwExc) {

@Override
public void run() {
synchronized(scheduleCount) {
synchronized(scheduleLock) {
if (scheduleDelayStopwatch.isRunning()) {
scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}
Expand All @@ -533,11 +534,11 @@ public void run() {
backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS));
return;
}
}

if (disableProcessingReadRequests) {
LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName());
return;
if (disableProcessingReadRequests) {
LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName());
return;
}
}

// If the oldest pending promise is interrupted then we must mark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ Future<Long> flushAndCommit() {
}
}
if (null == writerFuture) {
return Future.value(lastTxId);
return Future.value(getLastTxId());
}
return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,11 @@ public synchronized void unregisterListener(LogSegmentListener listener) {
}

public void checkClosedOrInError(String operation) throws AlreadyClosedException {
if (null != closePromise) {
throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
synchronized (this) {
if (null != closePromise) {
throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
}
}

if (null != writerBKC) {
writerBKC.checkClosedOrInError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class BKLogWriteHandler extends BKLogHandler {
protected final boolean sanityCheckTxnId;
protected final boolean validateLogSegmentSequenceNumber;
protected final int regionId;
protected volatile boolean closed = false;
protected final RollingPolicy rollingPolicy;
protected Future<? extends DistributedLock> lockFuture = null;
protected final PermitLimiter writeLimiter;
Expand Down Expand Up @@ -225,7 +224,7 @@ public Future<Long> apply(List<LogSegmentMetadata> segmentList) {

// Rolling Policy
if (conf.getLogSegmentRollingIntervalMinutes() > 0) {
rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000);
rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L);
} else {
rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private void setReadAheadCallback(ReadAheadCallback callback) {
}
}

private void setLastSeenDLSN(DLSN dlsn) {
private synchronized void setLastSeenDLSN(DLSN dlsn) {
synchronized (sharedLock) {
this.lastSeenDLSN = dlsn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class DistributedLogConstants {
public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress";
public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
public static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);

// An ACL that gives all permissions to node creators and read permissions only to everyone else.
public static final List<ACL> EVERYONE_READ_CREATOR_ALL =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta

static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts)
throws IOException {
long versionStatusCount = Long.valueOf(parts[0]);
long versionStatusCount = Long.parseLong(parts[0]);

long version = versionStatusCount & METADATA_VERSION_MASK;
assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
Expand All @@ -637,8 +637,8 @@ static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts)
assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);

if (parts.length == 3) {
long ledgerId = Long.valueOf(parts[1]);
long txId = Long.valueOf(parts[2]);
long ledgerId = Long.parseLong(parts[1]);
long txId = Long.parseLong(parts[2]);
return new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
.setRegionId(regionId)
.setStatus(status)
Expand All @@ -647,10 +647,10 @@ static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts)
long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);

long ledgerId = Long.valueOf(parts[1]);
long firstTxId = Long.valueOf(parts[2]);
long lastTxId = Long.valueOf(parts[3]);
long completionTime = Long.valueOf(parts[4]);
long ledgerId = Long.parseLong(parts[1]);
long firstTxId = Long.parseLong(parts[2]);
long lastTxId = Long.parseLong(parts[3]);
long completionTime = Long.parseLong(parts[4]);
return new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
.setInprogress(false)
.setLastTxId(lastTxId)
Expand All @@ -667,7 +667,7 @@ static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts)

static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts)
throws IOException {
long versionStatusCount = Long.valueOf(parts[0]);
long versionStatusCount = Long.parseLong(parts[0]);

long version = versionStatusCount & METADATA_VERSION_MASK;
assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
Expand All @@ -682,9 +682,9 @@ static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts)
assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);

if (parts.length == 4) {
long ledgerId = Long.valueOf(parts[1]);
long txId = Long.valueOf(parts[2]);
long logSegmentSequenceNumber = Long.valueOf(parts[3]);
long ledgerId = Long.parseLong(parts[1]);
long txId = Long.parseLong(parts[2]);
long logSegmentSequenceNumber = Long.parseLong(parts[3]);
return new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
.setLogSegmentSequenceNo(logSegmentSequenceNumber)
.setRegionId(regionId)
Expand All @@ -694,13 +694,13 @@ static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts)
long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);

long ledgerId = Long.valueOf(parts[1]);
long firstTxId = Long.valueOf(parts[2]);
long lastTxId = Long.valueOf(parts[3]);
long completionTime = Long.valueOf(parts[4]);
long logSegmentSequenceNumber = Long.valueOf(parts[5]);
long lastEntryId = Long.valueOf(parts[6]);
long lastSlotId = Long.valueOf(parts[7]);
long ledgerId = Long.parseLong(parts[1]);
long firstTxId = Long.parseLong(parts[2]);
long lastTxId = Long.parseLong(parts[3]);
long completionTime = Long.parseLong(parts[4]);
long logSegmentSequenceNumber = Long.parseLong(parts[5]);
long lastEntryId = Long.parseLong(parts[6]);
long lastSlotId = Long.parseLong(parts[7]);
return new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
.setInprogress(false)
.setLastTxId(lastTxId)
Expand All @@ -721,7 +721,7 @@ static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts)

static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[] data, String[] parts)
throws IOException {
long versionStatusCount = Long.valueOf(parts[0]);
long versionStatusCount = Long.parseLong(parts[0]);

long version = versionStatusCount & METADATA_VERSION_MASK;
assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
Expand All @@ -737,11 +737,11 @@ static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[]
assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);

if (parts.length == 6) {
long ledgerId = Long.valueOf(parts[1]);
long txId = Long.valueOf(parts[2]);
long logSegmentSequenceNumber = Long.valueOf(parts[3]);
long minActiveEntryId = Long.valueOf(parts[4]);
long minActiveSlotId = Long.valueOf(parts[5]);
long ledgerId = Long.parseLong(parts[1]);
long txId = Long.parseLong(parts[2]);
long logSegmentSequenceNumber = Long.parseLong(parts[3]);
long minActiveEntryId = Long.parseLong(parts[4]);
long minActiveSlotId = Long.parseLong(parts[5]);

LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
.setLogSegmentSequenceNo(logSegmentSequenceNumber)
Expand All @@ -757,15 +757,15 @@ static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[]
long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);

long ledgerId = Long.valueOf(parts[1]);
long firstTxId = Long.valueOf(parts[2]);
long lastTxId = Long.valueOf(parts[3]);
long completionTime = Long.valueOf(parts[4]);
long logSegmentSequenceNumber = Long.valueOf(parts[5]);
long lastEntryId = Long.valueOf(parts[6]);
long lastSlotId = Long.valueOf(parts[7]);
long minActiveEntryId = Long.valueOf(parts[8]);
long minActiveSlotId = Long.valueOf(parts[9]);
long ledgerId = Long.parseLong(parts[1]);
long firstTxId = Long.parseLong(parts[2]);
long lastTxId = Long.parseLong(parts[3]);
long completionTime = Long.parseLong(parts[4]);
long logSegmentSequenceNumber = Long.parseLong(parts[5]);
long lastEntryId = Long.parseLong(parts[6]);
long lastSlotId = Long.parseLong(parts[7]);
long minActiveEntryId = Long.parseLong(parts[8]);
long minActiveSlotId = Long.parseLong(parts[9]);
LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
.setInprogress(false)
.setLastTxId(lastTxId)
Expand All @@ -791,7 +791,7 @@ static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[]

static LogSegmentMetadata parseDataVersionsWithSequenceId(String path, byte[] data, String[] parts)
throws IOException {
long versionStatusCount = Long.valueOf(parts[0]);
long versionStatusCount = Long.parseLong(parts[0]);

long version = versionStatusCount & METADATA_VERSION_MASK;
assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE);
Expand All @@ -807,12 +807,12 @@ static LogSegmentMetadata parseDataVersionsWithSequenceId(String path, byte[] da
assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX);

if (parts.length == 7) {
long ledgerId = Long.valueOf(parts[1]);
long txId = Long.valueOf(parts[2]);
long logSegmentSequenceNumber = Long.valueOf(parts[3]);
long minActiveEntryId = Long.valueOf(parts[4]);
long minActiveSlotId = Long.valueOf(parts[5]);
long startSequenceId = Long.valueOf(parts[6]);
long ledgerId = Long.parseLong(parts[1]);
long txId = Long.parseLong(parts[2]);
long logSegmentSequenceNumber = Long.parseLong(parts[3]);
long minActiveEntryId = Long.parseLong(parts[4]);
long minActiveSlotId = Long.parseLong(parts[5]);
long startSequenceId = Long.parseLong(parts[6]);

LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId)
.setLogSegmentSequenceNo(logSegmentSequenceNumber)
Expand All @@ -827,16 +827,16 @@ static LogSegmentMetadata parseDataVersionsWithSequenceId(String path, byte[] da
long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT;
assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE);

long ledgerId = Long.valueOf(parts[1]);
long firstTxId = Long.valueOf(parts[2]);
long lastTxId = Long.valueOf(parts[3]);
long completionTime = Long.valueOf(parts[4]);
long logSegmentSequenceNumber = Long.valueOf(parts[5]);
long lastEntryId = Long.valueOf(parts[6]);
long lastSlotId = Long.valueOf(parts[7]);
long minActiveEntryId = Long.valueOf(parts[8]);
long minActiveSlotId = Long.valueOf(parts[9]);
long startSequenceId = Long.valueOf(parts[10]);
long ledgerId = Long.parseLong(parts[1]);
long firstTxId = Long.parseLong(parts[2]);
long lastTxId = Long.parseLong(parts[3]);
long completionTime = Long.parseLong(parts[4]);
long logSegmentSequenceNumber = Long.parseLong(parts[5]);
long lastEntryId = Long.parseLong(parts[6]);
long lastSlotId = Long.parseLong(parts[7]);
long minActiveEntryId = Long.parseLong(parts[8]);
long minActiveSlotId = Long.parseLong(parts[9]);
long startSequenceId = Long.parseLong(parts[10]);
LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId)
.setInprogress(false)
.setLastTxId(lastTxId)
Expand Down Expand Up @@ -867,7 +867,7 @@ static LogSegmentMetadata parseData(String path, byte[] data, boolean skipMinVer
String[] parts = new String(data, UTF_8).split(";");
long version;
try {
version = Long.valueOf(parts[0]) & METADATA_VERSION_MASK;
version = Long.parseLong(parts[0]) & METADATA_VERSION_MASK;
} catch (Exception exc) {
throw new IOException("Invalid ledger entry, "
+ new String(data, UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ synchronized void store(long maxTxId) throws IOException {
}
String txidStr = Long.toString(maxTxId);
try {
Stat stat = zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1);
zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1);
currentMax = maxTxId;
} catch (Exception e) {
LOG.error("Error writing new MaxTxId value {}", maxTxId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class VoidFunctions {

public static AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC =
public static final AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC =
new AbstractFunction1<List<Void>, Void>() {
@Override
public Void apply(List<Void> list) {
Expand Down

0 comments on commit bb6990d

Please sign in to comment.