-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multi-raft-group setup rebalance without PD #176
Changes from 9 commits
4e61d04
16d0779
a1f620a
d3589a0
28e26d3
29c7eae
d5f5fca
92fddbf
c2ff322
6916c36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,7 @@ | |
*/ | ||
package com.alipay.sofa.jraft.core; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.*; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 禁止 inport * |
||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
|
@@ -330,7 +329,7 @@ public Status getLeader(final String groupId, final Configuration conf, final Pe | |
final Status st = new Status(-1, "Fail to get leader of group %s", groupId); | ||
for (final PeerId peer : conf) { | ||
if (!this.cliClientService.connect(peer.getEndpoint())) { | ||
LOG.error("Fail to connect peer {} to get leader for group {}.", groupId); | ||
LOG.error("Fail to connect peer {} to get leader for group {}.", peer, groupId); | ||
fengjiachun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
continue; | ||
} | ||
|
||
|
@@ -383,6 +382,66 @@ public List<PeerId> getAlivePeers(final String groupId, final Configuration conf | |
return getPeers(groupId, conf, true); | ||
} | ||
|
||
@Override | ||
public Status rebalance(final List<String> groupIds, final Configuration conf, final Map<String, PeerId> leaderIds) { | ||
Requires.requireTrue(!groupIds.isEmpty(), "Empty group id list"); | ||
Requires.requireNonNull(conf, "Null configuration"); | ||
Requires.requireTrue(!conf.isEmpty(), "No peers of configuration"); | ||
|
||
final int groupSizePerPeer = groupIds.size() / conf.size(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. conf 只 check 了是否为 null, 没有 check size 是否为0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @fengjiachun 这边check configuration的size是指Peer节点数量么? |
||
final Queue<String> groupDeque = new ArrayDeque<>(groupIds); | ||
final Map<PeerId, Integer> peerMap = new HashMap<>(); | ||
for (;;) { | ||
fengjiachun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
final String groupId = groupDeque.poll(); | ||
if (StringUtils.isEmpty(groupId)) { | ||
break; | ||
} | ||
final PeerId leaderId = new PeerId(); | ||
try { | ||
final Status status = getLeader(groupId, conf, leaderId); | ||
if (!status.isOk()) { | ||
throw new JRaftException("No leader in group: " + groupId); | ||
} | ||
if (leaderId.getEndpoint() == null) { | ||
continue; | ||
} | ||
LOG.info("Group {} leader is {}.", groupId, leaderId); | ||
} catch (final Exception e) { | ||
groupDeque.add(groupId); | ||
continue; | ||
} | ||
final Integer size = peerMap.get(leaderId); | ||
if (size == null) { | ||
peerMap.put(leaderId, 1); | ||
continue; | ||
} | ||
if (size <= groupSizePerPeer) { | ||
peerMap.put(leaderId, size + 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. groupSizePerPeer * size <= sum 而不是 groupSizePerPeer * size == sum 所以这里 if (size < groupSizePerPeer) 是不是有问题? |
||
continue; | ||
} | ||
for (final PeerId peerId : getAlivePeers(groupId, conf)) { | ||
final Integer pSize = peerMap.get(peerId); | ||
if (pSize != null && pSize >= groupSizePerPeer) { | ||
continue; | ||
} | ||
try { | ||
final Status status = transferLeader(groupId, conf, peerId); | ||
if (status.isOk()) { | ||
LOG.info("Group {} transfer leader to {}.", groupId, peerId); | ||
groupDeque.add(groupId); | ||
break; | ||
} else { | ||
LOG.error("Fail to transfer leader to {}.", peerId); | ||
} | ||
} catch (final Exception e) { | ||
LOG.error("Fail to transfer leader to {}.", peerId); | ||
} | ||
} | ||
leaderIds.put(groupId, leaderId); | ||
} | ||
return Status.OK(); | ||
} | ||
|
||
private List<PeerId> getPeers(final String groupId, final Configuration conf, final boolean onlyGetAlive) { | ||
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id"); | ||
Requires.requireNonNull(conf, "Null conf"); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,26 +18,24 @@ | |
|
||
import java.io.File; | ||
import java.nio.ByteBuffer; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.*; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 同上,修改一下你的 ide 设置,禁止 import * |
||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import com.alipay.sofa.jraft.*; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 同上 |
||
import org.apache.commons.io.FileUtils; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
import com.alipay.sofa.jraft.CliService; | ||
import com.alipay.sofa.jraft.Node; | ||
import com.alipay.sofa.jraft.NodeManager; | ||
import com.alipay.sofa.jraft.RouteTable; | ||
import com.alipay.sofa.jraft.conf.Configuration; | ||
import com.alipay.sofa.jraft.entity.PeerId; | ||
import com.alipay.sofa.jraft.entity.Task; | ||
import com.alipay.sofa.jraft.option.CliOptions; | ||
import com.alipay.sofa.jraft.test.TestUtils; | ||
import org.mockito.Mockito; | ||
import org.mockito.MockitoAnnotations; | ||
import org.mockito.Spy; | ||
|
||
import static org.junit.Assert.assertArrayEquals; | ||
import static org.junit.Assert.assertEquals; | ||
|
@@ -51,14 +49,17 @@ public class CliServiceTest { | |
private String dataPath; | ||
|
||
private TestCluster cluster; | ||
private final String groupId = "CliServiceTest"; | ||
|
||
private CliService cliService; | ||
private final String groupId = "CliServiceTest"; | ||
|
||
@Spy | ||
private CliService cliService = new CliServiceImpl(); | ||
|
||
private Configuration conf; | ||
|
||
@Before | ||
public void setup() throws Exception { | ||
MockitoAnnotations.initMocks(this); | ||
|
||
this.dataPath = TestUtils.mkTempDir(); | ||
FileUtils.forceMkdir(new File(this.dataPath)); | ||
|
@@ -71,7 +72,6 @@ public void setup() throws Exception { | |
} | ||
cluster.waitLeader(); | ||
|
||
cliService = new CliServiceImpl(); | ||
this.conf = new Configuration(peers); | ||
assertTrue(cliService.init(new CliOptions())); | ||
} | ||
|
@@ -242,4 +242,24 @@ public void testGetAlivePeers() throws Exception { | |
assertEquals("Fail to get leader of group " + this.groupId, e.getMessage()); | ||
} | ||
} | ||
|
||
@Test | ||
public void testRebalance() { | ||
final PeerId leader = cluster.getLeader().getNodeId().getPeerId().copy(); | ||
assertNotNull(leader); | ||
|
||
Mockito.doReturn(this.conf.getPeers()).when(this.cliService).getAlivePeers(groupId, this.conf); | ||
for (final PeerId peer : this.conf.getPeerSet()) { | ||
Mockito.doReturn(Status.OK()).when(this.cliService).transferLeader(groupId, this.conf, peer); | ||
} | ||
|
||
final List<String> groupIds = new ArrayList<>(); | ||
groupIds.add(groupId); | ||
groupIds.add(groupId); | ||
groupIds.add(groupId); | ||
|
||
final Map<String, PeerId> leaderIds = new HashMap<>(); | ||
assertTrue(this.cliService.rebalance(groupIds, this.conf, leaderIds).isOk()); | ||
assertEquals(1, leaderIds.size()); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 需要更严谨的单测 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 测试不够 |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
多余的 import ? 检查一下,ide 一般都会有告警