
Commit

improve conversion from pyarrow to polars; only coerce on rust side, one level of entry (#2124)
ritchie46 committed Dec 22, 2021
1 parent 03c07c9 commit d4ba72e
Showing 2 changed files with 50 additions and 50 deletions.
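Reviewer note: the user-visible effect of this commit is that converting a timezone-aware Arrow timestamp array into polars no longer raises on the Rust side; the values are taken as-is and a lossy-timezone warning is emitted instead. A minimal sketch of the new behavior (assuming `pl.from_arrow` as the entry point; the sample values are illustrative):

```python
import pyarrow as pa
import polars as pl

# Timezone-aware timestamps. Before this commit the Rust side returned
# InvalidOperation("Cannot create polars series timestamp with timezone");
# after it, the conversion succeeds and a warning is emitted instead.
arr = pa.array(
    [1_609_459_200_000],  # 2021-01-01T00:00:00 as epoch milliseconds
    type=pa.timestamp("ms", tz="UTC"),
)
s = pl.from_arrow(arr)  # naive datetimes; TZ information may be lost
```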
17 changes: 7 additions & 10 deletions polars/polars-core/src/series/from.rs
@@ -109,16 +109,13 @@ impl TryFrom<(&str, Vec<ArrayRef>)> for Series {
             }
             #[cfg(feature = "dtype-datetime")]
             ArrowDataType::Timestamp(tu, tz) => {
-                let s = if tz.is_none() || tz == &Some("".to_string()) {
-                    let chunks = cast_chunks(&chunks, &DataType::Int64).unwrap();
-                    Int64Chunked::new_from_chunks(name, chunks)
-                        .into_date()
-                        .into_series()
-                } else {
-                    return Err(PolarsError::InvalidOperation(
-                        "Cannot create polars series timestamp with timezone".into(),
-                    ));
-                };
+                let chunks = cast_chunks(&chunks, &DataType::Int64).unwrap();
+                let s = Int64Chunked::new_from_chunks(name, chunks)
+                    .into_date()
+                    .into_series();
+                if !(tz.is_none() || tz == &Some("".to_string())) {
+                    println!("Conversion of timezone aware to naive datetimes. TZ information may be lost.")
+                }
                 Ok(match tu {
                     TimeUnit::Second => &s * NANOSECONDS,
                     TimeUnit::Millisecond => &s * 1_000_000,
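For orientation, the `Ok(match tu ...)` arm rescales the raw integer timestamps by time unit; only the `Second` and `Millisecond` arms are visible in this hunk, both scaling upward toward a nanosecond representation. A hedged Python rendering of just the visible arms (the function and its names are illustrative, not part of the codebase):

```python
NANOSECONDS = 1_000_000_000  # same constant name as on the Rust side

def scale_visible_units(value: int, time_unit: str) -> int:
    # Mirrors only the two match arms shown above; the remaining
    # TimeUnit arms fall outside this hunk and are not reproduced.
    if time_unit == "s":
        return value * NANOSECONDS  # seconds -> nanoseconds
    if time_unit == "ms":
        return value * 1_000_000  # milliseconds -> nanoseconds
    raise NotImplementedError("arm not shown in this diff")
```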
83 changes: 43 additions & 40 deletions py-polars/polars/internals/construction.py
@@ -63,15 +63,20 @@ def arrow_to_pyseries(
"""
Construct a PySeries from an Arrow array.
"""
array = coerce_arrow(values, rechunk)
if hasattr(array, "num_chunks") and array.num_chunks > 1:
it = array.iterchunks()
pys = PySeries.from_arrow(name, next(it))
for a in it:
pys.append(PySeries.from_arrow(name, a))
array = coerce_arrow(values)
if hasattr(array, "num_chunks"):
if array.num_chunks > 1:
it = array.iterchunks()
pys = PySeries.from_arrow(name, next(it))
for a in it:
pys.append(PySeries.from_arrow(name, a))
else:
pys = PySeries.from_arrow(name, array.combine_chunks())

return pys
if rechunk:
pys.rechunk(in_place=True)

return pys
return PySeries.from_arrow(name, array)


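The reworked entry point can be exercised with a multi-chunk array, which takes the `num_chunks > 1` branch (each chunk becomes a `PySeries` and is appended), while a single-chunk array goes through `combine_chunks`. A small usage sketch, assuming `pl.from_arrow` forwards to `arrow_to_pyseries` and accepts a `rechunk` flag:

```python
import pyarrow as pa
import polars as pl

# Two chunks exercise the append-per-chunk path; rechunk=True then
# defragments the result in place on the polars side.
chunked = pa.chunked_array([pa.array([1, 2]), pa.array([3, 4])])
s = pl.from_arrow(chunked, rechunk=True)
```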
@@ -395,20 +400,39 @@ def arrow_to_pydf(
         ) from e
 
     data_dict = {}
+    # dictionaries cannot be built in different batches (categorical does not allow that),
+    # so we rechunk them and create them separately
+    dictionary_cols = {}
+    names = []
     for i, column in enumerate(data):
         # extract the name before casting
         if column._name is None:
             name = f"column_{i}"
         else:
             name = column._name
+        names.append(name)
 
-        column = coerce_arrow(column, rechunk=rechunk)
-        data_dict[name] = column
+        column = coerce_arrow(column)
+        if pa.types.is_dictionary(column.type):
+            ps = arrow_to_pyseries(name, column, rechunk)
+            dictionary_cols[i] = pli.wrap_s(ps)
+        else:
+            data_dict[name] = column
 
-    batches = pa.table(data_dict).to_batches()
-    pydf = PyDataFrame.from_arrow_record_batches(batches)
+    if len(data_dict) > 0:
+        batches = pa.table(data_dict).to_batches()
+        pydf = PyDataFrame.from_arrow_record_batches(batches)
+    else:
+        pydf = pli.DataFrame([])._df
+    if rechunk:
+        pydf = pydf.rechunk()
+
+    if len(dictionary_cols) > 0:
+        df = pli.wrap_df(pydf)
+        for i, s in dictionary_cols.items():
+            df[s.name] = s
+        df = df[names]
+        pydf = df._df
     return pydf
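The dictionary-column path above can be exercised with a table that mixes plain and dictionary-encoded columns; the latter are built as standalone series and spliced back in, with `df[names]` restoring the original column order. A sketch with illustrative data, assuming tables route through `arrow_to_pydf`:

```python
import pyarrow as pa
import polars as pl

# One plain column and one dictionary-encoded (categorical) column.
# Dictionary columns cannot be built across record batches, so the
# "cats" column is converted separately and re-attached afterwards.
tbl = pa.table(
    {
        "ints": pa.array([1, 2, 3]),
        "cats": pa.array(["a", "b", "a"]).dictionary_encode(),
    }
)
df = pl.from_arrow(tbl)
print(df.dtypes)  # "cats" arrives as a categorical column
```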


@@ -449,38 +473,19 @@ def pandas_to_pydf(


 def coerce_arrow(array: "pa.Array", rechunk: bool = True) -> "pa.Array":
-    # also coerces timezone to naive representation
-    # units are accounted for by pyarrow
-    if isinstance(array, pa.TimestampArray):
-        if array.type.tz is not None:
-            warnings.warn(
-                "Conversion of timezone aware to naive datetimes. TZ information may be lost",
-            )
-        ts_ms = pa.compute.cast(array, pa.timestamp("ms"), safe=False)
-        ms = pa.compute.cast(ts_ms, pa.int64())
-        del ts_ms
-        array = pa.compute.cast(ms, pa.timestamp("ms"))
-        del ms
+    if isinstance(array, pa.TimestampArray) and array.type.tz is not None:
+        warnings.warn(
+            "Conversion of timezone aware to naive datetimes. TZ information may be lost",
+        )
 
     # note: Decimal256 could not be cast to float
-    elif isinstance(array.type, pa.Decimal128Type):
+    if isinstance(array.type, pa.Decimal128Type):
         array = pa.compute.cast(array, pa.float64())
 
     if hasattr(array, "num_chunks") and array.num_chunks > 1 and rechunk:
-        # we have to coerce before combining chunks, because pyarrow panics if
-        # offsets overflow
-        if pa.types.is_string(array.type):
-            array = pa.compute.cast(array, pa.large_utf8())
-        elif pa.types.is_list(array.type):
-            # pyarrow does not seem to support casting from list to largelist,
-            # so we convert to large list ourselves and do the re-alloc on the polars/arrow side
-            chunks = []
-            for arr in array.iterchunks():
-                chunks.append(pli.Series._from_arrow("", arr).to_arrow())
-            array = pa.chunked_array(chunks)
-
         # small integer keys can often not be combined, so let's already cast
         # to the uint32 used by polars
-        elif pa.types.is_dictionary(array.type) and (
+        if pa.types.is_dictionary(array.type) and (
             pa.types.is_int8(array.type.index_type)
             or pa.types.is_uint8(array.type.index_type)
             or pa.types.is_int16(array.type.index_type)
@@ -489,7 +494,5 @@ def coerce_arrow(array: "pa.Array", rechunk: bool = True) -> "pa.Array":
         ):
             array = pa.compute.cast(
                 array, pa.dictionary(pa.uint32(), pa.large_string())
-            )
-
-        array = array.combine_chunks()
+            ).combine_chunks()
     return array
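Per the commit title, the string and list coercions were dropped here in favor of handling on the Rust side; what remains of `coerce_arrow` is the timezone warning, the Decimal128 cast, and this dictionary-index widening. A standalone sketch of that remaining cast, mirroring the call in the diff on hypothetical data:

```python
import pyarrow as pa
import pyarrow.compute as pc

# Small-integer dictionary keys often cannot be combined across chunks,
# so they are widened to the uint32 index type polars uses (with
# large_string values) before the chunks are merged.
chunked = pa.chunked_array(
    [pa.array(["x", "y"]).dictionary_encode() for _ in range(2)]
)
widened = pc.cast(
    chunked, pa.dictionary(pa.uint32(), pa.large_string())
).combine_chunks()
```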
