diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 638aa261..985e44a0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,9 +16,9 @@ jobs: go-version: '1.21.0' - name: Install libpcap-dev run: sudo apt-get install -y libpcap-dev - - run: go build -o scheduler cmd/scheduler/main.go - - run: go build -o worker cmd/worker/main.go - - run: go build -o analyze cmd/analyze/main.go + - run: go build -o scheduler ./cmd/scheduler + - run: go build -o worker ./cmd/worker + - run: go build -o analyze ./cmd/analyze - run: go build -o loader load.go working-directory: function/loader - run: go build -o staticanalyze staticanalyze.go diff --git a/cmd/analyze/Dockerfile b/cmd/analyze/Dockerfile index 3034dba5..5358860c 100644 --- a/cmd/analyze/Dockerfile +++ b/cmd/analyze/Dockerfile @@ -8,7 +8,7 @@ COPY ./go.sum ./ RUN go mod download COPY . ./ -RUN go build -o analyze cmd/analyze/main.go && go build -o worker cmd/worker/main.go +RUN go build -o analyze ./cmd/analyze && go build -o worker ./cmd/worker FROM ubuntu:22.04@sha256:42ba2dfce475de1113d55602d40af18415897167d47c2045ec7b6d9746ff148f diff --git a/cmd/analyze/main.go b/cmd/analyze/main.go index 90e3ec60..ff5b2fd0 100644 --- a/cmd/analyze/main.go +++ b/cmd/analyze/main.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "log/slog" + "net/http" "os" "strings" @@ -19,6 +20,7 @@ import ( "github.com/ossf/package-analysis/internal/resultstore" "github.com/ossf/package-analysis/internal/sandbox" "github.com/ossf/package-analysis/internal/staticanalysis" + "github.com/ossf/package-analysis/internal/useragent" "github.com/ossf/package-analysis/internal/utils" "github.com/ossf/package-analysis/internal/worker" "github.com/ossf/package-analysis/pkg/api/pkgecosystem" @@ -186,6 +188,8 @@ func run() error { analysisMode.InitFlag() flag.Parse() + http.DefaultTransport = useragent.DefaultRoundTripper(http.DefaultTransport, "") + if err := featureflags.Update(*features); err != nil { return 
usageError{err} } diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 0000be46..1667b34b 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -5,11 +5,13 @@ import ( "errors" "flag" "fmt" + "net/http" "os" "strings" "github.com/package-url/packageurl-go" + "github.com/ossf/package-analysis/internal/useragent" "github.com/ossf/package-analysis/internal/worker" ) @@ -86,6 +88,8 @@ func processFileLine(text string) error { func run() error { flag.Parse() + http.DefaultTransport = useragent.DefaultRoundTripper(http.DefaultTransport, "") + if *purlFilePath == "" { return newCmdError("Please specify packages to download using -f ") } diff --git a/cmd/worker/config.go b/cmd/worker/config.go new file mode 100644 index 00000000..91099123 --- /dev/null +++ b/cmd/worker/config.go @@ -0,0 +1,80 @@ +package main + +import ( + "log/slog" + "os" + + "github.com/ossf/package-analysis/internal/resultstore" + "github.com/ossf/package-analysis/internal/worker" +) + +// resultBucketPaths holds bucket paths for the different types of results. 
+type resultBucketPaths struct { + analyzedPkg string + dynamicAnalysis string + executionLog string + fileWrites string + staticAnalysis string +} + +type sandboxImageSpec struct { + tag string + noPull bool +} + +type config struct { + imageSpec sandboxImageSpec + + resultStores *worker.ResultStores + + subURL string + packagesBucket string + notificationTopicURL string + + userAgentExtra string +} + +func (c *config) LogValue() slog.Value { + return slog.GroupValue( + slog.String("subscription", c.subURL), + slog.String("package_bucket", c.packagesBucket), + slog.String("dynamic_results_store", c.resultStores.DynamicAnalysis.String()), + slog.String("static_results_store", c.resultStores.StaticAnalysis.String()), + slog.String("file_write_results_store", c.resultStores.FileWrites.String()), + slog.String("analyzed_packages_store", c.resultStores.AnalyzedPackage.String()), + slog.String("execution_log_store", c.resultStores.ExecutionLog.String()), + slog.String("image_tag", c.imageSpec.tag), + slog.Bool("image_nopull", c.imageSpec.noPull), + slog.String("topic_notification", c.notificationTopicURL), + slog.String("user_agent_extra", c.userAgentExtra), + ) +} + +func resultStoreForEnv(key string) *resultstore.ResultStore { + val := os.Getenv(key) + if val == "" { + return nil + } + return resultstore.New(val, resultstore.ConstructPath()) +} + +func configFromEnv() *config { + return &config{ + imageSpec: sandboxImageSpec{ + tag: os.Getenv("OSSF_SANDBOX_IMAGE_TAG"), + noPull: os.Getenv("OSSF_SANDBOX_NOPULL") != "", + }, + resultStores: &worker.ResultStores{ + AnalyzedPackage: resultStoreForEnv("OSSF_MALWARE_ANALYZED_PACKAGES"), + DynamicAnalysis: resultStoreForEnv("OSSF_MALWARE_ANALYSIS_RESULTS"), + ExecutionLog: resultStoreForEnv("OSSF_MALWARE_ANALYSIS_EXECUTION_LOGS"), + FileWrites: resultStoreForEnv("OSSF_MALWARE_ANALYSIS_FILE_WRITE_RESULTS"), + StaticAnalysis: resultStoreForEnv("OSSF_MALWARE_STATIC_ANALYSIS_RESULTS"), + }, + subURL: 
os.Getenv("OSSMALWARE_WORKER_SUBSCRIPTION"), + packagesBucket: os.Getenv("OSSF_MALWARE_ANALYSIS_PACKAGES"), + notificationTopicURL: os.Getenv("OSSF_MALWARE_NOTIFICATION_TOPIC"), + + userAgentExtra: os.Getenv("OSSF_MALWARE_USER_AGENT_EXTRA"), + } +} diff --git a/cmd/worker/main.go b/cmd/worker/main.go index e6f1e80e..bb3edca0 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -24,9 +24,9 @@ import ( "github.com/ossf/package-analysis/internal/log" "github.com/ossf/package-analysis/internal/notification" "github.com/ossf/package-analysis/internal/pkgmanager" - "github.com/ossf/package-analysis/internal/resultstore" "github.com/ossf/package-analysis/internal/sandbox" "github.com/ossf/package-analysis/internal/staticanalysis" + "github.com/ossf/package-analysis/internal/useragent" "github.com/ossf/package-analysis/internal/worker" "github.com/ossf/package-analysis/pkg/api/pkgecosystem" ) @@ -35,20 +35,6 @@ const ( localPkgPathFmt = "/local/%s" ) -// resultBucketPaths holds bucket paths for the different types of results. 
-type resultBucketPaths struct { - analyzedPkg string - dynamicAnalysis string - executionLog string - fileWrites string - staticAnalysis string -} - -type sandboxImageSpec struct { - tag string - noPull bool -} - func copyPackageToLocalFile(ctx context.Context, packagesBucket *blob.Bucket, bucketPath string) (string, *os.File, error) { if packagesBucket == nil { return "", nil, errors.New("packages bucket not set") @@ -77,29 +63,7 @@ func copyPackageToLocalFile(ctx context.Context, packagesBucket *blob.Bucket, bu return fmt.Sprintf(localPkgPathFmt, path.Base(bucketPath)), f, nil } -func makeResultStores(dest resultBucketPaths) worker.ResultStores { - resultStores := worker.ResultStores{} - - if dest.analyzedPkg != "" { - resultStores.AnalyzedPackage = resultstore.New(dest.analyzedPkg, resultstore.ConstructPath()) - } - if dest.dynamicAnalysis != "" { - resultStores.DynamicAnalysis = resultstore.New(dest.dynamicAnalysis, resultstore.ConstructPath()) - } - if dest.executionLog != "" { - resultStores.ExecutionLog = resultstore.New(dest.executionLog, resultstore.ConstructPath()) - } - if dest.fileWrites != "" { - resultStores.FileWrites = resultstore.New(dest.fileWrites, resultstore.ConstructPath()) - } - if dest.staticAnalysis != "" { - resultStores.StaticAnalysis = resultstore.New(dest.staticAnalysis, resultstore.ConstructPath()) - } - - return resultStores -} - -func handleMessage(ctx context.Context, msg *pubsub.Message, packagesBucket *blob.Bucket, resultStores *worker.ResultStores, imageSpec sandboxImageSpec, notificationTopic *pubsub.Topic) error { +func handleMessage(ctx context.Context, msg *pubsub.Message, cfg *config, packagesBucket *blob.Bucket, notificationTopic *pubsub.Topic) error { name := msg.Metadata["name"] if name == "" { slog.WarnContext(ctx, "name is empty") @@ -132,7 +96,7 @@ func handleMessage(ctx context.Context, msg *pubsub.Message, packagesBucket *blo ) localPkgPath := "" - sandboxOpts := []sandbox.Option{sandbox.Tag(imageSpec.tag)} + 
sandboxOpts := []sandbox.Option{sandbox.Tag(cfg.imageSpec.tag)} if remotePkgPath != "" { tmpPkgPath, pkgFile, err := copyPackageToLocalFile(ctx, packagesBucket, remotePkgPath) @@ -146,7 +110,7 @@ func handleMessage(ctx context.Context, msg *pubsub.Message, packagesBucket *blo sandboxOpts = append(sandboxOpts, sandbox.Volume(pkgFile.Name(), localPkgPath)) } - if imageSpec.noPull { + if cfg.imageSpec.noPull { sandboxOpts = append(sandboxOpts, sandbox.NoPull()) } @@ -159,19 +123,24 @@ func handleMessage(ctx context.Context, msg *pubsub.Message, packagesBucket *blo staticSandboxOpts := append(worker.StaticSandboxOptions(), sandboxOpts...) dynamicSandboxOpts := append(worker.DynamicSandboxOptions(), sandboxOpts...) + // propagate user agent extras to the static analysis sandbox if it is set. + if cfg.userAgentExtra != "" { + staticSandboxOpts = append(staticSandboxOpts, sandbox.SetEnv("OSSF_MALWARE_USER_AGENT_EXTRA", cfg.userAgentExtra)) + } + // run both dynamic and static analysis regardless of error status of either // and return combined error(s) afterwards, if applicable staticResults, _, staticAnalysisErr := worker.RunStaticAnalysis(ctx, pkg, staticSandboxOpts, staticanalysis.All) if staticAnalysisErr == nil { - staticAnalysisErr = worker.SaveStaticAnalysisData(ctx, pkg, resultStores, staticResults) + staticAnalysisErr = worker.SaveStaticAnalysisData(ctx, pkg, cfg.resultStores, staticResults) } result, dynamicAnalysisErr := worker.RunDynamicAnalysis(ctx, pkg, dynamicSandboxOpts, "") if dynamicAnalysisErr == nil { - dynamicAnalysisErr = worker.SaveDynamicAnalysisData(ctx, pkg, resultStores, result.Data) + dynamicAnalysisErr = worker.SaveDynamicAnalysisData(ctx, pkg, cfg.resultStores, result.Data) } - resultStores.AnalyzedPackageSaved = false + cfg.resultStores.AnalyzedPackageSaved = false // combine errors if analysisErr := errors.Join(dynamicAnalysisErr, staticAnalysisErr); analysisErr != nil { @@ -187,12 +156,12 @@ func handleMessage(ctx context.Context, msg 
*pubsub.Message, packagesBucket *blo return nil } -func messageLoop(ctx context.Context, subURL, packagesBucket, notificationTopicURL string, imageSpec sandboxImageSpec, resultsBuckets *worker.ResultStores) error { - sub, err := pubsub.OpenSubscription(ctx, subURL) +func messageLoop(ctx context.Context, cfg *config) error { + sub, err := pubsub.OpenSubscription(ctx, cfg.subURL) if err != nil { return err } - extender, err := pubsubextender.New(ctx, subURL, sub) + extender, err := pubsubextender.New(ctx, cfg.subURL, sub) if err != nil { return err } @@ -205,8 +174,8 @@ func messageLoop(ctx context.Context, subURL, packagesBucket, notificationTopicU // we pass in a nil notificationTopic object to handleMessage // and continue with the analysis with no notifications published var notificationTopic *pubsub.Topic - if notificationTopicURL != "" { - notificationTopic, err = pubsub.OpenTopic(ctx, notificationTopicURL) + if cfg.notificationTopicURL != "" { + notificationTopic, err = pubsub.OpenTopic(ctx, cfg.notificationTopicURL) if err != nil { return err } @@ -214,9 +183,9 @@ func messageLoop(ctx context.Context, subURL, packagesBucket, notificationTopicU } var pkgsBkt *blob.Bucket - if packagesBucket != "" { + if cfg.packagesBucket != "" { var err error - pkgsBkt, err = blob.OpenBucket(ctx, packagesBucket) + pkgsBkt, err = blob.OpenBucket(ctx, cfg.packagesBucket) if err != nil { return err } @@ -246,7 +215,7 @@ func messageLoop(ctx context.Context, subURL, packagesBucket, notificationTopicU return fmt.Errorf("error starting message ack deadline extender: %w", err) } - if err := handleMessage(msgCtx, msg, pkgsBkt, resultsBuckets, imageSpec, notificationTopic); err != nil { + if err := handleMessage(msgCtx, msg, cfg, pkgsBkt, notificationTopic); err != nil { slog.ErrorContext(msgCtx, "Failed to process message", "error", err) if err := me.Stop(); err != nil { slog.ErrorContext(msgCtx, "Extender failed", "error", err) @@ -267,35 +236,21 @@ func main() { 
log.Initialize(os.Getenv("LOGGER_ENV")) ctx := context.Background() - subURL := os.Getenv("OSSMALWARE_WORKER_SUBSCRIPTION") - packagesBucket := os.Getenv("OSSF_MALWARE_ANALYSIS_PACKAGES") - notificationTopicURL := os.Getenv("OSSF_MALWARE_NOTIFICATION_TOPIC") - enableProfiler := os.Getenv("OSSF_MALWARE_ANALYSIS_ENABLE_PROFILER") + + cfg := configFromEnv() + + http.DefaultTransport = useragent.DefaultRoundTripper(http.DefaultTransport, cfg.userAgentExtra) if err := featureflags.Update(os.Getenv("OSSF_MALWARE_FEATURE_FLAGS")); err != nil { slog.Error("Failed to parse feature flags", "error", err) os.Exit(1) } - resultsBuckets := resultBucketPaths{ - analyzedPkg: os.Getenv("OSSF_MALWARE_ANALYZED_PACKAGES"), - dynamicAnalysis: os.Getenv("OSSF_MALWARE_ANALYSIS_RESULTS"), - executionLog: os.Getenv("OSSF_MALWARE_ANALYSIS_EXECUTION_LOGS"), - fileWrites: os.Getenv("OSSF_MALWARE_ANALYSIS_FILE_WRITE_RESULTS"), - staticAnalysis: os.Getenv("OSSF_MALWARE_STATIC_ANALYSIS_RESULTS"), - } - resultStores := makeResultStores(resultsBuckets) - - imageSpec := sandboxImageSpec{ - tag: os.Getenv("OSSF_SANDBOX_IMAGE_TAG"), - noPull: os.Getenv("OSSF_SANDBOX_NOPULL") != "", - } - sandbox.InitNetwork(ctx) // If configured, start a webserver so that Go's pprof can be accessed for // debugging and profiling. - if enableProfiler != "" { + if os.Getenv("OSSF_MALWARE_ANALYSIS_ENABLE_PROFILER") != "" { go func() { slog.Info("Starting profiler") http.ListenAndServe(":6060", nil) @@ -304,20 +259,11 @@ func main() { // Log the configuration of the worker at startup so we can observe it. 
slog.InfoContext(ctx, "Starting worker", - "subscription", subURL, - "package_bucket", packagesBucket, - "results_bucket", resultsBuckets.dynamicAnalysis, - "static_results_bucket", resultsBuckets.staticAnalysis, - "file_write_results_bucket", resultsBuckets.fileWrites, - "analyzed_packages_bucket", resultsBuckets.analyzedPkg, - "execution_log_bucket", resultsBuckets.executionLog, - "image_tag", imageSpec.tag, - "image_nopull", imageSpec.noPull, - "topic_notification", notificationTopicURL, + "config", cfg, "feature_flags", featureflags.State(), ) - err := messageLoop(ctx, subURL, packagesBucket, notificationTopicURL, imageSpec, &resultStores) + err := messageLoop(ctx, cfg) if err != nil { slog.ErrorContext(ctx, "Error encountered", "error", err) } diff --git a/infra/worker/workers-set.yaml b/infra/worker/workers-set.yaml index 151fd0d9..452cbaf2 100644 --- a/infra/worker/workers-set.yaml +++ b/infra/worker/workers-set.yaml @@ -39,6 +39,8 @@ spec: value: gs://ossf-malware-analysis-packages - name: OSSF_MALWARE_NOTIFICATION_TOPIC value: gcppubsub://projects/ossf-malware-analysis/topics/analysis-notify + - name: OSSF_MALWARE_USER_AGENT_EXTRA + value: "production" - name: OSSF_MALWARE_FEATURE_FLAGS value: "CodeExecution" securityContext: diff --git a/internal/useragent/useragent.go b/internal/useragent/useragent.go new file mode 100644 index 00000000..931e4b0f --- /dev/null +++ b/internal/useragent/useragent.go @@ -0,0 +1,40 @@ +package useragent + +import ( + "fmt" + "net/http" +) + +const defaultUserAgentFmt = "package-analysis (github.com/ossf/package-analysis%s)" + +type uaRoundTripper struct { + parent http.RoundTripper + userAgent string +} + +// RoundTrip implements the http.RoundTripper interface. 
+func (rt *uaRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set("User-Agent", rt.userAgent) + return rt.parent.RoundTrip(req) +} + +// RoundTripper wraps parent with a RoundTripper that adds a user-agent header +// with the contents of ua. +func RoundTripper(ua string, parent http.RoundTripper) http.RoundTripper { + return &uaRoundTripper{ + parent: parent, + userAgent: ua, + } +} + +// DefaultRoundTripper wraps parent with a RoundTripper that adds a default +// Package Analysis user-agent header. +// +// If supplied, extra information can be added to the user-agent, allowing the +// user-agent to be customized for production environments. +func DefaultRoundTripper(parent http.RoundTripper, extra string) http.RoundTripper { + if extra != "" { + extra = ", " + extra + } + return RoundTripper(fmt.Sprintf(defaultUserAgentFmt, extra), parent) +} diff --git a/internal/useragent/useragent_test.go b/internal/useragent/useragent_test.go new file mode 100644 index 00000000..7da30122 --- /dev/null +++ b/internal/useragent/useragent_test.go @@ -0,0 +1,78 @@ +package useragent_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/ossf/package-analysis/internal/useragent" +) + +func TestRoundTripper(t *testing.T) { + want := "test user agent string" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + got := r.Header.Get("user-agent") + if got != want { + t.Errorf("User Agent = %q, want %q", got, want) + } + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + c := http.Client{ + Transport: useragent.RoundTripper(want, http.DefaultTransport), + } + resp, err := c.Get(ts.URL) + if err != nil { + t.Fatalf("Get() = %v; want no error", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("Get() status = %v; want 200", resp.StatusCode) + } +} + +func TestDefaultRoundTripper(t *testing.T) { + want := "package-analysis (github.com/ossf/package-analysis, extra)" + ts := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + got := r.Header.Get("user-agent") + if got != want { + t.Errorf("User Agent = %q, want %q", got, want) + } + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + c := http.Client{ + Transport: useragent.DefaultRoundTripper(http.DefaultTransport, "extra"), + } + resp, err := c.Get(ts.URL) + if err != nil { + t.Fatalf("Get() = %v; want no error", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("Get() status = %v; want 200", resp.StatusCode) + } +} + +func TestDefaultRoundTripper_NoExtra(t *testing.T) { + want := "package-analysis (github.com/ossf/package-analysis)" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + got := r.Header.Get("user-agent") + if got != want { + t.Errorf("User Agent = %q, want %q", got, want) + } + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + c := http.Client{ + Transport: useragent.DefaultRoundTripper(http.DefaultTransport, ""), + } + resp, err := c.Get(ts.URL) + if err != nil { + t.Fatalf("Get() = %v; want no error", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("Get() status = %v; want 200", resp.StatusCode) + } +} diff --git a/sandboxes/staticanalysis/staticanalyze.go b/sandboxes/staticanalysis/staticanalyze.go index 354c8334..e96c1ac8 100644 --- a/sandboxes/staticanalysis/staticanalyze.go +++ b/sandboxes/staticanalysis/staticanalyze.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "log/slog" + "net/http" "os" "path/filepath" "time" @@ -15,6 +16,7 @@ import ( "github.com/ossf/package-analysis/internal/staticanalysis" "github.com/ossf/package-analysis/internal/staticanalysis/basicdata" "github.com/ossf/package-analysis/internal/staticanalysis/parsing" + "github.com/ossf/package-analysis/internal/useragent" "github.com/ossf/package-analysis/internal/utils" "github.com/ossf/package-analysis/internal/worker" "github.com/ossf/package-analysis/pkg/api/pkgecosystem" @@ -103,6 +105,9 @@ 
func run() (err error) { log.Initialize(os.Getenv("LOGGER_ENV")) + userAgentExtra := os.Getenv("OSSF_MALWARE_USER_AGENT_EXTRA") + http.DefaultTransport = useragent.DefaultRoundTripper(http.DefaultTransport, userAgentExtra) + flag.TextVar(&ecosystem, "ecosystem", pkgecosystem.None, fmt.Sprintf("package ecosystem. Can be %s (required)", pkgecosystem.SupportedEcosystemsStrings)) analyses.InitFlag() flag.Parse() @@ -144,7 +149,8 @@ func run() (err error) { slog.InfoContext(ctx, "Static analysis launched", "local_path", *localFile, "output_file", *output, - "analyses", analysisTasks) + "analyses", analysisTasks, + "user_agent_extra", userAgentExtra) workDirs, err := makeWorkDirs() if err != nil {