feat(aws_sns): support for message Subject in aws_sns#3960
Conversation
|
This would be great to see it in a future release! |
|
What a legend |
|
"Gratis" PR 😏 |
ac83cf8 to
83fa141
Compare
|
@biagiopietro Thank you for the contribution! Once you're finished let me know and I can review it. |
|
Hello @josephwoodward, I pushed one last commit. Could you please review when convenient? Hereby some reports: ➜ connect git:(feature/support_message_subject_in_aws_sns) ✗ task lint
task: [lint] bin/golangci-lint run cmd/... internal/... public/...
0 issues.
➜ connect git:(feature/support_message_subject_in_aws_sns) ✗ task fmt
task: [fmt] bin/golangci-lint fmt cmd/... internal/... public/...
task: [fmt] go mod tidy
➜ connect git:(feature/support_message_subject_in_aws_sns) ✗ task test
task: [test:unit] go test -timeout 1m -shuffle=on -v ./...
task: Task "build:redpanda-connect" is up to date
task: [test:template] target/redpanda-connect template lint internal/impl/twitter/search_input.tmpl.yaml internal/impl/kafka/output_redpanda_migrator_bundle.tmpl.yaml internal/impl/kafka/input_redpanda_migrator_bundle.tmpl.yaml
task: [test:template] target/redpanda-connect test ./config/test/...
Test 'config/test/awk.yaml' succeeded
Test 'config/test/bloblang/also_tests_boolean_operands.yaml' succeeded
Test 'config/test/bloblang/boolean_operands.yaml' succeeded
<redacted by me>
PASS
ok github.com/redpanda-data/connect/v4/public/schema 6.592s
-test.shuffle 1770284574464088000
=== RUN TestBenchmarkInsert10MRows
stream_test_helpers.go:41: Skipping as execution was not requested explicitly using go test -run '^Test.*Integration.*$'
--- SKIP: TestBenchmarkInsert10MRows (0.00s)
PASS
ok github.com/redpanda-data/connect/v4/tools/spanner/benchmark 6.473sI validated the integration locally using a #!/bin/bash
# Don't forget to:
# - add this script and config `yaml` in the root of the project when testing
# - run `chmod +x setup_localstack_sns_sqs.sh`
# - run `aws configure set aws_access_key_id test && aws configure set aws_secret_access_key test && aws configure set region us-east-1`
set -e
consume_and_display_message() {
local description="$1"
echo "Consuming message from SQS ($description):"
message=$(aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sqs receive-message --queue-url "$queue_url" --message-attribute-names All --output json)
if [ "$message" != "" ] && [ "$message" != "null" ]; then
echo "Raw SQS Message:"
echo "$message" | jq '.'
echo ""
echo "Message Body (parsed):"
echo "$message" | jq -r '.Messages[0].Body' | jq '.'
echo ""
echo "Message Subject:"
echo "$message" | jq -r '.Messages[0].Body' | jq -r '.Subject // "No subject"'
receipt_handle=$(echo "$message" | jq -r '.Messages[0].ReceiptHandle')
if [ "$receipt_handle" != "null" ] && [ "$receipt_handle" != "" ]; then
if aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sqs delete-message --queue-url "$queue_url" --receipt-handle "$receipt_handle"; then
echo "✓ Message successfully deleted from queue"
else
echo "✗ Failed to delete message from queue"
fi
fi
else
echo "No message received from SQS"
fi
}
for cmd in podman aws jq awk go; do
if ! command -v $cmd &> /dev/null; then
echo "Error: $cmd is not installed or not in PATH"
exit 1
fi
done
podman ps | grep localstack_main || podman run -d --name localstack_main -p 4566:4566 -p 4571:4571 localstack/localstack:latest
sleep 5
echo "Cleaning up existing resources..."
aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sns delete-topic --topic-arn arn:aws:sns:us-east-1:000000000000:test-topic 2>/dev/null || true
aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sqs delete-queue --queue-url http://localhost:4566/000000000000/test-queue 2>/dev/null || true
echo "Creating fresh resources..."
topic_arn=$(aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sns create-topic --name test-topic --output text --query 'TopicArn')
echo "SNS Topic ARN: $topic_arn"
queue_url=$(aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sqs create-queue --queue-name test-queue --output text --query 'QueueUrl')
echo "SQS Queue URL: $queue_url"
queue_arn=$(aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sqs get-queue-attributes --queue-url "$queue_url" --attribute-names QueueArn --output text | awk '{print $2}')
echo "SQS Queue ARN: $queue_arn"
cat > /tmp/policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "sqs:SendMessage",
"Resource": "$queue_arn",
"Condition": {"ArnEquals": {"aws:SourceArn": "$topic_arn"}}
}
]
}
EOF
cat > /tmp/attributes.json <<EOF
{
"Policy": "$(cat /tmp/policy.json | jq -c . | sed 's/"/\\"/g')"
}
EOF
aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sqs set-queue-attributes --queue-url "$queue_url" --attributes file:///tmp/attributes.json
aws --endpoint-url=http://localhost:4566 --region us-east-1 --no-cli-pager sns subscribe --topic-arn "$topic_arn" --protocol sqs --notification-endpoint "$queue_arn"
echo "Subscribed SQS to SNS"
cd ./cmd/redpanda-connect
echo "Building Redpanda Connect binary..."
go build -o ../../redpanda-connect
cd ../../
echo "=== Testing AWS SNS with Subject ==="
cat <<EOF | ./redpanda-connect run ./config/test/aws_sns_localstack.yaml
Hello from Redpanda Connect with subject!
EOF
consume_and_display_message "with subject"
sleep 15
echo ""
echo "=== Testing AWS SNS without Subject (backward compatibility) ==="
cat <<EOF | ./redpanda-connect run ./config/test/aws_sns_localstack_no_subject.yaml
Hello from Redpanda Connect without subject!
EOF
consume_and_display_message "without subject"with input:
stdin: {}
output:
aws_sns:
topic_arn: arn:aws:sns:us-east-1:000000000000:test-topic
region: us-east-1
endpoint: http://localhost:4566
subject: "TestSubjectFromLocalStack"
credentials:
id: test
secret: testand input:
stdin: {}
output:
aws_sns:
topic_arn: arn:aws:sns:us-east-1:000000000000:test-topic
region: us-east-1
endpoint: http://localhost:4566
credentials:
id: test
secret: testoutput ➜ connect git:(feature/support_message_subject_in_aws_sns) ✗ ./setup_localstack_sns_sqs.sh
27a7aeb764c3 docker.io/localstack/localstack:latest 42 minutes ago Up 28 minutes (healthy) 0.0.0.0:4566->4566/tcp, 0.0.0.0:4571->4571/tcp, 4510-4559/tcp, 5678/tcp localstack_main
Cleaning up existing resources...
Creating fresh resources...
SNS Topic ARN: arn:aws:sns:us-east-1:000000000000:test-topic
SQS Queue URL: http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/test-queue
SQS Queue ARN: arn:aws:sqs:us-east-1:000000000000:test-queue
{
"SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:test-topic:8d3b91ff-5695-4abd-b5eb-440985cab63b"
}
Subscribed SQS to SNS
Building Redpanda Connect binary...
=== Testing AWS SNS with Subject ===
INFO[2026-02-05T10:28:40+01:00] Running main config from specified file @service=redpanda-connect benthos_version="" path=./config/test/aws_sns_localstack.yaml
INFO[2026-02-05T10:28:40+01:00] Successfully loaded Redpanda license @service=redpanda-connect expires_at="2036-02-03T10:28:40+01:00" license_org="" license_type=open_source
INFO[2026-02-05T10:28:40+01:00] Listening for HTTP requests at: http://0.0.0.0:4195 @service=redpanda-connect
INFO[2026-02-05T10:28:40+01:00] Launching a Redpanda Connect instance, use CTRL+C to close @service=redpanda-connect
INFO[2026-02-05T10:28:40+01:00] Input type stdin is now active @service=redpanda-connect label="" path=root.input
INFO[2026-02-05T10:28:40+01:00] Output type aws_sns is now active @service=redpanda-connect label="" path=root.output
INFO[2026-02-05T10:28:40+01:00] Pipeline has terminated. Shutting down the service @service=redpanda-connect
Consuming message from SQS (with subject):
Raw SQS Message:
{
"Messages": [
{
"MessageId": "d4c02998-06a7-4fd5-a594-f35ba3e25d05",
"ReceiptHandle": "MmE0ZWEwZWEtNzI0NC00NjkzLWJkM2YtYzk3MGMxZTg1MWI0IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSBkNGMwMjk5OC0wNmE3LTRmZDUtYTU5NC1mMzViYTNlMjVkMDUgMTc3MDI4MzcyMC43NzIzNjg3",
"MD5OfBody": "5f3cd4464c5531d995acfe884378078e",
"Body": "{\"Type\": \"Notification\", \"MessageId\": \"6175f008-de23-41cb-8a55-4cfd4991eee4\", \"TopicArn\": \"arn:aws:sns:us-east-1:000000000000:test-topic\", \"Message\": \"Hello from Redpanda Connect with subject!\", \"Timestamp\": \"2026-02-05T09:28:40.220Z\", \"UnsubscribeURL\": \"http://localhost.localstack.cloud:4566/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:000000000000:test-topic:8d3b91ff-5695-4abd-b5eb-440985cab63b\", \"Subject\": \"TestSubjectFromLocalStack\", \"SignatureVersion\": \"1\", \"Signature\": \"<redacted by me>", \"SigningCertURL\": \"http://localhost.localstack.cloud:4566/_aws/sns/SimpleNotificationService-6c6f63616c737461636b69736e696365.pem\"}"
}
]
}
Message Body (parsed):
{
"Type": "Notification",
"MessageId": "6175f008-de23-41cb-8a55-4cfd4991eee4",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:test-topic",
"Message": "Hello from Redpanda Connect with subject!",
"Timestamp": "2026-02-05T09:28:40.220Z",
"UnsubscribeURL": "http://localhost.localstack.cloud:4566/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:000000000000:test-topic:8d3b91ff-5695-4abd-b5eb-440985cab63b",
"Subject": "TestSubjectFromLocalStack",
"SignatureVersion": "1",
"Signature": "<redacted by me>",
"SigningCertURL": "http://localhost.localstack.cloud:4566/_aws/sns/SimpleNotificationService-6c6f63616c737461636b69736e696365.pem"
}
Message Subject:
TestSubjectFromLocalStack
✓ Message successfully deleted from queue
=== Testing AWS SNS without Subject (backward compatibility) ===
INFO[2026-02-05T10:28:56+01:00] Running main config from specified file @service=redpanda-connect benthos_version="" path=./config/test/aws_sns_localstack_no_subject.yaml
INFO[2026-02-05T10:28:56+01:00] Successfully loaded Redpanda license @service=redpanda-connect expires_at="2036-02-03T10:28:56+01:00" license_org="" license_type=open_source
INFO[2026-02-05T10:28:56+01:00] Listening for HTTP requests at: http://0.0.0.0:4195 @service=redpanda-connect
INFO[2026-02-05T10:28:56+01:00] Launching a Redpanda Connect instance, use CTRL+C to close @service=redpanda-connect
INFO[2026-02-05T10:28:56+01:00] Input type stdin is now active @service=redpanda-connect label="" path=root.input
INFO[2026-02-05T10:28:56+01:00] Output type aws_sns is now active @service=redpanda-connect label="" path=root.output
INFO[2026-02-05T10:28:56+01:00] Pipeline has terminated. Shutting down the service @service=redpanda-connect
Consuming message from SQS (without subject):
Raw SQS Message:
{
"Messages": [
{
"MessageId": "7768c0a6-76f9-4e65-bd35-5eb67025fa2f",
"ReceiptHandle": "MDZhNzEzYWQtZDE2NS00MTdkLWJhNWMtMDdkNGM1NGVjMzVmIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSA3NzY4YzBhNi03NmY5LTRlNjUtYmQzNS01ZWI2NzAyNWZhMmYgMTc3MDI4MzczNy4xNTMxODAx",
"MD5OfBody": "c37a32c25be235f31a71c902ee57760e",
"Body": "{\"Type\": \"Notification\", \"MessageId\": \"4c65723c-3029-4d19-8ca8-0d71b02b2c5c\", \"TopicArn\": \"arn:aws:sns:us-east-1:000000000000:test-topic\", \"Message\": \"Hello from Redpanda Connect without subject!\", \"Timestamp\": \"2026-02-05T09:28:56.618Z\", \"UnsubscribeURL\": \"http://localhost.localstack.cloud:4566/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:000000000000:test-topic:8d3b91ff-5695-4abd-b5eb-440985cab63b\", \"SignatureVersion\": \"1\", \"Signature\": \"<redacted by me>\", \"SigningCertURL\": \"http://localhost.localstack.cloud:4566/_aws/sns/SimpleNotificationService-6c6f63616c737461636b69736e696365.pem\"}"
}
]
}
Message Body (parsed):
{
"Type": "Notification",
"MessageId": "4c65723c-3029-4d19-8ca8-0d71b02b2c5c",
"TopicArn": "arn:aws:sns:us-east-1:000000000000:test-topic",
"Message": "Hello from Redpanda Connect without subject!",
"Timestamp": "2026-02-05T09:28:56.618Z",
"UnsubscribeURL": "http://localhost.localstack.cloud:4566/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:000000000000:test-topic:8d3b91ff-5695-4abd-b5eb-440985cab63b",
"SignatureVersion": "1",
"Signature": "<redacted by me>",
"SigningCertURL": "http://localhost.localstack.cloud:4566/_aws/sns/SimpleNotificationService-6c6f63616c737461636b69736e696365.pem"
}
Message Subject:
No subject
✓ Message successfully deleted from queue |
|
@josephwoodward Just asking if you could give us some approximations on how long does it take for a release after a PR gets merged. Getting pressured by the tight deadlines and this would really solve our problems. |
| }) | ||
| } | ||
|
|
||
| type snsClientIface interface { |
There was a problem hiding this comment.
nit (non-blocking): publisher or snsClient would probably be sufficient, no need for the Iface suffix. Not a blocker though as it's not exported.
|
@biagiopietro Thank you for the PR, and for the thorough testing report! |
1b97997 to
24a43cc
Compare
|
Thanks for your time @squiidz and @josephwoodward. |
|
Good morning @squiidz and @josephwoodward |
|
Hi @biagiopietro, the next release is likely to be next week, is that okay? |
|
Hello @josephwoodward We truly appreciate the effort you’re putting in to accommodate this schedule. That being said, if there is any chance of a release this week, we would be very very (and very) happy :D! |
|
Hi @biagiopietro, your change is in the latest v4.81.0 release. Once again, thank you for your contribution! |
based on #3956