Skip to content

Commit

Permalink
Replace AtomicLong to long while ReentrantReadWriteLock is enough (#3)
Browse files Browse the repository at this point in the history
* Replace AtomicLong to long while ReentrantReadWriteLock is enough

* replace ReentrantReadWriteLock to StampedLock

* unlockWrite
  • Loading branch information
pifuant authored and fengjiachun committed Mar 6, 2019
1 parent 820bd0d commit 147a5bf
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
*/
package com.alipay.sofa.jraft.core;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;

import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -51,10 +48,8 @@ public class BallotBox implements Lifecycle<BallotBoxOptions> {

private FSMCaller waiter;
private ClosureQueue closureQueue;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
private final AtomicLong lastCommittedIndex = new AtomicLong(0);
private final StampedLock stampedLock = new StampedLock();
private long lastCommittedIndex = 0;
private long pendingIndex;
private final ArrayDequeue<Ballot> pendingMetaQueue = new ArrayDequeue<>();

Expand All @@ -69,11 +64,16 @@ ArrayDequeue<Ballot> getPendingMetaQueue() {
}

public long getLastCommittedIndex() {
readLock.lock();
long stamp = stampedLock.tryOptimisticRead();
long optimisticVal = this.lastCommittedIndex;
if (stampedLock.validate(stamp)) {
return optimisticVal;
}
stamp = stampedLock.readLock();
try {
return this.lastCommittedIndex.get();
return this.lastCommittedIndex;
} finally {
readLock.unlock();
stampedLock.unlockRead(stamp);
}
}

Expand All @@ -94,7 +94,7 @@ public boolean init(BallotBoxOptions opts) {
*/
public boolean commitAt(long firstLogIndex, long lastLogIndex, PeerId peer) {
//TODO use lock-free algorithm here?
writeLock.lock();
final long stamp = stampedLock.writeLock();
long lastCommittedIndex = 0;
try {
if (pendingIndex == 0) {
Expand Down Expand Up @@ -131,9 +131,9 @@ public boolean commitAt(long firstLogIndex, long lastLogIndex, PeerId peer) {
LOG.debug("Committed log index={}", index);
}
pendingIndex = lastCommittedIndex + 1;
this.lastCommittedIndex.set(lastCommittedIndex);
this.lastCommittedIndex = lastCommittedIndex;
} finally {
writeLock.unlock();
stampedLock.unlockWrite(stamp);
}
this.waiter.onCommitted(lastCommittedIndex);
return true;
Expand All @@ -146,13 +146,13 @@ public boolean commitAt(long firstLogIndex, long lastLogIndex, PeerId peer) {
* truncate.
*/
public void clearPendingTasks() {
writeLock.lock();
final long stamp = stampedLock.writeLock();
try {
this.pendingMetaQueue.clear();
this.pendingIndex = 0;
this.closureQueue.clear();
} finally {
writeLock.unlock();
stampedLock.unlockWrite(stamp);
}
}

Expand All @@ -166,23 +166,23 @@ public void clearPendingTasks() {
* @return returns true if reset success
*/
public boolean resetPendingIndex(long newPendingIndex) {
writeLock.lock();
final long stamp = stampedLock.writeLock();
try {
if (!(pendingIndex == 0 && pendingMetaQueue.isEmpty())) {
LOG.error("resetPendingIndex fail, pendingIndex={}, pendingMetaQueueSize={}", pendingIndex,
pendingMetaQueue.size());
return false;
}
if (newPendingIndex <= this.lastCommittedIndex.get()) {
if (newPendingIndex <= this.lastCommittedIndex) {
LOG.error("resetPendingIndex fail, newPendingIndex={}, lastCommittedIndex={}", newPendingIndex,
lastCommittedIndex.get());
lastCommittedIndex);
return false;
}
this.pendingIndex = newPendingIndex;
this.closureQueue.resetFirstIndex(newPendingIndex);
return true;
} finally {
writeLock.unlock();
stampedLock.unlockWrite(stamp);
}
}

Expand All @@ -201,7 +201,7 @@ public boolean appendPendingTask(Configuration conf, Configuration oldConf, Clos
LOG.error("Fail to init ballot");
return false;
}
writeLock.lock();
final long stamp = stampedLock.writeLock();
try {
if (pendingIndex <= 0) {
LOG.error("Fail to appendingTask, pendingIndex={}", pendingIndex);
Expand All @@ -211,7 +211,7 @@ public boolean appendPendingTask(Configuration conf, Configuration oldConf, Clos
this.closureQueue.appendPendingClosure(done);
return true;
} finally {
writeLock.unlock();
stampedLock.unlockWrite(stamp);
}
}

Expand All @@ -223,26 +223,26 @@ public boolean appendPendingTask(Configuration conf, Configuration oldConf, Clos
*/
public boolean setLastCommittedIndex(long lastCommittedIndex) {
boolean doUnlock = true;
writeLock.lock();
final long stamp = stampedLock.writeLock();
try {
if (pendingIndex != 0 || !pendingMetaQueue.isEmpty()) {
Requires.requireTrue(lastCommittedIndex < this.pendingIndex,
"Node changes to leader, pendingIndex=%d, param lastCommittedIndex=%d",
pendingIndex,lastCommittedIndex);
return false;
}
if (lastCommittedIndex < this.lastCommittedIndex.get()) {
if (lastCommittedIndex < this.lastCommittedIndex) {
return false;
}
if (lastCommittedIndex > this.lastCommittedIndex.get()) {
this.lastCommittedIndex.set(lastCommittedIndex);
writeLock.unlock();
if (lastCommittedIndex > this.lastCommittedIndex) {
this.lastCommittedIndex = lastCommittedIndex;
stampedLock.unlockWrite(stamp);
doUnlock = false;
this.waiter.onCommitted(lastCommittedIndex);
}
} finally {
if (doUnlock) {
writeLock.unlock();
stampedLock.unlockWrite(stamp);
}
}
return true;
Expand Down

0 comments on commit 147a5bf

Please sign in to comment.