Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Optimize DB connection usage (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed May 18, 2020
1 parent 54d5a9f commit 2bf9225
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 157 deletions.
40 changes: 23 additions & 17 deletions src/app/endpoint/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ impl RequestHandler for ListHandler {
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
let conn = context.db().get()?;

// Check whether the room exists and open.
let room = db::room::FindQuery::new()
.id(payload.room_id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("the room = '{}' is not found or closed", payload.room_id))
.status(ResponseStatus::NOT_FOUND)?;
let room = {
let conn = context.db().get()?;

db::room::FindQuery::new()
.id(payload.room_id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("the room = '{}' is not found or closed", payload.room_id))
.status(ResponseStatus::NOT_FOUND)?
};

// Authorize agents listing in the room.
let room_id = room.id().to_string();
Expand All @@ -53,15 +55,19 @@ impl RequestHandler for ListHandler {
.await?;

// Get agents list in the room.
let agents = db::agent::ListQuery::new()
.room_id(payload.room_id)
.status(db::agent::Status::Ready)
.offset(payload.offset.unwrap_or_else(|| 0))
.limit(std::cmp::min(
payload.limit.unwrap_or_else(|| MAX_LIMIT),
MAX_LIMIT,
))
.execute(&conn)?;
let agents = {
let conn = context.db().get()?;

db::agent::ListQuery::new()
.room_id(payload.room_id)
.status(db::agent::Status::Ready)
.offset(payload.offset.unwrap_or_else(|| 0))
.limit(std::cmp::min(
payload.limit.unwrap_or_else(|| MAX_LIMIT),
MAX_LIMIT,
))
.execute(&conn)?
};

// Respond with agents list.
Ok(Box::new(stream::once(shared::build_response(
Expand Down
5 changes: 2 additions & 3 deletions src/app/endpoint/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ impl RequestHandler for UnicastHandler {
API_VERSION,
);

Ok(Box::new(stream::once(
Box::new(req) as Box<dyn IntoPublishableDump + Send>
)))
let boxed_req = Box::new(req) as Box<dyn IntoPublishableDump + Send>;
Ok(Box::new(stream::once(boxed_req)))
}
}

Expand Down
113 changes: 67 additions & 46 deletions src/app/endpoint/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,15 @@ impl RequestHandler for ReadHandler {
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
let conn = context.db().get()?;
let room = {
let conn = context.db().get()?;

let room = db::room::FindQuery::new()
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?;
db::room::FindQuery::new()
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?
};

// Authorize room reading on the tenant.
let room_id = room.id().to_string();
Expand Down Expand Up @@ -162,14 +164,16 @@ impl RequestHandler for UpdateHandler {
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
let conn = context.db().get()?;
let room = {
let conn = context.db().get()?;

let room = db::room::FindQuery::new()
.time(db::room::since_now())
.id(payload.id())
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id()))
.status(ResponseStatus::NOT_FOUND)?;
db::room::FindQuery::new()
.time(db::room::since_now())
.id(payload.id())
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id()))
.status(ResponseStatus::NOT_FOUND)?
};

// Authorize room updating on the tenant.
let room_id = room.id().to_string();
Expand All @@ -181,7 +185,10 @@ impl RequestHandler for UpdateHandler {
.await?;

// Update room.
let room = payload.execute(&conn)?;
let room = {
let conn = context.db().get()?;
payload.execute(&conn)?
};

// Respond and broadcast to the audience topic.
let response = shared::build_response(
Expand Down Expand Up @@ -220,14 +227,16 @@ impl RequestHandler for DeleteHandler {
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
let conn = context.db().get()?;
let room = {
let conn = context.db().get()?;

let room = db::room::FindQuery::new()
.time(db::room::since_now())
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id))
.status(ResponseStatus::NOT_FOUND)?;
db::room::FindQuery::new()
.time(db::room::since_now())
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id))
.status(ResponseStatus::NOT_FOUND)?
};

// Authorize room deletion on the tenant.
let room_id = room.id().to_string();
Expand All @@ -239,7 +248,10 @@ impl RequestHandler for DeleteHandler {
.await?;

// Delete room.
db::room::DeleteQuery::new(room.id()).execute(&conn)?;
{
let conn = context.db().get()?;
db::room::DeleteQuery::new(room.id()).execute(&conn)?;
}

// Respond and broadcast to the audience topic.
let response = shared::build_response(
Expand Down Expand Up @@ -278,14 +290,16 @@ impl RequestHandler for EnterHandler {
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
let conn = context.db().get()?;
let room = {
let conn = context.db().get()?;

let room = db::room::FindQuery::new()
.id(payload.id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?;
db::room::FindQuery::new()
.id(payload.id)
.time(db::room::now())
.execute(&conn)?
.ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?
};

// Authorize subscribing to the room's events.
let room_id = room.id().to_string();
Expand All @@ -297,7 +311,10 @@ impl RequestHandler for EnterHandler {
.await?;

// Register agent in `in_progress` state.
db::agent::InsertQuery::new(reqp.as_agent_id(), room.id()).execute(&conn)?;
{
let conn = context.db().get()?;
db::agent::InsertQuery::new(reqp.as_agent_id(), room.id()).execute(&conn)?;
}

// Send dynamic subscription creation request to the broker.
let payload = SubscriptionRequest::new(reqp.as_agent_id().to_owned(), object);
Expand Down Expand Up @@ -343,22 +360,26 @@ impl RequestHandler for LeaveHandler {
reqp: &IncomingRequestProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
let conn = context.db().get()?;

let room = db::room::FindQuery::new()
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?;

// Check room presence.
let results = db::agent::ListQuery::new()
.room_id(room.id())
.agent_id(reqp.as_agent_id())
.status(db::agent::Status::Ready)
.execute(&conn)?;

if results.is_empty() {
let (room, presence) = {
let conn = context.db().get()?;

let room = db::room::FindQuery::new()
.id(payload.id)
.execute(&conn)?
.ok_or_else(|| format!("Room not found, id = '{}'", payload.id))
.status(ResponseStatus::NOT_FOUND)?;

// Check room presence.
let presence = db::agent::ListQuery::new()
.room_id(room.id())
.agent_id(reqp.as_agent_id())
.status(db::agent::Status::Ready)
.execute(&conn)?;

(room, presence)
};

if presence.is_empty() {
return Err(format!(
"agent = '{}' is not online in the room = '{}'",
reqp.as_agent_id(),
Expand Down
Loading

0 comments on commit 2bf9225

Please sign in to comment.