Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhooks): implement automatic retries for failed webhook deliveries using scheduler #3842

Merged
merged 36 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b9034dd
chore(api_models): reorder `serde` derive attribute
SanchithHegde Feb 13, 2024
158e08e
refactor(webhooks): remove `merchant_account` parameter from `create_…
SanchithHegde Feb 13, 2024
19fb227
refactor(webhooks): change visibility of functions to private
SanchithHegde Feb 13, 2024
7c1dbf7
refactor(webhooks): extract event logging logic to a function
SanchithHegde Feb 14, 2024
e6d61b6
refactor(webhooks): rename `get_outgoing_webhook_event_type()` method…
SanchithHegde Feb 14, 2024
c905365
refactor(webhooks): accept delivery attempt as parameter when trigger…
SanchithHegde Feb 14, 2024
fba4567
refactor(process_tracker): update `make_process_tracker_new()` to acc…
SanchithHegde Feb 14, 2024
6d4a41b
feat(webhooks): add task to process tracker before initial outgoing w…
SanchithHegde Feb 14, 2024
bca272d
chore: fix typo
SanchithHegde Feb 21, 2024
b389645
refactor(webhooks): add `merchant_id` field in `OutgoingWebhookTracki…
SanchithHegde Feb 21, 2024
f42a168
refactor(webhooks): accept `event_id` instead of `event` as parameter…
SanchithHegde Feb 21, 2024
41a421e
refactor(webhooks): decide outgoing webhook type just before sending …
SanchithHegde Feb 21, 2024
ed23a00
Merge branch 'main' into outgoing-webhooks-automatic-retry
SanchithHegde Feb 23, 2024
39ec3cf
chore: fix errors after merging main
SanchithHegde Feb 24, 2024
c9db463
chore: fix typo
SanchithHegde Feb 24, 2024
7852838
refactor(scheduler): simplify `get_delay()` utility function
SanchithHegde Feb 24, 2024
30fa7fa
fix: use `Debug` impl to log errors when unable to fetch workflow ret…
SanchithHegde Feb 24, 2024
f21bd2d
fix(process_data): remove `rename_all = camelCase` annotation from `C…
SanchithHegde Feb 24, 2024
eb96701
fix(workflows): fix payments sync workflow being incorrectly schedule…
SanchithHegde Feb 24, 2024
63a188d
feat(webhooks): automatically retry delivery of failed webhooks using…
SanchithHegde Feb 24, 2024
0aec985
refactor(webhooks): populate timestamp in webhook payload from event …
SanchithHegde Feb 24, 2024
b66a1ee
refactor(webhooks): extract out common error, success and error respo…
SanchithHegde Feb 25, 2024
339dc26
feat(outgoing_webhook_retry): add support for retrying refunds, manda…
SanchithHegde Feb 26, 2024
0c85f28
Merge branch 'main' into outgoing-webhooks-automatic-retry
SanchithHegde Feb 26, 2024
22838c0
refactor(outgoing_webhook_retry): instrument function calls and log c…
SanchithHegde Feb 26, 2024
6df96b8
refactor(webhooks): make process tracker task insertion optional duri…
SanchithHegde Feb 26, 2024
55b9dd0
refactor(webhooks): raise metrics in case of task insertion success/f…
SanchithHegde Feb 26, 2024
20e7f8c
fix(refunds): remove unused function
SanchithHegde Feb 26, 2024
1d3bf5c
Merge branch 'main' into outgoing-webhooks-automatic-retry
SanchithHegde Feb 26, 2024
9fea5fe
chore(db): intrument all `EventInterface` methods
SanchithHegde Feb 28, 2024
85d7f3c
Merge branch 'main' into outgoing-webhooks-automatic-retry
SanchithHegde Feb 28, 2024
38185cc
Merge branch 'main' into outgoing-webhooks-automatic-retry
SanchithHegde Feb 28, 2024
2c91e0f
Merge branch 'main' into outgoing-webhooks-automatic-retry
SanchithHegde Feb 29, 2024
b2dd060
Merge branch 'main' into outgoing-webhooks-automatic-retry
SanchithHegde Mar 3, 2024
a38212f
chore(scheduler): add unit tests for `get_delay()` utility function
SanchithHegde Mar 3, 2024
52c0934
chore: fix typos
SanchithHegde Mar 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
check-filename = true

[default.extend-identifiers]
"ABD" = "ABD" # Aberdeenshire, UK ISO 3166-2 code
BA = "BA" # Bosnia and Herzegovina country code
CAF = "CAF" # Central African Republic country code
flate2 = "flate2"
Expand Down
2 changes: 1 addition & 1 deletion crates/api_models/src/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ pub struct OutgoingWebhook {

/// This is specific to the flow, for ex: it will be `PaymentsResponse` for payments flow
pub content: OutgoingWebhookContent,
#[serde(default, with = "custom_serde::iso8601")]

/// The time at which webhook was sent
#[serde(default, with = "custom_serde::iso8601")]
pub timestamp: PrimitiveDateTime,
}

Expand Down
1 change: 1 addition & 0 deletions crates/diesel_models/src/process_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ pub enum ProcessTrackerRunner {
RefundWorkflowRouter,
DeleteTokenizeDataWorkflow,
ApiKeyExpiryWorkflow,
OutgoingWebhookRetryWorkflow,
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions crates/diesel_models/src/query/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ impl EventNew {
}

impl Event {
pub async fn find_by_event_id(conn: &PgPooledConn, event_id: &str) -> StorageResult<Self> {
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
conn,
dsl::event_id.eq(event_id.to_owned()),
)
.await
}

pub async fn update(
conn: &PgPooledConn,
event_id: &str,
Expand Down
3 changes: 3 additions & 0 deletions crates/router/src/bin/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ impl ProcessTrackerWorkflows<routes::AppState> for WorkflowRunner {
)
}
}
storage::ProcessTrackerRunner::OutgoingWebhookRetryWorkflow => Ok(Box::new(
workflows::outgoing_webhook_retry::OutgoingWebhookRetryWorkflow,
)),
}
};

Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/compatibility/wrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ where
.map_into_boxed_body()
}

Ok(api::ApplicationResponse::PaymenkLinkForm(boxed_payment_link_data)) => {
Ok(api::ApplicationResponse::PaymentLinkForm(boxed_payment_link_data)) => {
match *boxed_payment_link_data {
api::PaymentLinkAction::PaymentLinkFormData(payment_link_data) => {
match api::build_payment_link_html(payment_link_data) {
Expand Down
4 changes: 4 additions & 0 deletions crates/router/src/core/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ pub enum WebhooksFlowError {
OutgoingWebhookEncodingFailed,
#[error("Missing required field: {field_name}")]
MissingRequiredField { field_name: &'static str },
#[error("Failed to update outgoing webhook process tracker task")]
OutgoingWebhookProcessTrackerTaskUpdateFailed,
#[error("Failed to schedule retry attempt for outgoing webhook")]
OutgoingWebhookRetrySchedulingFailed,
}

#[derive(Debug, thiserror::Error)]
Expand Down
6 changes: 3 additions & 3 deletions crates/router/src/core/payment_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub async fn intiate_payment_link_flow(
js_script,
css_script,
};
return Ok(services::ApplicationResponse::PaymenkLinkForm(Box::new(
return Ok(services::ApplicationResponse::PaymentLinkForm(Box::new(
services::api::PaymentLinkAction::PaymentLinkStatus(payment_link_error_data),
)));
};
Expand Down Expand Up @@ -225,7 +225,7 @@ pub async fn intiate_payment_link_flow(
sdk_url: state.conf.payment_link.sdk_url.clone(),
css_script,
};
Ok(services::ApplicationResponse::PaymenkLinkForm(Box::new(
Ok(services::ApplicationResponse::PaymentLinkForm(Box::new(
services::api::PaymentLinkAction::PaymentLinkFormData(payment_link_data),
)))
}
Expand Down Expand Up @@ -574,7 +574,7 @@ pub async fn get_payment_link_status(
js_script,
css_script,
};
Ok(services::ApplicationResponse::PaymenkLinkForm(Box::new(
Ok(services::ApplicationResponse::PaymentLinkForm(Box::new(
services::api::PaymentLinkAction::PaymentLinkStatus(payment_link_status_data),
)))
}
4 changes: 2 additions & 2 deletions crates/router/src/core/payment_methods/vault.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,8 +1084,8 @@ pub async fn get_delete_tokenize_schedule_time(
.await;
let mapping = match redis_mapping {
Ok(x) => x,
Err(err) => {
logger::info!("Redis Mapping Error: {}", err);
Err(error) => {
logger::info!(?error, "Redis Mapping Error");
process_data::PaymentMethodsPTMapping::default()
}
};
Expand Down
31 changes: 0 additions & 31 deletions crates/router/src/core/refunds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,34 +1167,3 @@ pub async fn get_refund_sync_process_schedule_time(

Ok(process_tracker_utils::get_time_from_delta(time_delta))
}

pub async fn retry_refund_sync_task(
SanchithHegde marked this conversation as resolved.
Show resolved Hide resolved
db: &dyn db::StorageInterface,
connector: String,
merchant_id: String,
pt: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let schedule_time =
get_refund_sync_process_schedule_time(db, &connector, &merchant_id, pt.retry_count).await?;

match schedule_time {
Some(s_time) => {
let retry_schedule = db
.as_scheduler()
.retry_process(pt, s_time)
.await
.map_err(Into::into);
metrics::TASKS_RESET_COUNT.add(
&metrics::CONTEXT,
1,
&[metrics::request::add_attributes("flow", "Refund")],
);
retry_schedule
}
None => db
.as_scheduler()
.finish_process_with_business_status(pt, "RETRIES_EXCEEDED".to_string())
.await
.map_err(Into::into),
}
}
Loading
Loading