In [None]:
import logging
import hashlib
import time

from epdlib import Layout, Update

# Ideas
    * Write "Active" and "passive" plugin structure - all plugins are passive now
        - Active plugins can update whenever needed
        - passive display for a minimum time and are updated only when data changes
    * Add a default plugin module to display if all else fails and no other plugin is displaying
        - Display time and helpful hints for diagnosing and fixing issue
    * Periodically refresh the screen to prevent burn-in when only one plugin is dispalying
        - Call basic clock plugin to do this
        

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
logging.basicConfig(level=logging.INFO)
logging.debug('foobar')


In [None]:
logging.getLogger().setLevel('INFO')
logging.getLogger().setLevel('DEBUG')

In [None]:
# migrated to library/CacheFiles.py
from pathlib import Path
import tempfile
import shutil
import requests

class CacheFiles:
    '''file caching object
        Download remote files using requests.get and cache locally
        locally cached files will not be downloaded again'''
    
    def __init__(self, path=None, path_prefix=None):
        self.path_prefix = path_prefix
        self.path = path        
    
    def __repr__(self):
        return f'{type(self._path)}({self.path})'
    
    def __str__(self):
        return str(self.path)

    @property
    def path(self):
        '''path to file cache
            if no path is provided /tmp/ will be used
        
        Args:
            path(None or `str`): None(default) or /full/path/to/file_cache/'''
        if isinstance(self._path, tempfile.TemporaryDirectory):
            return Path(self._path.name)
        elif isinstance(self._path, Path):
            return str(self._path.absolute)
        else:
            return str(self._path)
    
    @path.setter
    def path(self, path):
        if path:
            self._path = Path(path)
        else:
            self._path = tempfile.TemporaryDirectory(prefix=self.path_prefix)
            
    @property
    def path_prefix(self):
        '''prefix to use when setting a cache path in /tmp
            prefixes will always be suffixed with "_" to make more readable
        
        Args:
            prefix(`str`): prefix-to-append; '''
        return self._path_prefix
    
    @path_prefix.setter
    def path_prefix(self, path_prefix):
        if not path_prefix:
            self._path_prefix = ''  
        elif path_prefix.endswith('_'):
            self._path_prefix = path_prefix
        else:
            self._path_prefix = path_prefix+'_'
    
    def cleanup(self):
        '''recursively remove all cached files and cache path'''
        if isinstance(self._path, tempfile.TemporaryDirectory):
            self._path.cleanup()
        
        elif isinstance(self._path, Path):
            shutil.rmtree(self._path)
        else:
            logging.warning(f'no cleanup method for this datatype: {type(self.path)}')
    
    def cache_file(self, url, file_id, force=False):
        '''download a remote file and return the local path to the file
            if a local file with the same name is found, download is skipped and path returned
        
        Args:
            url(`str`): url to remote file
            file_id(`str`): name to use for local file
            force(`bool`): force a download ignoring local files with the same name'''
        file_id = str(file_id)
        local_file = Path(self.path/file_id).absolute()
        
        logging.debug(f'caching file from url {url} to {local_file}')
        
        if local_file.exists() and force is False:
            logging.debug(f'file previously cached')
            return local_file
        
        try:
            r = requests.get(url, stream=True)
        except requests.exceptions.RequestException as e:
            logging.error(f'failed to fetch file at: {url} with error: {e}')
            return None
        
        if r.status_code == 200:
            logging.debug(f'writing file to file{local_file}')
            
            try:
                with open(local_file, 'wb') as file:
                    shutil.copyfileobj(r.raw, file)
            except (OSError, ValueError) as e:
                logging.error(f'failed to write {local_file}: {e}')
                return None
            except (FileExistsError) as e:
                logging.warning(f'file "{local_file}" appears to exist already; no action taken')
                return local_file
        else:
            logging.error(f'failed to fetch file at {url} with response code: {r.status_code}')
            return None
            
        return local_file
        
        
    

In [None]:
# migrated to library/Plugin.py

def strict_enforce(*types):
    """strictly enforce type compliance within classes
    
    Usage:
        @strict_enforce(type1, type2, (type3, type4))
        def foo(val1, val2, val3):
            ...
    """
    def decorator(f):
        def new_f(self, *args, **kwds):
            #we need to convert args into something mutable   
            newargs = []        
            for (a, t) in zip(args, types):
                if not isinstance(a, t):
                    raise TypeError(f'"{a}" is not type {t}')
#                 newargs.append( t(a)) #feel free to have more elaborated convertion
            return f(self, *args, **kwds)
        return new_f
    return decorator

In [None]:
class A:
    '''dummy class for testing'''
    pass

In [1]:
# migrated to library/Plugin.py

class Plugin:
    def __init__(self, name=None, resolution=None, layout={}, 
                 update_function=None, max_priority=-1,
                 refresh_rate=60, min_display_time=30, config={},
                 cache=None,
                 **kwargs):
        self.name = name
        if resolution:
            self.resolution = resolution
        
        self.layout = layout
        
        if update_function:
            self._add_update_function(update_function)
        else:
            self.update_function = print('no update function set')
        
        self.max_priority = max_priority
        
        self.refresh_rate = refresh_rate
        self.min_display_time = min_display_time
        
        self.config = config

        self.cache = cache
        
        self._last_ask = 0
        self.hash = self._generate_hash()
        self.data = {}
        self.image = None
        self.priority = -1
    
    @property
    def name(self):
        '''name of plugin
        name(`str`)'''
        return self._name
    
    @name.setter
    def name(self, name):
        self._name = str(name)
        
    @property
    def resolution(self):
        return self._resolution
        
    @resolution.setter
    @strict_enforce((list, tuple))
    def resolution(self, resolution):
        self._resolution = resolution    
    
    @property
    def layout(self):
        return self.layout_obj.layout
    
    @layout.setter
    @strict_enforce(dict)
    def layout(self, layout):
        self.layout_obj = Layout(resolution=self.resolution, layout=layout)
    
    
    @property
    def cache(self):
        return self._cache
    
    @cache.setter
    def cache(self, cache):
        if cache:
            self._cache = cache
        else:
            self._cache = None
    
    @property
    def last_ask(self):
        '''Records monotonic time of last time an update function was called 
            This is used by the self._is_ready() function to throttle update requests'''
        return self._last_ask
    
    @last_ask.setter
    def last_ask(self, last_ask):
        self._last_ask = last_ask
        
    
    def _add_update_function(self, function):
        self.update_function = function.__get__(self)
        
    def _generate_hash(self):
        my_hash = hashlib.sha1()
        my_hash.update(str(time.time()).encode('utf-8')+str(self.name).encode('utf-8'))
        return my_hash.hexdigest()[:10]        
    
    def _is_ready(self):
        '''simple throttle of update requests
            Checks time between current request (monotonic) and self._last_ask and compares to 
            self.refresh_rate
        
        Returns:
            `bool`: True if cooldown period has expired, false otherwise'''
        if time.monotonic() - self._last_ask > self.refresh_rate:
            self._last_ask = time.monotonic()
            return True
        else:
            logging.debug(f'wait for {self.refresh_rate - (time.monotonic() - self._last_ask)} seconds')
            return False
        
    def update(self):
        if self._is_ready():
            is_updated, data, priority = self.update_function()
            if data != self.data:
                self.data = data
                self.layout_obj.update_contents(data)
                self.image = self.layout_obj.concat()
                self.hash = self._generate_hash()
            # always update the priority    
            self.priority = priority
        else:
            pass
        
            
        return self.hash
    
    

NameError: name 'strict_enforce' is not defined

In [None]:
bogus_layout = {
    'number': {
        'image': None,
        'max_lines': 1,
        'width': 1,
        'height': 1,
        'abs_coordinates': (0, 0),
        'rand': True,
        'font': '../fonts/Dosis/Dosis-VariableFontwght.ttf',
    },
}


bogus = A()
bogus.config = {'update_interval': 10}
bogus.max_priority = -1

def bogus_plugin(self):
    data = {'number': '8675'}
    priority = self.max_priority
    is_updated = True
    
    return (is_updated, data, priority) 

In [None]:
bogus_plugin(bogus)

In [None]:
clock_layout = {
    'digit_time': {
        'image': None,
        'max_lines': 3,
        'width': 1,
        'height': 1,
        'abs_coordinates': (0, 0),
        'rand': True,
        'font': '../fonts/Dosis/Dosis-VariableFontwght.ttf',
    },
}


test = A()
test.config = {'update_interval': 10}
test.max_priority = 1

from datetime import datetime
def clock_plugin(self):
    data = {'digit_time': datetime.now().strftime("%H:%M:%S")}
    priority = self.max_priority
    is_updated = True
    
    return (is_updated, data, priority) 

In [None]:
clock_plugin(test)

In [None]:
slim_layout = {
    'title': {                       # text only block
        'image': None,               # do not expect an image
        'max_lines': 2,              # number of lines of text
        'width': 1,                  # 1/1 of the width - this stretches the entire width of the display
        'height': 2/3,               # 1/3 of the entire height
        'abs_coordinates': (0, 0),   # this block is the key block that all other blocks will be defined in terms of
        'hcenter': True,             # horizontally center text
        'vcenter': True,             # vertically center text 
        'relative': False,           # this block is not relative to any other. It has an ABSOLUTE position (0, 0)
        'font': '../fonts/Anton/Anton-Regular.ttf', # path to font file
        'font_size': None            # Calculate the font size because none was provided
    },

    'artist': {
        'image': None,
        'max_lines': 1,
        'width': 1,
        'height': 1/3,
        'abs_coordinates': [0, None],   # X = 0, Y will be calculated
        'hcenter': True,
        'vcenter': True,
        'font': '../fonts/Dosis/Dosis-VariableFontwght.ttf',
        'relative': ['artist', 'title'], # use the X postion from abs_coord from `artist` (this block: 0)
                                       # calculate the y position based on the size of `title` block
    }
}

slim_test = A()
slim_test = A()
slim_test.config = {'player_name': 'MacPlay',   # name of player to track
                    'refresh_rate': 30,         # minimum number of seconds between each update
                    'idle_timeout': 10}         # when idle_timeout is exceeded, make pause screen lower priority for display
slim_test.priority = -1
slim_test.max_priority = 0


import lmsquery
import requests
def slim_plugin(self):

    now_playing = None
    data = {
        'id': 0,
        'title': 'Err: No Player',
        'artist': 'Err: No Player',
        'coverid': 'Err: No Player',
        'duration': 0,
        'album_id': 'Err: No Player',
        'genre': 'Err: No Player',
        'album': 'Err: No Player',
        'artwork_url': 'Err: No Player',
        'mode': 'None'
    }

    is_updated = False
    priority = -1
    
    failure = (is_updated, data, priority)
    
    player_name = self.config['player_name']
    
    if not hasattr(self, 'play_state'):
        self.play_state = 'None'
    
    # add the idle timer on first run
    if not hasattr(self, 'idle_timer'):
        logging.debug(f'adding idle_timer of class `Update()`')
        self.idle_timer = Update()
    
    # check if LMS Query object is initiated
    if not hasattr(self, 'my_lms'):
        # add LMSQuery object to self
        logging.debug(f'building LMS Query object for player: {player_name}')
        self.my_lms = lmsquery.LMSQuery(player_name=player_name)
    try:
        # fetch the now playing data for the player
        now_playing = self.my_lms.now_playing()
        # remove the time key to make comparisions now_playing data updates easier in the Plugin class
        if 'time' in now_playing:
            now_playing.pop('time')
            
    except requests.exceptions.ConnectionError as e:
        logging.error(f'could not find player "{player_name}": {e}')
        return failure
    except KeyError as e:
        logging.warning(f'error getting now plyaing information for "{player_name}": KeyError {e}')
        logging.warning('this error is typical of newly added player or player that has no "now playing" data')
        return failure
    
    if now_playing:
        data = now_playing
    
    # set the priority based on play state
    logging.debug(f'play_state before checking now_playing: {self.play_state}')
    if now_playing['mode'] == 'play':
        priority = self.max_priority
        is_updated = True
        self.play_state = now_playing['mode']
    elif now_playing['mode'] == 'pause':
        # if switching from play to pause, refresh the idle timer
        if self.play_state == 'play':
            logging.debug('resetting idle_timer')
            self.idle_timer.update()
        if self.idle_timer.last_updated > self.config['idle_timeout']:
            priority = self.max_priority + 2
        else:
            priority = self.max_priority + 1
        is_updated = True
        self.play_state = now_playing['mode']
    else:
        priority = -1
        is_updated = False
        play_state = now_playing['mode']
    
    logging.debug(f'current priority: {priority}, current play state: {self.play_state}')
    return (is_updated, data, priority)
    

In [None]:
slim_plugin(slim_test)

In [None]:
spot_layout = {
    'coverart': { # coverart image
                'image': True, # has an image that may need to be resized
                'max_lines': None, # number of lines of text associated with this section
                'padding': 10, # amount of padding at edge
                'width': 1/3, # fraction of total width of display
                'height': 1, # fraction of total height
                'abs_coordinates': (0, 0), # X, Y for top left position of section
                'hcenter': True, # horizontal center-align the contents
                'vcenter': True, # vertically center-align the contents
                'relative': False, # False if position is absolute
                'font': '../fonts/Open_Sans/OpenSans-Regular.ttf',
                'font_size': None # font size to use for text
    },
    'title': { # track title
                'image': None, # none if no image is needed
                'max_lines': 3, # number of lines of text associated with this section
                'padding': 10,  # padding at edge
                'width': 2/3, # fraction of total width of display
                'height': 3/5, # fraction of total height of display
                'abs_coordinates': (None, 0), # X, Y for top left position of section
                                          # None indicates that the position is not 
                                          # known and will be calculated 
                                          # relative to another section
                                          # integer indicates an absolute position to use
                'hcenter': False, # horizontal center-align the contents
                'vcenter': True, # vertically center-align the contents
                'relative': ['coverart', 'title'], # [X Section: abs_coordinates+dimension
                                                   #, Y section abs_coordinates+dimension]
                'font': '../fonts/Open_Sans/OpenSans-Regular.ttf',
                'font_size': None # font size to use for text

    },
    'artist': { # track artist
                'image': None,
                'max_lines': 2,
                'padding': 10,
                'width': 2/3,
                'height': 1/5,
                'abs_coordinates': (None, None),
                'hcenter': False,
                'vcenter': True,
                'relative': ['coverart', 'title'],
                'font': '../fonts/Open_Sans/OpenSans-Regular.ttf',
                'font_size': None
    },
    'album': { #album name
                'image': None,
                'max_lines': 2,
                'padding': 10,
                'width': 2/3,
                'height': 1/5,
                'abs_coordinates': (None, None),
                'hcenter': False,
                'vcenter': True,
                'relative': ['coverart', 'artist'],
                'font': '../fonts/Open_Sans/OpenSans-Regular.ttf',
                'font_size': None
    }
}

spot_test = A()
spot_test.config = {'player_name': 'SpoCon-Spotify',
                    'idle_timeout': 10}
spot_test.max_priority = 0
spot_test.cache = CacheFiles(path_prefix='spot_test')

import requests
from json import JSONDecodeError
import constants_spot
from dictor import dictor

def spot_plugin(self):
    data = {
        'title': 'Err: no data',
        'artist': 'Err: no data',
        'album': 'Err: no data',
        'artwork_url': 'Err: no data',
        'duration': 0,
        'player': 'Err: no data',
        'mode': 'None'}
    priority = -1
    is_updated = False
    
    failure = (is_updated, data, priority)
#     logging.info('creating libre-spot spotify plugin (Spocon)')
    logging.debug(f'fetching access token from librespot player {self.config["player_name"]}')
    logging.debug(f'requesting access scope: {constants_spot.spot_scope}')
    
    # add the property play_state for recording current play state 
    if not hasattr(self, 'play_state'):
        self.play_state = 'None'
    
    # add the idle timer on first run
    if not hasattr(self, 'idle_timer'):
        logging.debug(f'adding idle_timer of class `Update()`')
        self.idle_timer = Update()    
    
    if not self.cache:
        self.cache = CacheFiles(path_prefix=self.config['player_name'])
        
        
    try:
        token = requests.post(constants_spot.libre_token_url)
    except requests.ConnectionError as e:
        logging.error(f'Failed to pull Spotify token from librespot at url: {constants_spot.libre_token_url}')
        logging.error(f'{e}')
        return failure
    
    if token.status_code == 200:
        logging.debug('token received')
        try:
            headers = {'Authorization': 'Bearer ' + token.json()['token']}
        except JSONDecodeError as e:
            logging.error(f'failed to decode token JSON object: {e}')
            return failure
    else:
        logging.info(f'no token available from librespot status: {token.status_code}')
        return failure
    
    # use the token to fetch player information from spotify
    if 'Authorization' in headers:
        player_status = requests.get(constants_spot.spot_player_url, headers=headers)
    else:
        logging.warning(f'no valid Authroization token found in response from librespot: {headers}')
        return failure
    
    if player_status.status_code == 200:
        try:
            player_json = player_status.json()
        except JSONDecodeError as e:
            logging.error(f'failed to decode player status JSON object: {e}')
            return failure
        
        # bail out if the player name does not match
        if not dictor(player_json, 'device.name') == self.config['player_name']:
            logging.info(f'{self.config["player_name"]} is not active: no data')
            return failure
        
        # map json data to dictionary format that Layout() objects can use
        # probably should wrap this in a try:
        for key in constants_spot.spot_map:
            data[key] = dictor(player_json, constants_spot.spot_map[key])
            
        if 'artwork_url' in data and 'id' in data:
            data['coverart'] = self.cache.cache_file(url=data['artwork_url'], file_id=data['id'])
                
        playing = dictor(player_status.json(), 'is_playing')
        if playing is True:
            data['mode'] = 'play'
            is_updated = True
            self.play_state = 'play'
            priority = self.max_priority 
        elif playing is False:
            data['mode'] = 'paused'
            if self.play_state == 'play':
                logging.debug('resetting idle_timer')
                self.idle_timer.update()
            if self.idle_timer.last_updated > self.config['idle_timeout']:
                priority = self.max_priority + 2
            else:
                priority = self.max_priority + 1
            is_updated = True
            self.play_state = 'paused'
        else:
            data['mode'] = None            
            is_updated = True
            priority = -1    
            self.play_state = 'None'
    
    return (is_updated, data, priority)

In [None]:
spot_plugin(spot_test)

In [None]:
config = {
    'main': {
        'resolution': (500, 400)
    },
    'Plugin: Clock': {
        'layout': clock_layout,
        'update_function': clock_plugin,
        'refresh_rate': 30,
        'min_display_time': 10,
        'max_priority': 1
    },
    'Plugin: LMS': {
        'layout': slim_layout,
        'update_function': slim_plugin,
        'refresh_rate': 5,
        'player_name': 'MacPlay',
        'min_display_time': 15,
        'max_priority': 0,
        'idle_timeout': 30
    },
    'Plugin: Bogus': {
        'layout': bogus_layout,
        'update_function': bogus_plugin,
        'refresh_rate': 5,
        'min_display_time': 20,
        'max_priority': -1,
    },
    'Plugin: LibreSpot': {
        'layout': spot_layout,
        'update_function': spot_plugin,
        'refresh_rate': 10,
        'player_name': 'SpoCon-Spotify',
        'idle_timeout': 15,
        'max_priority': 0,
    }
}

cache = CacheFiles()

plugins = []

for section in config:
    if section.startswith('Plugin: '):
        my_config = config[section]
        my_config['name'] = section
        my_config['resolution'] = config['main']['resolution']
        my_config['config'] = config[section]
        my_config['cache'] = cache # make a cache object available to all plugins (use the same one!)
        my_plugin = Plugin(**my_config)
        my_plugin.update()
        my_dict = {'plugin': my_plugin,
#                    'display_timer': Update(),
#                    'hash': my_plugin.hash,
                   'min_display_time': my_plugin.min_display_time}
        
        plugins.append(my_dict)

In [None]:
logging.getLogger().setLevel('INFO')


In [None]:
from IPython.display import display
from itertools import cycle


plugin_cycle = cycle(plugins)
this_plugin = next(plugin_cycle)
this_plugin_timer = Update()
max_priority = -1
this_hash = ''

while True:
    logging.info('updating plugins')
    logging.getLogger().setLevel('INFO')
    priority_list = []
    for plugin in plugins:
        logging.debug(f'updating plugin: [[ {plugin["plugin"].name} ]]')
        plugin['plugin'].update()
        # record the priority of all actiuve plugins (priority >= 0)
        if plugin['plugin'].priority >= 0:
            priority_list.append(plugin['plugin'].priority)
        
    # priority increases as it approaches 0; negative priorities are considered inactive
    max_priority = min(priority_list)
    logging.info(f'maximum priority is set to: {max_priority}')
        

    logging.getLogger().setLevel('DEBUG')
    logging.info(f'plugin currently displayed << {this_plugin["plugin"].name} >>')
    if this_plugin_timer.last_updated > this_plugin['plugin'].min_display_time:
        logging.info(f'display timer expired; switching plugin')
        plugin_is_active = False
        
        # find the next active plugin in the cycle
        while not plugin_is_active:
            this_plugin = next(plugin_cycle)
            logging.debug(f'checking display priority of {this_plugin["plugin"].name}')
            logging.debug(f'max_priority: {max_priority}, this plugin priority {this_plugin["plugin"].priority}')
            # beef this up to check priority overall
            if this_plugin['plugin'].priority >= 0 and this_plugin['plugin'].priority <= max_priority:
                plugin_is_active = True
            else:
                plugin_is_active = False
                logging.debug(f'{this_plugin["plugin"].name} has a priority of {this_plugin["plugin"].priority}; skipping')
        
        logging.debug(f'using plugin {this_plugin["plugin"].name}')
        
        # if the hashes do not match, display the new image, update the hash
        if this_hash != this_plugin['plugin'].hash:
            logging.debug('plugin image has refreshed -- refreshing screen')
            this_hash = this_plugin['plugin'].hash
            display(this_plugin['plugin'].image)
        else:
            logging.debug('plugin image has not refreshed -- skipping screen refresh')
            # record the number of times the screen is not refreshed here
            # if that number grows past N, wipe the display and then display the image
            pass
        this_plugin_timer.update()
        
    time.sleep(1)
        
        
        
        
        
