From 3ae53b559964eae2d7d6e1019f4f47c5665ca9d9 Mon Sep 17 00:00:00 2001 From: Pratik Mishra Date: Tue, 7 May 2024 15:59:09 +0530 Subject: [PATCH] feat: config version implementaion --- .../2024-04-22-122806_config_verions/down.sql | 1 + .../2024-04-22-122806_config_verions/up.sql | 11 + .../src/api/config/handlers.rs | 77 ++++-- .../src/api/config/mod.rs | 2 +- .../src/api/config/types.rs | 7 +- .../src/api/context/handlers.rs | 237 ++++++++++-------- .../src/api/context/types.rs | 15 +- .../src/api/default_config/handlers.rs | 93 ++++--- .../src/api/default_config/types.rs | 6 + crates/context_aware_config/src/db/models.rs | 15 +- .../context_aware_config/src/db/schema.patch | 19 ++ crates/context_aware_config/src/db/schema.rs | 11 + crates/context_aware_config/src/helpers.rs | 37 ++- .../src/validation_functions.rs | 2 +- .../src/api/experiments/handlers.rs | 32 ++- .../src/api/experiments/types.rs | 11 +- .../experimentation_platform/src/db/models.rs | 13 +- crates/service_utils/src/helpers.rs | 15 +- crates/service_utils/src/service/types.rs | 4 +- crates/superposition/src/main.rs | 5 +- postman/cac/.meta.json | 2 +- .../cac/Context/Delete Context/request.json | 9 + .../Recompute Priority Context/request.json | 9 + .../Delete default-config key/request.json | 9 + postman/cac/config/Get Config/event.test.js | 2 +- 25 files changed, 457 insertions(+), 187 deletions(-) create mode 100644 crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql create mode 100644 crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql create mode 100644 crates/context_aware_config/src/db/schema.patch diff --git a/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql new file mode 100644 index 00000000..d9a93fe9 --- /dev/null +++ b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` diff --git a/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql new file mode 100644 index 00000000..287f4b79 --- /dev/null +++ b/crates/context_aware_config/migrations/2024-04-22-122806_config_verions/up.sql @@ -0,0 +1,11 @@ +-- Your SQL goes here +-- Name: functions; Type: TABLE; Schema: public; Owner: - +-- +CREATE TABLE public.config_versions ( + id bigint PRIMARY KEY, + config json NOT NULL, + config_hash TEXT NOT NULL, + tags varchar(100)[] check (array_position(tags, null) is null), + created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL +); +-- \ No newline at end of file diff --git a/crates/context_aware_config/src/api/config/handlers.rs b/crates/context_aware_config/src/api/config/handlers.rs index 73b6bc2e..361674e4 100644 --- a/crates/context_aware_config/src/api/config/handlers.rs +++ b/crates/context_aware_config/src/api/config/handlers.rs @@ -7,7 +7,8 @@ use super::helpers::{ use super::types::Config; use crate::db::schema::{ - contexts::dsl as ctxt, default_configs::dsl as def_conf, event_log::dsl as event_log, + config_versions::dsl as config_versions, contexts::dsl as ctxt, + default_configs::dsl as def_conf, event_log::dsl as event_log, }; use actix_http::header::{HeaderName, HeaderValue}; use actix_web::{get, web::Query, HttpRequest, HttpResponse, Scope}; @@ -18,11 +19,12 @@ use diesel::{ r2d2::{ConnectionManager, PooledConnection}, 
ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, }; -use serde_json::{json, Map, Value}; -use service_utils::service::types::DbConnection; -use service_utils::{bad_argument, db_error, unexpected_error}; +use serde_json::{from_value, json, Map, Value}; +use service_utils::{ + bad_argument, db_error, result as superposition, service::types::DbConnection, + unexpected_error, +}; -use service_utils::result as superposition; use uuid::Uuid; pub fn endpoints() -> Scope { @@ -98,13 +100,53 @@ fn is_not_modified(max_created_at: Option, req: &HttpRequest) -> max_created_at.is_some() && parsed_max <= last_modified } -async fn generate_cac( +pub fn generate_config_from_version( + version: Option, + conn: &mut PooledConnection>, +) -> superposition::Result { + let config = match version { + None => config_versions::config_versions + .select(config_versions::config) + .order_by(config_versions::created_at.desc()) + .first::(conn) + .map_err(|err| { + log::error!("failed to fetch config with error: {}", err); + db_error!(err) + }), + Some(version_val) => { + let version_id = from_value::(version_val).map_err(|e| { + log::error!("failed to decode version_id as integer: {}", e); + bad_argument!("version_id is not of type integer") + })?; + config_versions::config_versions + .select(config_versions::config) + .filter(config_versions::id.eq(version_id)) + .get_result::(conn) + .map_err(|err| { + log::error!("failed to fetch config with error: {}", err); + db_error!(err) + }) + } + }?; + + serde_json::from_value::(config).map_err(|err| { + log::error!("failed to decode config: {}", err); + unexpected_error!("failed to decode config") + }) +} +pub fn generate_cac( conn: &mut PooledConnection>, ) -> superposition::Result { let contexts_vec = ctxt::contexts - .select((ctxt::id, ctxt::value, ctxt::override_id, ctxt::override_)) + .select(( + ctxt::id, + ctxt::value, + ctxt::priority, + ctxt::override_id, + ctxt::override_, + )) .order_by((ctxt::priority.asc(), ctxt::created_at.asc())) - .load::<(String, Value, String, Value)>(conn) + .load::<(String, Value, i32, String, Value)>(conn) .map_err(|err| { log::error!("failed to fetch contexts with error: {}", err); db_error!(err) @@ -112,10 +154,12 @@ async fn generate_cac( let (contexts, overrides) = contexts_vec.into_iter().fold( (Vec::new(), Map::new()), - |(mut ctxts, mut overrides), (id, condition, override_id, override_)| { + |(mut ctxts, mut overrides), + (id, condition, priority_, override_id, override_)| { let ctxt = super::types::Context { id, condition, + priority: priority_, override_with_keys: [override_id.to_owned()], }; ctxts.push(ctxt); @@ -177,12 +221,13 @@ async fn get( query_params_map.insert( key, value - .parse::() + .parse::() .map_or_else(|_| json!(value), |int_val| json!(int_val)), ); } - let mut config = generate_cac(&mut conn).await?; + let mut config = + generate_config_from_version(query_params_map.remove("version"), &mut conn)?; if let Some(prefix) = query_params_map.get("prefix") { let prefix_list: HashSet<&str> = prefix .as_str() @@ -225,7 +270,7 @@ async fn get_resolved_config( query_params_map.insert( item.0, item.1 - .parse::() + .parse::() .map_or_else(|_| json!(item.1), |int_val| json!(int_val)), ); } @@ -240,7 +285,8 @@ async fn get_resolved_config( return Ok(HttpResponse::NotModified().finish()); } - let res = generate_cac(&mut conn).await?; + let res = + generate_config_from_version(query_params_map.remove("version"), &mut conn)?; let cac_client_contexts = res .contexts @@ -309,11 +355,12 @@ async fn get_filtered_config( 
query_params_map.insert( key, value - .parse::() + .parse::() .map_or_else(|_| json!(value), |int_val| json!(int_val)), ); } - let config = generate_cac(&mut conn).await?; + let config = + generate_config_from_version(query_params_map.remove("version"), &mut conn)?; let contexts = config.contexts; let filtered_context = filter_context(&contexts, &query_params_map)?; diff --git a/crates/context_aware_config/src/api/config/mod.rs b/crates/context_aware_config/src/api/config/mod.rs index d4417bb8..5ee185e0 100644 --- a/crates/context_aware_config/src/api/config/mod.rs +++ b/crates/context_aware_config/src/api/config/mod.rs @@ -1,4 +1,4 @@ -mod handlers; +pub mod handlers; mod types; pub use handlers::endpoints; mod helpers; diff --git a/crates/context_aware_config/src/api/config/types.rs b/crates/context_aware_config/src/api/config/types.rs index 332bfc03..4ae24fb8 100644 --- a/crates/context_aware_config/src/api/config/types.rs +++ b/crates/context_aware_config/src/api/config/types.rs @@ -1,16 +1,17 @@ -use serde::Serialize; +use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; -#[derive(Serialize)] +#[derive(Serialize, Deserialize)] pub struct Config { pub contexts: Vec, pub overrides: Map, pub default_configs: Map, } -#[derive(Serialize, Clone)] +#[derive(Serialize, Clone, Deserialize)] pub struct Context { pub id: String, pub condition: Value, + pub priority: i32, pub override_with_keys: [String; 1], } diff --git a/crates/context_aware_config/src/api/context/handlers.rs b/crates/context_aware_config/src/api/context/handlers.rs index 1ff58c27..6abf4268 100644 --- a/crates/context_aware_config/src/api/context/handlers.rs +++ b/crates/context_aware_config/src/api/context/handlers.rs @@ -2,13 +2,15 @@ extern crate base64; use std::str; use crate::helpers::{ - calculate_context_priority, json_to_sorted_string, validate_context_jsonschema, + add_config_version, calculate_context_priority, json_to_sorted_string, + validate_context_jsonschema, }; use crate::{ api::{ context::types::{ - ContextAction, ContextBulkResponse, DimensionCondition, MoveReq, - PaginationParams, PriorityRecomputeResponse, PutReq, PutResp, + BulkOperationReq, ContextAction, ContextBulkResponse, DeleteReq, + DimensionCondition, MoveReq, PaginationParams, PriorityRecomputeResponse, + PutReq, PutResp, }, dimension::get_all_dimension_schema_map, }, @@ -20,6 +22,9 @@ use crate::{ }, }, }; +use actix_web::web::Data; +use service_utils::service::types::AppState; + use actix_web::{ delete, get, put, web::{Json, Path, Query}, @@ -55,6 +60,8 @@ pub fn endpoints() -> Scope { .service(delete_context) .service(bulk_operations) .service(list_contexts) + .service(move_handler) + .service(delete_context) .service(get_context) .service(priority_recompute) } @@ -274,25 +281,23 @@ fn get_put_resp(ctx: Context) -> PutResp { fn put( req: Json, - conn: &mut PooledConnection>, - already_under_txn: bool, + transaction_conn: &mut PooledConnection>, user: &User, ) -> superposition::Result { use contexts::dsl::contexts; - let new_ctx = create_ctx_from_put_req(req, conn, user)?; + let new_ctx = create_ctx_from_put_req(req, transaction_conn, user)?; - if already_under_txn { - diesel::sql_query("SAVEPOINT put_ctx_savepoint").execute(conn)?; - } - let insert = diesel::insert_into(contexts).values(&new_ctx).execute(conn); + diesel::sql_query("SAVEPOINT put_ctx_savepoint").execute(transaction_conn)?; + let insert = diesel::insert_into(contexts) + .values(&new_ctx) + .execute(transaction_conn); match insert { Ok(_) => 
Ok(get_put_resp(new_ctx)), Err(DatabaseError(UniqueViolation, _)) => { - if already_under_txn { - diesel::sql_query("ROLLBACK TO put_ctx_savepoint").execute(conn)?; - } - update_override_of_existing_ctx(conn, new_ctx) + diesel::sql_query("ROLLBACK TO put_ctx_savepoint") + .execute(transaction_conn)?; + update_override_of_existing_ctx(transaction_conn, new_ctx) } Err(e) => { log::error!("failed to update context with db error: {:?}", e); @@ -303,64 +308,77 @@ fn put( #[put("")] async fn put_handler( + state: Data, req: Json, mut db_conn: DbConnection, user: User, ) -> superposition::Result> { - put(req, &mut db_conn, false, &user) - .map(|resp| Json(resp)) - .map_err(|err: superposition::AppError| { - log::info!("context put failed with error: {:?}", err); - err - }) + let tags = req.tags.to_owned(); + db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let put_response = put(req, transaction_conn, &user) + .map(|resp| Json(resp)) + .map_err(|err: superposition::AppError| { + log::info!("context put failed with error: {:?}", err); + err + })?; + add_config_version(&state, tags, transaction_conn)?; + Ok(put_response) + }) } fn override_helper( + state: &Data, req: Json, conn: &mut PooledConnection>, user: &User, ) -> superposition::Result> { use contexts::dsl::contexts; + let tags = req.tags.to_owned(); let new_ctx = create_ctx_from_put_req(req, conn, user)?; - - let insert = diesel::insert_into(contexts).values(&new_ctx).execute(conn); - - match insert { - Ok(_) => Ok(Json(get_put_resp(new_ctx))), - Err(DatabaseError(UniqueViolation, _)) => { - replace_override_of_existing_ctx(conn, new_ctx) // no need for .map(Json) - } - Err(e) => { - log::error!("failed to update context with db error: {:?}", e); - Err(db_error!(e)) + (*conn).transaction::<_, superposition::AppError, _>(|transaction_conn| { + let insert = diesel::insert_into(contexts) + .values(&new_ctx) + .execute(transaction_conn); + add_config_version(&state, tags, transaction_conn)?; + match insert { + Ok(_) => Ok(Json(get_put_resp(new_ctx))), + Err(DatabaseError(UniqueViolation, _)) => { + replace_override_of_existing_ctx(transaction_conn, new_ctx) // no need for .map(Json) + } + Err(e) => { + log::error!("failed to update context with db error: {:?}", e); + Err(db_error!(e)) + } } - } + }) } #[put("/overrides")] async fn update_override_handler( + state: Data, req: Json, mut db_conn: DbConnection, user: User, ) -> superposition::Result> { - override_helper(req, &mut db_conn, &user).map_err(|err: superposition::AppError| { - log::info!("context put failed with error: {:?}", err); - err - }) + override_helper(&state, req, &mut db_conn, &user).map_err( + |err: superposition::AppError| { + log::info!("context put failed with error: {:?}", err); + err + }, + ) } fn r#move( old_ctx_id: String, req: Json, - conn: &mut PooledConnection>, - already_under_txn: bool, + transaction_conn: &mut PooledConnection>, user: &User, ) -> superposition::Result { use contexts::dsl; let req = req.into_inner(); let ctx_condition = Value::Object(req.context); let new_ctx_id = hash(&ctx_condition); - let dimension_schema_map = get_all_dimension_schema_map(conn)?; + let dimension_schema_map = get_all_dimension_schema_map(transaction_conn)?; let priority = validate_dimensions_and_calculate_priority( "context", &ctx_condition, @@ -371,9 +389,7 @@ fn r#move( return Err(bad_argument!("no dimension found in context")); } - if already_under_txn { - diesel::sql_query("SAVEPOINT update_ctx_savepoint").execute(conn)?; - } + 
diesel::sql_query("SAVEPOINT update_ctx_savepoint").execute(transaction_conn)?; let context = diesel::update(dsl::contexts) .filter(dsl::id.eq(&old_ctx_id)) @@ -382,7 +398,7 @@ fn r#move( dsl::value.eq(&ctx_condition), dsl::priority.eq(priority), )) - .get_result(conn); + .get_result(transaction_conn); let contruct_new_ctx_with_old_overrides = |ctx: Context| Context { id: new_ctx_id, @@ -394,33 +410,21 @@ fn r#move( override_: ctx.override_, }; - let handle_unique_violation = - |db_conn: &mut DBConnection, already_under_txn: bool| { - if already_under_txn { - let deleted_ctxt = diesel::delete(dsl::contexts) - .filter(dsl::id.eq(&old_ctx_id)) - .get_result(db_conn)?; - - let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt); - update_override_of_existing_ctx(db_conn, ctx) - } else { - db_conn.build_transaction().read_write().run(|conn| { - let deleted_ctxt = diesel::delete(dsl::contexts) - .filter(dsl::id.eq(&old_ctx_id)) - .get_result(conn)?; - let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt); - update_override_of_existing_ctx(conn, ctx) - }) - } - }; + let handle_unique_violation = |db_conn: &mut DBConnection| { + let deleted_ctxt = diesel::delete(dsl::contexts) + .filter(dsl::id.eq(&old_ctx_id)) + .get_result(db_conn)?; + + let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt); + update_override_of_existing_ctx(db_conn, ctx) + }; match context { Ok(ctx) => Ok(get_put_resp(ctx)), Err(DatabaseError(UniqueViolation, _)) => { - if already_under_txn { - diesel::sql_query("ROLLBACK TO update_ctx_savepoint").execute(conn)?; - } - handle_unique_violation(conn, already_under_txn) + diesel::sql_query("ROLLBACK TO update_ctx_savepoint") + .execute(transaction_conn)?; + handle_unique_violation(transaction_conn) } Err(e) => { log::error!("failed to move context with db error: {:?}", e); @@ -431,17 +435,23 @@ fn r#move( #[put("/move/{ctx_id}")] async fn move_handler( + state: Data, path: Path, req: Json, mut db_conn: DbConnection, user: User, ) -> superposition::Result> { - r#move(path.into_inner(), req, &mut db_conn, false, &user) - .map(|resp| Json(resp)) - .map_err(|err| { - log::info!("move api failed with error: {:?}", err); - err - }) + let tags = req.tags.to_owned(); + db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let move_reponse = r#move(path.into_inner(), req, transaction_conn, &user) + .map(|resp| Json(resp)) + .map_err(|err| { + log::info!("move api failed with error: {:?}", err); + err + })?; + add_config_version(&state, tags, transaction_conn)?; + Ok(move_reponse) + }) } #[get("/{ctx_id}")] @@ -495,45 +505,54 @@ async fn list_contexts( #[delete("/{ctx_id}")] async fn delete_context( + state: Data, path: Path, + req: Json, db_conn: DbConnection, user: User, ) -> superposition::Result { use contexts::dsl; let DbConnection(mut conn) = db_conn; + let tags = req.tags.to_owned(); let ctx_id = path.into_inner(); - let deleted_row = - delete(dsl::contexts.filter(dsl::id.eq(&ctx_id))).execute(&mut conn); - match deleted_row { - Ok(0) => Err(not_found!("Context Id `{}` doesn't exists", ctx_id)), - Ok(_) => { - log::info!("{ctx_id} context deleted by {}", user.get_email()); - Ok(HttpResponse::NoContent().finish()) - } - Err(e) => { - log::error!("context delete query failed with error: {e}"); - Err(unexpected_error!("Something went wrong.")) + conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let deleted_row = + delete(dsl::contexts.filter(dsl::id.eq(&ctx_id))).execute(transaction_conn); + match deleted_row { + Ok(0) => 
Err(not_found!("Context Id `{}` doesn't exists", ctx_id)), + Ok(_) => { + add_config_version(&state, tags, transaction_conn)?; + log::info!("{ctx_id} context deleted by {}", user.get_email()); + Ok(HttpResponse::NoContent().finish()) + } + Err(e) => { + log::error!("context delete query failed with error: {e}"); + Err(unexpected_error!("Something went wrong.")) + } } - } + }) } #[put("/bulk-operations")] async fn bulk_operations( - reqs: Json>, + state: Data, + req: Json, db_conn: DbConnection, user: User, ) -> superposition::Result>> { use contexts::dsl::contexts; let DbConnection(mut conn) = db_conn; + let tags = req.tags.to_owned(); + let context_actions = req.into_inner().context_actions.clone(); let mut response = Vec::::new(); conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { - for action in reqs.into_inner().into_iter() { + for action in context_actions.into_iter() { match action { ContextAction::PUT(put_req) => { - let put_resp = put(Json(put_req), transaction_conn, true, &user) - .map_err(|err| { + let put_resp = + put(Json(put_req), transaction_conn, &user).map_err(|err| { log::error!( "Failed at insert into contexts due to {:?}", err @@ -568,7 +587,7 @@ async fn bulk_operations( } ContextAction::MOVE((old_ctx_id, move_req)) => { let move_context_resp = - r#move(old_ctx_id, Json(move_req), transaction_conn, true, &user) + r#move(old_ctx_id, Json(move_req), transaction_conn, &user) .map_err(|err| { log::error!( "Failed at moving context reponse due to {:?}", @@ -580,6 +599,7 @@ async fn bulk_operations( } } } + add_config_version(&state, tags, transaction_conn)?; Ok(()) // Commit the transaction })?; Ok(Json(response)) @@ -587,8 +607,11 @@ async fn bulk_operations( #[put("/priority/recompute")] async fn priority_recompute( + state: Data, + req: Json, db_conn: DbConnection, -) -> superposition::Result { + _user: User, +) -> superposition::Result>> { use crate::db::schema::contexts::dsl::*; let DbConnection(mut conn) = db_conn; @@ -599,6 +622,7 @@ async fn priority_recompute( let dimension_schema_map = get_all_dimension_schema_map(&mut conn)?; let mut response: Vec = vec![]; + let tags = req.tags.to_owned(); let update_contexts = result .clone() @@ -632,20 +656,23 @@ async fn priority_recompute( }) .collect::>>()?; - let insert = diesel::insert_into(contexts) - .values(&update_contexts) - .on_conflict(id) - .do_update() - .set(priority.eq(excluded(priority))) - .execute(&mut conn); - - match insert { - Ok(_) => Ok(HttpResponse::Ok().json(response)), - Err(err) => { - log::error!( - "Failed to execute query while recomputing priority, error: {err}" - ); - Err(db_error!(err)) + conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let insert = diesel::insert_into(contexts) + .values(&update_contexts) + .on_conflict(id) + .do_update() + .set(priority.eq(excluded(priority))) + .execute(transaction_conn); + add_config_version(&state, tags, transaction_conn)?; + + match insert { + Ok(_) => Ok(Json(response)), + Err(err) => { + log::error!( + "Failed to execute query while recomputing priority, error: {err}" + ); + Err(db_error!(err)) + } } - } + }) } diff --git a/crates/context_aware_config/src/api/context/types.rs b/crates/context_aware_config/src/api/context/types.rs index 88f616e0..09aa2acb 100644 --- a/crates/context_aware_config/src/api/context/types.rs +++ b/crates/context_aware_config/src/api/context/types.rs @@ -5,11 +5,18 @@ use serde_json::{Map, Value}; pub struct PutReq { pub context: Map, pub r#override: Map, + pub tags: Option>, } 
#[derive(Deserialize, Clone)] pub struct MoveReq { pub context: Map, + pub tags: Option>, +} + +#[derive(Deserialize, Clone)] +pub struct DeleteReq { + pub tags: Option>, } #[derive(Deserialize, Clone)] @@ -30,7 +37,13 @@ pub struct PaginationParams { pub size: Option, } -#[derive(serde::Deserialize)] +#[derive(Deserialize, Clone)] +pub struct BulkOperationReq { + pub context_actions: Vec, + pub tags: Option>, +} + +#[derive(serde::Deserialize, Clone)] pub enum ContextAction { PUT(PutReq), DELETE(String), diff --git a/crates/context_aware_config/src/api/default_config/handlers.rs b/crates/context_aware_config/src/api/default_config/handlers.rs index cd0998c1..13c93917 100644 --- a/crates/context_aware_config/src/api/default_config/handlers.rs +++ b/crates/context_aware_config/src/api/default_config/handlers.rs @@ -1,8 +1,11 @@ extern crate base64; -use super::types::CreateReq; -use service_utils::helpers::validation_err_to_str; +use super::types::{CreateReq, DeleteReq}; use service_utils::{ - bad_argument, db_error, not_found, unexpected_error, validation_error, + bad_argument, db_error, + helpers::validation_err_to_str, + not_found, result as superposition, + service::types::{AppState, DbConnection}, + unexpected_error, validation_error, }; use superposition_types::{SuperpositionUser, User}; @@ -15,7 +18,7 @@ use crate::{ models::{Context, DefaultConfig}, schema::{contexts::dsl::contexts, default_configs::dsl::default_configs}, }, - helpers::validate_jsonschema, + helpers::{add_config_version, validate_jsonschema}, }; use actix_web::{ delete, get, put, @@ -23,16 +26,13 @@ use actix_web::{ HttpResponse, Scope, }; use chrono::Utc; +use diesel::Connection; use diesel::{ r2d2::{ConnectionManager, PooledConnection}, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, }; use jsonschema::{Draft, JSONSchema, ValidationError}; use serde_json::{from_value, json, Map, Value}; -use service_utils::{ - result as superposition, - service::types::{AppState, DbConnection}, -}; pub fn endpoints() -> Scope { Scope::new("").service(create).service(get).service(delete) @@ -147,25 +147,27 @@ async fn create( )?; } } - - let upsert = diesel::insert_into(default_configs) - .values(&default_config) - .on_conflict(db::schema::default_configs::key) - .do_update() - .set(&default_config) - .execute(&mut conn); - - match upsert { - Ok(_) => Ok(HttpResponse::Ok().json(json!({ - "message": "DefaultConfig created/updated successfully." - }))), - Err(e) => { - log::info!("DefaultConfig creation failed with error: {e}"); - Err(unexpected_error!( - "Something went wrong, failed to create DefaultConfig" - )) - } - } + conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let upsert = diesel::insert_into(default_configs) + .values(&default_config) + .on_conflict(db::schema::default_configs::key) + .do_update() + .set(&default_config) + .execute(transaction_conn); + add_config_version(&state, req.tags, transaction_conn)?; + let ok_resp = match upsert { + Ok(_) => Ok(HttpResponse::Ok().json(json!({ + "message": "DefaultConfig created/updated successfully." 
+ }))), + Err(e) => { + log::info!("DefaultConfig creation failed with error: {e}"); + Err(unexpected_error!( + "Something went wrong, failed to create DefaultConfig" + )) + } + }?; + Ok(ok_resp) + }) } fn fetch_default_key( @@ -215,32 +217,41 @@ pub fn get_key_usage_context_ids( #[delete("/{key}")] async fn delete( + state: Data, path: Path, + req: Json, db_conn: DbConnection, user: User, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; + let tags = req.tags.to_owned(); let key = path.into_inner(); fetch_default_key(&key, &mut conn)?; let context_ids = get_key_usage_context_ids(&key, &mut conn) .map_err(|_| unexpected_error!("Something went wrong"))?; if context_ids.is_empty() { - let deleted_row = diesel::delete( - default_configs.filter(db::schema::default_configs::key.eq(&key)), - ) - .execute(&mut conn); - match deleted_row { - Ok(0) => Err(not_found!("default config key `{}` doesn't exists", key)), - Ok(_) => { - log::info!("default config key: {key} deleted by {}", user.get_email()); - Ok(HttpResponse::NoContent().finish()) - } - Err(e) => { - log::error!("default config delete query failed with error: {e}"); - Err(unexpected_error!("Something went wrong.")) + conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { + let deleted_row = diesel::delete( + default_configs.filter(db::schema::default_configs::key.eq(&key)), + ) + .execute(transaction_conn); + match deleted_row { + Ok(0) => Err(not_found!("default config key `{}` doesn't exists", key)), + Ok(_) => { + add_config_version(&state, tags, transaction_conn)?; + log::info!( + "default config key: {key} deleted by {}", + user.get_email() + ); + Ok(HttpResponse::NoContent().finish()) + } + Err(e) => { + log::error!("default config delete query failed with error: {e}"); + Err(unexpected_error!("Something went wrong.")) + } } - } + }) } else { Err(bad_argument!( "Given key already in use in contexts: {}", diff --git a/crates/context_aware_config/src/api/default_config/types.rs b/crates/context_aware_config/src/api/default_config/types.rs index 0e4b1b61..a7939895 100644 --- a/crates/context_aware_config/src/api/default_config/types.rs +++ b/crates/context_aware_config/src/api/default_config/types.rs @@ -8,6 +8,7 @@ pub struct CreateReq { pub schema: Option>, #[serde(default, deserialize_with = "deserialize_option")] pub function_name: Option, + pub tags: Option>, } fn deserialize_option<'de, D>(deserializer: D) -> Result, D::Error> @@ -17,3 +18,8 @@ where let value: Value = Deserialize::deserialize(deserializer)?; Ok(Some(value)) } + +#[derive(Deserialize, Clone)] +pub struct DeleteReq { + pub tags: Option>, +} diff --git a/crates/context_aware_config/src/db/models.rs b/crates/context_aware_config/src/db/models.rs index 86e27552..42c3964b 100644 --- a/crates/context_aware_config/src/db/models.rs +++ b/crates/context_aware_config/src/db/models.rs @@ -1,4 +1,6 @@ -use crate::db::schema::{contexts, default_configs, dimensions, event_log, functions}; +use crate::db::schema::{ + config_versions, contexts, default_configs, dimensions, event_log, functions, +}; use chrono::{offset::Utc, DateTime, NaiveDateTime}; use diesel::{AsChangeset, Insertable, Queryable, Selectable}; use serde::Serialize; @@ -74,3 +76,14 @@ pub struct EventLog { pub new_data: Option, pub query: String, } + +#[derive(Queryable, Selectable, Insertable, Serialize, Clone, Debug)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[diesel(primary_key(id))] +pub struct ConfigVersion { + pub id: i64, + pub config: Value, + pub config_hash: 
String, + pub tags: Option>, + pub created_at: NaiveDateTime, +} diff --git a/crates/context_aware_config/src/db/schema.patch b/crates/context_aware_config/src/db/schema.patch new file mode 100644 index 00000000..acf43751 --- /dev/null +++ b/crates/context_aware_config/src/db/schema.patch @@ -0,0 +1,19 @@ +diff --git a/crates/context_aware_config/src/db/schema.rs b/crates/context_aware_config/src/db/schema.rs +index 15c2eee..25c8088 100644 +--- a/crates/context_aware_config/src/db/schema.rs ++++ b/crates/context_aware_config/src/db/schema.rs +@@ -2,13 +2,13 @@ + + diesel::table! { + config_versions (id) { + id -> Int8, + config -> Json, + config_hash -> Text, +- tags -> Nullable>>, ++ tags -> Nullable>, + created_at -> Timestamp, + } + } + + diesel::table! { + contexts (id) { diff --git a/crates/context_aware_config/src/db/schema.rs b/crates/context_aware_config/src/db/schema.rs index 99d135ac..25c8088c 100644 --- a/crates/context_aware_config/src/db/schema.rs +++ b/crates/context_aware_config/src/db/schema.rs @@ -1,5 +1,15 @@ // @generated automatically by Diesel CLI. +diesel::table! { + config_versions (id) { + id -> Int8, + config -> Json, + config_hash -> Text, + tags -> Nullable>, + created_at -> Timestamp, + } +} + diesel::table! { contexts (id) { id -> Varchar, @@ -602,6 +612,7 @@ diesel::joinable!(default_configs -> functions (function_name)); diesel::joinable!(dimensions -> functions (function_name)); diesel::allow_tables_to_appear_in_same_query!( + config_versions, contexts, default_configs, dimensions, diff --git a/crates/context_aware_config/src/helpers.rs b/crates/context_aware_config/src/helpers.rs index 663707d4..d727586e 100644 --- a/crates/context_aware_config/src/helpers.rs +++ b/crates/context_aware_config/src/helpers.rs @@ -1,9 +1,22 @@ +use crate::{ + api::config::handlers::generate_cac, + db::{models::ConfigVersion, schema::config_versions}, +}; use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue}; +use actix_web::web::Data; +use chrono::Utc; +use diesel::{ + r2d2::{ConnectionManager, PooledConnection}, + PgConnection, RunQueryDsl, +}; use itertools::{self, Itertools}; use jsonschema::{Draft, JSONSchema, ValidationError}; use serde_json::{json, Value}; use service_utils::{ - helpers::validation_err_to_str, result as superposition, validation_error, + helpers::{generate_snowflake_id, validation_err_to_str}, + result as superposition, + service::types::AppState, + validation_error, }; use std::collections::HashMap; @@ -286,6 +299,28 @@ pub fn calculate_context_priority( _ => Ok(0), } } +pub fn add_config_version( + state: &Data, + tags: Option>, + db_conn: &mut PooledConnection>, +) -> superposition::Result { + use config_versions::dsl::config_versions; + let version_id = generate_snowflake_id(state)?; + let config = generate_cac(db_conn)?; + let json_config = json!(config); + let config_hash = blake3::hash(json_config.to_string().as_bytes()).to_string(); + let config_version = ConfigVersion { + id: version_id, + config: json_config, + config_hash, + tags: tags, + created_at: Utc::now().naive_utc(), + }; + diesel::insert_into(config_versions) + .values(&config_version) + .execute(db_conn)?; + Ok(version_id) +} // ************ Tests ************* diff --git a/crates/context_aware_config/src/validation_functions.rs b/crates/context_aware_config/src/validation_functions.rs index 98b90a44..94d47393 100644 --- a/crates/context_aware_config/src/validation_functions.rs +++ b/crates/context_aware_config/src/validation_functions.rs @@ -1,4 +1,4 @@ -use 
serde_json::{json, Value}; +use serde_json::Value; use service_utils::result as superposition; use service_utils::unexpected_error; use service_utils::validation_error; diff --git a/crates/experimentation_platform/src/api/experiments/handlers.rs b/crates/experimentation_platform/src/api/experiments/handlers.rs index df77dfb9..4851458f 100644 --- a/crates/experimentation_platform/src/api/experiments/handlers.rs +++ b/crates/experimentation_platform/src/api/experiments/handlers.rs @@ -12,7 +12,8 @@ use diesel::{ }; use service_utils::{ - bad_argument, response_error, result as superposition, unexpected_error, + bad_argument, helpers::generate_snowflake_id, response_error, + result as superposition, unexpected_error, }; use superposition_types::{SuperpositionUser, User}; @@ -27,10 +28,10 @@ use super::{ validate_override_keys, }, types::{ - AuditQueryFilters, ConcludeExperimentRequest, ContextAction, ContextBulkResponse, - ContextMoveReq, ContextPutReq, ExperimentCreateRequest, ExperimentCreateResponse, - ExperimentResponse, ExperimentsResponse, ListFilters, OverrideKeysUpdateRequest, - RampRequest, Variant, + AuditQueryFilters, ConcludeExperimentRequest, ContextAction, ContextBulkReq, + ContextBulkResponse, ContextMoveReq, ContextPutReq, ExperimentCreateRequest, + ExperimentCreateResponse, ExperimentResponse, ExperimentsResponse, ListFilters, + OverrideKeysUpdateRequest, RampRequest, Variant, }, }; @@ -159,8 +160,7 @@ async fn create( } // generating snowflake id for experiment - let mut snowflake_generator = state.snowflake_generator.lock().unwrap(); - let experiment_id = snowflake_generator.real_time_generate(); + let experiment_id = generate_snowflake_id(&state)?; //create overrides in CAC, if successfull then create experiment in DB let mut cac_operations: Vec = vec![]; @@ -191,6 +191,10 @@ async fn create( // creating variants' context in CAC let http_client = reqwest::Client::new(); let url = state.cac_host.clone() + "/context/bulk-operations"; + let request_body = ContextBulkReq { + context_actions: cac_operations, + tags: req.tags.to_owned(), + }; // Step 1: Perform the HTTP request and handle errors let response = http_client @@ -200,7 +204,7 @@ async fn create( "Authorization", format!("{} {}", user.get_auth_type(), user.get_auth_token()), ) - .json(&cac_operations) + .json(&request_body) .send() .await; @@ -338,6 +342,10 @@ pub async fn conclude( // calling CAC bulk api with operations as payload let http_client = reqwest::Client::new(); let url = state.cac_host.clone() + "/context/bulk-operations"; + let request_body = ContextBulkReq { + context_actions: operations, + tags: req.tags.to_owned(), + }; let response = http_client .put(&url) .header("x-tenant", tenant.as_str()) @@ -345,7 +353,7 @@ pub async fn conclude( "Authorization", format!("{} {}", user.get_auth_type(), user.get_auth_token()), ) - .json(&operations) + .json(&request_body) .send() .await; @@ -654,6 +662,10 @@ async fn update_overrides( let http_client = reqwest::Client::new(); let url = state.cac_host.clone() + "/context/bulk-operations"; + let request_body = ContextBulkReq { + context_actions: cac_operations, + tags: payload.tags, + }; let response = http_client .put(&url) @@ -662,7 +674,7 @@ async fn update_overrides( "Authorization", format!("{} {}", user.get_auth_type(), user.get_auth_token()), ) - .json(&cac_operations) + .json(&request_body) .send() .await; diff --git a/crates/experimentation_platform/src/api/experiments/types.rs b/crates/experimentation_platform/src/api/experiments/types.rs index 
d4a1c0b5..a12c0220 100644 --- a/crates/experimentation_platform/src/api/experiments/types.rs +++ b/crates/experimentation_platform/src/api/experiments/types.rs @@ -28,6 +28,7 @@ pub struct ExperimentCreateRequest { pub context: Value, pub variants: Vec, + pub tags: Option>, } #[derive(Serialize)] @@ -96,6 +97,7 @@ pub struct ExperimentsResponse { #[derive(Deserialize, Debug)] pub struct ConcludeExperimentRequest { pub chosen_variant: String, + pub tags: Option>, } /********** Context Bulk API Type *************/ @@ -106,7 +108,7 @@ pub struct ContextPutReq { pub r#override: Value, } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub enum ContextAction { PUT(ContextPutReq), DELETE(String), @@ -120,6 +122,12 @@ pub struct ContextPutResp { pub priority: i32, } +#[derive(Deserialize, Serialize, Clone)] +pub struct ContextBulkReq { + pub context_actions: Vec, + pub tags: Option>, +} + #[derive(Serialize, Deserialize, Debug)] pub enum ContextBulkResponse { PUT(ContextPutResp), @@ -161,6 +169,7 @@ pub struct VariantUpdateRequest { #[derive(Deserialize, Debug)] pub struct OverrideKeysUpdateRequest { pub variants: Vec, + pub tags: Option>, } #[derive(Deserialize, Serialize, Clone)] diff --git a/crates/experimentation_platform/src/db/models.rs b/crates/experimentation_platform/src/db/models.rs index 3711c011..830f9042 100644 --- a/crates/experimentation_platform/src/db/models.rs +++ b/crates/experimentation_platform/src/db/models.rs @@ -1,12 +1,21 @@ use crate::db::schema::*; use chrono::{DateTime, NaiveDateTime, Utc}; -use diesel::{Insertable, Queryable, QueryableByName, Selectable}; +use diesel::{ + query_builder::QueryId, Insertable, Queryable, QueryableByName, Selectable, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; #[derive( - Debug, Clone, Copy, PartialEq, Deserialize, Serialize, diesel_derive_enum::DbEnum, + Debug, + Clone, + Copy, + PartialEq, + Deserialize, + Serialize, + diesel_derive_enum::DbEnum, + QueryId, )] #[DbValueStyle = "UPPERCASE"] #[ExistingTypePath = "crate::db::schema::sql_types::ExperimentStatusType"] diff --git a/crates/service_utils/src/helpers.rs b/crates/service_utils/src/helpers.rs index c489d823..1438275b 100644 --- a/crates/service_utils/src/helpers.rs +++ b/crates/service_utils/src/helpers.rs @@ -1,4 +1,5 @@ -use actix_web::{error::ErrorInternalServerError, Error}; +use actix_web::{error::ErrorInternalServerError, web::Data, Error}; +use anyhow::anyhow; use jsonschema::{error::ValidationErrorKind, ValidationError}; use log::info; use serde::de::{self, IntoDeserializer}; @@ -9,6 +10,7 @@ use std::{ }; use super::result; +use crate::service::types::AppState; use serde_json::{Map, Value}; //WARN Do NOT use this fxn inside api requests, instead add the required @@ -326,3 +328,14 @@ pub fn validation_err_to_str(errors: Vec) -> Vec { } }).collect() } + +pub fn generate_snowflake_id(state: &Data) -> result::Result { + let mut snowflake_generator = state.snowflake_generator.lock().map_err(|e| { + log::error!("snowflake_id generation failed {}", e); + result::AppError::UnexpectedError(anyhow!("snowflake_id generation failed {}", e)) + })?; + let id = snowflake_generator.real_time_generate(); + // explicitly dropping snowflake_generator so that lock is released and it can be acquired in bulk-operations handler + drop(snowflake_generator); + Ok(id) +} diff --git a/crates/service_utils/src/service/types.rs b/crates/service_utils/src/service/types.rs index 7774a06e..3bf06c6f 100644 --- a/crates/service_utils/src/service/types.rs +++ 
b/crates/service_utils/src/service/types.rs @@ -1,12 +1,14 @@ use crate::db::pgschema_manager::{PgSchemaConnection, PgSchemaManager}; use derive_more::{Deref, DerefMut}; use jsonschema::JSONSchema; +use serde::Deserialize; use serde_json::json; use std::{ collections::HashSet, future::{ready, Ready}, str::FromStr, + sync::Arc, }; use actix_web::{error, web::Data, Error, FromRequest, HttpMessage}; @@ -37,7 +39,7 @@ pub struct AppState { pub default_config_validation_schema: JSONSchema, pub meta_schema: JSONSchema, pub experimentation_flags: ExperimentationFlags, - pub snowflake_generator: Mutex, + pub snowflake_generator: Arc>, pub enable_tenant_and_scope: bool, pub tenant_middleware_exclusion_list: HashSet, pub service_prefix: String, diff --git a/crates/superposition/src/main.rs b/crates/superposition/src/main.rs index 2c508fdd..c51be386 100644 --- a/crates/superposition/src/main.rs +++ b/crates/superposition/src/main.rs @@ -7,6 +7,7 @@ use context_aware_config::helpers::{ }; use dotenv; use experimentation_platform::api::*; +use std::sync::Arc; use std::{collections::HashSet, io::Result}; use superposition_types::User; @@ -128,6 +129,8 @@ async fn main() -> Result<()> { return view! { }; }); + let snowflake_generator = Arc::new(Mutex::new(SnowflakeIdGenerator::new(1, 1))); + HttpServer::new(move || { let leptos_options = &conf.leptos_options; let site_root = &leptos_options.site_root; @@ -155,7 +158,7 @@ async fn main() -> Result<()> { allow_same_keys_non_overlapping_ctx.to_owned(), }, - snowflake_generator: Mutex::new(SnowflakeIdGenerator::new(1,1)), + snowflake_generator: snowflake_generator.clone(), meta_schema: get_meta_schema(), app_env: app_env.to_owned(), enable_tenant_and_scope: enable_tenant_and_scope.to_owned(), diff --git a/postman/cac/.meta.json b/postman/cac/.meta.json index 7d010895..1e7dc4e1 100644 --- a/postman/cac/.meta.json +++ b/postman/cac/.meta.json @@ -1,7 +1,7 @@ { "childrenOrder": [ - "config", "Default Config", + "config", "Dimension", "Context", "audit log" diff --git a/postman/cac/Context/Delete Context/request.json b/postman/cac/Context/Delete Context/request.json index ed19fe4c..ae60143e 100644 --- a/postman/cac/Context/Delete Context/request.json +++ b/postman/cac/Context/Delete Context/request.json @@ -12,6 +12,15 @@ "type": "default" } ], + "body": { + "mode": "raw", + "options": { + "raw": { + "language": "json" + } + }, + "raw_json_formatted": {} + }, "url": { "raw": "{{host}}/context/{{context_id}}", "host": [ diff --git a/postman/cac/Context/Recompute Priority Context/request.json b/postman/cac/Context/Recompute Priority Context/request.json index c94f7bf4..839b7274 100644 --- a/postman/cac/Context/Recompute Priority Context/request.json +++ b/postman/cac/Context/Recompute Priority Context/request.json @@ -17,6 +17,15 @@ "type": "default" } ], + "body": { + "mode": "raw", + "options": { + "raw": { + "language": "json" + } + }, + "raw_json_formatted": {} + }, "url": { "raw": "{{host}}/context/priority/recompute", "host": [ diff --git a/postman/cac/Default Config/Delete default-config key/request.json b/postman/cac/Default Config/Delete default-config key/request.json index 97e1d804..82f36a74 100644 --- a/postman/cac/Default Config/Delete default-config key/request.json +++ b/postman/cac/Default Config/Delete default-config key/request.json @@ -17,6 +17,15 @@ "type": "default" } ], + "body": { + "mode": "raw", + "options": { + "raw": { + "language": "json" + } + }, + "raw_json_formatted": {} + }, "url": { "raw": "{{host}}/default-config/key2", "host": [ 
diff --git a/postman/cac/config/Get Config/event.test.js b/postman/cac/config/Get Config/event.test.js index bb89bec7..9cb78dde 100644 --- a/postman/cac/config/Get Config/event.test.js +++ b/postman/cac/config/Get Config/event.test.js @@ -4,7 +4,7 @@ pm.test("200 check", function() { let expected_response = { "contexts": [], "overrides": {}, - "default_configs": {} + "default_configs": {"key1": "value1"} }; pm.expect(JSON.stringify(response)).to.be.eq(JSON.stringify(expected_response)); })
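
Usage sketch (not part of the patch): with this change every successful context or default-config write snapshots the full config into the new config_versions table (snowflake id, config JSON, blake3 config_hash, optional tags), and the GET /config family of endpoints accepts an optional version query parameter for reading a specific snapshot. The Rust sketch below is illustrative only — the host, tenant value, context condition, override keys, and version id are placeholders, and it assumes a tokio runtime plus reqwest with the json feature enabled.

use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let base = "http://localhost:8080"; // placeholder host

    // Mutating endpoints (PUT /context, /context/bulk-operations, context delete,
    // default-config create/delete) now accept an optional `tags` array; the tags
    // are stored on the config_versions snapshot created in the same transaction.
    let put_resp: Value = client
        .put(format!("{base}/context"))
        .header("x-tenant", "dev") // placeholder tenant
        .json(&json!({
            "context": { "==": [{ "var": "os" }, "android"] }, // placeholder condition
            "override": { "key1": "value1" },                  // placeholder override
            "tags": ["release-42"]
        }))
        .send()
        .await?
        .json()
        .await?;
    println!("put response: {put_resp}");

    // Reads can be pinned to a snapshot with the new `version` query parameter
    // (the id of a config_versions row); omitting it returns the latest snapshot.
    let pinned: Value = client
        .get(format!("{base}/config"))
        .header("x-tenant", "dev")
        .query(&[("version", "7159828995343978496")]) // placeholder snowflake id
        .send()
        .await?
        .json()
        .await?;
    println!("pinned config: {pinned}");

    Ok(())
}

The same optional tags field is threaded through the experimentation handlers (which forward it in ContextBulkReq to /context/bulk-operations), and the delete endpoints now read it from a JSON body — which is why the Postman requests for context delete, priority recompute, and default-config delete gained empty raw JSON bodies in this patch.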