Skip to content

Commit

Permalink
mysql-srv: Add support for COM_RESET_CONNECTION
Browse files Browse the repository at this point in the history
Add support for the COM_RESET_CONNECTION command. This command is
used to reset the connection state of the client.

Fixes: REA-3829

Release-Note-Core: Add support for MySQL COM_RESET_CONNECTION
  command.

Change-Id: Ieeba77fd98a1a0667118728a2869e9f7da981073
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7123
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
altmannmarcelo committed Mar 21, 2024
1 parent 18501af commit ca9856b
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 0 deletions.
4 changes: 4 additions & 0 deletions mysql-srv/src/commands.rs
Expand Up @@ -157,6 +157,7 @@ pub enum Command<'a> {
Query(&'a [u8]),
ListFields(&'a [u8]),
Close(u32),
Reset,
ResetStmtData(u32),
Prepare(&'a [u8]),
Init(&'a [u8]),
Expand Down Expand Up @@ -230,6 +231,9 @@ pub fn parse(i: &[u8]) -> IResult<&[u8], Command<'_>> {
preceded(tag(&[CommandByte::COM_STMT_CLOSE as u8]), le_u32),
Command::Close,
),
map(tag(&[CommandByte::COM_RESET_CONNECTION as u8]), |_| {
Command::Reset
}),
map(tag(&[CommandByte::COM_QUIT as u8]), |_| Command::Quit),
map(tag(&[CommandByte::COM_PING as u8]), |_| Command::Ping),
map(
Expand Down
12 changes: 12 additions & 0 deletions mysql-srv/src/lib.rs
Expand Up @@ -50,6 +50,10 @@
//! }
//! async fn on_close(&mut self, _: DeallocateId) {}
//!
//! async fn on_reset(&mut self) -> io::Result<()> {
//! Ok(())
//! }
//!
//! async fn on_init(&mut self, _: &str, w: Option<InitWriter<'_, W>>) -> io::Result<()> {
//! w.unwrap().ok().await
//! }
Expand Down Expand Up @@ -290,6 +294,9 @@ pub trait MySqlShim<W: AsyncWrite + Unpin + Send> {
results: QueryResultWriter<'_, W>,
) -> QueryResultsResponse;

/// Called when the client issues a reset command
async fn on_reset(&mut self) -> io::Result<()>;

/// Called when client switches database.
async fn on_init(&mut self, _: &str, _: Option<InitWriter<'_, W>>) -> io::Result<()>;

Expand Down Expand Up @@ -815,6 +822,11 @@ impl<B: MySqlShim<W> + Send, R: AsyncRead + Unpin, W: AsyncWrite + Unpin + Send>
writers::write_ok_packet(&mut self.writer, 0, 0, StatusFlags::empty()).await?;
self.writer.flush().await?;
}
Command::Reset => {
self.shim.on_reset().await?;
writers::write_ok_packet(&mut self.writer, 0, 0, StatusFlags::empty()).await?;
self.writer.flush().await?;
}
Command::Quit => {
break;
}
Expand Down
4 changes: 4 additions & 0 deletions mysql-srv/tests/main.rs
Expand Up @@ -95,6 +95,10 @@ where

async fn on_close(&mut self, _: DeallocateId) {}

async fn on_reset(&mut self) -> io::Result<()> {
Ok(())
}

async fn on_init(&mut self, schema: &str, writer: Option<InitWriter<'_, W>>) -> io::Result<()> {
(self.on_i)(schema, writer.unwrap()).await
}
Expand Down
9 changes: 9 additions & 0 deletions readyset-adapter/src/backend.rs
Expand Up @@ -849,6 +849,15 @@ where
.unwrap_or_else(|| DB::DEFAULT_DB_VERSION.to_string())
}

/// Reset the current upstream connection
pub async fn reset(&mut self) -> Result<(), DB::Error> {
if let Some(upstream) = &mut self.upstream {
upstream.reset().await
} else {
Ok(())
}
}

/// Switch the active database for this backend to the given named database.
///
/// Internally, this will set the schema search path to a single-element vector with the
Expand Down
7 changes: 7 additions & 0 deletions readyset-adapter/src/upstream_database.rs
Expand Up @@ -110,6 +110,9 @@ pub trait UpstreamDatabase: Sized + Send {
database: &str,
) -> Result<(), Self::Error>;

/// Reset the connection to the upstream database
async fn reset(&mut self) -> Result<(), Self::Error>;

/// Returns a database name if it was included in the original connection string, or None if no
/// database name was included in the original connection string.
fn database(&self) -> Option<&str> {
Expand Down Expand Up @@ -277,6 +280,10 @@ where
.await
}

async fn reset(&mut self) -> Result<(), Self::Error> {
self.upstream().await?.reset().await
}

fn database(&self) -> Option<&str> {
if let Some(u) = &self.upstream {
u.database()
Expand Down
8 changes: 8 additions & 0 deletions readyset-mysql/src/backend.rs
Expand Up @@ -723,6 +723,14 @@ where
let _ = self.noria.remove_statement(statement_id).await;
}

async fn on_reset(&mut self) -> io::Result<()> {
let _ = match self.reset().await {
Ok(()) => Ok(()),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
};
Ok(())
}

async fn on_query(
&mut self,
query: &str,
Expand Down
5 changes: 5 additions & 0 deletions readyset-mysql/src/upstream.rs
Expand Up @@ -241,6 +241,11 @@ impl UpstreamDatabase for MySqlUpstream {
format!("{major}.{minor}.{patch}-readyset\0")
}

async fn reset(&mut self) -> Result<(), Self::Error> {
self.conn.reset().await?;
Ok(())
}

async fn is_connected(&mut self) -> Result<bool, Self::Error> {
Ok(self.conn.ping().await.is_ok())
}
Expand Down
25 changes: 25 additions & 0 deletions readyset-mysql/tests/fallback.rs
Expand Up @@ -926,6 +926,31 @@ async fn replication_failure_ignores_table() {
shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn reset_user() {
let (opts, _handle, shutdown_tx) = setup().await;
let mut conn = mysql_async::Conn::new(opts).await.unwrap();
conn.query_drop("CREATE TEMPORARY TABLE t (id INT)")
.await
.unwrap();
conn.query_drop("INSERT INTO t (id) VALUES (1)")
.await
.unwrap();
let row_temp_table: Vec<i64> = conn.query("SELECT COUNT(*) FROM t").await.unwrap();
assert_eq!(row_temp_table.len(), 1);
assert_eq!(row_temp_table[0], 1);
conn.reset().await.unwrap();
let row = conn.query_drop("SELECT COUNT(*) FROM t").await;

assert_eq!(
row.map_err(|e| e.to_string()),
Err("Server error: `ERROR 42S02 (1146): Table 'noria.t' doesn't exist'".to_string())
);

shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
#[slow]
Expand Down
5 changes: 5 additions & 0 deletions readyset-psql/src/upstream.rs
Expand Up @@ -230,6 +230,11 @@ impl UpstreamDatabase for PostgreSqlUpstream {
})
}

async fn reset(&mut self) -> Result<(), Self::Error> {
self.client.simple_query("DISCARD ALL").await?;
Ok(())
}

async fn is_connected(&mut self) -> Result<bool, Self::Error> {
Ok(!self.client.simple_query("select 1").await?.is_empty())
}
Expand Down

0 comments on commit ca9856b

Please sign in to comment.