Skip to content

Commit

Permalink
RATIS-1270. Set default primary DataStreamServer in RaftClient.Builde…
Browse files Browse the repository at this point in the history
…r. (apache#380)
  • Loading branch information
szetszwo authored and symious committed Feb 19, 2024
1 parent 44102b1 commit 54affce
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.Collection;
import java.util.Objects;

/** A client who sends requests to a raft service. */
Expand Down Expand Up @@ -98,9 +99,14 @@ public RaftClient build() {
clientRpc = factory.newRaftClientRpc(clientId, properties);
}
}
return ClientImplUtils.newRaftClient(clientId,
Objects.requireNonNull(group, "The 'group' field is not initialized."),
leaderId, primaryDataStreamServer,
Objects.requireNonNull(group, "The 'group' field is not initialized.");
if (primaryDataStreamServer == null) {
final Collection<RaftPeer> peers = group.getPeers();
if (!peers.isEmpty()) {
primaryDataStreamServer = peers.iterator().next();
}
}
return ClientImplUtils.newRaftClient(clientId, group, leaderId, primaryDataStreamServer,
Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
properties, retryPolicy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,149 +18,34 @@
package org.apache.ratis.datastream;

import org.apache.ratis.BaseTest;
import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.DivisionProperties;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamTestUtils.MyDataChannel;
import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.GroupListReply;
import org.apache.ratis.protocol.GroupListRequest;
import org.apache.ratis.protocol.GroupManagementRequest;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
import org.junit.Assert;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

abstract class DataStreamBaseTest extends BaseTest {
class MyDivision implements RaftServer.Division {
private final RaftServer server;
private final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
private final DataStreamMap streamMap;
private RaftClient client;

MyDivision(RaftServer server) {
this.server = server;
this.streamMap = RaftServerTestUtil.newDataStreamMap(server.getId());
}

@Override
public DivisionProperties properties() {
return null;
}

@Override
public RaftGroupMemberId getMemberId() {
return null;
}

@Override
public DivisionInfo getInfo() {
return null;
}

@Override
public RaftConfiguration getRaftConf() {
RaftConfiguration getRaftConf() {
final List<RaftPeer> peers = servers.stream().map(Server::getPeer).collect(Collectors.toList());
return RaftServerTestUtil.newRaftConfiguration(peers);
}

@Override
public RaftServer getRaftServer() {
return server;
}

@Override
public RaftServerMetrics getRaftServerMetrics() {
return null;
}

@Override
public MultiDataStreamStateMachine getStateMachine() {
return stateMachine;
}

@Override
public RaftLog getRaftLog() {
return null;
}

@Override
public RaftStorage getRaftStorage() {
return null;
}

@Override
public RetryCache getRetryCache() {
return null;
}

@Override
public DataStreamMap getDataStreamMap() {
return streamMap;
}

public void setRaftClient(RaftClient client) {
this.client = client;
}

@Override
public RaftClient getRaftClient() {
return this.client;
}

@Override
public void close() {}
}

static class Server {
Expand Down Expand Up @@ -205,192 +90,6 @@ Server getPrimaryServer() {
return servers.get(0);
}

protected MyRaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
return new MyRaftServer(peer, properties);
}

class MyRaftServer implements RaftServer {
private final RaftPeer peer;
private final RaftProperties properties;
private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();

MyRaftServer(RaftPeer peer, RaftProperties properties) {
this.peer = peer;
this.properties = properties;
}

@Override
public RaftPeerId getId() {
return peer.getId();
}

@Override
public RaftPeer getPeer() {
return peer;
}

@Override
public MyDivision getDivision(RaftGroupId groupId) {
return divisions.computeIfAbsent(groupId, key -> new MyDivision(this));
}

@Override
public RaftProperties getProperties() {
return properties;
}

@Override
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
return null;
}

@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) {
return null;
}

@Override
public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) {
return null;
}

@Override
public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
return null;
}

@Override
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
return null;
}

@Override
public RaftServerRpc getServerRpc() {
return null;
}

@Override
public DataStreamServerRpc getDataStreamServerRpc() {
return null;
}

@Override
public RaftClientReply submitClientRequest(RaftClientRequest request) {
return submitClientRequestAsync(request).join();
}

@Override
public RaftClientReply setConfiguration(SetConfigurationRequest request) {
return null;
}

@Override
public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
return null;
}

@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
final MyDivision d = getDivision(request.getRaftGroupId());
return d.getDataStreamMap()
.remove(ClientInvocationId.valueOf(request))
.thenApply(StateMachine.DataStream::getDataChannel)
.thenApply(channel -> buildRaftClientReply(request, channel));
}

RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
Assert.assertTrue(channel instanceof MyDataChannel);
final MyDataChannel dataChannel = (MyDataChannel) channel;
return RaftClientReply.newBuilder()
.setRequest(request)
.setSuccess()
.setMessage(() -> DataStreamTestUtils.bytesWritten2ByteString(dataChannel.getBytesWritten()))
.build();
}

@Override
public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) {
return null;
}

@Override
public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
throws IOException {
return null;
}

@Override
public GroupListReply getGroupList(GroupListRequest request) {
return null;
}

@Override
public GroupInfoReply getGroupInfo(GroupInfoRequest request) {
return null;
}

@Override
public RaftClientReply groupManagement(GroupManagementRequest request) {
return null;
}

@Override
public CompletableFuture<GroupListReply> getGroupListAsync(GroupListRequest request) {
return null;
}

@Override
public CompletableFuture<GroupInfoReply> getGroupInfoAsync(GroupInfoRequest request) {
return null;
}

@Override
public CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request) {
return null;
}

@Override
public void close() {
}

@Override
public Iterable<RaftGroupId> getGroupIds() {
return null;
}

@Override
public Iterable<RaftGroup> getGroups() {
return null;
}

@Override
public ServerFactory getFactory() {
return null;
}

@Override
public void start() {
}

@Override
public LifeCycle.State getLifeCycleState() {
return null;
}
}


protected void setup(int numServers){
final List<RaftPeer> peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
.map(RaftPeerId::valueOf)
.map(id -> RaftPeer.newBuilder().setId(id).setDataStreamAddress(NetUtils.createLocalServerAddress()).build())
.collect(Collectors.toList());

List<RaftServer> raftServers = new ArrayList<>();
peers.forEach(peer -> raftServers.add(newRaftServer(peer, properties)));
setup(RaftGroupId.randomId(), peers, raftServers);
}


void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
raftGroup = RaftGroup.valueOf(groupId, peers);
this.peers = peers;
Expand All @@ -410,14 +109,6 @@ private Collection<RaftPeer> removePeerFromList(RaftPeer peer, List<RaftPeer> pe
return otherPeers;
}

RaftClient newRaftClientForDataStream() {
return RaftClient.newBuilder()
.setRaftGroup(raftGroup)
.setPrimaryDataStreamServer(getPrimaryServer().getPeer())
.setProperties(properties)
.build();
}

RaftClient newRaftClientForDataStream(ClientId clientId) {
return RaftClient.newBuilder()
.setClientId(clientId)
Expand Down

0 comments on commit 54affce

Please sign in to comment.