Skip to content

Commit

Permalink
RunRepository: encapsulate not found case in persistence implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
dermetfan committed Jun 21, 2022
1 parent 42dc094 commit 6dd2781
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 73 deletions.
13 changes: 6 additions & 7 deletions src/application/component/nomad_event.go
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/georgysavva/scany/pgxscan"
"github.com/google/uuid"
nomad "github.com/hashicorp/nomad/api"
"github.com/jackc/pgx/v4"
Expand Down Expand Up @@ -319,19 +318,19 @@ func (self *NomadEventConsumer) getRun(logger zerolog.Logger, idStr string) (*do

run, err := self.RunService.GetByNomadJobIdWithLock(id, "FOR NO KEY UPDATE")
if err != nil {
if pgxscan.NotFound(err) {
logger.Trace().Msg("Ignoring event (no such Run)")
return nil, nil
}
return &run, err
return run, err
}
if run == nil {
logger.Trace().Msg("Ignoring event (no such Run)")
return nil, nil
}

if run.FinishedAt != nil {
logger.Trace().Msg("Ignoring event (Run already finished)")
return nil, nil
}

return &run, nil
return run, nil
}

func (self *NomadEventConsumer) endRun(ctx context.Context, run *domain.Run, timestamp int64, status domain.RunStatus) error {
Expand Down
96 changes: 52 additions & 44 deletions src/application/component/web/main.go
Expand Up @@ -395,9 +395,8 @@ func (self *Web) ActionIdRunGet(w http.ResponseWriter, req *http.Request) {
defer wg.Done()

run, err := self.RunService.GetByInvocationId(id)
if err != nil && !pgxscan.NotFound(err) {
if err != nil {
errChan <- err
return
} else {
entries[i].Run = run
}
Expand Down Expand Up @@ -536,11 +535,9 @@ func (self *Web) InvocationIdGet(w http.ResponseWriter, req *http.Request) {

var run *domain.Run
if run_, err := self.RunService.GetByInvocationId(id); err != nil {
if !pgxscan.NotFound(err) {
self.ServerError(w, err)
return
}
} else {
self.ServerError(w, err)
return
} else if run_ != nil {
run = run_
}

Expand Down Expand Up @@ -591,7 +588,11 @@ func (self *Web) RunIdGet(w http.ResponseWriter, req *http.Request) {

run, err := self.RunService.GetByNomadJobId(id)
if err != nil {
self.NotFound(w, errors.WithMessagef(err, "Failed to find Run %q", id))
self.ServerError(w, errors.WithMessagef(err, "Failed to find Run %q", id))
return
}
if run == nil {
self.NotFound(w, nil)
return
}

Expand All @@ -607,7 +608,7 @@ func (self *Web) RunIdGet(w http.ResponseWriter, req *http.Request) {
return
}

allocsWithLogs, err := self.RunService.GetRunAllocationsWithLogs(run)
allocsWithLogs, err := self.RunService.GetRunAllocationsWithLogs(*run)
if err != nil {
self.NotFound(w, err)
return
Expand Down Expand Up @@ -665,7 +666,7 @@ func (self *Web) RunIdGet(w http.ResponseWriter, req *http.Request) {
"Run": struct {
domain.Run
Action domain.Action
}{run, action},
}{*run, action},
"inputs": inputs,
"output": output,
"facts": facts,
Expand Down Expand Up @@ -736,9 +737,9 @@ func (self *Web) RunGet(w http.ResponseWriter, req *http.Request) {

go func(i int, id uuid.UUID) {
defer wg.Done()
if run, err := self.RunService.GetByInvocationId(id); err != nil && !pgxscan.NotFound(err) {
if run, err := self.RunService.GetByInvocationId(id); err != nil {
errChan <- err
} else if err == nil {
} else {
entries[i].Run = run
}
}(i, invocation.Id)
Expand Down Expand Up @@ -861,13 +862,13 @@ func (self *Web) ApiRunByInputGet(w http.ResponseWriter, req *http.Request) {
} else if invocations, err := self.InvocationService.GetByInputFactIds(factIds, recursive, &ok, page); err != nil {
self.ServerError(w, errors.WithMessage(err, "failed to fetch Invocations"))
} else {
runs := make([]*domain.Run, len(invocations))
for i, invocation := range invocations {
runs := []*domain.Run{}
for _, invocation := range invocations {
if run, err := self.RunService.GetByInvocationId(invocation.Id); err != nil {
self.ServerError(w, err)
return
} else {
runs[i] = run
} else if run != nil {
runs = append(runs, run)
}
}

Expand Down Expand Up @@ -925,17 +926,19 @@ func (self *Web) ApiActionPost(w http.ResponseWriter, req *http.Request) {
}
}

func (self *Web) getRun(req *http.Request) (domain.Run, error) {
func (self *Web) getRun(req *http.Request) (*domain.Run, error) {
if id, err := uuid.Parse(mux.Vars(req)["id"]); err != nil {
return domain.Run{}, err
return nil, err
} else {
return self.RunService.GetByNomadJobId(id)
}
}

func (self *Web) ApiRunIdGet(w http.ResponseWriter, req *http.Request) {
if run, err := self.getRun(req); err != nil {
self.NotFound(w, errors.WithMessage(err, "Could not find Run"))
self.ServerError(w, err)
} else if run == nil {
self.NotFound(w, nil)
} else {
self.json(w, run, http.StatusOK)
}
Expand Down Expand Up @@ -969,7 +972,9 @@ func (self *Web) ApiRunIdInputsGet(w http.ResponseWriter, req *http.Request) {
if id, err := uuid.Parse(mux.Vars(req)["id"]); err != nil {
self.ClientError(w, err)
} else if run, err := self.RunService.GetByNomadJobId(id); err != nil {
self.NotFound(w, err)
self.ServerError(w, err)
} else if run == nil {
self.NotFound(w, nil)
} else if inputs, err := self.InvocationService.GetInputFactIdsById(run.InvocationId); err != nil {
self.NotFound(w, errors.WithMessage(err, "Could not get Run's Invocation's inputs"))
} else {
Expand All @@ -981,11 +986,9 @@ func (self *Web) ApiRunIdOutputGet(w http.ResponseWriter, req *http.Request) {
if id, err := uuid.Parse(mux.Vars(req)["id"]); err != nil {
self.ClientError(w, err)
} else if run, err := self.RunService.GetByNomadJobId(id); err != nil {
if pgxscan.NotFound(err) {
w.WriteHeader(http.StatusNotFound)
} else {
self.ServerError(w, err)
}
self.ServerError(w, err)
} else if run == nil {
self.NotFound(w, nil)
} else if output, err := self.InvocationService.GetOutputById(run.InvocationId); err != nil {
if pgxscan.NotFound(err) {
w.WriteHeader(http.StatusNotFound)
Expand All @@ -999,9 +1002,12 @@ func (self *Web) ApiRunIdOutputGet(w http.ResponseWriter, req *http.Request) {

func (self *Web) ApiRunIdDelete(w http.ResponseWriter, req *http.Request) {
if run, err := self.getRun(req); err != nil {
self.NotFound(w, err)
self.ServerError(w, err)
return
} else if run == nil {
self.NotFound(w, nil)
return
} else if err := self.RunService.Cancel(&run); err != nil {
} else if err := self.RunService.Cancel(run); err != nil {
self.ServerError(w, errors.WithMessagef(err, "Failed to cancel Run %q", run.NomadJobID))
return
}
Expand All @@ -1012,11 +1018,11 @@ func (self *Web) ApiRunIdDelete(w http.ResponseWriter, req *http.Request) {
func (self *Web) ApiRunIdFactPost(w http.ResponseWriter, req *http.Request) {
run, err := self.getRun(req)
if err != nil {
if pgxscan.NotFound(err) {
self.NotFound(w, err)
} else {
self.ClientError(w, err) //TODO: review 5XX error in openAPi documentation
}
self.ServerError(w, err)
return
}
if run == nil {
self.NotFound(w, nil)
return
}

Expand Down Expand Up @@ -1114,18 +1120,14 @@ func (self *Web) ApiRunIdLogGet(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
if id, err := uuid.Parse(vars["id"]); err != nil {
self.ClientError(w, errors.WithMessage(err, "Failed to parse id"))
} else if run, err := self.RunService.GetByNomadJobId(id); err != nil {
self.ClientError(w, errors.WithMessage(err, "Failed to fetch job"))
} else if run == nil {
self.NotFound(w, err)
} else if log, err := self.RunService.JobLog(id, run.CreatedAt, run.FinishedAt); err != nil {
self.ServerError(w, errors.WithMessage(err, "Failed to get logs"))
} else {
run, err := self.RunService.GetByNomadJobId(id)
if err != nil {
self.ClientError(w, errors.WithMessage(err, "Failed to fetch job"))
return
}

if log, err := self.RunService.JobLog(id, run.CreatedAt, run.FinishedAt); err != nil {
self.ServerError(w, errors.WithMessage(err, "Failed to get logs"))
} else {
self.json(w, log, http.StatusOK)
}
self.json(w, log, http.StatusOK)
}
}

Expand Down Expand Up @@ -1260,11 +1262,17 @@ func (self *Web) Error(w http.ResponseWriter, err error) {
}

self.Logger.
Error().
Err(err).
Int("status", status).
Msg("Handler error")

http.Error(w, err.Error(), status)
var msg string
if err != nil {
msg = err.Error()
}

http.Error(w, msg, status)
}

func (self *Web) json(w http.ResponseWriter, obj interface{}, status int) {
Expand Down
16 changes: 8 additions & 8 deletions src/application/service/run.go
Expand Up @@ -28,11 +28,11 @@ import (
type RunService interface {
WithQuerier(config.PgxIface) RunService

GetByNomadJobId(uuid.UUID) (domain.Run, error)
GetByNomadJobIdWithLock(uuid.UUID, string) (domain.Run, error)
GetByNomadJobId(uuid.UUID) (*domain.Run, error)
GetByNomadJobIdWithLock(uuid.UUID, string) (*domain.Run, error)
GetByInvocationId(uuid.UUID) (*domain.Run, error)
GetByActionId(uuid.UUID, *repository.Page) ([]*domain.Run, error)
GetLatestByActionId(uuid.UUID) (domain.Run, error)
GetLatestByActionId(uuid.UUID) (*domain.Run, error)
GetAll(*repository.Page) ([]*domain.Run, error)
Save(*domain.Run) error
Update(*domain.Run) error
Expand Down Expand Up @@ -84,24 +84,24 @@ func (self runService) WithQuerier(querier config.PgxIface) RunService {
}
}

func (self runService) GetByNomadJobId(id uuid.UUID) (run domain.Run, err error) {
func (self runService) GetByNomadJobId(id uuid.UUID) (run *domain.Run, err error) {
self.logger.Trace().Str("nomad-job-id", id.String()).Msg("Getting Run by Nomad Job ID")
run, err = self.runRepository.GetByNomadJobId(id)
err = errors.WithMessagef(err, "Could not select existing Run by Nomad Job ID %q", id)
return
}

func (self runService) GetByNomadJobIdWithLock(id uuid.UUID, lock string) (run domain.Run, err error) {
func (self runService) GetByNomadJobIdWithLock(id uuid.UUID, lock string) (run *domain.Run, err error) {
self.logger.Trace().Str("nomad-job-id", id.String()).Str("lock", lock).Msg("Getting Run by Nomad Job ID with lock")
run, err = self.runRepository.GetByNomadJobIdWithLock(id, lock)
err = errors.WithMessagef(err, "Could not select existing Run by Nomad Job ID %q with lock %q", id, lock)
return
}

func (self runService) GetByInvocationId(invocationId uuid.UUID) (run *domain.Run, err error) {
self.logger.Trace().Str("invocation-id", invocationId.String()).Msg("Getting Runs by input Fact IDs")
self.logger.Trace().Str("invocation-id", invocationId.String()).Msg("Getting Run by Invocation ID")
run, err = self.runRepository.GetByInvocationId(invocationId)
err = errors.WithMessagef(err, "Could not select Runs by Invocation ID %q", invocationId)
err = errors.WithMessagef(err, "Could not select Run by Invocation ID %q", invocationId)
return
}

Expand All @@ -112,7 +112,7 @@ func (self runService) GetByActionId(id uuid.UUID, page *repository.Page) (runs
return
}

func (self runService) GetLatestByActionId(id uuid.UUID) (run domain.Run, err error) {
func (self runService) GetLatestByActionId(id uuid.UUID) (run *domain.Run, err error) {
self.logger.Trace().Str("action-id", id.String()).Msg("Getting latest Run by Action ID")
run, err = self.runRepository.GetLatestByActionId(id)
err = errors.WithMessagef(err, "Could not select latest Run by Action ID %q", id)
Expand Down
6 changes: 3 additions & 3 deletions src/domain/repository/run.go
Expand Up @@ -10,11 +10,11 @@ import (
type RunRepository interface {
WithQuerier(config.PgxIface) RunRepository

GetByNomadJobId(uuid.UUID) (domain.Run, error)
GetByNomadJobIdWithLock(uuid.UUID, string) (domain.Run, error)
GetByNomadJobId(uuid.UUID) (*domain.Run, error)
GetByNomadJobIdWithLock(uuid.UUID, string) (*domain.Run, error)
GetByInvocationId(uuid.UUID) (*domain.Run, error)
GetByActionId(uuid.UUID, *Page) ([]*domain.Run, error)
GetLatestByActionId(uuid.UUID) (domain.Run, error)
GetLatestByActionId(uuid.UUID) (*domain.Run, error)
GetAll(*Page) ([]*domain.Run, error)
Save(*domain.Run) error
Update(*domain.Run) error
Expand Down
33 changes: 22 additions & 11 deletions src/infrastructure/persistence/run.go
Expand Up @@ -3,7 +3,6 @@ package persistence
import (
"context"

"github.com/georgysavva/scany/pgxscan"
"github.com/google/uuid"

"github.com/input-output-hk/cicero/src/config"
Expand All @@ -23,24 +22,32 @@ func (a runRepository) WithQuerier(querier config.PgxIface) repository.RunReposi
return &runRepository{querier}
}

func (a runRepository) GetByNomadJobId(id uuid.UUID) (domain.Run, error) {
func (a runRepository) GetByNomadJobId(id uuid.UUID) (*domain.Run, error) {
return a.GetByNomadJobIdWithLock(id, "")
}

func (a runRepository) GetByNomadJobIdWithLock(id uuid.UUID, lock string) (run domain.Run, err error) {
return run, pgxscan.Get(
context.Background(), a.DB, &run,
func (a runRepository) GetByNomadJobIdWithLock(id uuid.UUID, lock string) (*domain.Run, error) {
run, err := get(
a.DB, &domain.Run{},
`SELECT * FROM run WHERE nomad_job_id = $1 `+lock,
id,
)
if run == nil {
return nil, err
}
return run.(*domain.Run), err
}

func (a runRepository) GetByInvocationId(invocationId uuid.UUID) (run *domain.Run, err error) {
return run, pgxscan.Get(
context.Background(), a.DB, run,
func (a runRepository) GetByInvocationId(invocationId uuid.UUID) (*domain.Run, error) {
run, err := get(
a.DB, &domain.Run{},
`SELECT * FROM run WHERE invocation_id = $1`,
invocationId,
)
if run == nil {
return nil, err
}
return run.(*domain.Run), err
}

func (a runRepository) GetByActionId(id uuid.UUID, page *repository.Page) ([]*domain.Run, error) {
Expand All @@ -54,9 +61,9 @@ func (a runRepository) GetByActionId(id uuid.UUID, page *repository.Page) ([]*do
)
}

func (a runRepository) GetLatestByActionId(id uuid.UUID) (run domain.Run, err error) {
return run, pgxscan.Get(
context.Background(), a.DB, &run,
func (a runRepository) GetLatestByActionId(id uuid.UUID) (*domain.Run, error) {
run, err := get(
a.DB, &domain.Run{},
`SELECT DISTINCT ON (invocation.action_id) run.*
FROM run
JOIN invocation ON
Expand All @@ -65,6 +72,10 @@ func (a runRepository) GetLatestByActionId(id uuid.UUID) (run domain.Run, err er
ORDER BY invocation.action_id, run.created_at DESC`,
id,
)
if run == nil {
return nil, err
}
return run.(*domain.Run), err
}

func (a runRepository) GetAll(page *repository.Page) ([]*domain.Run, error) {
Expand Down

0 comments on commit 6dd2781

Please sign in to comment.