
The sliding buffer default size in sse is too short #43

Closed
CreativeQuantum opened this issue Nov 20, 2023 · 2 comments · Fixed by #47

Comments

@CreativeQuantum

When using function calls, maybe because they are a tad slower, I realized I often had problems parsing streamed results: some tokens would get dropped along the way.

(loop [s (<!! c)] ;; sse channel
  (let [fc (-> s :choices first :delta :function_call)
        x  (or (:name fc) (:arguments fc))]
    (when x
      (Thread/sleep (rand-int 100)) ;; simulate a slow consumer
      (print x)
      (flush)
      (recur (<!! c)))))

Explicitly passing :max_tokens 1000 fixes this issue.
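
As an illustration of the workaround, here is a minimal sketch of the kind of streaming call I mean (the model and messages are placeholders, and the exact parameter map is an assumption):

;; Hypothetical sketch: explicitly passing :max_tokens makes the sliding
;; buffer max_tokens + 1 slots wide (see calc-buffer-size below), which is
;; enough that a slow consumer no longer loses events.
(def c
  (api/create-chat-completion
   {:model      "gpt-3.5-turbo"
    :messages   [{:role "user" :content "..."}]
    :stream     true
    :max_tokens 1000}))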

For reference, the library's current calc-buffer-size:

(defn calc-buffer-size
  "Buffer size should be at least equal to max_tokens
  or 16 (the default in openai as of 2023-02-19)
  plus the [DONE] terminator"
  [{:keys [max_tokens]
    :or   {max_tokens 16}}]
  (inc max_tokens))
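
To see why a 17-slot sliding buffer drops data when the consumer is slow, here is a small, self-contained core.async illustration (not library code, just a sketch of the dropping behavior):

(require '[clojure.core.async :as a])

;; A sliding buffer silently drops the *oldest* items once it is full, so a
;; producer that outruns the consumer loses tokens, exactly as observed above.
(let [c (a/chan (a/sliding-buffer 3))]
  (dotimes [i 5] (a/>!! c i)) ;; puts never block; 0 and 1 are dropped
  (a/close! c)
  (a/<!! (a/into [] c)))
;; => [2 3 4]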

I think it's too low. My use case is fairly basic: query, print as characters are appended to a StringBuffer, then parse.

Also, the OpenAI API Reference now states the following (the default is no longer 16):

max_tokens
integer or null
Optional
Defaults to inf
The maximum number of tokens to generate in the chat completion.

@CreativeQuantum (Author)

Easy fix: increase the value.
Smarter fix: introduce a new stream_buffer_len option that I can pass when calling api/create-chat-completion and friends, with this behavior (see the sketch after this list):

  • use a sliding buffer of length stream_buffer_len when it is passed;
  • otherwise, a sliding buffer of length max_tokens + 1 (to account for the [DONE] terminator) when max_tokens is passed;
  • otherwise, fall back to an infinite-length buffer.
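
As a sketch of the call shape I have in mind (:stream_buffer_len is the new, hypothetical option; the other parameters are placeholders):

;; Hypothetical: :stream_buffer_len would size the channel's sliding buffer
;; independently of max_tokens.
(api/create-chat-completion
 {:model             "gpt-3.5-turbo"
  :messages          [{:role "user" :content "..."}]
  :stream            true
  :stream_buffer_len 64})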

Here is the patch I just developed to fix this:

(require '[clojure.core.async.impl.protocols :as impl])
(import  '[clojure.lang Counted]
         '[java.util    LinkedList])

;; An unbounded FIFO buffer for core.async channels: puts never block and
;; nothing is ever dropped (memory permitting).
(deftype InfiniteBuffer [^LinkedList buf]
  impl/UnblockingBuffer
  impl/Buffer
  (full? [_this]
    false)
  (remove! [_this]
    (.removeLast buf))
  (add!* [this itm]
    (.addFirst buf itm)
    this)
  (close-buf! [_this])
  Counted
  (count [_this]
    (.size buf)))

(defn infinite-buffer []
  (InfiniteBuffer. (LinkedList.)))
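
A quick REPL sanity check of the buffer (illustrative only, not part of the patch):

(require '[clojure.core.async :as a])

;; Put more items than any fixed-size buffer would hold, then drain:
;; with the InfiniteBuffer nothing is dropped and order is preserved.
(let [c (a/chan (infinite-buffer))]
  (dotimes [i 1000] (a/>!! c i))
  (a/close! c)
  (= (range 1000) (a/<!! (a/into [] c))))
;; => true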

(defn calc-buffer-size
  "- Use stream_buffer_len if provided.
   - Otherwise, if max_tokens is provided, the buffer size should be at least
     max_tokens plus one for the [DONE] terminator.
   - Otherwise, fall back on ##Inf and use an infinite-buffer instead."
  [{:keys [stream_buffer_len max_tokens]}]
  (or stream_buffer_len
      (when max_tokens (inc max_tokens))
      ##Inf))

(defn make-buffer [params]
  (let [size (calc-buffer-size params)]
    (if (= size ##Inf)
      (infinite-buffer)
      (a/sliding-buffer size))))
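
For reference, how these helpers resolve a few parameter maps (the results follow directly from the code above):

(calc-buffer-size {:stream_buffer_len 64}) ;; => 64
(calc-buffer-size {:max_tokens 256})       ;; => 257 (+1 for the [DONE] terminator)
(calc-buffer-size {})                      ;; => ##Inf

(make-buffer {:max_tokens 256}) ;; => sliding buffer of 257 slots
(make-buffer {})                ;; => an InfiniteBuffer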

(defn sse-events
  "Returns a core.async channel with events as clojure data structures.
  Inspiration from https://gist.github.com/oliyh/2b9b9107e7e7e12d4a60e79a19d056ee"
  [{:keys [request params]}]
  (let [event-stream ^InputStream (:body (http/request (merge request
                                                              params
                                                              {:as :stream})))
        ;; ------ Modifications here
        events (a/chan (make-buffer params) (map parse-event))]
        ;; ------ End modifications.
    (a/thread
      (loop [byte-coll []]
        (let [byte-arr (byte-array (max 1 (.available event-stream)))
              bytes-read (.read event-stream byte-arr)]

          (if (neg? bytes-read)

            ;; Input stream closed, exiting read-loop
            (.close event-stream)

            (let [next-byte-coll (concat byte-coll (seq byte-arr))
                  data (slurp (byte-array next-byte-coll))]
              (if-let [es (not-empty (re-seq event-mask data))]
                (if (every? true? (map #(a/>!! events %) es))
                  (recur (drop (apply + (map #(count (.getBytes %)) es))
                               next-byte-coll))

                  ;; Output stream closed, exiting read-loop
                  (.close event-stream))

                (recur next-byte-coll)))))))
    events))

This fixes my buggy test-case above, and when I introduce :stream_buffer_len 16, it reintroduces the problematic behavior I described.

It took me several days to figure out the source of this problem, since I use rather complex core.async code downstream of the API call and I assumed that code was the cause.

Note: I posted this issue 5 days ago, but my account got shadow-banned. That took time to figure out too.

@wkok (Owner)

wkok commented Nov 23, 2023

Thanks for this. Glad you have a workaround for now; I'll find some time to see how best to incorporate your proposed fix.
