This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Replaced own allocator by std::Vec. #385

Merged
merged 4 commits on Sep 26, 2021
23 changes: 22 additions & 1 deletion .github/workflows/test.yml
@@ -73,6 +73,25 @@ jobs:
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full -- --skip io::parquet --skip io::ipc

miri-checks-custom-allocator:
name: MIRI with custom allocator
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-07-09
override: true
- uses: Swatinem/rust-cache@v1
- name: Install Miri
run: |
rustup component add miri
cargo miri setup

- name: Run
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full,cache_aligned -- --skip io::parquet --skip io::ipc

coverage:
name: Coverage
runs-on: ubuntu-latest
@@ -91,7 +110,9 @@ jobs:
- name: Install tarpaulin
run: cargo install cargo-tarpaulin
- name: Run coverage
run: cargo tarpaulin --features full --out Xml
run: |
cargo tarpaulin --features cache_aligned --out Xml
cargo tarpaulin --features full --out Xml
- name: Report coverage
continue-on-error: true
run: bash <(curl -s https://codecov.io/bash)
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -98,7 +98,7 @@ full = [
"merge_sort",
"compute",
# parses timezones used in timestamp conversions
"chrono-tz",
"chrono-tz"
]
merge_sort = ["itertools"]
io_csv = ["io_csv_read", "io_csv_write"]
@@ -125,6 +125,9 @@ compute = ["strength_reduce", "multiversion", "lexical-core", "ahash"]
io_parquet = ["parquet2", "io_ipc", "base64", "futures"]
benchmarks = ["rand"]
simd = ["packed_simd"]
# uses a custom allocator whose pointers are aligned along cache lines.
# Using this feature makes `Buffer` and `MutableBuffer` incompatible with `Vec`.
cache_aligned = []

[package.metadata.cargo-all-features]
skip_feature_sets = [
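As context for the `cache_aligned` note above, here is a minimal sketch (not part of the diff) of what the incompatibility with `Vec` means in practice: the zero-copy `Buffer::from_vec` constructor added later in this PR is only compiled when the feature is disabled. The helper name `vec_to_buffer` is hypothetical.

```rust
use arrow2::buffer::Buffer;

// Only compiled with the default allocator: the std::Vec's heap allocation is
// moved into the Buffer in O(1), without copying.
#[cfg(not(feature = "cache_aligned"))]
fn vec_to_buffer(values: Vec<u32>) -> Buffer<u32> {
    Buffer::from_vec(values)
}
```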
6 changes: 0 additions & 6 deletions arrow-pyarrow-integration-testing/src/lib.rs
@@ -152,14 +152,8 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
to_py_field(&field, py)
}

#[pyfunction]
fn total_allocated_bytes() -> PyResult<isize> {
Ok(arrow2::total_allocated_bytes())
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(total_allocated_bytes, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
Ok(())
9 changes: 0 additions & 9 deletions arrow-pyarrow-integration-testing/tests/test_sql.py
@@ -33,18 +33,9 @@ def __reduce__(self):

class TestCase(unittest.TestCase):
def setUp(self):
self.old_allocated_rust = (
arrow_pyarrow_integration_testing.total_allocated_bytes()
)
self.old_allocated_cpp = pyarrow.total_allocated_bytes()

def tearDown(self):
# No leak of Rust
self.assertEqual(
self.old_allocated_rust,
arrow_pyarrow_integration_testing.total_allocated_bytes(),
)

# No leak of C++ memory
self.assertEqual(self.old_allocated_cpp, pyarrow.total_allocated_bytes())

54 changes: 26 additions & 28 deletions benches/arithmetic_kernels.rs
@@ -1,31 +1,15 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
use criterion::Criterion;

use arrow2::array::*;
use arrow2::util::bench_util::*;
use arrow2::{
compute::arithmetics::basic::div::div_scalar, datatypes::DataType, types::NativeType,
compute::arithmetics::basic::add::add, compute::arithmetics::basic::div::div_scalar,
datatypes::DataType, types::NativeType,
};
use num_traits::NumCast;
use std::ops::Div;
use std::ops::{Add, Div};

fn bench_div_scalar<T>(lhs: &PrimitiveArray<T>, rhs: &T)
where
@@ -34,17 +18,31 @@ where
criterion::black_box(div_scalar(lhs, rhs));
}

fn bench_add<T>(lhs: &PrimitiveArray<T>, rhs: &PrimitiveArray<T>)
where
T: NativeType + Add<Output = T> + NumCast,
{
criterion::black_box(add(lhs, rhs)).unwrap();
}

fn add_benchmark(c: &mut Criterion) {
let size = 65536;
let arr = create_primitive_array_with_seed::<u64>(size, DataType::UInt64, 0.0, 43);
(10..=20).step_by(2).for_each(|log2_size| {
let size = 2usize.pow(log2_size);
let arr_a = create_primitive_array_with_seed::<u64>(size, DataType::UInt64, 0.0, 43);
let arr_b = create_primitive_array_with_seed::<u64>(size, DataType::UInt64, 0.0, 42);

c.bench_function("divide_scalar 4", |b| {
// 4 is a very fast optimizable divisor
b.iter(|| bench_div_scalar(&arr, &4))
});
c.bench_function("divide_scalar prime", |b| {
// large prime number that is probably harder to simplify
b.iter(|| bench_div_scalar(&arr, &524287))
c.bench_function(&format!("divide_scalar 2^{}", log2_size), |b| {
// 4 is a very fast optimizable divisor
b.iter(|| bench_div_scalar(&arr_a, &4))
});
c.bench_function(&format!("divide_scalar prime 2^{}", log2_size), |b| {
// large prime number that is probably harder to simplify
b.iter(|| bench_div_scalar(&arr_a, &524287))
});

c.bench_function(&format!("add 2^{}", log2_size), |b| {
b.iter(|| bench_add(&arr_a, &arr_b))
});
});
}

5 changes: 0 additions & 5 deletions src/alloc/mod.rs
@@ -34,11 +34,6 @@ pub use alignment::ALIGNMENT;
// If this number is not zero after all objects have been `drop`, there is a memory leak
static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0);

/// Returns the total number of bytes allocated to buffers by the allocator.
pub fn total_allocated_bytes() -> isize {
unsafe { ALLOCATIONS.load(std::sync::atomic::Ordering::SeqCst) }
}

/// # Safety
/// This pointer may only be used to check if memory is allocated.
#[inline]
4 changes: 2 additions & 2 deletions src/bitmap/bitmap_ops.rs
@@ -112,9 +112,9 @@ where

let iterator = iter.map(|left| op(left)).chain(std::iter::once(rem));

let buffer = MutableBuffer::from_trusted_len_iter(iterator);
let buffer = MutableBuffer::from_chunk_iter(iterator);

Bitmap::from_u8_buffer(buffer.into(), length)
Bitmap::from_u8_buffer(buffer, length)
}

/// Apply a bitwise operation `op` to one input and return the result as a [`Bitmap`].
19 changes: 9 additions & 10 deletions src/buffer/bytes.rs
@@ -5,9 +5,10 @@ use std::slice;
use std::{fmt::Debug, fmt::Formatter};
use std::{ptr::NonNull, sync::Arc};

use crate::alloc;
use crate::ffi;
use crate::types::NativeType;
#[cfg(feature = "cache_aligned")]
use crate::vec::AlignedVec as Vec;

/// Mode of deallocating memory regions
pub enum Deallocation {
@@ -79,11 +80,6 @@ impl<T: NativeType> Bytes<T> {
self.len
}

#[inline]
pub fn is_empty(&self) -> bool {
self.len == 0
}

#[inline]
pub fn ptr(&self) -> NonNull<T> {
self.ptr
@@ -94,9 +90,12 @@ impl<T: NativeType> Drop for Bytes<T> {
#[inline]
fn drop(&mut self) {
match &self.deallocation {
Deallocation::Native(capacity) => {
unsafe { alloc::free_aligned(self.ptr, *capacity) };
}
Deallocation::Native(capacity) => unsafe {
#[cfg(feature = "cache_aligned")]
let _ = Vec::from_raw_parts(self.ptr, self.len, *capacity);
#[cfg(not(feature = "cache_aligned"))]
let _ = Vec::from_raw_parts(self.ptr.as_ptr(), self.len, *capacity);
},
// foreign interface knows how to deallocate itself.
Deallocation::Foreign(_) => (),
}
@@ -127,6 +126,6 @@ impl<T: NativeType> Debug for Bytes<T> {
}
}

// This is sound because `Bytes` is an imutable container
// This is sound because `Bytes` is an immutable container
unsafe impl<T: NativeType> Send for Bytes<T> {}
unsafe impl<T: NativeType> Sync for Bytes<T> {}
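The new `Drop` implementation above frees natively allocated regions by rebuilding a `Vec` from the stored pointer, length, and capacity and letting its destructor release the memory. A standalone sketch of that pattern, with a hypothetical `OwnedRegion` type standing in for `Bytes<T>`:

```rust
use std::ptr::NonNull;

// Owns an allocation that was originally produced by a Vec<u64>.
struct OwnedRegion {
    ptr: NonNull<u64>,
    len: usize,
    capacity: usize,
}

impl Drop for OwnedRegion {
    fn drop(&mut self) {
        // Safety: `ptr`, `len`, and `capacity` must describe a live allocation
        // created by a Vec<u64> that is not freed anywhere else.
        unsafe {
            let _ = Vec::from_raw_parts(self.ptr.as_ptr(), self.len, self.capacity);
        }
        // The reconstructed Vec is dropped here, releasing the memory.
    }
}
```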
13 changes: 11 additions & 2 deletions src/buffer/immutable.rs
@@ -52,9 +52,18 @@ impl<T: NativeType> Buffer<T> {
MutableBuffer::from_len_zeroed(length).into()
}

/// Auxiliary method to create a new Buffer
/// Takes ownership of [`Vec`].
/// # Implementation
/// This function is `O(1)`
#[cfg(not(feature = "cache_aligned"))]
#[cfg_attr(docsrs, doc(cfg(not(feature = "cache_aligned"))))]
#[inline]
pub fn from_bytes(bytes: Bytes<T>) -> Self {
pub fn from_vec(data: Vec<T>) -> Self {
MutableBuffer::from_vec(data).into()
}

/// Auxiliary method to create a new Buffer
pub(crate) fn from_bytes(bytes: Bytes<T>) -> Self {
let length = bytes.len();
Buffer {
data: Arc::new(bytes),
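A usage sketch of the new `from_vec` constructor (default features, i.e. without `cache_aligned`), mirroring the doc comment above: the `Vec` is taken over in `O(1)`, so no copy occurs.

```rust
use arrow2::buffer::Buffer;

fn main() {
    let values: Vec<u64> = (0..1024).collect();
    // O(1): the Vec's heap allocation becomes the Buffer's backing storage.
    let buffer = Buffer::from_vec(values);
    assert_eq!(buffer.len(), 1024);
}
```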
1 change: 0 additions & 1 deletion src/buffer/mod.rs
@@ -6,7 +6,6 @@ mod immutable;
mod mutable;

pub(crate) mod bytes;
pub(crate) mod util;

pub use immutable::Buffer;
pub use mutable::MutableBuffer;