Skip to content

Commit

Permalink
specify workload as argument
Browse files Browse the repository at this point in the history
Signed-off-by: Plamen Petrov <plamb0brt@gmail.com>
  • Loading branch information
plamenmpetrov committed Nov 26, 2020
1 parent 3758056 commit bad6f38
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 133 deletions.
260 changes: 129 additions & 131 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
var (
parallelNum = flag.Int("parallel", 1, "Number of parallel instances to start")
iterNum = flag.Int("iter", 1, "Number of iterations to run")
funcName = flag.String("funcName", "helloworld", "Name of the function to benchmark")
)

func TestBenchParallelServe(t *testing.T) {
Expand All @@ -59,73 +60,67 @@ func TestBenchParallelServe(t *testing.T) {
concurrency = 2
)

imageName, isPresent := images[*funcName]
require.True(t, isPresent, "Function is not supported")

funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

createResultsDir()

for funcName, imageName := range images {
// Pull image
resp, _, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "record")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, record_response!")
// Pull image
resp, _, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "record")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, record_response!")

createSnapshots(t, concurrency, vmID, imageName, isSyncOffload)
log.Info("All snapshots created")
createSnapshots(t, concurrency, vmID, imageName, isSyncOffload)
log.Info("All snapshots created")

createRecords(t, concurrency, vmID, imageName, isSyncOffload)
log.Info("All records done")
createRecords(t, concurrency, vmID, imageName, isSyncOffload)
log.Info("All records done")

// Measure
var vmGroup sync.WaitGroup
semLoad := make(chan bool, 1000)
// Measure
var vmGroup sync.WaitGroup

if !*isWithCache {
dropPageCache()
}
if !*isWithCache {
dropPageCache()
}

tStart := time.Now()
for i := 0; i < parallel; i++ {
vmIDString := strconv.Itoa(vmID + i)
tStart := time.Now()
for i := 0; i < parallel; i++ {
vmIDString := strconv.Itoa(vmID + i)

semLoad <- true
vmGroup.Add(1)
vmGroup.Add(1)

go func(i int) {
defer vmGroup.Done()
defer func() { <-semLoad }()
go func(i int) {
defer vmGroup.Done()

resp, metr, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")
resp, metr, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")

serveMetrics[i] = metr
}(i)
}
serveMetrics[i] = metr
}(i)
}

for i := 0; i < cap(semLoad); i++ {
semLoad <- true
}
vmGroup.Wait()
vmGroup.Wait()

log.Printf("Started %d instances in %d milliseconds", parallel, time.Since(tStart).Milliseconds())
log.Printf("Started %d instances in %d milliseconds", parallel, time.Since(tStart).Milliseconds())

for i := 0; i < parallel; i++ {
vmIDString := strconv.Itoa(vmID + i)
message, err := funcPool.RemoveInstance(vmIDString, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)
for i := 0; i < parallel; i++ {
vmIDString := strconv.Itoa(vmID + i)
message, err := funcPool.RemoveInstance(vmIDString, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)

if *isUPFEnabledTest {
memManagerMetrics, err := orch.GetUPFLatencyStats(vmIDString + "_0")
require.NoError(t, err, "Failed to ge tupf metrics")
require.Equal(t, len(memManagerMetrics), 1, "wrong length")
upfMetrics[i] = memManagerMetrics[0]
}
if *isUPFEnabledTest {
memManagerMetrics, err := orch.GetUPFLatencyStats(vmIDString + "_0")
require.NoError(t, err, "Failed to ge tupf metrics")
require.Equal(t, len(memManagerMetrics), 1, "wrong length")
upfMetrics[i] = memManagerMetrics[0]
}
}

fusePrintMetrics(t, serveMetrics, upfMetrics, isUPFEnabledTest, true, funcName, "parallelServe.csv")
fusePrintMetrics(t, serveMetrics, upfMetrics, isUPFEnabledTest, true, *funcName, "parallelServe.csv")

vmID += parallel
}
}

