Skip to content

Commit

Permalink
feat: v2 - add worker mode (#6739)
Browse files Browse the repository at this point in the history
**What**
- Adds support for starting a Medusa process with a worker mode.
- The worker modes supported are "shared", "worker", "server"
- In "worker" mode, API routes are not registered and modules that need to run workers (e.g., event bus redis) can use the flag to conditionally start workers.
- In "server" mode, API routes are registered and workers are not started.
- In "shared" mode, API routes are registered and workers are started. This is great for development.
  • Loading branch information
srindom committed Mar 21, 2024
1 parent 205573f commit 56481e6
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 59 deletions.
8 changes: 8 additions & 0 deletions .changeset/ninety-months-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@medusajs/medusa": patch
"@medusajs/event-bus-redis": patch
"@medusajs/modules-sdk": patch
"@medusajs/types": patch
---

feat: v2 - add worker mode
13 changes: 8 additions & 5 deletions packages/event-bus-redis/src/services/event-bus-redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
})

// Register our worker to handle emit calls
new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, {
prefix: `${this.constructor.name}`,
...(moduleOptions.workerOptions ?? {}),
connection: eventBusRedisConnection,
})
const shouldStartWorker = moduleDeclaration.worker_mode !== "server"
if (shouldStartWorker) {
new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, {
prefix: `${this.constructor.name}`,
...(moduleOptions.workerOptions ?? {}),
connection: eventBusRedisConnection,
})
}
}

/**
Expand Down
65 changes: 39 additions & 26 deletions packages/medusa/src/commands/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,52 @@ export default async function ({ port, directory }) {
const app = express()

try {
const { dbConnection } = await loaders({ directory, expressApp: app })
const serverActivity = Logger.activity(`Creating server`)
const server = GracefulShutdownServer.create(
app.listen(port, (err) => {
if (err) {
return
}
Logger.success(serverActivity, `Server is ready on port: ${port}`)
track("CLI_START_COMPLETED")
})
)

// Handle graceful shutdown
const gracefulShutDown = () => {
server
.shutdown()
.then(() => {
Logger.info("Gracefully stopping the server.")
process.exit(0)
})
.catch((e) => {
Logger.error("Error received when shutting down the server.", e)
process.exit(1)
const { dbConnection, configModule, container } = await loaders({
directory,
expressApp: app,
})

const shouldStartServer =
configModule.projectConfig.worker_mode !== "worker"

let server
if (shouldStartServer) {
const serverActivity = Logger.activity(`Creating server`)
server = GracefulShutdownServer.create(
app.listen(port, (err) => {
if (err) {
return
}
Logger.success(serverActivity, `Server is ready on port: ${port}`)
track("CLI_START_COMPLETED")
})
)

// Handle graceful shutdown
const gracefulShutDown = () => {
server
.shutdown()
.then(() => {
Logger.info("Gracefully stopping the server.")
process.exit(0)
})
.catch((e) => {
Logger.error("Error received when shutting down the server.", e)
process.exit(1)
})
}

process.on("SIGTERM", gracefulShutDown)
process.on("SIGINT", gracefulShutDown)
} else {
Logger.info("Running in worker mode, server will not be started.")
}
process.on("SIGTERM", gracefulShutDown)
process.on("SIGINT", gracefulShutDown)

scheduleJob(CRON_SCHEDULE, () => {
track("PING")
})

return { dbConnection, server }
return shouldStartServer ? { dbConnection, server } : { dbConnection }
} catch (err) {
Logger.error("Error starting server", err)
process.exit(1)
Expand Down
15 changes: 14 additions & 1 deletion packages/medusa/src/loaders/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getConfigFile } from "medusa-core-utils"
import { getConfigFile, isDefined } from "medusa-core-utils"
import { ConfigModule } from "../types/global"
import logger from "./logger"

Expand Down Expand Up @@ -58,11 +58,24 @@ export default (rootDirectory: string): ConfigModule => {
)
}

let worker_mode = configModule?.projectConfig?.worker_mode
if (!isDefined(worker_mode)) {
const env = process.env.MEDUSA_WORKER_MODE
if (isDefined(env)) {
if (env === "shared" || env === "worker" || env === "server") {
worker_mode = env
}
} else {
worker_mode = "shared"
}
}

return {
projectConfig: {
jwt_secret: jwt_secret ?? "supersecret",
cookie_secret: cookie_secret ?? "supersecret",
...configModule?.projectConfig,
worker_mode,
},
modules: configModule.modules ?? {},
featureFlags: configModule?.featureFlags ?? {},
Expand Down
53 changes: 30 additions & 23 deletions packages/medusa/src/loaders/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { createDefaultsWorkflow } from "@medusajs/core-flows"
import {
InternalModuleDeclaration,
ModulesDefinition
ModulesDefinition,
} from "@medusajs/modules-sdk"
import { MODULE_RESOURCE_TYPE } from "@medusajs/types"
import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types"
import {
ContainerRegistrationKeys,
MedusaV2Flag,
Expand Down Expand Up @@ -91,14 +91,7 @@ async function loadMedusaV2({
}) {
const container = createMedusaContainer()

// Add additional information to context of request
expressApp.use((req: Request, res: Response, next: NextFunction) => {
const ipAddress = requestIp.getClientIp(req) as string
;(req as any).request_context = {
ip_address: ipAddress,
}
next()
})
const shouldStartAPI = configModule.projectConfig.worker_mode !== "worker"

const pgConnection = await pgConnectionLoader({ container, configModule })

Expand All @@ -114,22 +107,33 @@ async function loadMedusaV2({
container,
})

await expressLoader({ app: expressApp, configModule })
if (shouldStartAPI) {
await expressLoader({ app: expressApp, configModule })

expressApp.use((req: Request, res: Response, next: NextFunction) => {
req.scope = container.createScope() as MedusaContainer
req.requestId = (req.headers["x-request-id"] as string) ?? v4()
next()
})
expressApp.use((req: Request, res: Response, next: NextFunction) => {
req.scope = container.createScope() as MedusaContainer
req.requestId = (req.headers["x-request-id"] as string) ?? v4()
next()
})

// TODO: Add Subscribers loader
// Add additional information to context of request
expressApp.use((req: Request, res: Response, next: NextFunction) => {
const ipAddress = requestIp.getClientIp(req) as string
;(req as any).request_context = {
ip_address: ipAddress,
}
next()
})

await apiLoader({
container,
app: expressApp,
configModule,
featureFlagRouter,
})
// TODO: Add Subscribers loader

await apiLoader({
container,
app: expressApp,
configModule,
featureFlagRouter,
})
}

await medusaProjectApisLoader({
rootDirectory,
Expand All @@ -142,6 +146,7 @@ async function loadMedusaV2({
await createDefaultsWorkflow(container).run()

return {
configModule,
container,
app: expressApp,
pgConnection,
Expand All @@ -153,6 +158,7 @@ export default async ({
expressApp,
isTest,
}: Options): Promise<{
configModule: ConfigModule
container: MedusaContainer
dbConnection?: Connection
app: Express
Expand Down Expand Up @@ -319,6 +325,7 @@ export default async ({
track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration })

return {
configModule,
container,
dbConnection,
app: expressApp,
Expand Down
12 changes: 11 additions & 1 deletion packages/medusa/src/loaders/load-medusa-project-apis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,19 @@ export default async ({
}: Options): Promise<void> => {
const resolved = getResolvedPlugins(rootDirectory, configModule) || []

const shouldStartAPI = configModule.projectConfig.worker_mode !== "worker"

await promiseAll(
resolved.map(async (pluginDetails) => {
await registerApi(pluginDetails, app, container, configModule, activityId)
if (shouldStartAPI) {
await registerApi(
pluginDetails,
app,
container,
configModule,
activityId
)
}
await registerSubscribers(pluginDetails, container, activityId)
await registerWorkflows(pluginDetails)
})
Expand Down
1 change: 1 addition & 0 deletions packages/medusa/src/loaders/medusa-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ export const loadMedusaApp = async (
}

const medusaApp = await MedusaApp({
workerMode: configModule.projectConfig.worker_mode,
modulesConfig: configModules,
servicesConfig: joinerConfig,
remoteFetchData: remoteQueryFetchData(container),
Expand Down
9 changes: 7 additions & 2 deletions packages/modules-sdk/src/medusa-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ export async function loadModules(
modulesConfig,
sharedContainer,
migrationOnly = false,
loaderOnly = false
loaderOnly = false,
workerMode: "shared" | "worker" | "server" = "server"
) {
const allModules = {}

Expand Down Expand Up @@ -113,6 +114,7 @@ export async function loadModules(
moduleExports,
migrationOnly,
loaderOnly,
workerMode,
})) as LoadedModule

if (loaderOnly) {
Expand Down Expand Up @@ -202,6 +204,7 @@ export type MedusaAppOutput = {
}

export type MedusaAppOptions = {
workerMode?: "shared" | "worker" | "server"
sharedContainer?: MedusaContainer
sharedResourcesConfig?: SharedResources
loadedModules?: LoadedModule[]
Expand Down Expand Up @@ -232,6 +235,7 @@ async function MedusaApp_({
onApplicationStartCb,
migrationOnly = false,
loaderOnly = false,
workerMode = "server",
}: MedusaAppOptions & { migrationOnly?: boolean } = {}): Promise<{
modules: Record<string, LoadedModule | LoadedModule[]>
link: RemoteLink | undefined
Expand Down Expand Up @@ -300,7 +304,8 @@ async function MedusaApp_({
modules,
sharedContainer_,
migrationOnly,
loaderOnly
loaderOnly,
workerMode
)

if (loaderOnly) {
Expand Down
5 changes: 4 additions & 1 deletion packages/modules-sdk/src/medusa-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export type ModuleBootstrapOptions = {
* Forces the modules bootstrapper to only run the modules loaders and return prematurely
*/
loaderOnly?: boolean
workerMode?: "shared" | "worker" | "server"
}

