Skip to content

Commit

Permalink
Improved Block+Observe handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthias Kovatsch committed Mar 17, 2014
1 parent b82c95f commit 3e10310
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 32 deletions.
Expand Up @@ -124,13 +124,18 @@ public void sendResponse(Exchange exchange, Response response) {
response.getDestination().getAddress(), response.getDestinationPort());
exchangesByMID.put(idByMID, exchange);

if (response.getOptions().hasBlock2() && !response.getOptions().hasObserve()) {
// Remember ongoing blockwise GET requests
if (response.getOptions().hasBlock2()) {
Request request = exchange.getRequest();
KeyUri idByUri = new KeyUri(request.getURI(),
response.getDestination().getAddress(), response.getDestinationPort());
LOGGER.fine("New ongoing exchange for Block2 response with key "+idByUri);
ongoingExchanges.put(idByUri, exchange);
if (exchange.getResponseBlockStatus()!=null && !response.getOptions().hasObserve()) {
// Remember ongoing blockwise GET requests
LOGGER.fine("Ongoing Block2 started, storing "+idByUri + "\nOngoing " + request + "\nOngoing " + response);
ongoingExchanges.put(idByUri, exchange);
} else {
LOGGER.fine("Ongoing Block2 completed, cleaning up "+idByUri + "\nOngoing " + request + "\nOngoing " + response);
ongoingExchanges.remove(idByUri);
}
}

if (response.getType() == Type.ACK || response.getType() == Type.NON) {
Expand All @@ -152,7 +157,7 @@ public void sendEmptyMessage(Exchange exchange, EmptyMessage message) {
* We do not expect any response for an empty message
*/
if (message.getMID() == Message.NONE)
LOGGER.warning("Empy message "+ message+" has MID NONE // debugging");
LOGGER.severe("Empy message "+ message+" has no MID // debugging");
}

public Exchange receiveRequest(Request request) {
Expand Down Expand Up @@ -325,13 +330,15 @@ public void completed(Exchange exchange) {
// TODO: We can optimize this and only do it, when the request really had blockwise transfer
KeyUri uriKey = new KeyUri(request.getURI(),
request.getSource().getAddress(), request.getSourcePort());
LOGGER.severe("++++++++++++++++++Ongoing completed, cleaning up "+uriKey);
ongoingExchanges.remove(uriKey);
}
// TODO: What if the request is only a block?
// TODO: This should only happen if the transfer was blockwise

Response response = exchange.getResponse();
if (response != null) {
// only response MIDs are stored for ACK and RST, no reponse Tokens
KeyMID midKey = new KeyMID(response.getMID(),
response.getDestination().getAddress(), response.getDestinationPort());
exchangesByMID.remove(midKey);
Expand Down
Expand Up @@ -155,7 +155,7 @@ public void receiveRequest(Exchange exchange, Request request) {
// Assemble and deliver
Request assembled = new Request(request.getCode()); // getAssembledRequest(status, request);
assembleMessage(status, assembled, request);
// assembled.setAcknowledged(true); // TODO: prevents accept from sending ACK. smart?
// assembled.setAcknowledged(true); // TODO: prevents accept from sending ACK. Maybe the resource uses separate...
exchange.setRequest(assembled);
super.receiveRequest(exchange, assembled);
}
Expand All @@ -180,18 +180,16 @@ public void receiveRequest(Exchange exchange, Request request) {
status.setCurrentNum(block2.getNum());
status.setCurrentSzx(block2.getSzx());

Response block = getNextResponsesBlock(response, status);
Response block = getNextResponseBlock(response, status);
block.setToken(request.getToken());

// TODO: Are we allowed to NOT remove the observe option?
if (status.getCurrentNum() > 0)
block.getOptions().removeObserve();
block.getOptions().removeObserve();

//FIXME
// This is necessary for notifications that are sent blockwise:
if (status.isComplete()) {
status.setCurrentNum(0);
response.setAcknowledged(true); // allows to send the next notification
// clean up blockwise status
LOGGER.severe("Ongoing is complete "+status);
exchange.setResponseBlockStatus(null);
} else {
LOGGER.severe("Ongoing is continuing "+status);
}

exchange.setCurrentResponse(block);
Expand All @@ -217,14 +215,15 @@ public void sendResponse(Exchange exchange, Response response) {

BlockwiseStatus status = findResponseBlockStatus(exchange, response);

Response block = getNextResponsesBlock(response, status);
Response block = getNextResponseBlock(response, status);
block.setType(response.getType()); // This is only true for the first block
if (block1 != null) // in case we still have to ack the last block1
block.getOptions().setBlock1(block1);
if (block.getToken() == null)
block.setToken(exchange.getRequest().getToken());

if (response.getOptions().hasObserve()) {
// the ACK for the first block should acknowledge the whole notification
exchange.setCurrentResponse(response);
} else {
exchange.setCurrentResponse(block);
Expand Down Expand Up @@ -336,8 +335,10 @@ public void receiveResponse(Exchange exchange, Response response) {
// TODO: This scenario is not specified in the draft.
// Currently, we reject it and cancel the request.
LOGGER.warning("Wrong block number. Expected "+status.getCurrentNum()+" but received "+block2.getNum()+". Reject response; exchange has failed.");
EmptyMessage rst = EmptyMessage.newRST(response);
super.sendEmptyMessage(exchange, rst);
if (response.getType()==Type.CON) {
EmptyMessage rst = EmptyMessage.newRST(response);
super.sendEmptyMessage(exchange, rst);
}
exchange.getRequest().cancel();
}
}
Expand Down Expand Up @@ -419,7 +420,7 @@ private Request getNextRequestBlock(Request request, BlockwiseStatus status) {
return block;
}

private Response getNextResponsesBlock(Response response, BlockwiseStatus status) {
private Response getNextResponseBlock(Response response, BlockwiseStatus status) {
int szx = status.getCurrentSzx();
int num = status.getCurrentNum();
Response block = new Response(response.getCode());
Expand Down
Expand Up @@ -151,8 +151,9 @@ public void sendResponse(Exchange exchange, Response response) {

@Override
public void receiveRequest(Exchange exchange, Request request) {
// if there is no BlockwiseLayer we still have to set it
if (exchange.getRequest() == null)
throw new NullPointerException("Final assembled request of exchange must not be null");
exchange.setRequest(request);
if (deliverer != null) {
deliverer.deliverRequest(exchange);
} else {
Expand All @@ -165,7 +166,6 @@ public void receiveResponse(Exchange exchange, Response response) {
if (!response.getOptions().hasObserve())
exchange.setComplete();
if (deliverer != null) {
LOGGER.fine("Top of CoAP stack delivers response");
deliverer.deliverResponse(exchange, response); // notify request that response has arrived
} else {
LOGGER.severe("Top of CoAP stack has no deliverer to deliver response");
Expand Down
Expand Up @@ -65,12 +65,10 @@ public void sendResponse(final Exchange exchange, final Response response) {
synchronized (exchange) {
Response current = relation.getCurrentControlNotification();
if (current != null && isInTransit(current)) {
LOGGER.fine("A former notification is still in transit. Postpone this one");
LOGGER.fine("A former notification is still in transit. Postpone " + response);
relation.setNextControlNotification(response);
return;

} else {
LOGGER.finer("There is no current CON notification in transit. Go ahead and send the new one.");
relation.setCurrentControlNotification(response);
relation.setNextControlNotification(null);
}
Expand Down
Expand Up @@ -97,6 +97,7 @@ public void sendResponse(final Exchange exchange, final Response response) {
}

if (response.getType() == Type.CON) {
LOGGER.finer("Scheduling retransmission for " + response);
prepareRetransmission(exchange, new RetransmissionTask(exchange, response) {
public void retransmit() {
sendResponse(exchange, response);
Expand Down
Expand Up @@ -84,8 +84,12 @@ public void test_all() throws Exception {
test_POST_long_short();
test_POST_short_long();
test_POST_long_long();
// repeat test to check ongoing clean-up
test_POST_long_long();
test_GET_short();
test_GET_long();
// repeat test to check ongoing clean-up
test_GET_long();
}

public void test_POST_short_short() throws Exception {
Expand Down Expand Up @@ -156,8 +160,7 @@ private void executePOSTRequest() throws Exception {
try {
interceptor.clear();
Request request = new Request(CoAP.Code.POST);
request.setDestination(InetAddress.getLocalHost());
request.setDestinationPort(serverPort);
request.setURI("coap://localhost:" + serverPort + "/" + request_short + respond_short);
if (request_short) request.setPayload(SHORT_POST_REQUEST);
else request.setPayload(LONG_POST_REQUEST);
clientEndpoint.sendRequest(request);
Expand Down
Expand Up @@ -612,15 +612,15 @@ private void testObserveWithBlockwiseResponse() throws Exception {
client.sendRequest(CON, GET, tok, ++mid).path(path).observe(0).go();
client.expectResponse(ACK, CONTENT, tok, mid).block2(0, true, 128).observe(0).block2(0, true, 128).payload(respPayload.substring(0, 128)).go();

byte[] tok4 = generateNextToken();
client.sendRequest(CON, GET, tok4, ++mid).path(path).block2(1, false, 128).go();
client.expectResponse(ACK, CONTENT, tok4, mid).block2(1, true, 128).noOption(OBSERVE).payload(respPayload.substring(128, 256)).go();
byte[] tok1 = generateNextToken();
client.sendRequest(CON, GET, tok1, ++mid).path(path).block2(1, false, 128).go();
client.expectResponse(ACK, CONTENT, tok1, mid).block2(1, true, 128).noOption(OBSERVE).payload(respPayload.substring(128, 256)).go();

client.sendRequest(CON, GET, tok4, ++mid).path(path).block2(2, false, 128).go();
client.expectResponse(ACK, CONTENT, tok4, mid).block2(2, false, 128).noOption(OBSERVE).payload(respPayload.substring(256, 300)).go();
client.sendRequest(CON, GET, tok1, ++mid).path(path).block2(2, false, 128).go();
client.expectResponse(ACK, CONTENT, tok1, mid).block2(2, false, 128).noOption(OBSERVE).payload(respPayload.substring(256, 300)).go();

System.out.println("Send first notification");
serverInterceptor.log("\n... time passes ...");
System.out.println("Send first notification");
respPayload = generatePayload(280);
test1.changed();

Expand Down

0 comments on commit 3e10310

Please sign in to comment.