Skip to content

Commit

Permalink
Support DELETE FROM table statement
Browse files Browse the repository at this point in the history
This clears the whole table and related indexes.
  • Loading branch information
kawasin73 committed Nov 8, 2023
1 parent 6087cb5 commit 75a1764
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 6 deletions.
98 changes: 92 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use parser::expect_semicolon;
use parser::parse_sql;
use parser::BinaryOp;
use parser::CompareOp;
use parser::Delete;
use parser::Insert;
use parser::Parser;
use parser::ResultColumn;
Expand Down Expand Up @@ -176,7 +177,7 @@ impl Connection {
})
}

pub fn prepare<'a>(&self, sql: &'a str) -> Result<'a, Statement> {
pub fn prepare<'a, 'conn>(&'conn self, sql: &'a str) -> Result<'a, Statement<'conn>> {
let input = sql.as_bytes();
let mut parser = Parser::new(input);
let statement = parse_sql(&mut parser)?;
Expand All @@ -185,8 +186,12 @@ impl Connection {

match statement {
Stmt::Select(select) => Ok(Statement::Query(self.prepare_select(select)?)),
Stmt::Insert(insert) => Ok(Statement::Execution(self.prepare_insert(insert)?)),
Stmt::Delete(_) => todo!("implement delete statement"),
Stmt::Insert(insert) => Ok(Statement::Execution(ExecutionStatement::Insert(
self.prepare_insert(insert)?,
))),
Stmt::Delete(delete) => Ok(Statement::Execution(ExecutionStatement::Delete(
self.prepare_delete(delete)?,
))),
}
}

Expand Down Expand Up @@ -381,6 +386,41 @@ impl Connection {
})
}

fn prepare_delete<'a>(&self, delete: Delete<'a>) -> Result<'a, DeleteStatement> {
if self.schema.borrow().is_none() {
self.load_schema()?;
}
let schema_cell = self.schema.borrow();
let schema = schema_cell.as_ref().unwrap();
let table_name = delete.table_name.dequote();
let table = schema.get_table(&table_name).ok_or(anyhow::anyhow!(
"table not found: {:?}",
std::str::from_utf8(&table_name).unwrap_or_default()
))?;

let filter = delete
.filter
.map(|expr| Expression::from(expr, Some(table)))
.transpose()?;

if filter.is_some() {
todo!("filter");
}

let table_page_id = table.root_page_id;
let mut index_page_ids = Vec::new();
let mut index_schema = table.indexes.clone();
while let Some(index) = index_schema {
index_page_ids.push(index.root_page_id);
index_schema = index.next.clone();
}
Ok(DeleteStatement {
conn: self,
table_page_id,
index_page_ids,
})
}

