Skip to content

Commit

Permalink
Feat: Eventbridge v2: Add pattern matching (#10664)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxhoheiser committed May 7, 2024
1 parent 0f7d49f commit 21b57ab
Show file tree
Hide file tree
Showing 14 changed files with 1,301 additions and 1,096 deletions.
3 changes: 0 additions & 3 deletions localstack/services/events/event_ruler.py
Expand Up @@ -3,7 +3,6 @@
from functools import cache
from pathlib import Path

from localstack import config
from localstack.services.events.packages import event_ruler_package
from localstack.services.events.utils import InvalidEventPatternException
from localstack.utils.objects import singleton_factory
Expand Down Expand Up @@ -43,8 +42,6 @@ def matches_rule(event: str, rule: str) -> bool:
There is a single static boolean method Ruler.matchesRule(event, rule) -
both arguments are provided as JSON strings.
"""
if config.EVENT_RULE_ENGINE != "java":
raise NotImplementedError("Set EVENT_RULE_ENGINE=java to enable the Java Event Ruler.")

start_jvm()
import jpype.imports # noqa F401: required for importing Java modules
Expand Down
136 changes: 116 additions & 20 deletions localstack/services/events/provider_v2.py
@@ -1,5 +1,7 @@
import base64
import json
import logging
from datetime import datetime, timezone
from typing import Optional

from localstack.aws.api import RequestContext, handler
Expand All @@ -16,14 +18,18 @@
EventPattern,
EventsApi,
EventSourceName,
InvalidEventPatternException,
LimitMax100,
ListEventBusesResponse,
ListRuleNamesByTargetResponse,
ListRulesResponse,
ListTargetsByRuleResponse,
NextToken,
PutEventsRequestEntry,
PutEventsRequestEntryList,
PutEventsResponse,
PutEventsResultEntry,
PutEventsResultEntryList,
PutPartnerEventsRequestEntryList,
PutPartnerEventsResponse,
PutRuleResponse,
Expand All @@ -43,10 +49,12 @@
TargetId,
TargetIdList,
TargetList,
TestEventPatternResponse,
)
from localstack.aws.api.events import EventBus as ApiTypeEventBus
from localstack.aws.api.events import Rule as ApiTypeRule
from localstack.services.events.event_bus import EventBusService, EventBusServiceDict
from localstack.services.events.event_ruler import matches_rule
from localstack.services.events.models_v2 import (
EventBus,
EventBusDict,
Expand All @@ -59,7 +67,11 @@
)
from localstack.services.events.rule import RuleService, RuleServiceDict
from localstack.services.events.target import TargetSender, TargetSenderDict, TargetSenderFactory
from localstack.services.events.utils import (
InvalidEventPatternException as InternalInvalidEventPatternException,
)
from localstack.services.plugins import ServiceLifecycleHook
from localstack.utils.strings import long_uid

LOG = logging.getLogger(__name__)

Expand All @@ -79,6 +91,57 @@ def get_filtered_dict(name_prefix: str, input_dict: dict) -> dict:
return {name: value for name, value in input_dict.items() if name.startswith(name_prefix)}


def get_event_time(event: PutEventsRequestEntry) -> str:
event_time = datetime.now(timezone.utc)
if event_timestamp := event.get("Time"):
try:
# use time from event if provided
event_time = event_timestamp.replace(tzinfo=timezone.utc)
except ValueError:
# use current time if event time is invalid
LOG.debug(
"Could not parse the `Time` parameter, falling back to current time for the following Event: '%s'",
event,
)
formatted_time_string = event_time.strftime("%Y-%m-%dT%H:%M:%SZ")
return formatted_time_string


def validate_event(event: PutEventsRequestEntry) -> None | PutEventsResultEntry:
if not event.get("Source"):
return {
"ErrorCode": "InvalidArgument",
"ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
}
elif not event.get("DetailType"):
return {
"ErrorCode": "InvalidArgument",
"ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
}
elif not event.get("Detail"):
return {
"ErrorCode": "InvalidArgument",
"ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
}


def format_event(event: PutEventsRequestEntry, region: str, account_id: str) -> dict:
# See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
formatted_event = {
"version": "0",
"id": str(long_uid()),
"detail-type": event.get("DetailType"),
"source": event.get("Source"),
"account": account_id,
"time": get_event_time(event),
"region": region,
"resources": event.get("Resources", []),
"detail": json.loads(event.get("Detail", "{}")),
}

return formatted_event


class EventsProvider(EventsApi, ServiceLifecycleHook):
# api methods are grouped by resource type and sorted in hierarchical order
# each group is sorted alphabetically
Expand Down Expand Up @@ -303,6 +366,20 @@ def put_rule(
response = PutRuleResponse(RuleArn=rule_service.arn)
return response

@handler("TestEventPattern")
def test_event_pattern(
self, context: RequestContext, event_pattern: EventPattern, event: str, **kwargs
) -> TestEventPatternResponse:
"""Test event pattern uses EventBridge event pattern matching:
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html
"""
try:
result = matches_rule(event, event_pattern)
except InternalInvalidEventPatternException as e:
raise InvalidEventPatternException(e.message) from e

return TestEventPatternResponse(Result=result)

#########
# Targets
#########
Expand Down Expand Up @@ -345,7 +422,7 @@ def put_targets(
rule_service = self.get_rule_service(context, rule, event_bus_name)
failed_entries = rule_service.add_targets(targets)
rule_arn = rule_service.arn
for target in targets:
for target in targets: # TODO only add successful targets
self.create_target_sender(target, region, account_id, rule_arn)

response = PutTargetsResponse(
Expand Down Expand Up @@ -384,9 +461,12 @@ def put_events(
endpoint_id: EndpointId = None,
**kwargs,
) -> PutEventsResponse:
failed_entries = self._put_entries(context, entries)
entries, failed_entry_count = self._process_entries(context, entries)

response = PutEventsResponse(FailedEntryCount=len(failed_entries), Entries=failed_entries)
response = PutEventsResponse(
Entries=entries,
FailedEntryCount=failed_entry_count,
)
return response

@handler("PutPartnerEvents")
Expand Down Expand Up @@ -578,25 +658,41 @@ def _delete_target_sender(self, ids: TargetIdList, rule) -> None:
except KeyError:
LOG.error(f"Error deleting target service {target_arn}.")

def _put_entries(self, context: RequestContext, entries: PutEventsRequestEntryList) -> list:
failed_entries = []
def _process_entries(
self, context: RequestContext, entries: PutEventsRequestEntryList
) -> tuple[PutEventsResultEntryList, int]:
processed_entries = []
failed_entry_count = 0
for event in entries:
event_bus_name = event.get("EventBusName", "default")
if event_failed_validation := validate_event(event):
processed_entries.append(event_failed_validation)
failed_entry_count += 1
continue
event = format_event(event, context.region, context.account_id)
store = self.get_store(context)
event_bus = self.get_event_bus(event_bus_name, store)
# TODO add pattern matching
try:
event_bus = self.get_event_bus(event_bus_name, store)
except ResourceNotFoundException:
# ignore events for non-existing event buses but add processed event
processed_entries.append({"EventId": event["id"]})
continue
matching_rules = [rule for rule in event_bus.rules.values()]
for rule in matching_rules:
for target in rule.targets.values():
target_sender = self._target_sender_store[target["Arn"]]
try:
target_sender.send_event(event)
except Exception as error:
failed_entries.append(
{
"Entry": event,
"ErrorCode": "InternalException",
"ErrorMessage": str(error),
}
)
return failed_entries
event_pattern = rule.event_pattern
event_str = json.dumps(event)
if matches_rule(event_str, event_pattern):
for target in rule.targets.values():
target_sender = self._target_sender_store[target["Arn"]]
try:
target_sender.send_event(event)
processed_entries.append({"EventId": event["id"]})
except Exception as error:
processed_entries.append(
{
"ErrorCode": "InternalException",
"ErrorMessage": str(error),
}
)
failed_entry_count += 1
return processed_entries, failed_entry_count
6 changes: 5 additions & 1 deletion localstack/services/events/rule.py
Expand Up @@ -19,7 +19,11 @@
TargetIdList,
TargetList,
)
from localstack.services.events.models_v2 import Rule, TargetDict, ValidationException
from localstack.services.events.models_v2 import (
Rule,
TargetDict,
ValidationException,
)

TARGET_ID_REGEX = re.compile(r"^[\.\-_A-Za-z0-9]+$")
TARGET_ARN_REGEX = re.compile(r"arn:[\d\w:\-/]*")
Expand Down
6 changes: 5 additions & 1 deletion localstack/services/events/target.py
Expand Up @@ -7,6 +7,7 @@

from localstack.aws.api.events import (
Arn,
PutEventsRequestEntry,
Target,
)
from localstack.aws.connect import connect_to
Expand Down Expand Up @@ -53,7 +54,7 @@ def client(self):
return self._client

@abstractmethod
def send_event(self):
def send_event(self, event: PutEventsRequestEntry):
pass

def _validate_input(self, target: Target):
Expand Down Expand Up @@ -83,6 +84,8 @@ def _initialize_client(self) -> BaseClient:

TargetSenderDict = dict[Arn, TargetSender]

# Target Senders are ordered alphabetically by service name


class ApiGatewayTargetSender(TargetSender):
def send_event(self, event):
Expand Down Expand Up @@ -174,6 +177,7 @@ def send_event(self, event):

def _validate_input(self, target: Target):
super()._validate_input(target)
# TODO add validated test to check if RoleArn is mandatory
if not collections.get_safe(target, "$.RoleArn"):
raise ValueError("RoleArn is required for Kinesis target")
if not collections.get_safe(target, "$.KinesisParameters.PartitionKeyPath"):
Expand Down
49 changes: 45 additions & 4 deletions tests/aws/services/events/test_event_patterns.py
@@ -1,5 +1,6 @@
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Tuple

Expand All @@ -8,7 +9,6 @@

from localstack.testing.aws.util import is_aws_cloud
from localstack.testing.pytest import markers
from tests.aws.services.events.helper_functions import is_v2_provider

THIS_FOLDER: str = os.path.dirname(os.path.realpath(__file__))
REQUEST_TEMPLATE_DIR = os.path.join(THIS_FOLDER, "event_pattern_templates")
Expand Down Expand Up @@ -80,7 +80,6 @@ def list_files_with_suffix(directory_path: str, suffix: str) -> List[str]:
# TODO: extend these test cases based on the open source docs + tests: https://github.com/aws/event-ruler
# For example, "JSON Array Matching", "And and Or Relationship among fields with Ruler", rule validation,
# and exception handling.
@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
@pytest.mark.parametrize(
"request_template,label", request_template_tuples, ids=[t[1] for t in request_template_tuples]
)
Expand Down Expand Up @@ -118,7 +117,6 @@ def test_test_event_pattern(aws_client, snapshot, request_template, label):
assert response["Result"]


@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
@markers.aws.validated
def test_test_event_pattern_with_multi_key(aws_client):
"""Test the special case of a duplicate JSON key separately because it requires working around the
Expand All @@ -140,7 +138,6 @@ def test_test_event_pattern_with_multi_key(aws_client):
assert response["Result"]


@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
@markers.aws.validated
def test_test_event_pattern_with_escape_characters(aws_client):
r"""Test the special case of using escape characters separately because it requires working around JSON escaping.
Expand All @@ -159,3 +156,47 @@ def test_test_event_pattern_with_escape_characters(aws_client):
EventPattern=event_pattern,
)
assert response["Result"]


@markers.aws.validated
def test_event_pattern_source(aws_client, snapshot, account_id, region_name):
response = aws_client.events.test_event_pattern(
Event=json.dumps(
{
"id": "1",
"source": "order",
"detail-type": "Test",
"account": account_id,
"region": region_name,
"time": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
}
),
EventPattern=json.dumps(
{
"source": ["order"],
"detail-type": ["Test"],
}
),
)
snapshot.match("eventbridge-test-event-pattern-response", response)

# negative test, source is not matched
response = aws_client.events.test_event_pattern(
Event=json.dumps(
{
"id": "1",
"source": "order",
"detail-type": "Test",
"account": account_id,
"region": region_name,
"time": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
}
),
EventPattern=json.dumps(
{
"source": ["shipment"],
"detail-type": ["Test"],
}
),
)
snapshot.match("eventbridge-test-event-pattern-response-no-match", response)
19 changes: 19 additions & 0 deletions tests/aws/services/events/test_event_patterns.snapshot.json
Expand Up @@ -430,5 +430,24 @@
"tests/aws/services/events/test_event_patterns.py::test_test_event_pattern[exists_dynamodb_NEG]": {
"recorded-date": "09-04-2024, 16:51:59",
"recorded-content": {}
},
"tests/aws/services/events/test_event_patterns.py::test_event_pattern_source": {
"recorded-date": "29-04-2024, 14:12:14",
"recorded-content": {
"eventbridge-test-event-pattern-response": {
"Result": true,
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
},
"eventbridge-test-event-pattern-response-no-match": {
"Result": false,
"ResponseMetadata": {
"HTTPHeaders": {},
"HTTPStatusCode": 200
}
}
}
}
}
3 changes: 3 additions & 0 deletions tests/aws/services/events/test_event_patterns.validation.json
@@ -1,4 +1,7 @@
{
"tests/aws/services/events/test_event_patterns.py::test_event_pattern_source": {
"last_validated_date": "2024-04-29T14:12:14+00:00"
},
"tests/aws/services/events/test_event_patterns.py::test_test_event_pattern[arrays]": {
"last_validated_date": "2024-04-08T19:33:55+00:00"
},
Expand Down

0 comments on commit 21b57ab

Please sign in to comment.