export type LinkModuleBootstrapOptions = {
Expand Down Expand Up @@ -225,6 +226,7 @@ export class MedusaModule {
injectedDependencies,
migrationOnly,
loaderOnly,
workerMode,
}: ModuleBootstrapOptions): Promise<{
[key: string]: T
}> {
Expand Down Expand Up @@ -267,6 +269,7 @@ export class MedusaModule {
options: declaration?.options ?? declaration,
alias: declaration?.alias,
main: declaration?.main,
worker_mode: workerMode,
}
}

Expand Down Expand Up @@ -302,7 +305,7 @@ export class MedusaModule {
moduleResolutions,
logger: logger_,
migrationOnly,
loaderOnly
loaderOnly,
})
} catch (err) {
errorLoading(err)
Expand Down
19 changes: 19 additions & 0 deletions packages/types/src/common/config-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,25 @@ export type ProjectConfigOptions = {
* ```
*/
jobs_batch_size?: number

/**
* Configure the application's worker mode. The default value is `shared`.
* - Use `shared` if you want to run the application in a single process.
* - Use `worker` if you want to run the a worker process only.
* - Use `server` if you want to run the application server only.
*
* @example
* ```js title="medusa-config.js"
* module.exports = {
* projectConfig: {
* worker_mode: "shared"
* // ...
* },
* // ...
* }
* ```
*/
worker_mode?: "shared" | "worker" | "server"
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/types/src/modules-sdk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export type InternalModuleDeclaration = {
* If the module is the main module for the key when multiple ones are registered
*/
main?: boolean
worker_mode?: "shared" | "worker" | "server"
}

export type ExternalModuleDeclaration = {
Expand Down

0 comments on commit 56481e6

Please sign in to comment.