Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): refactor runner channels into abstract queues #2971

Merged
merged 45 commits into from Jul 27, 2023
Merged

Conversation

schoren
Copy link
Collaborator

@schoren schoren commented Jul 20, 2023

This PR introduces the concept of test runner pipelines, which decouples the steps of the pipeline from each other. The pipeline consists of several steps, including trigger, poll trace, analyze result, and assert. Instead of each step needing to know what step comes next, it only needs to have a reference to an "outputQueue". This outputQueue is the "input" queue for the next step, which allows for greater flexibility and modularity in the codebase.

The main goal of this PR is to remove all the manual queues that were implemented using Go channels. Previously, each step had a copy/pasted worker code that relied on channels, which made it hard to replace the queue mechanism. The new pipeline queues rely on an abstract queue driver, which allows for different types of queues to be created independently. Currently, we only implement an in-memory driver using Go channels, so we're not changing behavior, just architecture.

With everything decoupled, we can independently create different types of queues, such as a PostgreSQL listen/notify queue. This also adds flexibility to modify the pipeline, such as adding steps or changing the order, without needing to modify any existing steps.

Explanation of the Queue:
https://www.loom.com/share/8209a55d500c4766a0f137f0c723bf22

Explanation of the Pipeline:
https://www.loom.com/share/94aca45c8e8f4f17ab06403e1f65833a

How those fit in the application:
https://www.loom.com/share/43f74fccecb042f69b89f5f9098e56d6

Changes

  • Introduced the concept of test runner pipelines
  • Decoupled the steps of the pipeline using output queues
  • Removed manual queues implemented using Go channels
  • Added an abstract queue driver for greater flexibility
  • Added an in-memory driver using Go channels
  • Updated the pipeline steps to use the new queue mechanism

Checklist

  • tested locally
  • added new dependencies
  • updated the docs
  • added a test

