Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Add balancer capacity (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Sep 30, 2020
1 parent 091b5a4 commit 2b601a9
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE janus_backend DROP COLUMN balancer_capacity;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE janus_backend ADD COLUMN balancer_capacity INT;
40 changes: 39 additions & 1 deletion src/app/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,34 @@ pub(crate) enum Transaction {
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct CreateSessionTransaction {
capacity: Option<i32>,
balancer_capacity: Option<i32>,
}

impl CreateSessionTransaction {
pub(crate) fn new() -> Self {
Self { capacity: None }
Self {
capacity: None,
balancer_capacity: None,
}
}

pub(crate) fn capacity(&self) -> Option<i32> {
self.capacity
}

pub(crate) fn balancer_capacity(&self) -> Option<i32> {
self.balancer_capacity
}

pub(crate) fn set_capacity(&mut self, capacity: i32) -> &mut Self {
self.capacity = Some(capacity);
self
}

pub(crate) fn set_balancer_capacity(&mut self, balancer_capacity: i32) -> &mut Self {
self.balancer_capacity = Some(balancer_capacity);
self
}
}

pub(crate) fn create_session_request<M>(
Expand All @@ -87,6 +100,10 @@ where
tn_data.set_capacity(capacity);
}

if let Some(balancer_capacity) = payload.balancer_capacity() {
tn_data.set_balancer_capacity(balancer_capacity);
}

let transaction = Transaction::CreateSession(tn_data);
let payload = CreateSessionRequest::new(&to_base64(&transaction)?);

Expand All @@ -112,30 +129,42 @@ where
pub(crate) struct CreateHandleTransaction {
session_id: i64,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
}

impl CreateHandleTransaction {
pub(crate) fn new(session_id: i64) -> Self {
Self {
session_id,
capacity: None,
balancer_capacity: None,
}
}

pub(crate) fn capacity(&self) -> Option<i32> {
self.capacity
}

pub(crate) fn balancer_capacity(&self) -> Option<i32> {
self.balancer_capacity
}

pub(crate) fn set_capacity(&mut self, capacity: i32) -> &mut Self {
self.capacity = Some(capacity);
self
}

pub(crate) fn set_balancer_capacity(&mut self, balancer_capacity: i32) -> &mut Self {
self.balancer_capacity = Some(balancer_capacity);
self
}
}

pub(crate) fn create_handle_request<M>(
respp: &IncomingResponseProperties,
session_id: i64,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
me: &M,
start_timestamp: DateTime<Utc>,
) -> Result<OutgoingMessage<CreateHandleRequest>>
Expand All @@ -149,6 +178,10 @@ where
tn_data.set_capacity(capacity);
}

if let Some(balancer_capacity) = balancer_capacity {
tn_data.set_balancer_capacity(balancer_capacity);
}

let transaction = Transaction::CreateHandle(tn_data);

let payload = CreateHandleRequest::new(
Expand Down Expand Up @@ -644,6 +677,7 @@ async fn handle_response_impl<C: Context>(
respp,
inresp.data().id(),
tn.capacity(),
tn.balancer_capacity(),
context.agent_id(),
start_timestamp,
)
Expand All @@ -666,6 +700,10 @@ async fn handle_response_impl<C: Context>(
q = q.capacity(capacity);
}

if let Some(balancer_capacity) = tn.balancer_capacity() {
q = q.balancer_capacity(balancer_capacity);
}

q.execute(&conn)?;
Ok(Box::new(stream::empty()))
}
Expand Down
5 changes: 5 additions & 0 deletions src/backend/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ impl OpaqueId for DetachedEvent {
pub(crate) struct StatusEvent {
online: bool,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
}

impl StatusEvent {
Expand All @@ -423,4 +424,8 @@ impl StatusEvent {
pub(crate) fn capacity(&self) -> Option<i32> {
self.capacity
}

pub(crate) fn balancer_capacity(&self) -> Option<i32> {
self.balancer_capacity
}
}
16 changes: 14 additions & 2 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub(crate) type AllColumns = (
janus_backend::session_id,
janus_backend::created_at,
janus_backend::capacity,
janus_backend::balancer_capacity,
);

pub(crate) const ALL_COLUMNS: AllColumns = (
Expand All @@ -20,6 +21,7 @@ pub(crate) const ALL_COLUMNS: AllColumns = (
janus_backend::session_id,
janus_backend::created_at,
janus_backend::capacity,
janus_backend::balancer_capacity,
);

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -32,6 +34,7 @@ pub(crate) struct Object {
session_id: i64,
created_at: DateTime<Utc>,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
}

impl Object {
Expand Down Expand Up @@ -125,6 +128,7 @@ pub(crate) struct UpsertQuery<'a> {
handle_id: i64,
session_id: i64,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
}

impl<'a> UpsertQuery<'a> {
Expand All @@ -134,6 +138,7 @@ impl<'a> UpsertQuery<'a> {
handle_id,
session_id,
capacity: None,
balancer_capacity: None,
}
}

Expand All @@ -144,6 +149,13 @@ impl<'a> UpsertQuery<'a> {
}
}

pub(crate) fn balancer_capacity(self, balancer_capacity: i32) -> Self {
Self {
balancer_capacity: Some(balancer_capacity),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<Object, Error> {
use crate::schema::janus_backend::dsl::janus_backend;
use diesel::RunQueryDsl;
Expand Down Expand Up @@ -226,8 +238,8 @@ const LEAST_LOADED_SQL: &str = r#"
LEFT JOIN room AS r2
ON 1 = 1
WHERE r2.id = $1
AND COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.load, 0) > COALESCE(r2.reserve, 0)
ORDER BY COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.load, 0) DESC
AND COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0) > COALESCE(r2.reserve, 0)
ORDER BY COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0) DESC
LIMIT 1
"#;

Expand Down
1 change: 1 addition & 0 deletions src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ table! {
session_id -> Int8,
created_at -> Timestamptz,
capacity -> Nullable<Int4>,
balancer_capacity -> Nullable<Int4>,
}
}

Expand Down

0 comments on commit 2b601a9

Please sign in to comment.