Skip to content

Commit

Permalink
Add db param to jobreq to execute queries against a specific DB.
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed May 18, 2023
1 parent f9ffe08 commit 797439e
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Once the reports are generated, it's natural for users to further transform the

## Concepts
#### Task
A task is a named SQL job is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query will be executed against a random one from the list. A `-- queue:` tag to always route the task to a particular queue, unless it's overriden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If there are multiple result backends specified, the results are written a random one.
A task is a named SQL job is loaded into the server on startup. Tasks are defined in .sql files in the simple [goyesql](https://github.com/knadh/goyesql) format. Such queries are self-contained and produce the desired final output with neatly named columns. They can take arbitrary positional arguments for execution. A task can be attached to one or more specific databases defined in the configuration using the `-- db:` tag. In case of multiple databases, the query will be executed against a random one from the list, unless a specific database is specified in the API request (`db`). A `-- queue:` tag to always route the task to a particular queue, unless it's overriden by the `queue` param when making a job request. A `-- results:` tag specifies the results backend to which the results of a task will be written. If there are multiple result backends specified, the results are written a random one.

Example:
```sql
Expand Down
9 changes: 7 additions & 2 deletions cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ import (
type DBs map[string]*sql.DB

// Get returns an *sql.DB from the DBs map by name.
func (d DBs) Get(dbName string) *sql.DB {
return d[dbName]
func (d DBs) Get(dbName string) (*sql.DB, error) {
db, ok := d[dbName]
if !ok {
return nil, fmt.Errorf("unknown db: %s", dbName)
}

return db, nil
}

// GetRandom returns a random *sql.DB from the DBs map.
Expand Down
29 changes: 21 additions & 8 deletions cmd/jobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func createJobSignature(j models.JobReq, taskName string, ttl int, jobber *Jobbe

// Task arguments.
args := append([]tasks.Arg{
// First two arguments have to be jobID and taskName.
// Machinery will refelect on these and pass them as arguments
// to the callback registered using RegisterTask() when a task
// is executed.
{Type: "string", Value: j.JobID},
{Type: "string", Value: taskName},
{Type: "string", Value: j.DB},
{Type: "int", Value: ttl},
}, sliceToTaskArgs(j.Args)...)

Expand Down Expand Up @@ -78,7 +78,7 @@ func createJobSignature(j models.JobReq, taskName string, ttl int, jobber *Jobbe
}

// executeTask executes an SQL statement job and inserts the results into the result backend.
func executeTask(jobID, taskName string, ttl time.Duration, args []interface{}, task *Task, jobber *Jobber) (int64, error) {
func executeTask(jobID, taskName, dbName string, ttl time.Duration, args []interface{}, task *Task, jobber *Jobber) (int64, error) {
// If the job's deleted, stop.
if _, err := jobber.Machinery.GetBackend().GetState(jobID); err != nil {
return 0, errors.New("the job was canceled")
Expand All @@ -100,21 +100,34 @@ func executeTask(jobID, taskName string, ttl time.Duration, args []interface{},

var (
rows *sql.Rows
db *sql.DB
err error
)
// Prepared query.
if task.Stmt != nil {
// Prepared query.
rows, err = task.Stmt.QueryContext(ctx, args...)
} else {
_, db := task.DBs.GetRandom()
rows, err = db.QueryContext(ctx, task.Raw, args...)

if dbName != "" {
// Specific DB.
d, err := task.DBs.Get(dbName)
if err != nil {
return 0, fmt.Errorf("task execution failed: %s: %v", task.Name, err)
}
db = d
} else {
// Random DB.
_, db = task.DBs.GetRandom()
}
}

rows, err = db.QueryContext(ctx, task.Raw, args...)
if err != nil {
if err == context.Canceled {
return 0, errors.New("the job was canceled")
}

return 0, fmt.Errorf("task SQL query execution failed: %v", err)
return 0, fmt.Errorf("task execution failed: %s: %v", task.Name, err)
}
defer rows.Close()

Expand Down Expand Up @@ -197,13 +210,13 @@ func connectJobServer(jobber *Jobber, cfg *config.Config, queries Tasks) (*machi
// Register the tasks with the query names.
for name, query := range queries {
server.RegisterTask(string(name), func(q Task) taskFunc {
return func(jobID, taskName string, ttl int, args ...interface{}) (int64, error) {
return func(jobID, taskName, db string, ttl int, args ...interface{}) (int64, error) {
// Check if the job's been deleted.
if _, err := jobber.Machinery.GetBackend().GetState(jobID); err != nil {
return 0, fmt.Errorf("Skipping deleted job: %v", err)
}

return executeTask(jobID, taskName, time.Duration(ttl)*time.Second, args, &q, jobber)
return executeTask(jobID, taskName, db, time.Duration(ttl)*time.Second, args, &q, jobber)
}
}(query))
}
Expand Down
5 changes: 1 addition & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package main

// (c) Kailash Nadh, 2018. https://nadh.in
// MIT License.

import (
"fmt"
"log"
Expand Down Expand Up @@ -40,7 +37,7 @@ type constants struct {
ResultsDB string
}

type taskFunc func(jobID string, taskName string, ttl int, args ...interface{}) (int64, error)
type taskFunc func(jobID string, taskName, db string, ttl int, args ...interface{}) (int64, error)

// Jobber represents the tooling required to run a job server.
type Jobber struct {
Expand Down
1 change: 1 addition & 0 deletions models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type JobReq struct {
Retries int `json:"retries"`
TTL int `json:"ttl"`
Args []string `json:"args"`
DB string `json:"db"`

ttlDuration time.Duration
}
Expand Down

0 comments on commit 797439e

Please sign in to comment.