Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pg_dump support + schema evolution #837

Merged
merged 22 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/analytics/quickstart.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ will become resolved as `deltalake` tables become production-ready.
- [ ] `UPDATE` statements
- [ ] Nested `DELETE` statements
- [ ] Partitioning tables by column
- [ ] Some Postgres types like arrays, JSON, time, and timestamp with time zone
- [ ] Some Postgres types like JSON, time, and timestamp with time zone
- [ ] User-defined functions, aggregations, or types
- [ ] Referencing `deltalake` and regular Postgres `heap` tables in the same query
- [ ] Write-ahead-log (WAL) support and `ROLLBACK`
- [ ] Foreign keys
- [ ] Index scans
- [ ] `TEMP` tables
- [ ] Collations
- [ ] Using an external data lake as a table storage provider
- [ ] Full text search over `deltalake` tables with `pg_bm25`
7 changes: 4 additions & 3 deletions pg_analytics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Parquet files belonging to dropped data.
- [x] `deltalake` tables behave like regular Postgres tables and support most Postgres queries (JOINs, CTEs, window functions, etc.)
- [x] Vacuum and Parquet storage optimization
- [x] `INSERT`, `TRUNCATE`, `DELETE`, `COPY`
- [x] Physical backups with `pg_dump`

### Known Limitations

Expand All @@ -73,13 +74,13 @@ As `pg_analytics` becomes production-ready, many of these will be resolved.
- [ ] `UPDATE` statements
- [ ] Nested `DELETE` statements
- [ ] Partitioning tables by column
- [ ] Some Postgres types like arrays, JSON, time, and timestamp with time zone
- [ ] Some Postgres types like JSON, time, and timestamp with time zone
- [ ] User-defined functions, aggregations, or types
- [ ] Referencing `deltalake` and regular Postgres `heap` tables in the same query
- [ ] Write-ahead-log (WAL) support and `ROLLBACK`
- [ ] Write-ahead-log (WAL) support/`ROLLBACK`/logical replication
- [ ] Foreign keys
- [ ] Index scans
- [ ] `TEMP` tables
- [ ] Collations
- [ ] Using an external data lake as a table storage provider
- [ ] Full text search over `deltalake` tables with `pg_bm25`

Expand Down
27 changes: 19 additions & 8 deletions pg_analytics/src/datafusion/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,25 @@

fn from_sql_data_type(sql_data_type: SQLDataType) -> Result<DataType, ParadeError> {
let result = match sql_data_type {
SQLDataType::Boolean => DataType::Boolean,
SQLDataType::Text => DataType::Utf8,
SQLDataType::Int2(_) => DataType::Int16,
SQLDataType::Int4(_) => DataType::Int32,
SQLDataType::Int8(_) => DataType::Int64,
SQLDataType::UnsignedInt4(_) => DataType::UInt32,
SQLDataType::Float4 => DataType::Float32,
SQLDataType::Float8 => DataType::Float64,
SQLDataType::Boolean | SQLDataType::Bool => DataType::Boolean,

Check warning on line 71 in pg_analytics/src/datafusion/datatype.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/datatype.rs#L71

Added line #L71 was not covered by tests
SQLDataType::Text
| SQLDataType::Char(_)
| SQLDataType::Varchar(_)
| SQLDataType::String(_) => DataType::Utf8,
SQLDataType::TinyInt(_) => DataType::Int8,
SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => DataType::Int16,
SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => DataType::Int32,
SQLDataType::BigInt(_) | SQLDataType::Int8(_) => DataType::Int64,
SQLDataType::UnsignedTinyInt(_) => DataType::UInt8,
SQLDataType::UnsignedSmallInt(_) | SQLDataType::UnsignedInt2(_) => DataType::UInt16,

Check warning on line 81 in pg_analytics/src/datafusion/datatype.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/datatype.rs#L75-L81

Added lines #L75 - L81 were not covered by tests
SQLDataType::UnsignedInt(_)
| SQLDataType::UnsignedInteger(_)
| SQLDataType::UnsignedInt4(_) => DataType::UInt32,
SQLDataType::UnsignedBigInt(_) | SQLDataType::UnsignedInt8(_) => DataType::UInt64,
SQLDataType::Float4 | SQLDataType::Float(_) => DataType::Float32,

Check warning on line 86 in pg_analytics/src/datafusion/datatype.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/datatype.rs#L84-L86

Added lines #L84 - L86 were not covered by tests
SQLDataType::Double | SQLDataType::DoublePrecision | SQLDataType::Float8 => {
DataType::Float64

Check warning on line 88 in pg_analytics/src/datafusion/datatype.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/datatype.rs#L88

Added line #L88 was not covered by tests
}
SQLDataType::Bytea => DataType::Binary,
SQLDataType::Numeric(ExactNumberInfo::PrecisionAndScale(precision, scale)) => {
let casted_precision = precision as u8;
Expand Down
90 changes: 88 additions & 2 deletions pg_analytics/src/datafusion/schema.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use async_std::stream::StreamExt;
use async_std::task;
use async_trait::async_trait;
use deltalake::datafusion::arrow::datatypes::Schema as ArrowSchema;
use deltalake::datafusion::arrow::record_batch::RecordBatch;
use deltalake::datafusion::catalog::schema::SchemaProvider;
use deltalake::datafusion::datasource::TableProvider;
use deltalake::datafusion::error::Result;
use deltalake::datafusion::execution::context::SessionState;
use deltalake::datafusion::execution::TaskContext;
use deltalake::datafusion::logical_expr::Expr;
use deltalake::datafusion::physical_plan::SendableRecordBatchStream;
use deltalake::kernel::Action;
use deltalake::kernel::Schema as DeltaSchema;
use deltalake::operations::create::CreateBuilder;
Expand All @@ -18,6 +22,8 @@
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::storage::ObjectStoreRef;
use deltalake::table::state::DeltaTableState;
use deltalake::writer::record_batch::RecordBatchWriter;
use deltalake::writer::{DeltaWriter as DeltaWriterTrait, WriteMode};
use deltalake::DeltaTable;
use parking_lot::{Mutex, RwLock};
use pgrx::*;
Expand All @@ -39,6 +45,7 @@
schema_name: String,
tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
writers: Mutex<HashMap<String, DeltaWriter>>,
streams: Mutex<HashMap<String, SendableRecordBatchStream>>,
dir: PathBuf,
}

Expand All @@ -49,6 +56,7 @@
schema_name: schema_name.to_string(),
tables: RwLock::new(HashMap::new()),
writers: Mutex::new(HashMap::new()),
streams: Mutex::new(HashMap::new()),

Check warning on line 59 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L59

Added line #L59 was not covered by tests
dir,
})
}
Expand Down Expand Up @@ -250,7 +258,7 @@
let mut writer_lock = self.writers.lock();
let writer = writer_lock
.get_mut(table_name)
.ok_or(NotFound::Table(table_name.to_string()))?;
.ok_or(NotFound::Writer(table_name.to_string()))?;

Check warning on line 261 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L261

Added line #L261 was not covered by tests

task::block_on(writer.write(&batch))?;

Expand All @@ -270,7 +278,7 @@
let mut writer_lock = self.writers.lock();
let writer = writer_lock
.remove(table_name)
.ok_or(NotFound::Table(table_name.to_string()))?;
.ok_or(NotFound::Writer(table_name.to_string()))?;

Check warning on line 281 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L281

Added line #L281 was not covered by tests

// Generate commit actions by closing the writer and commit to delta logs
let actions = task::block_on(writer.close())?;
Expand Down Expand Up @@ -356,6 +364,41 @@
Ok(metrics)
}

pub async fn merge_schema(
&self,
table_name: &str,
batch: RecordBatch,
) -> Result<(), ParadeError> {

Check warning on line 371 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L367-L371

Added lines #L367 - L371 were not covered by tests
// Open the DeltaTable
let mut delta_table = Self::get_delta_table(self, table_name).await?;

Check warning on line 373 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L373

Added line #L373 was not covered by tests

// Write the RecordBatch to the DeltaTable
let mut writer = RecordBatchWriter::for_table(&delta_table)?;
writer
.write_with_mode(batch, WriteMode::MergeSchema)
.await?;
writer.flush_and_commit(&mut delta_table).await?;

Check warning on line 380 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L376-L380

Added lines #L376 - L380 were not covered by tests

// Update the DeltaTable
delta_table.load().await?;

Check warning on line 383 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L383

Added line #L383 was not covered by tests

// Create and register a new writer
Self::register_writer(
self,
table_name,
Self::create_writer(self, delta_table.object_store(), writer.arrow_schema())?,
)?;

Check warning on line 390 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L387-L390

Added lines #L387 - L390 were not covered by tests

// Commit the table
Self::register_table(
self,
table_name.to_string(),
Arc::new(delta_table) as Arc<dyn TableProvider>,
)?;

Check warning on line 397 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L393-L397

Added lines #L393 - L397 were not covered by tests

Ok(())
}

Check warning on line 400 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L399-L400

Added lines #L399 - L400 were not covered by tests

// SchemaProvider stores immutable TableProviders, whereas many DeltaOps methods
// require a mutable DeltaTable. This function gets a mutable DeltaTable from
// a TableProvider using the DeltaOps UpdateBuilder.
Expand Down Expand Up @@ -410,6 +453,49 @@
Ok(delta_table)
}

pub fn register_stream(
&self,
name: &str,
stream: SendableRecordBatchStream,
) -> Result<(), ParadeError> {
let mut streams = self.streams.lock();
streams.insert(name.to_string(), stream);

Ok(())
}

Check warning on line 465 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L456-L465

Added lines #L456 - L465 were not covered by tests

