Idle pub/sub connections #7

mjeffrey18 opened this issue Nov 9, 2021 · 14 comments

@mjeffrey18
Contributor

Hey @jgaskins, thanks for the amazing work on this shard!

I was looking at your fork of cable.cr and noticed you're using this connection setup:

getter redis_subscribe = Redis::Connection.new(URI.parse(Cable.settings.url))
getter redis_publish = Redis::Client.new(URI.parse(Cable.settings.url))

I'm having a huge issue with long-running sockets, and my debugging efforts have led me to believe Redis is losing the connection used by this block:

spawn(name: "Cable::Server - subscribe") do
  redis_subscribe.subscribe("_internal") do |subscription|
    subscription.on_message do |channel, message|
      fiber_channel.send({channel, message})
    end
  end
end

I was curious why you used Redis::Connection for the pub/sub part?

I've manually run Redis PUBLISH commands using the CLI and nothing comes through. I can see the connection in CLIENT LIST, but for some reason nothing reaches this block.
This issue only happens after an hour or so...

I started implementing a ping/pong check every 3 seconds to try to keep the connection open, but it looks like this isn't possible with Redis::Connection, unless I'm mistaken?

I'd love to hear whether you've experienced something similar, and whether you think having a ping/pong operation on Redis::Connection might help solve this issue.

@jgaskins
Owner

jgaskins commented Nov 9, 2021

I was curious why you used Redis::Connection for the pub/sub part?

It’s not necessary to use one or the other here (you should be able to use a Client anywhere you use a Connection), just an “I don’t need a full-on Client here” sort of thing. The main difference between Client and Connection is that Client can be used in multiple fibers. Connection just wraps a TCPSocket, so its use needs to be confined to a single fiber.

Running a command against a Client is implemented as checking out a Connection, passing the command to it, and then checking the connection back in.

Since subscribing monopolizes a connection and blocks a single fiber basically for the lifetime of the application, it didn’t need a Client’s connection pool or delegation. That was the only reason. 😄
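
For illustration, here's a rough sketch of how that difference plays out; the constructors are the same ones quoted in the issue description, but treat the comments as a summary of the behavior described above rather than the shard's actual internals:

require "redis"
require "uri"

# Client: safe to share across fibers; each command checks a connection out of
# the pool, runs it, and checks the connection back in.
redis = Redis::Client.new(URI.parse(ENV["REDIS_URL"]? || "redis:///"))
redis.run({"set", "greeting", "hello"}) # any fiber can do this concurrently

# Connection: wraps a single TCPSocket, so keep it confined to one fiber.
# SUBSCRIBE monopolizes it for the life of the app anyway, so the subscriber
# loop doesn't need a pool.
subscriber = Redis::Connection.new(URI.parse(ENV["REDIS_URL"]? || "redis:///"))
spawn(name: "subscriber") do
  subscriber.subscribe("_internal") do |subscription|
    subscription.on_message { |channel, message| puts({channel, message}) }
  end
end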

I started implementing a ping/pong check every 3 seconds to try to keep the connection open, but it looks like this isn't possible with Redis::Connection, unless I'm mistaken?

It looks like the ping method just isn't implemented yet. I haven't added every Redis command, mainly because there are so many and I don't understand them all yet. 😄 But you can run any Redis command with redis.run({"command", *args}) (this shard uses tuples instead of arrays for commands whenever feasible to minimize heap allocations, but run accepts any Enumerable), so you should be able to do it with redis.run({"ping"}).

I'd love to hear whether you've experienced something similar, and whether you think having a ping/pong operation on Redis::Connection might help solve this issue.

I haven’t seen it with this shard specifically, but I don’t doubt it happens. At my previous company, we had Redis connection errors all the time using the Ruby redis gem in Kubernetes connecting to AWS Elasticache.

Periodic pings might help, but I actually don't know if the Redis PING command would work on a connection blocked on a SUBSCRIBE. The pings would likely have to run in a separate fiber, since the current one is blocked on this line, and because they'd go through the same read call inside the run method, we don't have any guarantees about which of those reads gets the PONG reply.

Since most Redis commands are request/response, the retry mechanism built into Connection#run is often enough there, but maybe subscribe needs something similar. I'll try to replicate this and, if I can, see if I can come up with something to keep it alive, even if it's just publishing bogus messages that nobody will receive.
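
To make the run({"ping"}) idea concrete, here's a rough, untested sketch of a keepalive fiber, deliberately using a second Client rather than the blocked subscriber connection because of the caveat above:

require "redis"

# Hypothetical keepalive: PING every 3 seconds via the generic run method,
# on a Client that is independent of the subscribing Connection.
pinger = Redis::Client.new(URI.parse(ENV["REDIS_URL"]? || "redis:///"))

spawn(name: "redis keepalive") do
  loop do
    sleep 3.seconds
    pinger.run({"ping"}) # => "PONG"
  rescue ex : IO::Error
    STDERR.puts "Redis keepalive failed: #{ex.message}"
    break
  end
end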

@mjeffrey18
Contributor Author

Hi @jgaskins, thanks for the reply! Really appreciate it!

It's been a nightmare bug...
It seems to all work, then it all stops, no messages at all...
I tried a different Redis provider as well, still no luck.

I ended up trying to hack together a pulse-check type setup, and that actually made it worse...

private def subscribe
  spawn(name: "Cable::Server - subscribe") do
    redis_subscribe.subscribe("_internal") do |subscription|
      subscription.on_message do |channel, message|
        if channel == "_internal" && message == "ping"
          Cable::Logger.debug { "Cable::Server#subscribe -> PONG" }
        else
          fiber_channel.send({channel, message})
          Cable::Logger.debug { "Cable::Server#subscribe channel:#{channel} message:#{message}" }
        end
      end
    end
  end
end
require "schedule"

module Cable
  class RedisPinger
    @@started : Bool = false
    @@seconds : Int32 | Float64 = Cable.settings.redis_ping

    def self.run_every(value : Int32 | Float64, &block)
      @@seconds = value

      yield

      @@seconds = Cable.settings.redis_ping
    end

    def self.start(server : Cable::Server)
      new(server).start unless @@started
      @@started = true
    end

    def self.seconds
      @@seconds
    end

    def initialize(@server : Cable::Server)
    end

    def start
      runner = Schedule::Runner.new
      runner.every(Cable::RedisPinger.seconds.seconds) do
        check_redis_subscribe
        check_redis_publish
      rescue e
        @@started = false
        ExceptionService.notify(e)
        raise Schedule::StopException.new("Stopped")
      end
    end
    
    def check_redis_subscribe
      Cable.server.publish("_internal", "ping")
    end

    def check_redis_publish
      result = @server.redis_publish.run({"ping"})
      Cable::Logger.debug { "Cable::RedisPinger.check_redis_publish -> #{result}" }
    end
  end
end

Here is the scenario:

  1. Restart dyno
  2. All sockets and Redis connections reset, all good
  3. Connect 15-25 clients, keep them open, and start streaming data
  4. After 30-60 minutes everything stops working

Debugging efforts:

  1. Added logging EVERYWHERE... haha
  2. I get nothing from the logs inside the redis_subscribe.subscribe("_internal") do |subscription| block
  3. I even tried using redis-cli to manually send publish commands, nothing
  4. Tried connecting new clients: they all open sockets and get hooked up, but nothing streams
  5. Old clients continue to stay connected, but also nothing streams
  6. I checked the Redis server for connected clients and can see 1 pubsub-type connection still there...
  7. But whenever I try to do anything, nothing is received.

I've tried everything I could think of, including:

  • Using the other Redis shard: same issue
  • Passing in different combinations of connection pooling strings to the Redis clients, e.g. ?initial_pool_size=50&max_pool_size=50&max_idle_pool_size=50
  • Ping/pong
  • Creating a debug JSON endpoint to understand what is connected and for how long (see below)
  • 3 different Redis providers
  • Reaching out to our regular Redis provider to ask if they close anything; I got the response below, but there's not much I can do with it...

The worst part is, we've been using ActionCable (Ruby) for 2+ years and AnyCable-Go for 6+ months without this issue.
So I know for sure there is something weird going on, either with the way Crystal is holding the spawned connection stream, or maybe the fact that the connection is open for too long?

Unless I can find some other solution, I'm thinking about using RabbitMQ instead of Redis, but that means a lot more changes again, so I guess it's the last resort.

Redis Labs response

Hi Marc,

Thank you for your patience and understanding.

Please note, some CONFIG GET/SET commands are blocked in Redis Enterprise Cloud, including timeouts. For more details, you can refer to this article Are you fully compatible with open source Redis?.

Our clusters are configured with timeout=0, and with TCP keepalive. So our server will only close the connection if the peer does not respond to the keepalive, but can stay idle for a long duration (low traffic subscriber for example).

The usual configuration is to keep the control on the application side (the connection owner) by setting a timeout on connections and transactions from the application side. (TCP keepalive is also recommended to make sure you are not keeping stale connections).

In general, the Redis client you are using should send the KeepAlive messages. If the Redis client does not send the KeepAlive messages, we start sending KeepAlives every 30 seconds after 120 seconds for 6 times. If no response is received then we will disconnect the socket.

If you want to look at your active connections you can use the CLIENT LIST command. If there are connections that you want to kill you can use the CLIENT KILL command.

It is also worth taking a look at Connection Pooling. Please let us know if you need further assistance.

Debug JSON endpoint:

def debug_json
  _channels = {} of String => Set(String)

  @channels.each do |k, v|
    _channels[v.first.class.to_s] ||= Set{k}
    _channels[v.first.class.to_s] << k
  end

  {
    "connections"         => @connections.size,
    "channels"            => @channels.size,
    "channels_mounted"    => _channels,
    "connections_mounted" => @connections.map do |key, connection|
      connections_mounted_channels = [] of Hash(String, String | Nil)
      @channels.each do |_, v|
        v.each do |channel|
          next unless channel.connection.connection_identifier == key
          connections_mounted_channels << {
            "channel" => channel.class.to_s,
            "key"     => channel.stream_identifier,
          }
        end
      end

      {
        "key"        => key,
        "identifier" => connection.identifier,
        "started_at" => connection.started_at.to_s("%Y-%m-%dT%H:%M:%S.%6N"),
        "channels"   => connections_mounted_channels,
      }
    end,
  }
end

@jgaskins
Owner

@mjeffrey18 This is all super helpful! I couldn't reproduce it myself (I spun up a Redis server on DigitalOcean yesterday and subscribed to a subject on it from my house, sending a message once every 1-2 hours from a separate process, and it's still receiving messages this morning), so there might be something in the specific infrastructure you're running on that is different from what I'm seeing.

We can twiddle TCP keepalive bits in Crystal, so we might as well give that a shot. That should help avoid race conditions and other weird situations with my off-the-cuff suggestion above. 😄 I don't know what reasonable settings are for it, but maybe some tuning can help. The hard part is how long the feedback loop is.
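
For reference, Crystal exposes those knobs as setters on TCPSocket; here's a minimal sketch of what "twiddling the bits" might look like, with placeholder numbers rather than recommendations:

require "socket"

# Sketch only: enable TCP keepalive probes on a raw socket to Redis.
socket = TCPSocket.new("localhost", 6379)
socket.tcp_keepalive_idle = 60     # seconds of idle time before the first probe
socket.tcp_keepalive_interval = 30 # seconds between unanswered probes
socket.tcp_keepalive_count = 3     # unanswered probes before the OS drops the connection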

@mjeffrey18
Contributor Author

Hi @jgaskins, thank you so much for trying to debug this issue.

Yeah, it seems to only happen intermittently, which makes this way more annoying haha. I've also never encountered the issue locally, only in staging/production. I've seen it be fine all day and sometimes break within a short period of time.
The number of channels/messages being sent doesn't seem to affect it either way; I was initially convinced that more traffic made it worse, but now I'm not sure.

I posted this issue in the luckyframework Discord chat and looks like a good few others experienced the same issue at some point but no one was sure why it happened.

I think you're onto something with your suggestion.
I found this - redis/redis#7855

If you can give me any pointers on how I might implement/investigate those settings in Crystal in relation to this shard, I can start poking around to see if I can find a solution while replicating the issue.

Thanks again for your support!

@jgaskins
Owner

@mjeffrey18 Good find on that Redis issue! Good to know it's not just this shard or even just Crystal experiencing this problem, but at least we seem to be able to respond to it.

I imagine the best place to start with TCP keepalive probes would be where we create the socket. Right now, the only option we're setting is sync — we set it to false so we buffer a complete command before sending it and so that query pipelining remains fast.

If I understand it correctly, setting socket.tcp_keepalive_* there should be all that needs to be done to set up keepalive probes. I don't think it matters whether it's done above or below the sync setting since AFAICT they're all just setsockopt(3) calls.

The hard part is likely figuring out what to do if a keepalive probe closes the socket. This will likely raise an IO::Error on this line. I'm not all that sure how we would reinitialize the connection from there and resubscribe to all the channels, though.

@mjeffrey18
Contributor Author

Thanks @jgaskins, really appreciate it!

I ended up going with this implementation as a first attempt:

def initialize(@uri = URI.parse("redis:///"))
  host = uri.host.presence || "localhost"
  port = uri.port || 6379
  socket = TCPSocket.new(host, port)
  socket.sync = false
  socket.tcp_keepalive_count = 3
  socket.tcp_keepalive_idle = 60
  socket.tcp_keepalive_interval = 30

I'll keep you posted on whether we manage to survive a few days without any issues...

Regarding the recovery: if we get this first part working, I'll look at a way to simulate the disconnect and then see how to gracefully recover.

Thanks again for your help so far!

@mjeffrey18
Contributor Author

Hey @jgaskins, it didn't work out; it stalled a few hours after deploying.

However, I may have found the issue :-)

https://redis.io/topics/clients#output-buffers-limits

Turns out one of our WS channels can on occasion send a fair amount of data (by mistake). It only happens in rare cases if the record has a ton of associations.

I believe the buffer limit is being reached, the socket gets killed by the Redis server, and the client just silently does nothing... until I added the tcp_keepalive stuff above, which caused the client to crash and gave me some hint as to what was going on!
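
For reference, the relevant setting is client-output-buffer-limit for the pubsub class. On a self-hosted Redis you can inspect and raise it through the shard's generic run method (CONFIG is blocked on managed offerings like Redis Enterprise Cloud, per their reply above); the numbers below are just illustrative:

require "redis"

redis = Redis::Client.new(URI.parse(ENV["REDIS_URL"]? || "redis:///"))

# Default is roughly "... pubsub 33554432 8388608 60" (32 MB hard / 8 MB soft over 60s).
puts redis.run({"config", "get", "client-output-buffer-limit"})

# Raise the pub/sub limits to 64 MB hard / 16 MB soft over 60s (example values only).
redis.run({"config", "set", "client-output-buffer-limit", "pubsub 67108864 16777216 60"})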

Anyways, will check again for the next few days and report back. 🙏

@jgaskins
Owner

@mjeffrey18 Nice discovery! I didn't realize pub/sub had different I/O limits from plain data-structure queries, but it makes sense.

@jwoertink
Contributor

I wonder if this is the same thing I'm seeing in my app related to cable-cr/cable#44 🤔 Since it looks like this shard is just using db/pool, maybe moving Cable over to this would fix the issue? I'll try to give that a shot and test it out on my app. If anyone has any other ideas/suggestions on what we can do to improve Cable, let me know!

@jgaskins
Owner

My fork of cable that @mjeffrey18 mentioned in the issue description uses this Redis shard, but it hasn't kept up with upstream. @mjeffrey18 actually proposed that some of my updates be incorporated upstream, so some of the changes over there might be the same as on my fork, but I'm not sure. Either way, feel free to give it a shot. 😄

@mjeffrey18
Contributor Author

Hey @jwoertink / @jgaskins, apologies for the delayed response.
I ended up going with various different approaches, and our backend has been purring along for months without any issues.

[Screenshot 2022-06-26 at 15:11:45]

  1. I'm still not using the official cable shard yet. I needed so many changes. I pushed a lot of our custom changes upstream a while back; however, I've still not managed to push the rest of the changes as they are borderline breaking for most. For instance, I'm using @jgaskins' Redis shard with some tweaks, plus the schedule shard used in @jgaskins' fork (hugoabonizio/schedule.cr).
  2. I've monkey patched that Redis shard's connection to include a few extra lines to help mitigate TCP timeout issues:
module Redis
  class Connection
    def initialize(@uri = URI.parse("redis:///"))
      host = uri.host.presence || "localhost"
      port = uri.port || 6379
      socket = TCPSocket.new(host, port)
      socket.sync = false

      # new lines added to help with tcp_keepalive
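      # TCP_KEEPALIVE_SETTINGS format: "count,idle,interval", e.g. the default "3,60,30" below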
      count, idle, interval = Config.settings.tcp_keepalive_settings.split(",").map(&.chomp.strip.to_i)
      socket.tcp_keepalive_count = count
      socket.tcp_keepalive_idle = idle
      socket.tcp_keepalive_interval = interval
      # end

      # Check whether we should use SSL
      if uri.scheme == "rediss"
        socket = OpenSSL::SSL::Socket::Client.new(socket)
        socket.sync = false
      end

      @socket = socket
      @parser = Parser.new(@socket)

      pipeline do |_redis|
        # Authentication
        if password = uri.password
          run({"auth", password})
        end

        # DB select
        db = if {"", "/"}.includes?(uri.path)
               "0"
             else
               uri.path[1..-1]
             end
        run({"select", db}) unless db == "0"
      end
    end
  end
end
  3. I'm using connection pooling settings for the Redis connections:
class Config
  DEFAULT_REDIS_POOLING_QUERY    = "?initial_pool_size=50&max_pool_size=50&max_idle_pool_size=50"
  DEFAULT_POSTGRES_POOLING_QUERY = "?initial_pool_size=5&max_pool_size=5&max_idle_pool_size=5"
  DEFAULT_TCP_KEEPALIVE_SETTINGS = "3,60,30"
  DEFAULT_ALLOWED_ERRORS         = "Lucky::UnknownAcceptHeaderError"
  Habitat.create do
    setting app_name : String = ENV.fetch("APP_NAME", LuckyEnv.environment)
    setting redis_url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379"
    setting redis_pooling_query : String = ENV.fetch("REDIS_POOLING_QUERY", DEFAULT_REDIS_POOLING_QUERY)
    setting postgres_pooling_query : String = ENV.fetch("POSTGRES_POOLING_QUERY", DEFAULT_POSTGRES_POOLING_QUERY)
    setting tcp_keepalive_settings : String = ENV.fetch("TCP_KEEPALIVE_SETTINGS", DEFAULT_TCP_KEEPALIVE_SETTINGS)
    setting allowed_errors : Set(String) = ENV.fetch("ALLOWED_ERRORS", DEFAULT_ALLOWED_ERRORS).split(",").map(&.chomp.strip).to_set
  end
end

# In the cable main class

module Cable
  VERSION = "0.1.0"

  INTERNAL = {
    message_types: {
      welcome:      "welcome",
      disconnect:   "disconnect",
      ping:         "ping",
      confirmation: "confirm_subscription",
      rejection:    "reject_subscription",
      unsubscribe:  "confirm_unsubscription",
    },
    disconnect_reasons: {
      unauthorized:    "unauthorized",
      invalid_request: "invalid_request",
      server_restart:  "server_restart",
    },
    default_mount_path: "/cable",
    protocols:          ["actioncable-v1-json", "actioncable-unsupported"],
  }

  def self.message(event : Symbol)
    INTERNAL[:message_types][event]
  end

  # this keeps us close to the open source config but using our own
  Habitat.create do
    setting route : String = "/cable", example: "/cable"
    setting token : String = "token", example: "token"
    setting socket_ping : Int32, example: 3
    setting redis_ping : Int32, example: 3
    setting url : String = Config.settings.redis_url + Config.settings.redis_pooling_query
  end
end

# Configure your settings
Cable.configure do |settings|
  settings.socket_ping = 3
  settings.redis_ping = 15 # not this, we'll discuss this in point 4
end
  4. I created a Redis connection PING/PONG schedule that runs every 15 seconds to make doubly sure Redis stays alive; it also has a recovery restart (Cable.restart) to ensure we can recover from connection issues. This process only runs once per server instance:
require "schedule"

module Cable
  class RedisPinger
    @@started : Bool = false
    @@seconds : Int32 | Float64 = Cable.settings.redis_ping

    def self.run_every(value : Int32 | Float64, &block)
      @@seconds = value

      yield

      @@seconds = Cable.settings.redis_ping
    end

    def self.start(server : Cable::Server)
      new(server).start unless @@started
      @@started = true
    end

    def self.seconds
      @@seconds
    end

    def initialize(@server : Cable::Server)
    end

    def start
      runner = Schedule::Runner.new
      runner.every(Cable::RedisPinger.seconds.seconds) do
        check_redis_subscribe
        check_redis_publish
      rescue e
        ExceptionService.notify(e)
        Cable.restart # critical for recovery
      end
    end

    # since @server.redis_subscribe connection is called on a block loop
    # we basically cannot call ping outside of the block
    # instead, we just spin up another new redis connection
    # then publish a special channel/message broadcast
    # the @server.redis_subscribe picks up this special combination
    # and calls ping on the block loop for us
    def check_redis_subscribe
      Cable.server.publish("_internal", "ping")
    end

    def check_redis_publish
      result = @server.redis_publish.run({"ping"})
      Cable::Logger.debug { {source: "cable", message: "Cable::RedisPinger.check_redis_publish -> #{result}"} }
    end
  end
end

This needs to be started in the handler; I'll share the complete handler code at the end.

Cable::RedisPinger.start Cable.server

Changes also need to be made in the Cable::Server#subscribe logic to support this:

require "mutex"
require "set"

module Cable
  class Server
    private def subscribe
      spawn(name: "Cable::Server - subscribe") do
        redis_subscribe.subscribe("_internal") do |subscription|
          subscription.on_message do |channel, message|
            if channel == "_internal" && message == "ping"
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe -> PONG"} }
            elsif channel == "_internal" && message == "debug"
              Cable.server.debug
            else
              fiber_channel.send({channel, message})
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe channel:#{channel} message:#{message}"} }
            end
          end
        end
      end
    end
  end
end
  5. I added a recovery in the handler: should Redis freak out, the next connection attempt will blow the whistle and restart the server. All connection attempts thereafter will start working again.
# Handle incoming message and echo back to the client
socket.on_message do |message|
  begin
    connection.receive(message)
  rescue e : Cable::Connection::UnathorizedConnectionException
    # do nothing, we're all good
  rescue e : IO::Error
    Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
    ExceptionService.notify(e)
    Cable.restart # This is the critical part
  rescue e : Exception
    Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
    ExceptionService.notify(e)
  end
end
  6. We found out that Redis has a pub/sub output buffer limit which can give you a dead connection without any warning... I mean nothing. All the clients still believe they are connected... This was by far the most difficult thing to find and resolve. Ultimately there is nothing you can do on the client side of things. We're using redis.com, so we asked them to double our buffer for safety, and we reviewed all payloads being broadcast, limiting the content to smaller chunks to make sure this doesn't happen.

Since making all these changes, everything has been amazing! There are a bunch of other small improvements we made across the board, but they won't help with this issue, so there's no point in mentioning them.

Sharing our current server/handler classes:

require "http/server"

module Cable
  class Handler(T)
    include HTTP::Handler

    def on_error(&@on_error : Exception ->) : self
      self
    end

    def call(context)
      return call_next(context) unless ws_route_found?(context) && websocket_upgrade_request?(context)

      remote_address = context.request.remote_address
      path = context.request.path
      Cable::Logger.debug { {logger: "Cable", message: "Started GET \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc}"} }

      unless ENV["DISABLE_SEC_WEBSOCKET_PROTOCOL_HEADER"]?
        context.response.headers["Sec-WebSocket-Protocol"] = "actioncable-v1-json"
      end

      ws = HTTP::WebSocketHandler.new do |socket, _context|
        connection = T.new(context.request, socket)
        connection_id = connection.connection_identifier

        # we should not add any connections which have been rejected
        if connection.connection_rejected?
          Cable.instrument(:reject_connection)
        else
          Cable.server.add_connection(connection)
        end

        # Send welcome message to the client
        socket.send({type: Cable.message(:welcome)}.to_json)

        Cable::RedisPinger.start Cable.server
        Cable::WebsocketPinger.start socket

        socket.on_ping do
          socket.pong context.request.path
          Cable::Logger.debug { {logger: "Cable", message: "Ping received"} }
        end

        # Handle incoming message and echo back to the client
        socket.on_message do |message|
          begin
            connection.receive(message)
          rescue e : Cable::Connection::UnathorizedConnectionException
            # do nothing, we're all good
          rescue e : IO::Error
            Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
            ExceptionService.notify(e)
            Cable.restart
          rescue e : Exception
            Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
            ExceptionService.notify(e)
          end
        end

        socket.on_close do
          Cable.server.remove_connection(connection_id)
          Cable::Logger.debug { {logger: "Cable", message: "Finished \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc}"} }
        end
      end

      Cable::Logger.debug { {logger: "Cable", message: "Successfully upgraded to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)"} }
      ws.call(context)
    rescue e
      ExceptionService.notify(e)
      raise e
    end

    private def websocket_upgrade_request?(context)
      return unless upgrade = context.request.headers["Upgrade"]?
      return unless upgrade.compare("websocket", case_insensitive: true) == 0

      context.request.headers.includes_word?("Connection", "Upgrade")
    end

    private def ws_route_found?(context)
      return true if context.request.path === Cable.settings.route
      false
    end
  end
end



require "mutex"
require "set"

module Cable
  alias Channels = Set(Cable::Channel)

  TRACE_KEYS = {
    connection:               "http.websocket.connection_count",
    broadcast:                "http.websocket.broadcast_count",
    tags:                     Array(String).new,
    connection_success_tags:  %w[status:success],
    connection_rejected_tags: %w[status:rejected],
  }

  def self.server
    @@server ||= Server.new
  end

  def self.restart
    if current_server = @@server
      current_server.shutdown
    end
    @@server = Server.new
  end

  def self.instrument(action : Symbol, tags = TRACE_KEYS[:tags])
    case action
    when :add_connection
      Datadog.metrics.increment TRACE_KEYS[:connection], tags: TRACE_KEYS[:connection_success_tags]
    when :reject_connection
      Datadog.metrics.increment TRACE_KEYS[:connection], tags: TRACE_KEYS[:connection_rejected_tags]
    when :broadcast
      Datadog.metrics.increment TRACE_KEYS[:broadcast], tags: tags
    end
  rescue e
    Cable::Logger.error { {logger: "Cable", message: "Instrument Exception: #{e.message}"} }
    ExceptionService.notify(e)
  end

  class Server
    include Debug

    getter connections = {} of String => Connection
    getter redis_subscribe = Redis::Connection.new(URI.parse(Cable.settings.url))
    getter redis_publish = Redis::Client.new(URI.parse(Cable.settings.url))
    getter fiber_channel = ::Channel({String, String}).new

    @channels = {} of String => Channels
    @channel_mutex = Mutex.new

    def initialize
      subscribe
      process_subscribed_messages
    rescue e
      ExceptionService.notify(e)
      raise e
    end

    def add_connection(connection)
      connections[connection.connection_identifier] = connection
      Cable.instrument(:add_connection)
    end

    def remove_connection(connection_id)
      connections.delete(connection_id).try(&.close)
    end

    def subscribe_channel(channel : Channel, stream_identifier : String)
      @channel_mutex.synchronize do
        if !@channels.has_key?(stream_identifier)
          @channels[stream_identifier] = Channels.new
        end

        @channels[stream_identifier] << channel
      end

      redis_subscribe.encode({"subscribe", stream_identifier})
      redis_subscribe.flush
    end

    def unsubscribe_channel(channel : Channel, stream_identifier : String)
      @channel_mutex.synchronize do
        if @channels.has_key?(stream_identifier)
          @channels[stream_identifier].delete(channel)

          if @channels[stream_identifier].size == 0
            redis_subscribe.unsubscribe stream_identifier

            @channels.delete(stream_identifier)
          end
        else
          redis_subscribe.unsubscribe stream_identifier
        end
      end
    end

    # redis only accepts strings, so we should be strict here
    def publish(stream_identifier : String, message : String)
      redis_publish.publish(stream_identifier, message)
    end

    def send_to_channels(stream_identifier, message)
      return unless @channels.has_key?(stream_identifier)

      parsed_message = safe_decode_message(message)

      @channels[stream_identifier].each do |channel|
        if channel.connection.socket.closed?
          channel.close
        else
          Cable::Logger.debug { {logger: "Cable", message: "#{channel.class} transmitting #{parsed_message} (via streamed from #{channel.stream_identifier})"} }
          channel.connection.socket.send({
            identifier: channel.identifier,
            message:    parsed_message,
          }.to_json)
        end
        Cable.instrument(:broadcast, tags: ["channel:#{channel.class}"])
      rescue e : IO::Error
        Cable::Logger.error { {logger: "Cable", message: "IO::Error Cable::Server#send_to_channels Exception: #{e.message} -> #{self.class.name}#send_to_channels(channel, message)"} }
        ExceptionService.notify(e)
      end
    end

    def safe_decode_message(message)
      case message
      when String
        JSON.parse(message)
      else
        message
      end
    rescue JSON::ParseException
      message
    end

    def shutdown
      redis_subscribe.run({"unsubscribe"})
      redis_subscribe.close
      redis_publish.close
      connections.each do |_, v|
        v.close
      end
    end

    private def process_subscribed_messages
      server = self
      spawn(name: "Cable::Server - process_subscribed_messages") do
        while received = fiber_channel.receive
          stream_identifier, message = received
          server.send_to_channels(stream_identifier, message)
          Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#process_subscribed_messages stream:#{stream_identifier} message:#{message}"} }
        end
      end
    end

    private def subscribe
      spawn(name: "Cable::Server - subscribe") do
        redis_subscribe.subscribe("_internal") do |subscription|
          subscription.on_message do |channel, message|
            if channel == "_internal" && message == "ping"
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe -> PONG"} }
            elsif channel == "_internal" && message == "debug"
              # Cable.server.debug
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe -> ALIVE"} }
            else
              fiber_channel.send({channel, message})
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe channel:#{channel} message:#{message}"} }
            end
          end
        end
      end
    end
  end
end

Hope this helps! We had a total nightmare debugging all this, so I'm happy to share our results.
I'll try to carve out some time in the coming weeks to propose some of these bigger changes to the main cable.cr shard.

@jwoertink
Contributor

I've started porting this over into Cable using this example from @mjeffrey18: cable-cr/cable#48

I am running into a strange issue when running the specs: cable-cr/cable#48 (comment). It seems that when we call Cable.restart, the redis_subscribe socket stays closed. I have no idea why...

@jwoertink
Contributor

@mjeffrey18 by chance, are you running Redis in Cluster mode?

I've been using the new branch of Cable with this shard for a week now. We were rebooting our app daily until we switched to using this Redis shard; after that, we went a week before needing a reboot due to a memory leak. After the memory leak was found and fixed, I made one last change based on the above code to move the RedisPinger: cable-cr/cable@6b11799

I deployed that yesterday, and less than 24 hours later our pub/sub just stopped working. I'm using Redis hosted on AWS ElastiCache in Cluster mode. I'm wondering if maybe a cluster node is randomly dying and swapping to a new one, but since Cable doesn't use the cluster setup, it just doesn't understand how to fix the connection. We know this happens on the Cable master branch with the old Redis shard too, but that one doesn't support Cluster at all.

Our app is just using a normal chat room setup: send a message, it goes to the server, then Redis receives the message and broadcasts it back out to all clients. When it breaks, the message reaches the server, but the broadcast back out never happens.

I haven't tried it yet, but I think if I can set up a clustered Redis in Docker locally and then kill off a node, maybe I can recreate this issue.

@mjeffrey18
Contributor Author

Hey @jwoertink, I'm not running cluster mode. We use redis.com with half a dozen Redis DBs + failovers for different purposes in production. In staging, we use ElastiCache, but not in cluster mode.
I would probably advise using a standard Redis DB for the pub/sub functionality, but it may not be cluster mode causing the issues. We're still running my own version of cable.cr, as there have been far too many changes we had to make to get our system stable. Even low-level things like changing the logging matter: if you run massive throughput, you can rack up $100s in log costs just with cable.cr.
We've been running for months without the issues; in fact, we even recently updated the restart model I mentioned above to be a lot more fault tolerant and avoid massive teardowns of the connections.
I promised a while back to have a go at porting over my changes.
I'll definitely put some time in this weekend, have a look over what you guys have done so far, and compare it with what I've done. Hopefully we can get to a place where I can use the official shard. 🙏

Also, might be worth checking out one of my previous comments.

We found that [Redis has a Pub/Sub buffer](https://redis.io/docs/reference/clients/#output-buffer-limits), which can give you a dead connection without any warning... I mean nothing. 
All the clients still believe they are connected... 
This was by far the most difficult to find and resolve. 
Ultimately there is nothing you can do on the client side of things. 
We're using redis.com, so we asked them to double our buffer for safety and reviewed all payloads being broadcast, limiting the content to smaller chunks to be sure this doesn't happen.

If you do not receive any IO errors and, no matter what you do, you just can't seem to broadcast (i.e. pub/sub is silently down), it may be that you've exceeded the dreaded output-buffer-limits. This took me weeks to figure out.
We're lucky that we can control the payload size, as all broadcasts are made from our Rails apps. If you have a chat room, there may be massive payloads which trip this buffer, and everything silently stops working. It was horrendous to debug!
Ultimately, there is nothing you can do about this one, except build in a canary system (which we did on the client end for another reason), but that was a complex solution. I suggest setting a max content-size limit on the client side and also increasing the buffer limit by 2x. You can also intercept the payload on the server side and truncate it, as sketched below. It might not be a great UX if parts of the payload are missing, but at least it keeps the system stable.
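
For illustration, here's a minimal sketch of that server-side truncation idea, wrapping the publish path shown in the Server class above; the 64 KB cap and the safe_publish name are made up for the example:

# Hypothetical guard in front of Cable.server.publish: cap payloads so one
# oversized broadcast can't blow through Redis' pubsub output buffer limit.
MAX_BROADCAST_BYTES = 64 * 1024 # assumption: keep this well below your configured limit

def safe_publish(stream_identifier : String, message : String)
  if message.bytesize > MAX_BROADCAST_BYTES
    Cable::Logger.warn { "Truncating #{message.bytesize}-byte broadcast on #{stream_identifier}" }
    # Note: byte_slice may cut a multi-byte character; acceptable for a stability guard.
    message = message.byte_slice(0, MAX_BROADCAST_BYTES)
  end
  Cable.server.publish(stream_identifier, message)
end

Callers would then broadcast through safe_publish instead of calling Cable.server.publish directly.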

I'll ping you guys once I've had a chance to review everything.
