Skip to content

Commit

Permalink
RATIS-1125. Fix TestDataStream (apache#247)
Browse files Browse the repository at this point in the history
* RATIS.1125. Fix TestDataStream

* fixup! add comment back
  • Loading branch information
amaliujia authored and symious committed Feb 7, 2024
1 parent e8a2484 commit ddfa291
Showing 1 changed file with 3 additions and 8 deletions.
Expand Up @@ -22,7 +22,6 @@
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.GroupInfoReply;
Expand Down Expand Up @@ -151,11 +150,9 @@ public RaftClientRequest getWriteRequest() {

private List<DataStreamServerImpl> servers;
private List<RaftPeer> peers;
private List<MultiDataStreamStateMachine> stateMachines;
private ConcurrentMap<RaftGroupId, MultiDataStreamStateMachine> stateMachines;

protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
final ConcurrentMap<RaftGroupId, StateMachine> stateMachines = new ConcurrentHashMap<>();

return new RaftServer() {
@Override
public RaftPeerId getId() {
Expand Down Expand Up @@ -285,11 +282,9 @@ protected void setup(int numServers){
.map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
.collect(Collectors.toList());
servers = new ArrayList<>(peers.size());
stateMachines = new ArrayList<>(peers.size());
stateMachines = new ConcurrentHashMap<>();
// start stream servers on raft peers.
for (int i = 0; i < peers.size(); i++) {
final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
stateMachines.add(stateMachine);
final RaftPeer peer = peers.get(i);
final RaftServer server = newRaftServer(peer, properties);
final DataStreamServerImpl streamServer = new DataStreamServerImpl(server, properties, null);
Expand Down Expand Up @@ -378,7 +373,7 @@ private void runTestDataStream(DataStreamOutputImpl out, int bufferSize, int buf
}

final RaftClientRequest header = out.getHeader();
for (MultiDataStreamStateMachine s : stateMachines) {
for (MultiDataStreamStateMachine s : stateMachines.values()) {
final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
if (stream == null) {
continue;
Expand Down

0 comments on commit ddfa291

Please sign in to comment.