Skip to content
Permalink
Browse files

first implementation of congestion control

  • Loading branch information...
klaus7 committed Nov 2, 2016
1 parent a29ce47 commit 551acfd4591acb4a1e631eb064244d7b7c9116d3
@@ -0,0 +1,29 @@
/*******************************************************************************
* Copyright 2016 Klaus Pfeiffer - klaus@allpiper.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/

package com.jfastnet;

/** @author Klaus Pfeiffer - klaus@allpiper.com */
public class ConfigStateContainer {

public final Config config;
public final State state;

public ConfigStateContainer(Config config, State state) {
this.config = config;
this.state = state;
}
}
@@ -217,15 +217,6 @@ protected boolean afterSend(Message message) {
* @param message message about to send
* @return true if we are ready to send the message, false otherwise */
public boolean beforeSend(Message message) {
for (IMessageSenderPreProcessor processor : state.getMessageSenderPreProcessors()) {
if (processor.beforeCongestionControl(message) == null) {
log.trace("Processor {} discarded message {} at beforeCongestionControl", processor, message);
return false;
}
}

// TODO: congestion control

for (IMessageSenderPreProcessor processor : state.getMessageSenderPreProcessors()) {
if (processor.beforeSend(message) == null) {
log.trace("Processor {} discarded message {} at beforeSend", processor, message);
@@ -17,13 +17,12 @@
package com.jfastnet;

import com.jfastnet.messages.*;
import com.jfastnet.state.ClientStates;
import com.jfastnet.util.NullsafeHashMap;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** @author Klaus Pfeiffer - klaus@allpiper.com */
@@ -32,13 +31,13 @@

/** Timestamp of time when a message was last received from client id.
* Key: client id; Value: timestamp */
protected Map<Integer, Long> lastReceivedMap = new ConcurrentHashMap<>();
private Map<Integer, Long> lastReceivedMap = new ConcurrentHashMap<>();

/** Track count of incoming messages. */
protected Map<Class, Counter> incomingMessages = new NullsafeCounterHashMap();
private Map<Class, Counter> incomingMessages = new NullsafeCounterHashMap();

/** Track count of outgoing messages. */
protected Map<Class, Counter> outgoingMessages = new NullsafeCounterHashMap();
private Map<Class, Counter> outgoingMessages = new NullsafeCounterHashMap();

private long lastKeepAliveCheck;

@@ -60,8 +59,10 @@ public boolean start() {
public void process() {
super.process();

calculateNetworkQualityToClients();

long currentTime = config.timeProvider.get();
int clientSize = state.getClients().size();
int clientSize = state.getClientStates().size();
if (clientSize > 0 && clientSize >= config.requiredClients.size() && lastKeepAliveCheck + config.keepAliveInterval < currentTime) {

// Potentially "Keep Alive" will be sent, when first client joins.
@@ -82,11 +83,16 @@ public void process() {
}
}

private void calculateNetworkQualityToClients() {
state.getClientStates().process();
}

@Override
public void receive(Message message) {
boolean isConnectRequest = message instanceof ConnectRequest;
Map<Integer, InetSocketAddress> clients = state.getClients();
if (!clients.containsValue(message.getSocketAddressSender())) {

ClientStates clientStates = state.getClientStates();
if (!clientStates.hasAddress(message.getSocketAddressSender())) {
if (!isConnectRequest) {
log.warn("No client found under {}", message.getSocketAddressSender());
log.warn("Message was: {}", message);
@@ -110,30 +116,26 @@ public void receive(Message message) {
// Sender (client) id was 0 and this is a connect request
// -> client needs an id

Optional<Map.Entry<Integer, InetSocketAddress>> socketAddressOptional = clients.entrySet().stream()
.filter(entry -> entry.getValue().equals(message.getSocketAddressSender()))
.findFirst();
Integer clientIdBySocketAddress = clientStates.getIdBySocketAddress(message.getSocketAddressSender());

if (socketAddressOptional.isPresent()) {
Map.Entry<Integer, InetSocketAddress> socketAddressEntry = socketAddressOptional.get();
clientId = socketAddressEntry.getKey();
if (clientIdBySocketAddress != null) {
clientId = clientIdBySocketAddress;
log.info("Assign previous client id {} to {}.", clientId, message.getSocketAddressSender());
} else {
Integer maximumId = clients.keySet().stream().max(Comparator.naturalOrder()).orElse(0);
clientId = maximumId == null ? 1 : maximumId + 1;
clientId = clientStates.newClientId();
log.info("Assign new client id {} to {}.", clientId, message.getSocketAddressSender());
}
connectRequest.setSenderId(clientId);
connectRequest.setClientId(clientId);
}
lastReceivedMap.put(clientId, config.timeProvider.get());

boolean clientAlreadyInMap = clients.containsKey(clientId);
boolean clientAlreadyInMap = clientStates.hasId(clientId);
if (clientAlreadyInMap) {
log.info("Client {} is already in list - could be a re-join.", clientId);
unregisterClientAtProcessors(clientId);
}
clients.put(clientId, message.getSocketAddressSender());
clientStates.put(clientId, message.getSocketAddressSender());
log.info("Added {} with address {} to clients.", clientId, message.getSocketAddressSender());
registerClientAtProcessors(clientId);
}
@@ -188,14 +190,11 @@ private boolean internalSend(Message message, int exceptId) {
if (!createPayload(message)) {
return false;
}
// if (!checkPayloadSize(message)) {
// return false;
// }
}

boolean beforeSendState = true;
boolean afterSendState = true;
for (Map.Entry<Integer, InetSocketAddress> entry : state.getClients().entrySet()) {
for (Map.Entry<Integer, InetSocketAddress> entry : state.getClientStates().addressEntrySet()) {
Integer clientId = entry.getKey();
if (exceptId > 0 && exceptId == clientId) {
continue;
@@ -206,9 +205,6 @@ private boolean internalSend(Message message, int exceptId) {
if (!createPayload(message)) {
beforeSendState = false;
}
// else if (!checkPayloadSize(message)) {
// return false;
// }
}
message.socketAddressRecipient = entry.getValue();

@@ -217,9 +213,6 @@ private boolean internalSend(Message message, int exceptId) {
state.getUdpPeer().send(message);
afterSendState &= super.afterSend(message);
}
// beforeSendState &= super.beforeSend(message);
// config.udpPeer.send(message);
// afterSendState &= super.afterSend(message);

}
log.trace("Sent message: {}", message);
@@ -258,7 +251,7 @@ private boolean internalSendSameIds(Message message, int exceptId) {
}


for (Map.Entry<Integer, InetSocketAddress> entry : state.getClients().entrySet()) {
for (Map.Entry<Integer, InetSocketAddress> entry : state.getClientStates().addressEntrySet()) {
Integer clientId = entry.getKey();
if (exceptId > 0 && exceptId == clientId) {
continue;
@@ -275,18 +268,18 @@ private boolean internalSendSameIds(Message message, int exceptId) {
}

public boolean send(int clientId, Message message) {
InetSocketAddress client = state.getClients().get(clientId);
InetSocketAddress client = state.getClientStates().getById(clientId).getSocketAddress();
if (client == null) {
log.warn("Client with id {} not found.", clientId);
log.warn("Client address with id {} not found.", clientId);
return false;
}
message.socketAddressRecipient = client;
return super.send(message);
}

public void unregister(int clientId) {
log.info("Bye {}", state.getClients().get(clientId));
state.getClients().remove(clientId);
log.info("Bye {}", state.getClientStates().getById(clientId));
state.getClientStates().remove(clientId);
lastReceivedMap.remove(clientId);
config.requiredClients.remove(clientId);
unregisterClientAtProcessors(clientId);
@@ -23,16 +23,15 @@
import com.jfastnet.processors.IMessageReceiverPreProcessor;
import com.jfastnet.processors.IMessageSenderPostProcessor;
import com.jfastnet.processors.IMessageSenderPreProcessor;
import com.jfastnet.state.ClientStates;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/** @author Klaus Pfeiffer - klaus@allpiper.com */
@Slf4j
@@ -51,7 +50,7 @@
private boolean isHost;

/** The server holds track of its clients. */
private Map<Integer, InetSocketAddress> clients = new ConcurrentHashMap<>();
private final ClientStates clientStates;

/** Client will only receive messages, if connected. */
public volatile boolean connected = false;
@@ -87,6 +86,7 @@ public State(Config config) {
createIdProvider(config);
createUdpPeer(config);
createProcessors(config);
clientStates = new ClientStates(config);
}

private void createIdProvider(Config config) {
@@ -20,6 +20,7 @@
import com.jfastnet.MessageLog;
import com.jfastnet.events.RequestedMessageNotInLogEvent;
import com.jfastnet.processors.MessageLogProcessor;
import com.jfastnet.state.ClientState;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
@@ -56,6 +57,7 @@ public void process(Object context) {
// Clear sender id, if every client receives the same id for a particular message
keySenderId = 0;
}
degradeNetworkQuality();
MessageLog messageLog = getState().getProcessorOf(MessageLogProcessor.class).getMessageLog();
for (Long absentId : missingIds) {
MessageKey key = MessageKey.newKey(Message.ReliableMode.SEQUENCE_NUMBER, keySenderId, absentId);
@@ -72,4 +74,13 @@ public void process(Object context) {
getConfig().netStats.resentMessages.incrementAndGet();
}
}

private void degradeNetworkQuality() {
// If other side requests missing messages, it means we should slow down sending.
// Thus we degrade the network quality to this peer.
ClientState clientState = getState().getClientStates().getById(getSenderId());
if (clientState != null) {
clientState.getNetworkQuality().requestedMissingMessages(missingIds.size(), getConfig().timeProvider.get());
}
}
}
@@ -0,0 +1,100 @@
/*******************************************************************************
* Copyright 2016 Klaus Pfeiffer - klaus@allpiper.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/

package com.jfastnet.peers;

import com.jfastnet.ConfigStateContainer;
import com.jfastnet.messages.Message;
import com.jfastnet.state.ClientState;
import com.jfastnet.state.NetworkQuality;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** @author Klaus Pfeiffer - klaus@allpiper.com */
@Slf4j
public class CongestionControl<T> {

private final ConfigStateContainer configStateContainer;
private final Consumer<T> packetSender;

private final Queue<DelayedPacket> packetQueue = new ArrayDeque<>();

private long delay;

public CongestionControl(ConfigStateContainer configStateContainer, Consumer<T> packetSender) {
this.configStateContainer = configStateContainer;
this.packetSender = packetSender;
}

public void send(Message message, T packet) {
if (message.isResendMessage()) {
immediateSend(packet);
return;
}
InetSocketAddress socketAddressRecipient = message.socketAddressRecipient;
ClientState clientState = configStateContainer.state.getClientStates().getBySocketAddress(socketAddressRecipient);
float qualityFactor = 1f;
if (clientState != null) {
NetworkQuality networkQuality = clientState.getNetworkQuality();
qualityFactor = networkQuality.qualityFactor;
}

if (qualityFactor > 0.9f && packetQueue.isEmpty()) {
immediateSend(packet);
delay = 0;
} else {
delay = (long) ((1f - qualityFactor) * 1000);

long sendTimeStamp;
DelayedPacket lastDelayedPacket = packetQueue.peek();
if (lastDelayedPacket != null) {
sendTimeStamp = lastDelayedPacket.sendTimeStamp + delay;
} else {
long currentTime = configStateContainer.config.timeProvider.get();
sendTimeStamp = currentTime + delay;
}
packetQueue.add(new DelayedPacket(sendTimeStamp, packet));
}
}

private void immediateSend(T packet) {
packetSender.accept(packet);
}

public void process() {
long currentTime = configStateContainer.config.timeProvider.get();
for (DelayedPacket delayedPacket = packetQueue.peek();
delayedPacket != null && delayedPacket.sendTimeStamp <= currentTime;
delayedPacket = packetQueue.peek()) {

immediateSend(delayedPacket.packet);
packetQueue.poll();
}
}

private class DelayedPacket {
long sendTimeStamp;
T packet;
DelayedPacket(long sendTimeStamp, T packet) {
this.sendTimeStamp = sendTimeStamp;
this.packet = packet;
}
}
}
Oops, something went wrong.

0 comments on commit 551acfd

Please sign in to comment.
You can’t perform that action at this time.