Skip to content

Commit

Permalink
✨ Upload cron results to a table with new format (#830)
Browse files Browse the repository at this point in the history
* add json2 function

* asJSON2

* url2

* draft

* root

* tables and bucket

* fix

* comments

* new transfer instances

* comments

* rename files

* update k8 names

* typo

* fixes

* linter
  • Loading branch information
laurentsimon committed Aug 16, 2021
1 parent b3a3f7e commit 23764f0
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 9 deletions.
1 change: 0 additions & 1 deletion checker/check_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (r *Runner) Run(ctx context.Context, f CheckFn) CheckResult {
for _, d := range l.messages2 {
res.Details = append(res.Details, d.Msg)
}

if err := logStats(ctx, startTime, &res); err != nil {
panic(err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ or ./scorecard --{npm,pypi,rubgems}=<package_name> [--checks=check1,...] [--show
case formatCSV:
err = repoResult.AsCSV(showDetails, *logLevel, os.Stdout)
case formatJSON:
err = repoResult.AsJSON(showDetails, *logLevel, os.Stdout)
// UPGRADEv2: rename.
err = repoResult.AsJSON2(showDetails, *logLevel, os.Stdout)
default:
err = sce.Create(sce.ErrScorecardInternal,
fmt.Sprintf("invalid format flag: %v. Expected [default, csv, json]", format))
Expand Down Expand Up @@ -317,7 +318,7 @@ func init() {
"rubygems package to check, given that the rubygems package has a GitHub repository")
rootCmd.Flags().StringVar(&format, "format", formatDefault, "output format. allowed values are [default, csv, json]")
rootCmd.Flags().StringSliceVar(
&metaData, "metadata", []string{}, "metadata for the project.It can be multiple separated by commas")
&metaData, "metadata", []string{}, "metadata for the project. It can be multiple separated by commas")
rootCmd.Flags().BoolVar(&showDetails, "show-details", false, "show extra details about each check")
checkNames := []string{}
for checkName := range checks.AllChecks {
Expand Down
19 changes: 19 additions & 0 deletions cron/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const (
shardSize string = "SCORECARD_SHARD_SIZE"
webhookURL string = "SCORECARD_WEBHOOK_URL"
metricExporter string = "SCORECARD_METRIC_EXPORTER"

bigqueryTableV2 string = "SCORECARD_BIGQUERY_TABLEV2"
resultDataBucketURLV2 string = "SCORECARD_DATA_BUCKET_URLV2"
)

var (
Expand All @@ -55,6 +58,7 @@ var (
configYAML []byte
)

//nolint
type config struct {
ProjectID string `yaml:"project-id"`
ResultDataBucketURL string `yaml:"result-data-bucket-url"`
Expand All @@ -65,6 +69,9 @@ type config struct {
WebhookURL string `yaml:"webhook-url"`
MetricExporter string `yaml:"metric-exporter"`
ShardSize int `yaml:"shard-size"`
// UPGRADEv2: to remove.
ResultDataBucketURLV2 string `yaml:"result-data-bucket-url-v2"`
BigQueryTableV2 string `yaml:"bigquery-table-v2"`
}

func getParsedConfigFromFile(byteValue []byte) (config, error) {
Expand Down Expand Up @@ -147,6 +154,18 @@ func GetBigQueryTable() (string, error) {
return getStringConfigValue(bigqueryTable, configYAML, "BigQueryTable", "bigquery-table")
}

// GetBigQueryTableV2 returns the table name to transfer cron job results.
// It delegates to getStringConfigValue with the SCORECARD_BIGQUERY_TABLEV2
// environment variable and the "bigquery-table-v2" key of the embedded
// config YAML (env-vs-file precedence is handled by the helper).
// UPGRADEv2: to remove once the v2 result format fully replaces v1.
func GetBigQueryTableV2() (string, error) {
	return getStringConfigValue(bigqueryTableV2, configYAML, "BigQueryTableV2", "bigquery-table-v2")
}

// GetResultDataBucketURLV2 returns the bucketURL for storing cron job results.
// It delegates to getStringConfigValue with the SCORECARD_DATA_BUCKET_URLV2
// environment variable and the "result-data-bucket-url-v2" key of the
// embedded config YAML (env-vs-file precedence is handled by the helper).
// UPGRADEv2: to remove once the v2 result format fully replaces v1.
func GetResultDataBucketURLV2() (string, error) {
	return getStringConfigValue(resultDataBucketURLV2, configYAML, "ResultDataBucketURLV2", "result-data-bucket-url-v2")
}

// GetShardSize returns the shard_size for the cron job.
func GetShardSize() (int, error) {
return getIntConfigValue(shardSize, configYAML, "ShardSize", "shard-size")
Expand Down
3 changes: 3 additions & 0 deletions cron/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ bigquery-table: scorecard
shard-size: 10
webhook-url:
metric-exporter: stackdriver
# UPGRADEv2: to remove.
result-data-bucket-url-v2: gs://ossf-scorecard-data2
bigquery-table-v2: scorecard2
6 changes: 6 additions & 0 deletions cron/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (
prodWebhookURL = ""
prodShardSize int = 10
prodMetricExporter string = "stackdriver"
// UPGRADEv2: to remove.
prodBucketV2 = "gs://ossf-scorecard-data2"
prodBigQueryTableV2 = "scorecard2"
)

func getByteValueFromFile(filename string) ([]byte, error) {
Expand Down Expand Up @@ -62,6 +65,9 @@ func TestYAMLParsing(t *testing.T) {
WebhookURL: prodWebhookURL,
ShardSize: prodShardSize,
MetricExporter: prodMetricExporter,
// UPGRADEv2: to remove.
ResultDataBucketURLV2: prodBucketV2,
BigQueryTableV2: prodBigQueryTableV2,
},
},

Expand Down
16 changes: 16 additions & 0 deletions cron/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func main() {
panic(err)
}

bucket2, err := config.GetResultDataBucketURLV2()
if err != nil {
panic(err)
}

shardNum, err := publishToRepoRequestTopic(ctx, reader, t)
if err != nil {
panic(err)
Expand All @@ -127,4 +132,15 @@ func main() {
if err != nil {
panic(fmt.Errorf("error writing to BlobStore: %w", err))
}

// UPGRADEv2: to remove.
*metadata.ShardLoc = bucket2 + "/" + data.GetBlobFilename("", t)
metadataJSON, err = protojson.Marshal(&metadata)
if err != nil {
panic(fmt.Errorf("error during protojson.Marshal2: %w", err))
}
err = data.WriteToBlobStore(ctx, bucket2, data.GetShardMetadataFilename(t), metadataJSON)
if err != nil {
panic(fmt.Errorf("error writing to BlobStore2: %w", err))
}
}
3 changes: 3 additions & 0 deletions cron/k8s/controller.release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ spec:
value: "gcppubsub://projects/openssf/topics/scorecard-batch-requests-releasetest"
- name: SCORECARD_DATA_BUCKET_URL
value: "gs://ossf-scorecard-data-releasetest"
# UPGRADEv2: to remove.
- name: SCORECARD_DATA_BUCKET_URLV2
value: "gs://ossf-scorecard-data-releasetest2"
- name: SCORECARD_SHARD_SIZE
value: "5"
resources:
Expand Down
41 changes: 41 additions & 0 deletions cron/k8s/transfer-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2021 Security Scorecard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  # NOTE(review): this name appears to duplicate the existing v1 transfer
  # CronJob's likely name — if both are deployed to the same namespace the
  # second apply will overwrite the first. Confirm against transfer.yaml.
  name: scorecard-bq-transfer
spec:
  # At 02:00UTC on Monday and Thursday.
  schedule: "0 2 * * 1,4"
  # Never start a new run while a previous one is still executing.
  concurrencyPolicy: "Forbid"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: bq-transfer-v2
            image: gcr.io/openssf/scorecard-bq-transfer:latest
            imagePullPolicy: Always
            resources:
              limits:
                memory: 1Gi
              requests:
                memory: 1Gi
            env:
            # NOTE(review): this v2 transfer instance reuses the v1 env-var
            # names (SCORECARD_BIGQUERY_TABLE / SCORECARD_DATA_BUCKET_URL)
            # but points them at the v2 table and bucket — presumably the
            # transfer binary only reads the v1 keys. Confirm this is
            # intentional vs. using the *_V2 variables added in config.go.
            - name: SCORECARD_BIGQUERY_TABLE
              value: "scorecard2"
            - name: SCORECARD_DATA_BUCKET_URL
              value: "gs://ossf-scorecard-data2"
          restartPolicy: OnFailure
41 changes: 41 additions & 0 deletions cron/k8s/transfer.release-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2021 Security Scorecard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: scorecard-bq-transfer-releasetest
spec:
  # 01:00 and 16:00 UTC — i.e. 6pm and 9am Pacific Daylight Time (the
  # original "PST" label is off by an hour outside daylight-saving time).
  schedule: "0 1,16 * * *"
  # Never start a new run while a previous one is still executing.
  concurrencyPolicy: "Forbid"
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
          - name: bq-transfer-releasetest-v2
            image: gcr.io/openssf/scorecard-bq-transfer:latest
            imagePullPolicy: Always
            env:
            # NOTE(review): as in transfer-v2.yaml, the v1 env-var names are
            # pointed at the releasetest v2 bucket/table — confirm the
            # transfer binary reads these rather than the *_V2 variables.
            - name: SCORECARD_DATA_BUCKET_URL
              value: "gs://ossf-scorecard-data-releasetest2"
            - name: SCORECARD_BIGQUERY_TABLE
              value: "scorecard_releasetest2"
            resources:
              limits:
                memory: 1Gi
              requests:
                memory: 1Gi
3 changes: 3 additions & 0 deletions cron/k8s/worker.release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ spec:
env:
- name: SCORECARD_DATA_BUCKET_URL
value: "gs://ossf-scorecard-data-releasetest"
# UPGRADEv2: to remove.
- name: SCORECARD_DATA_BUCKET_URLV2
value: "gs://ossf-scorecard-data-releasetest2"
- name: SCORECARD_REQUEST_SUBSCRIPTION_URL
value: "gcppubsub://projects/openssf/subscriptions/scorecard-batch-worker-releasetest"
- name: SCORECARD_METRIC_EXPORTER
Expand Down
29 changes: 25 additions & 4 deletions cron/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,24 @@ var (
)

func processRequest(ctx context.Context,
batchRequest *data.ScorecardBatchRequest, checksToRun checker.CheckNameToFnMap, bucketURL string,
batchRequest *data.ScorecardBatchRequest, checksToRun checker.CheckNameToFnMap,
bucketURL, bucketURL2 string,
repoClient clients.RepoClient,
httpClient *http.Client, githubClient *github.Client, graphClient *githubv4.Client) error {
filename := data.GetBlobFilename(
fmt.Sprintf("shard-%05d", batchRequest.GetShardNum()),
batchRequest.GetJobTime().AsTime())
// Sanity check - make sure we are not re-processing an already processed request.
exists, err := data.BlobExists(ctx, bucketURL, filename)
exists1, err := data.BlobExists(ctx, bucketURL, filename)
if err != nil {
return fmt.Errorf("error during BlobExists: %w", err)
}
if exists {

exists2, err := data.BlobExists(ctx, bucketURL2, filename)
if err != nil {
return fmt.Errorf("error during BlobExists: %w", err)
}
if exists1 && exists2 {
log.Printf("Already processed shard %s. Nothing to do.", filename)
// We have already processed this request, nothing to do.
return nil
Expand All @@ -84,6 +90,7 @@ func processRequest(ctx context.Context,
}

var buffer bytes.Buffer
var buffer2 bytes.Buffer
// TODO: run Scorecard for each repo in a separate thread.
for _, repoURL := range repoURLs {
log.Printf("Running Scorecard for repo: %s", repoURL.URL())
Expand Down Expand Up @@ -111,10 +118,19 @@ func processRequest(ctx context.Context,
if err := result.AsJSON(true /*showDetails*/, zapcore.InfoLevel, &buffer); err != nil {
return fmt.Errorf("error during result.AsJSON: %w", err)
}

if err := result.AsJSON2(true /*showDetails*/, zapcore.InfoLevel, &buffer2); err != nil {
return fmt.Errorf("error during result.AsJSON2: %w", err)
}
}
if err := data.WriteToBlobStore(ctx, bucketURL, filename, buffer.Bytes()); err != nil {
return fmt.Errorf("error during WriteToBlobStore: %w", err)
}

if err := data.WriteToBlobStore(ctx, bucketURL2, filename, buffer2.Bytes()); err != nil {
return fmt.Errorf("error during WriteToBlobStore2: %w", err)
}

log.Printf("Write to shard file successful: %s", filename)

return nil
Expand Down Expand Up @@ -181,6 +197,11 @@ func main() {
panic(err)
}

bucketURL2, err := config.GetResultDataBucketURLV2()
if err != nil {
panic(err)
}

repoClient, httpClient, githubClient, graphClient, logger := createNetClients(ctx)
defer repoClient.Close()

Expand Down Expand Up @@ -212,7 +233,7 @@ func main() {
log.Print("subscription returned nil message during Receive, exiting")
break
}
if err := processRequest(ctx, req, checksToRun, bucketURL,
if err := processRequest(ctx, req, checksToRun, bucketURL, bucketURL2,
repoClient, httpClient, githubClient, graphClient); err != nil {
log.Printf("error processing request: %v", err)
// Nack the message so that another worker can retry.
Expand Down
51 changes: 49 additions & 2 deletions pkg/scorecard_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,24 @@ type ScorecardResult struct {
Metadata []string
}

// jsonCheckResultV2 is the per-check element of the v2 JSON output
// produced by AsJSON2: the check's name, numeric score, summary reason,
// and (when details are requested) the detail messages.
//nolint
type jsonCheckResultV2 struct {
	Details []string
	Score   int
	Reason  string
	Name    string
}

// jsonScorecardResultV2 is the top-level v2 JSON document written by
// AsJSON2: the repository identifier, run date, per-check results, and
// any user-supplied metadata.
type jsonScorecardResultV2 struct {
	Repo     string
	Date     string
	Checks   []jsonCheckResultV2
	Metadata []string
}

// AsJSON outputs the result in JSON format with a newline at the end.
// If called on []ScorecardResult will create NDJson formatted output.
// UPGRADEv2: will be removed.
func (r *ScorecardResult) AsJSON(showDetails bool, logLevel zapcore.Level, writer io.Writer) error {
encoder := json.NewEncoder(writer)
if showDetails {
Expand All @@ -54,7 +70,7 @@ func (r *ScorecardResult) AsJSON(showDetails bool, logLevel zapcore.Level, write
Date: r.Date,
Metadata: r.Metadata,
}
// UPGRADEv2: remove nolint after uggrade.
// UPGRADEv2: remove nolint after upgrade.
//nolint
for _, checkResult := range r.Checks {
tmpResult := checker.CheckResult{
Expand All @@ -71,12 +87,43 @@ func (r *ScorecardResult) AsJSON(showDetails bool, logLevel zapcore.Level, write
return nil
}

// AsJSON2 exports the result as JSON in the new (v2) detail format,
// writing one encoded document (with a trailing newline, per
// json.Encoder) to writer.
//
// showDetails controls whether per-check detail messages (Details2) are
// included. logLevel is currently unused; it is kept for signature
// parity with AsJSON. Returns an ErrScorecardInternal-wrapped error if
// encoding fails.
func (r *ScorecardResult) AsJSON2(showDetails bool, logLevel zapcore.Level, writer io.Writer) error {
	encoder := json.NewEncoder(writer)

	out := jsonScorecardResultV2{
		Repo:     r.Repo,
		Date:     r.Date,
		Metadata: r.Metadata,
	}

	//nolint
	for _, checkResult := range r.Checks {
		tmpResult := jsonCheckResultV2{
			Name:   checkResult.Name,
			Reason: checkResult.Reason,
			Score:  checkResult.Score,
		}
		if showDetails {
			// Flatten the structured v2 details down to their messages.
			for _, d := range checkResult.Details2 {
				tmpResult.Details = append(tmpResult.Details, d.Msg)
			}
		}
		out.Checks = append(out.Checks, tmpResult)
	}
	if err := encoder.Encode(out); err != nil {
		//nolint:wrapcheck
		return sce.Create(sce.ErrScorecardInternal, fmt.Sprintf("encoder.Encode: %v", err))
	}
	return nil
}

// AsCSV outputs ScorecardResult in CSV format.
func (r *ScorecardResult) AsCSV(showDetails bool, logLevel zapcore.Level, writer io.Writer) error {
w := csv.NewWriter(writer)
record := []string{r.Repo}
columns := []string{"Repository"}
// UPGRADEv2: remove nolint after uggrade.
// UPGRADEv2: remove nolint after upgrade.
//nolint
for _, checkResult := range r.Checks {
columns = append(columns, checkResult.Name+"_Pass", checkResult.Name+"_Confidence")
Expand Down

0 comments on commit 23764f0

Please sign in to comment.