Skip to content

Commit

Permalink
libuv
Browse files Browse the repository at this point in the history
  • Loading branch information
sksum committed Jul 5, 2021
1 parent f686b5e commit 2c5bfc0
Show file tree
Hide file tree
Showing 9 changed files with 471 additions and 3 deletions.
6 changes: 3 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "rhaegal/libuv"]
path = rhaegal/libuv
url = https://github.com/libuv/libuv.git
[submodule "libuv"]
path = libuv
url = https://github.com/libuv/libuv.git
Empty file added Rhaegal/demo/README.md
Empty file.
13 changes: 13 additions & 0 deletions Rhaegal/src/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Build-and-run is the default target.
.DEFAULT_GOAL := libuvl
CC=gcc
PORT = 42069
n_threads_testing = 4

# Build the server and run it on ${PORT}.
# NOTE: libraries (-luv, -lpthread) must come AFTER the source files —
# linkers that resolve symbols left-to-right (e.g. ld with --as-needed)
# drop libraries listed before the objects that need them.
libuvl:
	${CC} -Wall -pthread -O3 -g -DNDEBUG server.c utils.c -o rheagal-core.out -luv -lpthread
	./rheagal-core.out ${PORT}
clean:
	@echo "Cleaning up..."
	rm -f *.out
test:
	python3 sample-client.py localhost ${PORT} -n ${n_threads_testing}
9 changes: 9 additions & 0 deletions Rhaegal/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## Steps:
- Update the submodules: libuv (event-loop abstraction) and http_parser (request parsing)
- Install libuv and http_parser (method on their github)
- ### libuv
- sh autogen.sh
- ./configure
- make
- make check
- sudo make install
100 changes: 100 additions & 0 deletions Rhaegal/src/sample-client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Simple client used to interact with concurrent servers.
#
# Launches N concurrent client connections, each executing a pre-set sequence of
# sends to the server, and logs what was received back.
#
# Tested with Python 3.6
#
# Eli Bendersky [http://eli.thegreenplace.net]
# This code is in the public domain.
import argparse
import logging
import socket
import sys
import threading
import time


class ReadThread(threading.Thread):
    """Reader side of one client connection.

    Accumulates everything received on *sockobj* and terminates when the
    server echoes the b'1111' sentinel — or when the peer closes the
    connection (EOF).
    """

    def __init__(self, name, sockobj):
        super().__init__()
        self.sockobj = sockobj
        self.name = name
        # Maximum number of bytes pulled per recv() call.
        self.bufsize = 8 * 1024

    def run(self):
        fullbuf = b''
        while True:
            buf = self.sockobj.recv(self.bufsize)
            if not buf:
                # Peer closed the connection. Without this check the loop
                # would spin forever on empty reads if the server
                # disconnects before sending the b'1111' sentinel.
                break
            logging.info('{0} received {1}'.format(self.name, buf))
            fullbuf += buf
            if b'1111' in fullbuf:
                break


def make_new_connection(name, host, port):
    """Creates a single socket connection to the host:port.

    Sends a pre-set sequence of messages to the server with pre-set delays;
    in parallel, reads from the socket in a separate thread.
    """
    sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sockobj.connect((host, port))
    # Protocol: the server greets every new client with a single '*'.
    if sockobj.recv(1) != b'*':
        logging.error('Something is wrong! Did not receive *')
    logging.info('{0} connected...'.format(name))

    rthread = ReadThread(name, sockobj)
    rthread.start()

    # sendall (rather than send) guarantees the whole buffer goes out even
    # when the kernel accepts only a partial write.
    s = b'^abc$de^abte$f'
    logging.info('{0} sending {1}'.format(name, s))
    sockobj.sendall(s)
    time.sleep(1.0)

    s = b'xyz^123'
    logging.info('{0} sending {1}'.format(name, s))
    sockobj.sendall(s)
    time.sleep(1.0)

    # The 0000 sent to the server here will result in an echo of 1111, which is
    # a sign for the reading thread to terminate.
    # Add WXY after 0000 to enable kill-switch in some servers.
    s = b'25$^ab0000$abab'
    logging.info('{0} sending {1}'.format(name, s))
    sockobj.sendall(s)
    time.sleep(0.2)

    rthread.join()
    sockobj.close()
    logging.info('{0} disconnecting'.format(name))


def main():
    """Parse CLI args, launch N concurrent connections, and time the run."""
    parser = argparse.ArgumentParser('Simple TCP client')
    parser.add_argument('host', help='Server host name')
    parser.add_argument('port', type=int, help='Server port')
    parser.add_argument('-n', '--num_concurrent', type=int, default=1,
                        help='Number of concurrent connections')
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(levelname)s:%(asctime)s:%(message)s')

    start = time.time()
    workers = []
    # One thread per connection; each runs the full client protocol.
    for idx in range(args.num_concurrent):
        worker = threading.Thread(
            target=make_new_connection,
            args=('conn{0}'.format(idx), args.host, args.port))
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    print('Elapsed:', time.time() - start)


