[GR-38004] Fix microservice measurement issues.
PullRequest: graal/11558
christianhaeubl committed Apr 7, 2022
2 parents 8db16b1 + 06399d5 commit 8b10078
1 changed file: sdk/mx.sdk/mx_sdk_benchmark.py (41 additions, 29 deletions)
@@ -265,19 +265,22 @@ def measureTimeToFirstResponse(bmSuite):
     url = "{}:{}{}".format(protocolHost, bmSuite.servicePort(), servicePath)
     lib = urllib()
 
+    measurementStartTime = time.time()
+    sentRequests = 0
+    receivedNon200Responses = 0
-    for i in range(60*10000):
+    while time.time() - measurementStartTime < 60:
         time.sleep(.0001)
-        if i > 0 and i % 10000 == 0:
-            mx.log("Sent {:d} requests so far but did not receive a response with code 200 yet.".format(i))
+        if sentRequests > 0 and sentRequests % 10000 == 0:
+            mx.log("Sent {:d} requests so far but did not receive a response with code 200 yet.".format(sentRequests))
 
         try:
-            res = lib.urlopen(url)
+            sentRequests += 1
+            res = lib.urlopen(url, timeout=10)
             responseCode = res.getcode()
             if responseCode == 200:
-                startTime = mx.get_last_subprocess_start_time()
+                processStartTime = mx.get_last_subprocess_start_time()
                 finishTime = datetime.datetime.now()
-                msToFirstResponse = (finishTime - startTime).total_seconds() * 1000
+                msToFirstResponse = (finishTime - processStartTime).total_seconds() * 1000
                 currentOutput = "First response received in {} ms".format(msToFirstResponse)
                 bmSuite.timeToFirstResponseOutputs.append(currentOutput)
                 mx.log(currentOutput)
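Two things change in this hunk: the probe is now bounded by wall-clock time rather than a fixed iteration count, and every request carries its own timeout, so a single hung connection cannot consume the entire measurement budget. A minimal, self-contained sketch of the same polling pattern (the function name and parameters are illustrative, not part of this file; the real code measures from the server process's start timestamp rather than the probe's):

```python
import time
import urllib.request


def probe_first_response(url, budget_seconds=60, request_timeout=10):
    """Poll `url` until it returns HTTP 200 or the time budget is exhausted.

    Returns the elapsed time in milliseconds, or None on timeout.
    """
    start = time.time()
    sent_requests = 0
    while time.time() - start < budget_seconds:
        time.sleep(0.0001)
        if sent_requests > 0 and sent_requests % 10000 == 0:
            print("sent {} requests, no HTTP 200 yet".format(sent_requests))
        try:
            sent_requests += 1
            # The per-request timeout keeps one hung connection from
            # stalling the whole measurement.
            response = urllib.request.urlopen(url, timeout=request_timeout)
            if response.getcode() == 200:
                return (time.time() - start) * 1000
        except IOError:
            pass  # server not up yet; keep polling
    return None
```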
@@ -291,7 +294,7 @@ def measureTimeToFirstResponse(bmSuite):
         except IOError:
             pass
 
-    mx.abort("Failed measure time to first response. Service not reachable at " + url)
+    mx.abort("Failed to measure time to first response. Service not reachable at " + url)


class BaseMicroserviceBenchmarkSuite(mx_benchmark.JavaBenchmarkSuite, NativeImageBenchmarkMixin):
@@ -428,9 +431,6 @@ def terminateApplication(port):
         proc = BaseMicroserviceBenchmarkSuite.waitForPort(port, 0)
         if proc:
             proc.send_signal(signal.SIGTERM)
-            # We can't use proc.wait() as this would destroy the return code for the code that started the subprocess.
-            while proc.is_running():
-                time.sleep(0.01)
             return True
         else:
             return False
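The SIGTERM becomes fire-and-forget here: with the joined measurement threads introduced below, the main thread's mx.run already blocks until the server process exits, so the background thread no longer needs to poll for termination, and reaping the process here would clobber the exit status that mx.run reports. A hedged sketch of the same idea, assuming a psutil.Process as the proc.is_running() call in the removed lines suggests (terminate_by_pid is an illustrative name):

```python
import signal

import psutil


def terminate_by_pid(pid):
    """Politely ask a server process to shut down; do not reap it here."""
    try:
        proc = psutil.Process(pid)
        # Send SIGTERM only. Calling proc.wait() here would collect the
        # exit status and hide it from the code that spawned the process,
        # which is exactly what the removed comment warned about.
        proc.send_signal(signal.SIGTERM)
        return True
    except psutil.NoSuchProcess:
        return False
```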
@@ -440,12 +440,11 @@ def testTimeToFirstResponseInBackground(benchmarkSuite):
         mx.log("--------------------------------------------")
         mx.log("Started time-to-first-response measurements.")
         mx.log("--------------------------------------------")
-        for _ in range(benchmarkSuite.NumMeasureTimeToFirstResponse):
-            measureTimeToFirstResponse(benchmarkSuite)
-            if not BaseMicroserviceBenchmarkSuite.waitForPort(benchmarkSuite.servicePort()):
-                mx.abort("Failed to find server application in {0}".format(BaseMicroserviceBenchmarkSuite.__name__))
-            if not BaseMicroserviceBenchmarkSuite.terminateApplication(benchmarkSuite.servicePort()):
-                mx.abort("Failed to terminate server application in {0}".format(BaseMicroserviceBenchmarkSuite.__name__))
+        measureTimeToFirstResponse(benchmarkSuite)
+        if not BaseMicroserviceBenchmarkSuite.waitForPort(benchmarkSuite.servicePort()):
+            mx.abort("Failed to find server application in {0}".format(BaseMicroserviceBenchmarkSuite.__name__))
+        if not BaseMicroserviceBenchmarkSuite.terminateApplication(benchmarkSuite.servicePort()):
+            mx.abort("Failed to terminate server application in {0}".format(BaseMicroserviceBenchmarkSuite.__name__))

@staticmethod
def testStartupPerformanceInBackground(benchmarkSuite):
@@ -504,59 +503,67 @@ def run_stage(self, vm, stage, server_command, out, err, cwd, nonZeroIsFatal):
             mx_benchmark.enable_tracker()
 
             # Measure time-to-first-response multiple times (without any command mapper hooks as those affect the measurement significantly)
-            self.startDaemonThread(target=BaseMicroserviceBenchmarkSuite.testTimeToFirstResponseInBackground, args=[self])
+            for _ in range(self.NumMeasureTimeToFirstResponse):
+                with EmptyEnv():
+                    measurementThread = self.startDaemonThread(target=BaseMicroserviceBenchmarkSuite.testTimeToFirstResponseInBackground, args=[self])
+                    returnCode = mx.run(server_command, out=out, err=err, cwd=cwd, nonZeroIsFatal=nonZeroIsFatal)
+                    measurementThread.join()
+                if not self.validateReturnCode(returnCode):
+                    mx.abort("The server application unexpectedly ended with return code " + str(returnCode))
 
             # Measure startup performance (without RSS tracker)
-            self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testStartupPerformanceInBackground, [self])
+            with EmptyEnv():
+                measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testStartupPerformanceInBackground, [self])
+                returnCode = mx.run(serverCommandWithoutTracker, out=out, err=err, cwd=cwd, nonZeroIsFatal=nonZeroIsFatal)
+                measurementThread.join()
             if not self.validateReturnCode(returnCode):
                 mx.abort("The server application unexpectedly ended with return code " + str(returnCode))
 
             # Measure peak performance (with all command mapper hooks)
-            self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testPeakPerformanceInBackground, [self])
+            with EmptyEnv():
+                measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testPeakPerformanceInBackground, [self])
+                returnCode = mx.run(serverCommandWithTracker, out=out, err=err, cwd=cwd, nonZeroIsFatal=nonZeroIsFatal)
+                measurementThread.join()
             if not self.validateReturnCode(returnCode):
                 mx.abort("The server application unexpectedly ended with return code " + str(returnCode))
 
             if self.measureLatency:
                 # Calibrate for latency measurements (without RSS tracker)
-                self.startDaemonThread(BaseMicroserviceBenchmarkSuite.calibrateLatencyTestInBackground, [self])
+                with EmptyEnv():
+                    measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.calibrateLatencyTestInBackground, [self])
+                    returnCode = mx.run(serverCommandWithoutTracker, out=out, err=err, cwd=cwd, nonZeroIsFatal=nonZeroIsFatal)
+                    measurementThread.join()
                 if not self.validateReturnCode(returnCode):
                     mx.abort("The server application unexpectedly ended with return code " + str(returnCode))
 
                 # Measure latency (without RSS tracker)
-                self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testLatencyInBackground, [self])
+                with EmptyEnv():
+                    measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testLatencyInBackground, [self])
+                    returnCode = mx.run(serverCommandWithoutTracker, out=out, err=err, cwd=cwd, nonZeroIsFatal=nonZeroIsFatal)
+                    measurementThread.join()
                 if not self.validateReturnCode(returnCode):
                     mx.abort("The server application unexpectedly ended with return code " + str(returnCode))
 
             return returnCode
         elif stage == 'agent' or 'instrument-run' in stage:
             # For the agent and the instrumented run, it is sufficient to run the peak performance workload.
-            self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testPeakPerformanceInBackground, [self, False])
-            return mx.run(server_command, out=out, err=err, cwd=cwd, nonZeroIsFatal=nonZeroIsFatal)
+            measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testPeakPerformanceInBackground, [self, False])
+            returnCode = mx.run(server_command, out=out, err=err, cwd=cwd, nonZeroIsFatal=nonZeroIsFatal)
+            measurementThread.join()
+            return returnCode
         else:
            mx.abort("Unexpected stage: " + stage)

     def startDaemonThread(self, target, args):
         thread = threading.Thread(target=target, args=args)
         thread.setDaemon(True)
         thread.start()
+        return thread
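Returning the Thread handle is what lets every call site above join() the measurement thread. As an aside, thread.setDaemon(True) has been a deprecated alias since Python 3.10; an equivalent sketch using the keyword form (a cleanup suggestion, not part of this commit):

```python
import threading


def start_daemon_thread(target, args):
    # daemon=True at construction replaces the deprecated setDaemon(True).
    thread = threading.Thread(target=target, args=args, daemon=True)
    thread.start()
    return thread  # callers join() this handle to synchronize
```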

     def rules(self, output, benchmarks, bmSuiteArgs):
         return [
             mx_benchmark.StdOutRule(
-                r"^First response received in (?P<firstResponse>\d*[.,]?\d*) ms",
+                r"First response received in (?P<firstResponse>\d*[.,]?\d*) ms",
                 {
                     "benchmark": benchmarks[0],
                     "bench-suite": self.benchSuiteName(),
@@ -612,35 +619,40 @@ def run(self, benchmarks, bmSuiteArgs):
         datapoints = []
         # Measure time-to-first-response (without any command mapper hooks as those affect the measurement significantly)
         mx.disable_command_mapper_hooks()
-        self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testTimeToFirstResponseInBackground, [self])
+        for _ in range(self.NumMeasureTimeToFirstResponse):
+            with EmptyEnv():
+                measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testTimeToFirstResponseInBackground, [self])
+                datapoints += super(BaseMicroserviceBenchmarkSuite, self).run(benchmarks, remainder)
+                measurementThread.join()
         mx.enable_command_mapper_hooks()
 
         # Measure startup performance (without RSS tracker)
         mx_benchmark.disable_tracker()
-        self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testStartupPerformanceInBackground, [self])
+        with EmptyEnv():
+            measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testStartupPerformanceInBackground, [self])
+            datapoints += super(BaseMicroserviceBenchmarkSuite, self).run(benchmarks, remainder)
+            measurementThread.join()
         mx_benchmark.enable_tracker()
 
         # Measure peak performance (with all command mapper hooks)
-        self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testPeakPerformanceInBackground, [self])
+        with EmptyEnv():
+            measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testPeakPerformanceInBackground, [self])
+            datapoints += super(BaseMicroserviceBenchmarkSuite, self).run(benchmarks, remainder)
+            measurementThread.join()
 
         if self.measureLatency:
             # Calibrate for latency measurements (without RSS tracker)
             mx_benchmark.disable_tracker()
-            self.startDaemonThread(BaseMicroserviceBenchmarkSuite.calibrateLatencyTestInBackground, [self])
+            with EmptyEnv():
+                measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.calibrateLatencyTestInBackground, [self])
+                datapoints += super(BaseMicroserviceBenchmarkSuite, self).run(benchmarks, remainder)
+                measurementThread.join()
 
             # Measure latency (without RSS tracker)
-            self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testLatencyInBackground, [self])
+            with EmptyEnv():
+                measurementThread = self.startDaemonThread(BaseMicroserviceBenchmarkSuite.testLatencyInBackground, [self])
+                datapoints += super(BaseMicroserviceBenchmarkSuite, self).run(benchmarks, remainder)
+                measurementThread.join()
             mx_benchmark.enable_tracker()
 
         return datapoints
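One caveat with the paired disable/enable calls above: if a measured run raises, the tracker or the command mapper hooks stay disabled. A hedged alternative sketch, not part of this commit, that restores the tracker even on failure (assumes the surrounding module's existing mx_benchmark import):

```python
from contextlib import contextmanager


@contextmanager
def tracker_disabled():
    # Ensures enable_tracker() runs even if the measured run raises.
    mx_benchmark.disable_tracker()
    try:
        yield
    finally:
        mx_benchmark.enable_tracker()


# Usage sketch:
#     with tracker_disabled():
#         datapoints += super(BaseMicroserviceBenchmarkSuite, self).run(benchmarks, remainder)
```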
