-
Notifications
You must be signed in to change notification settings - Fork 82
/
mpris.py
382 lines (320 loc) · 14.1 KB
/
mpris.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# This file is part of Trackma.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from enum import Enum
import os
import re
import sys
import threading
import urllib.parse
import asyncio
from typing import Dict, Union
from dataclasses import dataclass
from jeepney import HeaderFields, Properties, DBusAddress
from jeepney.io.asyncio import open_dbus_router, DBusRouter, Proxy
from jeepney.bus_messages import message_bus, MatchRule
from trackma import utils
from trackma.tracker import tracker
# Well-known bus name prefix shared by MPRIS players, and the object path
# on which they are addressed.
BUS_NAMESPACE = 'org.mpris.MediaPlayer2'
PATH_MPRIS = '/org/mpris/MediaPlayer2'

# D-Bus interface names used when reading properties and matching signals.
IFACE_PROPERTIES = 'org.freedesktop.DBus.Properties'
IFACE_MPRIS = 'org.mpris.MediaPlayer2'
IFACE_MPRIS_PLAYER = f'{IFACE_MPRIS}.Player'
class PlaybackStatus(str, Enum):
    """Playback states compared against a player's ``PlaybackStatus`` value.

    Members are also ``str`` instances, so they compare equal to the plain
    strings stored on a player.
    """

    PLAYING = "Playing"
    PAUSED = "Paused"
    STOPPED = "Stopped"
async def name_owner_watcher(router, tracker):
    """Forward ownership changes of MPRIS bus names to ``tracker``.

    Subscribes to the message bus' ``NameOwnerChanged`` signal, restricted to
    names inside ``BUS_NAMESPACE``, and then loops forever dispatching each
    signal. The coroutine only ends through cancellation or an exception.

    Args:
        router: The D-Bus router used to register the match rule and to
            receive the filtered signals.
        tracker: Object providing an awaitable
            ``on_bus_added(router, wellknown_name, unique_name)`` and a plain
            ``on_bus_removed(wellknown_name, unique_name)``.
    """
    # Select name change signals for the well-known mpris service name.
    # We only consider players with such a well-known name
    # and thus treat any changes to the owned name
    # as a creation and a disappearance of a service.
    # Signals will be published by the service's unique name,
    # which is why we need to track it.
    match_rule = MatchRule(
        type='signal',
        sender=message_bus.bus_name,
        interface=message_bus.interface,
        member='NameOwnerChanged',
        path=message_bus.object_path,
    )
    match_rule.add_arg_condition(0, BUS_NAMESPACE, kind='namespace')

    message_proxy = Proxy(message_bus, router)
    await message_proxy.AddMatch(match_rule)

    with router.filter(match_rule, bufsize=20) as queue:
        while True:
            # https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-name-owner-changed
            msg = await queue.get()
            (wellknown_name, old_name, new_name) = msg.body
            if new_name:
                await tracker.on_bus_added(router, wellknown_name, new_name)
            elif old_name:
                # `new_name` is empty in this branch; the owner that went
                # away is `old_name`, which is the key the player was
                # registered under when it was added.
                tracker.on_bus_removed(wellknown_name, old_name)
def safe_get_dbus_value(pair, type_):
    """Unwrap a ``(type, value)`` pair after checking its type marker.

    Args:
        pair: ``None`` or an indexable whose first item is the type marker
            and whose second item is the value.
        type_: The type marker that ``pair`` is required to carry.

    Returns:
        ``None`` when ``pair`` is ``None``, otherwise the second item.

    Raises:
        TypeError: If the first item of ``pair`` differs from ``type_``.
    """
    if pair is not None:
        found = pair[0]
        if found != type_:
            raise TypeError(f"Expected type {type_!r} but found {found!r}")
        return pair[1]
    return None
async def properties_watcher(router, tracker):
    """Relay MPRIS player property changes to ``tracker`` indefinitely.

    Registers a match rule for ``PropertiesChanged`` signals emitted on
    ``PATH_MPRIS`` by any sender, limited to the player sub-interface, and
    hands every received message to ``handle_properties_changed``.

    Args:
        router: The D-Bus router used to register the rule and receive signals.
        tracker: Passed through unchanged to ``handle_properties_changed``.
    """
    rule = MatchRule(
        type='signal',
        sender=None,
        interface=IFACE_PROPERTIES,
        member='PropertiesChanged',
        path=PATH_MPRIS,
    )
    # The first signal argument names the interface whose properties changed.
    rule.add_arg_condition(0, IFACE_MPRIS_PLAYER)

    bus = Proxy(message_bus, router)
    await bus.AddMatch(rule)

    with router.filter(rule, bufsize=10) as signals:
        while True:
            signal = await signals.get()
            handle_properties_changed(tracker, signal)
