Skip to content

Commit

Permalink
feat: Support publishing new log entries to Pub/Sub topics (#1580)
Browse files Browse the repository at this point in the history
Adds initial support publishing new log entries to Pub/Sub topics. Interested
parties can subscribe to the topic in order to receive notifications when new
entries are added.

Signed-off-by: James Alseth <james@jalseth.me>
  • Loading branch information
jalseth committed Aug 28, 2023
1 parent 45bbaf0 commit a49cd04
Show file tree
Hide file tree
Showing 22 changed files with 1,099 additions and 33 deletions.
3 changes: 3 additions & 0 deletions Dockerfile.pubsub-emulator
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# gcloud sdk for pubsub emulator with netcat added for the startup health check
FROM google/cloud-sdk:443.0.0@sha256:a59335d227a98b41ecdf6ff3d69923b8aef0703694e58992ecb75c7147140d37
RUN apt-get install -y netcat
4 changes: 4 additions & 0 deletions cmd/rekor-server/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func init() {
Memory and file-based signers should only be used for testing.`)
rootCmd.PersistentFlags().String("rekor_server.signer-passwd", "", "Password to decrypt signer private key")

rootCmd.PersistentFlags().String("rekor_server.new_entry_publisher", "", "URL for pub/sub queue to send messages to when new entries are added to the log. Ignored if not set. Supported providers: [gcppubsub]")
rootCmd.PersistentFlags().Bool("rekor_server.publish_events_protobuf", false, "Whether to publish events in Protobuf wire format. Applies to all enabled event types.")
rootCmd.PersistentFlags().Bool("rekor_server.publish_events_json", false, "Whether to publish events in CloudEvents JSON format. Applies to all enabled event types.")

rootCmd.PersistentFlags().Uint16("port", 3000, "Port to bind to")

rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")
Expand Down
3 changes: 0 additions & 3 deletions cmd/rekor-server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ var serveCmd = &cobra.Command{
Short: "start http server with configured api",
Long: `Starts a http server and serves the configured api`,
Run: func(cmd *cobra.Command, args []string) {

// Setup the logger to dev/prod
log.ConfigureLogger(viper.GetString("log_type"))

Expand All @@ -83,7 +82,6 @@ var serveCmd = &cobra.Command{
log.Logger.Error(err)
}
}()

//TODO: make this a config option for server to load via viper field
//TODO: add command line option to print versions supported in binary

Expand All @@ -101,7 +99,6 @@ var serveCmd = &cobra.Command{
hashedrekord.KIND: {hashedrekord_v001.APIVERSION},
dsse.KIND: {dsse_v001.APIVERSION},
}

for k, v := range pluggableTypeMap {
log.Logger.Infof("Loading support for pluggable type '%v'", k)
log.Logger.Infof("Loading version '%v' for pluggable type '%v'", v, k)
Expand Down
25 changes: 24 additions & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ services:
context: .
target: "test"
environment:
- TMPDIR=/var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294
TMPDIR: /var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294
PUBSUB_EMULATOR_HOST: gcp-pubsub-emulator:8085
command: [
"rekor-server",
"-test.coverprofile=rekor-server.cov",
Expand All @@ -34,7 +35,29 @@ services:
"--enable_attestation_storage",
"--attestation_storage_bucket=file:///var/run/attestations",
"--max_request_body_size=32792576",
"--rekor_server.new_entry_publisher=gcppubsub://projects/test-project/topics/new-entry",
"--rekor_server.publish_events_json=true",
]
ports:
- "3000:3000"
- "2112:2112"
depends_on:
- gcp-pubsub-emulator
gcp-pubsub-emulator:
image: gcp-pubsub-emulator
ports:
- "8085:8085"
command:
- gcloud
- beta
- emulators
- pubsub
- start
- --host-port=0.0.0.0:8085
- --project=test-project
healthcheck:
test: ["CMD", "nc", "-zv", "localhost", "8085"]
interval: 10s
timeout: 3s
retries: 3
start_period: 10s
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,3 @@ services:
timeout: 3s
retries: 3
start_period: 5s

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
)

require (
cloud.google.com/go/pubsub v1.33.0
github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230618160516-e936619f9f18
github.com/cyberphone/json-canonicalization v0.0.0-20220623050100-57a0ce2678a7
github.com/go-redis/redismock/v9 v9.0.3
Expand Down Expand Up @@ -183,7 +184,7 @@ require (
golang.org/x/term v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.135.0 // indirect
google.golang.org/api v0.135.0
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g=
cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
Expand Down
24 changes: 24 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/signer"
"github.com/sigstore/rekor/pkg/storage"
Expand All @@ -39,6 +40,8 @@ import (
"github.com/sigstore/sigstore/pkg/cryptoutils"
"github.com/sigstore/sigstore/pkg/signature"
"github.com/sigstore/sigstore/pkg/signature/options"

_ "github.com/sigstore/rekor/pkg/pubsub/gcp" // Load GCP pubsub implementation
)

func dial(ctx context.Context, rpcServer string) (*grpc.ClientConn, error) {
Expand All @@ -63,6 +66,9 @@ type API struct {
signer signature.Signer
// stops checkpoint publishing
checkpointPublishCancel context.CancelFunc
// Publishes notifications when new entries are added to the log. May be
// nil if no publisher is configured.
newEntryPublisher pubsub.Publisher
}

func NewAPI(treeID uint) (*API, error) {
Expand Down Expand Up @@ -112,6 +118,18 @@ func NewAPI(treeID uint) (*API, error) {

pubkey := cryptoutils.PEMEncode(cryptoutils.PublicKeyPEMType, b)

var newEntryPublisher pubsub.Publisher
if p := viper.GetString("rekor_server.new_entry_publisher"); p != "" {
if !viper.GetBool("rekor_server.publish_events_protobuf") && !viper.GetBool("rekor_server.publish_events_json") {
return nil, fmt.Errorf("%q is configured but neither %q or %q are enabled", "new_entry_publisher", "publish_events_protobuf", "publish_events_json")
}
newEntryPublisher, err = pubsub.Get(ctx, p)
if err != nil {
return nil, fmt.Errorf("init event publisher: %w", err)
}
log.ContextLogger(ctx).Infof("Initialized new entry event publisher: %s", p)
}

return &API{
// Transparency Log Stuff
logClient: logClient,
Expand All @@ -121,6 +139,8 @@ func NewAPI(treeID uint) (*API, error) {
pubkey: string(pubkey),
pubkeyHash: hex.EncodeToString(pubkeyHashBytes[:]),
signer: rekorSigner,
// Utility functionality not required for operation of the core service
newEntryPublisher: newEntryPublisher,
}, nil
}

Expand Down Expand Up @@ -166,4 +186,8 @@ func ConfigureAPI(treeID uint) {

func StopAPI() {
api.checkpointPublishCancel()

if api.newEntryPublisher != nil {
api.newEntryPublisher.Close()
}
}
63 changes: 62 additions & 1 deletion pkg/api/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ import (
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"

"github.com/sigstore/rekor/pkg/events"
"github.com/sigstore/rekor/pkg/events/newentry"
"github.com/sigstore/rekor/pkg/generated/models"
"github.com/sigstore/rekor/pkg/generated/restapi/operations/entries"
"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
"github.com/sigstore/rekor/pkg/tle"
"github.com/sigstore/rekor/pkg/trillianclient"
"github.com/sigstore/rekor/pkg/types"
"github.com/sigstore/rekor/pkg/util"
Expand Down Expand Up @@ -290,7 +294,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
RootHash: swag.String(hex.EncodeToString(root.RootHash)),
LogIndex: swag.Int64(queuedLeaf.LeafIndex),
Hashes: hashes,
Checkpoint: stringPointer(string(scBytes)),
Checkpoint: swag.String(string(scBytes)),
}

logEntryAnon.Verification = &models.LogEntryAnonVerification{
Expand All @@ -301,9 +305,66 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
logEntry := models.LogEntry{
entryID: logEntryAnon,
}

if api.newEntryPublisher != nil {
// Publishing notifications should not block the API response.
go func() {
verifiers, err := entry.Verifiers()
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Errorf("Could not get verifiers for log entry %s: %v", entryID, err)
return
}
var subjects []string
for _, v := range verifiers {
subjects = append(subjects, v.Subjects()...)
}

pbEntry, err := tle.GenerateTransparencyLogEntry(logEntryAnon)
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Error(err)
return
}
event, err := newentry.New(entryID, pbEntry, subjects)
if err != nil {
incPublishEvent(newentry.Name, "", false)
log.ContextLogger(ctx).Error(err)
return
}
if viper.GetBool("rekor_server.publish_events_protobuf") {
go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeProtobuf)
}
if viper.GetBool("rekor_server.publish_events_json") {
go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeJSON)
}
}()
}

return logEntry, nil
}

func publishEvent(ctx context.Context, publisher pubsub.Publisher, event *events.Event, contentType events.EventContentType) {
err := publisher.Publish(context.WithoutCancel(ctx), event, contentType)
incPublishEvent(event.Type().Name(), contentType, err == nil)
if err != nil {
log.ContextLogger(ctx).Error(err)
}
}

func incPublishEvent(event string, contentType events.EventContentType, ok bool) {
status := "SUCCESS"
if !ok {
status = "ERROR"
}
labels := map[string]string{
"event": event,
"status": status,
"content_type": string(contentType),
}
metricPublishEvents.With(labels).Inc()
}

// CreateLogEntryHandler creates new entry into log
func CreateLogEntryHandler(params entries.CreateLogEntryParams) middleware.Responder {
httpReq := params.HTTPRequest
Expand Down
5 changes: 5 additions & 0 deletions pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ var (
Help: "The total number of new log entries",
})

metricPublishEvents = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "rekor_publish_events",
Help: "The status of publishing events to Pub/Sub",
}, []string{"event", "content_type", "status"})

MetricLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rekor_api_latency",
Help: "Api Latency on calls",
Expand Down
19 changes: 19 additions & 0 deletions pkg/events/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package events provides methods for working with CloudEvents.
package events

// The version of the CloudEvents specification the package adheres to.
const CloudEventsSpecVersion = "1.0"
Loading

0 comments on commit a49cd04

Please sign in to comment.