Alternative SSE client #25

Open
tmaxmax opened this issue Feb 6, 2024 · 11 comments
Labels: proposal (New APIs, changes to existing APIs and behaviors)
Milestone: v1

@tmaxmax
Owner

tmaxmax commented Feb 6, 2024

The current client API offers a great deal of flexibility:

  • multiple Connections with the same configuration can be made from a single sse.Client
  • there can be multiple event listeners (for distinct or the same event)
  • event listeners can be added or removed after the connection is established
  • event listeners can listen to a single event type or all of them.

This doesn't come for free, though: both the user-facing API and the implementation code are complex, and the client uses a bit more resources, generates more garbage and must ensure serialized concurrent access to internal state. Moreover, the current client:

  • does not announce in any way what the connection status is – see Appendix 1 to this proposal for context and how it could be tackled.
  • is cumbersome for consuming OpenAI-style streams – see Appendix 2 for a utility specifically designed for reading such streams.

As of now, the instantiation of a connection with cancellation, some custom configuration and sending data on a channel looks as follows:

ctx, cancel := context.WithCancel(context.Background()) // or other means of creating a context, might come from somewhere
defer cancel()

client := sse.Client{
	/* your options */
}

r, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
conn := client.NewConnection(r)

ch := make(chan sse.Event)
go func() {
	for ev := range ch {
		fmt.Printf("%s\n\n", ev.Data)
	}
}()

conn.SubscribeMessages(func(event sse.Event) {
	ch <- event
})

if err := conn.Connect(); err != nil {
	fmt.Fprintln(os.Stderr, err)
}

I've added the channel because from what I have observed, most users of the library create callbacks which mainly send the events on a channel to be consumed elsewhere.

I think this is quite a mouthful, and I wonder whether these flexibilities are used enough to justify the current API.

Here's another way in which I think the client could be designed. Instead of having a Client type and a Connection type with many methods, we could instead have the following:

package sse

// Connect does the HTTP request, receives the events from the server and sends them
// on the given channel.
//
// Returns errors if any of the parameters are invalid. Besides that it has the exact same
// behavior as `sse.Connection.Connect` has.
func Connect(req *http.Request, msgs chan<- Event, config *ConnectConfig) error

type ConnectConfig struct {
	/* the same stuff that is on sse.Client currently */
}

Usage would look as follows:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan sse.Event)
go func() {
	for ev := range ch {
		if ev.Type == "" {
			fmt.Printf("%s\n\n", ev.Data)
		}
	}
}()

config := &sse.ConnectConfig{
	/* your options */
}

r, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
if err := sse.Connect(r, ch, config); err != nil {
	fmt.Fprintln(os.Stderr, err)
}

It is not that much shorter, but assuming that the context comes from elsewhere and that the configuration is already defined, the code needed to establish a connection is significantly shorter: create an http.Request and call Connect. Connecting with the default configuration would also not need a separate top-level function: just pass nil instead of a ConnectConfig!

There are two important changes here, though:

  • checking that the received events are of the desired type is now the user's responsibility
  • new event listeners cannot be added (so easily) on the fly – the user would have to implement this themselves

For example, if we receive three different event types and handle each one differently, with the current API one could write:

conn.SubscribeEvent("a", func(e sse.Event) {
	aCh <- e
})
conn.SubscribeEvent("b", func(e sse.Event) {
	bCh <- e
})
conn.SubscribeEvent("c", func(e sse.Event) {
	cCh <- e
})
if err := conn.Connect(); err != nil {
	// handle error
}

With this change, it would look like this:

evs := make(chan sse.Event)
go func() {
	for e := range evs {
		switch e.Type {
		case "a":
			aCh <- e // or just handle them here instead of sending them on another channel
		case "b":
			bCh <- e
		case "c":
			cCh <- e
		}
	}
}()
if err := sse.Connect(req, evs, nil); err != nil {
	// handle error
}

On the flipside, simple requests would be easier to make. Consider a request to ChatGPT:

prompt, _ := json.Marshal(OpenAIRequest{
	Model:    "gpt-4-1106-preview",
	Messages: msgs,
	Stream:   true,
})
r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
r.Header.Add("Content-Type", "application/json")
conn := sse.NewConnection(r)

