Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 533 lines (439 sloc) 14.1 KB
#!/usr/bin/env python3
# pandora-mpv.py
# Author: William Woodruff
# ------------------------
# Stream a Pandora station into mpv.
# Uses python-mpv interface to control an mpv process.
# Uses the Pithos Project's python interface to access Pandora.
# ------------------------
# This code is licensed by William Woodruff under the MIT License.
# python-mpv is licensed by Lars Gustäbel under the MIT License.
# http://opensource.org/licenses/MIT
import sys
import os
import time
import json
import socket
import select
import tempfile
import threading
import subprocess
import inspect
# site-packages wasn't in my module path. why?
sys.path.append('/usr/lib/python3.5/site-packages')
sys.path.append('/usr/lib/python3/site-packages')
try:
import pithos.pandora
import pithos.pandora.data
except ImportError:
sys.exit('I need Pithos to connect to Pandora. Is Pithos installed?')
from queue import Queue, Empty
# Station object to play; filled in further down once the account's station
# list has been searched for `station_name`.
station = None

# Name of the station to play: the first command-line argument when one is
# given, otherwise the default.
station_name = sys.argv[1] if len(sys.argv) >= 2 else 'QuickMix'
class MPVError(Exception):
    """Root of the exception hierarchy raised by the mpv wrapper classes."""
class MPVProcessError(MPVError):
    """The mpv process could not be started or stopped running."""
class MPVCommunicationError(MPVError):
    """The IPC exchange with mpv broke down or produced an invalid message."""
class MPVCommandError(MPVError):
    """mpv answered a command with an error status other than "success"."""
class MPVTimeoutError(MPVError):
    """A request could not be queued, or its response did not arrive, in time."""