if __name__ == '__main__':
    main()
221 changes: 221 additions & 0 deletions Rhaegal/src/server.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "uv.h"
#include "utils.h"
// #include "llhttp.h"

#define N_BACKLOG 64
#define SENDBUF_SIZE 1024

// States of the per-connection echo-protocol state machine.
typedef enum
{
INITIAL_ACK,   // initial '*' greeting not yet written to the peer
WAIT_FOR_MSG,  // waiting for a '^' frame-start byte
IN_MSG         // inside a frame; payload bytes are echoed back + 1
} ProcessingState;
// Per-peer connection state; owned by the client handle (client->data).
typedef struct
{
ProcessingState state;       // current protocol state
char sendbuf[SENDBUF_SIZE];  // bytes queued to echo back to the peer
int sendbuf_end;             // number of valid bytes in sendbuf
uv_tcp_t *client;            // back-pointer to the owning client handle
} peer_state_t;

// Write-completion callback for the initial '*' ack sent in
// on_peer_connected. On success, transitions the peer into WAIT_FOR_MSG and
// starts reading from the connection. The write request carries the peer
// state in req->data but does not own it (the client handle does).
void on_wrote_init_ack(uv_write_t *req, int status)
{
if (status)
{
die("Write error: %s\n", uv_strerror(status));
}
peer_state_t *peerstate = (peer_state_t *)req->data;
// Flip the peer state to WAIT_FOR_MSG, and start listening for incoming data
// from this peer.
peerstate->state = WAIT_FOR_MSG;
peerstate->sendbuf_end = 0;

int rc;
if ((rc = uv_read_start((uv_stream_t *)peerstate->client, on_alloc_buffer,
on_peer_read)) < 0)
{
die("uv_read_start failed: %s", uv_strerror(rc));
}

// Note: the write request doesn't own the peer state, hence we only free the
// request itself, not the state.
free(req);
}

// Read callback: runs the ^...$ framing state machine over the received
// bytes and queues an echo (each payload byte + 1) back to the peer.
// FIX: the original used undeclared lowercase identifiers (uv_eof,
// initial_ack, wait_for_msg, in_msg, sendbuf_size); the declared names are
// UV_EOF, INITIAL_ACK, WAIT_FOR_MSG, IN_MSG, SENDBUF_SIZE, so the file
// could not compile as written.
void on_peer_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf)
{
  if (nread < 0)
  {
    if (nread != UV_EOF)
    {
      // uv_strerror takes int; nread is ssize_t, so cast explicitly.
      fprintf(stderr, "read error: %s\n", uv_strerror((int)nread));
    }
    uv_close((uv_handle_t *)client, on_client_closed);
  }
  else if (nread == 0)
  {
    // From the documentation of uv_read_cb: nread might be 0, which does not
    // indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK
    // under read(2).
  }
  else
  {
    // nread > 0
    assert(buf->len >= (size_t)nread);

    peer_state_t *peerstate = (peer_state_t *)client->data;
    if (peerstate->state == INITIAL_ACK)
    {
      // If the initial ack hasn't been sent for some reason, ignore whatever
      // the client sends in.
      free(buf->base);
      return;
    }

    // Run the protocol state machine.
    for (int i = 0; i < nread; ++i)
    {
      switch (peerstate->state)
      {
      case INITIAL_ACK:
        assert(0 && "can't reach here");
        break;
      case WAIT_FOR_MSG:
        // '^' opens a frame; everything else is ignored between frames.
        if (buf->base[i] == '^')
        {
          peerstate->state = IN_MSG;
        }
        break;
      case IN_MSG:
        // '$' closes the frame; payload bytes are echoed incremented by 1.
        if (buf->base[i] == '$')
        {
          peerstate->state = WAIT_FOR_MSG;
        }
        else
        {
          assert(peerstate->sendbuf_end < SENDBUF_SIZE);
          peerstate->sendbuf[peerstate->sendbuf_end++] = buf->base[i] + 1;
        }
        break;
      }
    }

    if (peerstate->sendbuf_end > 0)
    {
      // We have data to send. The write buffer will point to the buffer
      // stored in the peer state for this client.
      uv_buf_t writebuf =
          uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
      uv_write_t *writereq = (uv_write_t *)xmalloc(sizeof(*writereq));
      writereq->data = peerstate;
      int rc;
      if ((rc = uv_write(writereq, (uv_stream_t *)client, &writebuf, 1,
                         on_wrote_buf)) < 0)
      {
        die("uv_write failed: %s", uv_strerror(rc));
      }
    }
  }
  // libuv hands us ownership of the buffer allocated in on_alloc_buffer.
  free(buf->base);
}