conn.SubscribeMessages(func(ev sse.Event) {
	events <- ev // it is processed elsewhere
})

if err := conn.Connect(); err != nil {
	/* handle error */
}

This would be the new version:

prompt, _ := json.Marshal(OpenAIRequest{
	Model:    "gpt-4-1106-preview",
	Messages: msgs,
	Stream:   true,
})
r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
r.Header.Add("Content-Type", "application/json")

if err := sse.Connect(r, events, nil); err != nil {
	/* handle error */
}

There are obvious benefits:

  • much less boilerplate – no more NewConnection -> SubscribeMessages -> Connect
  • it is not possible to connect without receiving the messages
  • given that the connection code is shorter, the focus shifts to creating the request
  • handling the response data happens directly in the user's code: there's no function boundary separating it from the business logic, no inversion of control

As an analogy, imagine if net/http.Client were used like this:

conn := http.Client.NewConnection(req)
conn.HandleResponse(func(res *http.Response) {
	// do something with the response
})
if err := conn.Connect(); err != nil {
	// handle error
}

It would be painful to use.

The main advantage of the new API would be, I believe, that control over the response is fully in the library user's hands. There are no callbacks to reason about, and there is no need for the user to read the source code to find out how the Connection behaves in various respects, for example in what order the event listeners are called. Finally, and somewhat paradoxically, there would be one single way to do things: currently, to handle multiple event types one can either register a callback for each event or write the same switch code as above inside a callback passed to SubscribeAll. It would also be much easier to maintain: this change would remove ~200 LOC and 6 public API entities. A smaller API surface means less documentation and, in the end, less that users must learn in order to use the library effectively.

Looking forward to your input regarding this API change!

@tmaxmax tmaxmax added the proposal New APIs, changes to existing APIs and behaviors label Feb 6, 2024
@tmaxmax tmaxmax added this to the v1 milestone Feb 6, 2024
@tmaxmax tmaxmax mentioned this issue Feb 6, 2024
@tmaxmax tmaxmax added help wanted Extra attention is needed and removed help wanted Extra attention is needed labels Feb 6, 2024
@hspaay

hspaay commented May 19, 2024

Just my 2 cents, as far as I understand this.

  • As a general point of view, the more code can be removed the better, especially if that code is not part of the problem being solved.
  • Connecting using a single Connect function with parameters that describe what is needed is more self-descriptive than multiple methods. The parameters act as a guide, which is easier to discover than having to remember that multiple steps are needed.
  • Using an options struct in Connect() with optional configuration, such as max buffer size, would fit here nicely. It also matches the http connect approach.
  • I've been using SSE events with and without subscriptions and ended up not using subscriptions. The switch-case approach is just fine for all my use cases, and the code is easier to follow and debug. Subscribing to events adds the burden of unsubscribing with little benefit. Less mental load, so a plus in my book.
  • The only counterargument to providing an event handler on Connect is when the event handler needs to be set after the connection is made, for example when the connection must be established before the event handler is instantiated. Perhaps this can still be accommodated with a 'SetHandler' method.

So in short, these look like good improvements.

@tmaxmax
Owner Author

tmaxmax commented May 19, 2024

Thank you for the feedback!

As an observation, unsubscribing is not actually required for the program to be correct; it only exists so the user can stop receiving events on a given subscription. This reinforces the point that the current API presents unnecessary points of confusion.

With regards to adding event handlers on the fly, this could still be achieved with some userland code. One possible solution could be something like this:

type handlerRegistration struct {
    eventType string
    handler   func(sse.Event)
}

handlers := map[string]func(sse.Event){} // the key is the event type
handlersCh := make(chan handlerRegistration)
ch := make(chan sse.Event)

go func() {
    for {
        select {
        case e, ok := <-ch:
            if !ok {
                return // the events channel was closed: the stream has ended
            }
            if h := handlers[e.Type]; h != nil {
                h(e)
            }
        case r := <-handlersCh:
            handlers[r.eventType] = r.handler
        case <-ctx.Done():
            return
        }
    }
}()

// the call to sse.Connect

This is actually very similar to how it's implemented internally. The thing is this code would reside in the user's codebase, which means it can be adjusted to whatever their needs are – subsequent event handler registration could be implemented in a totally different manner, if that's more fit.