class MPVBase:
    """Base class for communication with the mpv media player via unix socket
    based JSON IPC.

    Creating an instance launches an mpv process, connects to its IPC socket
    and starts a reader thread. Call close() to shut all of that down again.
    """

    # Path of the mpv binary that gets launched.
    executable = "/usr/bin/mpv"

    # Options that are always passed to mpv, before the per-instance ones.
    default_argv = [
        "--idle",
        "--force-window=no",
        "--no-ytdl"
    ]

    def __init__(self, window_id=None, debug=False):
        """Launch mpv and set up the communication channel.

        window_id -- if not None, passed to mpv as the value of --wid.
        debug     -- if true, every message sent and received is echoed to
                     stderr.

        Raises MPVProcessError if mpv exits before the socket is connected.
        """
        self.window_id = window_id
        self.debug = debug

        # The socket filename must exist before the argument list is built,
        # and the process must run before we can connect to its socket.
        self._prepare_socket()
        self._prepare_process()
        self._start_process()
        self._start_socket()
        self._prepare_thread()
        self._start_thread()

    def __del__(self):
        """Best-effort cleanup when the object is garbage collected.
        """
        self._stop_thread()
        self._stop_process()
        self._stop_socket()

    def _thread_id(self):
        """Return the identifier of the calling thread.
        """
        return threading.get_ident()

    #
    # Process
    #
    def _prepare_process(self):
        """Prepare the argument list for the mpv process.
        """
        self.argv = [self.executable]
        self.argv += self.default_argv
        # https://github.com/gustaebel/python-mpv/pull/2
        self.argv += ["--input-ipc-server", self._sock_filename]
        if self.window_id is not None:
            self.argv += ["--wid", str(self.window_id)]

    def _start_process(self):
        """Start the mpv process.
        """
        self._proc = subprocess.Popen(self.argv)

    def _stop_process(self):
        """Stop the mpv process.
        """
        # _proc is missing when __init__ failed before the process started.
        if hasattr(self, "_proc"):
            try:
                self._proc.terminate()
            except ProcessLookupError:
                pass

    #
    # Socket communication
    #
    def _prepare_socket(self):
        """Create a random socket filename which we pass to mpv with the
        --input-ipc-server option.
        """
        # Only the unique name is wanted: the temporary file itself is
        # removed again so that mpv can create the socket at that path.
        fd, self._sock_filename = tempfile.mkstemp(prefix="mpv.")
        os.close(fd)
        os.remove(self._sock_filename)

    def _start_socket(self):
        """Wait for the mpv process to create the unix socket and finish
        startup.

        Raises MPVProcessError if the process exits before a connection could
        be established.
        """
        # FIXME timeout to give up
        while self.is_running():
            time.sleep(0.1)
            sock = socket.socket(socket.AF_UNIX)
            try:
                sock.connect(self._sock_filename)
            except (FileNotFoundError, ConnectionRefusedError):
                # mpv is not listening yet. Release this attempt's socket
                # before retrying so failed attempts do not pile up.
                sock.close()
                continue
            except Exception:
                sock.close()
                raise
            self._sock = sock
            break
        else:
            raise MPVProcessError("unable to start process")

    def _stop_socket(self):
        """Clean up the socket.
        """
        # Close our end of the connection; the reader thread has already been
        # joined by the callers of this method.
        sock = getattr(self, "_sock", None)
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass

        if hasattr(self, "_sock_filename"):
            try:
                os.remove(self._sock_filename)
            except OSError:
                pass

    def _prepare_thread(self):
        """Set up the queues for the communication threads.
        """
        # Holds the thread id of the single request that is in flight.
        self._request_queue = Queue(1)
        # Maps thread id -> queue on which that thread receives responses.
        self._response_queues = {}
        # Asynchronous events, in order of arrival.
        self._event_queue = Queue()
        # Set to ask the communication threads to terminate.
        self._stop_event = threading.Event()

    def _start_thread(self):
        """Start up the communication threads.
        """
        self._thread = threading.Thread(target=self._reader)
        self._thread.start()

    def _stop_thread(self):
        """Stop the communication threads.
        """
        if hasattr(self, "_stop_event"):
            self._stop_event.set()
        if hasattr(self, "_thread"):
            self._thread.join()

    def _reader(self):
        """Read the incoming json messages from the unix socket that is
        connected to the mpv process. Pass them on to the message handler.
        """
        buf = b""
        while not self._stop_event.is_set():
            # Wake up at least once a second to re-check the stop event.
            readable, _, _ = select.select([self._sock], [], [], 1)
            if not readable:
                continue

            chunk = self._sock.recv(1024)
            if not chunk:
                # An empty read means the other side closed the connection.
                break
            buf += chunk

            # Messages are newline-terminated; keep any partial trailing
            # message in the buffer for the next read.
            newline = buf.find(b"\n")
            while newline >= 0:
                data = buf[:newline + 1]
                buf = buf[newline + 1:]
                if self.debug:
                    sys.stderr.write("<<< " + data.decode("utf8", "replace"))
                message = self._parse_message(data)
                self._handle_message(message)
                newline = buf.find(b"\n")

    #
    # Message handling
    #
    def _compose_message(self, message):
        """Return a json representation from a message dictionary.

        The result is a single UTF-8 encoded line terminated by a newline.
        """
        # XXX may be strict is too strict ;-)
        data = json.dumps(message, separators=(",", ":"))
        return data.encode("utf8", "strict") + b"\n"

    def _parse_message(self, data):
        """Return a message dictionary from a json representation.
        """
        # XXX may be strict is too strict ;-)
        data = data.decode("utf8", "strict")
        return json.loads(data)

    def _handle_message(self, message):
        """Handle different types of incoming messages, i.e. responses to
        commands or asynchronous events.

        Raises MPVCommunicationError for a response that has no pending
        request and for messages that are neither a response nor an event.
        """
        if "error" in message:
            # This message is a reply to a request.
            try:
                thread_id = self._request_queue.get(timeout=1)
            except Empty:
                raise MPVCommunicationError("got a response without a pending request")
            self._response_queues[thread_id].put(message)
        elif "event" in message:
            # This message is an asynchronous event.
            self._event_queue.put(message)
        else:
            raise MPVCommunicationError("invalid message %r" % message)

    def _send_message(self, message, timeout=None):
        """Send a message/command to the mpv process, message must be a
        dictionary of the form {"command": ["arg1", "arg2", ...]}. Responses
        from the mpv process must be collected using _get_response().

        Raises MPVTimeoutError if the request could not be queued within
        `timeout` seconds, and MPVCommunicationError if the socket refuses
        to accept data.
        """
        # Imported here because the module only imports Queue and Empty.
        from queue import Full

        data = self._compose_message(message)
        if self.debug:
            sys.stderr.write(">>> " + data.decode("utf8", "replace"))

        # Request/response cycles are coordinated across different threads, so
        # that they don't get mixed up. This makes it possible to use commands
        # (e.g. fetch properties) from event callbacks that run in a different
        # thread context.
        thread_id = self._thread_id()
        if thread_id not in self._response_queues:
            # Prepare a response queue for the thread to wait on.
            self._response_queues[thread_id] = Queue()

        # Put the id of the current thread on the request queue. This id is
        # later used to associate responses from the mpv process with this
        # request.
        try:
            self._request_queue.put(thread_id, block=True, timeout=timeout)
        except Full:
            raise MPVTimeoutError("unable to put request")

        # Write the message data to the socket.
        while data:
            size = self._sock.send(data)
            if size == 0:
                raise MPVCommunicationError("broken sender socket")
            data = data[size:]

    def _get_response(self, timeout=None):
        """Collect the response message to a previous request. If there was an
        error a MPVCommandError exception is raised, otherwise the command
        specific data is returned.

        Raises MPVTimeoutError if no response arrives within `timeout`
        seconds.
        """
        try:
            message = self._response_queues[self._thread_id()].get(block=True, timeout=timeout)
        except Empty:
            raise MPVTimeoutError("unable to get response")

        if message["error"] != "success":
            raise MPVCommandError(message["error"])
        return message.get("data")

    def _get_event(self, timeout=None):
        """Collect a single event message that has been received out-of-band
        from the mpv process. If a timeout is specified and there have not
        been any events during that period, None is returned.

        Without a timeout the call does not block: None is returned
        immediately when no event is queued.
        """
        try:
            return self._event_queue.get(block=timeout is not None, timeout=timeout)
        except Empty:
            return None

    def _send_request(self, message, timeout=None):
        """Send a command to the mpv process and collect the result.

        A MPVCommandError is re-raised with the failing command prepended to
        the error text.
        """
        self._send_message(message, timeout)
        try:
            return self._get_response(timeout)
        except MPVCommandError as e:
            raise MPVCommandError("%r: %s" % (message["command"], e)) from e

    #
    # Public API
    #
    def is_running(self):
        """Return True if the mpv process is still active.
        """
        return self._proc.poll() is None

    def close(self):
        """Shutdown the mpv process and our communication setup.

        An error while asking mpv to quit is still propagated, but only after
        the process, threads and socket have been cleaned up.
        """
        try:
            if self.is_running():
                self._send_request({"command": ["quit"]}, timeout=1)
        finally:
            self._stop_process()
            self._stop_thread()
            self._stop_socket()