@schoren schoren changed the title Pg pubsub feat(server): refactor runner channels into abstract queues Jul 20, 2023
@schoren schoren marked this pull request as ready for review July 26, 2023 21:15
@schoren schoren requested review from xoscar, mathnogueira and danielbdias and removed request for xoscar and mathnogueira July 26, 2023 21:15
@@ -56,12 +52,12 @@ func (g randGenerator) ID() ID {

func (g randGenerator) TraceID() trace.TraceID {
tid := trace.TraceID{}
g.rand.Read(tid[:])
rand.New(rand.NewSource(time.Now().UnixNano())).Read(tid[:])
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fixes a race condition

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: how many days of life this line owes you? 😂

@@ -602,7 +603,7 @@ func (r *runRepository) GetTransactionRunSteps(ctx context.Context, id id.ID, ru
WHERE transaction_run_steps.transaction_run_id = $1 AND transaction_run_steps.transaction_run_transaction_id = $2
ORDER BY test_runs.completed_at ASC
`
query, params := sqlutil.Tenant(ctx, query, runID, id)
query, params := sqlutil.Tenant(ctx, query, strconv.Itoa(runID), id)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is required for the new pgx db driver

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: what is the idea of using pgx instead of pq directly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pgx has support for LISTEN / NOTIFY. Originally this PR was going to do everything but it got larger than expected so I'll implement that in a separated PR, but we already did the driver change at the beginning. The idea was to check that everything worked with the new driver

@@ -136,10 +135,6 @@ func readTestRunEventFromRows(rows *sql.Rows) (model.TestRunEvent, error) {
)

if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the controller compares with errors.Is(err, sql.ErrNoRows) now. this func checks the wrapped errors, so it will correctly assert that the error is sql.ErrNoRows without the need to return a custom error

Copy link
Contributor

@danielbdias danielbdias left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overall PR is excellent! I liked the way of handling drivers and queues here.
I've added some comments on it, but no blocker.

Comment on lines 97 to 105
return executor.NewTestPipeline(
pipeline,
subscriptionManager,
pipeline.GetQueueForStep(3), // assertion runner step
runRepo,
trRepo,
ppRepo,
dsRepo,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this magic index 3 (for the assertion runner step) to a variable (or even a constant)?

Suggested change
return executor.NewTestPipeline(
pipeline,
subscriptionManager,
pipeline.GetQueueForStep(3), // assertion runner step
runRepo,
trRepo,
ppRepo,
dsRepo,
)
assertionRunnerStepIndex := 3
return executor.NewTestPipeline(
pipeline,
subscriptionManager,
pipeline.GetQueueForStep(assertionRunnerStepIndex), // assertion runner step
runRepo,
trRepo,
ppRepo,
dsRepo,
)

pipeline := executor.NewPipeline(queueBuilder,
executor.PipelineStep{Processor: runner, Driver: executor.NewInMemoryQueueDriver("runner")},
executor.PipelineStep{Processor: tracePoller, Driver: executor.NewInMemoryQueueDriver("tracePoller")},
executor.PipelineStep{Processor: linterRunner, Driver: executor.NewInMemoryQueueDriver("linterRunner")},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to consider in the future: since we are considering renaming the internals of environment and transaction soon, we can also add the linter to the packet and change it to analyzer explicitly. Does it make sense?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but I saw that there's a distinction between linter and analyzer. I didn't dig too much into it, but that looks like a good time to normalize this kinds of things

@@ -80,23 +82,18 @@ func (c *AppConfig) PostgresConnString() string {
defer c.mu.Unlock()

if postgresConnString := c.vp.GetString("postgresConnString"); postgresConnString != "" {
return postgresConnString
fmt.Println("ERROR: postgresConnString was discontinued. Migrate to the new postgres format")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to think about in the future: we should move these server logs from fmt to something else, like zap or logrus, that we can centralize and configure later.
Does it make sense to create a ticket for that? (I can do that if needed!)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, do you mind creating the ticket @danielbdias ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done! Issue: #2996

}

func (p *Pipeline) Begin(ctx context.Context, job Job) {
p.queues[0].Enqueue(ctx, job)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Even knowing that we will always have a queue, does it make sense to add a guard clause here or on NewPipeline t avoid having 0 queues?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense. I'll add it


finished, finishReason, anotherRun, err := pollerExecutor.ExecuteRequest(request)
run = anotherRun // should store a run to use in another iteration
job := executor.NewJob()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On every call to NewJob, we set Test, Run, and PollingProfile. Does it make sense to add them as parameters on NewJob?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the case for transactions, where it only passes Transaction and TransactionRun.

@@ -56,12 +52,12 @@ func (g randGenerator) ID() ID {

func (g randGenerator) TraceID() trace.TraceID {
tid := trace.TraceID{}
g.rand.Read(tid[:])
rand.New(rand.NewSource(time.Now().UnixNano())).Read(tid[:])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: how many days of life this line owes you? 😂

server/pkg/id/generator.go Outdated Show resolved Hide resolved
return tid
}

func (g randGenerator) SpanID() trace.SpanID {
sid := trace.SpanID{}
g.rand.Read(sid[:])
rand.New(rand.NewSource(time.Now().UnixNano())).Read(sid[:])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here:

Suggested change
rand.New(rand.NewSource(time.Now().UnixNano())).Read(sid[:])
rndSeed := rand.NewSource(time.Now().UnixNano())
rand.New(rndSeed).Read(sid[:])

@@ -602,7 +603,7 @@ func (r *runRepository) GetTransactionRunSteps(ctx context.Context, id id.ID, ru
WHERE transaction_run_steps.transaction_run_id = $1 AND transaction_run_steps.transaction_run_transaction_id = $2
ORDER BY test_runs.completed_at ASC
`
query, params := sqlutil.Tenant(ctx, query, runID, id)
query, params := sqlutil.Tenant(ctx, query, strconv.Itoa(runID), id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: what is the idea of using pgx instead of pq directly?

Comment on lines 25 to 27
- selector: span[name = "exec UPDATE"]
assertions:
- attr:tracetest.selected_spans.count = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the rationale for removing this assertion? Don't we need to update the test anymore?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. This was an RunUpdate made after creating the new run when rerunning a test, to update the state. I removed it incorrectly. Readding it

@schoren schoren merged commit f9e4bc2 into main Jul 27, 2023
30 checks passed
@schoren schoren deleted the pg-pubsub branch July 27, 2023 18:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants