From d70a92e9476e61b12a79c5642629ed81c9ad703f Mon Sep 17 00:00:00 2001 From: morpheus65535 Date: Sun, 12 May 2024 23:13:21 -0400 Subject: [PATCH] Fixed uppercase issue in Apprise module name. --- libs/apprise/apprise.py | 887 +++++++++++++++++++++++++++++++++++++++ libs/apprise/apprise.pyi | 62 +++ 2 files changed, 949 insertions(+) create mode 100644 libs/apprise/apprise.py create mode 100644 libs/apprise/apprise.pyi diff --git a/libs/apprise/apprise.py b/libs/apprise/apprise.py new file mode 100644 index 000000000..05a2ee3cc --- /dev/null +++ b/libs/apprise/apprise.py @@ -0,0 +1,887 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2024, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import asyncio +import concurrent.futures as cf +import os +from itertools import chain +from . import common +from .conversion import convert_between +from .utils import is_exclusive_match +from .manager_plugins import NotificationManager +from .utils import parse_list +from .utils import parse_urls +from .utils import cwe312_url +from .emojis import apply_emojis +from .logger import logger +from .asset import AppriseAsset +from .apprise_config import AppriseConfig +from .apprise_attachment import AppriseAttachment +from .locale import AppriseLocale +from .config.base import ConfigBase +from .plugins.base import NotifyBase + +from . import plugins +from . import __version__ + +# Grant access to our Notification Manager Singleton +N_MGR = NotificationManager() + + +class Apprise: + """ + Our Notification Manager + + """ + + def __init__(self, servers=None, asset=None, location=None, debug=False): + """ + Loads a set of server urls while applying the Asset() module to each + if specified. + + If no asset is provided, then the default asset is used. + + Optionally specify a global ContentLocation for a more strict means + of handling Attachments. + """ + + # Initialize a server list of URLs + self.servers = list() + + # Assigns an central asset object that will be later passed into each + # notification plugin. Assets contain information such as the local + # directory images can be found in. It can also identify remote + # URL paths that contain the images you want to present to the end + # user. If no asset is specified, then the default one is used. + self.asset = \ + asset if isinstance(asset, AppriseAsset) else AppriseAsset() + + if servers: + self.add(servers) + + # Initialize our locale object + self.locale = AppriseLocale() + + # Set our debug flag + self.debug = debug + + # Store our hosting location for optional strict rule handling + # of Attachments. Setting this to None removes any attachment + # restrictions. + self.location = location + + @staticmethod + def instantiate(url, asset=None, tag=None, suppress_exceptions=True): + """ + Returns the instance of a instantiated plugin based on the provided + Server URL. If the url fails to be parsed, then None is returned. + + The specified url can be either a string (the URL itself) or a + dictionary containing all of the components needed to istantiate + the notification service. If identifying a dictionary, at the bare + minimum, one must specify the schema. + + An example of a url dictionary object might look like: + { + schema: 'mailto', + host: 'google.com', + user: 'myuser', + password: 'mypassword', + } + + Alternatively the string is much easier to specify: + mailto://user:mypassword@google.com + + The dictionary works well for people who are calling details() to + extract the components they need to build the URL manually. + """ + + # Initialize our result set + results = None + + # Prepare our Asset Object + asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset() + + if isinstance(url, str): + # Acquire our url tokens + results = plugins.url_to_dict( + url, secure_logging=asset.secure_logging) + + if results is None: + # Failed to parse the server URL; detailed logging handled + # inside url_to_dict - nothing to report here. + return None + + elif isinstance(url, dict): + # We already have our result set + results = url + + if results.get('schema') not in N_MGR: + # schema is a mandatory dictionary item as it is the only way + # we can index into our loaded plugins + logger.error('Dictionary does not include a "schema" entry.') + logger.trace( + 'Invalid dictionary unpacked as:{}{}'.format( + os.linesep, os.linesep.join( + ['{}="{}"'.format(k, v) + for k, v in results.items()]))) + return None + + logger.trace( + 'Dictionary unpacked as:{}{}'.format( + os.linesep, os.linesep.join( + ['{}="{}"'.format(k, v) for k, v in results.items()]))) + + # Otherwise we handle the invalid input specified + else: + logger.error( + 'An invalid URL type (%s) was specified for instantiation', + type(url)) + return None + + if not N_MGR[results['schema']].enabled: + # + # First Plugin Enable Check (Pre Initialization) + # + + # Plugin has been disabled at a global level + logger.error( + '%s:// is disabled on this system.', results['schema']) + return None + + # Build a list of tags to associate with the newly added notifications + results['tag'] = set(parse_list(tag)) + + # Set our Asset Object + results['asset'] = asset + + if suppress_exceptions: + try: + # Attempt to create an instance of our plugin using the parsed + # URL information + plugin = N_MGR[results['schema']](**results) + + # Create log entry of loaded URL + logger.debug( + 'Loaded {} URL: {}'.format( + N_MGR[results['schema']].service_name, + plugin.url(privacy=asset.secure_logging))) + + except Exception: + # CWE-312 (Secure Logging) Handling + loggable_url = url if not asset.secure_logging \ + else cwe312_url(url) + + # the arguments are invalid or can not be used. + logger.error( + 'Could not load {} URL: {}'.format( + N_MGR[results['schema']].service_name, + loggable_url)) + return None + + else: + # Attempt to create an instance of our plugin using the parsed + # URL information but don't wrap it in a try catch + plugin = N_MGR[results['schema']](**results) + + if not plugin.enabled: + # + # Second Plugin Enable Check (Post Initialization) + # + + # Service/Plugin is disabled (on a more local level). This is a + # case where the plugin was initially enabled but then after the + # __init__() was called under the hood something pre-determined + # that it could no longer be used. + + # The only downside to doing it this way is services are + # initialized prior to returning the details() if 3rd party tools + # are polling what is available. These services that become + # disabled thereafter are shown initially that they can be used. + logger.error( + '%s:// has become disabled on this system.', results['schema']) + return None + + return plugin + + def add(self, servers, asset=None, tag=None): + """ + Adds one or more server URLs into our list. + + You can override the global asset if you wish by including it with the + server(s) that you add. + + The tag allows you to associate 1 or more tag values to the server(s) + being added. tagging a service allows you to exclusively access them + when calling the notify() function. + """ + + # Initialize our return status + return_status = True + + if asset is None: + # prepare default asset + asset = self.asset + + if isinstance(servers, str): + # build our server list + servers = parse_urls(servers) + if len(servers) == 0: + return False + + elif isinstance(servers, dict): + # no problem, we support kwargs, convert it to a list + servers = [servers] + + elif isinstance(servers, (ConfigBase, NotifyBase, AppriseConfig)): + # Go ahead and just add our plugin into our list + self.servers.append(servers) + return True + + elif not isinstance(servers, (tuple, set, list)): + logger.error( + "An invalid notification (type={}) was specified.".format( + type(servers))) + return False + + for _server in servers: + + if isinstance(_server, (ConfigBase, NotifyBase, AppriseConfig)): + # Go ahead and just add our plugin into our list + self.servers.append(_server) + continue + + elif not isinstance(_server, (str, dict)): + logger.error( + "An invalid notification (type={}) was specified.".format( + type(_server))) + return_status = False + continue + + # Instantiate ourselves an object, this function throws or + # returns None if it fails + instance = Apprise.instantiate(_server, asset=asset, tag=tag) + if not isinstance(instance, NotifyBase): + # No logging is required as instantiate() handles failure + # and/or success reasons for us + return_status = False + continue + + # Add our initialized plugin to our server listings + self.servers.append(instance) + + # Return our status + return return_status + + def clear(self): + """ + Empties our server list + + """ + self.servers[:] = [] + + def find(self, tag=common.MATCH_ALL_TAG, match_always=True): + """ + Returns a list of all servers matching against the tag specified. + + """ + + # Build our tag setup + # - top level entries are treated as an 'or' + # - second level (or more) entries are treated as 'and' + # + # examples: + # tag="tagA, tagB" = tagA or tagB + # tag=['tagA', 'tagB'] = tagA or tagB + # tag=[('tagA', 'tagC'), 'tagB'] = (tagA and tagC) or tagB + # tag=[('tagB', 'tagC')] = tagB and tagC + + # A match_always flag allows us to pick up on our 'any' keyword + # and notify these services under all circumstances + match_always = common.MATCH_ALWAYS_TAG if match_always else None + + # Iterate over our loaded plugins + for entry in self.servers: + + if isinstance(entry, (ConfigBase, AppriseConfig)): + # load our servers + servers = entry.servers() + + else: + servers = [entry, ] + + for server in servers: + # Apply our tag matching based on our defined logic + if is_exclusive_match( + logic=tag, data=server.tags, + match_all=common.MATCH_ALL_TAG, + match_always=match_always): + yield server + return + + def notify(self, body, title='', notify_type=common.NotifyType.INFO, + body_format=None, tag=common.MATCH_ALL_TAG, match_always=True, + attach=None, interpret_escapes=None): + """ + Send a notification to all the plugins previously loaded. + + If the body_format specified is NotifyFormat.MARKDOWN, it will + be converted to HTML if the Notification type expects this. + + if the tag is specified (either a string or a set/list/tuple + of strings), then only the notifications flagged with that + tagged value are notified. By default, all added services + are notified (tag=MATCH_ALL_TAG) + + This function returns True if all notifications were successfully + sent, False if even just one of them fails, and None if no + notifications were sent at all as a result of tag filtering and/or + simply having empty configuration files that were read. + + Attach can contain a list of attachment URLs. attach can also be + represented by an AttachBase() (or list of) object(s). This + identifies the products you wish to notify + + Set interpret_escapes to True if you want to pre-escape a string + such as turning a \n into an actual new line, etc. + """ + + try: + # Process arguments and build synchronous and asynchronous calls + # (this step can throw internal errors). + sequential_calls, parallel_calls = self._create_notify_calls( + body, title, + notify_type=notify_type, body_format=body_format, + tag=tag, match_always=match_always, attach=attach, + interpret_escapes=interpret_escapes, + ) + + except TypeError: + # No notifications sent, and there was an internal error. + return False + + if not sequential_calls and not parallel_calls: + # Nothing to send + return None + + sequential_result = Apprise._notify_sequential(*sequential_calls) + parallel_result = Apprise._notify_parallel_threadpool(*parallel_calls) + return sequential_result and parallel_result + + async def async_notify(self, *args, **kwargs): + """ + Send a notification to all the plugins previously loaded, for + asynchronous callers. + + The arguments are identical to those of Apprise.notify(). + + """ + try: + # Process arguments and build synchronous and asynchronous calls + # (this step can throw internal errors). + sequential_calls, parallel_calls = self._create_notify_calls( + *args, **kwargs) + + except TypeError: + # No notifications sent, and there was an internal error. + return False + + if not sequential_calls and not parallel_calls: + # Nothing to send + return None + + sequential_result = Apprise._notify_sequential(*sequential_calls) + parallel_result = \ + await Apprise._notify_parallel_asyncio(*parallel_calls) + return sequential_result and parallel_result + + def _create_notify_calls(self, *args, **kwargs): + """ + Creates notifications for all the plugins loaded. + + Returns a list of (server, notify() kwargs) tuples for plugins with + parallelism disabled and another list for plugins with parallelism + enabled. + """ + + all_calls = list(self._create_notify_gen(*args, **kwargs)) + + # Split into sequential and parallel notify() calls. + sequential, parallel = [], [] + for (server, notify_kwargs) in all_calls: + if server.asset.async_mode: + parallel.append((server, notify_kwargs)) + else: + sequential.append((server, notify_kwargs)) + + return sequential, parallel + + def _create_notify_gen(self, body, title='', + notify_type=common.NotifyType.INFO, + body_format=None, tag=common.MATCH_ALL_TAG, + match_always=True, attach=None, + interpret_escapes=None): + """ + Internal generator function for _create_notify_calls(). + """ + + if len(self) == 0: + # Nothing to notify + msg = "There are no service(s) to notify" + logger.error(msg) + raise TypeError(msg) + + if not (title or body or attach): + msg = "No message content specified to deliver" + logger.error(msg) + raise TypeError(msg) + + try: + if title and isinstance(title, bytes): + title = title.decode(self.asset.encoding) + + if body and isinstance(body, bytes): + body = body.decode(self.asset.encoding) + + except UnicodeDecodeError: + msg = 'The content passed into Apprise was not of encoding ' \ + 'type: {}'.format(self.asset.encoding) + logger.error(msg) + raise TypeError(msg) + + # Tracks conversions + conversion_body_map = dict() + conversion_title_map = dict() + + # Prepare attachments if required + if attach is not None and not isinstance(attach, AppriseAttachment): + attach = AppriseAttachment( + attach, asset=self.asset, location=self.location) + + # Allow Asset default value + body_format = self.asset.body_format \ + if body_format is None else body_format + + # Allow Asset default value + interpret_escapes = self.asset.interpret_escapes \ + if interpret_escapes is None else interpret_escapes + + # Iterate over our loaded plugins + for server in self.find(tag, match_always=match_always): + # If our code reaches here, we either did not define a tag (it + # was set to None), or we did define a tag and the logic above + # determined we need to notify the service it's associated with + + # First we need to generate a key we will use to determine if we + # need to build our data out. Entries without are merged with + # the body at this stage. + key = server.notify_format if server.title_maxlen > 0\ + else f'_{server.notify_format}' + + if server.interpret_emojis: + # alter our key slightly to handle emojis since their value is + # pulled out of the notification + key += "-emojis" + + if key not in conversion_title_map: + + # Prepare our title + conversion_title_map[key] = '' if not title else title + + # Conversion of title only occurs for services where the title + # is blended with the body (title_maxlen <= 0) + if conversion_title_map[key] and server.title_maxlen <= 0: + conversion_title_map[key] = convert_between( + body_format, server.notify_format, + content=conversion_title_map[key]) + + # Our body is always converted no matter what + conversion_body_map[key] = \ + convert_between( + body_format, server.notify_format, content=body) + + if interpret_escapes: + # + # Escape our content + # + + try: + # Added overhead required due to Python 3 Encoding Bug + # identified here: https://bugs.python.org/issue21331 + conversion_body_map[key] = \ + conversion_body_map[key]\ + .encode('ascii', 'backslashreplace')\ + .decode('unicode-escape') + + conversion_title_map[key] = \ + conversion_title_map[key]\ + .encode('ascii', 'backslashreplace')\ + .decode('unicode-escape') + + except AttributeError: + # Must be of string type + msg = 'Failed to escape message body' + logger.error(msg) + raise TypeError(msg) + + if server.interpret_emojis: + # + # Convert our :emoji: definitions + # + + conversion_body_map[key] = \ + apply_emojis(conversion_body_map[key]) + conversion_title_map[key] = \ + apply_emojis(conversion_title_map[key]) + + kwargs = dict( + body=conversion_body_map[key], + title=conversion_title_map[key], + notify_type=notify_type, + attach=attach, + body_format=body_format + ) + yield (server, kwargs) + + @staticmethod + def _notify_sequential(*servers_kwargs): + """ + Process a list of notify() calls sequentially and synchronously. + """ + + success = True + + for (server, kwargs) in servers_kwargs: + try: + # Send notification + result = server.notify(**kwargs) + success = success and result + + except TypeError: + # These are our internally thrown notifications. + success = False + + except Exception: + # A catch all so we don't have to abort early + # just because one of our plugins has a bug in it. + logger.exception("Unhandled Notification Exception") + success = False + + return success + + @staticmethod + def _notify_parallel_threadpool(*servers_kwargs): + """ + Process a list of notify() calls in parallel and synchronously. + """ + + n_calls = len(servers_kwargs) + + # 0-length case + if n_calls == 0: + return True + + # There's no need to use a thread pool for just a single notification + if n_calls == 1: + return Apprise._notify_sequential(servers_kwargs[0]) + + # Create log entry + logger.info( + 'Notifying %d service(s) with threads.', len(servers_kwargs)) + + with cf.ThreadPoolExecutor() as executor: + success = True + futures = [executor.submit(server.notify, **kwargs) + for (server, kwargs) in servers_kwargs] + + for future in cf.as_completed(futures): + try: + result = future.result() + success = success and result + + except TypeError: + # These are our internally thrown notifications. + success = False + + except Exception: + # A catch all so we don't have to abort early + # just because one of our plugins has a bug in it. + logger.exception("Unhandled Notification Exception") + success = False + + return success + + @staticmethod + async def _notify_parallel_asyncio(*servers_kwargs): + """ + Process a list of async_notify() calls in parallel and asynchronously. + """ + + n_calls = len(servers_kwargs) + + # 0-length case + if n_calls == 0: + return True + + # (Unlike with the thread pool, we don't optimize for the single- + # notification case because asyncio can do useful work while waiting + # for that thread to complete) + + # Create log entry + logger.info( + 'Notifying %d service(s) asynchronously.', len(servers_kwargs)) + + async def do_call(server, kwargs): + return await server.async_notify(**kwargs) + + cors = (do_call(server, kwargs) for (server, kwargs) in servers_kwargs) + results = await asyncio.gather(*cors, return_exceptions=True) + + if any(isinstance(status, Exception) + and not isinstance(status, TypeError) for status in results): + # A catch all so we don't have to abort early just because + # one of our plugins has a bug in it. + logger.exception("Unhandled Notification Exception") + return False + + if any(isinstance(status, TypeError) for status in results): + # These are our internally thrown notifications. + return False + + return all(results) + + def details(self, lang=None, show_requirements=False, show_disabled=False): + """ + Returns the details associated with the Apprise object + + """ + + # general object returned + response = { + # Defines the current version of Apprise + 'version': __version__, + # Lists all of the currently supported Notifications + 'schemas': [], + # Includes the configured asset details + 'asset': self.asset.details(), + } + + for plugin in N_MGR.plugins(): + # Iterate over our hashed plugins and dynamically build details on + # their status: + + content = { + 'service_name': getattr(plugin, 'service_name', None), + 'service_url': getattr(plugin, 'service_url', None), + 'setup_url': getattr(plugin, 'setup_url', None), + # Placeholder - populated below + 'details': None, + + # Let upstream service know of the plugins that support + # attachments + 'attachment_support': getattr( + plugin, 'attachment_support', False), + + # Differentiat between what is a custom loaded plugin and + # which is native. + 'category': getattr(plugin, 'category', None) + } + + # Standard protocol(s) should be None or a tuple + enabled = getattr(plugin, 'enabled', True) + if not show_disabled and not enabled: + # Do not show inactive plugins + continue + + elif show_disabled: + # Add current state to response + content['enabled'] = enabled + + # Standard protocol(s) should be None or a tuple + protocols = getattr(plugin, 'protocol', None) + if isinstance(protocols, str): + protocols = (protocols, ) + + # Secure protocol(s) should be None or a tuple + secure_protocols = getattr(plugin, 'secure_protocol', None) + if isinstance(secure_protocols, str): + secure_protocols = (secure_protocols, ) + + # Add our protocol details to our content + content.update({ + 'protocols': protocols, + 'secure_protocols': secure_protocols, + }) + + if not lang: + # Simply return our results + content['details'] = plugins.details(plugin) + if show_requirements: + content['requirements'] = plugins.requirements(plugin) + + else: + # Emulate the specified language when returning our results + with self.locale.lang_at(lang): + content['details'] = plugins.details(plugin) + if show_requirements: + content['requirements'] = plugins.requirements(plugin) + + # Build our response object + response['schemas'].append(content) + + return response + + def urls(self, privacy=False): + """ + Returns all of the loaded URLs defined in this apprise object. + """ + return [x.url(privacy=privacy) for x in self.servers] + + def pop(self, index): + """ + Removes an indexed Notification Service from the stack and returns it. + + The thing is we can never pop AppriseConfig() entries, only what was + loaded within them. So pop needs to carefully iterate over our list + and only track actual entries. + """ + + # Tracking variables + prev_offset = -1 + offset = prev_offset + + for idx, s in enumerate(self.servers): + if isinstance(s, (ConfigBase, AppriseConfig)): + servers = s.servers() + if len(servers) > 0: + # Acquire a new maximum offset to work with + offset = prev_offset + len(servers) + + if offset >= index: + # we can pop an element from our config stack + fn = s.pop if isinstance(s, ConfigBase) \ + else s.server_pop + + return fn(index if prev_offset == -1 + else (index - prev_offset - 1)) + + else: + offset = prev_offset + 1 + if offset == index: + return self.servers.pop(idx) + + # Update our old offset + prev_offset = offset + + # If we reach here, then we indexed out of range + raise IndexError('list index out of range') + + def __getitem__(self, index): + """ + Returns the indexed server entry of a loaded notification server + """ + # Tracking variables + prev_offset = -1 + offset = prev_offset + + for idx, s in enumerate(self.servers): + if isinstance(s, (ConfigBase, AppriseConfig)): + # Get our list of servers associate with our config object + servers = s.servers() + if len(servers) > 0: + # Acquire a new maximum offset to work with + offset = prev_offset + len(servers) + + if offset >= index: + return servers[index if prev_offset == -1 + else (index - prev_offset - 1)] + + else: + offset = prev_offset + 1 + if offset == index: + return self.servers[idx] + + # Update our old offset + prev_offset = offset + + # If we reach here, then we indexed out of range + raise IndexError('list index out of range') + + def __getstate__(self): + """ + Pickle Support dumps() + """ + attributes = { + 'asset': self.asset, + # Prepare our URL list as we need to extract the associated tags + # and asset details associated with it + 'urls': [{ + 'url': server.url(privacy=False), + 'tag': server.tags if server.tags else None, + 'asset': server.asset} for server in self.servers], + 'locale': self.locale, + 'debug': self.debug, + 'location': self.location, + } + + return attributes + + def __setstate__(self, state): + """ + Pickle Support loads() + """ + self.servers = list() + self.asset = state['asset'] + self.locale = state['locale'] + self.location = state['location'] + for entry in state['urls']: + self.add(entry['url'], asset=entry['asset'], tag=entry['tag']) + + def __bool__(self): + """ + Allows the Apprise object to be wrapped in an 'if statement'. + True is returned if at least one service has been loaded. + """ + return len(self) > 0 + + def __iter__(self): + """ + Returns an iterator to each of our servers loaded. This includes those + found inside configuration. + """ + return chain(*[[s] if not isinstance(s, (ConfigBase, AppriseConfig)) + else iter(s.servers()) for s in self.servers]) + + def __len__(self): + """ + Returns the number of servers loaded; this includes those found within + loaded configuration. This funtion nnever actually counts the + Config entry themselves (if they exist), only what they contain. + """ + return sum([1 if not isinstance(s, (ConfigBase, AppriseConfig)) + else len(s.servers()) for s in self.servers]) diff --git a/libs/apprise/apprise.pyi b/libs/apprise/apprise.pyi new file mode 100644 index 000000000..5a34c9c65 --- /dev/null +++ b/libs/apprise/apprise.pyi @@ -0,0 +1,62 @@ +from typing import Any, Dict, List, Iterable, Iterator, Optional + +from . import (AppriseAsset, AppriseAttachment, AppriseConfig, ConfigBase, + NotifyBase, NotifyFormat, NotifyType) +from .common import ContentLocation + +_Server = Union[str, ConfigBase, NotifyBase, AppriseConfig] +_Servers = Union[_Server, Dict[Any, _Server], Iterable[_Server]] +# Can't define this recursively as mypy doesn't support recursive types: +# https://github.com/python/mypy/issues/731 +_Tag = Union[str, Iterable[Union[str, Iterable[str]]]] + +class Apprise: + def __init__( + self, + servers: _Servers = ..., + asset: Optional[AppriseAsset] = ..., + location: Optional[ContentLocation] = ..., + debug: bool = ... + ) -> None: ... + @staticmethod + def instantiate( + url: Union[str, Dict[str, NotifyBase]], + asset: Optional[AppriseAsset] = ..., + tag: Optional[_Tag] = ..., + suppress_exceptions: bool = ... + ) -> NotifyBase: ... + def add( + self, + servers: _Servers = ..., + asset: Optional[AppriseAsset] = ..., + tag: Optional[_Tag] = ... + ) -> bool: ... + def clear(self) -> None: ... + def find(self, tag: str = ...) -> Iterator[Apprise]: ... + def notify( + self, + body: str, + title: str = ..., + notify_type: NotifyType = ..., + body_format: NotifyFormat = ..., + tag: _Tag = ..., + attach: Optional[AppriseAttachment] = ..., + interpret_escapes: Optional[bool] = ... + ) -> bool: ... + async def async_notify( + self, + body: str, + title: str = ..., + notify_type: NotifyType = ..., + body_format: NotifyFormat = ..., + tag: _Tag = ..., + attach: Optional[AppriseAttachment] = ..., + interpret_escapes: Optional[bool] = ... + ) -> bool: ... + def details(self, lang: Optional[str] = ...) -> Dict[str, Any]: ... + def urls(self, privacy: bool = ...) -> Iterable[str]: ... + def pop(self, index: int) -> ConfigBase: ... + def __getitem__(self, index: int) -> ConfigBase: ... + def __bool__(self) -> bool: ... + def __iter__(self) -> Iterator[ConfigBase]: ... + def __len__(self) -> int: ... \ No newline at end of file