Skip to content

Commit

Permalink
Still working on socket framing upgrades...
Browse files Browse the repository at this point in the history
  • Loading branch information
B. W. Lewis committed Oct 13, 2011
1 parent f2aea55 commit 4b81b58
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 56 deletions.
8 changes: 5 additions & 3 deletions R/websockets-internal.R
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ print(rawToBits(rest))
{
cs <- .SOCK_ACCEPT(socket)
client_sockets = server$client_sockets
client_sockets[[length(client_sockets)+1]] =
list(socket=cs, wsinfo=NULL, server=server)
# client_sockets[[length(client_sockets)+1]] =
client_sockets[[as.character(cs)]] =
list(socket=cs, wsinfo=NULL, server=server, new=TRUE)
assign('client_sockets',client_sockets, envir=server)
invisible()
}
Expand All @@ -254,7 +255,8 @@ print(rawToBits(rest))
{
server <- socket$server
cs <- socket$server$client_sockets
cs <- cs[!(unlist(lapply(cs,function(x) x$socket)) == socket$socket)]
# cs <- cs[!(unlist(lapply(cs,function(x) x$socket)) == socket$socket)]
cs[[as.character(socket$socket)]] = c()
j = .SOCK_CLOSE(socket$socket)
assign('client_sockets',cs, envir=server)
# Trigger client closed callback
Expand Down
97 changes: 49 additions & 48 deletions R/websockets.R
Original file line number Diff line number Diff line change
Expand Up @@ -156,71 +156,72 @@
# j holds just the socket file descriptor, or a negated descriptor
# indicating an error condition. Retrieve the client socket from the server
# environment in J. XXX Improve this with a hashed lookup.
J <- server$client_sockets[
unlist(lapply(server$client_sockets,
function(x) x$socket)) == abs(j)][[1]]
J <- server$client_sockets[[as.character(j)]]
if(j<0) {
# Poll reports an error condition for this socket. Close it.
websocket_close(J)
next
}
# A connected client is sending something.
# Note: Presently, program copies into a raw vector. Will also
# soon support in place recv via external pointers.
x <- .SOCK_RECV(j,max_buffer_size=getOption("websockets_max_buffer_size"))
if(length(x)<1) {
# Can't have an empty transmission, close the socket.
websocket_close(J)
next
}
h <- .parse_header(x)
if(is.null(h) && !is.null(J$wsinfo$v)) {
# Not a GET request, assume an incoming websocket payload.
if(!is.function(server$receive)){
# Burn payload, nothing to do with it...
# A connected client is sending us something!
if(J$new) {
# This is a new client connection, handshake.
J$new <- FALSE
x <- .SOCK_RECV_HTTP_HEAD(j)
h <- .parse_header(x)
if(is.null(h)) {
# Shucks, something wrong with this client. Drop him.
websocket_close(J)
next
}
v <- J$wsinfo$v
# XXX OK, high incoming data rates can result in multiple frames
# combined into one payload.
# XXX handle this...
if(v<4) {
server$receive(WS=J, DATA=.v00_unframe(x), COOKIE=NULL)
}
else{
DATA <- .unframe(x)
if(DATA$header$opcode == 1){
server$receive(WS=J, DATA=DATA$data, COOKIE=NULL)
} else if(DATA$header$opcode == 8) {
websocket_close(J)
next
}
}
}
else if(is.null(h$Upgrade))
{
# A static request, not a websocket
if(is.function(server$static)) server$static(j,h)
.remove_client(J)
}
else {
# Try to establish a new websocket connection
v <- 0
if(!is.null(h[["Sec-WebSocket-Version"]]))
if(as.numeric(h[["Sec-WebSocket-Version"]])>=4) v <- 4
h$v <- v
# Stash this client's header, identifying websocket protocol version, etc.
# in the appropriate client_socket list
cs <- server$client_sockets
cs[unlist(lapply(cs,function(x) x$socket)) == j][[1]]$wsinfo <- h
J$wsinfo <- h
cs[[as.character(j)]] <- J
assign("client_sockets",cs,envir=server)

if(is.null(h$Upgrade)) {
# Not a handshake request, serve a static web page
if(is.function(server$static)) server$static(j,h)
.remove_client(J)
next
}
# Negotiate a websocket connection
if(v<4) .SOCK_SEND(j,.v00_resp_101(h))
else .SOCK_SEND(j,.v04_resp_101(h))
# Trigger callback for newly-established connections
if(is.function(server$established))
server$established(WS=J,DATA=NULL,COOKIE=NULL)
# COOKIE will go away in next version...it is useless in this version
# just there for compatibility with older versions...
next
} else if(J$wsinfo$v < 4) {
# Old protocol
x <- .SOCK_RECV_FRAME00(j,max_buffer_size=getOption("websockets_max_buffer_size"))
} else {
# Try the latest protocol
x <- .SOCK_RECV_FRAME(j,max_buffer_size=getOption("websockets_max_buffer_size"))
}
if(length(x)<1) {
# Can't have an empty transmission, close the socket.
websocket_close(J)
next
}
# Burn payload if we cant use it.
if(!is.function(server$receive)) next
if(J$wsinfo$v < 4) {
server$receive(WS=J, DATA=.v00_unframe(x), COOKIE=NULL)
}
else{
DATA <- .unframe(x)
if(DATA$header$opcode == 1){
server$receive(WS=J, DATA=DATA$data, COOKIE=NULL)
} else if(DATA$header$opcode == 8) {
websocket_close(J)
next
}
}
}
}
Expand Down Expand Up @@ -269,9 +270,9 @@
.SOCK_CLOSE(s)
stop("Connection timeout")
}
x <- .SOCK_RECV(s,max_buffer_size=getOption("websockets_max_buffer_size"))
x <- .SOCK_RECV_HTTP_HEAD(s)
# XXX XXX parse for valid connection headers to finish handshake...
context$client_sockets[[length(context$client_sockets) + 1]] =
list(socket=s, wsinfo=list(v=8), server=NULL)
context$client_sockets[[as.character(s)]] <-
list(socket=s, wsinfo=list(v=8), server=NULL, new=FALSE)
context
}
7 changes: 2 additions & 5 deletions src/libsock.c
Original file line number Diff line number Diff line change
Expand Up @@ -542,20 +542,17 @@ SEXP SOCK_RECV_HTTP_HEAD(SEXP S)
j = recv(s, &c, 1, 0);
if(j<0) break;
buf[k] = c;
k++;
if(k>3 && buf[k]=='\n' &&
buf[k-1]=='\r' &&
buf[k-2]=='\n' &&
buf[k-3]=='\r') break;
k++;
if(k+1 > bufsize) {
bufsize = bufsize + MBUF;
buf = (char *)realloc(buf, bufsize);
}
h = poll(&pfds, 1, 50);
if(h<1) {
--k;
break;
}
if(h<1) break;
}
PROTECT(ans=allocVector(RAWSXP,k));
p = (char *)RAW(ans);
Expand Down

0 comments on commit 4b81b58

Please sign in to comment.