Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl StandardTableProvider {
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
let mut column_statistics = HashMap::<String, Option<TypedStatistics>>::new();
let mut count = 0;
let mut total_file_size = 0u64;
let mut total_compressed_size = 0u64;
let mut file_count = 0u64;
for (index, file) in manifest_files
.into_iter()
Expand All @@ -339,13 +339,14 @@ impl StandardTableProvider {
mut file_path,
num_rows,
columns,
file_size,
..
} = file;

// Track billing metrics for files scanned in query
file_count += 1;
total_file_size += file_size;
// Calculate actual compressed bytes that will be read from storage
let compressed_bytes: u64 = columns.iter().map(|col| col.compressed_size).sum();
total_compressed_size += compressed_bytes;

// object_store::path::Path doesn't automatically deal with Windows path separators
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
Expand Down Expand Up @@ -406,7 +407,8 @@ impl StandardTableProvider {
// Track billing metrics for query scan
let current_date = chrono::Utc::now().date_naive().to_string();
increment_files_scanned_in_query_by_date(file_count, &current_date);
increment_bytes_scanned_in_query_by_date(total_file_size, &current_date);
// Use compressed size as it represents actual bytes read from storage (S3/object store charges)
increment_bytes_scanned_in_query_by_date(total_compressed_size, &current_date);

(partitioned_files, statistics)
}
Expand Down
Loading