Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@
import java.util.Map;
import java.util.Set;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkus.logging.Log;
import net.laprun.sustainability.power.SensorMetadata;
import net.laprun.sustainability.power.SensorUnit;

public abstract class AbstractPowerSensor<M extends Measures> implements PowerSensor {
protected final M measures;
public abstract class AbstractPowerSensor implements PowerSensor {
protected final Measures measures;
private long lastUpdateEpoch;
private boolean started;
protected boolean cpuSharesEnabled = false;
@ConfigProperty(name = "net.laprun.sustainability.power.enable-cpu-share-sampling", defaultValue = "false")
protected boolean cpuSharesEnabled;
private SensorMetadata metadata;
private int externalCPUShareComponentIndex = -1;

public AbstractPowerSensor(M measures) {
public AbstractPowerSensor(Measures measures) {
this.measures = measures;
}

public AbstractPowerSensor() {
this(new MapMeasures());
}

@Override
public SensorMetadata metadata() {
if (metadata == null) {
Expand All @@ -28,13 +36,28 @@ public SensorMetadata metadata() {
"CPU share estimate based on currently configured strategy used in CPUShare", false,
SensorUnit.decimalPercentage)
.build();
externalCPUShareComponentIndex = metadata.metadataFor(EXTERNAL_CPU_SHARE_COMPONENT_NAME).index();
}
}
return metadata;
}

abstract protected SensorMetadata nativeMetadata();

@Override
public boolean wantsCPUShareSamplingEnabled() {
return cpuSharesEnabled;
}

@Override
public void enableCPUShareSampling(boolean enable) {
cpuSharesEnabled = enable;
}

protected int externalCPUShareComponentIndex() {
return externalCPUShareComponentIndex;
}

@Override
public RegisteredPID register(long pid) {
Log.debugf("Registered pid: %d", pid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ default boolean supportsProcessAttribution() {
return false;
}

boolean wantsCPUShareSamplingEnabled();

void enableCPUShareSampling(boolean enable);

/**
* Stops measuring power consumption
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class SamplingMeasurer {
private Multi<Measures> periodicSensorCheck;
private final Map<Long, Cancellable> manuallyTrackedProcesses = new ConcurrentHashMap<>();

public PowerSensor sensor() {
return sensor;
}

public Multi<SensorMeasure> stream(String pid) throws Exception {
final var parsedPID = validPIDOrFail(pid);
return uncheckedStream(parsedPID);
Expand All @@ -65,39 +69,45 @@ RegisteredPID track(long pid) throws Exception {
if (!sensor.isStarted()) {
sensor.start(samplingPeriod.toMillis());

final var overSamplingFactor = 3;
final var cpuSharesMulti = Multi.createFrom().ticks()
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.map(tick -> CPUShare.cpuSharesFor(sensor.getRegisteredPIDs()))
.group()
.intoLists()
.of(overSamplingFactor)
.map(cpuShares -> {
// first convert list of mappings pid -> cpu to mapping pid -> list of cpu shares
Map<String, List<Double>> pidToRecordedCPUShares = new HashMap<>();
cpuShares.forEach(cpuShare -> cpuShare.forEach(
(p, cpu) -> {
if (cpu != null && cpu > 0) { // drop null values to avoid skewing average even more
pidToRecordedCPUShares.computeIfAbsent(p, unused -> new ArrayList<>()).add(cpu);
}
}));
// then reduce each cpu shares list to their average
Map<String, Double> averages = new HashMap<>(pidToRecordedCPUShares.size());
pidToRecordedCPUShares.forEach((p, values) -> averages.put(p,
values.stream().mapToDouble(Double::doubleValue).average().orElse(0)));
return averages;
});

final var samplingTicks = Multi.createFrom().ticks()
.every(samplingPeriod);
periodicSensorCheck = Multi.createBy()
.combining()
.streams(samplingTicks, cpuSharesMulti)
.asTuple()
.log()
.map(this::updateSensor)
final var samplingTicks = Multi.createFrom().ticks().every(samplingPeriod);

if (sensor.wantsCPUShareSamplingEnabled()) {
final var overSamplingFactor = 3;
final var cpuSharesMulti = Multi.createFrom().ticks()
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.map(tick -> CPUShare.cpuSharesFor(sensor.getRegisteredPIDs()))
.group()
.intoLists()
.of(overSamplingFactor)
.map(cpuShares -> {
// first convert list of mappings pid -> cpu to mapping pid -> list of cpu shares
Map<String, List<Double>> pidToRecordedCPUShares = new HashMap<>();
cpuShares.forEach(cpuShare -> cpuShare.forEach(
(p, cpu) -> {
if (cpu != null && cpu > 0) { // drop null values to avoid skewing average even more
pidToRecordedCPUShares.computeIfAbsent(p, unused -> new ArrayList<>()).add(cpu);
}
}));
// then reduce each cpu shares list to their average
Map<String, Double> averages = new HashMap<>(pidToRecordedCPUShares.size());
pidToRecordedCPUShares.forEach((p, values) -> averages.put(p,
values.stream().mapToDouble(Double::doubleValue).average().orElse(0)));
return averages;
});
periodicSensorCheck = Multi.createBy()
.combining()
.streams(samplingTicks, cpuSharesMulti)
.asTuple()
.log()
.map(this::updateSensor);
} else {
periodicSensorCheck = samplingTicks
.map(tick -> sensor.update(tick, Map.of()));
}

periodicSensorCheck = periodicSensorCheck
.broadcast()
.withCancellationAfterLastSubscriberDeparture()
.toAtLeast(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

public class PSExtractionStrategy implements ExtractionStrategy {
public static final PSExtractionStrategy INSTANCE = new PSExtractionStrategy();
private final int fullCPU = CpuCoreSensor.availableProcessors() * 100;// each core contributes 100%

@Override
public Map<String, Double> cpuSharesFor(Set<String> pids) {
Expand All @@ -38,8 +39,8 @@ public void onStdout(ByteBuffer buffer, boolean closed) {
public void onExit(int statusCode) {
if (statusCode != 0) {
Log.warnf("Failed to extract CPU shares for pids: %s", pids);
cpuShares.clear();
}
cpuShares.clear();
}
};
new NuProcessBuilder(psHandler, psHandler.command()).run();
Expand Down Expand Up @@ -76,8 +77,8 @@ private void extractCPUShare(String line, Map<String, Double> cpuShares) {

var pid = line.substring(0, spaceIndex).trim();
var cpuPercentage = line.substring(spaceIndex + 1).trim();
Log.infof("pid: %s / cpu: %s%%", pid, cpuPercentage);
final var value = Double.parseDouble(cpuPercentage) / fullCPU();
Log.infof("pid: %s -> cpu: %s/%d%% = %3.2f", pid, cpuPercentage, fullCPU(), value);
if (value < 0) {
Log.warnf("Invalid CPU share percentage: %s", cpuPercentage);
return;
Expand All @@ -92,8 +93,6 @@ private void extractCPUShare(String line, Map<String, Double> cpuShares) {
}

int fullCPU() {
final var fullCPU = CpuCoreSensor.availableProcessors() * 100;
Log.infof("'potential' full CPU: %d%%", fullCPU);
return fullCPU; // each core contributes 100%
return fullCPU;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import net.laprun.sustainability.power.SensorMetadata;
import net.laprun.sustainability.power.sensors.AbstractPowerSensor;
import net.laprun.sustainability.power.sensors.Measures;
import net.laprun.sustainability.power.sensors.RegisteredPID;

/**
* A sensor using Intel's RAPL accessed via Linux' powercap system.
*/
public class IntelRAPLSensor extends AbstractPowerSensor<SingleMeasureMeasures> {
public class IntelRAPLSensor extends AbstractPowerSensor {
private final RAPLFile[] raplFiles;
private final SensorMetadata metadata;
private final int rawOffset;
private SensorMetadata nativeMetadata;
private final long[] lastMeasuredSensorValues;

/**
Expand Down Expand Up @@ -57,12 +59,11 @@ private static SortedMap<String, RAPLFile> defaultRAPLFiles() {
}

IntelRAPLSensor(SortedMap<String, RAPLFile> files) {
super(new SingleMeasureMeasures());
if (files.isEmpty())
throw new RuntimeException("Failed to get RAPL energy readings, probably due to lack of read access ");

raplFiles = files.values().toArray(new RAPLFile[0]);
final var rawOffset = files.size();
rawOffset = files.size();
final var metadata = new ArrayList<SensorMetadata.ComponentMetadata>(rawOffset * 2);
int fileNb = 0;
for (String name : files.keySet()) {
Expand All @@ -72,7 +73,7 @@ private static SortedMap<String, RAPLFile> defaultRAPLFiles() {
name + " (raw micro Joule data)", false, µJ));
fileNb++;
}
this.metadata = new SensorMetadata(metadata,
this.nativeMetadata = new SensorMetadata(metadata,
"Linux RAPL derived information, see https://www.kernel.org/doc/html/latest/power/powercap/powercap.html");
lastMeasuredSensorValues = new long[raplFiles.length];
}
Expand Down Expand Up @@ -128,15 +129,46 @@ static double computePowerInMilliWatts(long newMicroJoules, long prevMicroJoules

@Override
protected SensorMetadata nativeMetadata() {
return metadata;
try {
return nativeMetadata;
} finally {
// "forget" metadata once it's used in parent
nativeMetadata = null;
}
}

@Override
protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch, Map<String, Double> cpuShares) {
final var measure = new double[raplFiles.length];
readAndRecordSensor((value, index) -> measure[index] = computePowerInMilliWatts(index, value, newUpdateStartEpoch),
final var measure = new double[metadata().componentCardinality()];
readAndRecordSensor((value, index) -> {
measure[index] = computePowerInMilliWatts(index, value, newUpdateStartEpoch);
measure[index + rawOffset] = value;
},
newUpdateStartEpoch);
measures.singleMeasure(new SensorMeasure(measure, lastUpdateEpoch, newUpdateStartEpoch));

final var needMultipleMeasures = wantsCPUShareSamplingEnabled() && externalCPUShareComponentIndex() > 0;
final var single = new SensorMeasure(measure, lastUpdateEpoch, newUpdateStartEpoch);
measures.trackedPIDs().forEach(pid -> {
final SensorMeasure m;
if (needMultipleMeasures) {
double cpuShare;
if (RegisteredPID.SYSTEM_TOTAL_REGISTERED_PID.equals(pid)) {
cpuShare = 1.0;
} else {
cpuShare = cpuShares.getOrDefault(pid.pidAsString(), 0.0);
}
// todo: avoid copying array, external cpu share should be recorded as a separate value, not a component maybe?
// copy array
final var copy = new double[measure.length];
System.arraycopy(measure, 0, copy, 0, measure.length);
copy[externalCPUShareComponentIndex()] = cpuShare;
m = new SensorMeasure(copy, lastUpdateEpoch, newUpdateStartEpoch);
} else {
m = single;
}
measures.record(pid, m);
});

return measures;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
import net.laprun.sustainability.power.SensorMeasure;
import net.laprun.sustainability.power.SensorMetadata;
import net.laprun.sustainability.power.sensors.AbstractPowerSensor;
import net.laprun.sustainability.power.sensors.MapMeasures;
import net.laprun.sustainability.power.sensors.Measures;
import net.laprun.sustainability.power.sensors.PowerSensor;
import net.laprun.sustainability.power.sensors.RegisteredPID;

/**
* A macOS powermetrics based {@link PowerSensor} implementation.
*/
public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor<MapMeasures> {
public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor {
/**
* The Central Processing Unit component name
*/
Expand Down Expand Up @@ -57,10 +56,6 @@ public abstract class MacOSPowermetricsSensor extends AbstractPowerSensor<MapMea

private CPU cpu;

public MacOSPowermetricsSensor() {
super(new MapMeasures());
}

@Override
public boolean supportsProcessAttribution() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import net.laprun.sustainability.power.sensors.Measures;

@SuppressWarnings("unused")
public class TestPowerSensor extends AbstractPowerSensor<MapMeasures> {
public class TestPowerSensor extends AbstractPowerSensor {
public static final String CPU = "cpu";
public static final SensorMetadata DEFAULT = new SensorMetadata(
List.of(new SensorMetadata.ComponentMetadata(CPU, 0, "CPU", true, mW)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,35 @@ void wattComputationShouldWork() throws Exception {
final var raplFile = new TestRAPLFile(10000L, 20000L, 30000L);
final var sensor = new TestIntelRAPLSensor(new TreeMap<>(Map.of("sensor", raplFile)));
sensor.start(500);
Thread.sleep(10); // ensure we get enough time between the measure performed during start and the first update
final var pid = sensor.register(1234L);
final var measures = sensor.update(1L, Map.of());
final var components = measures.getOrDefault(pid).components();
assertEquals(1, components.length);
assertEquals(2, components.length);
assertEquals(2, raplFile.callCount());
final var interval = raplFile.measureTimeFor(1) - raplFile.measureTimeFor(0);
final var expected = (double) (raplFile.valueAt(1) - raplFile.valueAt(0)) / interval;
assertEquals(expected, components[0]);
assertEquals(20000, components[1]);
}

@Test
void shouldIncludeCPUShareIfRequested() throws Exception {
final var raplFile = new TestRAPLFile(10000L, 20000L, 30000L);
final var sensor = new TestIntelRAPLSensor(new TreeMap<>(Map.of("sensor", raplFile)));
sensor.enableCPUShareSampling(true);
sensor.start(500);
final var pid = sensor.register(1234L);
double cpuShare = 0.3;
final var measures = sensor.update(1L, Map.of("1234", cpuShare));
final var components = measures.getOrDefault(pid).components();
assertEquals(3, components.length);
assertEquals(2, raplFile.callCount());
final var interval = raplFile.measureTimeFor(1) - raplFile.measureTimeFor(0);
final var expected = (double) (raplFile.valueAt(1) - raplFile.valueAt(0)) / interval;
assertEquals(expected, components[0]);
assertEquals(20000, components[1]);
assertEquals(cpuShare, components[2]);
}

private SensorMetadata loadMetadata(String... fileNames) {
Expand Down
Loading
Loading