Skip to content

Commit

Permalink
remove goroutine usage from SearchLogQuery (#1407)
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Callaway <bcallaway@google.com>
  • Loading branch information
bobcallaway committed Mar 28, 2023
1 parent abf7cee commit 01a2321
Showing 1 changed file with 35 additions and 76 deletions.
111 changes: 35 additions & 76 deletions pkg/api/entries.go
Expand Up @@ -33,7 +33,6 @@ import (
ttypes "github.com/google/trillian/types"
"github.com/spf13/viper"
"github.com/transparency-dev/merkle/rfc6962"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -358,8 +357,6 @@ func SearchLogQueryHandler(params entries.SearchLogQueryParams) middleware.Respo
}

if len(params.Entry.EntryUUIDs) > 0 || len(params.Entry.Entries()) > 0 {
g, _ := errgroup.WithContext(httpReqCtx)

var searchHashes [][]byte
for _, entryID := range params.Entry.EntryUUIDs {
// if we got this far, then entryID is either a 64 or 80 character hex string
Expand Down Expand Up @@ -389,67 +386,44 @@ func SearchLogQueryHandler(params entries.SearchLogQueryParams) middleware.Respo
}

entries := params.Entry.Entries()
searchHashesChan := make(chan []byte, len(entries))
for _, e := range entries {
e := e // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
entry, err := types.UnmarshalEntry(e)
if err != nil {
return fmt.Errorf("unmarshalling entry: %w", err)
}

leaf, err := types.CanonicalizeEntry(httpReqCtx, entry)
if err != nil {
return fmt.Errorf("canonicalizing entry: %w", err)
}
hasher := rfc6962.DefaultHasher
leafHash := hasher.HashLeaf(leaf)
searchHashesChan <- leafHash
return nil
})
}
entry, err := types.UnmarshalEntry(e)
if err != nil {
return handleRekorAPIError(params, http.StatusBadRequest, fmt.Errorf("unmarshalling entry: %w", err), err.Error())
}

if err := g.Wait(); err != nil {
return handleRekorAPIError(params, http.StatusBadRequest, err, err.Error())
}
close(searchHashesChan)
for hash := range searchHashesChan {
searchHashes = append(searchHashes, hash)
leaf, err := types.CanonicalizeEntry(httpReqCtx, entry)
if err != nil {
return handleRekorAPIError(params, http.StatusBadRequest, fmt.Errorf("canonicalizing entry: %w", err), err.Error())
}
hasher := rfc6962.DefaultHasher
leafHash := hasher.HashLeaf(leaf)
searchHashes = append(searchHashes, leafHash)
}

searchByHashResults := make([]map[int64]*trillian.GetEntryAndProofResponse, len(searchHashes))
g, _ = errgroup.WithContext(httpReqCtx)
for i, hash := range searchHashes {
i, hash := i, hash // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
var results map[int64]*trillian.GetEntryAndProofResponse
for _, shard := range api.logRanges.AllShards() {
tcs := NewTrillianClientFromTreeID(httpReqCtx, shard)
resp := tcs.getLeafAndProofByHash(hash)
switch resp.status {
case codes.OK:
leafResult := resp.getLeafAndProofResult
if leafResult != nil && leafResult.Leaf != nil {
if results == nil {
results = map[int64]*trillian.GetEntryAndProofResponse{}
}
results[shard] = resp.getLeafAndProofResult
var results map[int64]*trillian.GetEntryAndProofResponse
for _, shard := range api.logRanges.AllShards() {
tcs := NewTrillianClientFromTreeID(httpReqCtx, shard)
resp := tcs.getLeafAndProofByHash(hash)
switch resp.status {
case codes.OK:
leafResult := resp.getLeafAndProofResult
if leafResult != nil && leafResult.Leaf != nil {
if results == nil {
results = map[int64]*trillian.GetEntryAndProofResponse{}
}
case codes.NotFound:
// do nothing here, do not throw 404 error
continue
default:
log.ContextLogger(httpReqCtx).Errorf("error getLeafAndProofByHash(%s): code: %v, msg %v", hex.EncodeToString(hash), resp.status, resp.err)
return fmt.Errorf(trillianCommunicationError)
results[shard] = resp.getLeafAndProofResult
}
case codes.NotFound:
// do nothing here, do not throw 404 error
continue
default:
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("error getLeafAndProofByHash(%s): code: %v, msg %v", hex.EncodeToString(hash), resp.status, resp.err), trillianCommunicationError)
}
searchByHashResults[i] = results
return nil
})
}

if err := g.Wait(); err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, err, err.Error())
}
searchByHashResults[i] = results
}

for _, hashMap := range searchByHashResults {
Expand All @@ -468,28 +442,13 @@ func SearchLogQueryHandler(params entries.SearchLogQueryParams) middleware.Respo
}

if len(params.Entry.LogIndexes) > 0 {
g, _ := errgroup.WithContext(httpReqCtx)
resultPayloadChan := make(chan models.LogEntry, len(params.Entry.LogIndexes))

for _, logIndex := range params.Entry.LogIndexes {
logIndex := logIndex // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
logEntry, err := retrieveLogEntryByIndex(httpReqCtx, int(swag.Int64Value(logIndex)))
if err != nil && !errors.Is(err, ErrNotFound) {
return err
} else if err == nil {
resultPayloadChan <- logEntry
}
return nil
})
}

if err := g.Wait(); err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, err, err.Error())
}
close(resultPayloadChan)
for result := range resultPayloadChan {
resultPayload = append(resultPayload, result)
logEntry, err := retrieveLogEntryByIndex(httpReqCtx, int(swag.Int64Value(logIndex)))
if err != nil && !errors.Is(err, ErrNotFound) {
return handleRekorAPIError(params, http.StatusInternalServerError, err, err.Error())
} else if err == nil {
resultPayload = append(resultPayload, logEntry)
}
}
}

Expand Down

0 comments on commit 01a2321

Please sign in to comment.