From 1ce11de517f19edc4147d21a0aa4205088b57bc4 Mon Sep 17 00:00:00 2001 From: mirch Date: Tue, 22 Aug 2023 00:04:36 +0300 Subject: [PATCH] Add finish_payment logic --- Cargo.toml | 2 +- infrastructure/lambda.tf | 1 - src/database/client.rs | 13 +++++ src/database/mod.rs | 5 ++ src/database/payments_repository.rs | 78 +++++++++++++++++++++++++++++ src/finish_payment.rs | 56 ++++++++++++++++++++- src/initiate_payment.rs | 7 +-- src/lib.rs | 2 +- src/payments_repository.rs | 41 --------------- src/request_utils.rs | 12 +++++ 10 files changed, 166 insertions(+), 51 deletions(-) create mode 100644 src/database/client.rs create mode 100644 src/database/mod.rs create mode 100644 src/database/payments_repository.rs delete mode 100644 src/payments_repository.rs diff --git a/Cargo.toml b/Cargo.toml index 69f299c..6be6958 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ path = "src/finish_payment.rs" [dependencies] lambda_http = "0.8.1" -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["macros", "sync", "parking_lot"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" async-stripe = { version = "0.22", features = ["runtime-tokio-hyper"] } diff --git a/infrastructure/lambda.tf b/infrastructure/lambda.tf index 9a3078c..f2db4cf 100644 --- a/infrastructure/lambda.tf +++ b/infrastructure/lambda.tf @@ -35,7 +35,6 @@ resource "aws_lambda_function" "finish_payment_lambda" { variables = { PAYMENTS_TABLE_NAME = aws_dynamodb_table.payments.name STRIPE_SECRET_KEY = var.STRIPE_API_KEY - STRIPE_WEBHOOK_SECRET = "" } } } diff --git a/src/database/client.rs b/src/database/client.rs new file mode 100644 index 0000000..1595c7a --- /dev/null +++ b/src/database/client.rs @@ -0,0 +1,13 @@ +use aws_sdk_dynamodb::Client; +use tokio::sync::OnceCell; + +static CLIENT: OnceCell = OnceCell::const_new(); + +pub async fn get_client() -> &'static Client { + CLIENT + .get_or_init(|| async { + let config = aws_config::load_from_env().await; + Client::new(&config) + }) + .await +} diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 0000000..d8292d2 --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1,5 @@ +pub mod client; +pub mod payments_repository; + +pub use client::*; +pub use payments_repository::*; diff --git a/src/database/payments_repository.rs b/src/database/payments_repository.rs new file mode 100644 index 0000000..b6b91c0 --- /dev/null +++ b/src/database/payments_repository.rs @@ -0,0 +1,78 @@ +use aws_sdk_dynamodb::{types::AttributeValue, Client}; +use tokio::sync::OnceCell; + +use crate::{ + domain::{Payment, PaymentStatus}, + environment::{get_env_var, PAYMENTS_TABLE}, +}; + +static PAYMENTS_REPOSITORY: OnceCell = OnceCell::const_new(); + +#[derive(Clone)] +pub struct PaymentsRepository { + client: Client, + table_name: String, +} + +impl PaymentsRepository { + pub async fn get() -> PaymentsRepository { + PAYMENTS_REPOSITORY + .get_or_init(|| async { + let client = Client::new(&aws_config::load_from_env().await); + PaymentsRepository::new(client) + }) + .await + .clone() + } + + fn new(client: Client) -> Self { + let table_name = get_env_var(PAYMENTS_TABLE) + .unwrap_or_else(|_| format!("{} variable not set", PAYMENTS_TABLE)); + + Self { client, table_name } + } + + pub async fn insert_payment(self, payment: Payment) -> Result<(), String> { + let id = AttributeValue::S(payment.id); + let amount = AttributeValue::N(payment.amount.to_string()); + let sender = AttributeValue::S(payment.sender); + let status = AttributeValue::N((payment.status as i8).to_string()); + + let _request = self + .client + .put_item() + .table_name(self.table_name) + .item("id", id) + .item("amount", amount) + .item("sender", sender) + .item("status", status) + .send() + .await + .map_err(|e| e.to_string())?; + + Ok(()) + } + + pub async fn update_payment_status( + self, + payment_id: &str, + new_status: PaymentStatus, + ) -> Result<(), String> { + let id = AttributeValue::S(String::from(payment_id)); + let status = AttributeValue::N((new_status as i8).to_string()); + + let _request = self + .client + .update_item() + .table_name(self.table_name) + .key("id", id) + .update_expression("SET #status = :status") + .expression_attribute_names("#status", "status") + .expression_attribute_values(":status", status) + .send() + .await + .map_err(|e| e.to_string())?; + + Ok(()) + } +} diff --git a/src/finish_payment.rs b/src/finish_payment.rs index b85db37..4d93921 100644 --- a/src/finish_payment.rs +++ b/src/finish_payment.rs @@ -1,11 +1,63 @@ use lambda_http::{service_fn, Body, Error, Request, Response}; +use serverless_payments::{ + database::PaymentsRepository, + domain::PaymentStatus, + environment::{get_env_var, STRIPE_SECRET_KEY}, + request_utils::get_header, +}; +use stripe::{EventObject, Webhook}; +use tracing_subscriber::FmtSubscriber; + +const SIGNATURE_HEADER_KEY: &str = "Stripe-Signature"; #[tokio::main] async fn main() -> Result<(), Error> { + FmtSubscriber::builder() + .with_max_level(tracing::Level::INFO) + .with_ansi(false) + .without_time() + .with_target(false) + .init(); lambda_http::run(service_fn(handler)).await?; Ok(()) } -async fn handler(_event: Request) -> Result, Error> { - Ok(Response::new(Body::from("Hello world!"))) +async fn handler(event: Request) -> Result, Error> { + let signature = get_header(&event, SIGNATURE_HEADER_KEY)?; + let secret_key = get_env_var(STRIPE_SECRET_KEY)?; + let event_body = match event.body() { + Body::Text(s) => s, + _ => { + tracing::error!("Error getting event body"); + return Err(Error::from("Error getting event body")); + } + }; + + let webhook_event = match Webhook::construct_event(event_body, &signature, &secret_key) { + Ok(webhook_event) => webhook_event, + Err(e) => { + tracing::error!("Error constructing webhook event: {:?}", e); + return Err(Error::from("Error constructing webhook event")); + } + }; + + let payment_intent_id = { + let payment_intent = match webhook_event.data.object { + // safe to unwrap, as this charge is the result of a PaymentIntent confirmation + EventObject::Charge(charge) => charge.payment_intent.unwrap(), + _ => { + tracing::error!("Error getting payment intent"); + return Err(Error::from("Error getting payment intent")); + } + }; + + payment_intent.id() + }; + + let payment_repository = PaymentsRepository::get().await; + payment_repository + .update_payment_status(payment_intent_id.as_str(), PaymentStatus::Completed) + .await?; + + Ok(Response::new(Body::from(()))) } diff --git a/src/initiate_payment.rs b/src/initiate_payment.rs index 34e1570..a20b5ad 100644 --- a/src/initiate_payment.rs +++ b/src/initiate_payment.rs @@ -1,7 +1,6 @@ -use aws_sdk_dynamodb::Client; use lambda_http::{service_fn, Body, Error, Request, Response}; use serverless_payments::{ - domain::PaymentRequest, payment_client::PaymentClient, payments_repository::PaymentsRepository, + database::PaymentsRepository, domain::PaymentRequest, payment_client::PaymentClient, request_utils::get_body, }; use tracing_subscriber::FmtSubscriber; @@ -28,9 +27,7 @@ async fn handler(event: Request) -> Result, Error> { let redirect_url = payment_client.initiate_payment(&payment_request).await?; // Save the data to the database - let config = aws_config::load_from_env().await; - let db_client = Client::new(&config); - let payments_repository = PaymentsRepository::new(db_client); + let payments_repository = PaymentsRepository::get().await; payments_repository .insert_payment(payment_request.into()) .await?; diff --git a/src/lib.rs b/src/lib.rs index f1ef93e..b2c2edb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ +pub mod database; pub mod domain; pub mod environment; pub mod payment_client; -pub mod payments_repository; pub mod request_utils; diff --git a/src/payments_repository.rs b/src/payments_repository.rs deleted file mode 100644 index bbca845..0000000 --- a/src/payments_repository.rs +++ /dev/null @@ -1,41 +0,0 @@ -use aws_sdk_dynamodb::{types::AttributeValue, Client}; - -use crate::{ - domain::Payment, - environment::{get_env_var, PAYMENTS_TABLE}, -}; - -pub struct PaymentsRepository { - client: Client, - table_name: String, -} - -impl PaymentsRepository { - pub fn new(client: Client) -> Self { - let table_name = get_env_var(PAYMENTS_TABLE) - .unwrap_or_else(|_| format!("{} variable not set", PAYMENTS_TABLE)); - - Self { client, table_name } - } - - pub async fn insert_payment(self, payment: Payment) -> Result<(), String> { - let id = AttributeValue::S(payment.id); - let amount = AttributeValue::N(payment.amount.to_string()); - let sender = AttributeValue::S(payment.sender); - let status = AttributeValue::N((payment.status as i8).to_string()); - - let _request = self - .client - .put_item() - .table_name(self.table_name) - .item("id", id) - .item("amount", amount) - .item("sender", sender) - .item("status", status) - .send() - .await - .map_err(|e| e.to_string())?; - - Ok(()) - } -} diff --git a/src/request_utils.rs b/src/request_utils.rs index a7876e0..c157f91 100644 --- a/src/request_utils.rs +++ b/src/request_utils.rs @@ -14,3 +14,15 @@ where let result: T = serde_json::from_str(body).map_err(|e| e.to_string())?; Ok(result) } + +#[tracing::instrument] +pub fn get_header(event: &Request, header: &str) -> Result { + let header = event + .headers() + .get(header) + .ok_or_else(|| format!("Missing header: {}", header))? + .to_str() + .map_err(|e| e.to_string())? + .to_string(); + Ok(header) +}