Skip to content

Commit 8973bcd

Browse files
committed
chore: wip
chore: wip
1 parent d069b58 commit 8973bcd

File tree

2 files changed

+91
-60
lines changed

2 files changed

+91
-60
lines changed

storage/framework/core/cloud/src/cloud/queue.ts

Lines changed: 91 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,61 +26,33 @@ export class QueueStack {
2626
const jobsDir = path.jobsPath()
2727

2828
try {
29-
const files = await fs.readdir(jobsDir)
30-
31-
for (const file of files) {
32-
if (file.endsWith('.ts')) {
33-
const filePath = path.jobsPath(file)
34-
35-
// Await the loading of the job module
36-
const jobModule = await this.loadJobModule(filePath)
37-
38-
// Now you can safely access jobModule.default.rate
39-
const rate = jobModule.default?.rate
40-
41-
// if no rate or job is disabled, no need to schedule, skip
42-
if (!rate || jobModule.default?.enabled === false)
43-
continue
44-
45-
// Convert the rate to a Schedule object
46-
const schedule = Schedule.cron(this.cronScheduleFromRate(rate))
47-
48-
const id = `QueueRule${pascalCase(file.replace('.ts', ''))}`
49-
50-
// Perform operations with the jobModule.default as needed
51-
const rule = new Rule(this.scope, id, {
52-
// schedule to run every second
53-
ruleName: `${this.props.appName}-${this.props.appEnv}-queue-rule-${slug(file.replace('.ts', ''))}`,
54-
schedule,
55-
})
56-
57-
rule.addTarget(new EcsTask({
58-
cluster: this.props.cluster,
59-
taskDefinition: this.props.taskDefinition,
60-
containerOverrides: [
61-
{
62-
containerName: `${this.props.appName}-${this.props.appEnv}-api`,
63-
environment: [
64-
{
65-
name: 'QUEUE_WORKER',
66-
value: 'true',
67-
},
68-
{
69-
name: 'JOB',
70-
value: file,
71-
},
72-
],
73-
},
74-
],
75-
76-
retryAttempts: jobModule.default.tries || 3,
77-
78-
subnetSelection: {
79-
subnetType: ec2.SubnetType.PUBLIC, // SubnetType.PRIVATE_WITH_EGRESS
80-
},
81-
}))
82-
}
29+
const jobFiles = await fs.readdir(jobsDir)
30+
const actionFiles = await fs.readdir(actionsDir)
31+
const jobs = []
32+
33+
// then, need to loop through all app/Jobs/*.ts and create a rule for each, potentially overwriting the Schedule.ts jobs
34+
for (const file of jobFiles) {
35+
if (!file.endsWith('.ts'))
36+
continue
37+
38+
const filePath = path.jobsPath(file)
39+
40+
// Await the loading of the job module
41+
jobs.push(await this.loadModule(filePath))
42+
}
43+
44+
for (const file of actionFiles) {
45+
if (!file.endsWith('.ts'))
46+
continue
47+
48+
const filePath = path.actionsPath(file)
49+
50+
// Await the loading of the job module
51+
jobs.push(await this.loadModule(filePath))
8352
}
53+
54+
for (const job of jobs)
55+
await this.createQueueRule(job)
8456
}
8557
catch (err) {
8658
console.error('Error reading the jobs directory:', err)
@@ -100,9 +72,73 @@ export class QueueStack {
10072
}
10173
}
10274

103-
async loadJobModule(filePath: string) {
75+
async loadModule(filePath: string) {
10476
const jobModule = await import(filePath)
10577

10678
return jobModule
10779
}
80+
81+
createQueueRule(job: any) {
82+
// Now you can safely access job.default.rate
83+
const rate = job.default?.rate
84+
85+
// if no rate or job is disabled, no need to schedule, skip
86+
if (!rate || job.default?.enabled === false)
87+
return
88+
89+
// Convert the rate to a Schedule object
90+
const schedule = Schedule.cron(this.cronScheduleFromRate(rate))
91+
92+
const id = `QueueRule${pascalCase(file.replace('.ts', ''))}`
93+
94+
// Perform operations with the jobModule.default as needed
95+
const rule = new Rule(this.scope, id, {
96+
// schedule to run every second
97+
ruleName: `${this.props.appName}-${this.props.appEnv}-queue-rule-${slug(file.replace('.ts', ''))}`,
98+
schedule,
99+
})
100+
101+
rule.addTarget(new EcsTask({
102+
cluster: this.props.cluster,
103+
taskDefinition: this.props.taskDefinition,
104+
containerOverrides: [
105+
{
106+
containerName: `${this.props.appName}-${this.props.appEnv}-api`,
107+
environment: [
108+
{
109+
name: 'QUEUE_WORKER',
110+
value: 'true',
111+
},
112+
{
113+
name: 'JOB',
114+
value: file,
115+
},
116+
{
117+
name: 'JOB_BACKOFF_FACTOR',
118+
value: jobModule.default?.backoffFactor,
119+
},
120+
{
121+
name: 'JOB_RETRIES',
122+
value: jobModule.default?.tries,
123+
},
124+
{
125+
name: 'JOB_INITIAL_DELAY',
126+
value: jobModule.default?.initialDelay,
127+
},
128+
{
129+
name: 'JOB_JITTER',
130+
value: jobModule.default?.jitter,
131+
},
132+
],
133+
},
134+
],
135+
136+
retryAttempts: 1, // we utilize a custom retry mechanism in the job itself
137+
// retryAttempts: jobModule.default.tries || 3,
138+
139+
subnetSelection: {
140+
subnetType: ec2.SubnetType.PUBLIC, // SubnetType.PRIVATE_WITH_EGRESS
141+
},
142+
}))
143+
}
108144
}

storage/framework/core/path/src/index.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,6 @@ export function scriptsPath(path?: string) {
366366
return frameworkPath(`scripts/${path || ''}`)
367367
}
368368

369-
export function schedulePath() {
370-
return appPath(`Schedule.ts`)
371-
}
372-
373369
export function schedulerPath(path?: string) {
374370
return corePath(`scheduler/${path || ''}`)
375371
}
@@ -527,7 +523,6 @@ export const path = {
527523
signalsPath,
528524
slugPath,
529525
scriptsPath,
530-
schedulePath,
531526
securityPath,
532527
serverPath,
533528
serverlessPath,

0 commit comments

Comments
 (0)