-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
119 lines (104 loc) · 3.67 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* scheduler.go: Orchestrates timed data fetching tasks using Google Cloud Scheduler.
* Usage:
* - Configures and initiates scheduled jobs that trigger data retrieval and processing tasks.
* - Ensures tasks are executed at specified intervals, handling retries and logging as necessary.
* - Utilizes cron syntax to define job schedules.
* Dependencies:
* - Requires external scheduling APIs or local cron services.
* - Interacts with client modules (e.g., satellite.go, weather.go, soil.go) to set up data fetch operations.
* - Uses service modules (e.g., imageservice.go, soildataservice.go) to process and store the fetched data.
* Author(s): Shannon Thompson
* Created on: 04/12/2024
*/
package scheduler
import (
"context"
"fmt"
"log"
"strings"
scheduler "cloud.google.com/go/scheduler/apiv1"
"cloud.google.com/go/scheduler/apiv1/schedulerpb"
"github.com/sthompson732/viticulture-harvester-app/internal/config"
"google.golang.org/api/option"
)
type SchedulerClient struct {
Client *scheduler.CloudSchedulerClient
Cfg *config.Config
}
func NewSchedulerClient(ctx context.Context, cfg *config.Config) (*SchedulerClient, error) {
client, err := scheduler.NewCloudSchedulerClient(ctx, option.WithCredentialsFile(cfg.CloudStorage.CredentialsPath))
if err != nil {
return nil, fmt.Errorf("failed to create scheduler client: %v", err)
}
return &SchedulerClient{
Client: client,
Cfg: cfg,
}, nil
}
func (sc *SchedulerClient) SetupJobs(ctx context.Context) error {
for _, jobCfg := range sc.Cfg.DataSources {
if jobCfg.Enabled {
err := sc.createJob(ctx, jobCfg)
if err != nil {
log.Printf("Failed to create job for %s: %v", jobCfg.Description, err)
continue
}
log.Printf("Successfully scheduled job: %s", jobCfg.Description)
}
}
return nil
}
func (sc *SchedulerClient) createJob(ctx context.Context, jobCfg config.DataSourceConfig) error {
parent := fmt.Sprintf("projects/%s/locations/%s", sc.Cfg.ProjectID, sc.Cfg.LocationID)
// Build the HTTP target based on the new documentation
httpTarget := &schedulerpb.HttpTarget{
Uri: jobCfg.Endpoint,
HttpMethod: schedulerpb.HttpMethod(schedulerpb.HttpMethod_value[jobCfg.HttpMethod]),
}
// Add headers if any
if len(jobCfg.Headers) > 0 {
httpTarget.Headers = jobCfg.Headers
}
// Set the body if the method is POST, PUT, or PATCH
if jobCfg.HttpMethod == "POST" || jobCfg.HttpMethod == "PUT" || jobCfg.HttpMethod == "PATCH" {
httpTarget.Body = []byte(jobCfg.Body)
}
// OAuthToken and OidcToken should be set if needed here
job := &schedulerpb.Job{
Name: fmt.Sprintf("%s/jobs/%s", parent, formatJobName(jobCfg.Description)),
Target: &schedulerpb.Job_HttpTarget{HttpTarget: httpTarget},
Schedule: jobCfg.Schedule,
TimeZone: jobCfg.TimeZone,
}
// Use the CreateJob method of the Cloud Scheduler client
_, err := sc.Client.CreateJob(ctx, &schedulerpb.CreateJobRequest{
Parent: parent,
Job: job,
})
if err != nil {
return fmt.Errorf("failed to create job for %s: %v", jobCfg.Description, err)
}
return nil
}
func buildQueryParams(params map[string]string) string {
var parts []string
for key, value := range params {
parts = append(parts, fmt.Sprintf("%s=%s", key, value))
}
return strings.Join(parts, "&")
}
func formatJobName(description string) string {
return strings.ReplaceAll(strings.ToLower(description), " ", "-")
}
func (sc *SchedulerClient) DeleteJob(ctx context.Context, jobName string) error {
// The DeleteJob call returns only an error.
err := sc.Client.DeleteJob(ctx, &schedulerpb.DeleteJobRequest{
Name: jobName,
})
if err != nil {
// Handle the error properly.
return fmt.Errorf("failed to delete job %s: %v", jobName, err)
}
return nil
}