Skip to content

Commit

Permalink
Updated apprise to the latest version. #1834
Browse files Browse the repository at this point in the history
  • Loading branch information
morpheus65535 committed May 8, 2022
1 parent fcd67c1 commit 1dff555
Show file tree
Hide file tree
Showing 34 changed files with 5,058 additions and 1,345 deletions.
212 changes: 112 additions & 100 deletions libs/apprise/Apprise.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import re
import os
import six
from markdown import markdown
from itertools import chain
from .common import NotifyType
from .common import NotifyFormat
from .common import MATCH_ALL_TAG
from .common import MATCH_ALWAYS_TAG
from .conversion import convert_between
from .utils import is_exclusive_match
from .utils import parse_list
from .utils import parse_urls
Expand All @@ -44,6 +43,7 @@
from .config.ConfigBase import ConfigBase
from .plugins.NotifyBase import NotifyBase


from . import plugins
from . import __version__

Expand Down Expand Up @@ -305,7 +305,7 @@ def clear(self):
"""
self.servers[:] = []

def find(self, tag=MATCH_ALL_TAG):
def find(self, tag=MATCH_ALL_TAG, match_always=True):
"""
Returns an list of all servers matching against the tag specified.
Expand All @@ -321,6 +321,10 @@ def find(self, tag=MATCH_ALL_TAG):
# tag=[('tagA', 'tagC'), 'tagB'] = (tagA and tagC) or tagB
# tag=[('tagB', 'tagC')] = tagB and tagC

# A match_always flag allows us to pick up on our 'any' keyword
# and notify these services under all circumstances
match_always = MATCH_ALWAYS_TAG if match_always else None

# Iterate over our loaded plugins
for entry in self.servers:

Expand All @@ -334,13 +338,14 @@ def find(self, tag=MATCH_ALL_TAG):
for server in servers:
# Apply our tag matching based on our defined logic
if is_exclusive_match(
logic=tag, data=server.tags, match_all=MATCH_ALL_TAG):
logic=tag, data=server.tags, match_all=MATCH_ALL_TAG,
match_always=match_always):
yield server
return

