Skip to content

refactor: get_stats merge and simplify #1150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a016af7
refactor: simplify query param extraction
Jan 30, 2025
28764cd
refactor: stats params in query API
Jan 31, 2025
3703343
DRY `get_stats`
Jan 31, 2025
dad51d2
semantic locality
Jan 31, 2025
3cb375e
Merge branch 'main' into query-param
Jan 31, 2025
150eb71
style imports
Jan 31, 2025
46fac83
Merge remote-tracking branch 'origin' into query-param
Feb 1, 2025
4eee17a
Merge remote-tracking branch 'origin/main' into query-param
Feb 4, 2025
ede0f45
fix: failure to deserialize `StatsParams`
Feb 4, 2025
14c79c5
Merge branch 'main' into query-param
Feb 4, 2025
5e62452
Merge branch 'main' into query-param
Feb 5, 2025
1138006
Merge branch 'main' into query-param
Feb 5, 2025
8b35a9f
Merge branch 'main' into query-param
Feb 6, 2025
82f04d6
Merge remote-tracking branch 'origin/main' into query-param
Feb 13, 2025
cc689b4
test makes no sense
Feb 13, 2025
3902bd2
style: less code == better
Feb 13, 2025
6340a8e
don't handle test path
Feb 13, 2025
92fba76
refactor: with less code
Feb 13, 2025
9f54e9e
coderabbit suggestions
Feb 13, 2025
5e7d8d1
refactor: date validation
Feb 13, 2025
2a3825a
Merge remote-tracking branch 'origin/main' into query-param
Feb 16, 2025
4b0c0dc
style: fmt
Feb 16, 2025
fba51f1
Merge remote-tracking branch 'origin/main' into query-param
Mar 6, 2025
43db1b6
fix: `DatedStats` merge
Mar 6, 2025
fe1bb95
Merge branch 'main' into query-param
Mar 11, 2025
0a07fbc
Merge remote-tracking branch 'origin/main' into query-param
Mar 24, 2025
f820090
refactor: separate constructors
Mar 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 88 additions & 88 deletions src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,94 @@ impl QueriedStats {
storage,
}
}

pub fn merge(stats: Vec<Self>) -> Self {
// get the stream name
let stream_name = stats[1].stream.clone();

let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

let cumulative_ingestion =
stats
.iter()
.map(|x| &x.ingestion)
.fold(IngestionStats::default(), |acc, x| IngestionStats {
count: acc.count + x.count,

size: format!(
"{} Bytes",
acc.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
format: x.format.clone(),
lifetime_count: acc.lifetime_count + x.lifetime_count,
lifetime_size: format!(
"{} Bytes",
acc.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
deleted_count: acc.deleted_count + x.deleted_count,
deleted_size: format!(
"{} Bytes",
acc.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
});

let cumulative_storage =
stats
.iter()
.map(|x| &x.storage)
.fold(StorageStats::default(), |acc, x| StorageStats {
size: format!(
"{} Bytes",
acc.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
format: x.format.clone(),
lifetime_size: format!(
"{} Bytes",
acc.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
deleted_size: format!(
"{} Bytes",
acc.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
});

QueriedStats::new(
&stream_name,
min_time,
cumulative_ingestion,
cumulative_storage,
)
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -130,94 +218,6 @@ impl StorageStats {
}
}

pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
// get the stream name
let stream_name = stats[1].stream.clone();

let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

let cumulative_ingestion =
stats
.iter()
.map(|x| &x.ingestion)
.fold(IngestionStats::default(), |acc, x| IngestionStats {
count: acc.count + x.count,

size: format!(
"{} Bytes",
acc.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
format: x.format.clone(),
lifetime_count: acc.lifetime_count + x.lifetime_count,
lifetime_size: format!(
"{} Bytes",
acc.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
deleted_count: acc.deleted_count + x.deleted_count,
deleted_size: format!(
"{} Bytes",
acc.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
});

let cumulative_storage =
stats
.iter()
.map(|x| &x.storage)
.fold(StorageStats::default(), |acc, x| StorageStats {
size: format!(
"{} Bytes",
acc.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
format: x.format.clone(),
lifetime_size: format!(
"{} Bytes",
acc.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.lifetime_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
deleted_size: format!(
"{} Bytes",
acc.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
+ x.deleted_size.split(' ').collect_vec()[0]
.parse::<u64>()
.unwrap_or_default()
),
});

QueriedStats::new(
&stream_name,
min_time,
cumulative_ingestion,
cumulative_storage,
)
}

pub async fn check_liveness(domain_name: &str) -> bool {
let uri = match Url::parse(&format!(
"{}{}/liveness",
Expand Down
Loading
Loading