Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request from GHSA-x345-32rc-8h85
Browse files Browse the repository at this point in the history
* tests for push rule pattern matching

* tests for acl pattern matching

* factor out common `re.escape`

* Factor out common re.compile

* Factor out common anchoring code

* add word_boundary support to `glob_to_regex`

* Use `glob_to_regex` in push rule evaluator

NB that this drops support for character classes. I don't think anyone ever
used them.

* Improve efficiency of globs with multiple wildcards

The idea here is that we compress multiple `*` globs into a single `.*`. We
also need to consider `?`, since `*?*` is as hard to implement efficiently as
`**`.

* add assertion on regex pattern

* Fix mypy

* Simplify glob_to_regex

* Inline the glob_to_regex helper function

Signed-off-by: Dan Callahan <danc@element.io>

* Moar comments

Signed-off-by: Dan Callahan <danc@element.io>

Co-authored-by: Dan Callahan <danc@element.io>
  • Loading branch information
richvdh and callahad committed May 11, 2021
1 parent 4df26ab commit 03318a7
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 68 deletions.
4 changes: 2 additions & 2 deletions synapse/config/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import warnings
from datetime import datetime
from hashlib import sha256
from typing import List, Optional
from typing import List, Optional, Pattern

from unpaddedbase64 import encode_base64

Expand Down Expand Up @@ -124,7 +124,7 @@ def read_config(self, config: dict, config_dir_path: str, **kwargs):
fed_whitelist_entries = []

# Support globs (*) in whitelist values
self.federation_certificate_verification_whitelist = [] # type: List[str]
self.federation_certificate_verification_whitelist = [] # type: List[Pattern]
for entry in fed_whitelist_entries:
try:
entry_regex = glob_to_regex(entry.encode("ascii").decode("ascii"))
Expand Down
55 changes: 3 additions & 52 deletions synapse/push/push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from synapse.events import EventBase
from synapse.types import UserID
from synapse.util import glob_to_regex, re_word_boundary
from synapse.util.caches.lrucache import LruCache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -183,7 +184,7 @@ def _contains_display_name(self, display_name: str) -> bool:
r = regex_cache.get((display_name, False, True), None)
if not r:
r1 = re.escape(display_name)
r1 = _re_word_boundary(r1)
r1 = re_word_boundary(r1)
r = re.compile(r1, flags=re.IGNORECASE)
regex_cache[(display_name, False, True)] = r

Expand Down Expand Up @@ -212,64 +213,14 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
try:
r = regex_cache.get((glob, True, word_boundary), None)
if not r:
r = _glob_to_re(glob, word_boundary)
r = glob_to_regex(glob, word_boundary)
regex_cache[(glob, True, word_boundary)] = r
return bool(r.search(value))
except re.error:
logger.warning("Failed to parse glob to regex: %r", glob)
return False


def _glob_to_re(glob: str, word_boundary: bool) -> Pattern:
"""Generates regex for a given glob.
Args:
glob
word_boundary: Whether to match against word boundaries or entire string.
"""
if IS_GLOB.search(glob):
r = re.escape(glob)

r = r.replace(r"\*", ".*?")
r = r.replace(r"\?", ".")

# handle [abc], [a-z] and [!a-z] style ranges.
r = GLOB_REGEX.sub(
lambda x: (
"[%s%s]" % (x.group(1) and "^" or "", x.group(2).replace(r"\\\-", "-"))
),
r,
)
if word_boundary:
r = _re_word_boundary(r)

return re.compile(r, flags=re.IGNORECASE)
else:
r = "^" + r + "$"

return re.compile(r, flags=re.IGNORECASE)
elif word_boundary:
r = re.escape(glob)
r = _re_word_boundary(r)

return re.compile(r, flags=re.IGNORECASE)
else:
r = "^" + re.escape(glob) + "$"
return re.compile(r, flags=re.IGNORECASE)


def _re_word_boundary(r: str) -> str:
"""
Adds word boundary characters to the start and end of an
expression to require that the match occur as a whole word,
but do so respecting the fact that strings starting or ending
with non-word characters will change word boundaries.
"""
# we can't use \b as it chokes on unicode. however \W seems to be okay
# as shorthand for [^0-9A-Za-z_].
return r"(^|\W)%s(\W|$)" % (r,)