func TestBenchWarmServe(t *testing.T) {
Expand All @@ -138,59 +133,59 @@ func TestBenchWarmServe(t *testing.T) {
memManagerMetrics []*metrics.Metric
)

imageName, isPresent := images[*funcName]
require.True(t, isPresent, "Function is not supported")

funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

createResultsDir()

for funcName, imageName := range images {
vmIDString := strconv.Itoa(vmID)

// First time invoke (cold start)
resp, _, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")

// just use the last measurement for memory footprint
memFootprint, err := getMemFootprint()
if err != nil {
log.Warnf("Failed to get memory footprint of VM=%s, image=%s\n", vmIDString, imageName)
}
vmIDString := strconv.Itoa(vmID)

// Measure warm
serveMetrics := make([]*metrics.Metric, *iterNum)
// First time invoke (cold start)
resp, _, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")

for k := 0; k < *iterNum; k++ {
if !*isWithCache {
dropPageCache()
}
// memory footprint
memFootprint, err := getMemFootprint()
if err != nil {
log.Warnf("Failed to get memory footprint of VM=%s, image=%s\n", vmIDString, imageName)
}

resp, met, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")
// Measure warm
serveMetrics := make([]*metrics.Metric, *iterNum)

serveMetrics[k] = met
time.Sleep(2 * time.Second)
for k := 0; k < *iterNum; k++ {
if !*isWithCache {
dropPageCache()
}

message, err := funcPool.RemoveInstance(vmIDString, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)
resp, met, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")

// FUSE
if orch.GetUPFEnabled() {
// Page stats
err = funcPool.DumpUPFPageStats(vmIDString, imageName, funcName, getOutFile("pageStats.csv"))
require.NoError(t, err, "Failed to dump page stats for"+funcName)
serveMetrics[k] = met
}

memManagerMetrics, err = orch.GetUPFLatencyStats(vmIDString + "_0")
require.NoError(t, err, "Failed to dump get stats for "+funcName)
require.Equal(t, len(serveMetrics), len(memManagerMetrics), "different metrics lengths")
}
message, err := funcPool.RemoveInstance(vmIDString, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)

fusePrintMetrics(t, serveMetrics, memManagerMetrics, isUPFEnabledTest, true, funcName, "serve.csv")
// FUSE
if orch.GetUPFEnabled() {
// Page stats
err = funcPool.DumpUPFPageStats(vmIDString, imageName, *funcName, getOutFile("pageStats.csv"))
require.NoError(t, err, "Failed to dump page stats for"+*funcName)

appendMemFootprint(getOutFile("serve.csv"), memFootprint)
vmID++
memManagerMetrics, err = orch.GetUPFLatencyStats(vmIDString + "_0")
require.NoError(t, err, "Failed to dump get stats for "+*funcName)
require.Equal(t, len(serveMetrics), len(memManagerMetrics), "different metrics lengths")
}

fusePrintMetrics(t, serveMetrics, memManagerMetrics, isUPFEnabledTest, true, *funcName, "serve.csv")

appendMemFootprint(getOutFile("serve.csv"), memFootprint)

}

func TestBenchServe(t *testing.T) {
Expand All @@ -203,56 +198,59 @@ func TestBenchServe(t *testing.T) {
memManagerMetrics []*metrics.Metric
)

imageName, isPresent := images[*funcName]
require.True(t, isPresent, "Function is not supported")

funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

createResultsDir()

for funcName, imageName := range images {
// Pull image
resp, _, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "record")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, record_response!")
// Pull image
resp, _, err := funcPool.Serve(context.Background(), "plr_fnc", imageName, "record")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, record_response!")

vmIDString := strconv.Itoa(vmID)
vmIDString := strconv.Itoa(vmID)

createSnapshots(t, 1, vmID, imageName, isSyncOffload)
createSnapshots(t, 1, vmID, imageName, isSyncOffload)

createRecords(t, 1, vmID, imageName, isSyncOffload)
createRecords(t, 1, vmID, imageName, isSyncOffload)

// Measure
serveMetrics := make([]*metrics.Metric, *iterNum)
// Measure
serveMetrics := make([]*metrics.Metric, *iterNum)

for k := 0; k < *iterNum; k++ {
if !*isWithCache {
dropPageCache()
}
for k := 0; k < *iterNum; k++ {
if !*isWithCache {
dropPageCache()
}

resp, met, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")
resp, met, err := funcPool.Serve(context.Background(), vmIDString, imageName, "replay")
require.NoError(t, err, "Function returned error")
require.Equal(t, resp.Payload, "Hello, replay_response!")

serveMetrics[k] = met
time.Sleep(1 * time.Second)
message, err := funcPool.RemoveInstance(vmIDString, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)
time.Sleep(5 * time.Second)
}
serveMetrics[k] = met

// FUSE
if orch.GetUPFEnabled() {
// Page stats
err = funcPool.DumpUPFPageStats(vmIDString, imageName, funcName, getOutFile("pageStats.csv"))
require.NoError(t, err, "Failed to dump page stats for"+funcName)
time.Sleep(1 * time.Second) // this helps kworker hanging

memManagerMetrics, err = orch.GetUPFLatencyStats(vmIDString + "_0")
require.NoError(t, err, "Failed to dump get stats for "+funcName)
require.Equal(t, len(serveMetrics), len(memManagerMetrics), "different metrics lengths")
}
message, err := funcPool.RemoveInstance(vmIDString, imageName, isSyncOffload)
require.NoError(t, err, "Function returned error, "+message)

fusePrintMetrics(t, serveMetrics, memManagerMetrics, isUPFEnabledTest, true, funcName, "serve.csv")
time.Sleep(3 * time.Second) // this helps kworker hanging
}

// FUSE
if orch.GetUPFEnabled() {
// Page stats
err = funcPool.DumpUPFPageStats(vmIDString, imageName, *funcName, getOutFile("pageStats.csv"))
require.NoError(t, err, "Failed to dump page stats for"+*funcName)

vmID++
memManagerMetrics, err = orch.GetUPFLatencyStats(vmIDString + "_0")
require.NoError(t, err, "Failed to dump get stats for "+*funcName)
require.Equal(t, len(serveMetrics), len(memManagerMetrics), "different metrics lengths")
}

fusePrintMetrics(t, serveMetrics, memManagerMetrics, isUPFEnabledTest, true, *funcName, "serve.csv")

}

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -356,18 +354,18 @@ func getOutFile(name string) string {

func getAllImages() map[string]string {
return map[string]string{
//"helloworld": "ustiugov/helloworld:var_workload",
//"chameleon": "ustiugov/chameleon:var_workload",
//"pyaes": "ustiugov/pyaes:var_workload",
//"image_rotate": "ustiugov/image_rotate:var_workload",
//"image_rotate_s3": "ustiugov/image_rotate_s3:var_workload",
//"json_serdes": "ustiugov/json_serdes:var_workload",
//"json_serdes_s3": "ustiugov/json_serdes_s3:var_workload",
//"lr_serving": "ustiugov/lr_serving:var_workload",
//"cnn_serving": "ustiugov/cnn_serving:var_workload",
//"rnn_serving": "ustiugov/rnn_serving:var_workload",
//"lr_training_s3": "ustiugov/lr_training_s3:var_workload",
//"lr_training": "ustiugov/lr_training:var_workload",
"helloworld": "ustiugov/helloworld:var_workload",
"chameleon": "ustiugov/chameleon:var_workload",
"pyaes": "ustiugov/pyaes:var_workload",
"image_rotate": "ustiugov/image_rotate:var_workload",
"image_rotate_s3": "ustiugov/image_rotate_s3:var_workload",
"json_serdes": "ustiugov/json_serdes:var_workload",
"json_serdes_s3": "ustiugov/json_serdes_s3:var_workload",
"lr_serving": "ustiugov/lr_serving:var_workload",
"cnn_serving": "ustiugov/cnn_serving:var_workload",
"rnn_serving": "ustiugov/rnn_serving:var_workload",
"lr_training_s3": "ustiugov/lr_training_s3:var_workload",
"lr_training": "ustiugov/lr_training:var_workload",
"video_processing_s3": "ustiugov/video_processing_s3:var_workload",
}
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ require (
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/testify v1.6.1
github.com/ustiugov/fccd-orchestrator/cri v0.0.0-20201120135927-438ab32ec4cf
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201125102721-6fa5429c84a2
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201125164330-518d167f276c
github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20201120135927-438ab32ec4cf
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125102721-6fa5429c84a2
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125164330-518d167f276c
github.com/ustiugov/fccd-orchestrator/proto v0.0.0-20201120135927-438ab32ec4cf
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
google.golang.org/grpc v1.33.1
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,8 @@ github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201120144936-437e526256c
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201120144936-437e526256c4/go.mod h1:DS6gkWRTK1toUmPiL2cYfsslTNyje5On+cqYteAL4+k=
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201125102721-6fa5429c84a2 h1:qYeXoVTt8yfecExiYlmERn36FfAV01cn218n7hn1K38=
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201125102721-6fa5429c84a2/go.mod h1:LJCI4BH2gNParx0LYfWc6mtgjV1ky5QQfTs64MpzulI=
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201125164330-518d167f276c h1:AstorD8uiAXq0DEXcEbHGi69XYm3gXsup7o43VqDMpc=
github.com/ustiugov/fccd-orchestrator/ctriface v0.0.0-20201125164330-518d167f276c/go.mod h1:rVbfSxjHx0lcVJziyP9XA/QZLq3oDCnnuzI47vCQKI0=
github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200714124456-347054937ceb/go.mod h1:5dhCs/XynpQoQcrhd/YgUBjGahhNpTknQUcC1kHRCaA=
github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200717125634-528c6e9f9cc9/go.mod h1:5dhCs/XynpQoQcrhd/YgUBjGahhNpTknQUcC1kHRCaA=
github.com/ustiugov/fccd-orchestrator/helloworld v0.0.0-20200803195925-0629e1cf4599 h1:I2KBmkOfR2BmRp8h2VYomP+YMNGZC4fUmKkgfMS/xq0=
Expand All @@ -1186,6 +1188,9 @@ github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201120135927-438ab32ec4cf
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125102612-103ba78db6a1/go.mod h1:4eWjym+7yP5mIXYPxDWQMTxnPvAqTd1XrqGBQogi0hg=
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125102721-6fa5429c84a2 h1:6T64uFcxH58NBwRaiDOw4gZ1gXSMPzK+uSSFoL+/Otk=
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125102721-6fa5429c84a2/go.mod h1:4eWjym+7yP5mIXYPxDWQMTxnPvAqTd1XrqGBQogi0hg=
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125163225-42b4c3d8464b/go.mod h1:4eWjym+7yP5mIXYPxDWQMTxnPvAqTd1XrqGBQogi0hg=
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125164330-518d167f276c h1:r2M/i/VJxEQ5bY6DkRLsL50P3tHfWZLNwt7sZI2hjzI=
github.com/ustiugov/fccd-orchestrator/metrics v0.0.0-20201125164330-518d167f276c/go.mod h1:4eWjym+7yP5mIXYPxDWQMTxnPvAqTd1XrqGBQogi0hg=
github.com/ustiugov/fccd-orchestrator/misc v0.0.0-20201029104011-eea62f85005b h1:naW2WC1OvMeSF2hjEeE8OwkHI23LDpx8meAuT8TBC2w=
github.com/ustiugov/fccd-orchestrator/misc v0.0.0-20201029104011-eea62f85005b/go.mod h1:5kdW5csHzZtOU2R+5aJLdVdR+S73rbIls+b4TEIeN50=
github.com/ustiugov/fccd-orchestrator/misc v0.0.0-20201030144900-0da0ca04b744 h1:l4R8w6j2Uc1kTwEML34s5d66Zi8SW3VtICwO/XKYcsA=
Expand Down

0 comments on commit bad6f38

Please sign in to comment.