Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/architecture_2024' into question…
Browse files Browse the repository at this point in the history
…s_http_api
  • Loading branch information
jreidinger committed Mar 21, 2024
2 parents da4a343 + 0cc18ac commit 46abecb
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 77 deletions.
7 changes: 7 additions & 0 deletions rust/agama-lib/src/proxies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,10 @@ trait QuestionWithPassword {
#[dbus_proxy(property)]
fn set_password(&self, value: &str) -> zbus::Result<()>;
}

#[dbus_proxy(interface = "org.opensuse.Agama1.Issues", assume_defaults = true)]
trait Issues {
/// All property
#[dbus_proxy(property)]
fn all(&self) -> zbus::Result<Vec<(String, String, u32, u32)>>;
}
7 changes: 6 additions & 1 deletion rust/agama-server/src/software/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use crate::{
error::Error,
web::{
common::{progress_router, service_status_router},
common::{issues_router, progress_router, service_status_router},
Event,
},
};
Expand Down Expand Up @@ -114,9 +114,12 @@ fn reason_to_selected_by(
pub async fn software_service(dbus: zbus::Connection) -> Result<Router, ServiceError> {
const DBUS_SERVICE: &'static str = "org.opensuse.Agama.Software1";
const DBUS_PATH: &'static str = "/org/opensuse/Agama/Software1";
const DBUS_PRODUCT_PATH: &'static str = "/org/opensuse/Agama/Software1/Product";

let status_router = service_status_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let progress_router = progress_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let software_issues = issues_router(&dbus, DBUS_SERVICE, DBUS_PATH).await?;
let product_issues = issues_router(&dbus, DBUS_SERVICE, DBUS_PRODUCT_PATH).await?;

let product = ProductClient::new(dbus.clone()).await?;
let software = SoftwareClient::new(dbus).await?;
Expand All @@ -129,6 +132,8 @@ pub async fn software_service(dbus: zbus::Connection) -> Result<Router, ServiceE
.route("/probe", post(probe))
.merge(status_router)
.merge(progress_router)
.nest("/issues/product", product_issues)
.nest("/issues/software", software_issues)
.with_state(state);
Ok(router)
}
Expand Down
20 changes: 19 additions & 1 deletion rust/agama-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
manager::web::{manager_service, manager_stream},
questions::web::{questions_service, questions_stream},
software::web::{software_service, software_stream},
web::common::{progress_stream, service_status_stream},
web::common::{issues_stream, progress_stream, service_status_stream},
};
use axum::Router;

Expand Down Expand Up @@ -117,6 +117,24 @@ async fn run_events_monitor(dbus: zbus::Connection, events: EventsSender) -> Res
.await?,
);
stream.insert("questions", questions_stream(dbus.clone()).await?);
stream.insert(
"software-issues",
issues_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1",
)
.await?,
);
stream.insert(
"software-product-issues",
issues_stream(
dbus.clone(),
"org.opensuse.Agama.Software1",
"/org/opensuse/Agama/Software1/Product",
)
.await?,
);

tokio::pin!(stream);
let e = events.clone();
Expand Down
127 changes: 124 additions & 3 deletions rust/agama-server/src/web/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{pin::Pin, task::Poll};
use agama_lib::{
error::ServiceError,
progress::Progress,
proxies::{ProgressProxy, ServiceStatusProxy},
proxies::{IssuesProxy, ProgressProxy, ServiceStatusProxy},
};
use axum::{extract::State, routing::get, Json, Router};
use pin_project::pin_project;
Expand All @@ -17,8 +17,8 @@ use crate::error::Error;

use super::Event;

/// Builds a router to the `org.opensuse.Agama1.ServiceStatus`
/// interface of the given D-Bus object.
/// Builds a router to the `org.opensuse.Agama1.ServiceStatus` interface of the
/// given D-Bus object.
///
/// ```no_run
/// # use axum::{extract::State, routing::get, Json, Router};
Expand Down Expand Up @@ -231,3 +231,124 @@ async fn build_progress_proxy<'a>(
.await?;
Ok(proxy)
}

