Skip to content

Commit

Permalink
chore: Better error handling (#783)
Browse files Browse the repository at this point in the history
  • Loading branch information
rebasedming committed Jan 28, 2024
1 parent 98c7dff commit e6fad26
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 118 deletions.
1 change: 1 addition & 0 deletions docs/search/indexing/bm25.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ It ranks documents by considering how often a term appears and how unique that t
documents. BM25 is especially useful when you need to find exact keywords or phrases within documents.

The BM25 index enables full text search over a table and relevance scoring using the BM25 algorithm.
This index is strongly consistent, which means that new data is immediately searchable across all connections.

## Creating a BM25 Index

Expand Down
5 changes: 2 additions & 3 deletions docs/search/indexing/overview.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ title: Indexing Explained

In Postgres, indexes are database objects that provide a fast way to look up rows in a table. If you
imagine a Postgres table as a book, then an index is like the table of contents. Instead of flipping
through every page to find a chapter, the table of contents stores additional information that enables
readers to locate a chapter much faster.
through every page to find a chapter, the table of contents stores additional information that can help
readers find chapters faster.

## ParadeDB Indexes

Expand All @@ -16,4 +16,3 @@ the HNSW index for dense vector search, and the sparse HNSW index for sparse vec

Once an index is created, it will automatically stay in sync with the underlying table data, and
does not need to be re-created unless the table schema changes (for instance, if a column is renamed or deleted).
The bm25 index is strongly consistent. This means that new data is immediately searchable across all connections.
4 changes: 3 additions & 1 deletion pg_analytics/src/api/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ extension_sql!(
#[pg_guard]
#[no_mangle]
pub extern "C" fn init() {
let _ = DatafusionContext::init().expect("Failed to initialize context");
let _ = DatafusionContext::init().unwrap_or_else(|err| {
panic!("{}", err);
});
}
6 changes: 3 additions & 3 deletions pg_analytics/src/datafusion/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use deltalake::datafusion::catalog::{CatalogList, CatalogProvider};
use deltalake::datafusion::common::DataFusionError;
use parking_lot::RwLock;
use pgrx::*;
use std::{any::Any, collections::HashMap, ffi::CStr, sync::Arc};
use std::{any::type_name, any::Any, collections::HashMap, ffi::CStr, ffi::OsStr, sync::Arc};

use crate::datafusion::directory::ParadeDirectory;
use crate::datafusion::schema::ParadeSchemaProvider;
Expand Down Expand Up @@ -36,9 +36,9 @@ impl ParadeCatalog {
if path.is_dir() {
let schema_oid = path
.file_name()
.ok_or_else(|| ParadeError::NotFound)?
.ok_or(ParadeError::NoneError(type_name::<OsStr>().to_string()))?
.to_str()
.ok_or_else(|| ParadeError::NotFound)?
.ok_or(ParadeError::NoneError(type_name::<str>().to_string()))?
.parse::<u32>()?;

let pg_oid = pg_sys::Oid::from(schema_oid);
Expand Down
19 changes: 12 additions & 7 deletions pg_analytics/src/datafusion/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use deltalake::datafusion::sql::planner::ContextProvider;
use deltalake::datafusion::sql::TableReference;
use lazy_static::lazy_static;
use parking_lot::{RwLock, RwLockWriteGuard};
use std::any::type_name;
use std::sync::Arc;

use crate::datafusion::catalog::{ParadeCatalog, ParadeCatalogList, PARADE_CATALOG};
Expand Down Expand Up @@ -56,14 +57,16 @@ impl<'a> DatafusionContext {

let schema_provider = context
.catalog(PARADE_CATALOG)
.ok_or_else(|| ParadeError::NotFound)?
.ok_or(ParadeError::CatalogNotFound(PARADE_CATALOG.to_string()))?
.schema(schema_name)
.ok_or_else(|| ParadeError::NotFound)?;
.ok_or(ParadeError::SchemaNotFound(schema_name.to_string()))?;

let parade_provider = schema_provider
.as_any()
.downcast_ref::<ParadeSchemaProvider>()
.ok_or_else(|| ParadeError::NotFound)?;
.ok_or(ParadeError::NoneError(
type_name::<ParadeSchemaProvider>().to_string(),
))?;

f(parade_provider)
}
Expand All @@ -83,12 +86,14 @@ impl<'a> DatafusionContext {

let catalog_provider = context
.catalog(PARADE_CATALOG)
.ok_or_else(|| ParadeError::NotFound)?;
.ok_or(ParadeError::CatalogNotFound(PARADE_CATALOG.to_string()))?;

let parade_catalog = catalog_provider
.as_any()
.downcast_ref::<ParadeCatalog>()
.ok_or_else(|| ParadeError::NotFound)?;
.ok_or(ParadeError::NoneError(
type_name::<ParadeCatalog>().to_string(),
))?;

f(parade_catalog)
}
Expand Down Expand Up @@ -151,8 +156,8 @@ impl ContextProvider for ParadeContextProvider {
let schema_name = reference.schema().unwrap_or("public");

DatafusionContext::with_schema_provider(schema_name, |provider| {
let table =
task::block_on(provider.table(table_name)).ok_or_else(|| ParadeError::NotFound)?;
let table = task::block_on(provider.table(table_name))
.ok_or(ParadeError::TableNotFound(table_name.to_string()))?;

Ok(provider_as_source(table))
})
Expand Down
135 changes: 80 additions & 55 deletions pg_analytics/src/datafusion/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use deltalake::datafusion::sql::sqlparser::ast::{
};
use pgrx::pg_sys::{Datum, VARHDRSZ};
use pgrx::*;
use std::any::type_name;
use std::sync::Arc;

use crate::errors::ParadeError;
Expand All @@ -39,19 +40,21 @@ impl DatafusionTypeTranslator for DataType {
ExactNumberInfo::PrecisionAndScale(*precision as u64, *scale as u64),
),
DataType::Time64(TimeUnit::Microsecond) => {
return Err(ParadeError::NotSupported("time not supported".into()))
return Err(ParadeError::NotSupported("time not supported".to_string()))
}
DataType::Timestamp(TimeUnit::Microsecond, timestamp) => SQLDataType::Timestamp(
None,
match timestamp {
None => TimezoneInfo::WithoutTimeZone,
Some(_) => {
return Err(ParadeError::NotSupported("timestamp with time zone".into()))
return Err(ParadeError::NotSupported(
"timestamp with time zone".to_string(),
))
}
},
),
DataType::Date32 => SQLDataType::Date,
_ => return Err(ParadeError::NotFound),
_ => return Err(ParadeError::DataTypeNotSupported(self.to_string())),
};

Ok(result)
Expand Down Expand Up @@ -90,15 +93,17 @@ impl DatafusionTypeTranslator for DataType {

DataType::Decimal128(casted_precision, casted_scale)
}
SQLDataType::Time(_, _) => return Err(ParadeError::NotSupported("time".into())),
SQLDataType::Time(_, _) => return Err(ParadeError::NotSupported("time".to_string())),
SQLDataType::Timestamp(_, TimezoneInfo::WithoutTimeZone) => {
DataType::Timestamp(TimeUnit::Microsecond, None)
}
SQLDataType::Timestamp(_, TimezoneInfo::WithTimeZone) => {
return Err(ParadeError::NotSupported("timestamp with time zone".into()))
return Err(ParadeError::NotSupported(
"timestamp with time zone".to_string(),
))
}
SQLDataType::Date => DataType::Date32,
_ => return Err(ParadeError::NotFound),
_ => return Err(ParadeError::DataTypeNotSupported(sql_data_type.to_string())),
};

Ok(result)
Expand Down Expand Up @@ -126,26 +131,38 @@ impl PostgresTypeTranslator for PgOid {
PgBuiltInOids::OIDOID | PgBuiltInOids::XIDOID => SQLDataType::UnsignedInt4(None),
PgBuiltInOids::FLOAT4OID => SQLDataType::Float4,
PgBuiltInOids::FLOAT8OID => SQLDataType::Float8,
PgBuiltInOids::TIMEOID => return Err(ParadeError::NotSupported("time".into())),
PgBuiltInOids::TIMEOID => {
return Err(ParadeError::NotSupported("time".to_string()))
}
PgBuiltInOids::TIMESTAMPOID => {
SQLDataType::Timestamp(None, TimezoneInfo::WithoutTimeZone)
}
PgBuiltInOids::DATEOID => SQLDataType::Date,
PgBuiltInOids::TIMETZOID => {
return Err(ParadeError::NotSupported("time with time zone".into()))
return Err(ParadeError::NotSupported("time with time zone".to_string()))
}
PgBuiltInOids::TIMESTAMPTZOID => {
return Err(ParadeError::NotSupported("timestamp with time zone".into()))
return Err(ParadeError::NotSupported(
"timestamp with time zone".to_string(),
))
}
PgBuiltInOids::JSONOID => {
return Err(ParadeError::NotSupported("json".to_string()))
}
PgBuiltInOids::JSONBOID => {
return Err(ParadeError::NotSupported("jsonb".to_string()))
}
PgBuiltInOids::BOXOID => return Err(ParadeError::NotSupported("box".to_string())),
PgBuiltInOids::POINTOID => {
return Err(ParadeError::NotSupported("point".to_string()))
}
PgBuiltInOids::JSONOID => return Err(ParadeError::NotSupported("json".into())),
PgBuiltInOids::JSONBOID => return Err(ParadeError::NotSupported("jsonb".into())),
PgBuiltInOids::BOXOID => return Err(ParadeError::NotSupported("box".into())),
PgBuiltInOids::POINTOID => return Err(ParadeError::NotSupported("point".into())),
PgBuiltInOids::TIDOID => return Err(ParadeError::NotSupported("tid".into())),
PgBuiltInOids::TIDOID => return Err(ParadeError::NotSupported("tid".to_string())),
PgBuiltInOids::CSTRINGOID => {
return Err(ParadeError::NotSupported("cstring".into()))
return Err(ParadeError::NotSupported("cstring".to_string()))
}
PgBuiltInOids::INETOID => {
return Err(ParadeError::NotSupported("inet".to_string()))
}
PgBuiltInOids::INETOID => return Err(ParadeError::NotSupported("inet".into())),
PgBuiltInOids::NUMERICOID => {
let scale: i32 = (((typmod - VARHDRSZ as i32) & 0x7ff) ^ 1024) - 1024;
let precision: i32 = ((typmod - VARHDRSZ as i32) >> 16) & 0xffff;
Expand All @@ -168,31 +185,36 @@ impl PostgresTypeTranslator for PgOid {
scale as u64,
))
}
PgBuiltInOids::VOIDOID => return Err(ParadeError::NotSupported("void".into())),
PgBuiltInOids::VOIDOID => {
return Err(ParadeError::NotSupported("void".to_string()))
}
PgBuiltInOids::INT4RANGEOID => {
return Err(ParadeError::NotSupported("int4 range".into()))
return Err(ParadeError::NotSupported("int4 range".to_string()))
}
PgBuiltInOids::INT8RANGEOID => {
return Err(ParadeError::NotSupported("int8 range".into()))
return Err(ParadeError::NotSupported("int8 range".to_string()))
}
PgBuiltInOids::NUMRANGEOID => {
return Err(ParadeError::NotSupported("numeric range".into()))
return Err(ParadeError::NotSupported("numeric range".to_string()))
}
PgBuiltInOids::DATERANGEOID => {
return Err(ParadeError::NotSupported("date range".into()))
return Err(ParadeError::NotSupported("date range".to_string()))
}
PgBuiltInOids::TSRANGEOID => {
return Err(ParadeError::NotSupported("timestamp range".into()))
return Err(ParadeError::NotSupported("timestamp range".to_string()))
}
PgBuiltInOids::TSTZRANGEOID => {
return Err(ParadeError::NotSupported(
"timestamp with time zone range".into(),
"timestamp with time zone range".to_string(),
))
}
PgBuiltInOids::UUIDOID => SQLDataType::Uuid,
_ => return Err(ParadeError::NotFound),
_ => return Err(ParadeError::DataTypeNotSupported("unknown".to_string())),
},
_ => return Err(ParadeError::NotFound),
PgOid::Invalid => return Err(ParadeError::DataTypeNotSupported("invalid".to_string())),
PgOid::Custom(_) => {
return Err(ParadeError::DataTypeNotSupported("custom".to_string()))
}
};

