Cannot keep stream open with puma server #1035
I cannot get a stream to stay open (which is necessary for Server-Sent Events to work) with Sinatra running on Puma. I don't really have any idea why, but the connections close right away.
```ruby
require 'sinatra'

set :server, :puma
set :connections, []

get '/stream', provides: 'text/event-stream' do
  stream :keep_open do |out|
    settings.connections << out
    out.callback do
      puts 'stream closed'
      settings.connections.delete(out)
    end
  end
end
```
Then I run
If I have the server as
This could be related to the problems on #962, but I'm using MRI (tested on 2.1.5p273) so it might be unrelated to any problems JRuby has.
@zzak, when I made this issue, it was probably whatever was the latest Rack at the time. I just tried it again and it's still a problem.
Here's the Gemfile
```ruby
source 'https://rubygems.org'

gem 'puma'
gem 'sinatra'
```
and the Gemfile.lock
I can confirm that this is an issue. Puma identified it in this issue, which was closed as won't-fix: puma/puma#3
Puma doesn't use EventMachine due to its threading model, as explained here: WardCunningham/Smallest-Federated-Wiki#331
However, I think that it may be possible to write a Puma adapter.
This is my proposal at a high level:
However, I'm not sure how to handle this case or if it matters at all for using Puma with Sinatra. Perhaps that one could be left alone for now.
Anyway, those are my thoughts on it -- feedback is much appreciated.
Okay, never mind! I was able to get a working SSE using the
I was overthinking it by a long shot. Since Puma will handle the threading and connections, all you have to do is start an event loop and write any new data to the stream as it is generated. Puma will kill it for you when the connection dies.
My implementation for SSE's works like this:
```ruby
get '/events', provides: 'text/event-stream' do
  protected!
  response.headers['X-Accel-Buffering'] = 'no' # Disable buffering for nginx
  stream do |out|
    out << latest_events
    loop do
      while event = settings.next_events.shift
        out << event unless out.closed?
      end
    end
  end
end
```
This might seem overly simplistic, but I arrived on this after many other more complicated designs. Puma detects that the connection has closed and it just kills the thread associated with it -- which kills the loop.
Short and sweet.
I'm curious how you made this work. I've tried doing that, and the puma server keeps dropping the connections.
I inspected the code, and it seems that sinatra doesn't "respect" the official rack hijack API.
Puma passes the `rack.hijack` object down (it appears to be the `Puma::Client` object). The `Puma::Client` gets hijacked only when its `#call` method is called. This is the standard Rack hijack API call.
The `Rack::Lint` middleware adds an extra wrapper to it; before the request gets passed to the application, a `HijackWrapper` gets set as
When using the
```ruby
... if env['rack.hijack?'] && headers['rack.hijack'] ...
```
This should be true, but the second condition fails.
This leads me to believe that the stream helper is not doing enough, that is, it doesn't set the response hijack header, and the API is therefore only compatible with Thin.
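For comparison, here is a minimal sketch of what the partial-hijack branch of the Rack spec expects from an application. This is a bare Rack app, not Sinatra's internals, and the payload is made up for illustration:

```ruby
# A bare-bones Rack app using the partial hijack API. After the server
# writes the response headers, it calls the 'rack.hijack' proc with the
# raw client socket, leaving the app in control of the stream.
app = lambda do |env|
  if env['rack.hijack?']
    hijack = proc do |io|
      io.write("data: hello\n\n") # stream SSE frames directly to the socket
      io.close
    end
    [200, { 'Content-Type' => 'text/event-stream', 'rack.hijack' => hijack }, []]
  else
    [200, { 'Content-Type' => 'text/plain' }, ['hijack unsupported']]
  end
end
```

The `headers['rack.hijack']` condition quoted earlier fails precisely because Sinatra's stream helper never sets that response header.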
You can take a look at the pull request we've got going here: Shopify/dashing#677
It ended up being a bit more complex for my use case, since the data being streamed was generated in another function previously.
I ended up keeping track of a queue of events for each connection, filling that array outside of the SSE event loop. Inside the SSE event loop, I simply drained the array corresponding to the connection being handled, in an endless loop.
The endless loop is killed by Puma when the connection closes, so these event queues eventually start filling up if you don't remove them after the connection terminates. I decided to simply check the size of the array and, if it grew past a maximum size, remove the connection from the connection map. This is an ad-hoc way to detect disconnect events, but it also serves as a natural throttling mechanism for poorly performing connections.
I am hoping to figure out a better way to detect disconnects, so that the memory held by a connection's event queue can be freed right away.
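A rough sketch of that queue-per-connection bookkeeping might look like this (names like `MAX_QUEUE` and `broadcast` are my own, not from the actual implementation):

```ruby
MAX_QUEUE = 100 # assumed cap; tune for your app

connections = {}        # connection id => array of pending events
queue_lock = Mutex.new  # protects the connection map across threads

# Producers call this whenever a new event is generated. A queue that
# grows past the cap is assumed to belong to a dead (or hopelessly slow)
# client, so its entry is dropped to reclaim memory.
broadcast = lambda do |event|
  queue_lock.synchronize do
    connections.each do |id, queue|
      if queue.size >= MAX_QUEUE
        connections.delete(id) # ad-hoc disconnect detection
      else
        queue << event
      end
    end
  end
end
```

Each SSE loop would then drain `connections[id]` under the same lock.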
@tylermauthe I already read the solution and played around with it. From what I understood, you are letting the connections die and the client reconnect, while keeping track of the messages in between (correct me if I'm wrong).
That is different from the EM version, in which the connections are long-running and don't block each other on read/write. I'm trying to do this with Puma, but it seems impossible: the client socket is only non-blocking on reads, there is no way of reattaching it to the event loop even for writes, and clients block as soon as you reach max threads. All of this is basically impossible without hijacking the IO and handling the event loop somewhere else (I think this is what ActionCable does with nio4r).
@TiagoCardoso1983 - It's been a bit of a journey, but I've got a stable SSE working for my application. I'll post what I've learned below -- it may be more than you need to know but hopefully other people can find this a useful primer in the future.
Long story short, building SSEs in Puma requires a fairly decent understanding of concurrency. Generally, this means you must protect concurrent access to shared resources (I/O, memory, etc) via Mutexes.
Firstly, I am not letting connections die. SSEs are long-running streams of data; reconnections would make the technology no better than AJAX polling. My application is already pretty chatty over SSE, and I certainly don't want to increase this by using a broken SSE implementation...
The main difference between Thin and Puma is the use of threads to enable concurrency. Thin uses a single thread, with EventMachine providing time-sharing via callbacks to simulate concurrency; i.e., EventMachine keeps track of the tasks the server has to do and switches between them fairly so that they all get done. This approach shields you from the horrors that come with concurrent programming, but doesn't take advantage of modern multi-core CPUs. You cannot scale a single process vertically, but you can run multiple processes side by side and use
Puma (Threads + Workers)
Puma has a pool of threads for each worker. The worker is essentially a fork of the process, so it will transparently enable the above model of running multiple processes pinned to a CPU. Each forked process (worker) allows you to spawn multiple threads. When hooked up to Rack, Puma will spawn a new thread for every request made to the Web Server. You can also spawn your own threads for things like DB access, file I/O and in-memory collection access. This allows you to scale very naturally with little effort, but you start to discover the evils of threads. Any time a limited resource is shared, issues around contention for that resource will occur. What happens when two threads want to do a good long read of the same file? Similarly, imagine you have a collection stored in memory and two threads attempt to append an item to it at the same instant -- which one wins?
The single-process callback model increases operational complexity in terms of ability to scale these apps. Containers can help out with resource utilization and scheduling across huge clusters of machines, but if you want to scale vertically on a big server you'll have to manually manage all of this stuff. Have fun when your configuration script accidentally sets the affinity for two of your processes to the same core and response time on your shopping cart submit page goes up >200%.
From a strictly development point of view, scaling across processes is fine until you want to share state across processes. Any information that is to be shared across processes requires a persistence layer (database, Memcache, Redis, whatever). This means that some state information MUST be duplicated (like the connection to the persistence layer) instead of being pooled.
Why Not Threads?
Threads do not have the same limitations on state access -- they all have access to the same memory locations. The blessing of concurrency also comes with the curse of contention for resources, so it's wise to minimize the amount of mutable data that is shared across threads. In general, the naive approach to most applications is to share all the mutable data all the time, everywhere (or at least it is for me!). For me, this means that developing with threads takes a bit longer because of the discipline and rigor involved in both developing and testing.
Good primer on concurrency: https://github.com/jruby/jruby/wiki/Concurrency-in-jruby
Enter the Semaphore
So, threads are great if you want to do something embarrassingly parallel, but what about when you want to do those pesky side-effect-ridden things that your crusty, boring old users love to do? Users love mutable data and accessing limited resources. Things like keeping track of a user's session in memory so they can stay logged in to your dating website for beautiful stock photo models, saving their order for t-shirts to your database so that they'll eventually receive some hilarious swag to make their co-workers jealous, or uploading that really cute picture of their cat in a party hat to Facebook over their 3G connection.
To ensure that access to shared resources and mutable state happens safely, the semaphore was created. A semaphore (in its binary form, a mutex) acts as a gatekeeper that decides whether a scarce resource is available. Think of the bathroom at a convenience store (the scarce resource): if you want to access it, you must first ask the clerk (the semaphore) whether it is available. If it is, they will give you a key; if not, you will have to wait. If you're waiting, the clerk will ask you to queue up until someone returns the key. Once the key is returned, the queue is processed first-in-first-out until all requests for the bathroom are served. Sadly, it's the only bathroom in town and there is a line around the block...
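In Ruby, the clerk-and-key dance is a `Mutex`. A minimal sketch, where the counter stands in for any shared mutable resource:

```ruby
counter = 0
lock = Mutex.new # the "clerk" guarding the shared resource

threads = 10.times.map do
  Thread.new do
    1_000.times do
      # Only one thread at a time may hold the lock, so the
      # read-increment-write sequence can never interleave.
      lock.synchronize { counter += 1 }
    end
  end
end
threads.each(&:join)
```

Without the `synchronize` call, two threads can read the same value of `counter` and one increment is lost.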
A New Class of Bug
The best part about threads is that they open up an entirely new class of bug: deadlocks (and livelocks). These occur when two locks depend on each other. If neither semaphore will change state until the state changes in its companion, you have a deadlock; if both change state at the same moment, causing a new conflict which triggers a recursive series of state changes and further conflicts, you have a livelock. Returning to the real-life example above, imagine there are two bathrooms, but the store had to cut costs and there is only one roll of toilet paper and one bar of soap between the two bathrooms. If two people want to use the bathrooms at the same time, this is fine as long as only one needs toilet paper and one needs soap. One day, a customer enters one bathroom with the toilet paper, saying they're pretty sure they don't need soap. Another customer enters the other bathroom, stating that they only need soap and it's fine that the toilet paper is not available. Halfway through their business, both customers change their minds and realize they can't leave the bathroom until they get the other item (soap or toilet paper). Since neither occupant will surrender their item until they leave the bathroom, and neither can leave the bathroom until they can access the other's item, both bathroom occupants will eventually die of starvation. This is a deadlock!
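The standard cure for the two-bathroom deadlock is a global lock ordering: if every thread always acquires the soap before the paper, the circular wait can never form. A sketch of that fix:

```ruby
soap  = Mutex.new
paper = Mutex.new

# Both threads acquire the locks in the same order (soap, then paper),
# so neither can end up holding one resource while waiting on the other.
use_bathroom = lambda do |name|
  soap.synchronize do
    paper.synchronize do
      "#{name} finished"
    end
  end
end

t1 = Thread.new { use_bathroom.call('customer 1') }
t2 = Thread.new { use_bathroom.call('customer 2') }
results = [t1.value, t2.value]
```

Had one lambda taken `paper` first and the other `soap` first, the two threads could block each other forever.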
For a great example of a livelock, see this wiki article.
Okay, that's great, but seriously how do I use Puma with Sinatra?
Carefully! If you follow the slightly crazy rant about concurrency above and do some of your own research, you'll find that reduction of mutable state access across threads and conservative usage of semaphores to protect important access to shared resources will be all the tools you need to replace EventMachine.
@TiagoCardoso1983 - For your specific use case, it sounds like you'll either want to protect access to your I/O resources with a Mutex or spawn a thread to do I/O and then store a queue of data to be read and written. Access to the queue will also need to be protected by a Mutex.
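For the dedicated-I/O-thread variant, Ruby's built-in `Queue` is already thread-safe, so no hand-rolled mutex is needed. A sketch, with `StringIO` standing in for the real I/O target:

```ruby
require 'stringio'

io_queue = Queue.new   # Queue handles its own locking internally
log = StringIO.new     # stand-in for a file or socket

# The writer thread is the only thread that touches the I/O object.
writer = Thread.new do
  loop do
    line = io_queue.pop # blocks until a producer pushes something
    break if line == :stop
    log.puts(line)
  end
end

# Any request thread can enqueue work without further synchronization.
io_queue << 'event 1'
io_queue << 'event 2'
io_queue << :stop
writer.join
```

Because only one thread performs the writes, the ordering of output is deterministic even with many producers.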
You are correct that clients will block when you hit
I don't know what kind of files you're trying to access, but if possible you should use a DBMS of some kind instead of a file and managing I/O will be a lot simpler. Even SQLite supports transactions (which are effectively a Mutex inside your DB).
@tylermauthe I guess we have different approaches. Spawning a thread per network I/O operation is overkill: it makes you hit the performance ceiling sooner, and it is just not the right solution for the I/O problem, which can be handled across multiple file handles on a single thread of execution (granted, not the standard Ruby approach). This is what EM does in a scalable way, in that it responds when there is something to read/write or some timer is triggered (or next_tick, which should be used with a grain of salt).
I guess this will be quite hard to handle using puma+sinatra only. In order for Puma to free up the request-handler thread for subsequent requests (my desired model), the IO handle must be "hijacked" and handled by another thread with an independent reactor loop. This also requires the solution to support the standard Rack hijack protocol and not the async.callback one (which is what Sinatra uses). This is what Faye does. Or maybe Puma 3 allows an easier workaround; I'd have to check. But the "tune the value of
@TiagoCardoso1983 I think what you're suggesting is changing the way Puma integrates with Rack -- I haven't looked into this option at all. If you could override the behaviour to spawn a new thread per connection, you could add a bunch of connections into a pool to be serviced by a handful of threads in series. This would be nice, especially if each connection is updated infrequently.
I guess you're handling many more connections that are much longer lived than I am, so tuning your max threads and workers may not be enough...
Also, after taking a closer read of the Puma docs -- I think Puma uses a Thread Pool for each worker. If that's the case, each request is put into a backlog and is serviced by a thread when one becomes available in the pool:
The code seems to support this line of thought, but I don't care to spend the time to parse the whole project so I'm likely mistaken:
Anyway, good luck and have fun. I've got my SSEs working swell and I'm pretty happy with it so far. If I learn anything new, I will update this thread. Please do the same if you make any headway on the Rack Hijack front.
Yeah, I just did a quick test. It almost certainly uses a thread pool in the way I described, each request goes into a queue and is serviced by a thread from a pool when it becomes available. So, it doesn't block when it hits max threads.
To test it, I set
That being said, my cheap-o Heroku free dyno supports 256 threads maximum and has 2-4 vCPUs, so I think I'll keep the thread pool limit cranked up to 50 with 4 workers...
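For reference, a Puma config matching that setup might look like this (the file path and port are assumptions, not from the thread):

```ruby
# config/puma.rb -- hypothetical config for 4 workers x 50 threads
workers 4          # forked worker processes (roughly one per vCPU)
threads 1, 50      # min/max threads in each worker's pool
port ENV.fetch('PORT', 3000)
preload_app!       # load the app before forking so workers share memory
```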
@tylermauthe ok if it doesn't starve the puma thread pool it sounds better (what are you using for that?
Puma maintains its own thread pool; it just works. For background tasks, Rufus Scheduler is used. The background tasks update an in-memory data structure, which also pushes the changes as commands to the SSE. Rufus also creates its own pool of threads, and each background task gets a thread (until the max is reached, then it starts context swapping).
Anyway, I've done a bunch more reading on reactor loops and thread pools and concurrency.
You're absolutely correct that a reactor loop + threads will give you better performance. A thread is an abstraction to help deal with CPU cores, whereas the reactor loop more cleanly abstracts the servicing of interrupts (like network and disk I/O). Thread pools are a bit better, as they let you get away with concurrency even on a single core, but their simplicity means your software doesn't get to decide when it will context swap. You can end up in crazy context-swapping madness if work starts piling up in your thread pool, and each context swap comes with nasty overhead... I still think that thread pools are good enough for my project, but even 200 / 300 concurrent connections would probably start context swapping pretty hard on fewer than 8 cores.
Did you make any headway with hijacking the socket from sinatra?
There are more things you can investigate if you want, like green threads, network socket event syscalls, etc etc, but ruby is relatively limited in support for all those.
I've been stalled for the last 4 weeks, making progress relatively slowly, but it's still promising, as my main proof of concept is done. So, for my long-running case:
celluloid-io uses nio4r underneath, which is what powers the current ActionCable event loop. The concept they are using is basically the same; they're just reinventing the wheel a bit because they don't want the Celluloid suite as a dependency. To sum it up, they also hijack the socket, pass it to a separate thread running the event loop, and then hand the socket to websocket-driver, which does the whole protocol magic. As I like the abstraction that celluloid-io provides, I prefer to go with it. Will keep you posted.
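The hand-the-socket-to-a-reactor idea can be sketched with nothing but the stdlib, using `IO.pipe` and `IO.select` in place of a hijacked client socket and nio4r:

```ruby
reader, writer = IO.pipe # stands in for a hijacked client socket

# Reactor thread: waits for readability, drains the socket, exits on EOF.
reactor = Thread.new do
  buffer = +''
  loop do
    ready, = IO.select([reader], nil, nil, 1)
    break unless ready
    begin
      buffer << reader.read_nonblock(64)
    rescue EOFError
      break # peer closed the connection
    end
  end
  buffer
end

# The request thread hands off the IO and returns immediately.
writer.write('event 1|event 2')
writer.close
result = reactor.value
```

A real reactor would multiplex many sockets in that single `IO.select` call, which is exactly what nio4r's selector does more efficiently (epoll/kqueue instead of select).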
@zzak sure, but even better would be to provide some new stream API that works with things other than EventMachine. As you're preparing v2, I think it's time to break the public API a bit. The most glaring flaw of the
Main issues: it's dependent on an EventMachine-only API (