Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public void unregister(RegisteredPID registeredPID) {
}

@Override
public void start(long samplingPeriodInMillis) throws Exception {
public void start() throws Exception {
if (!started) {
lastUpdateEpoch = System.currentTimeMillis();
started = true;
doStart(samplingPeriodInMillis);
doStart();
}
}

Expand All @@ -95,14 +95,14 @@ public Set<String> getRegisteredPIDs() {
return measures.trackedPIDsAsString();
}

protected abstract void doStart(long samplingFrequencyInMillis);
protected abstract void doStart();

@Override
public Measures update(Long tick, Map<String, Double> cpuShares) {
final long newUpdateStartEpoch = System.currentTimeMillis();
Log.debugf("Sensor update last called: %dms ago", newUpdateStartEpoch - lastUpdateEpoch);
final var measures = doUpdate(lastUpdateEpoch, newUpdateStartEpoch, cpuShares);
lastUpdateEpoch = measures.lastMeasuredUpdateEndEpoch() > 0 ? measures.lastMeasuredUpdateEndEpoch()
: newUpdateStartEpoch;
lastUpdateEpoch = newUpdateStartEpoch;
return measures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

import net.laprun.sustainability.power.SensorMeasure;

public class MapMeasures implements Measures {
private final ConcurrentMap<RegisteredPID, SensorMeasure> measures = new ConcurrentHashMap<>();
private long lastMeasuredUpdateStartEpoch;
private final PIDRegistry registry = new PIDRegistry();

@Override
Expand Down Expand Up @@ -43,22 +41,11 @@ public int numberOfTrackedPIDs() {

@Override
public void record(RegisteredPID pid, SensorMeasure sensorMeasure) {
lastMeasuredUpdateStartEpoch = sensorMeasure.endMs();
measures.put(pid, sensorMeasure);
}

@Override
public SensorMeasure getOrDefault(RegisteredPID pid) {
return measures.getOrDefault(pid, SensorMeasure.missing);
}

@Override
public long lastMeasuredUpdateEndEpoch() {
return lastMeasuredUpdateStartEpoch;
}

@Override
public void forEach(Consumer<? super SensorMeasure> consumer) {
measures.keySet().forEach(pid -> consumer.accept(getOrDefault(pid)));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.laprun.sustainability.power.sensors;

import java.util.Set;
import java.util.function.Consumer;

import net.laprun.sustainability.power.SensorMeasure;

Expand Down Expand Up @@ -63,16 +62,8 @@ public interface Measures {
*/
SensorMeasure getOrDefault(RegisteredPID pid);

/**
* Returns the last measured end epoch of an update, if it exists.
*
* @return the last measured end epoch of an update, or {@code -1} if the measure didn't provide that information
*/
long lastMeasuredUpdateEndEpoch();

@SuppressWarnings("unused")
default SensorMeasure getSystemTotal() {
return getOrDefault(RegisteredPID.SYSTEM_TOTAL_REGISTERED_PID);
}

void forEach(Consumer<? super SensorMeasure> consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ default boolean supportsProcessAttribution() {

void enableCPUShareSampling(boolean enable);

default long adjustSamplingPeriodIfNeeded(long requestedSamplingPeriodInMillis) {
return requestedSamplingPeriodInMillis;
}

/**
* Stops measuring power consumption
*/
Expand All @@ -50,10 +54,9 @@ default void stop() {
/**
* Starts emitting power consumption measures at the given frequency
*
* @param samplingFrequencyInMillis the number of milliseconds between emitted measures
* @throws Exception if the sensor couldn't be started for some reason
*/
void start(long samplingFrequencyInMillis) throws Exception;
void start() throws Exception;

/**
* Registers the provided process identifier (pid) with the sensor in case it can provide per-process measures. For sensors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.Cancellable;
Expand All @@ -26,7 +27,7 @@
@ApplicationScoped
public class SamplingMeasurer {

public static final String DEFAULT_SAMPLING_PERIOD = "PT0.5S";
public static final String DEFAULT_SAMPLING_PERIOD = "PT1S";
@Inject
PowerSensor sensor;

Expand Down Expand Up @@ -67,7 +68,9 @@ RegisteredPID track(long pid) throws Exception {
final var registeredPID = sensor.register(pid);

if (!sensor.isStarted()) {
sensor.start(samplingPeriod.toMillis());
final var adjusted = sensor.adjustSamplingPeriodIfNeeded(samplingPeriod.toMillis());
Log.infof("%s sensor adjusted its sampling period to %dms", sensor.getClass().getSimpleName(), adjusted);
sensor.start();

final var samplingTicks = Multi.createFrom().ticks().every(samplingPeriod);

Expand All @@ -76,7 +79,7 @@ RegisteredPID track(long pid) throws Exception {
final var cpuSharesMulti = Multi.createFrom().ticks()
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.map(tick -> CPUShare.cpuSharesFor(sensor.getRegisteredPIDs()))
.group()
.intoLists()
Expand All @@ -100,7 +103,6 @@ RegisteredPID track(long pid) throws Exception {
.combining()
.streams(samplingTicks, cpuSharesMulti)
.asTuple()
.log()
.map(this::updateSensor);
} else {
periodicSensorCheck = samplingTicks
Expand Down Expand Up @@ -152,6 +154,7 @@ public void stop() {
manuallyTrackedProcesses.values().forEach(Cancellable::cancel);
}

@SuppressWarnings("unused")
public void stopTrackingProcess(long processId) {
sensor.unregister(RegisteredPID.create(processId));
// cancel associated process tracking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ private static SortedMap<String, RAPLFile> defaultRAPLFiles() {
return files;
}

@SuppressWarnings("NonAsciiCharacters")
IntelRAPLSensor(SortedMap<String, RAPLFile> files) {
if (files.isEmpty())
throw new RuntimeException("Failed to get RAPL energy readings, probably due to lack of read access ");
Expand Down Expand Up @@ -101,7 +102,7 @@ private static boolean addFileIfReadable(String raplFileAsString, SortedMap<Stri
}

@Override
public void doStart(long frequency) {
public void doStart() {
// perform an initial measure to prime the data
readAndRecordSensor(null, lastUpdateEpoch());
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected SensorMetadata nativeMetadata() {
}

@Override
protected void doStart(long samplingFrequencyInMillis) {
protected void doStart() {
// nothing to do here by default
lastCalled = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class NuProcessWrapper implements ProcessWrapper {
private PowermetricsProcessHandler measureHandler;
private String periodInMilliSecondsAsString;

@SuppressWarnings("UnusedReturnValue")
private NuProcess exec(PowermetricsProcessHandler handler) {
if (handler == null)
throw new IllegalArgumentException("Handler cannot be null");
Expand All @@ -29,8 +30,6 @@ public InputStream streamForMetadata() {

@Override
public void start(long periodInMilliSeconds) {
// todo? check if asked period is the same as the current used one
periodInMilliSeconds = periodInMilliSeconds > 100 ? periodInMilliSeconds - 50 : periodInMilliSeconds;
this.periodInMilliSecondsAsString = Long.toString(periodInMilliSeconds);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import java.io.InputStream;

public class ProcessMacOSPowermetricsSensor extends MacOSPowermetricsSensor {
private static final int MINIMAL_PERIOD_MS = 700;
private final ProcessWrapper processWrapper = new NuProcessWrapper();
private long samplingPeriodInMillis;

public ProcessMacOSPowermetricsSensor() {
// extract metadata
Expand All @@ -15,8 +17,21 @@ public ProcessMacOSPowermetricsSensor() {
}

@Override
public void doStart(long frequency) {
processWrapper.start(frequency);
public long adjustSamplingPeriodIfNeeded(long requestedSamplingPeriodInMillis) {
if (requestedSamplingPeriodInMillis < MINIMAL_PERIOD_MS) {
throw new IllegalArgumentException(
"powermetrics takes around 500ms of incompressible time for each sample, set the sampling period to at least "
+
MINIMAL_PERIOD_MS + " milliseconds to get useful results to also account for processing time.");
}
samplingPeriodInMillis = requestedSamplingPeriodInMillis - 600;
return samplingPeriodInMillis;
}

@Override
public void doStart() {
super.doStart();
processWrapper.start(samplingPeriodInMillis);
}

@Override
Expand All @@ -26,7 +41,9 @@ protected InputStream getInputStream() {

@Override
public void stop() {
processWrapper.stop();
super.stop();
if (isStarted()) {
processWrapper.stop();
super.stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected SensorMetadata nativeMetadata() {
}

@Override
public void doStart(long samplingFrequencyInMillis) {
public void doStart() {
// nothing to do
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected void readAndRecordSensor(BiConsumer<Long, Integer> onReadingSensorValu
void wattComputationShouldWork() throws Exception {
final var raplFile = new TestRAPLFile(10000L, 20000L, 30000L);
final var sensor = new TestIntelRAPLSensor(new TreeMap<>(Map.of("sensor", raplFile)));
sensor.start(500);
sensor.start();
Thread.sleep(10); // ensure we get enough time between the measure performed during start and the first update
final var pid = sensor.register(1234L);
final var measures = sensor.update(1L, Map.of());
Expand All @@ -118,7 +118,7 @@ void shouldIncludeCPUShareIfRequested() throws Exception {
final var raplFile = new TestRAPLFile(10000L, 20000L, 30000L);
final var sensor = new TestIntelRAPLSensor(new TreeMap<>(Map.of("sensor", raplFile)));
sensor.enableCPUShareSampling(true);
sensor.start(500);
sensor.start();
final var pid = sensor.register(1234L);
double cpuShare = 0.3;
final var measures = sensor.update(1L, Map.of("1234", cpuShare));
Expand All @@ -132,6 +132,7 @@ void shouldIncludeCPUShareIfRequested() throws Exception {
assertEquals(cpuShare, components[2]);
}

@SuppressWarnings("SameParameterValue")
private SensorMetadata loadMetadata(String... fileNames) {
Class<? extends IntelRAPLSensorTest> clazz = getClass();
final var files = Arrays.stream(fileNames)
Expand Down
Loading