Skip to content

Commit

Permalink
net: Add illegal packet escape for pipeline
Browse files Browse the repository at this point in the history
Additional changes:
- Added changelog entry
  • Loading branch information
ohsayan committed Apr 4, 2024
1 parent b9161f1 commit 2dd6537
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 24 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All changes in this project will be noted in this file.

## Version 0.8.2

### Additions

- Skyhash/2.1: Restored support for pipelines

## Version 0.8.1

### Additions
Expand Down
40 changes: 16 additions & 24 deletions server/src/engine/net/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ use {
},
},
super::{IoResult, QueryLoopResult, Socket},
crate::engine::{
self,
core::system_db::VerifyUser,
error::{QueryError, QueryResult},
fractal::{Global, GlobalInstanceLike},
mem::{BufferedScanner, IntegerRepr},
crate::{
engine::{
core::{exec, system_db::VerifyUser},
error::{QueryError, QueryResult},
fractal::{Global, GlobalInstanceLike},
mem::{BufferedScanner, IntegerRepr},
},
util::compiler,
},
bytes::{Buf, BytesMut},
tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter},
Expand Down Expand Up @@ -348,17 +350,17 @@ async fn exec_simple<S: Socket>(
global: &Global,
query: SQuery<'_>,
) -> IoResult<()> {
write_response(
engine::core::exec::dispatch_to_executor(global, cs, query).await,
con,
)
.await
write_response(exec::dispatch_to_executor(global, cs, query).await, con).await
}

/*
pipeline
---
malformed packets
*/

const ILLEGAL_PACKET_ESCAPE: u8 = 0xFF;

async fn exec_pipe<'a, S: Socket>(
con: &mut BufWriter<S>,
cs: &mut ClientLocalState,
Expand All @@ -368,20 +370,10 @@ async fn exec_pipe<'a, S: Socket>(
let mut pipe = pipe.into_iter();
while let Some(query) = pipe.next() {
match query {
Ok(q) => {
write_response(
engine::core::exec::dispatch_to_executor(global, cs, q).await,
con,
)
.await?
}
Ok(q) => write_response(exec::dispatch_to_executor(global, cs, q).await, con).await?,
Err(_) => {
// respond with error
let [a, b] = (QueryError::SysNetworkSystemIllegalClientPacket.value_u8() as u16)
.to_le_bytes();
con.write_all(&[ResponseType::Error.value_u8(), a, b])
.await?;
return Ok(());
return compiler::cold_call(|| async { con.write_u8(ILLEGAL_PACKET_ESCAPE).await })
.await
}
}
}
Expand Down

0 comments on commit 2dd6537

Please sign in to comment.