From ef3afe8f97a3b4e8f0196db8fd6b21916b6a832f Mon Sep 17 00:00:00 2001 From: Peter Sobot Date: Sat, 17 Nov 2012 15:29:46 -0500 Subject: [PATCH] Added basic bandwidth relay to reduce wallet load. --- assets/front.coffee | 6 +++ config.yml | 6 +++ forever/relay.py | 103 ++++++++++++++++++++++++++++++++++++++++++++ forever/server.py | 8 +++- 4 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 forever/relay.py diff --git a/assets/front.coffee b/assets/front.coffee index bba0114..e09cca2 100644 --- a/assets/front.coffee +++ b/assets/front.coffee @@ -325,6 +325,12 @@ $(document).ready -> window.__spinning = true window.__titular = new Titular + # MASSIVE HACK + # Rewrite the URL of the audio stream to the relay server permanently + # - can't change server.py while people are listening + $('.ui360 a').attr("href", "http://relay01.forever.fm/all.mp3") + soundManager.reboot() + $('body').keyup (e) -> s = window.soundManager.sounds.ui360Sound0 if e.keyCode == 32 diff --git a/config.yml b/config.yml index 0da5282..715b1bb 100644 --- a/config.yml +++ b/config.yml @@ -43,3 +43,9 @@ blacklist: tag: - sample pack - loops + +# Bandwidth relay settings +primary_url: http://forever.fm/all.mp3 +relay_limit: 20 # After this many listeners, delegate the rest to relay servers. +relays: + - http://relay01.forever.fm/all.mp3 diff --git a/forever/relay.py b/forever/relay.py new file mode 100644 index 0000000..a181884 --- /dev/null +++ b/forever/relay.py @@ -0,0 +1,103 @@ +""" +Forever.fm Bandwidth Relay +by @psobot, Nov 17 2012 +""" + +import config +import apikeys +import customlog +import logging + +import os +import sys +import time +import Queue +import socket +import restart +import urllib2 +import datetime +import traceback +import threading +import tornado.web +import tornado.ioloop +import tornado.template +from daemon import Daemon + +mp3_queue = Queue.Queue() + +started_at_timestamp = time.time() + + +def listen(addr=config.primary_url): + r = urllib2.urlopen(addr) + while True: + mp3_queue.put(r.read(128)) + +class StreamHandler(tornado.web.RequestHandler): + listeners = [] + __packet = None + + @classmethod + def stream_frames(cls): + while True: + try: + cls.__packet = mp3_queue.get() + for i, listener in enumerate(cls.listeners): + if listener.request.connection.stream.closed(): + try: + listener.finish() + except: + log.error("Could not finish listener:\n%s", + traceback.format_exc()) + else: + listener.write(cls.__packet) + listener.flush() + except: + log.error("Could not broadcast due to: \n%s", traceback.format_exc()) + + @tornado.web.asynchronous + def get(self): + ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip) + log.info("Added new listener at %s", ip) + self.set_header("Content-Type", "audio/mpeg") + if self.__packet: + self.write(self.__packet) + self.flush() + self.listeners.append(self) + + def on_finish(self): + ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip) + log.info("Removed listener at %s", ip) + self.listeners.remove(self) + +if __name__ == "__main__": + Daemon() + + for handler in logging.root.handlers: + logging.root.removeHandler(handler) + logging.root.addHandler(customlog.MultiprocessingStreamHandler()) + + log = logging.getLogger(config.log_name) + log.info("Starting %s relay...", config.app_name) + + tornado.ioloop.PeriodicCallback( + lambda: restart.check('restart.txt', + started_at_timestamp, + len(StreamHandler.listeners)), + config.restart_timeout * 1000 + ).start() + + application = tornado.web.Application([ + (r"/all.mp3", StreamHandler) + ]) + + frame_sender = threading.Thread(target=StreamHandler.stream_frames) + frame_sender.daemon = True + frame_sender.start() + + reader = threading.Thread(target=listen) + reader.daemon = True + reader.start() + + application.listen(config.http_port) + tornado.ioloop.IOLoop.instance().start() diff --git a/forever/server.py b/forever/server.py index 2df5a94..31b55dd 100644 --- a/forever/server.py +++ b/forever/server.py @@ -14,6 +14,7 @@ import lame import time import info +import random import restart import datetime import traceback @@ -143,8 +144,11 @@ def init_streams(cls, streams): def get(self): ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip) log.info("Added new listener at %s", ip) - self.set_header("Content-Type", "audio/mpeg") - self.listeners.append(self) + if len(self.listeners) > config.relay_limit: + self.redirect(random.choice(config.relays)) + else: + self.set_header("Content-Type", "audio/mpeg") + self.listeners.append(self) def on_finish(self): ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip)