Fixed error

jorgecarleitao committed Mar 20, 2022
1 parent be3a2be commit 238fae6
Showing 4 changed files with 45 additions and 35 deletions.
4 changes: 1 addition & 3 deletions src/statistics/fixed_len_binary.rs
@@ -15,7 +15,6 @@ pub struct FixedLenStatistics {
     pub distinct_count: Option<i64>,
     pub max_value: Option<Vec<u8>>,
     pub min_value: Option<Vec<u8>>,
-    pub(self) physical_type: PhysicalType,
 }
 
 impl Statistics for FixedLenStatistics {
@@ -24,7 +23,7 @@ impl Statistics for FixedLenStatistics {
     }
 
     fn physical_type(&self) -> &PhysicalType {
-        &self.physical_type
+        &self.primitive_type.physical_type
     }
 
     fn null_count(&self) -> Option<i64> {
@@ -64,7 +63,6 @@ pub fn read(
             x.truncate(size as usize);
             x
         }),
-        physical_type: PhysicalType::FixedLenByteArray(size),
     }))
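
The removed field duplicated information already carried by the statistics' primitive type, and two copies of the same fact can drift apart. A minimal, self-contained sketch of the single-source-of-truth shape this diff moves to (mock types, not the crate's actual definitions):

    #[derive(Debug, PartialEq)]
    enum PhysicalType {
        FixedLenByteArray(i32),
    }

    struct PrimitiveType {
        physical_type: PhysicalType,
    }

    #[allow(dead_code)]
    struct FixedLenStatistics {
        primitive_type: PrimitiveType,
        max_value: Option<Vec<u8>>,
        min_value: Option<Vec<u8>>,
    }

    impl FixedLenStatistics {
        // Read the physical type through the primitive type instead of
        // storing a second, possibly stale, copy of it.
        fn physical_type(&self) -> &PhysicalType {
            &self.primitive_type.physical_type
        }
    }

    fn main() {
        let stats = FixedLenStatistics {
            primitive_type: PrimitiveType {
                physical_type: PhysicalType::FixedLenByteArray(16),
            },
            max_value: None,
            min_value: None,
        };
        assert_eq!(*stats.physical_type(), PhysicalType::FixedLenByteArray(16));
    }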
8 changes: 4 additions & 4 deletions src/write/column_chunk.rs
@@ -67,7 +67,7 @@ pub async fn write_column_chunk_async<W, E>(
     descriptor: &ColumnDescriptor,
     compression: Compression,
     mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
-) -> Result<(ColumnChunk, usize)>
+) -> Result<(ColumnChunk, Vec<PageWriteSpec>, u64)>
 where
     W: AsyncWrite + Unpin + Send,
     ParquetError: From<E>,
@@ -81,7 +81,7 @@ where
         offset += spec.bytes_written;
         specs.push(spec);
     }
-    let mut bytes_written = (offset - initial) as usize;
+    let mut bytes_written = offset - initial;
 
     let column_chunk = build_column_chunk(&specs, descriptor, compression)?;
 
@@ -92,10 +92,10 @@ where
         .as_ref()
         .unwrap()
         .write_to_out_stream_protocol(&mut protocol)
-        .await?;
+        .await? as u64;
     protocol.flush().await?;
 
-    Ok((column_chunk, bytes_written))
+    Ok((column_chunk, specs, bytes_written))
 }
 
 fn build_column_chunk(
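
Besides returning the page specs, the function now reports bytes written as u64 rather than usize. u64 has the same width on every platform, while usize follows the platform's pointer width, so a usize byte count overflows past 4 GiB on a 32-bit target. A tiny runnable sketch of that failure mode (the figures are plain arithmetic, not taken from the commit):

    fn main() {
        // 5 GiB expressed as a u64 byte count: representable everywhere.
        let five_gib: u64 = 5 * 1024 * 1024 * 1024;
        assert_eq!(five_gib, 5_368_709_120);
        // The same count does not fit in 32 bits, which is the width of
        // `usize` on a 32-bit target.
        assert!(u32::try_from(five_gib).is_err());
    }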
63 changes: 38 additions & 25 deletions src/write/row_group.rs
@@ -52,6 +52,27 @@ impl ColumnOffsetsMetadata {
     }
 }
 
+fn compute_num_rows(columns: &[(ColumnChunk, Vec<PageWriteSpec>)]) -> Result<i64> {
+    columns
+        .get(0)
+        .map(|(_, specs)| {
+            let mut num_rows = 0;
+            specs
+                .iter()
+                .filter(|x| is_data_page(x))
+                .try_for_each(|spec| {
+                    num_rows += spec.num_rows.ok_or_else(|| {
+                        ParquetError::OutOfSpec(
+                            "All data pages must declare the number of rows on it".to_string(),
+                        )
+                    })? as i64;
+                    Result::Ok(())
+                })?;
+            Result::Ok(num_rows)
+        })
+        .unwrap_or(Ok(0))
+}
+
 pub fn write_row_group<
     'a,
     W,
@@ -82,24 +103,7 @@ where
         .collect::<Result<Vec<_>>>()?;
     let bytes_written = offset - initial;
 
-    let num_rows = columns
-        .get(0)
-        .map(|(_, specs)| {
-            let mut num_rows = 0;
-            specs
-                .iter()
-                .filter(|x| is_data_page(x))
-                .try_for_each(|spec| {
-                    num_rows += spec.num_rows.ok_or_else(|| {
-                        ParquetError::OutOfSpec(
-                            "All data pages must declare a number of rows on it".to_string(),
-                        )
-                    })? as i64;
-                    Result::Ok(())
-                })?;
-            Result::Ok(num_rows)
-        })
-        .unwrap_or(Ok(0))?;
+    let num_rows = compute_num_rows(&columns)?;
 
     // compute row group stats
     let file_offset = columns
@@ -145,8 +149,7 @@ pub async fn write_row_group_async<
     descriptors: &[ColumnDescriptor],
     compression: Compression,
     columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
-    num_rows: usize,
-) -> Result<(RowGroup, u64)>
+) -> Result<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
 where
     W: AsyncWrite + Unpin + Send,
     ParquetError: From<E>,
@@ -157,25 +160,34 @@ where
     let initial = offset;
     let mut columns = vec![];
     for (descriptor, page_iter) in column_iter {
-        let (spec, size) =
+        let (column, page_specs, size) =
             write_column_chunk_async(writer, offset, descriptor, compression, page_iter?).await?;
-        offset += size;
-        columns.push(spec);
+        offset += size as u64;
+        columns.push((column, page_specs));
     }
     let bytes_written = offset - initial;
 
+    let num_rows = compute_num_rows(&columns)?;
+
     // compute row group stats
     let file_offest = columns
         .get(0)
-        .map(|column_chunk| {
+        .map(|(column_chunk, _)| {
             ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
         })
        .unwrap_or(None);
 
     let total_byte_size = columns
         .iter()
-        .map(|c| c.meta_data.as_ref().unwrap().total_compressed_size)
+        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)
         .sum();
+    let total_compressed_size = columns
+        .iter()
+        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)
+        .sum();
+
+    let (columns, specs) = columns.into_iter().unzip();
 
     Ok((
         RowGroup {
@@ -184,9 +196,10 @@ where
             num_rows: num_rows as i64,
             sorting_columns: None,
             file_offset: file_offest,
-            total_compressed_size: None,
+            total_compressed_size: Some(total_compressed_size),
             ordinal: None,
         },
+        specs,
         bytes_written,
     ))
 }
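
The extracted compute_num_rows helper, now shared by the sync and async writers, sums the declared row counts of the data pages in the first column chunk and fails if any data page omits one. A self-contained sketch of the same logic with mock types (the real code filters with is_data_page and uses the crate's ParquetError and Result alias):

    #[derive(Debug)]
    enum ParquetError {
        OutOfSpec(String),
    }

    struct PageWriteSpec {
        is_data_page: bool, // stand-in for the crate's is_data_page(spec) check
        num_rows: Option<i64>,
    }

    fn compute_num_rows(columns: &[Vec<PageWriteSpec>]) -> Result<i64, ParquetError> {
        columns
            .first()
            .map(|specs| {
                specs
                    .iter()
                    .filter(|spec| spec.is_data_page)
                    .map(|spec| {
                        spec.num_rows.ok_or_else(|| {
                            ParquetError::OutOfSpec(
                                "All data pages must declare the number of rows on it".to_string(),
                            )
                        })
                    })
                    .sum() // summing Results: adds Ok values, stops at the first Err
            })
            .unwrap_or(Ok(0))
    }

    fn main() {
        let columns = vec![vec![
            PageWriteSpec { is_data_page: true, num_rows: Some(10) },
            PageWriteSpec { is_data_page: false, num_rows: None },
            PageWriteSpec { is_data_page: true, num_rows: Some(5) },
        ]];
        // Non-data pages (e.g. dictionary pages) are skipped; data pages
        // must declare their row counts.
        assert_eq!(compute_num_rows(&columns).unwrap(), 15);
    }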
5 changes: 2 additions & 3 deletions src/write/stream.rs
@@ -92,7 +92,7 @@ impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
     }
 
     /// Writes a row group to the file.
-    pub async fn write<E>(&mut self, row_group: RowGroupIter<'_, E>, num_rows: usize) -> Result<()>
+    pub async fn write<E>(&mut self, row_group: RowGroupIter<'_, E>) -> Result<()>
     where
         ParquetError: From<E>,
         E: std::error::Error,
@@ -102,13 +102,12 @@ impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
                 "You must call `start` before writing the first row group".to_string(),
             ));
         }
-        let (group, size) = write_row_group_async(
+        let (group, _specs, size) = write_row_group_async(
             &mut self.writer,
             self.offset,
             self.schema.columns(),
             self.options.compression,
             row_group,
-            num_rows,
         )
         .await?;
         self.offset += size;
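
For downstream users, the visible change is that FileStreamer::write no longer takes num_rows; the count is now derived from the page specs the lower layers return. A hedged sketch of the call-site migration (the surrounding streamer setup is assumed, not shown in this commit):

    // before: the caller had to count rows itself
    //     streamer.write(row_group, num_rows).await?;
    // after: the row count comes from the pages' own metadata
    streamer.write(row_group).await?;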
