Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use S3 read and write paths and namespace from control plane #773

Merged
merged 4 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion conductor/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 39 additions & 75 deletions conductor/src/aws/cloudformation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,50 @@ use std::sync::Arc;

use crate::errors::ConductorError;

#[derive(Clone)]
pub struct CloudFormationParams {
pub backup_archive_bucket: String,
pub org_name: String,
pub db_name: String,
pub iam_role_name: String,
pub cf_template_bucket: String,
pub bucket_name: String,
pub read_path_prefix: String,
pub write_path_prefix: String,
pub role_name: String,
pub namespace: String,
pub service_account_name: String,
}

pub struct AWSConfigState {
pub cf_client: Arc<Client>,
pub cf_config: Arc<SdkConfig>,
}

impl CloudFormationParams {
pub fn new(
backup_archive_bucket: String,
org_name: String,
db_name: String,
iam_role_name: String,
cf_template_bucket: String,
namespace: String,
service_account_name: String,
) -> Self {
Self {
backup_archive_bucket,
org_name,
db_name,
iam_role_name,
cf_template_bucket,
namespace,
service_account_name,
}
fn parameters(self) -> Vec<Parameter> {
vec![
Parameter::builder()
.parameter_key("BucketName")
.parameter_value(self.bucket_name)
.build(),
Parameter::builder()
.parameter_key("ReadPathPrefix")
.parameter_value(self.read_path_prefix)
.build(),
Parameter::builder()
.parameter_key("WritePathPrefix")
.parameter_value(self.write_path_prefix)
.build(),
Parameter::builder()
.parameter_key("RoleName")
.parameter_value(self.role_name)
.build(),
Parameter::builder()
.parameter_key("Namespace")
.parameter_value(self.namespace)
.build(),
Parameter::builder()
.parameter_key("ServiceAccountName")
.parameter_value(self.service_account_name)
.build(),
]
}
}

pub fn validate(&self) -> Result<(), String> {
if self.iam_role_name.is_empty() {
return Err("IAM role name cannot be empty".to_string());
}
if self.backup_archive_bucket.is_empty() {
return Err("Cloudformation Bucket Name cannot be empty".to_string());
}
if self.org_name.is_empty() {
return Err("Cloudformation Bucket Name cannot be empty".to_string());
}
if self.db_name.is_empty() {
return Err("Cloudformation Bucket Name cannot be empty".to_string());
}
if self.cf_template_bucket.is_empty() {
return Err("Cloudformation Bucket Name cannot be empty".to_string());
}
if self.namespace.is_empty() {
return Err("Namespace cannot be empty".to_string());
}
if self.service_account_name.is_empty() {
return Err("Kubernetes Service Account Name cannot be empty".to_string());
}
Ok(())
}
pub struct AWSConfigState {
pub cf_client: Arc<Client>,
pub cf_config: Arc<SdkConfig>,
}

impl AWSConfigState {
Expand Down Expand Up @@ -103,33 +87,13 @@ impl AWSConfigState {
&self,
stack_name: &str,
params: &CloudFormationParams,
cloudformation_template_bucket: String,
) -> Result<(), ConductorError> {
let template_url = format!(
"https://{}.s3.amazonaws.com/{}",
params.cf_template_bucket, "conductor-cf-template.yaml"
cloudformation_template_bucket, "conductor-cf-template-v2.yaml"
);
let parameters = vec![
Parameter::builder()
.parameter_key("BucketName")
.parameter_value(params.backup_archive_bucket.clone())
.build(),
Parameter::builder()
.parameter_key("BucketOrg")
.parameter_value(params.org_name.clone())
.build(),
Parameter::builder()
.parameter_key("RoleName")
.parameter_value(params.iam_role_name.clone())
.build(),
Parameter::builder()
.parameter_key("Namespace")
.parameter_value(params.namespace.clone())
.build(),
Parameter::builder()
.parameter_key("ServiceAccountName")
.parameter_value(params.service_account_name.clone())
.build(),
];
let parameters = params.clone().parameters();
if !self.does_stack_exist(stack_name).await {
// todo(nhudson): We need to add tags to the stack
// get with @sjmiller609 to figure out how we want
Expand Down
55 changes: 23 additions & 32 deletions conductor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,10 @@ pub async fn restart_coredb(
pub async fn create_cloudformation(
aws_region: String,
backup_archive_bucket: String,
org_name: &str,
db_name: &str,
cf_template_bucket: &str,
namespace: String,
read_path: Option<String>,
write_path: Option<String>,
cf_template_bucket: String,
) -> Result<(), ConductorError> {
// (todo: nhudson) - Create Cloudformation Stack only for Create event
// Create new function that returns 3 enums of SUCCESS, ERROR, WAITING
Expand All @@ -390,28 +391,21 @@ pub async fn create_cloudformation(
// If we are still waiting for the stack to be created we will need to requeue the message
let region = Region::new(aws_region);
let aws_config_state = AWSConfigState::new(region).await;
let namespace = format!("org-{}-inst-{}", org_name, db_name);
let stack_name = format!("org-{}-inst-{}-cf", org_name, db_name);
let iam_role_name = format!("org-{}-inst-{}-iam", org_name, db_name);
let service_account_name = format!("org-{}-inst-{}-sa", org_name, db_name);
let cf_template_params = CloudFormationParams::new(
// Database Backup Bucket Name
String::from(&backup_archive_bucket),
// Customer Org Name
String::from(org_name),
// Customer Database Name
String::from(db_name),
// AWS IAM Role Name to create
String::from(&iam_role_name),
// The AWS S3 Bucket where the CF Template is placed
String::from(cf_template_bucket),
// The Kubernetes Namespace where the database is deployed
let stack_name = format!("{}-cf", namespace);
let iam_role_name = format!("{}-iam", namespace);
let service_account_name = format!("{}-sa", namespace);
let read_path = read_path.unwrap_or_else(|| format!("v2/{}", namespace));
let write_path = write_path.unwrap_or_else(|| format!("v2/{}", namespace));
let cf_template_params = CloudFormationParams {
bucket_name: backup_archive_bucket,
read_path_prefix: read_path,
write_path_prefix: write_path,
role_name: iam_role_name,
namespace,
// The Kubernetes Service Account to use for the database
String::from(&service_account_name),
);
service_account_name,
};
aws_config_state
.create_cloudformation_stack(&stack_name, &cf_template_params)
.create_cloudformation_stack(&stack_name, &cf_template_params, cf_template_bucket)
.await
.map_err(ConductorError::from)?;
Ok(())
Expand All @@ -425,12 +419,11 @@ pub async fn create_cloudformation(
// Delete a cloudformation stack.
pub async fn delete_cloudformation(
aws_region: String,
org_name: &str,
db_name: &str,
namespace: &str,
) -> Result<(), ConductorError> {
let region = Region::new(aws_region);
let aws_config_state = AWSConfigState::new(region).await;
let stack_name = format!("org-{}-inst-{}-cf", org_name, db_name);
let stack_name = format!("{}-cf", namespace);
aws_config_state
.delete_cloudformation_stack(&stack_name)
.await
Expand All @@ -445,10 +438,9 @@ pub struct StackOutputs {

pub async fn lookup_role_arn(
aws_region: String,
organization_name: &str,
dbname: &str,
namespace: &str,
) -> Result<String, ConductorError> {
let stack_outputs = get_stack_outputs(aws_region, organization_name, dbname).await?;
let stack_outputs = get_stack_outputs(aws_region, namespace).await?;
let role_arn = stack_outputs
.role_arn
.ok_or_else(|| ConductorError::NoOutputsFound)?;
Expand All @@ -458,12 +450,11 @@ pub async fn lookup_role_arn(
// Get Cloudformation Stack Outputs RoleName and RoleArn
async fn get_stack_outputs(
aws_region: String,
org_name: &str,
db_name: &str,
namespace: &str,
) -> Result<StackOutputs, ConductorError> {
let region = Region::new(aws_region);
let aws_config_state = AWSConfigState::new(region).await;
let stack_name = format!("org-{}-inst-{}-cf", org_name, db_name);
let stack_name = format!("{}-cf", namespace);
// When moving this into operator, handle the specific errors that mean
// "cloudformation is not done yet" and return a more specific error
let (role_name, role_arn) = aws_config_state
Expand Down
48 changes: 17 additions & 31 deletions conductor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ async fn run(metrics: CustomMetrics) -> Result<(), ConductorError> {

let org_id = &read_msg.message.org_id;
let instance_id = &read_msg.message.inst_id;
let namespace = format!(
"org-{}-inst-{}",
read_msg.message.organization_name, read_msg.message.dbname
);
let namespace = read_msg.message.namespace.clone();
info!("{}: Using namespace {}", read_msg.msg_id, &namespace);

if read_msg.message.event_type != Event::Delete {
Expand Down Expand Up @@ -282,7 +279,7 @@ async fn run(metrics: CustomMetrics) -> Result<(), ConductorError> {

info!("{}: Creating namespace", read_msg.msg_id);
// create Namespace
create_namespace(client.clone(), &namespace, &org_id, &instance_id).await?;
create_namespace(client.clone(), &namespace, org_id, instance_id).await?;

info!("{}: Generating spec", read_msg.msg_id);
let stack_type = match coredb_spec.stack.as_ref() {
Expand All @@ -291,9 +288,9 @@ async fn run(metrics: CustomMetrics) -> Result<(), ConductorError> {
};

let spec = generate_spec(
&org_id,
org_id,
&stack_type,
&instance_id,
instance_id,
&read_msg.message.data_plane_id,
&namespace,
&coredb_spec,
Expand Down Expand Up @@ -420,12 +417,7 @@ async fn run(metrics: CustomMetrics) -> Result<(), ConductorError> {
delete_namespace(client.clone(), &namespace).await?;

info!("{}: Deleting cloudformation stack", read_msg.msg_id);
delete_cloudformation(
String::from("us-east-1"),
&read_msg.message.organization_name,
&read_msg.message.dbname,
)
.await?;
delete_cloudformation(String::from("us-east-1"), &namespace).await?;

let insert_query = sqlx::query!(
"INSERT INTO deleted_instances (namespace) VALUES ($1) ON CONFLICT (namespace) DO NOTHING",
Expand Down Expand Up @@ -703,19 +695,15 @@ async fn init_cloud_perms(
create_cloudformation(
String::from("us-east-1"),
backup_archive_bucket.clone(),
&read_msg.message.organization_name,
&read_msg.message.dbname,
&cf_template_bucket,
read_msg.message.namespace.clone(),
read_msg.message.backups_read_path.clone(),
read_msg.message.backups_write_path.clone(),
cf_template_bucket,
)
.await?;

// Lookup the CloudFormation stack's role ARN
let role_arn = lookup_role_arn(
String::from("us-east-1"),
&read_msg.message.organization_name,
&read_msg.message.dbname,
)
.await?;
let role_arn = lookup_role_arn(String::from("us-east-1"), &read_msg.message.namespace).await?;

info!("{}: Adding backup configuration to spec", read_msg.msg_id);
// Format ServiceAccountTemplate spec in CoreDBSpec
Expand All @@ -736,18 +724,16 @@ async fn init_cloud_perms(
snapshot_class: None,
});

let instance_name_slug = format!(
"org-{}-inst-{}",
&read_msg.message.organization_name, &read_msg.message.dbname
);
let write_path = read_msg
.message
.backups_write_path
.clone()
.unwrap_or(format!("v2/{}", read_msg.message.namespace));
let backup = Backup {
destinationPath: Some(format!(
"s3://{}/coredb/{}/{}",
backup_archive_bucket, &read_msg.message.organization_name, &instance_name_slug
)),
destinationPath: Some(format!("s3://{}/{}", backup_archive_bucket, write_path)),
encryption: Some(String::from("AES256")),
retentionPolicy: Some(String::from("30")),
schedule: Some(generate_cron_expression(&instance_name_slug)),
schedule: Some(generate_cron_expression(&read_msg.message.namespace)),
s3_credentials: Some(S3Credentials {
inherit_from_iam_role: Some(true),
..Default::default()
Expand Down
Loading
Loading