def notify(self, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG, attach=None,
interpret_escapes=None):
body_format=None, tag=MATCH_ALL_TAG, match_always=True,
attach=None, interpret_escapes=None):
"""
Send a notification to all of the plugins previously loaded.
Expand Down Expand Up @@ -370,7 +375,7 @@ def notify(self, body, title='', notify_type=NotifyType.INFO,
self.async_notify(
body, title,
notify_type=notify_type, body_format=body_format,
tag=tag, attach=attach,
tag=tag, match_always=match_always, attach=attach,
interpret_escapes=interpret_escapes,
),
debug=self.debug
Expand Down Expand Up @@ -468,8 +473,8 @@ def _notifyhandlerasync(server, **kwargs):
return py3compat.asyncio.toasyncwrap(status)

def _notifyall(self, handler, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG, attach=None,
interpret_escapes=None):
body_format=None, tag=MATCH_ALL_TAG, match_always=True,
attach=None, interpret_escapes=None):
"""
Creates notifications for all of the plugins loaded.
Expand All @@ -480,22 +485,43 @@ def _notifyall(self, handler, body, title='', notify_type=NotifyType.INFO,

if len(self) == 0:
# Nothing to notify
raise TypeError("No service(s) to notify")
msg = "There are service(s) to notify"
logger.error(msg)
raise TypeError(msg)

if not (title or body):
raise TypeError("No message content specified to deliver")

if six.PY2:
# Python 2.7.x Unicode Character Handling
# Ensure we're working with utf-8
if isinstance(title, unicode): # noqa: F821
title = title.encode('utf-8')
msg = "No message content specified to deliver"
logger.error(msg)
raise TypeError(msg)

if isinstance(body, unicode): # noqa: F821
body = body.encode('utf-8')
try:
if six.PY2:
# Python 2.7 encoding support isn't the greatest, so we try
# to ensure that we're ALWAYS dealing with unicode characters
# prior to entrying the next part. This is especially required
# for Markdown support
if title and isinstance(title, str): # noqa: F821
title = title.decode(self.asset.encoding)

if body and isinstance(body, str): # noqa: F821
body = body.decode(self.asset.encoding)

else: # Python 3+
if title and isinstance(title, bytes): # noqa: F821
title = title.decode(self.asset.encoding)

if body and isinstance(body, bytes): # noqa: F821
body = body.decode(self.asset.encoding)

except UnicodeDecodeError:
msg = 'The content passed into Apprise was not of encoding ' \
'type: {}'.format(self.asset.encoding)
logger.error(msg)
raise TypeError(msg)

# Tracks conversions
conversion_map = dict()
conversion_body_map = dict()
conversion_title_map = dict()

# Prepare attachments if required
if attach is not None and not isinstance(attach, AppriseAttachment):
Expand All @@ -511,106 +537,92 @@ def _notifyall(self, handler, body, title='', notify_type=NotifyType.INFO,
if interpret_escapes is None else interpret_escapes

# Iterate over our loaded plugins
for server in self.find(tag):
for server in self.find(tag, match_always=match_always):
# If our code reaches here, we either did not define a tag (it
# was set to None), or we did define a tag and the logic above
# determined we need to notify the service it's associated with
if server.notify_format not in conversion_map:
if body_format == NotifyFormat.MARKDOWN and \
server.notify_format == NotifyFormat.HTML:

# Apply Markdown
conversion_map[server.notify_format] = markdown(body)

elif body_format == NotifyFormat.TEXT and \
server.notify_format == NotifyFormat.HTML:

# Basic TEXT to HTML format map; supports keys only
re_map = {
# Support Ampersand
r'&': '&',

# Spaces to   for formatting purposes since
# multiple spaces are treated as one an this may
# not be the callers intention
r' ': ' ',

# Tab support
r'\t': '   ',

# Greater than and Less than Characters
r'>': '>',
r'<': '&lt;',
}

# Compile our map
re_table = re.compile(
r'(' + '|'.join(
map(re.escape, re_map.keys())) + r')',
re.IGNORECASE,
)

# Execute our map against our body in addition to
# swapping out new lines and replacing them with <br/>
conversion_map[server.notify_format] = \
re.sub(r'\r*\n', '<br/>\r\n',
re_table.sub(
lambda x: re_map[x.group()], body))
if server.notify_format not in conversion_body_map:
# Perform Conversion
conversion_body_map[server.notify_format] = \
convert_between(
body_format, server.notify_format, content=body)

# Prepare our title
conversion_title_map[server.notify_format] = \
'' if not title else title

# Tidy Title IF required (hence it will become part of the
# body)
if server.title_maxlen <= 0 and \
conversion_title_map[server.notify_format]:

conversion_title_map[server.notify_format] = \
convert_between(
body_format, server.notify_format,
content=conversion_title_map[server.notify_format])

if interpret_escapes:
#
# Escape our content
#

else:
# Store entry directly
conversion_map[server.notify_format] = body

if interpret_escapes:
#
# Escape our content
#

try:
# Added overhead required due to Python 3 Encoding Bug
# identified here: https://bugs.python.org/issue21331
conversion_map[server.notify_format] = \
conversion_map[server.notify_format]\
.encode('ascii', 'backslashreplace')\
.decode('unicode-escape')

except UnicodeDecodeError: # pragma: no cover
# This occurs using a very old verion of Python 2.7 such
# as the one that ships with CentOS/RedHat 7.x (v2.7.5).
conversion_map[server.notify_format] = \
conversion_map[server.notify_format] \
.decode('string_escape')

except AttributeError:
# Must be of string type
logger.error('Failed to escape message body')
raise TypeError

if title:
try:
# Added overhead required due to Python 3 Encoding Bug
# identified here: https://bugs.python.org/issue21331
title = title\
conversion_body_map[server.notify_format] = \
conversion_body_map[server.notify_format]\
.encode('ascii', 'backslashreplace')\
.decode('unicode-escape')

conversion_title_map[server.notify_format] = \
conversion_title_map[server.notify_format]\
.encode('ascii', 'backslashreplace')\
.decode('unicode-escape')

except UnicodeDecodeError: # pragma: no cover
# This occurs using a very old verion of Python 2.7
# such as the one that ships with CentOS/RedHat 7.x
# (v2.7.5).
title = title.decode('string_escape')
conversion_body_map[server.notify_format] = \
conversion_body_map[server.notify_format] \
.decode('string_escape')

conversion_title_map[server.notify_format] = \
conversion_title_map[server.notify_format] \
.decode('string_escape')

except AttributeError:
# Must be of string type
logger.error('Failed to escape message title')
raise TypeError
msg = 'Failed to escape message body'
logger.error(msg)
raise TypeError(msg)

if six.PY2:
# Python 2.7 strings must be encoded as utf-8 for
# consistency across all platforms
if conversion_body_map[server.notify_format] and \
isinstance(
conversion_body_map[server.notify_format],
unicode): # noqa: F821
conversion_body_map[server.notify_format] = \
conversion_body_map[server.notify_format]\
.encode('utf-8')

if conversion_title_map[server.notify_format] and \
isinstance(
conversion_title_map[server.notify_format],
unicode): # noqa: F821
conversion_title_map[server.notify_format] = \
conversion_title_map[server.notify_format]\
.encode('utf-8')

yield handler(
server,
body=conversion_map[server.notify_format],
title=title,
body=conversion_body_map[server.notify_format],
title=conversion_title_map[server.notify_format],
notify_type=notify_type,
attach=attach
attach=attach,
body_format=body_format,
)

def details(self, lang=None, show_requirements=False, show_disabled=False):
Expand Down
20 changes: 20 additions & 0 deletions libs/apprise/AppriseAsset.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ class AppriseAsset(object):
NotifyType.WARNING: '#CACF29',
}

# Ascii Notification
ascii_notify_map = {
NotifyType.INFO: '[i]',
NotifyType.SUCCESS: '[+]',
NotifyType.FAILURE: '[!]',
NotifyType.WARNING: '[~]',
}

# The default color to return if a mapping isn't found in our table above
default_html_color = '#888888'

Expand Down Expand Up @@ -110,6 +118,9 @@ class AppriseAsset(object):
# to a new line.
interpret_escapes = False

# Defines the encoding of the content passed into Apprise
encoding = 'utf-8'

# For more detail see CWE-312 @
# https://cwe.mitre.org/data/definitions/312.html
#
Expand Down Expand Up @@ -181,6 +192,15 @@ def color(self, notify_type, color_type=None):
raise ValueError(
'AppriseAsset html_color(): An invalid color_type was specified.')

def ascii(self, notify_type):
"""
Returns an ascii representation based on passed in notify type
"""

# look our response up
return self.ascii_notify_map.get(notify_type, self.default_html_color)

def image_url(self, notify_type, image_size, logo=False, extension=None):
"""
Apply our mask to our image URL
Expand Down
14 changes: 12 additions & 2 deletions libs/apprise/AppriseConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from .AppriseAsset import AppriseAsset

from .common import MATCH_ALL_TAG
from .common import MATCH_ALWAYS_TAG
from .utils import GET_SCHEMA_RE
from .utils import parse_list
from .utils import is_exclusive_match
Expand Down Expand Up @@ -266,7 +267,7 @@ def add_config(self, content, asset=None, tag=None, format=None,
# Return our status
return True

def servers(self, tag=MATCH_ALL_TAG, *args, **kwargs):
def servers(self, tag=MATCH_ALL_TAG, match_always=True, *args, **kwargs):
"""
Returns all of our servers dynamically build based on parsed
configuration.
Expand All @@ -277,7 +278,15 @@ def servers(self, tag=MATCH_ALL_TAG, *args, **kwargs):
This is for filtering the configuration files polled for
results.
If the anytag is set, then any notification that is found
set with that tag are included in the response.
"""

# A match_always flag allows us to pick up on our 'any' keyword
# and notify these services under all circumstances
match_always = MATCH_ALWAYS_TAG if match_always else None

# Build our tag setup
# - top level entries are treated as an 'or'
# - second level (or more) entries are treated as 'and'
Expand All @@ -294,7 +303,8 @@ def servers(self, tag=MATCH_ALL_TAG, *args, **kwargs):

# Apply our tag matching based on our defined logic
if is_exclusive_match(
logic=tag, data=entry.tags, match_all=MATCH_ALL_TAG):
logic=tag, data=entry.tags, match_all=MATCH_ALL_TAG,
match_always=match_always):
# Build ourselves a list of services dynamically and return the
# as a list
response.extend(entry.servers())
Expand Down
Loading

0 comments on commit 1dff555

Please sign in to comment.