def _flatten_dict(
d: Union[EventBase, dict],
prefix: Optional[List[str]] = None,
Expand Down
61 changes: 47 additions & 14 deletions synapse/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import logging
import re
from typing import Pattern

import attr
from frozendict import frozendict
Expand All @@ -26,6 +27,9 @@
logger = logging.getLogger(__name__)


_WILDCARD_RUN = re.compile(r"([\?\*]+)")


def _reject_invalid_json(val):
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)
Expand Down Expand Up @@ -158,25 +162,54 @@ def log_failure(failure, msg, consumeErrors=True):
return failure


def glob_to_regex(glob):
def glob_to_regex(glob: str, word_boundary: bool = False) -> Pattern:
"""Converts a glob to a compiled regex object.
The regex is anchored at the beginning and end of the string.
Args:
glob (str)
glob: pattern to match
word_boundary: If True, the pattern will be allowed to match at word boundaries
anywhere in the string. Otherwise, the pattern is anchored at the start and
end of the string.
Returns:
re.RegexObject
compiled regex pattern
"""
res = ""
for c in glob:
if c == "*":
res = res + ".*"
elif c == "?":
res = res + "."

# Patterns with wildcards must be simplified to avoid performance cliffs
# - The glob `?**?**?` is equivalent to the glob `???*`
# - The glob `???*` is equivalent to the regex `.{3,}`
chunks = []
for chunk in _WILDCARD_RUN.split(glob):
# No wildcards? re.escape()
if not _WILDCARD_RUN.match(chunk):
chunks.append(re.escape(chunk))
continue

# Wildcards? Simplify.
qmarks = chunk.count("?")
if "*" in chunk:
chunks.append(".{%d,}" % qmarks)
else:
res = res + re.escape(c)
chunks.append(".{%d}" % qmarks)

res = "".join(chunks)

# \A anchors at start of string, \Z at end of string
return re.compile(r"\A" + res + r"\Z", re.IGNORECASE)
if word_boundary:
res = re_word_boundary(res)
else:
# \A anchors at start of string, \Z at end of string
res = r"\A" + res + r"\Z"

return re.compile(res, re.IGNORECASE)


def re_word_boundary(r: str) -> str:
"""
Adds word boundary characters to the start and end of an
expression to require that the match occur as a whole word,
but do so respecting the fact that strings starting or ending
with non-word characters will change word boundaries.
"""
# we can't use \b as it chokes on unicode. however \W seems to be okay
# as shorthand for [^0-9A-Za-z_].
return r"(^|\W)%s(\W|$)" % (r,)
19 changes: 19 additions & 0 deletions tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ def test_block_ip_literals(self):
self.assertFalse(server_matches_acl_event("[1:2::]", e))
self.assertTrue(server_matches_acl_event("1:2:3:4", e))

def test_wildcard_matching(self):
e = _create_acl_event({"allow": ["good*.com"]})
self.assertTrue(
server_matches_acl_event("good.com", e),
"* matches 0 characters",
)
self.assertTrue(
server_matches_acl_event("GOOD.COM", e),
"pattern is case-insensitive",
)
self.assertTrue(
server_matches_acl_event("good.aa.com", e),
"* matches several characters, including '.'",
)
self.assertFalse(
server_matches_acl_event("ishgood.com", e),
"pattern does not allow prefixes",
)


class StateQueryTests(unittest.FederatingHomeserverTestCase):

Expand Down
166 changes: 166 additions & 0 deletions tests/push/test_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict

from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent
from synapse.push import push_rule_evaluator
Expand Down Expand Up @@ -66,6 +68,170 @@ def test_display_name(self):
# A display name with spaces should work fine.
self.assertTrue(evaluator.matches(condition, "@user:test", "foo bar"))

def _assert_matches(
self, condition: Dict[str, Any], content: Dict[str, Any], msg=None
) -> None:
evaluator = self._get_evaluator(content)
self.assertTrue(evaluator.matches(condition, "@user:test", "display_name"), msg)