/// Builds a router to the `org.opensuse.Agama1.Issues` interface of a given
/// D-Bus object.
///
/// ```no_run
/// # use axum::{extract::State, routing::get, Json, Router};
/// # use agama_lib::connection;
/// # use agama_server::web::common::service_status_router;
/// # use tokio_test;
///
/// # tokio_test::block_on(async {
/// async fn hello(state: State<HelloWorldState>) {};
///
/// #[derive(Clone)]
/// struct HelloWorldState {};
///
/// let dbus = connection().await.unwrap();
/// let issues_router = issues_router(
/// &dbus, "org.opensuse.HelloWorld", "/org/opensuse/hello"
/// ).await.unwrap();
/// let router: Router<HelloWorldState> = Router::new()
/// .route("/hello", get(hello))
/// .merge(issues_router)
/// .with_state(HelloWorldState {});
/// });
/// ```
///
/// * `dbus`: D-Bus connection.
/// * `destination`: D-Bus service name.
/// * `path`: D-Bus object path.
pub async fn issues_router<T>(
dbus: &zbus::Connection,
destination: &str,
path: &str,
) -> Result<Router<T>, ServiceError> {
let proxy = build_issues_proxy(dbus, destination, path).await?;
let state = IssuesState { proxy };
Ok(Router::new().route("/", get(issues)).with_state(state))
}

async fn issues(State(state): State<IssuesState<'_>>) -> Result<Json<Vec<Issue>>, Error> {
let issues = state.proxy.all().await?;
let issues: Vec<Issue> = issues.into_iter().map(Issue::from_tuple).collect();
Ok(Json(issues))
}

#[derive(Clone)]
struct IssuesState<'a> {
proxy: IssuesProxy<'a>,
}

#[derive(Clone, Debug, Serialize)]
pub struct Issue {
description: String,
details: Option<String>,
source: u32,
severity: u32,
}

impl Issue {
pub fn from_tuple(
(description, details, source, severity): (String, String, u32, u32),
) -> Self {
let details = if details.is_empty() {
None
} else {
Some(details)
};

Self {
description,
details,
source,
severity,
}
}
}

/// Builds a stream of the changes in the the `org.opensuse.Agama1.Issues`
/// interface of the given D-Bus object.
///
/// * `dbus`: D-Bus connection.
/// * `destination`: D-Bus service name.
/// * `path`: D-Bus object path.
pub async fn issues_stream(
dbus: zbus::Connection,
destination: &'static str,
path: &'static str,
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Error> {
let proxy = build_issues_proxy(&dbus, destination, path).await?;
let stream = proxy
.receive_all_changed()
.await
.then(move |change| async move {
if let Ok(issues) = change.get().await {
let issues = issues.into_iter().map(Issue::from_tuple).collect();
Some(Event::IssuesChanged {
service: destination.to_string(),
path: path.to_string(),
issues,
})
} else {
None
}
})
.filter_map(|e| e);
Ok(Box::pin(stream))
}

async fn build_issues_proxy<'a>(
dbus: &zbus::Connection,
destination: &str,
path: &str,
) -> Result<IssuesProxy<'a>, zbus::Error> {
let proxy = IssuesProxy::builder(&dbus)
.destination(destination.to_string())?
.path(path.to_string())?
.build()
.await?;
Ok(proxy)
}
7 changes: 7 additions & 0 deletions rust/agama-server/src/web/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use serde::Serialize;
use std::collections::HashMap;
use tokio::sync::broadcast::{Receiver, Sender};

use super::common::Issue;

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type")]
pub enum Event {
Expand Down Expand Up @@ -31,6 +33,11 @@ pub enum Event {
service: String,
status: u32,
},
IssuesChanged {
service: String,
path: String,
issues: Vec<Issue>,
},
}

pub type EventsSender = Sender<Event>;
Expand Down
Loading

0 comments on commit 46abecb

Please sign in to comment.