pub async fn create_stream(
&self,
name: &str,
state: &SessionState,
task_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream, ParadeError> {
let delta_table = Self::get_delta_table(self, name).await?;

Check warning on line 473 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L467-L473

Added lines #L467 - L473 were not covered by tests

Ok(delta_table
.scan(state, None, &[], None)
.await
.map(|plan| plan.execute(0, task_context))??)
}

Check warning on line 479 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L475-L479

Added lines #L475 - L479 were not covered by tests

pub fn get_next_streamed_batch(&self, name: &str) -> Result<Option<RecordBatch>, ParadeError> {
let mut streams = self.streams.lock();
let stream = streams
.get_mut(name)
.ok_or(NotFound::Stream(name.to_string()))?;

Check warning on line 485 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L481-L485

Added lines #L481 - L485 were not covered by tests

let batch = task::block_on(stream.next());

Check warning on line 487 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L487

Added line #L487 was not covered by tests

match batch {
Some(Ok(b)) => Ok(Some(b)),

Check warning on line 490 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L489-L490

Added lines #L489 - L490 were not covered by tests
None => {
streams.remove(name);
Ok(None)

Check warning on line 493 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L492-L493

Added lines #L492 - L493 were not covered by tests
}
Some(Err(err)) => Err(ParadeError::DataFusion(err)),

Check warning on line 495 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L495

Added line #L495 was not covered by tests
}
}

Check warning on line 497 in pg_analytics/src/datafusion/schema.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/datafusion/schema.rs#L497

Added line #L497 was not covered by tests

// Helper function to register a table writer
fn register_writer(&self, name: &str, writer: DeltaWriter) -> Result<(), ParadeError> {
let mut writers = self.writers.lock();
Expand Down
16 changes: 14 additions & 2 deletions pg_analytics/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
#[error("No table registered with name {0}")]
Table(String),

#[error("No writer found for table {0}")]

Check warning on line 54 in pg_analytics/src/errors.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/errors.rs#L54

Added line #L54 was not covered by tests
Writer(String),

#[error("No stream found for table {0}")]

Check warning on line 57 in pg_analytics/src/errors.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/errors.rs#L56-L57

Added lines #L56 - L57 were not covered by tests
Stream(String),

Check warning on line 59 in pg_analytics/src/errors.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/errors.rs#L59

Added line #L59 was not covered by tests
#[error("Failed to convert to datum {0}")]
Datum(String),

Expand Down Expand Up @@ -81,8 +87,14 @@
#[error("Custom Postgres types are not supported")]
CustomPostgresType,

#[error("ALTER TABLE is not yet supported. Please DROP and CREATE the table instead.")]
AlterTable,
#[error("DROP COLUMN is not yet supported. Please recreate the table instead.")]
DropColumn,

#[error("ALTER COLUMN is not yet supported. Please recreate the table instead.")]

Check warning on line 93 in pg_analytics/src/errors.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/errors.rs#L91-L93

Added lines #L91 - L93 were not covered by tests
AlterColumn,

#[error("RENAME COLUMN is not yet supported. Please recreate the table instead.")]
RenameColumn,

Check warning on line 97 in pg_analytics/src/errors.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/errors.rs#L97

Added line #L97 was not covered by tests

#[error("UPDATE is not yet supported for deltalake tables")]
Update,
Expand Down
56 changes: 54 additions & 2 deletions pg_analytics/src/hooks/alter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
use async_std::task;
use deltalake::datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use deltalake::datafusion::arrow::record_batch::RecordBatch;
use deltalake::datafusion::sql::parser;
use deltalake::datafusion::sql::sqlparser::ast::{AlterTableOperation::*, ColumnOption, Statement};
use pgrx::*;
use std::sync::Arc;

use crate::datafusion::context::DatafusionContext;
use crate::datafusion::datatype::DatafusionTypeTranslator;
use crate::errors::{NotSupported, ParadeError};
use crate::hooks::handler::DeltaHandler;

pub unsafe fn alter(alter_stmt: *mut pg_sys::AlterTableStmt) -> Result<(), ParadeError> {
pub unsafe fn alter(
alter_stmt: *mut pg_sys::AlterTableStmt,
statement: &parser::Statement,
) -> Result<(), ParadeError> {

Check warning on line 17 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L14-L17

Added lines #L14 - L17 were not covered by tests
let rangevar = (*alter_stmt).relation;
let rangevar_oid = pg_sys::RangeVarGetRelidExtended(
rangevar,
Expand All @@ -23,5 +34,46 @@
return Ok(());
}

Err(NotSupported::AlterTable.into())
let pg_relation = unsafe { PgRelation::from_pg_owned(relation) };
let table_name = pg_relation.name();
let schema_name = pg_relation.namespace();
let mut fields_to_add = vec![];

Check warning on line 40 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L37-L40

Added lines #L37 - L40 were not covered by tests

if let parser::Statement::Statement(inner_statement) = statement {
if let Statement::AlterTable { operations, .. } = inner_statement.as_ref() {
for operation in operations {
match operation {
AddColumn { column_def, .. } => {
let options = &column_def.options;
let nullability = options
.iter()
.any(|opt| matches!(opt.option, ColumnOption::Null));
fields_to_add.push(Field::new(
column_def.name.value.clone(),
DataType::from_sql_data_type(column_def.data_type.clone())?,
!nullability,

Check warning on line 54 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L42-L54

Added lines #L42 - L54 were not covered by tests
));
}
DropColumn { .. } => {
return Err(NotSupported::DropColumn.into());

Check warning on line 58 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L58

Added line #L58 was not covered by tests
}
AlterColumn { .. } | ChangeColumn { .. } => {
return Err(NotSupported::AlterColumn.into());

Check warning on line 61 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L61

Added line #L61 was not covered by tests
}
_ => {}

Check warning on line 63 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L63

Added line #L63 was not covered by tests
}
}
}
}

Check warning on line 67 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L66-L67

Added lines #L66 - L67 were not covered by tests

if !fields_to_add.is_empty() {
let schema = Arc::new(ArrowSchema::new(fields_to_add));
let batch = RecordBatch::new_empty(schema);

DatafusionContext::with_schema_provider(schema_name, |provider| {
task::block_on(provider.merge_schema(table_name, batch))
})?;
}

Check warning on line 76 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L69-L76

Added lines #L69 - L76 were not covered by tests

Ok(())

Check warning on line 78 in pg_analytics/src/hooks/alter.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/alter.rs#L78

Added line #L78 was not covered by tests
}
12 changes: 6 additions & 6 deletions pg_analytics/src/hooks/executor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use deltalake::datafusion::error::DataFusionError;
use deltalake::datafusion::logical_expr::LogicalPlan;

use deltalake::datafusion::sql::parser::DFParser;
use deltalake::datafusion::sql::planner::SqlToRel;

use deltalake::datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use pgrx::*;
use std::ffi::CStr;
Expand All @@ -30,12 +28,15 @@
unsafe {
let ps = query_desc.plannedstmt;
let rtable = (*ps).rtable;
let query = CStr::from_ptr(query_desc.sourceText).to_str()?;

Check warning on line 31 in pg_analytics/src/hooks/executor.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/executor.rs#L31

Added line #L31 was not covered by tests

// Only use this hook for deltalake tables
// Allow INSERTs to go through
if rtable.is_null()
|| query_desc.operation == pg_sys::CmdType_CMD_INSERT
|| !DeltaHandler::rtable_is_delta(rtable)?
// Tech Debt: Find a less hacky way to let COPY go through
|| query.to_lowercase().starts_with("copy")

Check warning on line 39 in pg_analytics/src/hooks/executor.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/executor.rs#L39

Added line #L39 was not covered by tests
{
prev_hook(query_desc, direction, count, execute_once);
return Ok(());
Expand All @@ -44,11 +45,11 @@
// Execute SELECT, DELETE, UPDATE
match query_desc.operation {
pg_sys::CmdType_CMD_DELETE => {
let logical_plan = create_logical_plan(query_desc.clone())?;
let logical_plan = create_logical_plan(query)?;

Check warning on line 48 in pg_analytics/src/hooks/executor.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/executor.rs#L48

Added line #L48 was not covered by tests
delete(rtable, query_desc, logical_plan)
}
pg_sys::CmdType_CMD_SELECT => {
let logical_plan = create_logical_plan(query_desc.clone())?;
let logical_plan = create_logical_plan(query)?;

Check warning on line 52 in pg_analytics/src/hooks/executor.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/executor.rs#L52

Added line #L52 was not covered by tests
select(query_desc, logical_plan)
}
pg_sys::CmdType_CMD_UPDATE => Err(NotSupported::Update.into()),
Expand All @@ -61,9 +62,8 @@
}

#[inline]
fn create_logical_plan(query_desc: PgBox<pg_sys::QueryDesc>) -> Result<LogicalPlan, ParadeError> {
fn create_logical_plan(query: &str) -> Result<LogicalPlan, ParadeError> {

Check warning on line 65 in pg_analytics/src/hooks/executor.rs

View check run for this annotation

Codecov / codecov/patch

pg_analytics/src/hooks/executor.rs#L65

Added line #L65 was not covered by tests
let dialect = PostgreSqlDialect {};
let query = unsafe { CStr::from_ptr(query_desc.sourceText).to_str()? };
let ast = DFParser::parse_sql_with_dialect(query, &dialect)
.map_err(|err| ParadeError::DataFusion(DataFusionError::SQL(err, None)))?;
let statement = &ast[0];
Expand Down
Loading
Loading