Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add structured logging #53

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/send/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"hermetic/internal/common_flags"
"hermetic/internal/dps"
"hermetic/internal/teams"
"log/slog"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -59,7 +60,7 @@ func parseArgumentsAndCallSend(cmd *cobra.Command, args []string) error {
teamsErrorMessage := teams.CreateGeneralFailureMessage(err)
if err := teams.SendMessage(teamsErrorMessage, common_flags.TeamsWebhookNotificationUrl); err != nil {
err = fmt.Errorf("failed to send error message to Teams, cause: `%w`", err)
fmt.Printf("%s\n", err)
slog.Error(err.Error())
}
return err
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/verify/confirm/confirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"hermetic/internal/common_flags"
"hermetic/internal/teams"
confirmImplementation "hermetic/internal/verify/confirm"
"log/slog"
"os"
"os/signal"
)
Expand Down Expand Up @@ -57,11 +58,11 @@ func parseArgumentsAndReadConfirmTopic(cmd *cobra.Command, args []string) error
err = confirmImplementation.ReadConfirmTopic(ctx, reader, confirmMessageReceiverUrl)
if err != nil {
err = fmt.Errorf("verification error, cause: `%w`", err)
fmt.Printf("Sending error message to Teams\n")
slog.Info("Sending error message to Teams")
teamsErrorMessage := teams.CreateGeneralFailureMessage(err)
if err := teams.SendMessage(teamsErrorMessage, common_flags.TeamsWebhookNotificationUrl); err != nil {
err = fmt.Errorf("failed to send error message to Teams, cause: `%w`", err)
fmt.Printf("%s\n", err)
slog.Error(err.Error())
}
return err
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/verify/reject/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"hermetic/internal/common_flags"
"hermetic/internal/teams"
rejectImplementation "hermetic/internal/verify/reject"
"log/slog"
"os"
"os/signal"
)
Expand Down Expand Up @@ -45,11 +46,11 @@ func parseArgumentsAndReadRejectTopic(cmd *cobra.Command, args []string) error {
err = rejectImplementation.ReadRejectTopic(ctx, reader, common_flags.TeamsWebhookNotificationUrl)
if err != nil {
err = fmt.Errorf("verification error, cause: `%w`", err)
fmt.Printf("Sending error message to Teams\n")
slog.Info("Sending error message to Teams")
teamsErrorMessage := teams.CreateGeneralFailureMessage(err)
if err := teams.SendMessage(teamsErrorMessage, common_flags.TeamsWebhookNotificationUrl); err != nil {
err = fmt.Errorf("failed to send error message to Teams, cause: `%w`", err)
fmt.Printf("%s\n", err)
slog.Error(err.Error())
}
return err
}
Expand Down
7 changes: 4 additions & 3 deletions internal/dps/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"errors"
"fmt"
"github.com/segmentio/kafka-go"
"log/slog"
)

