Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
package net.laprun.sustainability.power.sensors;

import java.util.Map;
import java.util.Set;

import io.quarkus.logging.Log;
import net.laprun.sustainability.power.SensorMetadata;
import net.laprun.sustainability.power.SensorUnit;

public abstract class AbstractPowerSensor<M extends Measures> implements PowerSensor {
protected final M measures;
private long lastUpdateEpoch;
private boolean started;
protected boolean cpuSharesEnabled = false;
private SensorMetadata metadata;

/**
 * Creates a sensor backed by the provided measures holder.
 *
 * @param measures the {@link Measures} implementation stored in this sensor's {@code measures} field
 */
public AbstractPowerSensor(M measures) {
this.measures = measures;
}

/**
 * Returns this sensor's metadata, computing and caching it on first access.
 * <p>
 * The cached value starts out as the result of {@link #nativeMetadata()}. When {@code cpuSharesEnabled} is set, it is
 * then replaced by a copy extended with the {@code EXTERNAL_CPU_SHARE_COMPONENT_NAME} component.
 *
 * @return the cached {@link SensorMetadata}
 */
@Override
public SensorMetadata metadata() {
    // already computed: reuse the cached instance
    if (metadata != null) {
        return metadata;
    }

    // cache the native metadata first so it stays available even if the extension step below fails
    metadata = nativeMetadata();
    if (!cpuSharesEnabled) {
        return metadata;
    }

    metadata = SensorMetadata.from(metadata)
            .withNewComponent(
                    EXTERNAL_CPU_SHARE_COMPONENT_NAME,
                    "CPU share estimate based on currently configured strategy used in CPUShare",
                    false,
                    SensorUnit.decimalPercentage)
            .build();
    return metadata;
}

abstract protected SensorMetadata nativeMetadata();

@Override
public RegisteredPID register(long pid) {
Log.debugf("Registered pid: %d", pid);
Expand Down Expand Up @@ -43,12 +67,17 @@ public void stop() {
started = false;
}

/**
 * Returns the registered process ids in string form by delegating to {@link Measures#trackedPIDsAsString()} on this
 * sensor's measures.
 *
 * @return the set returned by the underlying measures
 */
@Override
public Set<String> getRegisteredPIDs() {
return measures.trackedPIDsAsString();
}

protected abstract void doStart(long samplingFrequencyInMillis);

@Override
public Measures update(Long tick) {
public Measures update(Long tick, Map<String, Double> cpuShares) {
final long newUpdateStartEpoch = System.currentTimeMillis();
final var measures = doUpdate(lastUpdateEpoch, newUpdateStartEpoch);
final var measures = doUpdate(lastUpdateEpoch, newUpdateStartEpoch, cpuShares);
lastUpdateEpoch = measures.lastMeasuredUpdateEndEpoch() > 0 ? measures.lastMeasuredUpdateEndEpoch()
: newUpdateStartEpoch;
return measures;
Expand All @@ -58,5 +87,5 @@ protected long lastUpdateEpoch() {
return lastUpdateEpoch;
}

abstract protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch);
abstract protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch, Map<String, Double> cpuShares);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,32 @@
public class MapMeasures implements Measures {
private final ConcurrentMap<RegisteredPID, SensorMeasure> measures = new ConcurrentHashMap<>();
private long lastMeasuredUpdateStartEpoch;
private final PIDRegistry registry = new PIDRegistry();

/**
 * Starts tracking the specified process: creates its {@link RegisteredPID} key, seeds its entry with
 * {@code SensorMeasure.missing} and records the key in the PID registry.
 *
 * @param pid the process id to track
 * @return the {@link RegisteredPID} created for the process
 */
@Override
public RegisteredPID register(long pid) {
final var key = RegisteredPID.create(pid);
// no measure has been recorded yet for this process
measures.put(key, SensorMeasure.missing);
registry.register(key);
return key;
}

/**
 * Stops tracking the specified process by removing it from both the measures map and the PID registry.
 *
 * @param registeredPID the key of the process to stop tracking
 */
@Override
public void unregister(RegisteredPID registeredPID) {
measures.remove(registeredPID);
registry.unregister(registeredPID);
}

/**
 * Returns the keys of the measures map.
 *
 * @return the map's key set, which is a view backed by the map rather than a copy
 */
@Override
public Set<RegisteredPID> trackedPIDs() {
return measures.keySet();
}

/**
 * Returns the process ids held by the PID registry, in string form.
 *
 * @return the set returned by {@link PIDRegistry#pids()}
 */
@Override
public Set<String> trackedPIDsAsString() {
return registry.pids();
}

@Override
public int numberOfTrackedPIDs() {
return measures.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface Measures {
*/
Set<RegisteredPID> trackedPIDs();

/**
 * Retrieves the tracked process ids in string form.
 * <p>
 * NOTE(review): the {@code MapMeasures} implementation does not include the system-total pseudo process in this set;
 * confirm whether that is part of this method's contract.
 *
 * @return the set of tracked process ids as strings
 */
Set<String> trackedPIDsAsString();

/**
* Retrieves the number of tracked processes
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package net.laprun.sustainability.power.sensors;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Keeps track of the string form of the process ids that have been registered, leaving out the system-total pseudo
 * process ({@code RegisteredPID.SYSTEM_TOTAL_REGISTERED_PID}).
 * <p>
 * The backing set is a concurrent set because registrations and reads can happen on different threads (for example,
 * periodic CPU share sampling reads the set while processes are being registered). Iterating the returned view
 * therefore never throws {@link java.util.ConcurrentModificationException}.
 */
public class PIDRegistry {
    private final Set<String> pids = ConcurrentHashMap.newKeySet();
    // read-only live view handed out to callers so they cannot alter the registry's state
    private final Set<String> readOnlyPids = Collections.unmodifiableSet(pids);

    /**
     * Records the specified process, unless it is the system-total pseudo process.
     *
     * @param registeredPID the process to record
     */
    public void register(RegisteredPID registeredPID) {
        if (!RegisteredPID.SYSTEM_TOTAL_REGISTERED_PID.equals(registeredPID)) {
            pids.add(registeredPID.pidAsString());
        }
    }

    /**
     * Forgets the specified process, unless it is the system-total pseudo process (which is never recorded).
     *
     * @param registeredPID the process to forget
     */
    public void unregister(RegisteredPID registeredPID) {
        if (!RegisteredPID.SYSTEM_TOTAL_REGISTERED_PID.equals(registeredPID)) {
            pids.remove(registeredPID.pidAsString());
        }
    }

    /**
     * Retrieves the currently recorded process ids.
     *
     * @return an unmodifiable, live view of the recorded process ids: later registrations and unregistrations are
     *         reflected in it, but it cannot be used to modify the registry
     */
    public Set<String> pids() {
        return readOnlyPids;
    }
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package net.laprun.sustainability.power.sensors;

import java.util.Map;
import java.util.Set;

import net.laprun.sustainability.power.SensorMetadata;

/**
* A representation of a power-consumption sensor.
*/
public interface PowerSensor {
/**
 * Name of the component that {@code AbstractPowerSensor} adds to a sensor's metadata when external CPU shares are
 * enabled.
 */
String EXTERNAL_CPU_SHARE_COMPONENT_NAME = "externalCpuShare";
/**
 * NOTE(review): presumably a sentinel value meaning that no CPU share is available for a process; confirm against
 * its usages.
 */
Double MISSING_CPU_SHARE = -1.0;

/**
* Whether the sensor supports process attribution of power, i.e. is measured power imputed to each process or does
Expand Down Expand Up @@ -61,9 +66,10 @@ default void stop() {
*
* @param tick an ordinal value tracking the number of recorded measures being taken by the sensor since it started
* measuring power consumption
* @param cpuShares externally provided (if available) cpu attribution for each process id
* @return the {@link Measures} object recording the measures this sensor has taken since it started measuring
*/
Measures update(Long tick);
Measures update(Long tick, Map<String, Double> cpuShares);

/**
* Unregisters the specified {@link RegisteredPID} with this sensor thus signaling that clients are not interested in
Expand All @@ -73,4 +79,6 @@ default void stop() {
* registered with this sensor
*/
void unregister(RegisteredPID registeredPID);

/**
 * Retrieves the process ids currently registered with this sensor, in string form.
 * <p>
 * NOTE(review): presumably the same key format as the {@code cpuShares} map passed to
 * {@link #update(Long, Map)}; confirm.
 *
 * @return the set of registered process ids as strings
 */
Set<String> getRegisteredPIDs();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package net.laprun.sustainability.power.sensors;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -12,10 +16,12 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.Cancellable;
import io.smallrye.mutiny.tuples.Tuple2;
import net.laprun.sustainability.power.ProcessUtils;
import net.laprun.sustainability.power.SensorMeasure;
import net.laprun.sustainability.power.SensorMetadata;
import net.laprun.sustainability.power.persistence.Persistence;
import net.laprun.sustainability.power.sensors.cpu.CPUShare;

@ApplicationScoped
public class SamplingMeasurer {
Expand Down Expand Up @@ -58,9 +64,40 @@ RegisteredPID track(long pid) throws Exception {

if (!sensor.isStarted()) {
sensor.start(samplingPeriod.toMillis());
periodicSensorCheck = Multi.createFrom().ticks()
.every(samplingPeriod)
.map(sensor::update)

final var overSamplingFactor = 3;
final var cpuSharesMulti = Multi.createFrom().ticks()
// over sample but over a shorter period to ensure we have an average that covers most of the sampling period
.every(samplingPeriod.minus(50, ChronoUnit.MILLIS).dividedBy(overSamplingFactor))
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.map(tick -> CPUShare.cpuSharesFor(sensor.getRegisteredPIDs()))
.group()
.intoLists()
.of(overSamplingFactor)
.map(cpuShares -> {
// first convert list of mappings pid -> cpu to mapping pid -> list of cpu shares
Map<String, List<Double>> pidToRecordedCPUShares = new HashMap<>();
cpuShares.forEach(cpuShare -> cpuShare.forEach(
(p, cpu) -> {
if (cpu != null && cpu > 0) { // drop null values to avoid skewing average even more
pidToRecordedCPUShares.computeIfAbsent(p, unused -> new ArrayList<>()).add(cpu);
}
}));
// then reduce each cpu shares list to their average
Map<String, Double> averages = new HashMap<>(pidToRecordedCPUShares.size());
pidToRecordedCPUShares.forEach((p, values) -> averages.put(p,
values.stream().mapToDouble(Double::doubleValue).average().orElse(0)));
return averages;
});

final var samplingTicks = Multi.createFrom().ticks()
.every(samplingPeriod);
periodicSensorCheck = Multi.createBy()
.combining()
.streams(samplingTicks, cpuSharesMulti)
.asTuple()
.log()
.map(this::updateSensor)
.broadcast()
.withCancellationAfterLastSubscriberDeparture()
.toAtLeast(1)
Expand All @@ -71,6 +108,10 @@ RegisteredPID track(long pid) throws Exception {
return registeredPID;
}

/**
 * Unpacks a combined (tick, CPU shares) item and forwards both parts to the sensor's {@code update} method.
 *
 * @param tuple pairs the sampling tick (first item) with the CPU shares keyed by process id (second item)
 * @return the {@link Measures} returned by the sensor
 */
private Measures updateSensor(Tuple2<Long, Map<String, Double>> tuple) {
return sensor.update(tuple.getItem1(), tuple.getItem2());
}

/**
* Starts measuring even in the absence of registered PID. This will record the system's total energy consumption.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package net.laprun.sustainability.power.sensors.cpu;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.zaxxer.nuprocess.NuProcessBuilder;

import io.quarkus.logging.Log;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import net.laprun.sustainability.power.nuprocess.BaseProcessHandler;

public class PSExtractionStrategy implements ExtractionStrategy {
public static final ExtractionStrategy INSTANCE = new PSExtractionStrategy();
public static final PSExtractionStrategy INSTANCE = new PSExtractionStrategy();

@Override
public Map<String, Double> cpuSharesFor(Set<String> pids) {
if (pids.isEmpty()) {
return Collections.emptyMap();
}
final var cpuShares = new HashMap<String, Double>(pids.size());
final var pidList = String.join(",", pids);
final var cmd = new String[] { "ps", "-p", pidList, "-o", "pid=,pcpu=" };
Expand All @@ -28,16 +33,24 @@ public void onStdout(ByteBuffer buffer, boolean closed) {
extractCPUSharesInto(bytes, cpuShares);
}
}

/**
 * Handles termination of the {@code ps} process.
 * <p>
 * On a non-zero exit status, a warning is logged and anything already parsed is discarded, so that the caller does
 * not receive partial results. On success the parsed CPU shares are left untouched: clearing them unconditionally
 * would make the enclosing method always return an empty map.
 *
 * @param statusCode the exit status reported for the process
 */
@Override
public void onExit(int statusCode) {
    if (statusCode != 0) {
        Log.warnf("Failed to extract CPU shares for pids: %s", pids);
        // only drop (possibly partial) results when the extraction failed
        cpuShares.clear();
    }
}
};
new NuProcessBuilder(psHandler, psHandler.command()).run();
return cpuShares;
}

static void extractCPUSharesInto(byte[] bytes, Map<String, Double> cpuShares) {
void extractCPUSharesInto(byte[] bytes, Map<String, Double> cpuShares) {
extractCPUSharesInto(new String(bytes), cpuShares);
}

static void extractCPUSharesInto(String input, Map<String, Double> cpuShares) {
void extractCPUSharesInto(String input, Map<String, Double> cpuShares) {
final var lines = input.split("\n");
for (var line : lines) {
if (line.isBlank()) {
Expand All @@ -47,7 +60,7 @@ static void extractCPUSharesInto(String input, Map<String, Double> cpuShares) {
}
}

private static void extractCPUShare(String line, Map<String, Double> cpuShares) {
private void extractCPUShare(String line, Map<String, Double> cpuShares) {
try {
line = line.trim();
int spaceIndex = line.indexOf(' ');
Expand All @@ -63,8 +76,8 @@ private static void extractCPUShare(String line, Map<String, Double> cpuShares)

var pid = line.substring(0, spaceIndex).trim();
var cpuPercentage = line.substring(spaceIndex + 1).trim();
Log.info(pid + " / " + cpuPercentage);
final var value = Double.parseDouble(cpuPercentage);
Log.infof("pid: %s / cpu: %s%%", pid, cpuPercentage);
final var value = Double.parseDouble(cpuPercentage) / fullCPU();
if (value < 0) {
Log.warnf("Invalid CPU share percentage: %s", cpuPercentage);
return;
Expand All @@ -77,4 +90,10 @@ private static void extractCPUShare(String line, Map<String, Double> cpuShares)
Log.warnf("Failed to parse CPU percentage for line: '%s', cause: %s", line, e);
}
}

/**
 * Computes the percentage value that corresponds to all available processors being fully used, logging it at info
 * level on each call.
 *
 * @return the number of available processors multiplied by 100
 */
int fullCPU() {
    // each core contributes 100%
    final int potentialFullCpu = 100 * CpuCoreSensor.availableProcessors();
    Log.infof("'potential' full CPU: %d%%", potentialFullCpu);
    return potentialFullCpu;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public IntelRAPLSensor() {

/**
 * Creates a sensor from the specified RAPL file paths, with external CPU shares explicitly disabled.
 *
 * @param raplFilePaths the paths converted by {@code fromPaths} before being handed to the delegated constructor
 */
protected IntelRAPLSensor(String... raplFilePaths) {
this(fromPaths(raplFilePaths));
// explicitly turn external CPU shares off for this sensor
cpuSharesEnabled = false;
}

private static SortedMap<String, RAPLFile> fromPaths(String... raplFilePaths) {
Expand Down Expand Up @@ -126,12 +127,12 @@ static double computePowerInMilliWatts(long newMicroJoules, long prevMicroJoules
}

@Override
public SensorMetadata metadata() {
protected SensorMetadata nativeMetadata() {
return metadata;
}

@Override
protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch) {
protected Measures doUpdate(long lastUpdateEpoch, long newUpdateStartEpoch, Map<String, Double> cpuShares) {
final var measure = new double[raplFiles.length];
readAndRecordSensor((value, index) -> measure[index] = computePowerInMilliWatts(index, value, newUpdateStartEpoch),
newUpdateStartEpoch);
Expand Down
Loading
Loading