Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added number of rows read in CSV inference (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 14, 2022
1 parent a886e35 commit 79b87b6
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion examples/csv_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn A

// Infers the fields using the default inferer. The inferer is just a function that maps bytes
// to a `DataType`.
let fields = read::infer_schema(&mut reader, None, true, &read::infer)?;
let (fields, _) = read::infer_schema(&mut reader, None, true, &read::infer)?;

// allocate space to read from CSV to. The size of this vec denotes how many rows are read.
let mut rows = vec![read::ByteRecord::default(); 100];
Expand Down
2 changes: 1 addition & 1 deletion examples/csv_read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> Result<()> {

let mut reader = AsyncReaderBuilder::new().create_reader(file);

let fields = infer_schema(&mut reader, None, true, &infer).await?;
let (fields, _) = infer_schema(&mut reader, None, true, &infer).await?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows).await?;
Expand Down
3 changes: 2 additions & 1 deletion examples/csv_read_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
let (tx, rx) = unbounded();

let mut reader = read::ReaderBuilder::new().from_path(path)?;
let fields = read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let (fields, _) =
read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let fields = Arc::new(fields);

let start = SystemTime::now();
Expand Down
5 changes: 3 additions & 2 deletions src/io/csv/read/infer_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use super::super::utils::merge_schema;
use super::{ByteRecord, Reader};

/// Infers the [`Field`]s of a CSV file by reading through the first n records up to `max_rows`.
/// Also returns the number of rows used to infer.
/// Seeks back to the beginning of the file _after_ the header.
pub fn infer_schema<R: Read + Seek, F: Fn(&[u8]) -> DataType>(
reader: &mut Reader<R>,
max_rows: Option<usize>,
has_header: bool,
infer: &F,
) -> Result<Vec<Field>> {
) -> Result<(Vec<Field>, usize)> {
// get or create header names
// when has_header is false, creates default column names with column_ prefix
let headers: Vec<String> = if has_header {
Expand Down Expand Up @@ -57,5 +58,5 @@ pub fn infer_schema<R: Read + Seek, F: Fn(&[u8]) -> DataType>(
// seek the reader back to the start
reader.seek(position)?;

Ok(fields)
Ok((fields, records_count))
}
4 changes: 2 additions & 2 deletions src/io/csv/read_async/infer_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub async fn infer_schema<R, F>(
max_rows: Option<usize>,
has_header: bool,
infer: &F,
) -> Result<Vec<Field>>
) -> Result<(Vec<Field>, usize)>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
F: Fn(&[u8]) -> DataType,
Expand Down Expand Up @@ -65,5 +65,5 @@ where
// seek the reader back to the start
reader.seek(position).await?;

Ok(fields)
Ok((fields, records_count))
}
6 changes: 3 additions & 3 deletions tests/it/io/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn read() -> Result<()> {
"Aberdeen, Aberdeen City, UK",57.149651,-2.099075"#;
let mut reader = ReaderBuilder::new().from_reader(Cursor::new(data));

let fields = infer_schema(&mut reader, None, true, &infer)?;
let (fields, _) = infer_schema(&mut reader, None, true, &infer)?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows)?;
Expand Down Expand Up @@ -58,7 +58,7 @@ fn infer_basics() -> Result<()> {
let file = Cursor::new("1,2,3\na,b,c\na,,c");
let mut reader = ReaderBuilder::new().from_reader(file);

let fields = infer_schema(&mut reader, Some(10), false, &infer)?;
let (fields, _) = infer_schema(&mut reader, Some(10), false, &infer)?;

assert_eq!(
fields,
Expand All @@ -76,7 +76,7 @@ fn infer_ints() -> Result<()> {
let file = Cursor::new("1,2,3\n1,a,5\n2,,4");
let mut reader = ReaderBuilder::new().from_reader(file);

let fields = infer_schema(&mut reader, Some(10), false, &infer)?;
let (fields, _) = infer_schema(&mut reader, Some(10), false, &infer)?;

assert_eq!(
fields,
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/csv/read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn read() -> Result<()> {
"Aberdeen, Aberdeen City, UK",57.149651,-2.099075"#;
let mut reader = AsyncReaderBuilder::new().create_reader(Cursor::new(data.as_bytes()));

let fields = infer_schema(&mut reader, None, true, &infer).await?;
let (fields, _) = infer_schema(&mut reader, None, true, &infer).await?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows).await?;
Expand Down

0 comments on commit 79b87b6

Please sign in to comment.