Skip to content

Commit

Permalink
add mysql indexstorage backend
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Callaway <bcallaway@google.com>
  • Loading branch information
bobcallaway authored and cpanato committed Nov 27, 2023
1 parent 0394bf7 commit 014cfb1
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 39 deletions.
8 changes: 7 additions & 1 deletion cmd/rekor-server/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Memory and file-based signers should only be used for testing.`)
rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")
_ = rootCmd.PersistentFlags().MarkDeprecated("enable_retrieve_api", "this flag is deprecated in favor of enabled_api_endpoints (searchIndex)")
rootCmd.PersistentFlags().String("search_index.storage_provider", "redis",
`Index Storage provider to use. Valid options are: [redis].`)
`Index Storage provider to use. Valid options are: [redis, mysql].`)
rootCmd.PersistentFlags().String("redis_server.address", "127.0.0.1", "Redis server address")
rootCmd.PersistentFlags().Uint16("redis_server.port", 6379, "Redis server port")
rootCmd.PersistentFlags().String("redis_server.password", "", "Redis server password")
Expand All @@ -125,6 +125,12 @@ Memory and file-based signers should only be used for testing.`)
rootCmd.PersistentFlags().Uint64("max_jar_metadata_size", 1048576, "maximum permitted size for jar META-INF/ files, in bytes; set to 0 for unlimited")
rootCmd.PersistentFlags().Uint64("max_apk_metadata_size", 1048576, "maximum permitted size for apk .SIGN and .PKGINFO files, in bytes; set to 0 for unlimited")

rootCmd.PersistentFlags().String("search_index.mysql.dsn", "", "DSN for index storage using MySQL")
rootCmd.PersistentFlags().Duration("search_index.mysql.conn_max_idletime", 0*time.Second, "maximum connection idle time")
rootCmd.PersistentFlags().Duration("search_index.mysql.conn_max_lifetime", 0*time.Second, "maximum connection lifetime")
rootCmd.PersistentFlags().Int("search_index.mysql.max_open_connections", 0, "maximum open connections")
rootCmd.PersistentFlags().Int("search_index.mysql.max_idle_connections", 0, "maximum idle connections")

if err := viper.BindPFlags(rootCmd.PersistentFlags()); err != nil {
log.Logger.Fatal(err)
}
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ services:
"--max_request_body_size=32792576",
"--rekor_server.new_entry_publisher=gcppubsub://projects/test-project/topics/new-entry",
"--rekor_server.publish_events_json=true",
"--search_index.storage_provider=mysql",
"--search_index.mysql.dsn=test:zaphod@tcp(mysql:3306)/test",
]
ports:
- "3000:3000"
- "2112:2112"
depends_on:
- gcp-pubsub-emulator
- mysql
gcp-pubsub-emulator:
image: gcp-pubsub-emulator
ports:
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ services:
"--enable_attestation_storage",
"--attestation_storage_bucket=file:///var/run/attestations",
"--enable_stable_checkpoint",
"--search_index.storage_provider=mysql",
"--search_index.mysql.dsn=test:zaphod@tcp(mysql:3306)/test",
# Uncomment this for production logging
# "--log_type=prod",
]
Expand Down
2 changes: 1 addition & 1 deletion e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ set -e
testdir=$(dirname "$0")

docker_compose="docker compose -f docker-compose.yml -f docker-compose.test.yml"
if ! ${docker_compose} version 2&>1 >/dev/null; then
if ! ${docker_compose} version >/dev/null 2>&1; then
docker_compose="docker-compose -f docker-compose.yml -f docker-compose.test.yml"
fi

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ require (
cloud.google.com/go/profiler v0.4.0
cloud.google.com/go/pubsub v1.33.0
github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230618160516-e936619f9f18
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/cyberphone/json-canonicalization v0.0.0-20220623050100-57a0ce2678a7
github.com/go-redis/redismock/v9 v9.2.0
github.com/go-sql-driver/mysql v1.7.1
github.com/golang/mock v1.6.0
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-retryablehttp v0.7.5
github.com/jmoiron/sqlx v1.3.5
github.com/redis/go-redis/v9 v9.3.0
github.com/sassoftware/relic/v7 v7.6.1
github.com/sigstore/protobuf-specs v0.2.1
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 h1:WpB/QDNLpMw
github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0=
github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
Expand Down Expand Up @@ -244,6 +246,7 @@ github.com/go-openapi/validate v0.22.2 h1:Lda8nadL/5kIvS5mdXCAIuZ7IVXvKFIppLnw+E
github.com/go-openapi/validate v0.22.2/go.mod h1:kVxh31KbfsxU8ZyoHaDbLBWU5CnMdqBUEtadQ2G4d5M=
github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw=
github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
Expand Down Expand Up @@ -400,6 +403,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmhodges/clock v0.0.0-20160418191101-880ee4c33548 h1:dYTbLf4m0a5u0KLmPfB6mgxbcV7588bOCx79hxa5Sr4=
github.com/jmhodges/clock v0.0.0-20160418191101-880ee4c33548/go.mod h1:hGT6jSUVzF6no3QaDSMLGLEHtHSBSefs+MgcDWnmhmo=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
Expand All @@ -423,6 +428,7 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/letsencrypt/boulder v0.0.0-20221109233200-85aa52084eaf h1:ndns1qx/5dL43g16EQkPV/i8+b3l5bYQwLeoSBe7tS8=
github.com/letsencrypt/boulder v0.0.0-20221109233200-85aa52084eaf/go.mod h1:aGkAgvWY/IUcVFfuly53REpfv5edu25oij+qHRFaraA=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
Expand All @@ -438,6 +444,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
Expand Down
6 changes: 6 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,10 @@ func StopAPI() {
if api.newEntryPublisher != nil {
api.newEntryPublisher.Close()
}

if indexStorageClient != nil {
if err := indexStorageClient.Shutdown(); err != nil {
log.Logger.Errorf("shutting down indexStorageClient: %v", err)
}
}
}
6 changes: 2 additions & 4 deletions pkg/api/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,8 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
log.ContextLogger(ctx).Errorf("getting entry index keys: %v", err)
return
}
for _, key := range keys {
if err := addToIndex(context.Background(), key, entryID); err != nil {
log.ContextLogger(ctx).Errorf("adding keys to index: %v", err)
}
if err := addToIndex(context.Background(), keys, entryID); err != nil {
log.ContextLogger(ctx).Errorf("adding keys to index: %v", err)
}
}()
}
Expand Down
47 changes: 35 additions & 12 deletions pkg/api/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder {
}
var result = NewCollection(queryOperator)

var lookupKeys []string

if params.Query.Hash != "" {
// This must be a valid hash
sha := util.PrefixSHA(params.Query.Hash)
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(sha))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
sha := strings.ToLower(util.PrefixSHA(params.Query.Hash))
if queryOperator == "or" {
lookupKeys = append(lookupKeys, sha)
} else {
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, []string{sha})
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
}
result.Add(resultUUIDs)
}
result.Add(resultUUIDs)
}
if params.Query.PublicKey != nil {
af, err := pki.NewArtifactFactory(pki.Format(swag.StringValue(params.Query.PublicKey.Format)))
Expand All @@ -72,14 +78,31 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder {
}

keyHash := sha256.Sum256(canonicalKey)
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(hex.EncodeToString(keyHash[:])))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
keyHashStr := strings.ToLower(hex.EncodeToString(keyHash[:]))
if queryOperator == "or" {
lookupKeys = append(lookupKeys, keyHashStr)
} else {
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, []string{keyHashStr})
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
}
result.Add(resultUUIDs)
}
result.Add(resultUUIDs)
}
if params.Query.Email != "" {
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(params.Query.Email.String()))
emailStr := strings.ToLower(params.Query.Email.String())
if queryOperator == "or" {
lookupKeys = append(lookupKeys, emailStr)
} else {
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, []string{emailStr})
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
}
result.Add(resultUUIDs)
}
}
if len(lookupKeys) > 0 {
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, lookupKeys)
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult)
}
Expand All @@ -99,8 +122,8 @@ func SearchIndexNotImplementedHandler(_ index.SearchIndexParams) middleware.Resp

}