Ok(result)
Expand All @@ -212,13 +234,15 @@ impl PostgresTypeTranslator for PgOid {
SQLDataType::Numeric(ExactNumberInfo::PrecisionAndScale(_precision, _scale)) => {
PgBuiltInOids::NUMERICOID
}
SQLDataType::Time(_, _) => return Err(ParadeError::NotSupported("time".into())),
SQLDataType::Time(_, _) => return Err(ParadeError::NotSupported("time".to_string())),
SQLDataType::Timestamp(_, TimezoneInfo::WithoutTimeZone) => PgBuiltInOids::TIMESTAMPOID,
SQLDataType::Date => PgBuiltInOids::DATEOID,
SQLDataType::Timestamp(_, TimezoneInfo::WithTimeZone) => {
return Err(ParadeError::NotSupported("timestamp with time zone".into()))
return Err(ParadeError::NotSupported(
"timestamp with time zone".to_string(),
))
}
_ => return Err(ParadeError::NotFound),
_ => return Err(ParadeError::DataTypeNotSupported(sql_data_type.to_string())),
};

let typmod: i32 = match sql_data_type {
Expand Down Expand Up @@ -248,7 +272,7 @@ fn scale_anynumeric(
pg_sys::numeric,
&[anynumeric.into_datum(), original_typmod.into_datum()],
)
.ok_or_else(|| ParadeError::Generic("numeric direct function call failed".into()))?
.ok_or_else(|| ParadeError::Generic("numeric direct function call failed".to_string()))?
};