// Connection callback for uv_listen: accepts the peer, allocates its handle
// and protocol state, and enqueues the initial '*' ack. The client handle
// owns the peer state (client->data); the write request only borrows it.
void on_peer_connected(uv_stream_t *server_stream, int status)
{
printf("Connected my g !\n");
if (status < 0)
die("%s failed: %s", "Peer Connection", uv_strerror(status));

// client will represent this peer; it's allocated on the heap and only
// released when the client disconnects. The client holds a pointer to
// peer_state_t in its data field; this peer state tracks the protocol state
// with this client throughout interaction.
uv_tcp_t *client = (uv_tcp_t *)xmalloc(sizeof(*client));
status = uv_tcp_init(uv_default_loop(), client);
if (status < 0)
die("%s failed: %s", "uv_tcp_init", uv_strerror(status));

client->data = NULL;
if (uv_accept(server_stream, (uv_stream_t *)client) == 0)
{
// Log who connected before starting the protocol.
struct sockaddr_storage peername;
int namelen = sizeof(peername);
status = uv_tcp_getpeername(client, (struct sockaddr *)&peername, &namelen);
if (status < 0)
die("%s failed: %s", "uv_tcp_getpeername", uv_strerror(status));
report_peer_connected((const struct sockaddr_in *)&peername, namelen);

// Initialize the peer state for a new client: we start by sending the peer
// the initial '*' ack.
peer_state_t *peerstate = (peer_state_t *)xmalloc(sizeof(*peerstate));
peerstate->state = INITIAL_ACK;
peerstate->sendbuf[0] = '*';
peerstate->sendbuf_end = 1;
peerstate->client = client;
client->data = peerstate;

// Enqueue the write request to send the ack; when it's done,
// on_wrote_init_ack will be called. The peer state is passed to the write
// request via the data pointer; the write request does not own this peer
// state - it's owned by the client handle.
uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
uv_write_t *req = (uv_write_t *)xmalloc(sizeof(*req));
req->data = peerstate;
status = uv_write(req, (uv_stream_t *)client, &writebuf, 1, on_wrote_init_ack);
if (status < 0)
die("%s failed: %s", "uv_write", uv_strerror(status));
}
else
// Accept failed: release the handle; its close callback frees it.
uv_close((uv_handle_t *)client, on_client_closed);
}

// Entry point: start a libuv TCP echo server on the port given as argv[1]
// (default 9090) and run the event loop until it exits.
// FIX: removed four leftover `printf("%d\n", status)` debug lines and
// added validation of the parsed port number (atoi returns 0 on garbage).
int main(int argc, const char **argv)
{
  // Print with no buffering so log lines appear immediately.
  setvbuf(stdout, NULL, _IONBF, 0);
  uv_loop_t *default_loop = uv_default_loop();
  int portnum = 9090;
  if (argc >= 2)
  {
    portnum = atoi(argv[1]);
    if (portnum <= 0 || portnum > 65535)
      die("%s failed: invalid port '%s'", "argument parsing", argv[1]);
  }

  printf("Serving on port %d\n", portnum);

  uv_tcp_t server_stream;
  struct sockaddr_in server_address;
  // Init default loop by libuv and start a server stream.
  int status = uv_tcp_init(default_loop, &server_stream);
  if (status < 0)
    die("%s failed: %s", "uv_tcp_init", uv_strerror(status));

  // Create a socket address for the server to listen on (all interfaces).
  status = uv_ip4_addr("0.0.0.0", portnum, &server_address);
  if (status < 0)
    die("%s failed: %s", "uv_ip4_addr", uv_strerror(status));

  // Bind the server stream to the socket address.
  status = uv_tcp_bind(&server_stream, (const struct sockaddr *)&server_address, 0);
  if (status < 0)
    die("%s failed: %s", "uv_tcp_bind", uv_strerror(status));

  // Set up the on-connect callback and start listening on the stream.
  status = uv_listen((uv_stream_t *)&server_stream, N_BACKLOG, on_peer_connected);
  if (status < 0)
    die("%s failed: %s", "uv_listen", uv_strerror(status));

  // Run the libuv event loop.
  uv_run(default_loop, UV_RUN_DEFAULT);
  // If uv_run returned, close the default loop before exiting.
  return uv_loop_close(default_loop);
}
Loading

0 comments on commit 2c5bfc0

Please sign in to comment.