Permalink
Browse files

Added basic bandwidth relay to reduce wallet load.

  • Loading branch information...
psobot committed Nov 17, 2012
1 parent a33c191 commit ef3afe8f97a3b4e8f0196db8fd6b21916b6a832f
Showing with 121 additions and 2 deletions.
  1. +6 −0 assets/front.coffee
  2. +6 −0 config.yml
  3. +103 −0 forever/relay.py
  4. +6 −2 forever/server.py
@@ -325,6 +325,12 @@ $(document).ready ->
window.__spinning = true
window.__titular = new Titular

# MASSIVE HACK
# Rewrite the URL of the audio stream to the relay server permanently
# - can't change server.py while people are listening
$('.ui360 a').attr("href", "http://relay01.forever.fm/all.mp3")
soundManager.reboot()

$('body').keyup (e) ->
s = window.soundManager.sounds.ui360Sound0
if e.keyCode == 32
@@ -43,3 +43,9 @@ blacklist:
tag:
- sample pack
- loops

# Bandwidth relay settings
primary_url: http://forever.fm/all.mp3
relay_limit: 20 # After this many listeners, delegate the rest to relay servers.
relays:
- http://relay01.forever.fm/all.mp3
@@ -0,0 +1,103 @@
"""
Forever.fm Bandwidth Relay
by @psobot, Nov 17 2012
"""

import config
import apikeys
import customlog
import logging

import os
import sys
import time
import Queue
import socket
import restart
import urllib2
import datetime
import traceback
import threading
import tornado.web
import tornado.ioloop
import tornado.template
from daemon import Daemon

mp3_queue = Queue.Queue()

started_at_timestamp = time.time()


def listen(addr=config.primary_url):
r = urllib2.urlopen(addr)
while True:
mp3_queue.put(r.read(128))

class StreamHandler(tornado.web.RequestHandler):
listeners = []
__packet = None

@classmethod
def stream_frames(cls):
while True:
try:
cls.__packet = mp3_queue.get()
for i, listener in enumerate(cls.listeners):
if listener.request.connection.stream.closed():
try:
listener.finish()
except:
log.error("Could not finish listener:\n%s",
traceback.format_exc())
else:
listener.write(cls.__packet)
listener.flush()
except:
log.error("Could not broadcast due to: \n%s", traceback.format_exc())

@tornado.web.asynchronous
def get(self):
ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip)
log.info("Added new listener at %s", ip)
self.set_header("Content-Type", "audio/mpeg")
if self.__packet:
self.write(self.__packet)
self.flush()
self.listeners.append(self)

def on_finish(self):
ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip)
log.info("Removed listener at %s", ip)
self.listeners.remove(self)

if __name__ == "__main__":
Daemon()

for handler in logging.root.handlers:
logging.root.removeHandler(handler)
logging.root.addHandler(customlog.MultiprocessingStreamHandler())

log = logging.getLogger(config.log_name)
log.info("Starting %s relay...", config.app_name)

tornado.ioloop.PeriodicCallback(
lambda: restart.check('restart.txt',
started_at_timestamp,
len(StreamHandler.listeners)),
config.restart_timeout * 1000
).start()

application = tornado.web.Application([
(r"/all.mp3", StreamHandler)
])

frame_sender = threading.Thread(target=StreamHandler.stream_frames)
frame_sender.daemon = True
frame_sender.start()

reader = threading.Thread(target=listen)
reader.daemon = True
reader.start()

application.listen(config.http_port)
tornado.ioloop.IOLoop.instance().start()
@@ -14,6 +14,7 @@
import lame
import time
import info
import random
import restart
import datetime
import traceback
@@ -143,8 +144,11 @@ def init_streams(cls, streams):
def get(self):
ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip)
log.info("Added new listener at %s", ip)
self.set_header("Content-Type", "audio/mpeg")
self.listeners.append(self)
if len(self.listeners) > config.relay_limit:
self.redirect(random.choice(config.relays))
else:
self.set_header("Content-Type", "audio/mpeg")
self.listeners.append(self)

def on_finish(self):
ip = self.request.headers.get('X-Real-Ip', self.request.remote_ip)

0 comments on commit ef3afe8

Please sign in to comment.