// Scale the anynumeric up or down
Expand All @@ -271,7 +295,7 @@ fn scale_anynumeric(
pg_sys::numeric,
&[scaled_anynumeric.into_datum(), target_typmod.into_datum()],
)
.ok_or_else(|| ParadeError::Generic("numeric direct function call failed".into()))
.ok_or_else(|| ParadeError::Generic("numeric direct function call failed".to_string()))
}
}

Expand Down Expand Up @@ -410,24 +434,13 @@ impl DatafusionMapProducer {
if is_null {
vec.push(None);
} else {
vec.push(unsafe {
match AnyNumeric::from_datum(*datum, false) {
Some(numeric) => {
let ret = scale_anynumeric(
numeric,
precision as i32,
scale as i32,
true,
)?;
let val = i128::try_from(ret);
match val {
Ok(val) => Some(val),
Err(e) => return Err(ParadeError::Generic(e.to_string())),
}
}
None => return Err(ParadeError::NotFound),
}
});
let numeric = unsafe {
AnyNumeric::from_datum(*datum, false)
.ok_or(ParadeError::DatumNotFound("numeric".to_string()))?
};
let numeric_with_scale =
scale_anynumeric(numeric, precision as i32, scale as i32, true)?;
vec.push(Some(i128::try_from(numeric_with_scale)?));
}
}
Ok(Arc::new(
Expand Down Expand Up @@ -480,7 +493,9 @@ impl DatafusionMapProducer {
}
Ok(Arc::new(Date32Array::from(vec)))
}
_ => Err(ParadeError::NotFound),
_ => Err(ParadeError::DataTypeNotSupported(
datafusion_type.to_string(),
)),
}
}

