1- use std:: collections:: HashSet ;
2- use std:: sync:: atomic:: Ordering ;
3- use std:: sync:: Arc ;
1+ use std:: mem:: replace;
2+ use std:: ops:: DerefMut ;
43use std:: { sync:: Mutex , time:: SystemTime } ;
54
65use crate :: metrics:: data:: HistogramDataPoint ;
3736 buckets : Mutex :: new ( Buckets :: < T > :: new ( * count) ) ,
3837 }
3938 }
39+
40+ fn clone_and_reset ( & self , count : & usize ) -> Self {
41+ let mut current = self . buckets . lock ( ) . unwrap_or_else ( |err| err. into_inner ( ) ) ;
42+ let cloned = replace ( current. deref_mut ( ) , Buckets :: new ( * count) ) ;
43+ Self {
44+ buckets : Mutex :: new ( cloned) ,
45+ }
46+ }
4047}
4148
4249#[ derive( Default ) ]
@@ -73,16 +80,6 @@ impl<T: Number> Buckets<T> {
7380 self . max = value
7481 }
7582 }
76-
77- fn reset ( & mut self ) {
78- for item in & mut self . counts {
79- * item = 0 ;
80- }
81- self . count = Default :: default ( ) ;
82- self . total = Default :: default ( ) ;
83- self . min = T :: max ( ) ;
84- self . max = T :: min ( ) ;
85- }
8683}
8784
8885/// Summarizes a set of measurements as a histogram with explicitly defined
@@ -139,11 +136,6 @@ impl<T: Number> Histogram<T> {
139136 dest : Option < & mut dyn Aggregation > ,
140137 ) -> ( usize , Option < Box < dyn Aggregation > > ) {
141138 let t = SystemTime :: now ( ) ;
142- let start = self
143- . start
144- . lock ( )
145- . map ( |s| * s)
146- . unwrap_or_else ( |_| SystemTime :: now ( ) ) ;
147139 let h = dest. and_then ( |d| d. as_mut ( ) . downcast_mut :: < data:: Histogram < T > > ( ) ) ;
148140 let mut new_agg = if h. is_none ( ) {
149141 Some ( data:: Histogram {
@@ -155,24 +147,22 @@ impl<T: Number> Histogram<T> {
155147 } ;
156148 let h = h. unwrap_or_else ( || new_agg. as_mut ( ) . expect ( "present if h is none" ) ) ;
157149 h. temporality = Temporality :: Delta ;
158- h. data_points . clear ( ) ;
159-
160- // Max number of data points need to account for the special casing
161- // of the no attribute value + overflow attribute.
162- let n = self . value_map . count . load ( Ordering :: SeqCst ) + 2 ;
163- if n > h. data_points . capacity ( ) {
164- h. data_points . reserve_exact ( n - h. data_points . capacity ( ) ) ;
165- }
166150
167- if self
168- . value_map
169- . has_no_attribute_value
170- . swap ( false , Ordering :: AcqRel )
171- {
172- if let Ok ( ref mut b) = self . value_map . no_attribute_tracker . buckets . lock ( ) {
173- h. data_points . push ( HistogramDataPoint {
174- attributes : vec ! [ ] ,
175- start_time : start,
151+ let prev_start = self
152+ . start
153+ . lock ( )
154+ . map ( |mut start| replace ( start. deref_mut ( ) , t) )
155+ . unwrap_or ( t) ;
156+
157+ self . value_map
158+ . collect_and_reset ( & mut h. data_points , |attributes, aggr| {
159+ let b = aggr
160+ . buckets
161+ . into_inner ( )
162+ . unwrap_or_else ( |err| err. into_inner ( ) ) ;
163+ HistogramDataPoint {
164+ attributes,
165+ start_time : prev_start,
176166 time : t,
177167 count : b. count ,
178168 bounds : self . bounds . clone ( ) ,
@@ -193,54 +183,8 @@ impl<T: Number> Histogram<T> {
193183 None
194184 } ,
195185 exemplars : vec ! [ ] ,
196- } ) ;
197-
198- b. reset ( ) ;
199- }
200- }
201-
202- let mut trackers = match self . value_map . trackers . write ( ) {
203- Ok ( v) => v,
204- Err ( _) => return ( 0 , None ) ,
205- } ;
206-
207- let mut seen = HashSet :: new ( ) ;
208- for ( attrs, tracker) in trackers. drain ( ) {
209- if seen. insert ( Arc :: as_ptr ( & tracker) ) {
210- if let Ok ( b) = tracker. buckets . lock ( ) {
211- h. data_points . push ( HistogramDataPoint {
212- attributes : attrs. clone ( ) ,
213- start_time : start,
214- time : t,
215- count : b. count ,
216- bounds : self . bounds . clone ( ) ,
217- bucket_counts : b. counts . clone ( ) ,
218- sum : if self . record_sum {
219- b. total
220- } else {
221- T :: default ( )
222- } ,
223- min : if self . record_min_max {
224- Some ( b. min )
225- } else {
226- None
227- } ,
228- max : if self . record_min_max {
229- Some ( b. max )
230- } else {
231- None
232- } ,
233- exemplars : vec ! [ ] ,
234- } ) ;
235186 }
236- }
237- }
238-
239- // The delta collection cycle resets.
240- if let Ok ( mut start) = self . start . lock ( ) {
241- * start = t;
242- }
243- self . value_map . count . store ( 0 , Ordering :: SeqCst ) ;
187+ } ) ;
244188
245189 ( h. data_points . len ( ) , new_agg. map ( |a| Box :: new ( a) as Box < _ > ) )
246190 }
@@ -250,11 +194,6 @@ impl<T: Number> Histogram<T> {
250194 dest : Option < & mut dyn Aggregation > ,
251195 ) -> ( usize , Option < Box < dyn Aggregation > > ) {
252196 let t = SystemTime :: now ( ) ;
253- let start = self
254- . start
255- . lock ( )
256- . map ( |s| * s)
257- . unwrap_or_else ( |_| SystemTime :: now ( ) ) ;
258197 let h = dest. and_then ( |d| d. as_mut ( ) . downcast_mut :: < data:: Histogram < T > > ( ) ) ;
259198 let mut new_agg = if h. is_none ( ) {
260199 Some ( data:: Histogram {
@@ -266,24 +205,19 @@ impl<T: Number> Histogram<T> {
266205 } ;
267206 let h = h. unwrap_or_else ( || new_agg. as_mut ( ) . expect ( "present if h is none" ) ) ;
268207 h. temporality = Temporality :: Cumulative ;
269- h. data_points . clear ( ) ;
270208
271- // Max number of data points need to account for the special casing
272- // of the no attribute value + overflow attribute.
273- let n = self . value_map . count . load ( Ordering :: SeqCst ) + 2 ;
274- if n > h. data_points . capacity ( ) {
275- h. data_points . reserve_exact ( n - h. data_points . capacity ( ) ) ;
276- }
209+ let prev_start = self
210+ . start
211+ . lock ( )
212+ . map ( |s| * s)
213+ . unwrap_or_else ( |_| SystemTime :: now ( ) ) ;
277214
278- if self
279- . value_map
280- . has_no_attribute_value
281- . load ( Ordering :: Acquire )
282- {
283- if let Ok ( b) = & self . value_map . no_attribute_tracker . buckets . lock ( ) {
284- h. data_points . push ( HistogramDataPoint {
285- attributes : vec ! [ ] ,
286- start_time : start,
215+ self . value_map
216+ . collect_readonly ( & mut h. data_points , |attributes, aggr| {
217+ let b = aggr. buckets . lock ( ) . unwrap_or_else ( |err| err. into_inner ( ) ) ;
218+ HistogramDataPoint {
219+ attributes,
220+ start_time : prev_start,
287221 time : t,
288222 count : b. count ,
289223 bounds : self . bounds . clone ( ) ,
@@ -304,50 +238,8 @@ impl<T: Number> Histogram<T> {
304238 None
305239 } ,
306240 exemplars : vec ! [ ] ,
307- } ) ;
308- }
309- }
310-
311- let trackers = match self . value_map . trackers . write ( ) {
312- Ok ( v) => v,
313- Err ( _) => return ( 0 , None ) ,
314- } ;
315-
316- // TODO: This will use an unbounded amount of memory if there
317- // are unbounded number of attribute sets being aggregated. Attribute
318- // sets that become "stale" need to be forgotten so this will not
319- // overload the system.
320- let mut seen = HashSet :: new ( ) ;
321- for ( attrs, tracker) in trackers. iter ( ) {
322- if seen. insert ( Arc :: as_ptr ( tracker) ) {
323- if let Ok ( b) = tracker. buckets . lock ( ) {
324- h. data_points . push ( HistogramDataPoint {
325- attributes : attrs. clone ( ) ,
326- start_time : start,
327- time : t,
328- count : b. count ,
329- bounds : self . bounds . clone ( ) ,
330- bucket_counts : b. counts . clone ( ) ,
331- sum : if self . record_sum {
332- b. total
333- } else {
334- T :: default ( )
335- } ,
336- min : if self . record_min_max {
337- Some ( b. min )
338- } else {
339- None
340- } ,
341- max : if self . record_min_max {
342- Some ( b. max )
343- } else {
344- None
345- } ,
346- exemplars : vec ! [ ] ,
347- } ) ;
348241 }
349- }
350- }
242+ } ) ;
351243
352244 ( h. data_points . len ( ) , new_agg. map ( |a| Box :: new ( a) as Box < _ > ) )
353245 }
0 commit comments