fn start_read(&self) -> anyhow::Result<ReadTransaction> {
// TODO: Lock across processes
let ref_count = self.ref_count.get();
Expand Down Expand Up @@ -452,9 +492,23 @@ struct IndexInfo {
n_extra: usize,
}

pub enum ExecutionStatement<'conn> {
Insert(InsertStatement<'conn>),
Delete(DeleteStatement<'conn>),
}

impl ExecutionStatement<'_> {
pub fn execute(&self) -> Result<u64> {
match self {
Self::Insert(stmt) => stmt.execute(),
Self::Delete(stmt) => stmt.execute(),
}
}
}

pub enum Statement<'conn> {
Query(SelectStatement<'conn>),
Execution(InsertStatement<'conn>),
Execution(ExecutionStatement<'conn>),
}

impl<'conn> Statement<'conn> {
Expand All @@ -465,7 +519,7 @@ impl<'conn> Statement<'conn> {
}
}

pub fn execute(&'conn self) -> Result<usize> {
pub fn execute(&'conn self) -> Result<u64> {
match self {
Self::Query(_) => Err(Error::Unsupported("select statement not support execute")),
Self::Execution(stmt) => stmt.execute(),
Expand Down Expand Up @@ -805,7 +859,7 @@ pub struct InsertStatement<'conn> {
}

impl<'conn> InsertStatement<'conn> {
pub fn execute(&self) -> Result<usize> {
pub fn execute(&self) -> Result<u64> {
let write_txn = self.conn.start_write()?;

let mut cursor =
Expand Down Expand Up @@ -885,3 +939,35 @@ impl<'conn> InsertStatement<'conn> {
Ok(n)
}
}

pub struct DeleteStatement<'conn> {
conn: &'conn Connection,
table_page_id: PageId,
index_page_ids: Vec<PageId>,
}

impl<'conn> DeleteStatement<'conn> {
pub fn execute(&self) -> Result<u64> {
let write_txn = self.conn.start_write()?;

let mut cursor =
BtreeCursor::new(self.table_page_id, &self.conn.pager, &self.conn.btree_ctx)?;

let n_deleted = cursor.clear()?;

for index_page_id in self.index_page_ids.iter() {
let mut cursor =
BtreeCursor::new(*index_page_id, &self.conn.pager, &self.conn.btree_ctx)?;
let n = cursor.clear()?;
if n != n_deleted {
return Err(Error::Other(anyhow::anyhow!(
"number of deleted rows in table and index does not match"
)));
}
}

write_txn.commit()?;

Ok(n_deleted)
}
}
141 changes: 141 additions & 0 deletions tests/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;

use common::*;
use prsqlite::Connection;
use prsqlite::Value;

#[test]
fn test_delete() {
let file = create_sqlite_database(&[
"CREATE TABLE example(col);",
"CREATE INDEX index1 ON example(col);",
"INSERT INTO example (col) VALUES (10);",
"INSERT INTO example (col) VALUES (20);",
"INSERT INTO example (col) VALUES (30);",
]);
let conn = Connection::open(file.path()).unwrap();

let stmt = conn.prepare("DELETE FROM example;").unwrap();
assert_eq!(stmt.execute().unwrap(), 3);

let test_conn = rusqlite::Connection::open(file.path()).unwrap();
assert_same_results(&[], "SELECT * FROM example;", &test_conn, &conn);

assert_eq!(stmt.execute().unwrap(), 0);

let insert_stmt = conn
.prepare("INSERT INTO example (col) VALUES (0), (1);")
.unwrap();
assert_eq!(insert_stmt.execute().unwrap(), 2);

let test_conn = rusqlite::Connection::open(file.path()).unwrap();
assert_same_results(
&[&[Some(&Value::Integer(0))], &[Some(&Value::Integer(1))]],
"SELECT * FROM example;",
&test_conn,
&conn,
);
}

#[test]
fn test_delete_all_multiple_level() {
let mut stmts = vec![
"PRAGMA page_size = 512;",
"CREATE TABLE example(col);",
"CREATE INDEX index1 ON example(col);",
];
let insert_stmt_string = format!("INSERT INTO example(col) VALUES (x'{}');", "11".repeat(100));
for _ in 0..1000 {
stmts.push(&insert_stmt_string);
}
let file = create_sqlite_database(&stmts);
let conn = Connection::open(file.path()).unwrap();

let stmt = conn.prepare("DELETE FROM example;").unwrap();
assert_eq!(stmt.execute().unwrap(), 1000);

let test_conn = rusqlite::Connection::open(file.path()).unwrap();
assert_same_results(&[], "SELECT * FROM example;", &test_conn, &conn);

assert_eq!(stmt.execute().unwrap(), 0);

let insert_stmt = conn.prepare(&insert_stmt_string).unwrap();
for _ in 0..1000 {
assert_eq!(insert_stmt.execute().unwrap(), 1);
}

let test_conn = rusqlite::Connection::open(file.path()).unwrap();
let v = Value::Blob(vec![0x11; 100].into());
let row = [Some(&v)];
let mut expected = Vec::with_capacity(1000);
for _ in 0..1000 {
expected.push(row.as_slice());
}
assert_same_results(
expected.as_slice(),
"SELECT * FROM example;",
&test_conn,
&conn,
);
}

#[test]
fn test_delete_after_insert() {
let file = create_sqlite_database(&[
"PRAGMA page_size = 512;",
"CREATE TABLE example(col);",
"CREATE INDEX index1 ON example(col);",
]);
let conn = Connection::open(file.path()).unwrap();

let insert_stmt_string = format!("INSERT INTO example(col) VALUES (x'{}');", "11".repeat(100));
let insert_stmt = conn.prepare(&insert_stmt_string).unwrap();
for _ in 0..1000 {
assert_eq!(insert_stmt.execute().unwrap(), 1);
}
let v = Value::Blob(vec![0x11; 100].into());
let row = [Some(&v)];
let mut expected = Vec::with_capacity(1000);
for _ in 0..1000 {
expected.push(row.as_slice());
}
let test_conn = rusqlite::Connection::open(file.path()).unwrap();
assert_same_results(
expected.as_slice(),
"SELECT * FROM example;",
&test_conn,
&conn,
);
let original_file_size = file.as_file().metadata().unwrap().len();

let stmt = conn.prepare("DELETE FROM example;").unwrap();
assert_eq!(stmt.execute().unwrap(), 1000);
let test_conn = rusqlite::Connection::open(file.path()).unwrap();
assert_same_results(&[], "SELECT * FROM example;", &test_conn, &conn);

for _ in 0..1000 {
assert_eq!(insert_stmt.execute().unwrap(), 1);
}
let test_conn = rusqlite::Connection::open(file.path()).unwrap();
assert_same_results(
expected.as_slice(),
"SELECT * FROM example;",
&test_conn,
&conn,
);
assert_eq!(file.as_file().metadata().unwrap().len(), original_file_size);
}

0 comments on commit 75a1764

Please sign in to comment.