class MPV(MPVBase):
    """Class for communication with the mpv media player via unix socket
    based JSON IPC. It adds a few usable methods and a callback API.

    To automatically register methods as event callbacks, subclass this
    class and define specially named methods as follows:

        def on_file_loaded(self):
            # This is called for every 'file-loaded' event.
            ...

        def on_property_time_pos(self, position):
            # This is called whenever the 'time-pos' property is updated.
            ...

    A method named on_init, if defined, is called once at the end of
    construction instead of being registered as an mpv event.

    Please note that callbacks are executed inside a separate thread. The
    MPV class itself is completely thread-safe. Requests from different
    threads to the same MPV instance are synchronized.
    """

    def __init__(self, *args, **kwargs):
        """Start mpv (see MPVBase) and register all on_* methods as callbacks.

        Raises MPVError if an on_* method names an event or property that
        mpv does not accept.
        """
        # The callback state has to exist before the base initialiser runs:
        # it starts the event thread, and _handle_event() reads _callbacks
        # as soon as the first event arrives.
        self._callbacks = {}
        self._property_serials = {}
        self._new_serial = iter(range(sys.maxsize))

        super().__init__(*args, **kwargs)

        # Enumerate all methods and auto-register callbacks for
        # events and property-changes.
        for method_name, method in inspect.getmembers(self):
            if not inspect.ismethod(method):
                continue

            if method_name == "on_init":
                # The init hook is simulated below. It is not passed to
                # register_callback() because that would try to enable an
                # mpv event called "init".
                continue

            if method_name.startswith("on_property_"):
                name = method_name[12:]
                name = name.replace("_", "-")
                self.register_property_callback(name, method)
            elif method_name.startswith("on_"):
                name = method_name[3:]
                name = name.replace("_", "-")
                self.register_callback(name, method)

        # Simulate an init event when the process and all callbacks have been
        # completely set up.
        if hasattr(self, "on_init"):
            self.on_init()

    #
    # Socket communication
    #
    def _start_thread(self):
        """Start up the communication threads.
        """
        super()._start_thread()
        # A second thread dispatches queued events to the callbacks.
        self._event_thread = threading.Thread(target=self._event_reader)
        self._event_thread.start()

    def _stop_thread(self):
        """Stop the communication threads.
        """
        super()._stop_thread()
        if hasattr(self, "_event_thread"):
            self._event_thread.join()

    #
    # Event/callback API
    #
    def _event_reader(self):
        """Collect incoming event messages and call the event handler.
        """
        while not self._stop_event.is_set():
            # The timeout lets the loop re-check the stop event regularly.
            message = self._get_event(timeout=1)
            if message is None:
                continue
            self._handle_event(message)

    def _handle_event(self, message):
        """Lookup and call the callbacks for a particular event message.

        Callbacks receive the message's "data" value as their only argument
        when the message has one, and no arguments otherwise.
        """
        if message["event"] == "property-change":
            # Property callbacks are stored under a "property-" prefixed key.
            name = "property-" + message["name"]
        else:
            name = message["event"]

        for callback in self._callbacks.get(name, []):
            if "data" in message:
                callback(message["data"])
            else:
                callback()

    def register_callback(self, name, callback):
        """Register a function `callback` for the event `name`.

        Raises MPVError if mpv rejects the event name.
        """
        try:
            self.command("enable_event", name)
        except MPVCommandError:
            raise MPVError("no such event %r" % name)
        self._callbacks.setdefault(name, []).append(callback)

    def unregister_callback(self, name, callback):
        """Unregister a previously registered function `callback` for the event
        `name`.

        Raises MPVError if nothing is registered for `name` or if `callback`
        is not among the registered functions.
        """
        try:
            callbacks = self._callbacks[name]
        except KeyError:
            raise MPVError("no callbacks registered for event %r" % name)
        try:
            callbacks.remove(callback)
        except ValueError:
            raise MPVError("callback %r not registered for event %r" % (callback, name))

    def register_property_callback(self, name, callback):
        """Register a function `callback` for the property-change event on
        property `name`.

        Returns the serial that identifies the observation. Raises MPVError
        if mpv does not list a property called `name`.
        """
        # Property changes are normally not sent over the connection unless they
        # are requested using the 'observe_property' command.
        # XXX We manually have to check for the existence of the property name.
        # Apparently observe_property does not check it :-(
        proplist = self.command("get_property", "property-list")
        if name not in proplist:
            raise MPVError("no such property %r" % name)

        self._callbacks.setdefault("property-" + name, []).append(callback)

        # 'observe_property' expects some kind of id which can be used later
        # for unregistering with 'unobserve_property'.
        serial = next(self._new_serial)
        self.command("observe_property", serial, name)
        self._property_serials[(name, callback)] = serial
        return serial

    def unregister_property_callback(self, name, callback):
        """Unregister a previously registered function `callback` for the
        property-change event on property `name`.

        Raises MPVError if nothing is registered for the property or if
        `callback` is not among the registered functions.
        """
        try:
            callbacks = self._callbacks["property-" + name]
        except KeyError:
            raise MPVError("no callbacks registered for property %r" % name)
        try:
            callbacks.remove(callback)
        except ValueError:
            raise MPVError("callback %r not registered for property %r" % (callback, name))

        serial = self._property_serials.pop((name, callback))
        self.command("unobserve_property", serial)

    #
    # Public API
    #
    def command(self, *args, timeout=1):
        """Execute a single command on the mpv process and return the result.

        The positional arguments form the command list sent to mpv;
        `timeout` (seconds) applies to both queuing the request and waiting
        for its response.
        """
        return self._send_request({"command": list(args)}, timeout=timeout)

    def get_property(self, name):
        """Return the value of property `name`.
        """
        return self.command("get_property", name)

    def set_property(self, name, value):
        """Set the value of property `name`.
        """
        return self.command("set_property", name, value)
