Skip to content

Commit

Permalink
feat: improve vcf table options (#128)
Browse files Browse the repository at this point in the history
* feat: improve vcf table options
* feat: improvements for partition
  • Loading branch information
tshauck committed Apr 24, 2024
1 parent 42b8fe8 commit d2bfe38
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ name = "biobear"
[dependencies]
arrow = { version = "51.0.0", features = ["pyarrow"] }
datafusion = "37"
exon = { version = "0.19.1", features = ["default"] }
exon = { version = "0.20", features = ["default"] }
pyo3 = "0.20"
tokio = { version = "1", features = ["rt"] }
noodles = { version = "0.70", features = ["core"] }
Expand Down
4 changes: 3 additions & 1 deletion python/biobear/biobear.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ class VCFReadOptions:
self,
/,
region: Optional[str] = None,
file_extension: Optional[str] = None,
file_compression_type: Optional[FileCompressionType] = None,
parse_info: bool = False,
parse_formats: bool = False,
partition_cols: list[str | None] = None,
) -> None: ...

class BCFReadOptions:
Expand Down
Binary file not shown.
Binary file not shown.
50 changes: 50 additions & 0 deletions python/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,56 @@ def test_vcf_reader():
assert len(df) == 15


# Reads vcf_file.vcf with both INFO and FORMAT parsing enabled and checks the
# resulting polars frame. Skipped when polars is not installed.
@pytest.mark.skipif(
not importlib.util.find_spec("polars"), reason="polars not installed"
)
def test_vcf_reader_with_parsing():
session = new_session()
options = VCFReadOptions(parse_formats=True, parse_info=True)

df = session.read_vcf_file(
(DATA / "vcf_file.vcf").as_posix(), options=options
).to_polars()

# With parse_info=True the "info" column's dtype should expose six fields
assert len(df.get_column("info").dtype.fields) == 6
# The frame should hold 15 rows
assert len(df) == 15


# Reads the "vcf-partition" path restricted to region "1", with GZIP
# compression, INFO/FORMAT parsing, and a "sample" partition column, then
# checks the first record batch that comes back.
def test_vcf_query_with_region_and_partition():
session = connect()
options = VCFReadOptions(
region="1",
file_compression_type=FileCompressionType.GZIP,
parse_formats=True,
parse_info=True,
partition_cols=["sample"],
)

rbr = session.read_vcf_file(
(DATA / "vcf-partition").as_posix(), options=options
).to_arrow_record_batch_reader()

# Only the first batch from the reader is inspected
batch = next(rbr)

# The partition column "sample" is expected last, after the other columns
assert batch.column_names == [
"chrom",
"pos",
"id",
"ref",
"alt",
"qual",
"filter",
"info",
"formats",
"sample",
]

# NOTE(review): the result is discarded — presumably this only verifies that
# looking up "sample" in the schema does not raise; confirm
batch.schema.field("sample")

assert 11 == batch.num_rows


def test_vcf_query():
session = connect()
options = VCFReadOptions(region="1", file_compression_type=FileCompressionType.GZIP)
Expand Down
46 changes: 31 additions & 15 deletions src/datasources/vcf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::datatypes::{DataType, Field};
use exon::datasources::vcf::ListingVCFTableOptions;
use noodles::core::Region;
use pyo3::{pyclass, pymethods, PyResult};
Expand All @@ -21,29 +22,31 @@ use crate::FileCompressionType;
use super::parse_region;

#[pyclass]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
/// Options for reading VCF files.
pub struct VCFReadOptions {
/// The region to read.
region: Option<Region>,
/// The file compression type.
file_compression_type: FileCompressionType,
}

impl Default for VCFReadOptions {
fn default() -> Self {
Self {
region: None,
file_compression_type: FileCompressionType::UNCOMPRESSED,
}
}
/// True if the INFO column should be parsed.
parse_info: bool,
/// True if the FORMAT column should be parsed.
parse_formats: bool,
/// The partition fields.
partition_cols: Option<Vec<String>>,
}

#[pymethods]
impl VCFReadOptions {
#[new]
#[pyo3(signature = (/, region = None, file_compression_type = None))]
#[pyo3(signature = (*, region = None, file_compression_type = None, parse_info = false, parse_formats = false, partition_cols = None))]
fn try_new(
region: Option<String>,
file_compression_type: Option<FileCompressionType>,
parse_info: bool,
parse_formats: bool,
partition_cols: Option<Vec<String>>,
) -> PyResult<Self> {
let region = parse_region(region)?;

Expand All @@ -53,20 +56,33 @@ impl VCFReadOptions {
Ok(Self {
region,
file_compression_type,
parse_info,
parse_formats,
partition_cols,
})
}
}

// NOTE(review): this hunk appears to interleave removed lines (those using `t`)
// with added lines (those using `o`) — confirm against the repository.
/// Builds a `ListingVCFTableOptions` from the `VCFReadOptions` values.
impl From<VCFReadOptions> for ListingVCFTableOptions {
fn from(options: VCFReadOptions) -> Self {
// NOTE(review): the meaning of the second argument (`false`) is not visible
// here — presumably an "indexed" flag; confirm against exon.
let mut o = ListingVCFTableOptions::new(options.file_compression_type.into(), false)
.with_parse_info(options.parse_info)
.with_parse_formats(options.parse_formats);

// The optional single region becomes a zero- or one-element list; regions are
// only applied when that list is non-empty.
let regions = options.region.map(|r| vec![r]).unwrap_or_default();
if !regions.is_empty() {
o = o.with_regions(regions);
}

let mut t = ListingVCFTableOptions::new(options.file_compression_type.into(), false);
if let Some(partition_cols) = options.partition_cols {
// Each partition column name becomes a `Utf8` field; the trailing `false`
// is arrow's `nullable` argument.
let partition_fields = partition_cols
.iter()
.map(|s| Field::new(s, DataType::Utf8, false))
.collect::<Vec<_>>();

if !regions.is_empty() {
t = t.with_regions(regions);
o = o.with_table_partition_cols(partition_fields);
}

t
o
}
}
2 changes: 1 addition & 1 deletion src/execution_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
#[pyclass(name = "ExecutionResult", subclass)]
#[derive(Clone)]
pub(crate) struct ExecutionResult {
df: Arc<DataFrame>,
pub(super) df: Arc<DataFrame>,
}

impl ExecutionResult {
Expand Down

0 comments on commit d2bfe38

Please sign in to comment.