Skip to content

Commit

Permalink
x.vweb.sse: reimplement SSE module for x.vweb (#20203)
Browse files Browse the repository at this point in the history
  • Loading branch information
Casper64 committed Dec 17, 2023
1 parent 25c900f commit cfd19bf
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 2 deletions.
10 changes: 8 additions & 2 deletions vlib/x/vweb/context.v
Expand Up @@ -97,10 +97,16 @@ pub fn (mut ctx Context) send_response_to_client(mimetype string, response strin
// set Content-Type and Content-Length headers
mut custom_mimetype := if ctx.content_type.len == 0 { mimetype } else { ctx.content_type }
ctx.res.header.set(.content_type, custom_mimetype)
ctx.res.header.set(.content_length, ctx.res.body.len.str())
if ctx.res.body.len > 0 {
ctx.res.header.set(.content_length, ctx.res.body.len.str())
}
// send vweb's closing headers
ctx.res.header.set(.server, 'VWeb')
ctx.res.header.set(.connection, 'close')
// sent `Connection: close header` by default, if the user hasn't specified that the
// connection should not be closed.
if !ctx.takeover {
ctx.res.header.set(.connection, 'close')
}
// set the http version
ctx.res.set_version(.v1_1)
if ctx.res.status_code == 0 {
Expand Down
61 changes: 61 additions & 0 deletions vlib/x/vweb/sse/README.md
@@ -0,0 +1,61 @@
# Server Sent Events

This module implements the server side of `Server Sent Events`, SSE.
See [mozilla SSE][mozilla_sse]
as well as [whatwg][whatwg html spec]
for detailed description of the protocol, and a simple web browser client example.

## Usage

With SSE we want to keep the connection open, so we are able to
keep sending events to the client. But if we hold the connection open indefinitely
vweb isn't able to process any other requests.

We can let vweb know that it can continue
processing other requests and that we will handle the connection ourself by
returning `ctx.takeover_conn()`. Vweb will not close the connection and we can handle
the connection in a seperate thread.

**Example:**
```v ignore
import x.vweb.sse
// endpoint handler for SSE connections
fn (app &App) sse(mut ctx Context) vweb.Result {
// handle the connection in a new thread
spawn handle_sse_conn(mut ctx)
// let vweb know that the connection should not be closed
return ctx.takeover_conn()
}
fn handle_sse_conn(mut ctx Context) {
// pass vweb.Context
mut sse_conn := sse.start_connection(mut ctx.Context)
// send a message every second 3 times
for _ in 0.. 3 {
time.sleep(time.second)
sse_conn.send_message(data: 'ping') or { break }
}
// close the SSE connection
sse_conn.close()
}
```

Javascript code:
```js
const eventSource = new EventSource('/sse');

eventSource.addEventListener('message', (event) => {
console.log('received mesage:', event.data);
});

eventSource.addEventListener('close', () => {
console.log('closing the connection')
// prevent browser from reconnecting
eventSource.close();
});
```

[mozilla_sse]: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
[whatwg html spec]: https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
74 changes: 74 additions & 0 deletions vlib/x/vweb/sse/sse.v
@@ -0,0 +1,74 @@
module sse

import x.vweb
import net
import strings

// This module implements the server side of `Server Sent Events`.
// See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
// as well as https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events
// for detailed description of the protocol, and a simple web browser client example.
//
// > Event stream format
// > The event stream is a simple stream of text data which must be encoded using UTF-8.
// > Messages in the event stream are separated by a pair of newline characters.
// > A colon as the first character of a line is in essence a comment, and is ignored.
// > Note: The comment line can be used to prevent connections from timing out;
// > a server can send a comment periodically to keep the connection alive.
// >
// > Each message consists of one or more lines of text listing the fields for that message.
// > Each field is represented by the field name, followed by a colon, followed by the text
// > data for that field's value.

@[params]
pub struct SSEMessage {
pub mut:
id string
event string
data string
retry int
}

@[heap]
pub struct SSEConnection {
pub mut:
conn &net.TcpConn @[required]
}

// start an SSE connection
pub fn start_connection(mut ctx vweb.Context) &SSEConnection {
ctx.takeover_conn()
ctx.res.header.set(.connection, 'keep-alive')
ctx.res.header.set(.cache_control, 'no-cache')
ctx.send_response_to_client('text/event-stream', '')

return &SSEConnection{
conn: ctx.conn
}
}

// send_message sends a single message to the http client that listens for SSE.
// It does not close the connection, so you can use it many times in a loop.
pub fn (mut sse SSEConnection) send_message(message SSEMessage) ! {
mut sb := strings.new_builder(512)
if message.id != '' {
sb.write_string('id: ${message.id}\n')
}
if message.event != '' {
sb.write_string('event: ${message.event}\n')
}
if message.data != '' {
sb.write_string('data: ${message.data}\n')
}
if message.retry != 0 {
sb.write_string('retry: ${message.retry}\n')
}
sb.write_string('\n')
sse.conn.write(sb)!
}

// send a 'close' event and close the tcp connection.
pub fn (mut sse SSEConnection) close() {
sse.send_message(event: 'close', data: 'Closing the connection', retry: -1) or {}
sse.conn.close() or {}
}
66 changes: 66 additions & 0 deletions vlib/x/vweb/sse/sse_test.v
@@ -0,0 +1,66 @@
import x.vweb
import x.vweb.sse
import time
import net.http

const port = 13008
const localserver = 'http://127.0.0.1:${port}'
const exit_after = time.second * 10

pub struct Context {
vweb.Context
}

pub struct App {}

fn (app &App) sse(mut ctx Context) vweb.Result {
spawn handle_sse_conn(mut ctx)
return ctx.takeover_conn()
}

fn handle_sse_conn(mut ctx Context) {
// pass vweb.Context
mut sse_conn := sse.start_connection(mut ctx.Context)

for _ in 0 .. 3 {
time.sleep(time.second)
sse_conn.send_message(data: 'ping') or { break }
}
sse_conn.close()
}

fn testsuite_begin() {
mut app := &App{}

spawn vweb.run_at[App, Context](mut app, port: port, family: .ip)
// app startup time
time.sleep(time.second * 2)
spawn fn () {
time.sleep(exit_after)
assert true == false, 'timeout reached!'
exit(1)
}()
}

fn test_sse() ! {
mut x := http.get('${localserver}/sse')!

connection := x.header.get(.connection) or {
assert true == false, 'Header Connection should be set!'
panic('missing header')
}
cache_control := x.header.get(.cache_control) or {
assert true == false, 'Header Cache-Control should be set!'
panic('missing header')
}
content_type := x.header.get(.content_type) or {
assert true == false, 'Header Content-Type should be set!'
panic('missing header')
}
assert connection == 'keep-alive'
assert cache_control == 'no-cache'
assert content_type == 'text/event-stream'

eprintln(x.body)
assert x.body == 'data: ping\n\ndata: ping\n\ndata: ping\n\nevent: close\ndata: Closing the connection\nretry: -1\n\n'
}

0 comments on commit cfd19bf

Please sign in to comment.