def handle_properties_changed(tracker, msg):
    """Translate one ``PropertiesChanged`` message into tracker callbacks.

    A truthy ``PlaybackStatus`` triggers ``tracker.on_playback_status_change``.
    Non-empty ``Metadata`` that contains ``xesam:title`` or ``xesam:url``
    triggers ``tracker.on_filename_change``. A ``TypeError`` raised while
    processing is reported through ``tracker.msg.warn`` instead of propagating.

    Args:
        tracker: Receiver of the callbacks; also supplies ``msg.warn``.
        msg: Message whose body is ``(interface, changed, invalidated)`` and
            whose header carries the sender field.
    """
    # https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-properties
    _interface, changes, _invalidated = msg.body
    sender = msg.header.fields[HeaderFields.sender]

    try:
        status = safe_get_dbus_value(changes.get('PlaybackStatus'), 's')
        if status:
            tracker.on_playback_status_change(sender, status)

        metadata = safe_get_dbus_value(changes.get('Metadata'), 'a{sv}')
        if not metadata:
            return
        if 'xesam:title' not in metadata and 'xesam:url' not in metadata:
            return

        tracker.on_filename_change(
            sender,
            safe_get_dbus_value(metadata.get('xesam:title'), 's'),
            safe_get_dbus_value(metadata.get('xesam:url'), 's'),
        )
    except TypeError as e:
        tracker.msg.warn(f"Failed to read properties for {sender}: {e}")
async def collect_names(router) -> Dict[str, str]:
    """Map the unique owner name of every bus in the namespace to its well-known name.

    Lists all names on the message bus, keeps those starting with
    ``BUS_NAMESPACE`` and resolves their owners concurrently.

    Args:
        router: The D-Bus router used for the bus queries.

    Returns:
        A dict keyed by unique name with the well-known name as value.
    """
    bus = Proxy(message_bus, router)
    listing = await bus.ListNames()

    wellknown = []
    for candidate in listing[0]:
        if candidate.startswith(BUS_NAMESPACE):
            wellknown.append(candidate)

    lookups = [bus.GetNameOwner(candidate) for candidate in wellknown]
    owners = await asyncio.gather(*lookups)

    # Each owner reply is indexable; its first item is the unique name.
    return {owner[0]: candidate for owner, candidate in zip(owners, wellknown)}
@dataclass
class Player:
    """Cached view of a single MPRIS player on the bus.

    The ``update_*`` coroutines query the player's properties through
    ``router`` and store the results on the instance; ``get_position``
    queries without caching.
    """

    # Router used for every property request sent to this player.
    router: DBusRouter
    # Well-known bus name, used in log messages by the tracker.
    wellknown_name: str
    # Unique bus name; property requests are addressed to it.
    unique_name: str
    # Last known `PlaybackStatus` string, or None before the first update.
    playback_status: Union[str, None] = None
    # Last known `xesam:title` / `xesam:url` metadata entries, if any.
    title: Union[str, None] = None
    url: Union[str, None] = None

    @classmethod
    async def new(cls, router, wellknown_name, unique_name):
        """Create a player and fetch its filename data and playback status.

        Both property requests are issued concurrently.

        Returns:
            The populated instance of ``cls``.
        """
        # Instantiate through `cls` so the alternate constructor also
        # produces the right type when invoked on a subclass.
        player = cls(router, wellknown_name, unique_name)
        await asyncio.gather(
            player.update_filename(),
            player.update_playback_status(),
        )
        return player

    @property
    def filename(self):
        """Best available name for the media that is loaded.

        A title longer than five characters wins. Otherwise the unquoted
        basename of the URL is used when a URL is known. Failing both, the
        (short, empty or missing) title is returned as-is.
        """
        if self.title and len(self.title) > 5:
            return self.title
        elif self.url:
            # TODO : Support for full path
            return os.path.basename(urllib.parse.unquote_plus(self.url))
        return self.title

    async def update_filename(self):
        """Refresh ``title`` and ``url`` from the player's ``Metadata`` property.

        Both are set to ``None`` when the metadata is missing or empty.

        Raises:
            TypeError: If a value does not carry the expected type marker.
        """
        msg = self._player_properties.get('Metadata')
        reply = await self.router.send_and_get_reply(msg)
        metadata = safe_get_dbus_value(reply.body[0], 'a{sv}')
        self.title = safe_get_dbus_value(metadata.get('xesam:title'), 's') if metadata else None
        self.url = safe_get_dbus_value(metadata.get('xesam:url'), 's') if metadata else None

    async def update_playback_status(self):
        """Refresh ``playback_status`` from the player's ``PlaybackStatus`` property.

        Raises:
            TypeError: If the value does not carry the ``'s'`` type marker.
        """
        msg = self._player_properties.get('PlaybackStatus')
        reply = await self.router.send_and_get_reply(msg)
        self.playback_status = safe_get_dbus_value(reply.body[0], 's')

    async def get_position(self):
        """Query and return the player's current ``Position`` property.

        The value is required to carry the ``'x'`` type marker; the MPRIS
        specification defines this property in microseconds.

        Raises:
            TypeError: If the value does not carry the ``'x'`` type marker.
        """
        msg = self._player_properties.get('Position')
        reply = await self.router.send_and_get_reply(msg)
        return safe_get_dbus_value(reply.body[0], 'x')

    @property
    def _player_properties(self):
        """Properties helper addressed at this player's player interface."""
        address = DBusAddress(PATH_MPRIS, bus_name=self.unique_name, interface=IFACE_MPRIS_PLAYER)
        return Properties(address)
class MprisTracker(tracker.TrackerBase):
    """Tracker that follows media players exposed via MPRIS over D-Bus.

    Players whose well-known bus name matches the ``tracker_process``
    configuration pattern are kept in ``players``, keyed by their unique bus
    name. At most one of them is the ``active_player``, whose filename and
    playback status drive the show and timer updates.
    """

    name = 'Tracker (MPRIS)'

    def __init__(self, *args, **kwargs):
        # The `TrackerBase.__init__` spawns a new thread
        # for `observe`.
        # The event is therefore created first; `observe_async` waits on it
        # so that it only proceeds once the attributes below exist.
        self.initalized = threading.Event()
        super().__init__(*args, **kwargs)

        self.re_players = re.compile(self.config['tracker_process'])
        # Map "OwnerName"s to Player instances
        # for monitoring when one appears or disappears
        # via the `NameOwnerChanged` signal.
        self.players = {}
        self.timing = False
        self.active_player = None
        self.initalized.set()

    def update_list(self, *args, **kwargs):
        """Update the list via the base class, then re-probe players unless playing."""
        super().update_list(*args, **kwargs)
        if self.last_state != utils.Tracker.PLAYING:
            # Re-check if we have any player with a valid show running after a list update
            self.last_filename = None
            self.find_playing_player()

    async def observe_async(self):
        """Run the D-Bus watchers and the timer until one of them fails.

        Opens a router, starts the watcher and timer tasks, registers the
        players that are already on the bus and then waits on the tasks.
        When one of them raises, the error is logged, all tasks are
        cancelled and the coroutine returns.
        """
        self.initalized.wait()

        async with open_dbus_router() as router:
            name_owner_watcher_task = asyncio.create_task(name_owner_watcher(router, self))
            properties_watcher_task = asyncio.create_task(properties_watcher(router, self))
            timer_task = asyncio.create_task(self._timer())
            tasks = [name_owner_watcher_task, properties_watcher_task, timer_task]

            name_map = await collect_names(router)
            self.players = {
                unique_name: await Player.new(router, wellknown_name, unique_name)
                for unique_name, wellknown_name in name_map.items()
                if self.valid_player(wellknown_name)
            }
            ignored_players = [name for name in name_map.values() if not self.valid_player(name)]
            self.msg.debug(f"Ignoring players: {ignored_players}")
            for player in self.players.values():
                self.msg.debug(f"Player connected: {player.wellknown_name}")
            self.find_playing_player()

            try:
                await asyncio.gather(*tasks)
            except Exception:
                self.msg.exception("Error in dbus watchers; cleaning up", sys.exc_info())
                for task in tasks:
                    task.cancel()
                # Wait for the cancellations to complete. The outcomes are
                # collected rather than raised: a plain gather would re-raise
                # the failure that was just logged (or a cancellation) from
                # inside this handler.
                await asyncio.gather(*tasks, return_exceptions=True)
                # let the thread die

    def observe(self, config, watch_dirs):
        """Thread entry point: run ``observe_async`` in a fresh event loop."""
        self.msg.info("Using MPRIS.")
        asyncio.run(self.observe_async())

    def valid_player(self, wellknown_name):
        """Return a truthy match if the name matches the configured pattern."""
        return self.re_players.search(wellknown_name)

    def find_playing_player(self) -> bool:
        """Probe playing players until one is accepted as the active player.

        Returns:
            True if some player in the ``Playing`` state was accepted.
        """
        # Go through all connected players in random order
        # (or not since dicts are ordered now).
        return any(
            self._update_active_player(player, probing=True)
            for player in self.players.values()
            if player.playback_status == PlaybackStatus.PLAYING
        )

    async def on_bus_added(self, router, wellknown_name, unique_name):
        """Register a newly appeared bus as a player if its name is accepted."""
        if not self.valid_player(wellknown_name):
            self.msg.debug("Ignoring new bus that does not match configured player:"
                           f" {wellknown_name}")
            return
        player = await Player.new(router, wellknown_name, unique_name)
        self.msg.debug(f"Player connected: {player.wellknown_name}")
        self.players[unique_name] = player
        self._handle_player_update(player)

    def on_bus_removed(self, wellknown_name, unique_name):
        """Forget the player registered under ``unique_name``, if any."""
        player = self.players.pop(unique_name, None)
        if not player:
            return
        self.msg.debug(f"Player disconnected: {player.wellknown_name}")
        if player == self.active_player:
            # `_handle_player_stopped` clears `active_player` itself and may
            # then promote another playing player. Resetting the attribute
            # again afterwards would discard that replacement.
            self._handle_player_stopped()

    def on_playback_status_change(self, sender, playback_status):
        """Record a new playback status for a known player and react to it."""
        player = self.players.get(sender)
        if not player:
            return
        player.playback_status = playback_status
        self._handle_player_update(player)

    def on_filename_change(self, sender, title, url):
        """Record new title/url metadata for a known player and react to it."""
        player = self.players.get(sender)
        if not player:
            return
        player.title = title
        player.url = url
        self._handle_player_update(player)

    def _handle_player_update(self, player):
        """Decide what a change on ``player`` means for the active player."""
        if player == self.active_player:
            if player.playback_status == PlaybackStatus.STOPPED:
                self._handle_player_stopped()
            else:
                self._update_active_player(player)
        elif self.active_player and self.active_player.playback_status == PlaybackStatus.PLAYING:
            self.msg.debug("Still playing on active player; ignoring update")
            return
        elif player.playback_status == PlaybackStatus.PLAYING:
            self._update_active_player(player)

    def _update_active_player(self, player: Player, probing=False) -> bool:
        """Update show and timer state from ``player``.

        Args:
            player: The player whose state changed or which is being probed.
            probing: When True and the player is rejected, ``last_filename``
                is restored to its value from before the call.

        Returns:
            False if ``player`` is not the active player and did not yield a
            new show; True otherwise.
        """
        is_new_player = player != self.active_player
        (state, show_tuple) = (None, None)
        previous_last_filename = self.last_filename
        new_show = False

        if player.filename != self.last_filename:
            (state, show_tuple) = self._get_playing_show(player.filename)
            if state in [utils.Tracker.UNRECOGNIZED, utils.Tracker.NOT_FOUND]:
                self.msg.debug("Video not recognized")
            elif state == utils.Tracker.NOVIDEO:
                self.msg.debug("No video loaded")
            else:
                new_show = True

        if is_new_player and not new_show:
            if probing:
                # Ignore this 'new' player & restore `last_filename`
                # since we're just looking for a new player candidate.
                # (this is a hack but a proper fix needs larger refactoring
                # involving the parent class)
                self.last_filename = previous_last_filename
            return False

        if new_show:
            self.msg.debug(f"New tracker status: {state} (previously: {self.last_state})")
            self.update_show_if_needed(state, show_tuple)

        if is_new_player and new_show:
            self.msg.debug(f"Setting active player: {player.wellknown_name}")
            self.active_player = player

        if player.playback_status == PlaybackStatus.PLAYING:
            self._start_timer()
        else:
            self._stop_timer()

        return True

    def _handle_player_stopped(self):
        """Clear the active player and look for another playing one.

        When no replacement is found, the show state is updated for "no
        file" and the timer is paused.
        """
        # Active player got closed!
        if self.active_player:
            self.msg.debug(f"Clearing active player: {self.active_player.wellknown_name}")
        self.active_player = None
        self.view_offset = None
        if not self.find_playing_player():
            (state, show_tuple) = self._get_playing_show(None)
            self.update_show_if_needed(state, show_tuple)
            self._stop_timer()

    def _start_timer(self):
        """Resume the timer and enable ticking; logs only on a state change."""
        self.resume_timer()
        if not self.timing:
            self.timing = True
            self.msg.debug("MPRIS timer started.")

    def _stop_timer(self):
        """Pause the timer and disable ticking; logs only on a state change."""
        self.pause_timer()
        if self.timing:
            self.timing = False
            self.msg.debug("MPRIS timer paused.")

    async def _timer(self):
        """Call ``_on_tick`` about once per second while ``timing`` is set."""
        while True:
            if self.timing:
                await self._on_tick()
            await asyncio.sleep(1, True)

    async def _on_tick(self):
        """Refresh the view offset and advance the update timer."""
        if self.active_player:
            # The reported position is scaled down by a factor of 1000.
            self.view_offset = int(await self.active_player.get_position()) / 1000

        if self.last_show_tuple:
            self.update_timer(self.last_state, self.last_show_tuple)
            if self.last_updated:
                self._stop_timer()