Skip to content

Commit

Permalink
update arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 9, 2022
1 parent 30f4608 commit 1a74cbe
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 60 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "7add9d31bff7a65076efbf1c4f7732be702f0e2b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "a4383b18955b35bef1237be05e0a747d9dca1171", default-features = false }
hashbrown = "0.11"
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", default-features = false, features = ["compute"], branch = "offset_pub" }
# arrow = { package = "arrow2", version = "0.8", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-arrow/src/io/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn read_parquet<R: Read + Seek>(

let projection = projection
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned((0usize..schema.fields().len()).collect::<Vec<_>>()));
.unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::<Vec<_>>()));

let mut rb = Vec::with_capacity(row_group_len);

Expand All @@ -42,7 +42,7 @@ pub fn read_parquet<R: Read + Seek>(
// inner `Vec` is whatever number of pages the chunk contains.
let column_iter =
read::get_column_iterator(&mut reader, &metadata, rg, *column_i, None, b1);
let fld = schema.field(*column_i);
let fld = &schema.fields[*column_i];
let (mut array, b1, b2) = read::column_iter_to_array(column_iter, fld, b2)?;

if array.len() > remaining_rows {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ unsafe_unwrap = "^0.1.0"
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "7add9d31bff7a65076efbf1c4f7732be702f0e2b"
rev = "a4383b18955b35bef1237be05e0a747d9dca1171"
# branch = "offset_pub"
# version = "0.8"
default-features = false
Expand Down
51 changes: 37 additions & 14 deletions polars/polars-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use arrow::compute::arithmetics::basic::NativeArithmetics;
use arrow::compute::comparison::Simd8;
use arrow::datatypes::IntegerType;
pub use arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use arrow::error::ArrowError;
use arrow::types::simd::Simd;
use arrow::types::NativeType;
use num::{Bounded, FromPrimitive, Num, NumCast, Zero};
Expand Down Expand Up @@ -771,6 +772,27 @@ impl Field {
}
}

/// Name-based column lookup for schema types.
#[cfg(feature = "private")]
pub trait IndexOfSchema {
    /// Returns the index of the field called `name`.
    ///
    /// # Errors
    ///
    /// Returns an error when no field has that name.
    fn index_of(&self, name: &str) -> arrow::error::Result<usize>;
}

// Gated exactly like the trait it implements: without this attribute the impl
// would name a trait that does not exist when the `private` feature is off,
// and the crate would fail to compile.
#[cfg(feature = "private")]
impl IndexOfSchema for ArrowSchema {
    /// Scans `self.fields` in order and returns the position of the first
    /// field whose name equals `name`.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` listing every field name in
    /// the schema when there is no match.
    fn index_of(&self, name: &str) -> arrow::error::Result<usize> {
        self.fields
            .iter()
            .position(|f| f.name == name)
            .ok_or_else(|| {
                // Built only on the failure path. Borrowing the names avoids
                // cloning every `String`; `{:?}` renders `&str` and `String`
                // identically, so the message text is unchanged.
                let valid_fields: Vec<&str> =
                    self.fields.iter().map(|f| f.name.as_str()).collect();
                ArrowError::InvalidArgumentError(format!(
                    "Unable to get field named \"{}\". Valid fields: {:?}",
                    name, valid_fields
                ))
            })
    }
}

#[derive(Clone, Debug, PartialEq, Hash, Default)]
pub struct Schema {
fields: Vec<Field>,
Expand Down Expand Up @@ -832,20 +854,21 @@ impl Schema {

/// Find the index of the column with the given name
pub fn index_of(&self, name: &str) -> Result<usize> {
for i in 0..self.fields.len() {
if self.fields[i].name == name {
return Ok(i);
}
}
let valid_fields: Vec<String> = self.fields.iter().map(|f| f.name().clone()).collect();
Err(PolarsError::NotFound(format!(
"Unable to get field named \"{}\". Valid fields: {:?}",
name, valid_fields
)))
self.fields
.iter()
.position(|f| f.name == name)
.ok_or_else(|| {
let valid_fields: Vec<String> =
self.fields.iter().map(|f| f.name.clone()).collect();
PolarsError::NotFound(format!(
"Unable to get field named \"{}\". Valid fields: {:?}",
name, valid_fields
))
})
}

pub fn to_arrow(&self) -> ArrowSchema {
let fields = self
let fields: Vec<ArrowField> = self
.fields
.iter()
.map(|f| {
Expand Down Expand Up @@ -874,7 +897,7 @@ impl Schema {
}
})
.collect();
ArrowSchema::new(fields)
ArrowSchema::from(fields)
}

pub fn try_merge(schemas: &[Self]) -> Result<Self> {
Expand Down Expand Up @@ -952,14 +975,14 @@ impl From<&ArrowDataType> for DataType {

impl From<&ArrowField> for Field {
fn from(f: &ArrowField) -> Self {
Field::new(f.name(), f.data_type().into())
Field::new(&f.name, f.data_type().into())
}
}
impl From<&ArrowSchema> for Schema {
fn from(a_schema: &ArrowSchema) -> Self {
Schema::new(
a_schema
.fields()
.fields
.iter()
.map(|arrow_f| arrow_f.into())
.collect(),
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl std::convert::TryFrom<(ArrowChunk, &[ArrowField])> for DataFrame {
.columns()
.iter()
.zip(arg.1)
.map(|(arr, field)| Series::try_from((field.name().as_ref(), arr.clone())))
.map(|(arr, field)| Series::try_from((field.name.as_ref(), arr.clone())))
.collect();

DataFrame::new(columns?)
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/series/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ fn convert_list_inner(arr: &ArrayRef, fld: &ArrowField) -> ArrayRef {

Arc::new(LargeListArray::from_data(
ArrowDataType::LargeList(
ArrowField::new(fld.name(), ArrowDataType::LargeUtf8, true).into(),
ArrowField::new(&fld.name, ArrowDataType::LargeUtf8, true).into(),
),
offsets,
Arc::new(values),
Expand Down
43 changes: 28 additions & 15 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ where
.map(|i| unsafe { get_bit_unchecked(slice, i) })
.zip(&mut hashes[offset..])
.for_each(|(valid, h)| {
if !valid {
*h = null_h;
}
*h = [null_h, *h][valid as usize];
})
}
offset += arr.len();
Expand All @@ -98,18 +96,33 @@ where
let l = T::Native::get_hash(v, &random_state);
*h = boost_hash_combine(l, *h)
}),
_ => arr
.iter()
.zip(&mut hashes[offset..])
.for_each(|(opt_v, h)| match opt_v {
Some(v) => {
let l = T::Native::get_hash(v, &random_state);
*h = boost_hash_combine(l, *h)
}
None => {
*h = boost_hash_combine(null_h, *h);
}
}),
_ => {
let validity = arr.validity().unwrap();
let slice = validity.as_slice().0;
(0..validity.len())
.map(|i| unsafe { get_bit_unchecked(slice, i) })
.zip(&mut hashes[offset..])
.zip(arr.values().as_slice())
.for_each(|((valid, h), l)| {
*h = boost_hash_combine(
[null_h, T::Native::get_hash(l, &random_state)][valid as usize],
*h,
)
});

// arr
// .iter()
// .zip(&mut hashes[offset..])
// .for_each(|(opt_v, h)| match opt_v {
// Some(v) => {
// let l = T::Native::get_hash(v, &random_state);
// *h = boost_hash_combine(l, *h)
// }
// None => {
// *h = boost_hash_combine(null_h, *h);
// }
// })
}
}
offset += arr.len();
});
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private = []
[dependencies]
ahash = "0.7"
anyhow = "1.0"
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "7add9d31bff7a65076efbf1c4f7732be702f0e2b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "a4383b18955b35bef1237be05e0a747d9dca1171", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", default-features = false, features = ["compute"], branch = "offset_pub" }
# arrow = { package = "arrow2", version = "0.8", default-features = false }
csv-core = { version = "0.1.10", optional = true }
Expand Down
23 changes: 13 additions & 10 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,24 +161,27 @@ where
let metadata = read::read_file_metadata(&mut self.reader)?;
let schema = &metadata.schema;

let err = |column: &str| {
let valid_fields: Vec<String> = schema.fields.iter().map(|f| f.name.clone()).collect();
PolarsError::NotFound(format!(
"Unable to get field named \"{}\". Valid fields: {:?}",
column, valid_fields
))
};

if let Some(cols) = self.columns {
let mut prj = Vec::with_capacity(cols.len());
if cols.len() > 100 {
let mut column_names = AHashMap::with_capacity(schema.fields().len());
schema.fields().iter().enumerate().for_each(|(i, c)| {
column_names.insert(c.name(), i);
let mut column_names = AHashMap::with_capacity(schema.fields.len());
schema.fields.iter().enumerate().for_each(|(i, c)| {
column_names.insert(c.name.as_str(), i);
});

for column in cols.iter() {
if let Some(i) = column_names.get(&column) {
if let Some(i) = column_names.get(column.as_str()) {
prj.push(*i);
} else {
let valid_fields: Vec<String> =
schema.fields().iter().map(|f| f.name().clone()).collect();
return Err(PolarsError::NotFound(format!(
"Unable to get field named \"{}\". Valid fields: {:?}",
column, valid_fields
)));
return Err(err(column));
}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ where
.map(|name| {
fields
.iter()
.position(|fld| fld.name() == name)
.position(|fld| &fld.name == name)
.ok_or_else(|| PolarsError::NotFound(name.into()))
})
.collect::<Result<Vec<_>>>()
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub(crate) fn finish_reader<R: ArrowReader>(
while let Some(batch) = reader.next_record_batch()? {
num_rows += batch.len();

let mut df = DataFrame::try_from((batch, arrow_schema.fields().as_slice()))?;
let mut df = DataFrame::try_from((batch, arrow_schema.fields.as_slice()))?;

if let Some(predicate) = &predicate {
let s = predicate.evaluate(&df)?;
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
if let Some(projection) = &projection {
i = projection[i]
}
Series::try_from((schema.field(i).name().as_str(), arr)).unwrap()
Series::try_from((schema.fields[i].name.as_str(), arr)).unwrap()
})
.collect(),
)
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-io/src/parquet/read_par.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ pub(crate) fn parallel_read<R: MmapBytesReader>(

// <CPU bounded>
let columns = read::ReadColumnIterator::new(field.clone(), column_chunks);
let field = &arrow_schema.fields()[field_i];
let field = &arrow_schema.fields[field_i];

let b2 = cont_pool.get();
let (arr, b1, b2) = read::column_iter_to_array(columns, field, b2)?;
cont_pool.set(b1);
cont_pool.set(b2);
Series::try_from((field.name().as_str(), Arc::from(arr) as ArrayRef))
Series::try_from((field.name.as_str(), Arc::from(arr) as ArrayRef))
})
.collect::<Result<Vec<_>>>()
})?;
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-io/src/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ where

/// Write the given DataFrame into the writer `W`.
pub fn finish(mut self, df: &DataFrame) -> Result<()> {
let fields = df.schema().to_arrow().fields().clone();
let fields = df.schema().to_arrow().fields;
let rb_iter = df.iter_chunks();

let options = write::WriteOptions {
write_statistics: self.statistics,
compression: self.compression,
version: write::Version::V2,
};
let schema = ArrowSchema::new(fields);
let schema = ArrowSchema::from(fields);
let parquet_schema = write::to_parquet_schema(&schema)?;
let encodings = schema
.fields()
.fields
.iter()
.map(|field| match field.data_type().to_physical_type() {
// delta encoding
Expand Down
9 changes: 6 additions & 3 deletions polars/polars-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ pub fn resolve_homedir(path: &Path) -> PathBuf {

#[cfg(any(feature = "ipc", feature = "parquet"))]
pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema {
let fields = schema.fields();
let fields = projection.iter().map(|idx| fields[*idx].clone()).collect();
ArrowSchema::new(fields)
let fields = &schema.fields;
let fields = projection
.iter()
.map(|idx| fields[*idx].clone())
.collect::<Vec<_>>();
ArrowSchema::from(fields)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ fn ipc_schema(py: Python, py_f: PyObject) -> PyResult<PyObject> {
};

let dict = PyDict::new(py);
for field in metadata.schema.fields() {
for field in metadata.schema.fields {
let dt: Wrap<DataType> = Wrap((&field.data_type).into());
dict.set_item(field.name(), dt.to_object(py))?;
dict.set_item(field.name, dt.to_object(py))?;
}
Ok(dict.to_object(py))
}
Expand Down

0 comments on commit 1a74cbe

Please sign in to comment.