// dsn is the connection string passed to pgxpool.New wherever this file opens
// a database pool.
// NOTE(review): "#" looks like a redacted placeholder rather than a usable
// PostgreSQL DSN — confirm and substitute a real value before running.
const dsn = "#"
// SortArgs is the argument payload for a "sort" job. It is serialized as JSON
// using the field tags below.
type SortArgs struct {
	// Strings holds the values the worker will sort.
	Strings []string `json:"strings"`
}
// SortWorker processes jobs whose arguments are SortArgs.
type SortWorker struct {
	// Embedding WorkerDefaults provides default implementations for the
	// Worker interface methods that this type does not define itself, so
	// only Work needs to be written by hand.
	river.WorkerDefaults[SortArgs]
}
// Work logs the received job, sorts its string arguments in place in
// ascending order, logs the sorted result, and always reports success.
func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
	fmt.Printf("Received job: %+v\n", job)

	// items aliases job.Args.Strings, so sorting it reorders the job's own data.
	items := job.Args.Strings
	sort.Strings(items)

	fmt.Printf("Sorted strings: %+v\n", items)
	return nil
}
// Kind returns the string that identifies this job type.
func (SortArgs) Kind() string {
	const kind = "sort"
	return kind
}
// InitRiverClient opens a pgx connection pool for dsn, builds a River client
// that works the default queue with the workers from RegisterWorkerTasks and
// a periodic SortArgs job scheduled every minute, and starts the client.
//
// It returns as soon as the client has been started; the caller owns the
// returned client and is responsible for stopping it (and for listening for
// shutdown signals). If the pool cannot be created, the client cannot be
// constructed, or Start fails, a wrapped error is returned and the pool is
// closed so nothing is leaked.
func InitRiverClient(ctx context.Context) (*river.Client[pgx.Tx], error) {
	dbPool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to create database pool: %w", err)
	}

	periodicJobs := []*river.PeriodicJob{
		river.NewPeriodicJob(
			river.PeriodicInterval(1*time.Minute),
			func() (river.JobArgs, *river.InsertOpts) {
				return SortArgs{Strings: []string{"banana", "apple", "cherry"}}, &river.InsertOpts{
					MaxAttempts: 5,
					Priority:    0,
				}
			},
			&river.PeriodicJobOpts{RunOnStart: true},
		),
	}

	riverClient, err := river.NewClient[pgx.Tx](riverpgxv5.New(dbPool), &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		JobTimeout:   10 * time.Minute,
		MaxAttempts:  5,
		PeriodicJobs: periodicJobs,
		Workers:      RegisterWorkerTasks(),
		Logger:       slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})),
	})
	if err != nil {
		dbPool.Close()
		return nil, fmt.Errorf("failed to create River client: %w", err)
	}

	// A client that failed to start must not be handed back as if it were
	// running: surface the error instead of printing it and carrying on.
	if err := riverClient.Start(ctx); err != nil {
		dbPool.Close()
		return nil, fmt.Errorf("failed to start River client: %w", err)
	}

	// Deliberately no signal handling and no blocking wait here. Waiting on
	// ctx.Done() would never return for a non-cancellable context, and a
	// signal.Notify whose channel is merely drained would swallow
	// SIGINT/SIGTERM without shutting anything down.
	return riverClient, nil
}
// RegisterWorkerTasks builds a fresh worker registry containing a SortWorker
// and returns it.
func RegisterWorkerTasks() *river.Workers {
	workers := river.NewWorkers()
	// The job-args type parameter is inferred from SortWorker's Work method.
	river.AddWorker(workers, &SortWorker{})
	return workers
}
// main applies the River migrations, starts the River client, then blocks
// until SIGINT or SIGTERM is received and stops the client, logging any
// error reported by the shutdown.
func main() {
	ctx := context.Background()

	runMigrations(ctx)

	client, err := InitRiverClient(ctx)
	if err != nil {
		log.Fatalf("Failed to initialize River client: %v", err)
	}

	// Wait for interrupt signal
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	<-c
	// Stop relaying to c so that a further signal is no longer captured by
	// this handler while the shutdown below is in progress.
	signal.Stop(c)

	fmt.Println("Shutting down...")

	// Stop is called explicitly rather than deferred so that a failed
	// shutdown is reported instead of being silently discarded.
	if err := client.Stop(ctx); err != nil {
		log.Printf("Failed to stop River client: %v", err)
	}
}
func runMigrations(ctx context.Context) {
dbPool, err := pgxpool.New(ctx, dsn)
migrator, _ := rivermigrate.New[pgx.Tx](riverpgxv5.New(dbPool), nil)
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) // Apply all available up migrations
if err != nil {
os.Exit(1)
}
for _, version := range res.Versions {
fmt.Printf("Applied River migration version %d", version.Version)
}
}```
I have attached demo code for a cron job; no tasks are being picked up.