feat(core): Support for multiple outgoing webhooks added#7578
swetasharma03 wants to merge 4 commits into main
crates/api_models/src/admin.rs
Outdated
| /// Webhook related details | ||
| pub webhook_details: Option<WebhookDetails>, | ||
| pub webhook_details: Option<Vec<Option<WebhookDetails>>>, |
There was a problem hiding this comment.
There are two options here, which seem redundant. Is there any specific reason to do so?
There was a problem hiding this comment.
After the migration, the schema.rs file generates the webhook_details field as Nullable<Array<Nullable<Json>>>.
The reason for having both options Option<Vec<Option<WebhookDetails>>> is to align with the schema generated after the migration.
There was a problem hiding this comment.
This is an API-related change in api-model; we shouldn't modify it for database-related reasons, and changing the type of an existing field violates backward compatibility.
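One way to keep the storage shape out of the API type is to collapse it at the boundary. The following is a minimal sketch, assuming a simplified, hypothetical `WebhookDetails` struct and a hypothetical helper named `flatten_webhook_details`; neither is taken from the PR.

```rust
/// Hypothetical, simplified stand-in for the API-level `WebhookDetails` type.
#[derive(Clone, Debug, PartialEq)]
struct WebhookDetails {
    webhook_url: String,
}

/// Collapse the storage-shaped value (`Nullable<Array<Nullable<Json>>>`) into a
/// plain list: a NULL column becomes an empty list and NULL entries are dropped.
fn flatten_webhook_details(stored: Option<Vec<Option<WebhookDetails>>>) -> Vec<WebhookDetails> {
    stored.unwrap_or_default().into_iter().flatten().collect()
}

fn main() {
    let stored = Some(vec![
        Some(WebhookDetails { webhook_url: "https://example.com/a".to_string() }),
        None,
        Some(WebhookDetails { webhook_url: "https://example.com/b".to_string() }),
    ]);
    let flat = flatten_webhook_details(stored);
    println!("{flat:?}");
}
```

With a conversion like this, the nested `Option`s would only need to appear in the diesel model, not in the request or response types.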
crates/api_models/src/admin.rs
Outdated
| ///The password for Webhook login | ||
| #[schema(value_type = Option<String>, max_length = 255, example = "ekart@123")] | ||
| pub webhook_password: Option<Secret<String>>, | ||
| pub webhook_endpoint_id: Option<id_type::WebhookEndpointId>, |
There was a problem hiding this comment.
is this change backwards compatible?
There was a problem hiding this comment.
Yes, the change is backwards compatible. Since webhook_endpoint_id is nullable, for the previous webhook_details, its value will be None.
crates/common_utils/src/lib.rs
Outdated
| id_type::ProfileId::generate() | ||
| } | ||
| /// Generate a profile id with default length, with prefix as `pro` |
There was a problem hiding this comment.
Please change the doc comment
| #[serde(rename_all = "snake_case")] | ||
| #[strum(serialize_all = "snake_case")] | ||
| pub enum OutgoingWebhookEndpointStatus { | ||
| Active, |
There was a problem hiding this comment.
Please provide descriptions for these enum variants. What is the difference between inactive and deprecated? Once an endpoint is marked as deprecated, can it no longer be made active?
e53313d to 1c4c899
f7a5ce1 to 66940ff
| pub merchant_name: Option<Encryption>, | ||
| pub merchant_details: Option<Encryption>, | ||
| pub webhook_details: Option<crate::business_profile::WebhookDetails>, | ||
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, |
There was a problem hiding this comment.
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, | |
| pub webhook_details: Option<Vec<Option<business_profile::WebhookDetails>>>, |
| pub merchant_name: Option<Encryption>, | ||
| pub merchant_details: Option<Encryption>, | ||
| pub webhook_details: Option<crate::business_profile::WebhookDetails>, | ||
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, |
There was a problem hiding this comment.
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, | |
| pub webhook_details: Option<Vec<Option<business_profile::WebhookDetails>>>, |
| pub merchant_details: Option<Encryption>, | ||
| pub return_url: Option<String>, | ||
| pub webhook_details: Option<crate::business_profile::WebhookDetails>, | ||
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, |
There was a problem hiding this comment.
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, | |
| pub webhook_details: Option<Vec<Option<business_profile::WebhookDetails>>>, |
| pub merchant_details: Option<Encryption>, | ||
| pub return_url: Option<String>, | ||
| pub webhook_details: Option<crate::business_profile::WebhookDetails>, | ||
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, |
There was a problem hiding this comment.
| pub webhook_details: Option<Vec<Option<crate::business_profile::WebhookDetails>>>, | |
| pub webhook_details: Option<Vec<Option<business_profile::WebhookDetails>>>, |
a162510 to 910966f
910966f to 7781f38
7781f38 to 26f749b
ed5010c to a9cdf24
SanchithHegde left a comment
Also, please include more test cases in the PR description:
- Ensuring that webhooks are correctly being sent to all webhook endpoints in case multiple endpoints are configured in the business profile.
- Ensuring that automatic and manual retries work for all webhook endpoints.
- Ensuring that no change in behavior is observed for old business profiles with old data: webhook delivery of initial attempts, automatic retries and manual retries must happen as they used to previously.
- Ensuring that analytics events are being correctly populated in the analytics pipeline, for all webhook endpoints. (Please set up Kafka and ClickHouse locally using Docker Compose, test it out and include screenshots.)
| ); | ||
| crate::impl_id_type_methods!(WebhookEndpointId, "webhook_endpoint_id"); | ||
| crate::impl_generate_id_id_type!(WebhookEndpointId, "web"); | ||
| crate::impl_default_id_type!(WebhookEndpointId, "web"); |
There was a problem hiding this comment.
Do we absolutely need to derive Default, or can we remove this line?
There was a problem hiding this comment.
We need this Default implementation to handle cases where webhook_endpoint_id is missing in older data. It allows us to convert the available data into a multiple_webhook_list, particularly in functions like get_webhook_details_for_event_type.
There was a problem hiding this comment.
Can't we make use of the generate_webhook_endpoint_id_of_default_length() function instead of using the Default trait?
The reason I don't prefer implementing the Default trait is that the implementation generates a random ID rather than a static value, whereas Default trait implementations typically return static values.
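To illustrate the suggestion, here is a minimal sketch of filling in a missing ID through an explicit generator instead of `Default`. The `WebhookEndpointId` newtype, the `generate()` method, and `endpoint_id_or_generate()` are simplified, hypothetical stand-ins, and a counter replaces the random suffix to keep the sketch dependency-free.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical, simplified stand-in for `id_type::WebhookEndpointId`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WebhookEndpointId(String);

// Stand-in for a random suffix generator (assumption: real IDs are random).
static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(1);

impl WebhookEndpointId {
    /// Explicit constructor: the name signals that every call yields a new ID.
    fn generate() -> Self {
        let suffix = NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed);
        Self(format!("web_{suffix:020}"))
    }
}

/// Keep a stored ID if present; otherwise generate one for legacy data.
fn endpoint_id_or_generate(stored: Option<WebhookEndpointId>) -> WebhookEndpointId {
    stored.unwrap_or_else(WebhookEndpointId::generate)
}

fn main() {
    let legacy = endpoint_id_or_generate(None);
    let existing = endpoint_id_or_generate(Some(WebhookEndpointId("web_existing".to_string())));
    println!("{legacy:?} {existing:?}");
}
```

Compared with `unwrap_or_default()`, the call site makes it visible that a fresh, non-deterministic value is being created.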
| #[allow(clippy::too_many_arguments)] | ||
| pub async fn trigger_payments_webhook<F, Op, D>( | ||
| merchant_context: domain::MerchantContext, | ||
| _merchant_context: domain::MerchantContext, |
There was a problem hiding this comment.
Why is this variable / parameter no longer being read?
There was a problem hiding this comment.
We were calling fn create_event_and_trigger_outgoing_webhook previously in trigger_payments_webhook fn, which required merchant_context to be passed as a parameter. But now we are calling add_bulk_outgoing_webhook_task_to_process_tracker from trigger_payments_webhook, which is why we don't need this parameter anymore.
527c5dc to 243affb
243affb to b366272
SanchithHegde left a comment
Please include these test cases in the description, in addition to the existing ones:
- Ensuring that no change in behavior is observed for old business profiles with old data: webhook delivery of initial attempts, automatic retries and manual retries must happen (by the new application) as they used to previously. (These could be business profiles created from code on our `main` branch, for example.)
- Ensuring that analytics events are being correctly populated in the analytics pipeline, for all webhook endpoints. (Please set up Kafka and ClickHouse locally using Docker Compose, test it out and include screenshots.)
- Ensuring that the old application (or code from the `main` branch) can handle webhook deliveries (initial attempt, automatic and manual retries) for profiles created with the new application.
  - This is to ensure that staggered deployments or rollbacks to the previous version would not break anything.
  - Of course, we wouldn't be able to deliver webhooks to all endpoints in this case; we would only be able to deliver webhooks to the URL in the `webhook_url` field.
And just so you're aware, we also have a v2 implementation of outgoing webhooks. We can take up adding support for multiple webhook endpoints for v2 in a separate PR. (Looping in @Aishwariyaa-Anand.)
| #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, diesel::AsExpression)] | ||
| #[diesel(sql_type = diesel::sql_types::Json)] | ||
| pub struct MultipleWebhookDetail { |
There was a problem hiding this comment.
Could you please add the two fields then? Won't hurt to have this information I feel.
| pub struct MultipleWebhookDetail { | ||
| pub webhook_endpoint_id: Option<id_type::WebhookEndpointId>, | ||
| pub webhook_url: Option<Secret<String>>, | ||
| pub events: Vec<common_enums::EventType>, | ||
| pub status: Option<common_enums::OutgoingWebhookEndpointStatus>, | ||
| } |
There was a problem hiding this comment.
If we don't need them to provide specific fields in the request, can we consider using two different types, one for request and one for response (say MultipleWebhookDetailRequest and MultipleWebhookDetailResponse or something similar)?
The response type could have fields similar to the diesel model type (because all of the data in the response would be populated from the diesel model anyway), while the request need not have fields like webhook_endpoint_id at all, while status may be optionally provided.
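A minimal sketch of the request/response split suggested above, using plain `String` fields in place of `Secret<String>`, `id_type::WebhookEndpointId`, and `common_enums::EventType`. The `into_response` helper and the `Active` default for a missing status are assumptions made for illustration only.

```rust
/// Simplified stand-in for `common_enums::OutgoingWebhookEndpointStatus`.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum EndpointStatus {
    Active,
    Inactive,
    Deprecated,
}

/// Request type: no `webhook_endpoint_id`, and `status` is optional.
struct MultipleWebhookDetailRequest {
    webhook_url: String,
    events: Vec<String>,
    status: Option<EndpointStatus>,
}

/// Response type: every field is populated, mirroring the stored model.
#[derive(Debug)]
struct MultipleWebhookDetailResponse {
    webhook_endpoint_id: String,
    webhook_url: String,
    events: Vec<String>,
    status: EndpointStatus,
}

impl MultipleWebhookDetailRequest {
    /// Hypothetical helper: attach a server-side ID and resolve the status.
    fn into_response(self, webhook_endpoint_id: String) -> MultipleWebhookDetailResponse {
        MultipleWebhookDetailResponse {
            webhook_endpoint_id,
            webhook_url: self.webhook_url,
            events: self.events,
            // Assumption: a new endpoint is `Active` when no status is given.
            status: self.status.unwrap_or(EndpointStatus::Active),
        }
    }
}

fn main() {
    let request = MultipleWebhookDetailRequest {
        webhook_url: "https://example.com/hook".to_string(),
        events: vec!["payment_succeeded".to_string()],
        status: None,
    };
    println!("{:?}", request.into_response("web_123".to_string()));
}
```

In the real code the response would more likely be built from the diesel model than from the request, so the conversion here is only a convenient way to show the two shapes side by side.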
| // Hash and encode the input using SHA-256 and URL-safe base64 (without padding) | ||
| let hash = Sha256::digest(input.as_bytes()); | ||
| let encoded = URL_SAFE_NO_PAD.encode(hash); |
There was a problem hiding this comment.
- Please use `Sha256` from the `common_utils` crate instead.
- And for base64 encoding, add a const in the `common_utils` crate called `BASE64_ENGINE_URL_SAFE_NO_PAD`:
  hyperswitch/crates/common_utils/src/consts.rs, lines 94 to 99 in dee5d0c
- Also, please add a comment explaining why we're doing this hashing + encoding (and why we're not using the concatenated value directly).
| // Truncate to MAX_PREFIX_LEN (56) if needed | ||
| let common_prefix = if encoded.len() > MAX_PREFIX_LEN { | ||
| &encoded[..MAX_PREFIX_LEN] | ||
| } else { | ||
| &encoded | ||
| }; |
There was a problem hiding this comment.
Truncating most likely won't be needed, considering SHA256 output would be 256 bits (or 32 bytes), and base64 encoding adds ~33% overhead, so encoded output should be of size ~43 characters (32 * 1.33), well within our limit of 56 characters.
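The size estimate can be checked with plain arithmetic. This is a small sketch, not code from the PR; `base64_no_pad_len` is a hypothetical helper, and `MAX_PREFIX_LEN` repeats the limit of 56 quoted in the diff comment.

```rust
/// Length of an unpadded base64 encoding of `input_len` bytes,
/// i.e. ceil(input_len * 4 / 3).
fn base64_no_pad_len(input_len: usize) -> usize {
    (input_len * 4 + 2) / 3
}

/// Limit quoted in the diff comment above.
const MAX_PREFIX_LEN: usize = 56;

fn main() {
    // A SHA-256 digest is 256 bits, i.e. 32 bytes.
    let sha256_len = 32;
    let encoded_len = base64_no_pad_len(sha256_len);
    assert!(encoded_len <= MAX_PREFIX_LEN);
    println!("{encoded_len}");
}
```

For a 32-byte digest this gives exactly 43 characters, so the truncation branch would never be taken.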
| for webhook_detail in webhook_details.iter() { | ||
| let webhook_detail_clone = webhook_detail.clone(); |
There was a problem hiding this comment.
Nit: You should be able to do something like:
| for webhook_detail in webhook_details.iter() { | |
| let webhook_detail_clone = webhook_detail.clone(); | |
| for webhook_detail in webhook_details { |
And you can avoid the webhook_detail.clone().
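As a standalone illustration of this nit, iterating over the vector by value hands each element to the loop body as an owned value. The `WebhookDetail` struct and `collect_urls` function below are hypothetical and much simpler than the real types.

```rust
/// Hypothetical, simplified stand-in for the webhook detail type.
#[derive(Debug)]
struct WebhookDetail {
    webhook_url: String,
}

/// Consumes the list, moving each URL out of its element without cloning.
fn collect_urls(webhook_details: Vec<WebhookDetail>) -> Vec<String> {
    let mut urls = Vec::new();
    for webhook_detail in webhook_details {
        // `webhook_detail` is owned here, so its fields can be moved out directly.
        urls.push(webhook_detail.webhook_url);
    }
    urls
}

fn main() {
    let details = vec![
        WebhookDetail { webhook_url: "https://example.com/a".to_string() },
        WebhookDetail { webhook_url: "https://example.com/b".to_string() },
    ];
    println!("{:?}", collect_urls(details));
}
```

This only applies when the vector is not needed after the loop; otherwise iterating by reference and cloning selectively is still required.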
| webhook_endpoint_id: initial_event.webhook_endpoint_id, | ||
| is_overall_delivery_successful: Some(false), | ||
| }; | ||
| let webhook_id = new_event.webhook_endpoint_id.clone(); |
There was a problem hiding this comment.
Nit: Naming.
| let webhook_id = new_event.webhook_endpoint_id.clone(); | |
| let webhook_endpoint_id = new_event.webhook_endpoint_id.clone(); |
| use common_enums::EventClass; | ||
| use common_utils::{errors::ReportSwitchExt, events::ApiEventsType, ext_traits::AsyncExt}; | ||
| use diesel_models::ConnectorMandateReferenceId; | ||
| use diesel_models::{enums::EventObjectType, ConnectorMandateReferenceId}; |
There was a problem hiding this comment.
Let's avoid these direct imports please, we can qualify them with enums:: instead.
| let (content, _) = Box::pin(get_outgoing_webhook_content_and_event_type( | ||
| state.clone(), | ||
| state.get_req_state(), | ||
| merchant_account.clone(), | ||
| merchant_key_store.clone(), | ||
| &tracking_data, | ||
| )) | ||
| .await?; |
There was a problem hiding this comment.
get_outgoing_webhook_content_and_event_type() isn't meant to be used as the primary way of obtaining the outgoing webhook content: it was added for backward compatibility reasons.
Another reason I would prefer to completely avoid it is that it has the caveat that the resource could have transitioned to a completely different status (as compared to what we're notifying about) by the time the workflow would be executed.
We previously had checks for it:
hyperswitch/crates/router/src/workflows/outgoing_webhook_retry.rs
Lines 175 to 228 in 0a06ad9
| &mandate_id, | ||
| storage::MandateUpdate::StatusUpdate { mandate_status }, | ||
| mandate, | ||
| mandate.clone(), |
| use error_stack::{report, Report, ResultExt}; | ||
| use hyperswitch_domain_models::type_encryption::{crypto_operation, CryptoOperation}; | ||
| use hyperswitch_interfaces::consts; | ||
| use masking::{ExposeInterface, Mask, PeekInterface, Secret}; |
There was a problem hiding this comment.
| use masking::{ExposeInterface, Mask, PeekInterface, Secret}; | |
| use masking; |
Avoid using direct imports
| const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5; | ||
| pub const OUTGOING_WEBHOOK_BULK_TASK: &str = "OUTGOING_WEBHOOK_BULK"; | ||
| pub const OUTGOING_WEBHOOK_RETRY_TASK: &str = "OUTGOING_WEBHOOK_RETRY"; |
There was a problem hiding this comment.
Can we move these constants to crates/router/src/core/webhooks/types.rs ?
They are used across v1 and v2 files.
| pub const OUTGOING_WEBHOOK_BULK_TASK: &str = "OUTGOING_WEBHOOK_BULK"; | ||
| pub const OUTGOING_WEBHOOK_RETRY_TASK: &str = "OUTGOING_WEBHOOK_RETRY"; | ||
| pub(crate) fn get_webhook_details_for_event_type( |
There was a problem hiding this comment.
Can we reorder the functions to reflect the logical order of execution? That would help improve readability and make the flow easier to follow. Something like:
add_bulk_outgoing_webhook_task_to_process_tracker
create_event_and_trigger_outgoing_webhook
trigger_webhook_and_raise_event
trigger_webhook_to_merchant
raise_webhooks_analytics_event
| @@ -1,4 +1,4 @@ | |||
| use std::collections::HashMap; | |||
| use std::collections::{HashMap, HashSet}; | |||
There was a problem hiding this comment.
- In `trigger_webhook_to_merchant`, where we match on `delivery_attempt`, we could introduce a trait to handle successful and failed deliveries for InitialAttempt, AutomaticRetry, and ManualRetry. This will help decouple the logic and improve code readability.
- Functions like `increment_webhook_outgoing_received_count` and `increment_webhook_outgoing_not_received_count` can be reused from crates/router/src/core/webhooks/utils.rs instead of being redefined.
- In raise_webhooks_analytics_event, we could pass the trigger_webhook_result directly to help avoid an extra DB fetch for the updated event:
trigger_webhook_result: CustomResult<
(domain::Event, Option<Report<errors::WebhooksFlowError>>),
errors::WebhooksFlowError,
>,
You can refer to this PR #6613
These changes can be taken up in a separate PR
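A rough sketch of the trait idea from the first point. Everything here is hypothetical: the trait name, the `FollowUp` actions, and the behavior assigned to each attempt type are assumptions for illustration, not the logic in `trigger_webhook_to_merchant`.

```rust
/// Hypothetical follow-up actions; the real code updates events and
/// process tracker entries instead of returning a value like this.
#[derive(Debug, PartialEq)]
enum FollowUp {
    MarkDelivered,
    ScheduleRetry,
    ReportError,
}

/// One implementation per delivery attempt type replaces a large `match`.
trait DeliveryOutcomeHandler {
    fn on_success(&self) -> FollowUp;
    fn on_failure(&self) -> FollowUp;
}

struct InitialAttempt;
struct AutomaticRetry;
struct ManualRetry;

impl DeliveryOutcomeHandler for InitialAttempt {
    fn on_success(&self) -> FollowUp { FollowUp::MarkDelivered }
    // Assumption: a failed first attempt is left to the retry workflow.
    fn on_failure(&self) -> FollowUp { FollowUp::ScheduleRetry }
}

impl DeliveryOutcomeHandler for AutomaticRetry {
    fn on_success(&self) -> FollowUp { FollowUp::MarkDelivered }
    // Assumption: a failed automatic retry is rescheduled.
    fn on_failure(&self) -> FollowUp { FollowUp::ScheduleRetry }
}

impl DeliveryOutcomeHandler for ManualRetry {
    fn on_success(&self) -> FollowUp { FollowUp::MarkDelivered }
    // Assumption: a failed manual retry is reported back to the caller.
    fn on_failure(&self) -> FollowUp { FollowUp::ReportError }
}

/// Dispatch on the outcome without matching on the attempt type.
fn handle(handler: &dyn DeliveryOutcomeHandler, delivered: bool) -> FollowUp {
    if delivered { handler.on_success() } else { handler.on_failure() }
}

fn main() {
    println!("{:?}", handle(&InitialAttempt, false));
    println!("{:?}", handle(&ManualRetry, false));
}
```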
| .to_not_found_response(errors::ApiErrorResponse::MerchantAccountNotFound)?; | ||
| #[cfg(feature = "v1")] | ||
| if let Some(ref webhook_details) = &req.webhook_details { |
There was a problem hiding this comment.
How about we introduce a separate set of webhook endpoints for create, update and delete?
This would simplify the code logic and also make webhook updates easier for the merchant.
Type of Change
Description
In the current system, merchants can configure only one webhook endpoint per profile. This PR introduces support for configuring multiple webhook endpoints based on the event_type. These changes include modifications to the database schema and alterations in the webhook triggering logic.
In payments.rs (and in other flows as well), instead of triggering a single webhook directly, a `bulk_outgoing_webhook` job is now created. When this job is executed, it triggers multiple webhooks. Additionally, for each webhook, a corresponding `retry_job` is created in the `process_tracker`, ensuring that any failed webhook deliveries can be retried.
Database Schema Changes
Business Profile Schema Update:
The webhook_details column in the business_profile table is being updated from a JSON object to an array of JSON objects. This will allow each business profile to store multiple webhook endpoints that can be associated with different event types.
Merchant Account Schema Update:
In addition to the business_profile table, the `merchant_account` table will also include a `webhook_details` field to store webhook configurations for the merchant. Similar to the changes in the business_profile table, this field will be updated to store an array of webhook configurations rather than a single configuration.
How old data would work with new code?
Suppose we have webhook_details filled as
We have three functions which fetch webhook_details for outgoing_webhook_workflow.
- `get_webhook_details_for_event_type()`: Here, if `multiple_webhooks_list` is null, a multipleWebhooksList struct is created from the existing details and used in `create_event_and_trigger_outgoing_webhook`, assuming that the single URL present will receive webhooks for all events.
- `get_webhook_detail_by_webhook_endpoint_id()`: If `webhook_endpoint_id` is None, it represents an older form of data, and hence the `webhook_url` present is returned.
- `get_idempotent_event_id()`: The previous version of this function formed the ID as `{primary_object_id}_{event_type}`; now it is the base64 encoding of `{primary_object_id}_{event_type}_{webhook_endpoint_id}`. If the `webhook_endpoint_id` passed is None, it refers to the older form of data, and the ID generated is the base64 encoding of `{primary_object_id}_{event_type}`.
How do tasks already stored in process_tracker work?
If the task is already stored, it means `trigger_outgoing_webhook_bulk_workflow()` will not be called for that outgoing_webhook.
For `trigger_outgoing_webhook_retry_workflow()`, the functions used (`get_idempotent_event_id` and `get_webhook_detail_by_webhook_endpoint_id`) are described above.
The update ensures backward compatibility, as existing records may still use the old schema (storing a single webhook configuration).
Additional Changes
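As a rough illustration of the fallback described under "How old data would work with new code?", the sketch below uses simplified, hypothetical types and function names (`endpoints_for_event`, `idempotent_event_id_input`). It assumes the lookup filters endpoints by event type, and it only builds the string that would be hashed and encoded; the hashing and base64 steps are left out.

```rust
/// Simplified stand-in for one configured endpoint.
#[derive(Clone, Debug, PartialEq)]
struct MultipleWebhookDetail {
    webhook_endpoint_id: Option<String>,
    webhook_url: Option<String>,
    events: Vec<String>,
}

/// Simplified stand-in for the stored details: a legacy URL plus the new list.
struct WebhookDetails {
    webhook_url: Option<String>,
    multiple_webhooks_list: Option<Vec<MultipleWebhookDetail>>,
}

/// New data: return endpoints subscribed to the event (assumed filtering).
/// Old data: treat the single URL as an endpoint that receives every event.
fn endpoints_for_event(details: &WebhookDetails, event_type: &str) -> Vec<MultipleWebhookDetail> {
    match &details.multiple_webhooks_list {
        Some(list) => list
            .iter()
            .filter(|detail| detail.events.iter().any(|event| event.as_str() == event_type))
            .cloned()
            .collect(),
        None => details
            .webhook_url
            .iter()
            .map(|url| MultipleWebhookDetail {
                webhook_endpoint_id: None,
                webhook_url: Some(url.clone()),
                events: vec![event_type.to_string()],
            })
            .collect(),
    }
}

/// Input string for the idempotent event ID, before hashing and encoding.
fn idempotent_event_id_input(
    primary_object_id: &str,
    event_type: &str,
    webhook_endpoint_id: Option<&str>,
) -> String {
    match webhook_endpoint_id {
        Some(id) => format!("{primary_object_id}_{event_type}_{id}"),
        // Old data has no endpoint ID, so the legacy two-part form is kept.
        None => format!("{primary_object_id}_{event_type}"),
    }
}

fn main() {
    let legacy = WebhookDetails {
        webhook_url: Some("https://example.com/hook".to_string()),
        multiple_webhooks_list: None,
    };
    println!("{:?}", endpoints_for_event(&legacy, "payment_succeeded"));
    println!("{}", idempotent_event_id_input("pay_1", "payment_succeeded", None));
}
```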
Motivation and Context
How did you test it?
Response
Response
Payment created with id `pay_6vacj5lnQX9BKSLeXYUy`
After doing a payment with status succeeded, a bulk webhook task is added to process_tracker

This bulk task adds all of the outgoing webhooks (three for the merchant created) to process_tracker. Two outgoing webhooks were delivered; one is pending because its endpoint was wrong.

Three events were created initially for the corresponding process tracker tasks above. One notification failed, so there is one more entry in the events table for the automatic retry.

Events created by process_tracker:
Two webhooks received in different endpoints which were correctly configured

Response
Webhook received for manual retry in first endpoint

Response
Webhook received for manual retry in the second endpoint

Response
Checklist
- `cargo +nightly fmt --all`
- `cargo clippy`