def _assert_not_matches(
self, condition: Dict[str, Any], content: Dict[str, Any], msg=None
) -> None:
evaluator = self._get_evaluator(content)
self.assertFalse(
evaluator.matches(condition, "@user:test", "display_name"), msg
)

def test_event_match_body(self):
"""Check that event_match conditions on content.body work as expected"""

# if the key is `content.body`, the pattern matches substrings.

# non-wildcards should match
condition = {
"kind": "event_match",
"key": "content.body",
"pattern": "foobaz",
}
self._assert_matches(
condition,
{"body": "aaa FoobaZ zzz"},
"patterns should match and be case-insensitive",
)
self._assert_not_matches(
condition,
{"body": "aa xFoobaZ yy"},
"pattern should only match at word boundaries",
)
self._assert_not_matches(
condition,
{"body": "aa foobazx yy"},
"pattern should only match at word boundaries",
)

# wildcards should match
condition = {
"kind": "event_match",
"key": "content.body",
"pattern": "f?o*baz",
}

self._assert_matches(
condition,
{"body": "aaa FoobarbaZ zzz"},
"* should match string and pattern should be case-insensitive",
)
self._assert_matches(
condition, {"body": "aa foobaz yy"}, "* should match 0 characters"
)
self._assert_not_matches(
condition, {"body": "aa fobbaz yy"}, "? should not match 0 characters"
)
self._assert_not_matches(
condition, {"body": "aa fiiobaz yy"}, "? should not match 2 characters"
)
self._assert_not_matches(
condition,
{"body": "aa xfooxbaz yy"},
"pattern should only match at word boundaries",
)
self._assert_not_matches(
condition,
{"body": "aa fooxbazx yy"},
"pattern should only match at word boundaries",
)

# test backslashes
condition = {
"kind": "event_match",
"key": "content.body",
"pattern": r"f\oobaz",
}
self._assert_matches(
condition,
{"body": r"F\oobaz"},
"backslash should match itself",
)
condition = {
"kind": "event_match",
"key": "content.body",
"pattern": r"f\?obaz",
}
self._assert_matches(
condition,
{"body": r"F\oobaz"},
r"? after \ should match any character",
)

def test_event_match_non_body(self):
"""Check that event_match conditions on other keys work as expected"""

# if the key is anything other than 'content.body', the pattern must match the
# whole value.

# non-wildcards should match
condition = {
"kind": "event_match",
"key": "content.value",
"pattern": "foobaz",
}
self._assert_matches(
condition,
{"value": "FoobaZ"},
"patterns should match and be case-insensitive",
)
self._assert_not_matches(
condition,
{"value": "xFoobaZ"},
"pattern should only match at the start/end of the value",
)
self._assert_not_matches(
condition,
{"value": "FoobaZz"},
"pattern should only match at the start/end of the value",
)

# wildcards should match
condition = {
"kind": "event_match",
"key": "content.value",
"pattern": "f?o*baz",
}
self._assert_matches(
condition,
{"value": "FoobarbaZ"},
"* should match string and pattern should be case-insensitive",
)
self._assert_matches(
condition, {"value": "foobaz"}, "* should match 0 characters"
)
self._assert_not_matches(
condition, {"value": "fobbaz"}, "? should not match 0 characters"
)
self._assert_not_matches(
condition, {"value": "fiiobaz"}, "? should not match 2 characters"
)
self._assert_not_matches(
condition,
{"value": "xfooxbaz"},
"pattern should only match at the start/end of the value",
)
self._assert_not_matches(
condition,
{"value": "fooxbazx"},
"pattern should only match at the start/end of the value",
)
self._assert_not_matches(
condition,
{"value": "x\nfooxbaz"},
"pattern should not match after a newline",
)
self._assert_not_matches(
condition,
{"value": "fooxbaz\nx"},
"pattern should not match before a newline",
)

def test_no_body(self):
"""Not having a body shouldn't break the evaluator."""
evaluator = self._get_evaluator({})
Expand Down
Loading

0 comments on commit 03318a7

Please sign in to comment.