func ReadMessages(ctx context.Context, reader *kafka.Reader) (*KafkaResponse, error) {
for {
fmt.Println("Reading next message...")
slog.Info("Reading next message...")
message, err := reader.ReadMessage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to read message: %w", err)
Expand All @@ -22,14 +23,14 @@ func ReadMessages(ctx context.Context, reader *kafka.Reader) (*KafkaResponse, er
if err != nil {
syntaxError := new(json.SyntaxError)
if errors.As(err, &syntaxError) {
fmt.Printf("Could not read message at offset '%d', syntax error in message, skipping offset\n", message.Offset)
slog.Error("Could not read message at offset, syntax error in message, skipping offset", "offset", message.Offset)
continue
}
return nil, fmt.Errorf("failed to unmarshal json, original error: '%w'", err)
}

if !IsWebArchiveOwned(&dpsResponse) {
fmt.Printf("Message at offset '%d' is not owned by web archive, skipping offset\n", message.Offset)
slog.Info("Message at offset is not owned by web archive, skipping offset", "offset", message.Offset)
continue
}

Expand Down
15 changes: 8 additions & 7 deletions internal/dps/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"os"
"time"
Expand Down Expand Up @@ -71,22 +72,22 @@ func readLatestMessages(kafkaEndpoints []string, transferTopicName string) ([]su
var messages []submission_information_package.Package

for offsetToReadFrom := offsets.first; offsetToReadFrom < offsets.last; offsetToReadFrom++ {
fmt.Printf("Reading message at offset '%d'\n", offsetToReadFrom)
slog.Info("Reading message ", "offset", offsetToReadFrom)
err := messageReader.Reader.SetOffset(offsetToReadFrom)
if err != nil {
return nil, fmt.Errorf("failed to set offset '%d', original error: '%w'", offsetToReadFrom, err)
}

message, err := messageReader.ReadMessageWithTimeout(readTimeout)
if errors.Is(err, context.DeadlineExceeded) {
fmt.Printf("Could not read message at offset '%d', read timeout '%s' exceeded, skipping offset\n", offsetToReadFrom, readTimeout)
slog.Error("Could not read message, timeout exceeded. Skipping offset.", "offset", offsetToReadFrom, "read_timeout", readTimeout)
continue
}
if err != nil {
return nil, fmt.Errorf("failed to read message at offset '%d', original error: '%w'", offsetToReadFrom, err)
}
if message.Value == nil {
fmt.Printf("Message at offset '%d' is nil, skipping offset\n", offsetToReadFrom)
slog.Info("Message at offset is nil, skipping offset", "offset", offsetToReadFrom)
continue
}
var submissionInformationPackage submission_information_package.Package
Expand All @@ -95,7 +96,7 @@ func readLatestMessages(kafkaEndpoints []string, transferTopicName string) ([]su
if err != nil {
syntaxError := new(json.SyntaxError)
if errors.As(err, &syntaxError) {
fmt.Printf("Could not read message at offset '%d', syntax error in message, skipping offset\n", offsetToReadFrom)
slog.Error("Could not read message at offset, syntax error in message, skipping offset", "offset", offsetToReadFrom)
continue
}
return nil, fmt.Errorf("failed to unmarshal json, original error: '%w'", err)
Expand Down Expand Up @@ -154,7 +155,7 @@ func PrepareAndSendSubmissionInformationPackage(kafkaEndpoints []string, transfe
}

for _, message := range relevantMessages {
fmt.Printf("Pushing '%s' to cache\n", message.Path)
slog.Error("Pushing message to cache", "path", message.Path)
err := cache.Set(message.Path, []byte("Sent"))
if err != nil {
return fmt.Errorf("failed to set '%s' in cache, original error: '%w'", message.Path, err)
Expand All @@ -171,7 +172,7 @@ func PrepareAndSendSubmissionInformationPackage(kafkaEndpoints []string, transfe
destinationPath := rootPath + directoryName
_, err := cache.Get(destinationPath)
if err == nil {
fmt.Printf("Skipping directory '%s' as it has already been processed.\n", destinationPath)
slog.Info("Skipping directory, already processed.", "path", destinationPath)
continue
} else {
if !errors.Is(err, bigcache.ErrEntryNotFound) {
Expand All @@ -181,7 +182,7 @@ func PrepareAndSendSubmissionInformationPackage(kafkaEndpoints []string, transfe
if !path.IsDir() {
return fmt.Errorf("found file '%s' in root path '%s', but expected only directories", path.Name(), rootPath)
}
fmt.Printf("Processing directory %s\n", destinationPath)
slog.Info("Processing directory", "path", destinationPath)
submissionInformationPackage := submission_information_package.CreatePackage(destinationPath, directoryName, warcContentType)

kafkaMessage, err := json.Marshal(submissionInformationPackage)
Expand Down
3 changes: 2 additions & 1 deletion internal/kafka/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"fmt"
"log/slog"

"github.com/google/uuid"
"github.com/segmentio/kafka-go"
Expand All @@ -14,7 +15,7 @@ type Sender struct {

func (sender *Sender) SendMessageToKafkaTopic(payload []byte) error {
kafkaMessageUuid := uuid.New()
fmt.Printf("Sending message with uuid %s\n", kafkaMessageUuid)
slog.Info("Sending message to Kafka topic", "UUID", kafkaMessageUuid)
kafkaMessageUuidBytes, err := kafkaMessageUuid.MarshalText()
if err != nil {
return fmt.Errorf("failed to marshal uuid, original error: '%w'", err)
Expand Down
3 changes: 2 additions & 1 deletion internal/teams/teams.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/carlmjohnson/requests"
Expand Down Expand Up @@ -36,7 +37,7 @@ type Message struct {
func SendMessage(payload Message, webhookUrl string) error {
timeoutContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
fmt.Println("Sending message to Teams")
slog.Info("Sending message to Teams")
bytes, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal message, cause: `%w`", err)
Expand Down
5 changes: 2 additions & 3 deletions internal/verify/confirm/confirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
func ReadConfirmTopic(ctx context.Context, reader *kafka.Reader, receiverUrl string) error {
for {
response, err := dps.ReadMessages(ctx, reader)
slog.Info("Received confirm message from DPS", "message", response.DPSResponse)
if err != nil {
return fmt.Errorf("failed to read message from confirm-topic: `%w`", err)
}
Expand All @@ -24,8 +25,6 @@ func ReadConfirmTopic(ctx context.Context, reader *kafka.Reader, receiverUrl str
if err != nil {
return fmt.Errorf("failed to send confirm message: `%w`", err)
}
} else {
fmt.Printf("Received message: %v\n", response.DPSResponse)
}
}
}
Expand Down Expand Up @@ -61,6 +60,6 @@ func SendConfirmMessage(baseUrl string, response dps.DigitalPreservationSystemRe
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
fmt.Printf("Successfully sent confirm message to %s\n", url)
slog.Info("Successfully sent confirm message", "receiver_url", url)
return nil
}
3 changes: 2 additions & 1 deletion internal/verify/reject/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/segmentio/kafka-go"
"hermetic/internal/dps"
"hermetic/internal/teams"
"log/slog"
)

func ReadRejectTopic(ctx context.Context, reader *kafka.Reader, teamsWebhookNotificationUrl string) error {
Expand All @@ -21,7 +22,7 @@ func ReadRejectTopic(ctx context.Context, reader *kafka.Reader, teamsWebhookNoti
}

func ProcessMessagesFromRejectTopic(reader *kafka.Reader, response *dps.KafkaResponse, teamsWebhookNotificationUrl string) error {
fmt.Printf("Processing message with ContentCategory: '%s'\n", response.DPSResponse.ContentCategory)
slog.Info("Processing message from reject-topic", "content_category", response.DPSResponse.ContentCategory)
payload := createTeamsDigitalPreservationSystemFailureMessage(response, reader.Config().Topic, reader.Config().Brokers)
err := teams.SendMessage(payload, teamsWebhookNotificationUrl)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package main

import (
"fmt"
"hermetic/cmd"
"log/slog"
"os"
)

func main() {
handler := slog.NewJSONHandler(os.Stdout, nil)
logger := slog.New(handler)
slog.SetDefault(logger)

if err := cmd.NewRootCommand().Execute(); err != nil {
fmt.Printf("failed to execute command, got error: '%s'\n", err)
slog.Error("failed to execute command, got error:", err)
os.Exit(1)
}
}