There could maybe be some type which would wrap the events channel and provide the handler registration functionality separately but honestly for the reasons above I'd rather just not have go-sse offer a solution for this. It seems as if the most flexibility is achieved by doing nothing, in this case.

To collect more data points, what's a concrete scenario where adding handlers after connecting was useful or necessary?

@hspaay

hspaay commented May 20, 2024

Thanks for the example. It makes a lot of sense. No concerns there.

The use-case for adding handlers after connecting is when the connection is required as part of initialization of a service so the service can receive async responses. This also makes it easy to use different transports for testing or based on configuration.

If the client is an end-user (web browser) then it would be the other way around as the application starts before the connection is established or re-established.

This isn't a deal breaker as it is easy enough to create a helper that is added as SSE listener before the connection is established and passed to the service after the connection is successful. Just a bit more work. Not a big deal. There are many ways to cut this apple. :)
I hope this explanation makes sense.

@slimsag

slimsag commented Jul 13, 2024

Question: where does specifying a custom http.Client fit into the new API design?

One thing that is common throughout other codebases is using a subset of an *http.Client rather than the whole thing, e.g.:

// A Doer captures the Do method of an http.Client.
type Doer interface {
	Do(*http.Request) (*http.Response, error)
}

Unless go-sse needs a full *http.Client for some underlying reason, it might be nice to similarly have go-sse take this interface instead of a full http.Client. Just an idea :)

@tmaxmax
Owner Author

tmaxmax commented Jul 14, 2024

@slimsag the sse.ConnectConfig would have a Client *http.Client field.

When it comes to mocking the client, I think the canonical way to do it in Go is to actually use a mock http.RoundTripper for the client's Transport field. This is how I've been doing it and how I'm doing it for example in this library's tests.

I also wouldn't add another API entity just for mocking when the same can already be achieved with the standard library directly, in order to keep go-sse's footprint minimal.

With all that being said, it will be possible to mock the client.

@slimsag

slimsag commented Jul 14, 2024

I actually don't mean for mocking (although it is useful for that, too); I really mean for using other HTTP clients (typically wrappers of an *http.Client that add other behavior, e.g. tracing/metrics, logging, etc.)

@tmaxmax
Owner Author

tmaxmax commented Jul 14, 2024

I understand now. I actually happen not to be very familiar with this pattern – are there some open-source projects which employ it or some libraries that help with creating such clients?

I'm trying to determine whether this pattern enables something that is unachievable through http.Client.Transport, or provides significant advantages over just using it, and how widespread it is.

At first glance, if one wishes to log things around a request, a wrapper http.RoundTripper could be used to the same effect.

@tmaxmax
Owner Author

tmaxmax commented Jul 14, 2024

Thank you for the plethora of examples!

I find it interesting that in some of them, namely Sourcegraph and Cilium, the implementors of that interface in the respective files actually wrap an http.RoundTripper. I haven't explored usage in the other codebases or beyond the linked source files, so admittedly this is a very small sample to draw conclusions from.

I'll most certainly revisit this once I start working on the client. If other people see this discussion and would be interested in having this, please leave a reaction to any of the relevant messages.

@tmaxmax
Owner Author

tmaxmax commented Aug 24, 2024

Appendix 1: connection state handling

As discussed in #37, right now the only way to determine that the client has connected to the server is by having the server send an event after the connection is established. This should not be necessary, though – the browser's EventSource client has readyState, which announces when a connection is established, lost, or the client is closed.

Based on its specification, something similar could be implemented in Go. Here's how it could look together with the new client proposal:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

events := make(chan sse.Event)
status := make(chan sse.ConnectStatus)

go func() {
	for {
		select {
		case s := <-status:
			/* do something with the status */
		case e, ok := <-events:
			if !ok { return }
			/* consume event */
		}
	}
}()

config := &sse.ConnectConfig{
	Status: status,
	/* other options */
}

r, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8000", http.NoBody)
if err := sse.Connect(r, events, config); err != nil {
	fmt.Fprintln(os.Stderr, err)
}

In other words, the new client could take an optional Status channel through the ConnectConfig struct, on which the connection status would be announced. A disadvantage of having ConnectConfig.Status is that it prevents ConnectConfig from being reusable across multiple Connect calls, especially concurrent ones: you don't want to receive statuses from multiple Connects on the same channel.

  • an alternative could be to use an options pattern, something like:
sse.Connect(r, events,
	sse.HTTPClient(...),
	&sse.Backoff{...},
	sse.Buffer(...), // corresponding to the current sse.Connection.Buffer
	sse.Status(status))

This would massively bloat the library and would also ask for lengthy names (in order to keep it in a top-level sse package) so I'd like to avoid it.

  • another alternative is to make it a separate parameter to Connect:
// valid usages of sse.Connect
sse.Connect(r, events, nil, nil)
sse.Connect(r, events, nil, config)
sse.Connect(r, events, status, config)

As seen above, usage without status announcement would be achieved by simply passing a nil channel. I do not like this because the simple usage (no config, no status) would have two magic nils in the parameter list and a mandatory parameter doesn't indicate by means of language that it is permitted to be nil, as struct fields do (they're optional). I do like this, on the other hand, because it allows reusable design for ConnectConfig, and makes it very easily discoverable that you can also track the connection status.

  • a third alternative would be to keep a single channel and receive on it some sse.ConnectResponse (naming not important) with a structure like:
type ConnectResponse struct {
    Status ConnectStatus
    Event  Event
}

I do not like this because it is a discriminated union in disguise, for which Go doesn't have any syntactic support – correct usage is not intuitive but must be documented. It also adds a new top-level type, which means API surface bloat, and bloats usage which doesn't need to check the connection status.

Usage aside, here's what ConnectStatus could look like:

type ConnectStatus struct {
    Kind       ConnectStatusKind
    RetryError error
}

type ConnectStatusKind int

const (
    ConnectStatusKindConnecting ConnectStatusKind = iota
    ConnectStatusKindOpen
    ConnectStatusKindClosed
)

Not making it just an enum type makes it more useful: through the RetryError field, errors which triggered a backoff & reconnect sequence could be reported and handled, which would remove the need for the current sse.Client.OnRetry callback. The Closed status is redundant in our case: sse.Connect returns when closed and the events channel is also closed, so it could be removed from the API.

The sequence in which status updates would be sent would be the following:

  1. ConnectStatus{Kind: Connecting, RetryError: nil} for the first connection attempt
  2. (if successful) ConnectStatus{Kind: Open, RetryError: nil}
  3. (if failed) ConnectStatus{Kind: Connecting, RetryError: <some error>}, then back to 2 on connection success

A RetryTime field could also exist on ConnectStatus, if knowing the backoff time is desired.

This would be one way of handling connection state announcement. Let me know what you think!

@tmaxmax
Owner Author

tmaxmax commented Aug 24, 2024

Appendix 2: sse.ReadResponse and OpenAI streams

For reading OpenAI-style streams the backoff & retry functionality, connection status and other things the normal client provides are not necessary. These streams do not require a long-running connection, but merely to be parsed until EOF – and they always have an EOF.

With these in mind I propose a simpler solution than sse.Client or the proposed sse.Connect:

prompt, _ := json.Marshal(OpenAIRequest{
	Model:    "gpt-4-1106-preview",
	Messages: msgs,
	Stream:   true,
})
r, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", strings.NewReader(string(prompt)))
r.Header.Add("Authorization", fmt.Sprintf("Bearer %s", OpenAIKey))
r.Header.Add("Content-Type", "application/json")

res, err := http.DefaultClient.Do(r)
if err != nil {
	/* handle error */
}
defer res.Body.Close()

for ev, err := range sse.ReadResponse(res.Body) {
	if err != nil {
		/* handle read error */
		break
	}
	/* do something with event */
}

// if we got here without an error, then EOF was successfully reached

A minimal footprint solution exactly for this use-case.

As it can be deduced from the example, it makes use of Go 1.23 iterators, so the header of ReadResponse would be:

package sse

func ReadResponse(r io.Reader) func(func(Event, error) bool)

The return value would be replaced with iter.Seq2[Event, error] on the release of Go 1.24 – otherwise go-sse's promise of supporting the 2 major versions supported by the Go team would be broken. Syntax for Go 1.22 would, of course, be a little uglier:

sse.ReadResponse(res.Body)(func(ev sse.Event, err error) bool {
	if err != nil {
		/* handle error */
		return false
	}
	/* handle event */
	return true
})

