Skip to content

Commit

Permalink
agent: hold lock while setting new policy
Browse files Browse the repository at this point in the history
Don't release the lock between is_allowed and set_policy calls,
because the policy might change in between these calls.

Also, move more policy code into policy.rs.

Fixes: #8734

Signed-off-by: Dan Mihai <dmihai@microsoft.com>
  • Loading branch information
danmihai1 committed Jan 2, 2024
1 parent 67b91c1 commit 3db3e66
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 30 deletions.
34 changes: 33 additions & 1 deletion src/agent/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
//

use anyhow::{bail, Result};
use protobuf::MessageDyn;
use serde::{Deserialize, Serialize};
use slog::Drain;
use tokio::io::AsyncWriteExt;
use tokio::time::{sleep, Duration};

use crate::rpc::ttrpc_error;
use crate::AGENT_POLICY;

static EMPTY_JSON_INPUT: &str = "{\"input\":{}}";

static OPA_DATA_PATH: &str = "/data";
Expand All @@ -23,6 +27,34 @@ macro_rules! sl {
};
}

async fn allow_request(policy: &mut AgentPolicy, ep: &str, request: &str) -> ttrpc::Result<()> {
if !policy.allow_request(ep, request).await {
warn!(sl!(), "{ep} is blocked by policy");
Err(ttrpc_error(
ttrpc::Code::PERMISSION_DENIED,
format!("{ep} is blocked by policy"),
))
} else {
Ok(())
}
}

pub async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result<()> {
let request = serde_json::to_string(req).unwrap();
let mut policy = AGENT_POLICY.lock().await;
allow_request(&mut policy, req.descriptor_dyn().name(), &request).await
}

pub async fn do_set_policy(req: &protocols::agent::SetPolicyRequest) -> ttrpc::Result<()> {
let request = serde_json::to_string(req).unwrap();
let mut policy = AGENT_POLICY.lock().await;
allow_request(&mut policy, "SetPolicyRequest", &request).await?;
policy
.set_policy(&req.policy)
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INVALID_ARGUMENT, e))
}

/// Example of HTTP response from OPA: {"result":true}
#[derive(Debug, Serialize, Deserialize)]
struct AllowResponse {
Expand Down Expand Up @@ -127,7 +159,7 @@ impl AgentPolicy {
}

/// Ask OPA to check if an API call should be allowed or not.
pub async fn is_allowed_endpoint(&mut self, ep: &str, request: &str) -> bool {
pub async fn allow_request(&mut self, ep: &str, request: &str) -> bool {
let post_input = format!("{{\"input\":{request}}}");
self.log_opa_input(ep, &post_input).await;
match self.post_query(ep, &post_input).await {
Expand Down
34 changes: 5 additions & 29 deletions src/agent/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ttrpc::{
use anyhow::{anyhow, Context, Result};
use cgroups::freezer::FreezerState;
use oci::{LinuxNamespace, Root, Spec};
use protobuf::{MessageDyn, MessageField};
use protobuf::MessageField;
use protocols::agent::{
AddSwapRequest, AgentDetails, CopyFileRequest, GetIPTablesRequest, GetIPTablesResponse,
GuestDetailsResponse, Interfaces, Metrics, OOMEvent, ReadStreamResponse, Routes,
Expand Down Expand Up @@ -69,7 +69,7 @@ use crate::trace_rpc_call;
use crate::tracer::extract_carrier_from_ttrpc;

#[cfg(feature = "agent-policy")]
use crate::AGENT_POLICY;
use crate::policy::{do_set_policy, is_allowed};

use opentelemetry::global;
use tracing::span;
Expand Down Expand Up @@ -123,33 +123,15 @@ fn sl() -> slog::Logger {
}

// Convenience function to wrap an error and response to ttrpc client
fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error {
pub fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error {
get_rpc_status(code, format!("{:?}", err))
}

#[cfg(not(feature = "agent-policy"))]
async fn is_allowed(_req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result<()> {
async fn is_allowed(_req: &impl serde::Serialize) -> ttrpc::Result<()> {
Ok(())
}

#[cfg(feature = "agent-policy")]
async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result<()> {
let request = serde_json::to_string(req).unwrap();
let mut policy = AGENT_POLICY.lock().await;
if !policy
.is_allowed_endpoint(req.descriptor_dyn().name(), &request)
.await
{
warn!(sl(), "{} is blocked by policy", req.descriptor_dyn().name());
Err(ttrpc_error(
ttrpc::Code::PERMISSION_DENIED,
format!("{} is blocked by policy", req.descriptor_dyn().name()),
))
} else {
Ok(())
}
}

fn same<E>(e: E) -> E {
e
}
Expand Down Expand Up @@ -1439,14 +1421,8 @@ impl agent_ttrpc::AgentService for AgentService {
req: protocols::agent::SetPolicyRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "set_policy", req);
is_allowed(&req).await?;

AGENT_POLICY
.lock()
.await
.set_policy(&req.policy)
.await
.map_ttrpc_err(same)?;
do_set_policy(&req).await?;

Ok(Empty::new())
}
Expand Down

0 comments on commit 3db3e66

Please sign in to comment.