Skip to content

Commit

Permalink
Refactor Snapshot Strategy (#99)
Browse files Browse the repository at this point in the history
* 抽取 LearnerSnapshotManager (#46) (#98)

* refactor

---------

Co-authored-by: 李航 <30505377+githubli97@users.noreply.github.com>
  • Loading branch information
shihuili1218 and githubli97 committed Jul 12, 2024
1 parent 0b12640 commit d660108
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import org.apache.commons.lang3.StringUtils;

import com.google.common.collect.Lists;
import com.ofcoder.klein.common.util.SystemPropertyUtil;
import com.ofcoder.klein.consensus.facade.exception.SnapshotException;
import com.ofcoder.klein.rpc.facade.Endpoint;
import com.ofcoder.klein.rpc.facade.util.RpcUtil;

Expand Down Expand Up @@ -63,6 +65,13 @@ public class ConsensusProp {
* calculate quorum.
*/
private String nwr = SystemPropertyUtil.get("klein.consensus.nwr", "majority");

/**
* Snapshot generation policy: space-separated "seconds requests" pairs.
* default: 1 minute 10000 req || 5 minutes 10 req || 30 minutes 1 req.
*/
private List<SnapshotStrategy> snapshotStrategy = parseSnapStrategy(SystemPropertyUtil.get("klein.snapshot.generation-policy", "60 10000 300 10 1800 1"));

private PaxosProp paxosProp = new PaxosProp();

private List<Endpoint> parseMember(final String members) {
Expand All @@ -79,6 +88,20 @@ private List<Endpoint> parseMember(final String members) {
return endpoints;
}

/**
 * Parse the snapshot generation policy into a list of strategies.
 *
 * <p>The policy is a whitespace-separated list of integer pairs,
 * each pair being {@code <seconds> <request count>}, e.g. {@code "60 10000 300 10"}.
 *
 * @param prop raw policy string
 * @return one {@link SnapshotStrategy} per pair, in the order they appear
 * @throws SnapshotException if the tokens do not come in pairs or a token is not an integer
 */
private List<SnapshotStrategy> parseSnapStrategy(final String prop) {
    // Trim and split on any whitespace run, so "60  10000" or " 60 10000 " do not
    // produce empty tokens that would fail the pair check or integer parsing.
    final String[] props = prop == null ? new String[0] : prop.trim().split("\\s+");
    if (props.length == 0 || props.length % 2 != 0) {
        throw new SnapshotException("klein.snapshot.generation-policy config must appear in pairs.");
    }

    final List<SnapshotStrategy> snapshotStrategies = Lists.newArrayList();
    for (int i = 0; i < props.length; i += 2) {
        try {
            snapshotStrategies.add(new SnapshotStrategy(Integer.parseInt(props[i]), Integer.parseInt(props[i + 1])));
        } catch (NumberFormatException e) {
            // Report bad values with the same exception type as the pair check, keeping the cause.
            throw new SnapshotException("klein.snapshot.generation-policy config must be integers, but got: "
                    + props[i] + " " + props[i + 1], e);
        }
    }
    return snapshotStrategies;
}

/**
 * Get the endpoint stored in the {@code self} field.
 *
 * @return the self endpoint
 */
public Endpoint getSelf() {
return self;
}
Expand Down Expand Up @@ -142,4 +165,12 @@ public String getNwr() {
/**
 * Replace the configured nwr value.
 *
 * @param nwr new nwr value
 */
public void setNwr(final String nwr) {
this.nwr = nwr;
}

/**
 * Replace the snapshot strategies.
 * The given list is stored as-is (no copy is made).
 *
 * @param snapshotStrategy new snapshot strategies
 */
public void setSnapshotStrategy(final List<SnapshotStrategy> snapshotStrategy) {
this.snapshotStrategy = snapshotStrategy;
}

/**
 * Get the snapshot strategies.
 * The internal list itself is returned, not a copy.
 *
 * @return snapshot strategies
 */
public List<SnapshotStrategy> getSnapshotStrategy() {
return snapshotStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,22 @@
package com.ofcoder.klein.consensus.facade.config;

/**
* snap prop.
*
* @author 释慧利
* Snapshot Strategy.
*/
public class SnapProp {
private long generateSnapInterval;
public class SnapshotStrategy {
    /** Time threshold of this strategy, in seconds. */
    private final int second;
    /** Request-count threshold of this strategy. */
    private final int reqCount;

    /**
     * Create a strategy from a pair of thresholds.
     *
     * @param second   time threshold, in seconds
     * @param reqCount request-count threshold
     */
    public SnapshotStrategy(final int second, final int reqCount) {
        this.second = second;
        this.reqCount = reqCount;
    }

    /**
     * Get the time threshold.
     *
     * @return time threshold, in seconds
     */
    public int getSecond() {
        return second;
    }

    /**
     * Get the request-count threshold.
     *
     * @return request-count threshold
     */
    public int getReqCount() {
        return reqCount;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ofcoder.klein.consensus.facade.exception;

import com.ofcoder.klein.common.exception.KleinException;

/**
* Snapshot Exception.
*
* @author hang.li
*/
public class SnapshotException extends KleinException {
/**
 * Create an exception with a message only.
 *
 * @param message error description
 */
public SnapshotException(final String message) {
super(message);
}

/**
 * Create an exception with a message and the underlying cause.
 *
 * @param message error description
 * @param cause   the throwable that triggered this exception
 */
public SnapshotException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ofcoder.klein.common.util.KleinThreadFactory;
import com.ofcoder.klein.consensus.facade.Command;
import com.ofcoder.klein.consensus.facade.config.ConsensusProp;
import com.ofcoder.klein.consensus.facade.config.SnapshotStrategy;
import com.ofcoder.klein.spi.ExtensionLoader;
import com.ofcoder.klein.storage.facade.Instance;
import com.ofcoder.klein.storage.facade.LogManager;
Expand All @@ -48,10 +52,13 @@ public class SMApplier {
private LogManager<Command> logManager;
private Long lastAppliedId = 0L;
private Long lastCheckpoint = 0L;
private Long lastSnapTime = System.currentTimeMillis();
private final List<SnapshotStrategy> snapshotStrategies;

public SMApplier(final String group, final SM sm) {
public SMApplier(final String group, final SM sm, final ConsensusProp op) {
this.group = group;
this.sm = sm;
this.snapshotStrategies = op.getSnapshotStrategy();
this.applyQueue = new PriorityBlockingQueue<>(11, Comparator.comparingLong(value -> value.priority));
this.logManager = ExtensionLoader.getExtensionLoader(LogManager.class).getJoin();
ExecutorService applyExecutor = Executors.newFixedThreadPool(1, KleinThreadFactory.create(this.group + "-apply", true));
Expand Down Expand Up @@ -134,6 +141,20 @@ private void _apply(final Task task) {

// the instance has been applied.
task.callback.onApply(applyResult);

if (task.taskType == TaskEnum.APPLY) {
checkAndSnapshot();
}
}

/**
 * Enqueue a take-snapshot task when any configured strategy is satisfied.
 *
 * <p>A strategy is satisfied when at least {@code getSecond()} seconds have passed since
 * {@code lastSnapTime} AND at least {@code getReqCount()} instances have been applied
 * since {@code lastCheckpoint}.
 * NOTE(review): neither {@code lastSnapTime} nor {@code lastCheckpoint} is updated here;
 * presumably the snapshot task handler advances them. Confirm, otherwise every later apply
 * keeps enqueuing snapshot tasks.
 */
private void checkAndSnapshot() {
    final long currentMillis = System.currentTimeMillis();
    final long appliedSinceCheckpoint = lastAppliedId - lastCheckpoint;
    final long secondsSinceSnapshot = (currentMillis - lastSnapTime) / 1000;

    for (SnapshotStrategy strategy : snapshotStrategies) {
        final boolean longEnough = secondsSinceSnapshot >= strategy.getSecond();
        if (longEnough && appliedSinceCheckpoint >= strategy.getReqCount()) {
            // one matching strategy is enough; enqueue a single task and stop looking.
            offer(Task.createTakeSnapTask(Task.FAKE_CALLBACK));
            return;
        }
    }
    // no strategy matched: do nothing
}

/**
Expand All @@ -153,6 +174,22 @@ public void offer(final Task t) {
* close sm.
*/
public void close() {
    // Request one last snapshot before shutting down; the callback releases the latch
    // once the snapshot has been taken.
    final CountDownLatch latch = new CountDownLatch(1);
    offer(Task.createTakeSnapTask(new TaskCallback() {
        @Override
        public void onTakeSnap(final Snap snap) {
            latch.countDown();
        }
    }));

    try {
        // Wait at most one second; on timeout we only log and carry on with the shutdown.
        if (!latch.await(1L, TimeUnit.SECONDS)) {
            LOG.error("generate snapshot timeout. lastApplyId: {}, lastCheckpoint: {}", lastAppliedId, lastCheckpoint);
        }
    } catch (InterruptedException ex) {
        // Restore the interrupt status so callers up the stack can still observe it.
        Thread.currentThread().interrupt();
        LOG.error(String.format("generate snapshot occur exception. %s", ex.getMessage()), ex);
    }

    shutdown = true;
    sm.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@
*/
package com.ofcoder.klein.consensus.paxos.core;

import java.util.List;
import java.util.Map;
import java.util.Set;

import com.ofcoder.klein.common.Role;
import com.ofcoder.klein.consensus.facade.config.ConsensusProp;
import com.ofcoder.klein.consensus.facade.sm.SM;
import com.ofcoder.klein.consensus.facade.sm.SMApplier;
import com.ofcoder.klein.consensus.paxos.rpc.vo.ConfirmReq;
import com.ofcoder.klein.consensus.paxos.rpc.vo.LearnReq;
import com.ofcoder.klein.consensus.paxos.rpc.vo.LearnRes;
import com.ofcoder.klein.consensus.paxos.rpc.vo.NodeState;
import com.ofcoder.klein.consensus.paxos.rpc.vo.SnapSyncReq;
import com.ofcoder.klein.consensus.paxos.rpc.vo.SnapSyncRes;
import com.ofcoder.klein.storage.facade.Snap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Learner Role.
Expand All @@ -44,12 +44,7 @@ public interface Learner extends Role<ConsensusProp> {

Set<String> getGroups();

/**
* generate and save snapshot.
*
* @return snaps
*/
Map<String, Snap> generateSnap();
Map<String, SMApplier> getSms();

/**
* load snap.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public void init(final ConsensusProp op) {
@Override
public void shutdown() {
this.dataAligner.close();
generateSnap();
this.sms.values().forEach(SMApplier::close);
}

Expand All @@ -116,8 +115,7 @@ public Set<String> getGroups() {
return sms.keySet();
}

@Override
public Map<String, Snap> generateSnap() {
private Map<String, Snap> generateSnap() {
Map<String, SMApplier> sms = new HashMap<>(this.sms);
ConcurrentMap<String, Snap> result = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(sms.size());
Expand All @@ -144,6 +142,11 @@ public void onTakeSnap(final Snap snap) {
}
}

/**
 * Get the group-to-applier map.
 * The {@code sms} field itself is returned, not a copy.
 *
 * @return map of group name to its SMApplier
 */
@Override
public Map<String, SMApplier> getSms() {
return sms;
}

@Override
public void loadSnapSync(final Map<String, Snap> snaps) {
if (MapUtils.isEmpty(snaps)) {
Expand Down Expand Up @@ -212,7 +215,7 @@ public void replayLog(final String group, final long start) {

@Override
public void loadSM(final String group, final SM sm) {
if (sms.putIfAbsent(group, new SMApplier(group, sm)) != null) {
if (sms.putIfAbsent(group, new SMApplier(group, sm, prop)) != null) {
LOG.error("the group[{}] has been loaded with sm.", group);
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.ofcoder.klein.consensus.paxos.core;

/**
 * Placeholder test class for LearnerSnapshotManager.
 * Every test case below is commented out, so this class currently runs no tests.
 */
public class LearnerSnapshotManagerTest {

// NOTE(review): the disabled setUp stubs getSnapshotStrategy() with a String ("10 5");
// confirm this still matches the method's return type before re-enabling.
// private Learner learner;
// private ConsensusProp consensusProp;
// private LearnerSnapshotManager learnerSnapshotManager;
//
// @Before
// public void setUp() {
// learner = mock(Learner.class);
// consensusProp = mock(ConsensusProp.class);
// when(consensusProp.getSnapshotStrategy()).thenReturn("10 5"); // generate a snapshot once 10 seconds and 5 requests have passed
//
// LearnerSnapshotManager.initAndStart(consensusProp, learner);
// learnerSnapshotManager = LearnerSnapshotManager.getInstance();
// }
//
// @Test
// public void testInitAndStart() {
// assertNotNull(learnerSnapshotManager);
// }
//
// @Test
// public void testAddReqCount() {
// learnerSnapshotManager.addReqCount();
// assertEquals(1, learnerSnapshotManager.getReqCount());
// }

// @Test
// public void testGenerateSnap() {
// Map<String, SMApplier> sms = new HashMap<>();
// SMApplier smApplier = mock(SMApplier.class);
// sms.put("testGroup", smApplier);
//
// when(learner.getSms()).thenReturn(sms);
//
// Map<String, Snap> snaps = learnerSnapshotManager.generateSnap();
//
// assertNotNull(snaps);
// assertTrue(snaps.containsKey("testGroup"));
// }
//
// @Test
// public void testCheckAndSnapshot() throws InterruptedException {
// Map<String, SMApplier> sms = new HashMap<>();
// SMApplier smApplier = mock(SMApplier.class);
// sms.put("testGroup", smApplier);
//
// when(learner.getSms()).thenReturn(sms);
//
// learnerSnapshotManager.addReqCount();
// learnerSnapshotManager.addReqCount();
// learnerSnapshotManager.addReqCount();
// learnerSnapshotManager.addReqCount();
// learnerSnapshotManager.addReqCount();
//
// TimeUnit.SECONDS.sleep(11); // wait long enough to trigger snapshot generation
//
// Map<String, Snap> snaps = learnerSnapshotManager.generateSnap();
//
// assertNotNull(snaps);
// assertTrue(snaps.containsKey("testGroup"));
// }

// @Test
// public void testShutdown() {
// learnerSnapshotManager.shutdown();
// verify(learner).getSms(); // verify that generateSnap is invoked on shutdown
// }
}

0 comments on commit d660108

Please sign in to comment.