Expand All @@ -495,13 +510,17 @@ impl DatafusionMapProducer {
DataType::Boolean => array
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| ParadeError::NotFound)?
.ok_or(ParadeError::NoneError(
type_name::<BooleanArray>().to_string(),
))?
.value(index)
.into_datum(),
DataType::Utf8 => array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| ParadeError::NotFound)?
.ok_or(ParadeError::NoneError(
type_name::<StringArray>().to_string(),
))?
.value(index)
.into_datum(),
DataType::Int16 => array.as_primitive::<Int16Type>().value(index).into_datum(),
Expand All @@ -523,18 +542,24 @@ impl DatafusionMapProducer {
ret.into_datum()
}
DataType::Time64(TimeUnit::Microsecond) => {
return Err(ParadeError::NotSupported("time".into()));
return Err(ParadeError::NotSupported("time".to_string()));
}
DataType::Timestamp(TimeUnit::Microsecond, None) => array
.as_primitive::<TimestampMicrosecondType>()
.value(index)
.into_datum(),
DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => {
return Err(ParadeError::NotSupported("timestamp with time zone".into()));
return Err(ParadeError::NotSupported(
"timestamp with time zone".to_string(),
));
}
DataType::Date32 => array.as_primitive::<Date32Type>().value(index).into_datum(),
_ => return Err(ParadeError::NotFound),
_ => {
return Err(ParadeError::DataTypeNotSupported(
datafusion_type.to_string(),
))
}
}
.ok_or_else(|| ParadeError::NotFound)
.ok_or(ParadeError::DatumNotFound(datafusion_type.to_string()))
}
}

0 comments on commit e6fad26

Please sign in to comment.