# The Pandora credentials are taken from the environment. Exit with a readable
# message instead of a KeyError traceback when one of them is missing.
try:
    pandora_email = os.environ['PANDORA_EMAIL']
    pandora_password = os.environ['PANDORA_PASSWORD']
except KeyError as missing:
    sys.exit("Please set the {} environment variable.".format(missing.args[0]))

pandora = pithos.pandora.make_pandora()
pandora.connect(
    pithos.pandora.data.client_keys[pithos.pandora.data.default_client_id],
    pandora_email,
    pandora_password)

# Look the requested station up by name. The loop does not stop at the first
# hit, so if several stations share the name the last one is used.
for s in pandora.get_stations():
    if s.name == station_name:
        station = s

if not station:
    sys.exit("Could not find the '{}' station.".format(station_name))

player = MPV()


def _shutdown():
    """Close the player, then end the process immediately."""
    # os._exit() is reached even when close() itself fails. It skips normal
    # interpreter shutdown - presumably so that communication threads which
    # are still alive cannot keep the process hanging.
    try:
        player.close()
    finally:
        os._exit(0)


try:
    while True:
        # Queue the next batch of songs behind whatever is already playing.
        for song in station.get_playlist():
            song_url = song.audioUrlMap['highQuality']['audioUrl']
            player.command('loadfile', song_url, 'append-play')

        position = player.get_property('playlist-pos')
        playlist_size = player.get_property('playlist-count')

        # poll the playlist position every quarter of a second if still
        # running; once the last queued song is reached, fetch more songs
        while position != (playlist_size - 1):
            try:
                position = player.get_property('playlist-pos')
                time.sleep(0.25)
            except Exception:
                # Any failure while polling is treated as the signal to stop.
                _shutdown()
except KeyboardInterrupt:
    # Ctrl-C is not an Exception subclass; shut down cleanly for it as well.
    _shutdown()