From 10e56c289c327b3895bcbeb84235725c66c19b63 Mon Sep 17 00:00:00 2001 From: Philip Gribov Date: Wed, 30 Apr 2025 16:21:28 +0300 Subject: [PATCH 1/5] Add metrics --- .gitignore | 1 + common/connectors/masterconn/structs.go | 17 ++- common/metrics/metrics.go | 99 ++++++++++++++++++ common/testing_system.go | 12 ++- data/compile/config.yaml | 8 -- data/compile/scripts/g++.sh.tmpl | 3 - go.mod | 7 ++ go.sum | 18 +++- invoker/check_pipeline.go | 6 +- invoker/compile_pipeline.go | 46 +++----- invoker/handler.go | 67 +----------- invoker/invoker_test.go | 52 ++++------ invoker/job.go | 104 ++++++++++++------- invoker/pipeline.go | 98 +++++++++++++++-- invoker/pipeline_files.go | 52 ++++++++-- invoker/storage/cache.go | 40 +++---- invoker/storage/storage.go | 12 +-- invoker/test_run_pipeline.go | 36 +++---- lib/customfields/memory.go | 12 ++- lib/customfields/time.go | 12 ++- master/invoker_handlers.go | 6 +- master/queue/jobgenerators/icpc_generator.go | 4 +- master/queue/queue.go | 22 ++-- master/queue/queue_test.go | 20 ++-- master/registry/registry.go | 8 +- 25 files changed, 478 insertions(+), 284 deletions(-) create mode 100644 common/metrics/metrics.go delete mode 100644 data/compile/config.yaml delete mode 100644 data/compile/scripts/g++.sh.tmpl diff --git a/.gitignore b/.gitignore index a51c590..b71da72 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ testing_system cp.sh swag +custom diff --git a/common/connectors/masterconn/structs.go b/common/connectors/masterconn/structs.go index bf58ee8..1ea9caa 100644 --- a/common/connectors/masterconn/structs.go +++ b/common/connectors/masterconn/structs.go @@ -4,19 +4,22 @@ import ( "testing_system/common/connectors/invokerconn" "testing_system/common/constants/verdict" "testing_system/lib/customfields" + "time" ) type InvokerJobResult struct { - JobID string `json:"JobID"` + Job *invokerconn.Job `json:"Job"` Verdict verdict.Verdict `json:"Verdict"` Points *float64 `json:"Points,omitempty"` Error string `json:"Error,omitempty"` // Is set only in case of check failed caused by invoker problems - Statistics *JobResultStatistics `json:"Statistics"` + Statistics *JobResultStatistics `json:"Statistics,omitempty"` InvokerStatus *invokerconn.Status `json:"InvokerStatus"` + + Metrics *InvokerJobMetrics `json:"Metrics"` } type JobResultStatistics struct { @@ -27,3 +30,13 @@ type JobResultStatistics struct { ExitCode int `json:"ExitCode"` // TODO: Add more statistics } + +type InvokerJobMetrics struct { + TestingWaitDuration time.Duration `json:"InvokerWaitDuration"` + TotalSandboxOccupation time.Duration `json:"TotalSandboxOccupation"` + ResourceWaitDuration time.Duration `json:"ResourceWaitDuration"` + FileActionsDuration time.Duration `json:"FileActionsDuration"` + ExecutionWaitDuration time.Duration `json:"ExecutionWaitDuration"` + ExecutionDuration time.Duration `json:"ExecutionDuration"` + SendResultDuration time.Duration `json:"SendResultDuration"` +} diff --git a/common/metrics/metrics.go b/common/metrics/metrics.go new file mode 100644 index 0000000..b21acea --- /dev/null +++ b/common/metrics/metrics.go @@ -0,0 +1,99 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "testing_system/common/connectors/masterconn" +) + +const ( + invokerLabel = "invoker" + jobTypeLabel = "job_type" +) + +type Collector struct { + InvokerJobResults *prometheus.CounterVec + InvokerTestingWaitDuration *prometheus.CounterVec + InvokerSandboxOccupationDuration *prometheus.CounterVec + InvokerResourceWaitDuration *prometheus.CounterVec + InvokerFileActionsDuration *prometheus.CounterVec + InvokerExecutionWaitDuration *prometheus.CounterVec + InvokerExecutionDuration *prometheus.CounterVec + InvokerSendResultDuration *prometheus.CounterVec +} + +func NewCollector() *Collector { + c := &Collector{} + c.InvokerJobResults = createInvokerCounter( + "job_results_count", + "Number of job results received from invoker", + ) + + c.InvokerTestingWaitDuration = createInvokerCounter( + "testing_wait_duration_sum", + "Time submission waits for testing in invoker", + ) + + c.InvokerSandboxOccupationDuration = createInvokerCounter( + "sandbox_occupation_duration_sun", + "Total sandbox time for submission testing in invoker", + ) + + c.InvokerResourceWaitDuration = createInvokerCounter( + "resource_wait_duration_sum", + "Total time spent waiting for resources for submissions to load in invokers", + ) + + c.InvokerFileActionsDuration = createInvokerCounter( + "file_actions_duration_sum", + "Total time spent waiting for file copy to sandbox in invoker", + ) + + c.InvokerExecutionWaitDuration = createInvokerCounter( + "execution_wait_duration_sum", + "Total time spent waiting for execution of process on invoker when sandbox is set up", + ) + + c.InvokerExecutionDuration = createInvokerCounter( + "execution_duration_sum", + "Total time spent on executing processes in sandboxes", + ) + + c.InvokerSendResultDuration = createInvokerCounter( + "send_result_duration_sum", + "Total time spent on sending results from invoker to storage", + ) + return c +} + +func createInvokerCounter( + name string, + help string, +) *prometheus.CounterVec { + counter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ts", + Subsystem: "invoker", + Name: name, + Help: help, + }, + []string{invokerLabel, jobTypeLabel}, + ) + prometheus.MustRegister(counter) + return counter +} + +func (c *Collector) NewJobResult(result *masterconn.InvokerJobResult) { + labels := prometheus.Labels{ + invokerLabel: result.InvokerStatus.Address, + jobTypeLabel: result.Job.Type.String(), + } + + c.InvokerJobResults.With(labels).Inc() + c.InvokerTestingWaitDuration.With(labels).Add(result.Metrics.TestingWaitDuration.Seconds()) + c.InvokerSandboxOccupationDuration.With(labels).Add(result.Metrics.TotalSandboxOccupation.Seconds()) + c.InvokerResourceWaitDuration.With(labels).Add(result.Metrics.ResourceWaitDuration.Seconds()) + c.InvokerFileActionsDuration.With(labels).Add(result.Metrics.FileActionsDuration.Seconds()) + c.InvokerExecutionWaitDuration.With(labels).Add(result.Metrics.ExecutionWaitDuration.Seconds()) + c.InvokerExecutionDuration.With(labels).Add(result.Metrics.ExecutionDuration.Seconds()) + c.InvokerSendResultDuration.With(labels).Add(result.Metrics.SendResultDuration.Seconds()) +} diff --git a/common/testing_system.go b/common/testing_system.go index 8648481..040045b 100644 --- a/common/testing_system.go +++ b/common/testing_system.go @@ -3,6 +3,7 @@ package common import ( "context" "fmt" + "github.com/prometheus/client_golang/prometheus/promhttp" "os/signal" "runtime" "slices" @@ -12,6 +13,7 @@ import ( "testing_system/common/connectors/masterconn" "testing_system/common/connectors/storageconn" "testing_system/common/db" + "testing_system/common/metrics" "testing_system/lib/logger" "github.com/gin-gonic/gin" @@ -19,9 +21,10 @@ import ( ) type TestingSystem struct { - Config *config.Config - Router *gin.Engine - DB *gorm.DB + Config *config.Config + Router *gin.Engine + DB *gorm.DB + Metrics *metrics.Collector MasterConn *masterconn.Connector StorageConn *storageconn.Connector @@ -54,6 +57,9 @@ func InitTestingSystem(configPath string) *TestingSystem { ts.MasterConn = masterconn.NewConnector(ts.Config.MasterConnection) ts.StorageConn = storageconn.NewConnector(ts.Config.StorageConnection) + ts.Metrics = metrics.NewCollector() + ts.Router.GET("/metrics", gin.WrapH(promhttp.Handler())) + return ts } diff --git a/data/compile/config.yaml b/data/compile/config.yaml deleted file mode 100644 index 40df21c..0000000 --- a/data/compile/config.yaml +++ /dev/null @@ -1,8 +0,0 @@ -DefaultLimits: - TL: "5s" - ML: "1G" - WL: "10s" -Languages: - g++: - TemplateValues: - flags: "-O2 -Wall -Wextra" diff --git a/data/compile/scripts/g++.sh.tmpl b/data/compile/scripts/g++.sh.tmpl deleted file mode 100644 index a678cce..0000000 --- a/data/compile/scripts/g++.sh.tmpl +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -g++ "{{.source}}" -o "{{.binary}}" {{.flags}} \ No newline at end of file diff --git a/go.mod b/go.mod index e8e6404..e18215a 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/go-resty/resty/v2 v2.16.5 github.com/google/uuid v1.6.0 + github.com/prometheus/client_golang v1.22.0 github.com/stretchr/testify v1.10.0 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 @@ -20,8 +21,10 @@ require ( require ( github.com/KyleBanks/depth v1.2.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.13.2 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect @@ -49,8 +52,12 @@ require ( github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.62.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/swaggo/swag v1.16.4 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect diff --git a/go.sum b/go.sum index e3cc5c4..4759b6b 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ= github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= @@ -7,6 +9,8 @@ github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCN github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= @@ -41,8 +45,8 @@ github.com/go-resty/resty/v2 v2.16.5 h1:hBKqmWrr7uRc3euHVqmh1HTHcKn99Smr7o5spptd github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -83,10 +87,20 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/invoker/check_pipeline.go b/invoker/check_pipeline.go index ad9452c..9497c85 100644 --- a/invoker/check_pipeline.go +++ b/invoker/check_pipeline.go @@ -58,7 +58,7 @@ func (s *JobPipelineState) generateCheckerRunConfig() error { func (s *JobPipelineState) executeCheckerRunCommand() error { s.executeWaitGroup.Add(1) - s.invoker.RunQueue <- s.runChecker + s.runProcess(s.runChecker) s.executeWaitGroup.Wait() if s.test.checkResult.Err != nil { @@ -175,11 +175,11 @@ func (s *JobPipelineState) uploadTestRunResources() error { func (s *JobPipelineState) uploadCheckerOutput() error { checkerOutputRequest := &storageconn.Request{ Resource: resource.CheckerOutput, - SubmitID: uint64(s.job.Submission.ID), + SubmitID: uint64(s.job.submission.ID), TestID: s.job.Test, File: s.test.checkerOutputReader, } - resp := s.invoker.TS.StorageConn.Upload(checkerOutputRequest) + resp := s.uploadResource(checkerOutputRequest) if resp.Error != nil { return fmt.Errorf("can not upload checker output to storage, error: %s", resp.Error.Error()) } diff --git a/invoker/compile_pipeline.go b/invoker/compile_pipeline.go index 74fb7cb..53747e0 100644 --- a/invoker/compile_pipeline.go +++ b/invoker/compile_pipeline.go @@ -13,34 +13,28 @@ import ( ) func (i *Invoker) fullCompilationPipeline(sandbox sandbox.ISandbox, job *Job) { - s := &JobPipelineState{ - job: job, - invoker: i, - sandbox: sandbox, - compile: new(pipelineCompileData), - loggerData: fmt.Sprintf("compile job: %s submission: %d", job.ID, job.Submission.ID), - } - - s.defers = append(s.defers, job.deferFunc) - defer s.deferFunc() + s := i.newPipeline(sandbox, job) + s.compile = new(pipelineCompileData) + s.loggerData = fmt.Sprintf("compile job: %s submission: %d", job.ID, job.submission.ID) + defer s.checkFinish() logger.Trace("Starting compilation for %s", s.loggerData) err := s.compilationProcessPipeline() if err != nil { logger.Error("Error in %s error: %v", s.loggerData, err) - i.failJob(job, "job %s error: %v", job.ID, err) + s.failJob("job %s error: %v", job.ID, err) return } err = s.uploadCompilationResources() if err != nil { logger.Error("Error in %s error: %v", s.loggerData, err) - i.failJob(job, "job %s error: %v", job.ID, err) + s.failJob("job %s error: %v", job.ID, err) return } - i.successJob(job, s.compile.result) + s.successJob(s.compile.result) } func (s *JobPipelineState) compilationProcessPipeline() error { @@ -49,7 +43,7 @@ func (s *JobPipelineState) compilationProcessPipeline() error { return err } - err = s.loadSource() + err = s.loadSolutionSourceFile() if err != nil { return err } @@ -71,25 +65,11 @@ func (s *JobPipelineState) compilationProcessPipeline() error { return nil } -func (s *JobPipelineState) loadSource() error { - source, err := s.invoker.Storage.Source.Get(uint64(s.job.Submission.ID)) - if err != nil { - return fmt.Errorf("can not get submission source, error: %v", err) - } - s.compile.sourceName = "source_" + filepath.Base(*source) - err = s.copyFileToSandbox(*source, s.compile.sourceName, 0644) - if err != nil { - return fmt.Errorf("can not copy submission source to sandbox, error: %v", err) - } - logger.Trace("Loaded source to sandbox for %s", s.loggerData) - return nil -} - func (s *JobPipelineState) setupCompileScript() error { var ok bool - s.compile.language, ok = s.invoker.Compiler.Languages[s.job.Submission.Language] + s.compile.language, ok = s.invoker.Compiler.Languages[s.job.submission.Language] if !ok { - return fmt.Errorf("submission language %s does not exist", s.job.Submission.Language) + return fmt.Errorf("submission language %s does not exist", s.job.submission.Language) } script, err := s.compile.language.GenerateScript(s.compile.sourceName, solutionBinaryFile) if err != nil { @@ -108,7 +88,7 @@ func (s *JobPipelineState) setupCompileScript() error { func (s *JobPipelineState) executeCompilationCommand() error { s.executeWaitGroup.Add(1) - s.invoker.RunQueue <- s.runCompilationCommand + s.runProcess(s.runCompilationCommand) s.executeWaitGroup.Wait() if s.compile.result.Err != nil { @@ -188,10 +168,10 @@ func (s *JobPipelineState) uploadCompilationResources() error { func (s *JobPipelineState) uploadCompileResult() error { compileOutputStoreRequest := &storageconn.Request{ Resource: resource.CompileOutput, - SubmitID: uint64(s.job.Submission.ID), + SubmitID: uint64(s.job.submission.ID), File: s.compile.messageReader, } - resp := s.invoker.TS.StorageConn.Upload(compileOutputStoreRequest) + resp := s.uploadResource(compileOutputStoreRequest) if resp.Error != nil { return fmt.Errorf("can not upload compile output to storage, error: %v", resp.Error.Error()) } diff --git a/invoker/handler.go b/invoker/handler.go index f9b003a..7aed78a 100644 --- a/invoker/handler.go +++ b/invoker/handler.go @@ -1,15 +1,10 @@ package invoker import ( - "errors" + "github.com/gin-gonic/gin" "net/http" "testing_system/common/connectors/invokerconn" - "testing_system/common/db/models" "testing_system/lib/connector" - "testing_system/lib/logger" - - "github.com/gin-gonic/gin" - "gorm.io/gorm" ) func (i *Invoker) HandleStatus(c *gin.Context) { @@ -42,63 +37,3 @@ func (i *Invoker) HandleNewJob(c *gin.Context) { i.Mutex.Unlock() // We unlock mutex without defer because getStatus uses mutex connector.RespOK(c, i.getStatus()) } - -func (i *Invoker) initJob(c *gin.Context, job *Job) bool { - var submission models.Submission - if err := i.TS.DB.WithContext(c).Find(&submission, job.SubmitID).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - connector.RespErr(c, http.StatusBadRequest, "Submission %d not found", job.SubmitID) - } else { - logger.Error("Error while finding submission in db, error: %s", err.Error()) - connector.RespErr(c, http.StatusInternalServerError, "DB Error") - return false - } - } - job.Submission = &submission - - var problem models.Problem - if err := i.TS.DB.WithContext(c).Find(&problem, job.Submission.ProblemID).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - connector.RespErr(c, http.StatusBadRequest, "Problem %d not found", job.Submission.ProblemID) - } else { - logger.Error("Error while finding problem in db, error: %s", err.Error()) - connector.RespErr(c, http.StatusInternalServerError, "DB Error") - return false - } - } - job.Problem = &problem - return true -} - -func (i *Invoker) newCompileJob(c *gin.Context, job *Job) { - i.Storage.Source.Lock(uint64(job.Submission.ID)) - job.Defers = append(job.Defers, func() { i.Storage.Source.Unlock(uint64(job.Submission.ID)) }) - - i.JobQueue <- job -} - -func (i *Invoker) newTestJob(c *gin.Context, job *Job) bool { - if job.Test <= 0 || job.Test > job.Problem.TestsNumber { - connector.RespErr(c, - http.StatusBadRequest, - "%d test required, tests in problem %d are numbered from 1 to %d", - job.Test, job.Problem.ID, job.Problem.TestsNumber) - return false - } - - i.Storage.Binary.Lock(uint64(job.Submission.ID)) - job.Defers = append(job.Defers, func() { i.Storage.Binary.Unlock(uint64(job.Submission.ID)) }) - - i.Storage.TestInput.Lock(uint64(job.Problem.ID), job.Test) - job.Defers = append(job.Defers, func() { i.Storage.TestInput.Unlock(uint64(job.Problem.ID), job.Test) }) - - i.Storage.TestAnswer.Lock(uint64(job.Problem.ID), job.Test) - job.Defers = append(job.Defers, func() { i.Storage.TestAnswer.Unlock(uint64(job.Problem.ID), job.Test) }) - - i.Storage.Checker.Lock(uint64(job.Problem.ID)) - job.Defers = append(job.Defers, func() { i.Storage.Checker.Unlock(uint64(job.Problem.ID)) }) - - // TODO: interactor - i.JobQueue <- job - return true -} diff --git a/invoker/invoker_test.go b/invoker/invoker_test.go index 9dd5e8e..6ca1dd7 100644 --- a/invoker/invoker_test.go +++ b/invoker/invoker_test.go @@ -71,12 +71,12 @@ func (ts *testState) testCompile(submitID uint) *JobPipelineState { SubmitID: submitID, Type: invokerconn.CompileJob, }, - Problem: &models.Problem{ + problem: &models.Problem{ Model: gorm.Model{ ID: 1, }, }, - Submission: &models.Submission{ + submission: &models.Submission{ Model: gorm.Model{ ID: submitID, }, @@ -90,13 +90,9 @@ func (ts *testState) testCompile(submitID uint) *JobPipelineState { uint64(submitID), )) - s := &JobPipelineState{ - invoker: ts.Invoker, - sandbox: ts.Sandbox, - job: job, - compile: new(pipelineCompileData), - loggerData: fmt.Sprintf("compile job: %s submission: %d", job.ID, job.Submission.ID), - } + s := ts.Invoker.newPipeline(ts.Sandbox, job) + s.compile = new(pipelineCompileData) + s.loggerData = fmt.Sprintf("compile job: %s submission: %d", job.ID, job.submission.ID) require.NoError(ts.t, s.compilationProcessPipeline()) @@ -130,11 +126,11 @@ func testCompileSandbox(t *testing.T, sandboxType string) { require.NoError(t, cmd.Run()) require.Equal(t, "1", strings.TrimSpace(stdout.String())) - s.deferFunc() + s.finish() s = ts.testCompile(2) require.Equal(t, verdict.CE, s.compile.result.Verdict) - s.deferFunc() + s.finish() } func (ts *testState) addProblem(problemID uint) { @@ -169,13 +165,13 @@ func (ts *testState) testRun(submitID uint, problemID uint) *sandbox.RunResult { Type: invokerconn.TestJob, Test: 1, }, - Problem: &models.Problem{ + problem: &models.Problem{ Model: gorm.Model{ ID: problemID, }, TestsNumber: 1, }, - Submission: &models.Submission{ + submission: &models.Submission{ Model: gorm.Model{ ID: submitID, }, @@ -183,8 +179,8 @@ func (ts *testState) testRun(submitID uint, problemID uint) *sandbox.RunResult { Language: "cpp", }, } - job.Problem.TimeLimit.FromStr("1s") - job.Problem.MemoryLimit.FromStr("100m") + job.problem.TimeLimit.FromStr("1s") + job.problem.MemoryLimit.FromStr("100m") sourceDir := fmt.Sprintf("%s/binary/%d", ts.FilesDir, submitID) cmd := exec.Command("g++", "source.cpp", "-std=c++17", "-o", "binary") @@ -193,21 +189,17 @@ func (ts *testState) testRun(submitID uint, problemID uint) *sandbox.RunResult { require.NoError(ts.t, ts.Invoker.Storage.Binary.Insert(filepath.Join(sourceDir, "binary"), uint64(submitID))) - s := &JobPipelineState{ - job: job, - invoker: ts.Invoker, - sandbox: ts.Sandbox, - test: new(pipelineTestData), - loggerData: fmt.Sprintf( - "test job: %s submission: %d problem %d test %d", - job.ID, - job.Submission.ID, - job.Problem.ID, - job.Test, - ), - } - - defer s.deferFunc() + s := ts.Invoker.newPipeline(ts.Sandbox, job) + s.test = new(pipelineTestData) + s.loggerData = fmt.Sprintf( + "test job: %s submission: %d problem %d test %d", + job.ID, + job.submission.ID, + job.problem.ID, + job.Test, + ) + + defer s.finish() require.NoError(ts.t, s.testingProcessPipeline()) diff --git a/invoker/job.go b/invoker/job.go index 7671a32..e38e569 100644 --- a/invoker/job.go +++ b/invoker/job.go @@ -1,64 +1,94 @@ package invoker import ( - "fmt" + "errors" + "github.com/gin-gonic/gin" + "gorm.io/gorm" + "net/http" "slices" "testing_system/common/connectors/invokerconn" - "testing_system/common/connectors/masterconn" - "testing_system/common/constants/verdict" "testing_system/common/db/models" - "testing_system/invoker/sandbox" + "testing_system/lib/connector" "testing_system/lib/logger" + "time" ) type Job struct { invokerconn.Job - Submission *models.Submission `json:"-"` - Problem *models.Problem `json:"-"` + submission *models.Submission + problem *models.Problem - Defers []func() `json:"-"` + defers []func() + createTime time.Time } func (j *Job) deferFunc() { - slices.Reverse(j.Defers) - for _, f := range j.Defers { + slices.Reverse(j.defers) + for _, f := range j.defers { f() } - j.Defers = nil + j.defers = nil } -func (i *Invoker) failJob(j *Job, errf string, args ...interface{}) { - i.Mutex.Lock() - delete(i.ActiveJobs, j.ID) - i.Mutex.Unlock() +func (i *Invoker) initJob(c *gin.Context, job *Job) bool { + job.createTime = time.Now() - request := &masterconn.InvokerJobResult{ - JobID: j.ID, - Verdict: verdict.CF, - Error: fmt.Sprintf(errf, args...), - InvokerStatus: i.getStatus(), + var submission models.Submission + if err := i.TS.DB.WithContext(c).Find(&submission, job.SubmitID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + connector.RespErr(c, http.StatusBadRequest, "Submission %d not found", job.SubmitID) + } else { + logger.Error("Error while finding submission in db, error: %s", err.Error()) + connector.RespErr(c, http.StatusInternalServerError, "DB Error") + return false + } } - err := i.TS.MasterConn.SendInvokerJobResult(request) - if err != nil { - logger.Error("Can not send job %s result, error: %s", j.ID, err.Error()) + job.submission = &submission + + var problem models.Problem + if err := i.TS.DB.WithContext(c).Find(&problem, job.submission.ProblemID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + connector.RespErr(c, http.StatusBadRequest, "Problem %d not found", job.submission.ProblemID) + } else { + logger.Error("Error while finding problem in db, error: %s", err.Error()) + connector.RespErr(c, http.StatusInternalServerError, "DB Error") + return false + } } + job.problem = &problem + return true } -func (i *Invoker) successJob(j *Job, runResult *sandbox.RunResult) { - i.Mutex.Lock() - delete(i.ActiveJobs, j.ID) - i.Mutex.Unlock() - - request := &masterconn.InvokerJobResult{ - JobID: j.ID, - Verdict: runResult.Verdict, - Points: runResult.Points, - Statistics: runResult.Statistics, - InvokerStatus: i.getStatus(), - } - err := i.TS.MasterConn.SendInvokerJobResult(request) - if err != nil { - logger.Error("Can not send job %s result, error: %s", j.ID, err.Error()) +func (i *Invoker) newCompileJob(c *gin.Context, job *Job) { + i.Storage.Source.Lock(uint64(job.submission.ID)) + job.defers = append(job.defers, func() { i.Storage.Source.Unlock(uint64(job.submission.ID)) }) + + i.JobQueue <- job +} + +func (i *Invoker) newTestJob(c *gin.Context, job *Job) bool { + if job.Test <= 0 || job.Test > job.problem.TestsNumber { + connector.RespErr(c, + http.StatusBadRequest, + "%d test required, tests in problem %d are numbered from 1 to %d", + job.Test, job.problem.ID, job.problem.TestsNumber) + return false } + + i.Storage.Binary.Lock(uint64(job.submission.ID)) + job.defers = append(job.defers, func() { i.Storage.Binary.Unlock(uint64(job.submission.ID)) }) + + i.Storage.TestInput.Lock(uint64(job.problem.ID), job.Test) + job.defers = append(job.defers, func() { i.Storage.TestInput.Unlock(uint64(job.problem.ID), job.Test) }) + + i.Storage.TestAnswer.Lock(uint64(job.problem.ID), job.Test) + job.defers = append(job.defers, func() { i.Storage.TestAnswer.Unlock(uint64(job.problem.ID), job.Test) }) + + i.Storage.Checker.Lock(uint64(job.problem.ID)) + job.defers = append(job.defers, func() { i.Storage.Checker.Unlock(uint64(job.problem.ID)) }) + + // TODO: interactor + i.JobQueue <- job + return true } diff --git a/invoker/pipeline.go b/invoker/pipeline.go index 443e68a..2ce31a8 100644 --- a/invoker/pipeline.go +++ b/invoker/pipeline.go @@ -5,8 +5,12 @@ import ( "io" "slices" "sync" + "testing_system/common/connectors/masterconn" + "testing_system/common/constants/verdict" "testing_system/invoker/compiler" "testing_system/invoker/sandbox" + "testing_system/lib/logger" + "time" ) type JobPipelineState struct { @@ -19,17 +23,12 @@ type JobPipelineState struct { compile *pipelineCompileData test *pipelineTestData - loggerData string + metrics *masterconn.InvokerJobMetrics - defers []func() -} + loggerData string -func (s *JobPipelineState) deferFunc() { - slices.Reverse(s.defers) - for _, f := range s.defers { - f() - } - s.defers = nil + defers []func() + finished bool } type pipelineCompileData struct { @@ -52,6 +51,24 @@ type pipelineTestData struct { hasResources bool } +func (i *Invoker) newPipeline(sandbox sandbox.ISandbox, job *Job) *JobPipelineState { + s := &JobPipelineState{ + sandbox: sandbox, + invoker: i, + job: job, + metrics: new(masterconn.InvokerJobMetrics), + } + + createTime := time.Now() + s.metrics.TestingWaitDuration = createTime.Sub(job.createTime) + s.defers = append(s.defers, func() { + s.metrics.TotalSandboxOccupation = time.Since(createTime) + }) + + s.defers = append(s.defers, job.deferFunc) + return s +} + func (s *JobPipelineState) initSandbox() error { err := s.sandbox.Init() if err != nil { @@ -60,3 +77,66 @@ func (s *JobPipelineState) initSandbox() error { s.defers = append(s.defers, s.sandbox.Cleanup) return nil } + +func (s *JobPipelineState) runProcess(f func()) { + queueEnter := time.Now() + s.invoker.RunQueue <- func() { + processStart := time.Now() + s.metrics.ExecutionWaitDuration += processStart.Sub(queueEnter) + f() + s.metrics.ExecutionDuration += time.Since(processStart) + } +} + +func (s *JobPipelineState) checkFinish() { + if !s.finished { + logger.Panic("Job pipeline for %s was not finished", s.loggerData) + } +} + +func (s *JobPipelineState) finish() { + if s.finished { + logger.Panic("Job pipeline for %s is finished twice", s.loggerData) + } + slices.Reverse(s.defers) + for _, f := range s.defers { + f() + } + s.finished = true + + s.invoker.Mutex.Lock() + defer s.invoker.Mutex.Unlock() + delete(s.invoker.ActiveJobs, s.job.ID) +} + +func (s *JobPipelineState) failJob(errf string, args ...interface{}) { + s.finish() + request := &masterconn.InvokerJobResult{ + Job: &s.job.Job, + Verdict: verdict.CF, + Error: fmt.Sprintf(errf, args...), + InvokerStatus: s.invoker.getStatus(), + Metrics: s.metrics, + } + go s.uploadJobResult(request) +} + +func (s *JobPipelineState) successJob(runResult *sandbox.RunResult) { + s.finish() + request := &masterconn.InvokerJobResult{ + Job: &s.job.Job, + Verdict: runResult.Verdict, + Points: runResult.Points, + Statistics: runResult.Statistics, + InvokerStatus: s.invoker.getStatus(), + Metrics: s.metrics, + } + s.invoker.TS.Go(func() { s.uploadJobResult(request) }) +} + +func (s *JobPipelineState) uploadJobResult(request *masterconn.InvokerJobResult) { + err := s.invoker.TS.MasterConn.SendInvokerJobResult(request) + if err != nil { + logger.Error("Can not send job %s result, error: %s", s.job.ID, err.Error()) + } +} diff --git a/invoker/pipeline_files.go b/invoker/pipeline_files.go index 6841712..7f2b0c1 100644 --- a/invoker/pipeline_files.go +++ b/invoker/pipeline_files.go @@ -7,7 +7,9 @@ import ( "path/filepath" "testing_system/common/connectors/storageconn" "testing_system/common/constants/resource" + "testing_system/invoker/storage" "testing_system/lib/logger" + "time" ) const ( @@ -24,7 +26,7 @@ const ( ) func (s *JobPipelineState) loadSolutionBinary() error { - binary, err := s.invoker.Storage.Binary.Get(uint64(s.job.Submission.ID)) + binary, err := s.loadResource(s.invoker.Storage.Binary, uint64(s.job.submission.ID)) if err != nil { return fmt.Errorf("can not get solution binary, error: %v", err) } @@ -37,7 +39,7 @@ func (s *JobPipelineState) loadSolutionBinary() error { } func (s *JobPipelineState) loadTestInput() error { - testInput, err := s.invoker.Storage.TestInput.Get(uint64(s.job.Problem.ID), s.job.Test) + testInput, err := s.loadResource(s.invoker.Storage.TestInput, uint64(s.job.problem.ID), s.job.Test) if err != nil { return fmt.Errorf("can not get test input, error: %v", err) } @@ -50,7 +52,7 @@ func (s *JobPipelineState) loadTestInput() error { } func (s *JobPipelineState) loadCheckerBinaryFile() error { - checker, err := s.invoker.Storage.Checker.Get(uint64(s.job.Problem.ID)) + checker, err := s.loadResource(s.invoker.Storage.Checker, uint64(s.job.problem.ID)) if err != nil { return fmt.Errorf("can not get checker binary, error: %v", err) } @@ -63,7 +65,7 @@ func (s *JobPipelineState) loadCheckerBinaryFile() error { } func (s *JobPipelineState) loadTestAnswerFile() error { - testAnswer, err := s.invoker.Storage.TestAnswer.Get(uint64(s.job.Problem.ID), s.job.Test) + testAnswer, err := s.loadResource(s.invoker.Storage.TestAnswer, uint64(s.job.problem.ID), s.job.Test) if err != nil { return fmt.Errorf("can not get test answer, error: %s", err.Error()) } @@ -75,6 +77,23 @@ func (s *JobPipelineState) loadTestAnswerFile() error { return nil } +func (s *JobPipelineState) loadSolutionSourceFile() error { + source, err := s.loadResource(s.invoker.Storage.Source, uint64(s.job.submission.ID)) + if err != nil { + return fmt.Errorf("can not get submission source, error: %v", err) + } + if s.compile == nil { + return fmt.Errorf("can not save solution source, pipeline compile field not initialized") + } + s.compile.sourceName = "source_" + filepath.Base(*source) + err = s.copyFileToSandbox(*source, s.compile.sourceName, 0644) + if err != nil { + return fmt.Errorf("can not copy submission source to sandbox, error: %v", err) + } + logger.Trace("Loaded source to sandbox for %s", s.loggerData) + return nil +} + func (s *JobPipelineState) uploadBinary() error { reader, err := s.openSandboxFile(solutionBinaryFile, false) if err != nil { @@ -82,10 +101,10 @@ func (s *JobPipelineState) uploadBinary() error { } binaryStoreRequest := &storageconn.Request{ Resource: resource.CompiledBinary, - SubmitID: uint64(s.job.Submission.ID), + SubmitID: uint64(s.job.submission.ID), File: reader, } - resp := s.invoker.TS.StorageConn.Upload(binaryStoreRequest) + resp := s.uploadResource(binaryStoreRequest) if resp.Error != nil { return fmt.Errorf("can not send solution binary file to storage, error: %v", resp.Error) } @@ -101,11 +120,11 @@ func (s *JobPipelineState) uploadOutput(fileName string, resourceType resource.T outputStoreRequest := &storageconn.Request{ Resource: resourceType, - SubmitID: uint64(s.job.Submission.ID), + SubmitID: uint64(s.job.submission.ID), TestID: s.job.Test, File: reader, } - resp := s.invoker.TS.StorageConn.Upload(outputStoreRequest) + resp := s.uploadResource(outputStoreRequest) if resp.Error != nil { return fmt.Errorf("can not upload %v file to storage, error: %v", resourceType, resp.Error) } @@ -114,6 +133,7 @@ func (s *JobPipelineState) uploadOutput(fileName string, resourceType resource.T } func (s *JobPipelineState) copyFileToSandbox(src string, dst string, perm os.FileMode) error { + start := time.Now() srcReader, err := os.Open(src) if err != nil { return err @@ -125,6 +145,8 @@ func (s *JobPipelineState) copyFileToSandbox(src string, dst string, perm os.Fil } defer dstWriter.Close() _, err = io.Copy(dstWriter, srcReader) + + s.metrics.FileActionsDuration += time.Since(start) return nil } @@ -147,3 +169,17 @@ func (s *JobPipelineState) limitedReader(r io.Reader) io.Reader { return io.LimitReader(r, int64(*s.invoker.TS.Config.Invoker.SaveOutputHead)) } } + +func (s *JobPipelineState) loadResource(getter *storage.CacheGetter, args ...uint64) (*string, error) { + start := time.Now() + res, err := getter.Get(args...) + s.metrics.ResourceWaitDuration += time.Since(start) + return res, err +} + +func (s *JobPipelineState) uploadResource(request *storageconn.Request) *storageconn.Response { + start := time.Now() + resp := s.invoker.TS.StorageConn.Upload(request) + s.metrics.SendResultDuration += time.Since(start) + return resp +} diff --git a/invoker/storage/cache.go b/invoker/storage/cache.go index 19a77fa..bdc4a55 100644 --- a/invoker/storage/cache.go +++ b/invoker/storage/cache.go @@ -11,10 +11,10 @@ import ( // // So we have single LRUSizeCache that holds all file types. Key for this cache is cacheKey, the internal struct with which we can determine the file type and it's keys. // -// To access cache we have cacheGetter structs. Each cacheGetter responds for single file type. -// Each cacheGetter accepts any number of uint64 that are transformed to cacheKey struct using cacheGetter.keyGen func. +// To access cache we have CacheGetter structs. Each CacheGetter responds for single file type. +// Each CacheGetter accepts any number of uint64 that are transformed to cacheKey struct using CacheGetter.keyGen func. // -// So to access files, we call methods of cacheGetter, that transforms our request to request for LRUSizeCache and LRUSizeCache then does all the cache work. +// So to access files, we call methods of CacheGetter, that transforms our request to request for LRUSizeCache and LRUSizeCache then does all the cache work. type commonCache = cache.LRUSizeCache[cacheKey, string] @@ -29,30 +29,30 @@ type cacheKey struct { TestID uint64 `json:"testID"` } -type cacheGetter struct { +type CacheGetter struct { Cache *commonCache keyGen func(vals ...uint64) cacheKey } -func (c *cacheGetter) Get(vals ...uint64) (*string, error) { +func (c *CacheGetter) Get(vals ...uint64) (*string, error) { return c.Cache.Get(c.keyGen(vals...)) } -func (c *cacheGetter) Lock(vals ...uint64) { +func (c *CacheGetter) Lock(vals ...uint64) { c.Cache.Lock(c.keyGen(vals...)) } -func (c *cacheGetter) Unlock(vals ...uint64) error { +func (c *CacheGetter) Unlock(vals ...uint64) error { return c.Cache.Unlock(c.keyGen(vals...)) } // Insert can be used only for testing -func (c *cacheGetter) Insert(file string, vals ...uint64) error { +func (c *CacheGetter) Insert(file string, vals ...uint64) error { return c.Cache.Insert(c.keyGen(vals...), &file, 1) } -func newSourceCache(commonCache *commonCache) *cacheGetter { - return &cacheGetter{ +func newSourceCache(commonCache *commonCache) *CacheGetter { + return &CacheGetter{ Cache: commonCache, keyGen: func(vals ...uint64) cacheKey { return submitKeyGen(resource.SourceCode, vals) @@ -60,8 +60,8 @@ func newSourceCache(commonCache *commonCache) *cacheGetter { } } -func newBinaryCache(commonCache *commonCache) *cacheGetter { - return &cacheGetter{ +func newBinaryCache(commonCache *commonCache) *CacheGetter { + return &CacheGetter{ Cache: commonCache, keyGen: func(vals ...uint64) cacheKey { return submitKeyGen(resource.CompiledBinary, vals) @@ -69,8 +69,8 @@ func newBinaryCache(commonCache *commonCache) *cacheGetter { } } -func newCheckerCache(commonCache *commonCache) *cacheGetter { - return &cacheGetter{ +func newCheckerCache(commonCache *commonCache) *CacheGetter { + return &CacheGetter{ Cache: commonCache, keyGen: func(vals ...uint64) cacheKey { return problemIDKeyGen(resource.Checker, vals) @@ -78,8 +78,8 @@ func newCheckerCache(commonCache *commonCache) *cacheGetter { } } -func newInteractorCache(commonCache *commonCache) *cacheGetter { - return &cacheGetter{ +func newInteractorCache(commonCache *commonCache) *CacheGetter { + return &CacheGetter{ Cache: commonCache, keyGen: func(vals ...uint64) cacheKey { return submitKeyGen(resource.Interactor, vals) @@ -87,15 +87,15 @@ func newInteractorCache(commonCache *commonCache) *cacheGetter { } } -func newTestInputCache(commonCache *commonCache) *cacheGetter { - return &cacheGetter{ +func newTestInputCache(commonCache *commonCache) *CacheGetter { + return &CacheGetter{ Cache: commonCache, keyGen: func(vals ...uint64) cacheKey { return testKeyGen(resource.TestInput, vals) }, } } -func newTestAnswerCache(commonCache *commonCache) *cacheGetter { - return &cacheGetter{ +func newTestAnswerCache(commonCache *commonCache) *CacheGetter { + return &CacheGetter{ Cache: commonCache, keyGen: func(vals ...uint64) cacheKey { return testKeyGen(resource.TestAnswer, vals) }, } diff --git a/invoker/storage/storage.go b/invoker/storage/storage.go index 131f130..ac5961a 100644 --- a/invoker/storage/storage.go +++ b/invoker/storage/storage.go @@ -18,12 +18,12 @@ type InvokerStorage struct { cache *commonCache - Source *cacheGetter - Binary *cacheGetter - Checker *cacheGetter - Interactor *cacheGetter - TestInput *cacheGetter - TestAnswer *cacheGetter + Source *CacheGetter + Binary *CacheGetter + Checker *CacheGetter + Interactor *CacheGetter + TestInput *CacheGetter + TestAnswer *CacheGetter } func NewInvokerStorage(ts *common.TestingSystem) *InvokerStorage { diff --git a/invoker/test_run_pipeline.go b/invoker/test_run_pipeline.go index 8dae0ae..323b488 100644 --- a/invoker/test_run_pipeline.go +++ b/invoker/test_run_pipeline.go @@ -14,29 +14,23 @@ const ( ) func (i *Invoker) fullTestingPipeline(sandbox sandbox.ISandbox, job *Job) { - s := &JobPipelineState{ - job: job, - invoker: i, - sandbox: sandbox, - test: new(pipelineTestData), - loggerData: fmt.Sprintf( - "test job: %s submission: %d problem %d test %d", - job.ID, - job.Submission.ID, - job.Problem.ID, - job.Test, - ), - } + s := i.newPipeline(sandbox, job) + s.test = new(pipelineTestData) + s.loggerData = fmt.Sprintf( + "test job: %s submission: %d problem %d test %d", + job.ID, + job.submission.ID, + job.problem.ID, + job.Test, + ) + defer s.checkFinish() logger.Trace("Starting testing for %s", s.loggerData) - s.defers = append(s.defers, job.deferFunc) - defer s.deferFunc() - err := s.testingProcessPipeline() if err != nil { logger.Error("Error in %s error: %v", s.loggerData, err) - i.failJob(job, "job %s error: %v", job.ID, err) + s.failJob("job %s error: %v", job.ID, err) return } @@ -44,12 +38,12 @@ func (i *Invoker) fullTestingPipeline(sandbox sandbox.ISandbox, job *Job) { err = s.uploadTestRunResources() if err != nil { logger.Error("Error in %s error: %v", s.loggerData, err) - i.failJob(job, "job %s error: %v", job.ID, err) + s.failJob("job %s error: %v", job.ID, err) return } } - i.successJob(job, s.test.runResult) + s.successJob(s.test.runResult) } func (s *JobPipelineState) testingProcessPipeline() error { @@ -94,7 +88,7 @@ func (s *JobPipelineState) testingProcessPipeline() error { func (s *JobPipelineState) generateTestRunConfig() error { s.test.runConfig = new(sandbox.ExecuteConfig) - fillInTestRunConfigLimits(s.test.runConfig, s.job.Problem) + fillInTestRunConfigLimits(s.test.runConfig, s.job.problem) s.test.runConfig.Command = solutionBinaryFile s.test.runConfig.Stdin = &sandbox.IORedirect{FileName: testInputFile} @@ -143,7 +137,7 @@ func fillInTestRunConfigLimits(c *sandbox.ExecuteConfig, problem *models.Problem func (s *JobPipelineState) executeTestRunCommand() error { s.executeWaitGroup.Add(1) - s.invoker.RunQueue <- s.runSolution + s.runProcess(s.runSolution) s.executeWaitGroup.Wait() if s.test.runResult.Err != nil { diff --git a/lib/customfields/memory.go b/lib/customfields/memory.go index a8dfa06..1229ddb 100644 --- a/lib/customfields/memory.go +++ b/lib/customfields/memory.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "gopkg.in/yaml.v3" + "gorm.io/gorm" + "gorm.io/gorm/schema" ) // Memory is set by number and size suffix. Possible suffixes are: @@ -60,8 +62,14 @@ func (m Memory) Value() (driver.Value, error) { return int64(m), nil } -func (m *Memory) GormDataType() string { - return "int64" // uint64 not supported by goorm +func (m Memory) GormDBDataType(db *gorm.DB, field *schema.Field) string { + switch db.Dialector.Name() { + case "mysql", "sqlite": + return "int64" + case "postgres": + return "bigint" + } + return "" } func (m *Memory) FromStr(s string) error { diff --git a/lib/customfields/time.go b/lib/customfields/time.go index be2b15d..8cd2705 100644 --- a/lib/customfields/time.go +++ b/lib/customfields/time.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "gopkg.in/yaml.v3" + "gorm.io/gorm" + "gorm.io/gorm/schema" ) // Time is set by number and size suffix. Possible suffixes are: @@ -58,8 +60,14 @@ func (t Time) Value() (driver.Value, error) { return int64(t), nil } -func (t *Time) GormDataType() string { - return "int64" // uint64 not supported by goorm +func (t Time) GormDBDataType(db *gorm.DB, field *schema.Field) string { + switch db.Dialector.Name() { + case "mysql", "sqlite": + return "int64" + case "postgres": + return "bigint" + } + return "" } func (t *Time) FromStr(s string) error { diff --git a/master/invoker_handlers.go b/master/invoker_handlers.go index 56f2164..750132d 100644 --- a/master/invoker_handlers.go +++ b/master/invoker_handlers.go @@ -29,9 +29,11 @@ func (m *Master) handleInvokerJobResult(c *gin.Context) { return } - logger.Trace("new job result received, job id: %s", result.JobID) + m.ts.Metrics.NewJobResult(result) + + logger.Trace("new job result received, job id: %s", result.Job.ID) if !m.invokerRegistry.HandleInvokerJobResult(result) { - logger.Trace("job %s is unknown or was rescheduled, skipping", result.JobID) + logger.Trace("job %s is unknown or was rescheduled, skipping", result.Job.ID) connector.RespOK(c, nil) return } diff --git a/master/queue/jobgenerators/icpc_generator.go b/master/queue/jobgenerators/icpc_generator.go index 005d89c..b2af16a 100644 --- a/master/queue/jobgenerators/icpc_generator.go +++ b/master/queue/jobgenerators/icpc_generator.go @@ -120,11 +120,11 @@ func (i *ICPCGenerator) testJobCompleted(job *invokerconn.Job, result *mastercon func (i *ICPCGenerator) JobCompleted(result *masterconn.InvokerJobResult) (*models.Submission, error) { i.mutex.Lock() defer i.mutex.Unlock() - job, ok := i.givenJobs[result.JobID] + job, ok := i.givenJobs[result.Job.ID] if !ok { return nil, fmt.Errorf("job %s does not exist", result.JobID) } - delete(i.givenJobs, result.JobID) + delete(i.givenJobs, result.Job.ID) switch job.Type { case invokerconn.CompileJob: diff --git a/master/queue/queue.go b/master/queue/queue.go index 381db43..202db5c 100644 --- a/master/queue/queue.go +++ b/master/queue/queue.go @@ -43,25 +43,25 @@ func (q *Queue) Submit(problem *models.Problem, submission *models.Submission) e func (q *Queue) JobCompleted(jobResult *masterconn.InvokerJobResult) (submission *models.Submission, err error) { q.mutex.Lock() defer q.mutex.Unlock() - wasID := jobResult.JobID - if origID, ok := q.jobIDToOriginalJobID[jobResult.JobID]; ok { - delete(q.jobIDToOriginalJobID, jobResult.JobID) - jobResult.JobID = origID + wasID := jobResult.Job.ID + if origID, ok := q.jobIDToOriginalJobID[jobResult.Job.ID]; ok { + delete(q.jobIDToOriginalJobID, jobResult.Job.ID) + jobResult.Job.ID = origID defer func() { - jobResult.JobID = wasID + jobResult.Job.ID = wasID }() } - generator, ok := q.originalJobIDToGenerator[jobResult.JobID] + generator, ok := q.originalJobIDToGenerator[jobResult.Job.ID] if !ok { - if wasID != jobResult.JobID { + if wasID != jobResult.Job.ID { logger.Panic("Job has id=%v and origID=%v; was not found in originalJobIDToGenerator", - wasID, jobResult.JobID) + wasID, jobResult.Job.ID) } - return nil, fmt.Errorf("no job with id=%v (origID=%v)", jobResult.JobID, wasID) + return nil, fmt.Errorf("no job with id=%v (origID=%v)", jobResult.Job.ID, wasID) } - delete(q.originalJobIDToJob, jobResult.JobID) - delete(q.originalJobIDToGenerator, jobResult.JobID) + delete(q.originalJobIDToJob, jobResult.Job.ID) + delete(q.originalJobIDToGenerator, jobResult.Job.ID) if _, ok = q.activeGeneratorIDs[generator.ID()]; !ok { q.activeGenerators.PushBack(generator) diff --git a/master/queue/queue_test.go b/master/queue/queue_test.go index 534e2a2..df4c24d 100644 --- a/master/queue/queue_test.go +++ b/master/queue/queue_test.go @@ -23,14 +23,14 @@ func doQueueCycles(t *testing.T, q *Queue, cycles int, maxNoJobs int) int { } if job.Type == invokerconn.CompileJob { sub, err := q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) assert.Nil(t, err) assert.Nil(t, sub) } else { sub, err := q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) assert.Nil(t, err) @@ -132,7 +132,7 @@ func TestQueue_RescheduleJob(t *testing.T) { require.Equal(t, job.SubmitID, newJob.SubmitID) _, err = q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: newJob.ID, + Job: newJob, Verdict: verdict.CD, }) assert.Nil(t, err) @@ -167,13 +167,13 @@ func TestQueue_RescheduleJob(t *testing.T) { require.Equal(t, job.SubmitID, newJob.SubmitID) _, err = q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: newJob.ID, + Job: newJob, Verdict: verdict.CD, }) assert.Nil(t, err) // this job should not be found _, err = q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: &job, Verdict: verdict.CD, }) assert.NotNil(t, err) @@ -206,7 +206,7 @@ func TestQueue_RescheduleJob(t *testing.T) { // compile 1 lastJob := spamJobs(q, 1) _, err := q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: lastJob.ID, + Job: &lastJob, Verdict: verdict.CD, }) require.Nil(t, err) @@ -214,7 +214,7 @@ func TestQueue_RescheduleJob(t *testing.T) { // compile 2 lastJob = spamJobs(q, 2) _, err = q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: lastJob.ID, + Job: &lastJob, Verdict: verdict.CD, }) require.Nil(t, err) @@ -222,7 +222,7 @@ func TestQueue_RescheduleJob(t *testing.T) { // test1 lastJob = spamJobs(q, 1) _, err = q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: lastJob.ID, + Job: &lastJob, Verdict: verdict.OK, }) require.Nil(t, err) @@ -230,7 +230,7 @@ func TestQueue_RescheduleJob(t *testing.T) { // test2 lastJob = spamJobs(q, 2) _, err = q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: lastJob.ID, + Job: &lastJob, Verdict: verdict.OK, }) require.Nil(t, err) @@ -245,7 +245,7 @@ func TestQueue_RescheduleJob(t *testing.T) { func TestQueueWrongJobID(t *testing.T) { q := NewQueue(nil).(*Queue) sub, err := q.JobCompleted(&masterconn.InvokerJobResult{ - JobID: "", + Job: &invokerconn.Job{}, Verdict: verdict.CD, }) assert.Nil(t, sub) diff --git a/master/registry/registry.go b/master/registry/registry.go index a5d000d..2bac90a 100644 --- a/master/registry/registry.go +++ b/master/registry/registry.go @@ -67,15 +67,15 @@ func (r *InvokerRegistry) HandleInvokerJobResult(result *masterconn.InvokerJobRe r.mutex.Lock() defer r.mutex.Unlock() - invoker, exists := r.invokerByJobID[result.JobID] + invoker, exists := r.invokerByJobID[result.Job.ID] if !exists { - logger.Trace("old or unknown job %s is tested", result.JobID) + logger.Trace("old or unknown job %s is tested", result.Job.ID) return false } - delete(r.invokerByJobID, result.JobID) - return invoker.JobTested(result.JobID) + delete(r.invokerByJobID, result.Job.ID) + return invoker.JobTested(result.Job.ID) } func (r *InvokerRegistry) SendJobs() { From 4b70dde5987b7aa4ac044475e31e1e70db993c3e Mon Sep 17 00:00:00 2001 From: Philip Gribov Date: Wed, 30 Apr 2025 17:11:34 +0300 Subject: [PATCH 2/5] Fix tests --- common/metrics/metrics.go | 28 ++++++++++++++++------------ common/testing_system.go | 4 +++- lib/logger/logger.go | 7 +++++-- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/common/metrics/metrics.go b/common/metrics/metrics.go index b21acea..9cce869 100644 --- a/common/metrics/metrics.go +++ b/common/metrics/metrics.go @@ -11,6 +11,8 @@ const ( ) type Collector struct { + Registerer *prometheus.Registry + InvokerJobResults *prometheus.CounterVec InvokerTestingWaitDuration *prometheus.CounterVec InvokerSandboxOccupationDuration *prometheus.CounterVec @@ -22,50 +24,52 @@ type Collector struct { } func NewCollector() *Collector { - c := &Collector{} - c.InvokerJobResults = createInvokerCounter( + c := &Collector{ + Registerer: prometheus.NewRegistry(), + } + c.InvokerJobResults = c.createInvokerCounter( "job_results_count", "Number of job results received from invoker", ) - c.InvokerTestingWaitDuration = createInvokerCounter( + c.InvokerTestingWaitDuration = c.createInvokerCounter( "testing_wait_duration_sum", "Time submission waits for testing in invoker", ) - c.InvokerSandboxOccupationDuration = createInvokerCounter( - "sandbox_occupation_duration_sun", + c.InvokerSandboxOccupationDuration = c.createInvokerCounter( + "sandbox_occupation_duration_sum", "Total sandbox time for submission testing in invoker", ) - c.InvokerResourceWaitDuration = createInvokerCounter( + c.InvokerResourceWaitDuration = c.createInvokerCounter( "resource_wait_duration_sum", "Total time spent waiting for resources for submissions to load in invokers", ) - c.InvokerFileActionsDuration = createInvokerCounter( + c.InvokerFileActionsDuration = c.createInvokerCounter( "file_actions_duration_sum", "Total time spent waiting for file copy to sandbox in invoker", ) - c.InvokerExecutionWaitDuration = createInvokerCounter( + c.InvokerExecutionWaitDuration = c.createInvokerCounter( "execution_wait_duration_sum", "Total time spent waiting for execution of process on invoker when sandbox is set up", ) - c.InvokerExecutionDuration = createInvokerCounter( + c.InvokerExecutionDuration = c.createInvokerCounter( "execution_duration_sum", "Total time spent on executing processes in sandboxes", ) - c.InvokerSendResultDuration = createInvokerCounter( + c.InvokerSendResultDuration = c.createInvokerCounter( "send_result_duration_sum", "Total time spent on sending results from invoker to storage", ) return c } -func createInvokerCounter( +func (c *Collector) createInvokerCounter( name string, help string, ) *prometheus.CounterVec { @@ -78,7 +82,7 @@ func createInvokerCounter( }, []string{invokerLabel, jobTypeLabel}, ) - prometheus.MustRegister(counter) + c.Registerer.MustRegister(counter) return counter } diff --git a/common/testing_system.go b/common/testing_system.go index 040045b..2be7755 100644 --- a/common/testing_system.go +++ b/common/testing_system.go @@ -58,7 +58,9 @@ func InitTestingSystem(configPath string) *TestingSystem { ts.StorageConn = storageconn.NewConnector(ts.Config.StorageConnection) ts.Metrics = metrics.NewCollector() - ts.Router.GET("/metrics", gin.WrapH(promhttp.Handler())) + ts.Router.GET("/metrics", gin.WrapH(promhttp.HandlerFor(ts.Metrics.Registerer, promhttp.HandlerOpts{ + ErrorLog: logger.CreateWriter(logger.LogLevelError, "[Prometheus]"), + }))) return ts } diff --git a/lib/logger/logger.go b/lib/logger/logger.go index ea7c539..664ee1b 100644 --- a/lib/logger/logger.go +++ b/lib/logger/logger.go @@ -2,7 +2,6 @@ package logger import ( "fmt" - "io" baselog "log" "os" "testing_system/common/config" @@ -64,7 +63,11 @@ func (w *logWriter) Write(p []byte) (n int, err error) { return len(p), nil } -func CreateWriter(level int, prefix string) io.Writer { +func (w *logWriter) Println(v ...interface{}) { + logPrint(w.level, "%s %s", w.prefix, fmt.Sprint(v...)) +} + +func CreateWriter(level int, prefix string) *logWriter { return &logWriter{ level: level, prefix: prefix, From b0b796bd20014767032e1d619456ac49f2295b8e Mon Sep 17 00:00:00 2001 From: Philip Gribov Date: Sun, 4 May 2025 18:41:58 +0300 Subject: [PATCH 3/5] Fixes --- invoker/compile_pipeline.go | 2 +- invoker/invoker_test.go | 4 ++-- invoker/pipeline.go | 2 +- invoker/test_run_pipeline.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/invoker/compile_pipeline.go b/invoker/compile_pipeline.go index 53747e0..60f0a50 100644 --- a/invoker/compile_pipeline.go +++ b/invoker/compile_pipeline.go @@ -13,7 +13,7 @@ import ( ) func (i *Invoker) fullCompilationPipeline(sandbox sandbox.ISandbox, job *Job) { - s := i.newPipeline(sandbox, job) + s := i.newPipelineState(sandbox, job) s.compile = new(pipelineCompileData) s.loggerData = fmt.Sprintf("compile job: %s submission: %d", job.ID, job.submission.ID) defer s.checkFinish() diff --git a/invoker/invoker_test.go b/invoker/invoker_test.go index 6ca1dd7..67b9bca 100644 --- a/invoker/invoker_test.go +++ b/invoker/invoker_test.go @@ -90,7 +90,7 @@ func (ts *testState) testCompile(submitID uint) *JobPipelineState { uint64(submitID), )) - s := ts.Invoker.newPipeline(ts.Sandbox, job) + s := ts.Invoker.newPipelineState(ts.Sandbox, job) s.compile = new(pipelineCompileData) s.loggerData = fmt.Sprintf("compile job: %s submission: %d", job.ID, job.submission.ID) @@ -189,7 +189,7 @@ func (ts *testState) testRun(submitID uint, problemID uint) *sandbox.RunResult { require.NoError(ts.t, ts.Invoker.Storage.Binary.Insert(filepath.Join(sourceDir, "binary"), uint64(submitID))) - s := ts.Invoker.newPipeline(ts.Sandbox, job) + s := ts.Invoker.newPipelineState(ts.Sandbox, job) s.test = new(pipelineTestData) s.loggerData = fmt.Sprintf( "test job: %s submission: %d problem %d test %d", diff --git a/invoker/pipeline.go b/invoker/pipeline.go index 2ce31a8..f7d8bea 100644 --- a/invoker/pipeline.go +++ b/invoker/pipeline.go @@ -51,7 +51,7 @@ type pipelineTestData struct { hasResources bool } -func (i *Invoker) newPipeline(sandbox sandbox.ISandbox, job *Job) *JobPipelineState { +func (i *Invoker) newPipelineState(sandbox sandbox.ISandbox, job *Job) *JobPipelineState { s := &JobPipelineState{ sandbox: sandbox, invoker: i, diff --git a/invoker/test_run_pipeline.go b/invoker/test_run_pipeline.go index 323b488..7d5453d 100644 --- a/invoker/test_run_pipeline.go +++ b/invoker/test_run_pipeline.go @@ -14,7 +14,7 @@ const ( ) func (i *Invoker) fullTestingPipeline(sandbox sandbox.ISandbox, job *Job) { - s := i.newPipeline(sandbox, job) + s := i.newPipelineState(sandbox, job) s.test = new(pipelineTestData) s.loggerData = fmt.Sprintf( "test job: %s submission: %d problem %d test %d", From ab7f5aab7cdddf6fa4178ca7cd0ec67896b13333 Mon Sep 17 00:00:00 2001 From: Philip Gribov Date: Tue, 6 May 2025 01:59:26 +0300 Subject: [PATCH 4/5] Fix jobResult for ioi generator --- master/queue/jobgenerators/generator_test.go | 122 +++++++++---------- master/queue/jobgenerators/icpc_generator.go | 2 +- master/queue/jobgenerators/ioi_generator.go | 6 +- 3 files changed, 65 insertions(+), 65 deletions(-) diff --git a/master/queue/jobgenerators/generator_test.go b/master/queue/jobgenerators/generator_test.go index 7646a41..3371252 100644 --- a/master/queue/jobgenerators/generator_test.go +++ b/master/queue/jobgenerators/generator_test.go @@ -46,7 +46,7 @@ func TestICPCGenerator(t *testing.T) { require.Nil(t, err) job := nextJob(t, g, 1, invokerconn.CompileJob, 0) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CE, }) require.NotNil(t, sub) @@ -68,7 +68,7 @@ func TestICPCGenerator(t *testing.T) { job := nextJob(t, generator, 1, invokerconn.CompileJob, 0) noJobs(t, generator) sub, err := generator.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.Nil(t, sub) @@ -76,7 +76,7 @@ func TestICPCGenerator(t *testing.T) { for i := range fixtureICPCProblemTestsNumber - 1 { job = nextJob(t, generator, 1, invokerconn.TestJob, uint64(i)+1) sub, err = generator.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) require.Nil(t, sub) @@ -84,7 +84,7 @@ func TestICPCGenerator(t *testing.T) { } job = nextJob(t, generator, 1, invokerconn.TestJob, fixtureICPCProblemTestsNumber) sub, err = generator.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) require.NotNil(t, sub) @@ -99,30 +99,30 @@ func TestICPCGenerator(t *testing.T) { }) t.Run("Tasks finishing", func(t *testing.T) { - prepare := func() (Generator, []string) { + prepare := func() (Generator, []*invokerconn.Job) { problem := fixtureICPCProblem() submission := fixtureSubmission(1) g, err := NewGenerator(problem, submission) require.Nil(t, err) job := nextJob(t, g, 1, invokerconn.CompileJob, 0) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.Nil(t, sub) require.Nil(t, err) - firstTwoJobIDs := make([]string, 0) + firstTwoJobs := make([]*invokerconn.Job, 0) for i := range 2 { job = nextJob(t, g, 1, invokerconn.TestJob, uint64(i)+1) - firstTwoJobIDs = append(firstTwoJobIDs, job.ID) + firstTwoJobs = append(firstTwoJobs, job) } - return g, firstTwoJobIDs + return g, firstTwoJobs } finishOtherTests := func(g Generator) { for i := 2; i < fixtureICPCProblemTestsNumber-1; i++ { job := nextJob(t, g, 1, invokerconn.TestJob, uint64(i)+1) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) require.Nil(t, sub) @@ -130,7 +130,7 @@ func TestICPCGenerator(t *testing.T) { } job := nextJob(t, g, 1, invokerconn.TestJob, 10) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) require.NotNil(t, sub) @@ -145,10 +145,10 @@ func TestICPCGenerator(t *testing.T) { } t.Run("right order", func(t *testing.T) { - g, firstTwoJobIDs := prepare() - for _, id := range firstTwoJobIDs { + g, firstTwoJobs := prepare() + for _, job := range firstTwoJobs { sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: id, + Job: job, Verdict: verdict.OK, }) require.Nil(t, sub) @@ -158,16 +158,16 @@ func TestICPCGenerator(t *testing.T) { }) t.Run("wrong order + both ok", func(t *testing.T) { - g, firstTwoJobIDs := prepare() + g, firstTwoJobs := prepare() sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: firstTwoJobIDs[1], + Job: firstTwoJobs[1], Verdict: verdict.OK, }) require.Nil(t, sub) require.Nil(t, err) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: firstTwoJobIDs[0], + Job: firstTwoJobs[0], Verdict: verdict.OK, }) require.Nil(t, sub) @@ -177,16 +177,16 @@ func TestICPCGenerator(t *testing.T) { }) t.Run("wrong order + 2nd fail", func(t *testing.T) { - g, firstTwoJobIDs := prepare() + g, firstTwoJobs := prepare() sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: firstTwoJobIDs[1], + Job: firstTwoJobs[1], Verdict: verdict.WA, }) require.Nil(t, sub) require.Nil(t, err) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: firstTwoJobIDs[0], + Job: firstTwoJobs[0], Verdict: verdict.OK, }) require.NotNil(t, sub) @@ -203,16 +203,16 @@ func TestICPCGenerator(t *testing.T) { }) t.Run("wrong order + 1st fail", func(t *testing.T) { - g, firstTwoJobIDs := prepare() + g, firstTwoJobs := prepare() sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: firstTwoJobIDs[1], + Job: firstTwoJobs[1], Verdict: verdict.OK, }) require.Nil(t, sub) require.Nil(t, err) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: firstTwoJobIDs[0], + Job: firstTwoJobs[0], Verdict: verdict.WA, }) require.NotNil(t, sub) @@ -235,7 +235,7 @@ func TestICPCGenerator(t *testing.T) { require.Nil(t, err) job := nextJob(t, g, 1, invokerconn.CompileJob, 0) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CE, }) require.NotNil(t, sub) @@ -249,7 +249,7 @@ func TestICPCGenerator(t *testing.T) { } sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CE, }) require.Nil(t, sub) @@ -448,7 +448,7 @@ func TestIOIGenerator(t *testing.T) { require.Nil(t, err) job := nextJob(t, g, 1, invokerconn.CompileJob, 0) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CE, }) require.NotNil(t, sub) @@ -478,7 +478,7 @@ func TestIOIGenerator(t *testing.T) { job := nextJob(t, generator, 1, invokerconn.CompileJob, 0) noJobs(t, generator) sub, err := generator.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.Nil(t, sub) @@ -486,7 +486,7 @@ func TestIOIGenerator(t *testing.T) { for i := range 9 { job = nextJob(t, generator, 1, invokerconn.TestJob, uint64(i)+1) sub, err = generator.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) require.Nil(t, sub) @@ -494,7 +494,7 @@ func TestIOIGenerator(t *testing.T) { } job = nextJob(t, generator, 1, invokerconn.TestJob, 10) sub, err = generator.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) require.NotNil(t, sub) @@ -547,7 +547,7 @@ func TestIOIGenerator(t *testing.T) { require.NoError(t, err) job := nextJob(t, gen, 0, invokerconn.CompileJob, 0) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.Nil(t, sub) @@ -560,13 +560,13 @@ func TestIOIGenerator(t *testing.T) { job1 := nextJob(t, gen, 0, invokerconn.TestJob, 1) job2 := nextJob(t, gen, 0, invokerconn.TestJob, 2) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job1.ID, + Job: job1, Verdict: verdict.WA, }) require.Nil(t, sub) require.NoError(t, err) sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.OK, }) require.Equal(t, verdict.PT, sub.Verdict) @@ -592,13 +592,13 @@ func TestIOIGenerator(t *testing.T) { job1 := nextJob(t, gen, 0, invokerconn.TestJob, 1) job2 := nextJob(t, gen, 0, invokerconn.TestJob, 2) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.OK, }) require.Nil(t, sub) require.NoError(t, err) sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job1.ID, + Job: job1, Verdict: verdict.WA, }) require.Equal(t, verdict.PT, sub.Verdict) @@ -624,13 +624,13 @@ func TestIOIGenerator(t *testing.T) { job1 := nextJob(t, gen, 0, invokerconn.TestJob, 1) job2 := nextJob(t, gen, 0, invokerconn.TestJob, 2) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job1.ID, + Job: job1, Verdict: verdict.OK, }) require.Nil(t, sub) require.NoError(t, err) sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.WA, }) require.Equal(t, verdict.PT, sub.Verdict) @@ -656,13 +656,13 @@ func TestIOIGenerator(t *testing.T) { job1 := nextJob(t, gen, 0, invokerconn.TestJob, 1) job2 := nextJob(t, gen, 0, invokerconn.TestJob, 2) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.WA, }) require.Nil(t, sub) require.NoError(t, err) sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job1.ID, + Job: job1, Verdict: verdict.OK, }) require.Equal(t, verdict.PT, sub.Verdict) @@ -687,7 +687,7 @@ func TestIOIGenerator(t *testing.T) { gen := prepare() job := nextJob(t, gen, 0, invokerconn.TestJob, 1) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.WA, }) require.NotNil(t, sub) @@ -732,14 +732,14 @@ func TestIOIGenerator(t *testing.T) { require.NoError(t, err) job := nextJob(t, gen, 0, invokerconn.CompileJob, 0) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.Nil(t, sub) require.NoError(t, err) job = nextJob(t, gen, 0, invokerconn.TestJob, 1) sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.WA, }) require.Nil(t, gen.NextJob()) @@ -785,7 +785,7 @@ func TestIOIGenerator(t *testing.T) { require.NoError(t, err) job := nextJob(t, gen, 1, invokerconn.CompileJob, 0) sub, err := gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.Nil(t, sub) @@ -795,20 +795,20 @@ func TestIOIGenerator(t *testing.T) { job3 := nextJob(t, gen, 1, invokerconn.TestJob, 3) // now finish 1 and 3 sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job1.ID, + Job: job1, Verdict: verdict.OK, }) require.NoError(t, err) require.Nil(t, sub) sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job3.ID, + Job: job3, Verdict: verdict.WA, Statistics: baseStat, }) // this group is already failed, so the generator should not return any job require.Nil(t, gen.NextJob()) sub, err = gen.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.TL, Statistics: baseStat, }) @@ -872,7 +872,7 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job) require.Nil(t, g.NextJob()) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.NoError(t, err) @@ -881,7 +881,7 @@ func TestIOIGenerator(t *testing.T) { job = nextJob(t, g, 1, invokerconn.TestJob, 1) require.NotNil(t, job) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) job2 := nextJob(t, g, 1, invokerconn.TestJob, 2) @@ -890,13 +890,13 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job3) require.Nil(t, g.NextJob()) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.WA, }) require.NoError(t, err) require.Nil(t, sub) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job3.ID, + Job: job3, Verdict: verdict.OK, }) require.NoError(t, err) @@ -943,7 +943,7 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job) require.Nil(t, g.NextJob()) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.NoError(t, err) @@ -952,7 +952,7 @@ func TestIOIGenerator(t *testing.T) { job = nextJob(t, g, 1, invokerconn.TestJob, 1) require.NotNil(t, job) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) job2 := nextJob(t, g, 1, invokerconn.TestJob, 2) @@ -961,13 +961,13 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job3) require.Nil(t, g.NextJob()) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.WA, }) require.NoError(t, err) require.Nil(t, sub) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job3.ID, + Job: job3, Verdict: verdict.OK, }) require.NoError(t, err) @@ -995,7 +995,7 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job) require.Nil(t, g.NextJob()) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.NoError(t, err) @@ -1004,7 +1004,7 @@ func TestIOIGenerator(t *testing.T) { job = nextJob(t, g, 1, invokerconn.TestJob, 1) require.NotNil(t, job) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) job2 := nextJob(t, g, 1, invokerconn.TestJob, 2) @@ -1013,14 +1013,14 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job3) require.Nil(t, g.NextJob()) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.PT, Points: pointer.Float64(10), }) require.NoError(t, err) require.Nil(t, sub) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job3.ID, + Job: job3, Verdict: verdict.OK, }) require.NoError(t, err) @@ -1048,7 +1048,7 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job) require.Nil(t, g.NextJob()) sub, err := g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.CD, }) require.NoError(t, err) @@ -1057,7 +1057,7 @@ func TestIOIGenerator(t *testing.T) { job = nextJob(t, g, 1, invokerconn.TestJob, 1) require.NotNil(t, job) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job.ID, + Job: job, Verdict: verdict.OK, }) job2 := nextJob(t, g, 1, invokerconn.TestJob, 2) @@ -1066,13 +1066,13 @@ func TestIOIGenerator(t *testing.T) { require.NotNil(t, job3) require.Nil(t, g.NextJob()) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job2.ID, + Job: job2, Verdict: verdict.OK, }) require.NoError(t, err) require.Nil(t, sub) sub, err = g.JobCompleted(&masterconn.InvokerJobResult{ - JobID: job3.ID, + Job: job3, Verdict: verdict.OK, }) require.NoError(t, err) diff --git a/master/queue/jobgenerators/icpc_generator.go b/master/queue/jobgenerators/icpc_generator.go index b2af16a..bc3b7ce 100644 --- a/master/queue/jobgenerators/icpc_generator.go +++ b/master/queue/jobgenerators/icpc_generator.go @@ -122,7 +122,7 @@ func (i *ICPCGenerator) JobCompleted(result *masterconn.InvokerJobResult) (*mode defer i.mutex.Unlock() job, ok := i.givenJobs[result.Job.ID] if !ok { - return nil, fmt.Errorf("job %s does not exist", result.JobID) + return nil, fmt.Errorf("job %s does not exist", result.Job.ID) } delete(i.givenJobs, result.Job.ID) diff --git a/master/queue/jobgenerators/ioi_generator.go b/master/queue/jobgenerators/ioi_generator.go index f4049fb..de8726d 100644 --- a/master/queue/jobgenerators/ioi_generator.go +++ b/master/queue/jobgenerators/ioi_generator.go @@ -444,11 +444,11 @@ func (i *IOIGenerator) testJobCompleted( func (i *IOIGenerator) JobCompleted(jobResult *masterconn.InvokerJobResult) (*models.Submission, error) { i.mutex.Lock() defer i.mutex.Unlock() - job, ok := i.givenJobs[jobResult.JobID] + job, ok := i.givenJobs[jobResult.Job.ID] if !ok { - return nil, fmt.Errorf("job %s does not exist", jobResult.JobID) + return nil, fmt.Errorf("job %s does not exist", jobResult.Job.ID) } - delete(i.givenJobs, jobResult.JobID) + delete(i.givenJobs, jobResult.Job.ID) switch job.Type { case invokerconn.CompileJob: i.compileJobCompleted(job, jobResult) From 74cb9fb767cf118d0e12c8b97e81fafb0a2a26c1 Mon Sep 17 00:00:00 2001 From: Philip Gribov Date: Tue, 6 May 2025 19:37:16 +0300 Subject: [PATCH 5/5] Improve time calculation with defers --- invoker/pipeline.go | 4 ++++ invoker/pipeline_files.go | 9 +++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/invoker/pipeline.go b/invoker/pipeline.go index f7d8bea..013687f 100644 --- a/invoker/pipeline.go +++ b/invoker/pipeline.go @@ -140,3 +140,7 @@ func (s *JobPipelineState) uploadJobResult(request *masterconn.InvokerJobResult) logger.Error("Can not send job %s result, error: %s", s.job.ID, err.Error()) } } + +func updateMetrics(metric *time.Duration, start time.Time) { + *metric += time.Since(start) +} diff --git a/invoker/pipeline_files.go b/invoker/pipeline_files.go index 7f2b0c1..c6df3a0 100644 --- a/invoker/pipeline_files.go +++ b/invoker/pipeline_files.go @@ -133,7 +133,7 @@ func (s *JobPipelineState) uploadOutput(fileName string, resourceType resource.T } func (s *JobPipelineState) copyFileToSandbox(src string, dst string, perm os.FileMode) error { - start := time.Now() + defer updateMetrics(&s.metrics.FileActionsDuration, time.Now()) srcReader, err := os.Open(src) if err != nil { return err @@ -146,7 +146,6 @@ func (s *JobPipelineState) copyFileToSandbox(src string, dst string, perm os.Fil defer dstWriter.Close() _, err = io.Copy(dstWriter, srcReader) - s.metrics.FileActionsDuration += time.Since(start) return nil } @@ -171,15 +170,13 @@ func (s *JobPipelineState) limitedReader(r io.Reader) io.Reader { } func (s *JobPipelineState) loadResource(getter *storage.CacheGetter, args ...uint64) (*string, error) { - start := time.Now() + defer updateMetrics(&s.metrics.ResourceWaitDuration, time.Now()) res, err := getter.Get(args...) - s.metrics.ResourceWaitDuration += time.Since(start) return res, err } func (s *JobPipelineState) uploadResource(request *storageconn.Request) *storageconn.Response { - start := time.Now() + defer updateMetrics(&s.metrics.SendResultDuration, time.Now()) resp := s.invoker.TS.StorageConn.Upload(request) - s.metrics.SendResultDuration += time.Since(start) return resp }