I expect that most people are on the latest version of Go anyway, so impact should be minimal. Plus, Go 1.22 users can still benefit from this syntax by enabling the "range-over-func" experiment.

This proposal comes as a response to #38, which showed that the current client is not a good fit for these requests. While the sse.Connect proposal would make handling OpenAI streams nicer, it would still involve a lot of unnecessary boilerplate and be an overkill tool. Besides these subjective concerns there is also an objective issue with sse.Connect for which I haven't found a satisfying solution; the current client has it too, and it is what prompted the linked PR: EOF handling. By default sse.Connect would still retry on io.EOF, which is not desired for these kinds of requests. A solution could be to make the EOF behavior configurable:

// EOF as failure by default...
sse.Connect(r, events, &sse.ConnectConfig{
	TreatEOFAsSuccess: true,
})
// ...or EOF as success by default
sse.Connect(r, events, &sse.ConnectConfig{
	RetryOnEOF: true,
})

Regardless of naming choice, the issues (besides the ones pointed to above) I have with this approach are:

  • one of the use-cases (long-running connections vs one-shot requests) will be syntactically favoured over the other, depending on the chosen default. For the other use-case there will always be the "Don't forget the EOF flag" gotcha
  • other configuration options apart from buffering and HTTP client do not make sense for OpenAI streams – by not trying to combine the two use-cases less confusion will emerge and less will have to be documented
  • the two use-cases cannot be individually developed, which may limit improvements specific to one use-case in the future

Furthermore, not even OpenAI themselves provide a full-fledged client to read their responses: in the OpenAI API reference they show examples using their own Python & Node SDKs, which include only a stream parser, exactly what ReadResponse would be. There is precedent for this approach; go-sse would be the only Go library to provide such a utility.

When it comes to configuration options, the HTTP client is covered: ReadResponse just expects an io.Reader so its origin does not matter. Library users can choose the de-facto http.Client, a mock one, read a dump from a file or whatever other reader source they desire.

The other configuration option which may need to be tackled is the read buffer (an equivalent to the current sse.Connection.Buffer or a possible sse.ConnectConfig.BufferSize). The parsing internals use a bufio.Scanner and if the buffer is not big enough for an event then that event is dropped (see #2, the first substantial PR in this repo, which fixes exactly this). I personally am not familiar with how big OpenAI events can be – and by consequence whether buffer size can be an issue or not – but if it is, here are some proposed designs:

  • take a bufio.Scanner instead of an io.Reader
    This way the buffer can be configured outside by the user with nothing added to our API. The only gripe I'd have with this design is that it leaves space for the scanner to be used beforehand, which will cause a panic in ReadResponse as it will need to change the SplitFunc of the scanner, operation not supported after reading is started. Anyway, correct code would look like this:
sc := bufio.NewScanner(res.Body)
sc.Buffer(buf, 1<<16) // or other max size, not important

for ev, err := range sse.ReadResponse(sc) {
	/* handle event or error */
}

This may also limit development of the internal parser in the future, as it basically forces it to continue using bufio.Scanner underneath.

  • pass a max token size or both a byte slice and a size as additional parameters:
// Variant 1: max token size only, ReadResponse creates the buffer
// use default max token size
sse.ReadResponse(res.Body, 0)
// max token size provided
sse.ReadResponse(res.Body, 1<<16)

// Variant 2: both buffer and max token size
// let ReadResponse create the buffer, use default max token size
sse.ReadResponse(res.Body, nil, 0)
// let ReadResponse create the buffer, max token size provided
sse.ReadResponse(res.Body, nil, 1<<16)
// own buffer, max token size provided
sse.ReadResponse(res.Body, buf, 1<<16)

This is not as leaky of an abstraction as the first solution and doesn't have misusage issues (though it requires quite some documentation for the buffer-related parameters). Having these additional parameters takes away the elegance and simplicity of the initial solution, though.

  • have both a simple sse.ReadResponse and some specialized sse.ReadResponseBuffer
    The specialized version could be subsequently added, if there is demand. (I'd honestly wish that the default buffer size is enough – or find a way to remove this issue caused by the bufio.Scanner altogether)

This would be in my view a better solution for parsing OpenAI-style streams. Let me know what you think!


3 participants