func addToIndex(ctx context.Context, key, value string) error {
err := indexStorageClient.WriteIndex(ctx, key, value)
func addToIndex(ctx context.Context, keys []string, value string) error {
err := indexStorageClient.WriteIndex(ctx, keys, value)
if err != nil {
return fmt.Errorf("redis client: %w", err)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/indexstorage/indexstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@ import (
"context"
"fmt"

"github.com/sigstore/rekor/pkg/indexstorage/mysql"
"github.com/sigstore/rekor/pkg/indexstorage/redis"
"github.com/spf13/viper"
)

type IndexStorage interface {
LookupIndices(context.Context, string) ([]string, error) // Returns indices for specified key
WriteIndex(context.Context, string, string) error // Writes index for specified key
LookupIndices(context.Context, []string) ([]string, error) // Returns indices for specified keys
WriteIndex(context.Context, []string, string) error // Writes index for specified keys
Shutdown() error // Method to run on shutdown
}

// NewIndexStorage instantiates a new IndexStorage provider based on the requested type
func NewIndexStorage(providerType string) (IndexStorage, error) {
switch providerType {
case redis.ProviderType:
return redis.NewProvider(viper.GetString("redis_server.address"), viper.GetString("redis_server.port"), viper.GetString("redis_server.password"))
case mysql.ProviderType:
return mysql.NewProvider(viper.GetString("search_index.mysql.dsn"))
default:
return nil, fmt.Errorf("invalid index storage provider type: %v", providerType)
}
Expand Down
134 changes: 134 additions & 0 deletions pkg/indexstorage/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"errors"
"fmt"

"github.com/jmoiron/sqlx"
"github.com/sigstore/rekor/pkg/log"
"github.com/spf13/viper"

// this imports the mysql driver for go
_ "github.com/go-sql-driver/mysql"
)

const (
ProviderType = "mysql"

lookupStmt = "SELECT EntryUUIDs FROM EntryIndex WHERE EntryKey IN (?)"
writeStmt = "INSERT IGNORE INTO EntryIndex (EntryKey, EntryUUIDs) VALUES (:key, :uuid)"
createTableStmt = `CREATE TABLE IF NOT EXISTS EntryIndex (
PK BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
EntryKey varchar(512) NOT NULL,
EntryUUIDs char(80) NOT NULL,
PRIMARY KEY(PK),
UNIQUE(EntryKey, EntryUUIDs)
)`
)

// IndexStorageProvider implements indexstorage.IndexStorage
type IndexStorageProvider struct {
db *sqlx.DB
}

func NewProvider(dsn string) (*IndexStorageProvider, error) {
var err error
provider := &IndexStorageProvider{}
provider.db, err = sqlx.Open(ProviderType, dsn)
if err != nil {
return nil, err
}
if err = provider.db.Ping(); err != nil {
return nil, err
}

provider.db.SetConnMaxIdleTime(viper.GetDuration("search_index.mysql.conn_max_idletime"))
provider.db.SetConnMaxLifetime(viper.GetDuration("search_index.mysql.conn_max_lifetime"))
provider.db.SetMaxOpenConns(viper.GetInt("search_index.mysql.max_open_connections"))
provider.db.SetMaxIdleConns(viper.GetInt("search_index.mysql.max_idle_connections"))

if _, err := provider.db.Exec(createTableStmt); err != nil {
return nil, fmt.Errorf("create table if not exists failed: %w", err)
}

return provider, nil
}

// LookupIndices looks up and returns all indices for the specified keys. The key value(s) will be canonicalized
// by converting all characters into a lowercase value before looking up in Redis
func (isp *IndexStorageProvider) LookupIndices(ctx context.Context, keys []string) ([]string, error) {
if isp.db == nil {
return []string{}, errors.New("sql client has not been initialized")
}

query, args, err := sqlx.In(lookupStmt, keys)
if err != nil {
return []string{}, fmt.Errorf("error preparing statement: %w", err)
}
rows, err := isp.db.QueryContext(ctx, isp.db.Rebind(query), args...)
if err != nil {
return []string{}, fmt.Errorf("error looking up indices from mysql: %w", err)
}
defer rows.Close()

var entryUUIDs []string
for rows.Next() {
var result string
if err := rows.Scan(&result); err != nil {
return []string{}, fmt.Errorf("error parsing results from mysql: %w", err)
}
entryUUIDs = append(entryUUIDs, result)
}

if err := rows.Err(); err != nil {
return []string{}, fmt.Errorf("error processing results from mysql: %w", err)
}
return entryUUIDs, nil
}

// WriteIndex adds the index for the specified key. The key value will be canonicalized
// by converting all characters into a lowercase value before appending the index in Redis
func (isp *IndexStorageProvider) WriteIndex(ctx context.Context, keys []string, index string) error {
if isp.db == nil {
return errors.New("sql client has not been initialized")
}

var valueMaps []map[string]interface{}
for _, key := range keys {
valueMaps = append(valueMaps, map[string]interface{}{"key": key, "uuid": index})
}
result, err := isp.db.NamedExecContext(ctx, writeStmt, valueMaps)
if err != nil {
return fmt.Errorf("mysql write error: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("mysql error getting rowsAffected: %w", err)
}
log.ContextLogger(ctx).Debugf("WriteIndex affected %d rows", rowsAffected)
return nil
}

// Shutdown cleans up any client resources that may be held by the provider
func (isp *IndexStorageProvider) Shutdown() error {
if isp.db == nil {
return nil
}

return isp.db.Close()
}
Loading

0 comments on commit 014cfb1

Please sign in to comment.