feat: parallelization #352
base: develop
Conversation
Walkthrough

The pull request introduces substantial enhancements to the Maxun core scraping system, focusing on parallel processing capabilities and improved pagination handling.
Actionable comments posted: 11
🔭 Outside diff range comments (2)
maxun-core/src/interpret.ts (2)
Line range hint 463-714: Optimize `handleParallelPagination` method

The `handleParallelPagination` method contains nested loops and multiple asynchronous operations that could be optimized for better performance and reliability. Consider the following improvements:

- Use proper error handling to catch and manage exceptions during page navigation and scraping.
- Avoid tight loops without delays, which can cause high CPU usage.
- Ensure that shared resources like `visitedUrls` are accessed in a thread-safe manner.
- Replace `console.log` statements with a proper logging mechanism.
Line range hint 829-855: Validate action applicability and selection logic

In `getMatchingActionId`, actions are selected based on applicability, but there might be scenarios where no action is applicable, leading to undefined behavior. Ensure that the method handles cases where no matching action is found and that calling code checks for `undefined` or invalid action IDs.
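As a hedged sketch of that guard (the `Action` and `PageState` shapes below are hypothetical stand-ins, not Maxun's actual types), returning `string | undefined` forces callers to handle the miss case:

```typescript
// Sketch only: Action and PageState are illustrative placeholders.
interface PageState { url: string; }
interface Action { id: string; isApplicable(state: PageState): boolean; }

function getMatchingActionId(actions: Action[], state: PageState): string | undefined {
  return actions.find(a => a.isApplicable(state))?.id;
}

// Callers must now deal explicitly with "no applicable action":
const actionId = getMatchingActionId([], { url: 'https://example.com' });
if (actionId === undefined) {
  // bail out instead of proceeding with an invalid action id
}
```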
🧹 Nitpick comments (12)
maxun-core/src/utils/worker.ts (3)
55-65: Avoid potential race conditions when posting progress

When posting progress to the parent thread, the progress data may not reflect the latest state if other asynchronous operations are modifying shared state. Consider using locks or other synchronization mechanisms when accessing and updating shared state to prevent race conditions.
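One way to avoid torn progress reads, sketched under the assumption that the counters fit in 32-bit integers: keep them in a `SharedArrayBuffer` and read an atomic snapshot right before posting.

```typescript
// Sketch: atomic progress counters shared across threads.
// Slot layout ([0] = processedUrls, [1] = scrapedItems) is illustrative.
const buffer = new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT);
const counters = new Int32Array(buffer);

// Worker side: bump a counter atomically, then post a consistent snapshot.
Atomics.add(counters, 0, 1); // one more URL processed
const snapshot = {
  processedUrls: Atomics.load(counters, 0),
  scrapedItems: Atomics.load(counters, 1),
};
// parentPort?.postMessage({ type: 'progress', data: snapshot });
```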
67-70: Set appropriate navigation timeout

The `page.goto` call uses a timeout of 30,000 milliseconds. Depending on network conditions and page complexity, this might be insufficient or excessive. Review whether the timeout duration is appropriate for your use case. You could make it configurable via the `WorkerConfig`.
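A minimal sketch of that, assuming a hypothetical optional `navigationTimeoutMs` field were added to `WorkerConfig`:

```typescript
// Sketch: navigationTimeoutMs is a hypothetical addition to WorkerConfig.
interface NavigationTimeoutConfig {
  navigationTimeoutMs?: number;
}

function resolveNavigationTimeout(config: NavigationTimeoutConfig): number {
  return config.navigationTimeoutMs ?? 30_000; // fall back to the current default
}

// Usage: await page.goto(pageUrl, { waitUntil: 'networkidle', timeout: resolveNavigationTimeout(config) });
```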
95-104: Optimize duplicate filtering logic

The duplicate filtering in lines 95-104 compares JSON strings of items, which can be inefficient for large datasets. Consider using a hash function to generate a unique identifier for each item. This can improve performance and reduce memory usage.

Apply this diff to implement hashing:

```diff
+ const crypto = require('crypto');
  const newResults = pageResults.filter(item => {
-   const uniqueKey = JSON.stringify(item);
+   const uniqueKey = crypto.createHash('sha256').update(JSON.stringify(item)).digest('hex');
    // Check against local duplicates
    if (scrapedItems.has(uniqueKey)) return false;
```

maxun-core/src/utils/worker-pool.ts (3)
70-75: Consider more informative error handling in `handleWorkerError`

The `handleWorkerError` function logs an error message but could include more detailed information. Include stack traces or additional context in the error logs to facilitate debugging.
219-235: Prevent duplicated error handling for worker

The `worker.on('error')` handler and the `catch` block both handle errors. This might result in duplicated error logs or conflicting behavior. Ensure that errors are handled consistently and consider consolidating error handling to avoid duplication.
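One consolidation pattern, sketched under the assumption that both paths can funnel into a single guarded handler:

```typescript
// Sketch: a single error path with a guard so the same failure is never
// logged or rejected twice, whichever path reports it first.
let settled = false;
const failOnce = (error: Error, onFail: (err: Error) => void) => {
  if (settled) return;
  settled = true;
  console.error('Worker failed:', error);
  onFail(error); // e.g. push to errors, update metrics, reject the promise
};

// worker.on('error', err => failOnce(err, reject));
// ...and in the catch block: failOnce(error, reject);
```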
226-232: Handle non-zero exit codes gracefully

When a worker exits with a non-zero code, an error is thrown with a generic message. Provide more detailed error messages, possibly including the worker ID and exit code, to aid in troubleshooting.
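For example (sketch), the message could carry the worker's identity alongside the exit code:

```typescript
// Sketch: a descriptive exit error; workerIndex comes from WorkerConfig.
function workerExitError(workerIndex: number, code: number): Error {
  return new Error(`Worker ${workerIndex} stopped with exit code ${code}`);
}

// worker.on('exit', (code) => { if (code !== 0) reject(workerExitError(config.workerIndex, code)); });
```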
maxun-core/src/interpret.ts (3)
552-721: Address redundant code and potential bugs in pagination handling

There appears to be duplicated logic between `handleParallelPagination` and `handlePagination`. Additionally, certain conditions and error cases may not be adequately handled.

- Consolidate common logic between the two methods to avoid duplication.
- Review and test all pagination types (`scrollDown`, `scrollUp`, `clickNext`, `clickLoadMore`) for correctness.
- Ensure that edge cases, such as pages without a next button or infinite scrolling that reaches the end, are properly managed.
Line range hint 722-870: Improve `runLoop` method for better maintainability

The `runLoop` method is lengthy and handles many responsibilities, making it hard to read and maintain. Consider refactoring the method by:

- Extracting smaller helper functions for repeated logic.
- Simplifying control flow, especially within the while loop.
- Adding comments or documentation for complex sections.
🧰 Tools
🪛 Biome (1.9.4)
[error] 738-738: Other switch clauses can erroneously access this declaration. Wrap the declaration in a block to restrict its access to the switch clause. The declaration is defined in this switch clause.
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
Line range hint 774-785: Ensure stability when applying ad-blocker

Within `runLoop`, the application of the ad-blocker is wrapped in a try-catch block, but the catch block only logs the error. If the ad-blocker is critical for the operation, consider implementing a retry mechanism or failing gracefully if it cannot be applied.
maxun-core/src/types/worker.ts (2)
1-13: Enhance type safety and add documentation for the WorkerConfig interface

The interface would benefit from:

- Stronger typing for the `fields` property instead of `any`
- JSDoc comments explaining the purpose of each field
- Validation constraints for numeric fields

Consider this improvement:

```diff
+/**
+ * Configuration for a worker thread in the parallel scraping system.
+ */
 export interface WorkerConfig {
+  /** Zero-based index of the worker thread */
   workerIndex: number;
+  /** Starting index for this worker's batch */
   startIndex: number;
+  /** Ending index for this worker's batch */
   endIndex: number;
+  /** Number of items to process in each batch */
   batchSize: number;
   pageUrls: string[];
   listSelector: string;
-  fields: any;
+  fields: Record<string, {
+    selector: string;
+    attribute?: string;
+    transform?: (value: string) => any;
+  }>;
   pagination: {
     type: string;
     selector: string;
   };
 }
```
30-48: Standardize time and memory usage types

Consider using standard types for better consistency:

- Use `Date` objects for time-related fields
- Use the `NodeJS.MemoryUsage` type for memory metrics (like in GlobalMetrics)

```diff
 export interface PerformanceMetrics {
-  startTime: number;
-  endTime: number;
+  startTime: Date;
+  endTime: Date;
   duration: number;
   pagesProcessed: number;
   itemsScraped: number;
   failedPages: number;
   averageTimePerPage: number;
-  memoryUsage: {
-    heapUsed: number;
-    heapTotal: number;
-    external: number;
-    rss: number;
-  };
+  memoryUsage: NodeJS.MemoryUsage;
   cpuUsage: {
     user: number;
     system: number;
   };
 }
```

maxun-core/tsconfig.json (1)
10-10: Consider a more specific include pattern

While `"src/**/*"` works, it might include test files or other non-source files. Consider using multiple patterns to be more specific:

```diff
-  "include": ["src/**/*"],
+  "include": [
+    "src/**/*.ts",
+    "src/**/*.tsx",
+    "src/**/*.json"
+  ],
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- maxun-core/src/interpret.ts (7 hunks)
- maxun-core/src/types/worker.ts (1 hunks)
- maxun-core/src/utils/worker-pool.ts (1 hunks)
- maxun-core/src/utils/worker.ts (1 hunks)
- maxun-core/tsconfig.json (1 hunks)
🧰 Additional context used
🪛 Biome (1.9.4)
maxun-core/src/utils/worker-pool.ts
[error] 203-233: Promise executor functions should not be `async`.
(lint/suspicious/noAsyncPromiseExecutor)
maxun-core/src/utils/worker.ts
[error] 122-122: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
[error] 129-129: The catch clause that only rethrows the original error is useless.
An unnecessary catch clause can be confusing.
Unsafe fix: Remove the catch clause.
(lint/complexity/noUselessCatch)
🔇 Additional comments (8)
maxun-core/src/utils/worker.ts (3)
6-19: Review browser launch arguments for security implications

The `initializeBrowser` function sets various command-line arguments that disable security features like web security and site isolation trials. While these settings may be necessary for certain scraping tasks, they can expose the browser to security risks. Please ensure that disabling these features is intentional and does not introduce security vulnerabilities. Consider the security implications of each argument:

- `--disable-web-security`: Disables same-origin policy, which can expose sensitive data.
- `--disable-site-isolation-trials`: Disables site isolation, potentially affecting browser stability and security.

If not strictly necessary, avoid disabling these features.
21-36: Ensure scripts are correctly loaded in the page

The `ensureScriptsLoaded` function checks for the existence of certain functions on the `window` object and loads the `scraper.js` script if they're missing. However, it only adds the script without verifying whether it loaded successfully. Consider adding validation after loading the script to ensure it was successfully injected. Additionally, handle cases where the script might fail to load due to network issues or incorrect paths.
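A verification sketch (the script path is an assumption; the `scrapeList` check mirrors the functions the worker expects):

```typescript
import { Page } from 'playwright';

// Sketch: inject scraper.js, then verify the expected function actually appeared.
async function ensureScriptsLoadedVerified(page: Page): Promise<void> {
  const present = await page.evaluate(() => typeof (window as any).scrapeList === 'function');
  if (present) return;

  await page.addScriptTag({ path: './build/scraper.js' }); // hypothetical path
  await page
    .waitForFunction(() => typeof (window as any).scrapeList === 'function', null, { timeout: 5000 })
    .catch(() => {
      throw new Error('scraper.js was added but scrapeList never appeared');
    });
}
```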
131-133: Ensure proper resource cleanup in `finally` block

The `finally` block closes the `page` and `browser`. Ensure that these operations are handled correctly even if exceptions occur during closure. Consider adding try-catch blocks around `await page.close()` and `await browser.close()` to handle any errors during cleanup.
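A defensive-cleanup sketch: each close attempt is independent, so a `page.close()` failure can no longer prevent the browser from closing.

```typescript
import { Browser, Page } from 'playwright';

// Sketch: log-and-continue cleanup; errors are reported but not rethrown.
async function closeQuietly(page: Page | null, browser: Browser | null): Promise<void> {
  try {
    if (page) await page.close();
  } catch (err) {
    console.error('page.close() failed during cleanup:', err);
  }
  try {
    if (browser) await browser.close();
  } catch (err) {
    console.error('browser.close() failed during cleanup:', err);
  }
}
```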
maxun-core/src/utils/worker-pool.ts (1)

259-264: Ensure proper termination of workers

In the `cleanup` method, worker termination is initiated but potential errors during termination are not handled. Consider adding error handling to `worker.terminate()` calls to ensure that all workers are properly terminated, even if errors occur.
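A sketch using `Promise.allSettled`, so one failed termination doesn't abort the rest:

```typescript
import { Worker } from 'worker_threads';

// Sketch: terminate every worker, collecting failures instead of throwing early.
async function terminateAll(workers: Worker[]): Promise<void> {
  const outcomes = await Promise.allSettled(workers.map(w => w.terminate()));
  outcomes.forEach((outcome, i) => {
    if (outcome.status === 'rejected') {
      console.error(`Failed to terminate worker ${i}:`, outcome.reason);
    }
  });
}
```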
maxun-core/src/interpret.ts (2)

96-96: Initialize `WorkerPool` with correct `maxWorkers`

The `WorkerPool` is initialized with `this.options.maxWorkers`, but there may be inconsistencies if `this.options` is not properly set. Ensure that `this.options` is correctly initialized and that `maxWorkers` has a valid value before creating the `WorkerPool`.
Line range hint 85-91: Handle potential NaN in `maxWorkers` default value

The calculation for `maxWorkers` uses `os.cpus().length - 1`, which could possibly result in zero or negative numbers if `os.cpus()` returns an empty array. Add validation to ensure that `maxWorkers` is at least 1:

```diff
- maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
+ maxWorkers: Math.max(1, Math.min(os.cpus().length > 0 ? os.cpus().length - 1 : 1, 4)),
```

maxun-core/src/types/worker.ts (2)
20-28: Well-structured progress tracking interface!

The interface provides comprehensive progress tracking with appropriate types and good separation of concerns.

50-59: Well-designed metrics aggregation interface!

The interface makes good use of NodeJS built-in types and provides comprehensive system-wide metrics.
maxun-core/src/utils/worker.ts
Outdated
```typescript
try {
  browser = await initializeBrowser();
  const context = await browser.newContext();
  page = await context.newPage();
  await ensureScriptsLoaded(page);

  for (const [pageIndex, pageUrl] of config.pageUrls.entries()) {
    const pageStartTime = Date.now();

    try {
      // Report progress to main thread
      parentPort?.postMessage({
        type: 'progress',
        data: {
          workerId: config.workerIndex,
          currentUrl: pageUrl,
          processedUrls: pageIndex,
          totalUrls: config.pageUrls.length,
          timeElapsed: Date.now() - pageStartTime,
          scrapedItems: results.length
        }
      });

      const navigationResult = await page.goto(pageUrl, {
        waitUntil: 'networkidle',
        timeout: 30000
      });

      if (!navigationResult) continue;

      await page.waitForLoadState('networkidle').catch(() => {});

      const scrapeConfig = {
        listSelector: config.listSelector,
        fields: config.fields,
        pagination: config.pagination,
        limit: config.endIndex - config.startIndex - results.length
      };

      const pageResults = await page.evaluate(
        (cfg) => window.scrapeList(cfg),
        scrapeConfig
      );

      // Filter out duplicates
      const newResults = pageResults.filter(item => {
        const uniqueKey = JSON.stringify(item);

        // Check against local duplicates
        if (scrapedItems.has(uniqueKey)) return false;

        // Check against shared state results
        const isDuplicate = sharedState.results.some(
          existingItem => JSON.stringify(existingItem) === uniqueKey
        );

        if (isDuplicate) return false;
        scrapedItems.add(uniqueKey);
        sharedState.results.push(item);
        sharedState.totalScraped++;
        return true;
      });

      results.push(...newResults);

      if (results.length >= config.batchSize) break;

      await page.waitForTimeout(1000);

    } catch (error) {
      parentPort?.postMessage({
        type: 'error',
        data: {
          workerId: config.workerIndex,
          url: pageUrl,
          error: error.message
        }
      });
      continue;
    }
  }

  return results;

} catch (error) {
  throw error;
} finally {
  if (page) await page.close();
  if (browser) await browser.close();
}
```
Handle potential exceptions during scraping

In the `scrapeBatch` function, several asynchronous operations could throw exceptions, such as launching the browser, creating contexts, and navigating to pages. Ensure that all asynchronous operations are properly wrapped in try-catch blocks to prevent unhandled promise rejections. Specifically, consider:

- Adding error handling when initializing the browser and context.
- Verifying that `page.goto` successfully navigated to the URL before proceeding.
- Handling timeouts and navigation errors more gracefully (see the sketch below).
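A sketch of the second and third points (helper name illustrative): treat navigation as fallible and return a boolean the loop can branch on.

```typescript
import { Page } from 'playwright';

// Sketch: verify navigation succeeded before scraping, instead of letting
// a timeout or a null response take the whole worker down.
async function safeGoto(page: Page, url: string, timeout = 30_000): Promise<boolean> {
  try {
    const response = await page.goto(url, { waitUntil: 'networkidle', timeout });
    return response !== null && response.ok(); // confirm we landed on a real page
  } catch (err) {
    console.error(`Navigation to ${url} failed:`, err);
    return false; // caller can skip this page and move on
  }
}
```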
🧰 Tools
🪛 Biome (1.9.4)
[error] 122-122: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
[error] 129-129: The catch clause that only rethrows the original error is useless.
An unnecessary catch clause can be confusing.
Unsafe fix: Remove the catch clause.
(lint/complexity/noUselessCatch)
maxun-core/src/utils/worker.ts
Outdated
```typescript
parentPort?.postMessage({
  type: 'error',
  data: {
    workerId: config.workerIndex,
    url: pageUrl,
    error: error.message
  }
});
continue;
}
```
🛠️ Refactor suggestion
Improve error reporting for failed page processing
In the catch block, when an error occurs while processing a page, the worker posts an error message but continues to the next page.
Consider adding more context to the error messages, such as the specific error type or stack trace. This can aid in debugging.
Also, evaluate whether continuing to the next page is the desired behavior or if certain errors should halt the worker.
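A sketch of a richer payload (field names illustrative); carrying the error name lets the parent distinguish, say, a retryable `TimeoutError` from a fatal failure:

```typescript
// Sketch: a more descriptive error payload for the parent thread, which can
// then decide per error type whether to continue or halt the worker.
function buildErrorPayload(workerId: number, url: string, error: Error) {
  return {
    workerId,
    url,
    errorName: error.name,   // e.g. 'TimeoutError' — likely worth retrying
    error: error.message,
    stack: error.stack,      // full trace for debugging
  };
}
```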
🧰 Tools
🪛 Biome (1.9.4)
[error] 122-122: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
maxun-core/src/utils/worker.ts
Outdated
```typescript
scrapeBatch(config, sharedState)
  .then(results => {
    parentPort?.postMessage({
      type: 'complete',
      data: results
    });
  })
  .catch(error => {
    parentPort?.postMessage({
      type: 'error',
      data: {
        workerId: config.workerIndex,
        error: error.message
      }
    });
  });
```
Handle uncaught exceptions in worker initialization

In the worker initialization section, if `scrapeBatch` throws an unhandled exception, it could cause the worker to exit unexpectedly. Ensure that all exceptions in `scrapeBatch` are caught and appropriately handled. Modify the `.catch` block to include error logging or additional error handling as needed.
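One possible shape for that `.catch` block, mirroring the snippet above (the non-zero exit is an assumption about how the pool detects failure):

```typescript
// Sketch: log locally, report to the parent, and exit non-zero so the
// pool's 'exit' handler also observes the failure.
scrapeBatch(config, sharedState)
  .then(results => parentPort?.postMessage({ type: 'complete', data: results }))
  .catch(error => {
    console.error(`Worker ${config.workerIndex} failed:`, error);
    parentPort?.postMessage({
      type: 'error',
      data: { workerId: config.workerIndex, error: error.message, stack: error.stack },
    });
    process.exit(1);
  });
```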
maxun-core/src/utils/worker.ts
Outdated
```typescript
    error: error.message
  }
});
continue;
```
🛠️ Refactor suggestion

Remove unnecessary `continue` statement

The `continue` statement at line 122 is unnecessary because it's the last statement in the loop iteration. Apply this diff to remove the redundant statement:

```diff
- continue;
```
🧰 Tools
🪛 Biome (1.9.4)
[error] 122-122: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
maxun-core/src/utils/worker.ts
Outdated
```typescript
} catch (error) {
  throw error;
} finally {
```
🛠️ Refactor suggestion

Remove useless `catch` clause that only rethrows

The `catch` block at lines 128-130 only rethrows the caught error without adding any additional handling. Apply this diff to remove the unnecessary `catch` clause:

```diff
- } catch (error) {
-   throw error;
  } finally {
```
🧰 Tools
🪛 Biome (1.9.4)
[error] 129-129: The catch clause that only rethrows the original error is useless.
An unnecessary catch clause can be confusing.
Unsafe fix: Remove the catch clause.
(lint/complexity/noUselessCatch)
maxun-core/src/utils/worker-pool.ts
Outdated
```typescript
public isActive(): boolean {
  return this.browser !== null && !this.isShuttingDown;
}
```
Adjust `isActive` method to reflect correct state

Similar to `getActiveWorkerCount`, the `isActive` method relies on `this.browser`, which is not relevant in this context. Update the method to reflect whether there are active workers and the pool is not shutting down:

```diff
- return this.browser !== null && !this.isShuttingDown;
+ return this.workers.length > 0 && !this.isShuttingDown;
```
maxun-core/src/utils/worker-pool.ts
Outdated
```typescript
  return this.browser ? 1 : 0;
}

public isActive(): boolean {
  return this.browser !== null && !this.isShuttingDown;
}
```
Correct `getActiveWorkerCount` method logic

The `getActiveWorkerCount` method returns `1` if `this.browser` is truthy, which might not accurately reflect the number of active workers. Since `this.browser` is not used in this class, and the workers are stored in `this.workers`, update the method to return the actual count of active workers:

```diff
- return this.browser ? 1 : 0;
+ return this.workers.length;
```
Committable suggestion skipped: line range outside the PR's diff.
maxun-core/src/utils/worker-pool.ts
Outdated
```typescript
new Promise<any[]>(async (resolve, reject) => {
  if (this.isShuttingDown) {
    reject(new Error('Worker pool is shutting down'));
    return;
  }

  const worker = this.createWorker(config, sharedState);
  this.workers.push(worker);

  let workerResults: any[] = [];

  worker.on('message', (message) => {
    if (message.type === 'complete') {
      workerResults = message.data;
    }
  });

  worker.on('error', (error) => {
    errors.push(error);
    this.updateWorkerMetrics(config.workerIndex, { status: 'failed' });
    reject(error);
  });

  worker.on('exit', (code) => {
    if (code === 0) {
      resolve(workerResults);
    } else {
      reject(new Error(`Worker stopped with exit code ${code}`));
    }
  });
}).catch(error => {
  console.error('Worker error:', error);
  return [];
})
);
```
🛠️ Refactor suggestion

Avoid using `async` functions as Promise executors

The function passed to `new Promise` at lines 203-233 is marked as `async`, which can lead to unexpected behavior. Apply this diff to remove the `async` keyword from the Promise executor function:

```diff
- new Promise<any[]>(async (resolve, reject) => {
+ new Promise<any[]>((resolve, reject) => {
```

According to best practices, the Promise constructor should not be passed an async function.
🧰 Tools
🪛 Biome (1.9.4)
[error] 203-233: Promise executor functions should not be `async`.
(lint/suspicious/noAsyncPromiseExecutor)
```diff
@@ -39,6 +43,7 @@ declare global {
 interface InterpreterOptions {
   maxRepeats: number;
   maxConcurrency: number;
+  maxWorkers: number;
```
Add missing property to `InterpreterOptions` interface

The `serializableCallback` property is used in the code but not defined in the provided `InterpreterOptions` interface. Ensure that `serializableCallback` is correctly defined in the interface to maintain type safety.
```typescript
export interface SharedState {
  totalScraped: number;
  results: any[];
}
```
💡 Codebase verification

Thread safety and type safety concerns are valid issues that need addressing

The codebase shows:

- Multiple workers concurrently access and modify `SharedState` without synchronization
- `results: any[]` is used consistently, lacking type safety
- No thread-safe mechanisms are implemented for shared state access

Suggested improvements (see the sketch after this list):

- Implement proper synchronization using `worker_threads` features like `SharedArrayBuffer` or message-based state updates
- Add generic type parameter as suggested in the original review
- Consider using an intermediary class to manage thread-safe state access
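A message-passing sketch of the intermediary-class idea (names illustrative): the main thread owns the state and applies updates serially, so workers never mutate shared memory directly.

```typescript
// Sketch: main-thread-owned state; workers send updates as messages.
interface StateUpdate<T> {
  type: 'push';
  items: T[];
}

class SharedStateManager<T> {
  private results: T[] = [];

  get totalScraped(): number {
    return this.results.length;
  }

  // Called from the main thread's worker.on('message') handler, which runs
  // on a single thread — so no locking is needed.
  apply(update: StateUpdate<T>): void {
    if (update.type === 'push') this.results.push(...update.items);
  }

  snapshot(): readonly T[] {
    return this.results;
  }
}
```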
🔗 Analysis chain

Consider thread safety and type safety for SharedState

The shared state between workers needs careful consideration:

- The `results` array should have a more specific type than `any[]`
- Thread safety should be ensured when multiple workers access this state

Consider using a thread-safe data structure or implementing proper synchronization mechanisms. Also, define a generic type parameter:

```diff
-export interface SharedState {
+export interface SharedState<T = unknown> {
   totalScraped: number;
-  results: any[];
+  results: T[];
 }
```
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found

Script:

```bash
#!/bin/bash
# Find all files importing or using SharedState
rg "SharedState" -l
# Look for actual usage patterns of SharedState
rg "SharedState" -A 5 -B 5
# Check for any thread synchronization patterns
rg -i "sync|mutex|lock|atomic" maxun-core/src/
# Look for type definitions related to worker results
ast-grep --pattern 'type $_ = $_' maxun-core/src/types/
```

Length of output: 12076
Actionable comments posted: 1
🧹 Nitpick comments (4)
maxun-core/src/interpret.ts (4)
456-457: Add error handling for parallel pagination

While the parallel pagination implementation is good, consider adding a try-catch block here to handle potential failures gracefully.

```diff
- const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
- await this.options.serializableCallback(scrapeResults);
+ try {
+   const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
+   await this.options.serializableCallback(scrapeResults);
+ } catch (error) {
+   this.log(`Parallel pagination failed: ${error.message}`, Level.ERROR);
+   // Fallback to regular pagination
+   const scrapeResults = await this.handlePagination(page, config);
+   await this.options.serializableCallback(scrapeResults);
+ }
```

549-551: Consider environment-specific CPU core limits

The CPU core calculation could be made more configurable:

- The hard-coded maximum of 4 workers might not be optimal for all environments
- Consider adding environment variables for fine-tuning

```diff
- const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4));
+ const MAX_WORKERS = process.env.MAX_WORKERS ? parseInt(process.env.MAX_WORKERS, 10) : 4;
+ const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, MAX_WORKERS));
```

614-623: Improve navigation timeout handling

The hard-coded 30000ms timeout might not be suitable for all network conditions. Consider making it configurable and adding retry logic.

```diff
+ const NAVIGATION_TIMEOUT = process.env.NAVIGATION_TIMEOUT ? parseInt(process.env.NAVIGATION_TIMEOUT, 10) : 30000;
+ const MAX_RETRIES = process.env.MAX_RETRIES ? parseInt(process.env.MAX_RETRIES, 10) : 3;
+
+ let retries = 0;
+ while (retries < MAX_RETRIES) {
    try {
      await Promise.race([
        Promise.all([
-         page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
+         page.waitForNavigation({ waitUntil: 'networkidle', timeout: NAVIGATION_TIMEOUT }),
          nextButton.click()
        ]),
        Promise.all([
-         page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
+         page.waitForNavigation({ waitUntil: 'networkidle', timeout: NAVIGATION_TIMEOUT }),
          nextButton.dispatchEvent('click')
        ])
      ]);
+     break;
    } catch (error) {
+     retries++;
+     if (retries === MAX_RETRIES) {
+       throw error;
+     }
+     await page.waitForTimeout(1000 * retries); // Back off before retrying
    }
+ }
```

545-668: Add memory management for large-scale scraping

The current implementation stores all URLs in memory, which could be problematic for very large datasets. Consider implementing:

- Batch processing of URLs
- Stream processing of results
- Periodic cleanup of collected data

Example approach:

```typescript
private async* urlGenerator(page: Page, config: any): AsyncGenerator<string> {
  const batchSize = 1000;
  const urls: string[] = [];

  while (true) {
    urls.push(page.url());

    if (urls.length >= batchSize) {
      for (const url of urls) {
        yield url;
      }
      urls.length = 0; // Clear the array
    }

    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }

  // Yield remaining URLs
  for (const url of urls) {
    yield url;
  }
}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- maxun-core/src/interpret.ts (4 hunks)
🔇 Additional comments (1)

maxun-core/src/interpret.ts (1)

19-19: LGTM!

The `os` module import is correctly added for CPU core detection.
```typescript
try {
  while (true) {
    pageUrls.push(page.url())

    if (pageUrls.length >= estimatedPages) {
      console.log('Reached estimated number of pages. Stopping pagination.');
      break;
    }

    let checkButton = null;
    let workingSelector = null;

    for (let i = 0; i < availableSelectors.length; i++) {
      const selector = availableSelectors[i];
      try {
        // Wait for selector with a short timeout
        checkButton = await page.waitForSelector(selector, { state: 'attached' });
        if (checkButton) {
          workingSelector = selector;
          break;
        }
      } catch (error) {
        console.log(`Selector failed: ${selector}`);
      }
    }

    if (!workingSelector) {
      break;
    }

    const nextButton = await page.$(workingSelector);
    if (!nextButton) {
      break;
    }

    const selectorIndex = availableSelectors.indexOf(workingSelector!);
    availableSelectors = availableSelectors.slice(selectorIndex);

    const previousUrl = page.url();
    visitedUrls.push(previousUrl);

    try {
      // Try both click methods simultaneously
      await Promise.race([
        Promise.all([
          page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
          nextButton.click()
        ]),
        Promise.all([
          page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
          nextButton.dispatchEvent('click')
        ])
      ]);
    } catch (error) {
      // Verify if navigation actually succeeded
      const currentUrl = page.url();
      if (currentUrl === previousUrl) {
        console.log("Previous URL same as current URL. Navigation failed.");
      }
    }

    const currentUrl = page.url();
    if (visitedUrls.includes(currentUrl)) {
      console.log(`Detected navigation to a previously visited URL: ${currentUrl}`);

      // Extract the current page number from the URL
      const match = currentUrl.match(/\d+/);
      if (match) {
        const currentNumber = match[0];
        // Use visitedUrls.length + 1 as the next page number
        const nextNumber = visitedUrls.length + 1;

        // Create new URL by replacing the current number with the next number
        const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString());

        console.log(`Navigating to constructed URL: ${nextUrl}`);

        // Navigate to the next page
        await Promise.all([
          page.waitForNavigation({ waitUntil: 'networkidle' }),
          page.goto(nextUrl)
        ]);
      }
    }

    await page.waitForTimeout(1000);
  }
```
🛠️ Refactor suggestion
Modularize the pagination navigation logic.
The navigation logic is complex and could benefit from being split into smaller, focused functions for better maintainability and testing.
Consider extracting these functionalities:
- URL collection logic
- Navigation handling
- URL construction for pagination
Example refactor for URL collection:
```typescript
private async collectPageUrls(page: Page, config: any): Promise<string[]> {
  const pageUrls: string[] = [];
  const visitedUrls: Set<string> = new Set();

  while (true) {
    const currentUrl = page.url();
    if (visitedUrls.has(currentUrl)) {
      break;
    }
    pageUrls.push(currentUrl);
    visitedUrls.add(currentUrl);
    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }
  return pageUrls;
}
```
Actionable comments posted: 3
🧹 Nitpick comments (1)
maxun-core/src/config/kafka.ts (1)
4-8: Good implementation of topic configuration with DLQ

The topic naming is clear and follows a consistent pattern. The inclusion of a Dead Letter Queue (DLQ) demonstrates good error handling practices.

Consider adding these enhancements (sketched after this list):
- Topic name prefixing for multi-environment deployments
- Topic configuration for partitions and retention
- Documentation for the DLQ handling strategy
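A sketch of what those three enhancements could look like together (env var names and retention values are illustrative, shaped for kafkajs's `admin.createTopics()`):

```typescript
// Sketch: environment-prefixed topic names with explicit partition and
// retention settings; the DLQ keeps failures longer for later inspection.
const prefix = process.env.KAFKA_TOPIC_PREFIX ?? 'dev';

export const topicDefinitions = [
  { topic: `${prefix}-scraping-tasks`, numPartitions: 10, configEntries: [{ name: 'retention.ms', value: '604800000' /* 7 days */ }] },
  { topic: `${prefix}-scraping-results`, numPartitions: 10, configEntries: [{ name: 'retention.ms', value: '604800000' /* 7 days */ }] },
  { topic: `${prefix}-scraping-dlq`, numPartitions: 1, configEntries: [{ name: 'retention.ms', value: '2592000000' /* 30 days */ }] },
];
```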
```typescript
export const kafkaConfig = {
  clientId: 'maxun-scraper',
  brokers: ['localhost:29092'],
  topics: {
```
Use environment variables for Kafka configuration.
The broker address and client ID are hardcoded, which limits deployment flexibility and poses security risks. Consider using environment variables for configuration.
Apply this diff to make the configuration more flexible:
```diff
+import { config } from 'dotenv';
+
+config();
+
 export const kafkaConfig = {
-  clientId: 'maxun-scraper',
-  brokers: ['localhost:29092'],
+  clientId: process.env.KAFKA_CLIENT_ID || 'maxun-scraper',
+  brokers: (process.env.KAFKA_BROKERS || 'localhost:29092').split(','),
```
Also, consider adding SSL/authentication configuration for production environments:
```typescript
// Add these properties to kafkaConfig
ssl: process.env.KAFKA_SSL === 'true',
sasl: process.env.KAFKA_SASL === 'true' ? {
  mechanism: process.env.KAFKA_SASL_MECHANISM,
  username: process.env.KAFKA_USERNAME,
  password: process.env.KAFKA_PASSWORD,
} : undefined,
```
```typescript
  consumerGroup: 'scraping-group'
};
```
🛠️ Refactor suggestion
Add environment separation for consumer groups.
The consumer group should be environment-aware to prevent cross-environment message processing.
Apply this diff:
```diff
- consumerGroup: 'scraping-group'
+ consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`
```
```typescript
export const kafkaConfig = {
  clientId: 'maxun-scraper',
  brokers: ['localhost:29092'],
  topics: {
    SCRAPING_TASKS: 'scraping-tasks',
    SCRAPING_RESULTS: 'scraping-results',
    SCRAPING_DLQ: 'scraping-dlq'
  },
  consumerGroup: 'scraping-group'
};
```
💡 Codebase verification
Remove unused Kafka configuration
The Kafka configuration appears to be unused in the codebase. There are no imports of this configuration, no Kafka client instantiations, and no other Kafka-related code present. Consider removing this unused configuration to maintain a clean codebase.
🔗 Analysis chain
Verify Kafka configuration in the codebase.
Let's ensure the configuration is properly utilized across the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```bash
#!/bin/bash
# Description: Check for proper usage of Kafka configuration and potential security issues

# Check for hardcoded Kafka configurations
echo "Checking for hardcoded Kafka configurations..."
rg -g '!*.{json,md}' -i 'localhost:29092'

# Check for proper import usage
echo "Checking kafka config imports..."
rg "from ['|\"].*config/kafka" -A 3

# Check for potential security-sensitive Kafka operations
echo "Checking for direct Kafka operations..."
ast-grep --pattern 'new Kafka($$$)'
```
Length of output: 451
Script:

```bash
#!/bin/bash
# Search for Kafka-related code and configurations

# Check for usage of kafkaConfig
echo "Checking for kafkaConfig usage..."
rg "kafkaConfig"

# Check for usage of topic names
echo "Checking for topic name usage..."
rg "scraping-tasks|scraping-results|scraping-dlq"

# Check for any Kafka-related imports or code
echo "Checking for Kafka-related code..."
rg -i "kafka" -g "!*.{json,md,txt}" -A 3
```
Length of output: 844
Actionable comments posted: 1
🧹 Nitpick comments (4)
maxun-core/src/utils/kafka-manager.ts (4)
45-57: Optimize admin client usage in `startMetricsReporting`

Currently, a new admin client is created and disconnected every 5 seconds within the metrics reporting interval. This could lead to unnecessary overhead and resource consumption. Consider creating the admin client once and reusing it to improve performance.

Apply this diff to reuse the admin client:

1. Import `Admin` from `kafkajs`:

```diff
- import { Kafka, Consumer, Producer } from 'kafkajs';
+ import { Kafka, Consumer, Producer, Admin } from 'kafkajs';
```

2. Add a private property for the admin client:

```diff
  private kafka: Kafka;
  private producer: Producer;
  private consumer: Consumer;
  private metricsInterval: NodeJS.Timeout | null = null;
+ private admin: Admin;
```

3. Initialize the admin client in the constructor:

```diff
  super();
  this.kafka = new Kafka({
    clientId: kafkaConfig.clientId,
    brokers: kafkaConfig.brokers
  });
+ this.admin = this.kafka.admin();
```

4. Update `createTopics()` to use the existing admin client:

```diff
- const admin = this.kafka.admin();
+ const admin = this.admin;
  await admin.createTopics({
    topics: [
      { topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 },
      { topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 },
      { topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 }
    ]
  });
- await admin.disconnect();
```

5. Modify `startMetricsReporting()` to reuse the admin client and avoid disconnecting it each time:

```diff
  this.metricsInterval = setInterval(async () => {
-   const admin = this.kafka.admin();
+   const admin = this.admin;
    const metrics = await admin.fetchTopicMetadata({
      topics: [
        kafkaConfig.topics.SCRAPING_TASKS,
        kafkaConfig.topics.SCRAPING_RESULTS
      ]
    });
    this.emit('metrics', metrics);
-   await admin.disconnect();
  }, 5000);
```

6. Disconnect the admin client in the `cleanup()` method:

```diff
  if (this.metricsInterval) {
    clearInterval(this.metricsInterval);
  }
  await this.producer.disconnect();
  await this.consumer.disconnect();
+ await this.admin.disconnect();
```

This refactor reduces the overhead of repeatedly creating and disconnecting the admin client.

25-30: Add error handling in the `initialize` method

The `initialize` method performs several asynchronous operations that could fail (connecting the producer, consumer, and creating topics). To enhance robustness, consider wrapping these operations in a `try/catch` block to handle potential errors gracefully.

Apply this diff to implement error handling:

```diff
+ try {
    await this.producer.connect();
    await this.consumer.connect();
    await this.createTopics();
    this.startMetricsReporting();
+ } catch (error) {
+   // Handle errors appropriately, e.g., log and rethrow or exit the process
+   console.error('Error during KafkaManager initialization:', error);
+   throw error;
+ }
```

32-42: Handle potential errors in `createTopics` method

Errors might occur during topic creation (e.g., network issues or insufficient permissions). Wrapping the code in a `try/catch` block allows for proper error handling and logging.

Apply this diff to add error handling:

```diff
+ try {
    const admin = this.admin;
    await admin.createTopics({
      topics: [
        { topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 },
        { topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 },
        { topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 }
      ]
    });
+ } catch (error) {
+   // Handle errors, e.g., log the error
+   console.error('Error creating Kafka topics:', error);
+   throw error;
+ }
```

59-65: Ensure proper cleanup with error handling in `cleanup` method

Disconnection operations may fail, preventing complete resource cleanup. To enhance reliability, wrap the disconnection logic in a `try/catch` block.

Apply this diff to handle errors during cleanup:

```diff
  if (this.metricsInterval) {
    clearInterval(this.metricsInterval);
  }
+ try {
    await this.producer.disconnect();
    await this.consumer.disconnect();
    await this.admin.disconnect();
+ } catch (error) {
+   // Handle errors, e.g., log the error
+   console.error('Error during KafkaManager cleanup:', error);
+ }
```
```typescript
this.metricsInterval = setInterval(async () => {
  const admin = this.kafka.admin();
  const metrics = await admin.fetchTopicMetadata({
    topics: [
      kafkaConfig.topics.SCRAPING_TASKS,
      kafkaConfig.topics.SCRAPING_RESULTS
    ]
  });

  this.emit('metrics', metrics);
  await admin.disconnect();
}, 5000);
```
🛠️ Refactor suggestion

Add error handling in `startMetricsReporting`

The asynchronous function inside `setInterval` may throw exceptions (e.g., network errors during `fetchTopicMetadata`). Unhandled exceptions can cause the interval to stop executing. Wrap the interval's logic in a `try/catch` block to ensure continuous metrics reporting.
Apply this diff to implement error handling:
```diff
  this.metricsInterval = setInterval(async () => {
+   try {
      const admin = this.admin;
      const metrics = await admin.fetchTopicMetadata({
        topics: [
          kafkaConfig.topics.SCRAPING_TASKS,
          kafkaConfig.topics.SCRAPING_RESULTS
        ]
      });
      this.emit('metrics', metrics);
+   } catch (error) {
+     // Handle errors, e.g., log the error
+     console.error('Error fetching Kafka metrics:', error);
+   }
  }, 5000);
```
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 3
🧹 Nitpick comments (4)
maxun-core/src/utils/scraping-consumer.ts (4)
125-151: Define types for `message` to improve type safety

The `handleError` method accepts `message: any`, which reduces type safety. Defining the message type enhances code reliability and helps prevent errors.

Example:

```diff
+ import { KafkaMessage } from 'kafkajs';

- private async handleError(message: any, error: Error) {
+ private async handleError(message: KafkaMessage, error: Error) {
```

98-114: Collect and report failed URLs for better error handling

Currently, errors encountered during URL processing are logged but not returned. Collecting failed URLs provides visibility into issues and allows for retries or analysis later on.

Example:

```diff
+ const failedUrls: string[] = [];
  for (const url of task.urls) {
    try {
      await page.goto(url, {...});
      const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config);
      results.push(...pageResults);
    } catch (error) {
      console.error(`Error processing URL ${url}:`, error);
+     failedUrls.push(url);
    }
  }
- // Existing code to send results
+ // Modify the results to include failed URLs
+ await this.producer.send({
+   topic: kafkaConfig.topics.SCRAPING_RESULTS,
+   messages: [{
+     key: task.taskId,
+     value: JSON.stringify({
+       taskId: task.taskId,
+       data: results,
+       failedUrls: failedUrls
+     })
+   }]
+ });
```

36-66: Gracefully handle consumer shutdown and errors

The `start` method runs the consumer but doesn't handle potential shutdown signals or errors that may require disconnecting the consumer and producer. Implementing cleanup logic ensures that resources are released properly.

Example:

```typescript
async start() {
  try {
    await this.consumer.connect();
    await this.producer.connect();
    // Rest of the code
  } catch (error) {
    console.error('Error in consumer:', error);
  } finally {
    await this.consumer.disconnect();
    await this.producer.disconnect();
  }
}
```

45-45: Make `partitionsConsumedConcurrently` configurable for flexibility

The number of partitions consumed concurrently is hardcoded to `3`, which may not be optimal in all scenarios. Making this value configurable allows for tuning based on the deployment environment.

Example:

```diff
- partitionsConsumedConcurrently: 3,
+ partitionsConsumedConcurrently: kafkaConfig.partitionsConsumedConcurrently || 3,
```

And add this to your `kafkaConfig`:

```typescript
partitionsConsumedConcurrently: number;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- maxun-core/src/utils/scraping-consumer.ts (1 hunks)
🔇 Additional comments (2)
maxun-core/src/utils/scraping-consumer.ts (2)
76-119: ⚠️ Potential issue — Ensure the browser is always closed to prevent resource leaks

In the `processTask` method, if an error occurs before the browser is initialized, `browser` remains `null`, and `await browser.close()` is not called. This can lead to resource leaks. Initialize `browser` after launching to ensure it's always closed in the `finally` block.

Example:

```diff
- let browser: Browser | null = null;
+ let browser: Browser;
  try {
+   browser = await chromium.launch({...});
    // Rest of the code
  } finally {
    if (browser) await browser.close();
  }
```

Likely invalid or redundant comment.

68-73: ⚠️ Potential issue — Ensure scripts are loaded after navigation for reliability

The `ensureScriptsLoaded` method checks and loads scripts before navigation, which may not guarantee that scripts are available in the page context after navigation. Move the script injection to occur after page navigation to ensure scripts are correctly loaded.

Example:

```diff
  for (const url of task.urls) {
    try {
      await page.goto(url, {...});
-     await this.ensureScriptsLoaded(page);
+     await this.ensureScriptsLoaded(page);
      const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config);
      results.push(...pageResults);
    } catch (error) {
      console.error(`Error processing URL ${url}:`, error);
    }
  }
```

Likely invalid or redundant comment.
```typescript
declare global {
  interface Window {
    scrape: (selector: string | null) => Record<string, string>[];
    scrapeSchema: (
      schema: Record<string, { selector: string; tag: string; attribute: string }>
    ) => Record<string, any>;
    scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record<string, any>[];
    scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[];
    scrollDown: (pages?: number) => void;
    scrollUp: (pages?: number) => void;
  }
}
```
🛠️ Refactor suggestion
Avoid polluting the global `Window` interface

Extending the global `Window` interface can lead to conflicts and unexpected behavior. Consider encapsulating these functions or using a unique namespace to prevent polluting the global scope.
Example:
```diff
- declare global {
-   interface Window {
-     scrape: ...;
-     // Other methods
-   }
- }
+ export interface ScraperWindow extends Window {
+   scrape: ...;
+   // Other methods
+ }
+ // In your code where you use `window`
+ (window as ScraperWindow).scrape(...);
```
```typescript
private async handleError(message: any, error: Error) {
  const retryCount = parseInt(message.headers['retry-count'] || '0');

  if (retryCount < 3) {
    await this.producer.send({
      topic: kafkaConfig.topics.SCRAPING_TASKS,
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          'retry-count': (retryCount + 1).toString(),
          'error': error.message
        }
      }]
    });
  } else {
    await this.producer.send({
      topic: kafkaConfig.topics.SCRAPING_DLQ,
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          'final-error': error.message
        }
      }]
    });
  }
}
```
Safely access `message.headers` to prevent runtime errors

In the `handleError` method, `message.headers` may be `null` or `undefined`, leading to runtime errors when accessing `message.headers['retry-count']`. Ensure that `message.headers` is defined before accessing its properties.

Example:

```diff
- const retryCount = parseInt(message.headers['retry-count'] || '0');
+ const headers = message.headers || {};
+ const retryCount = parseInt(headers['retry-count'] || '0');
```
```typescript
private async processTask(task: any) {
  let browser: Browser | null = null;
  const results: any[] = [];

  try {
    browser = await chromium.launch({
      headless: true,
      args: [
        "--disable-blink-features=AutomationControlled",
        "--disable-web-security",
        "--disable-features=IsolateOrigins,site-per-process",
        "--disable-site-isolation-trials",
        "--disable-extensions",
        "--no-sandbox",
        "--disable-dev-shm-usage",
      ]
    });

    const context = await browser.newContext();
    const page = await context.newPage();

    await this.ensureScriptsLoaded(page);

    for (const url of task.urls) {
      try {
        await page.goto(url, {
          waitUntil: 'networkidle',
          timeout: 30000
        });

        const pageResults = await page.evaluate(
          (cfg) => window.scrapeList(cfg),
          task.config
        );

        results.push(...pageResults);
      } catch (error) {
        console.error(`Error processing URL ${url}:`, error);
      }
    }

    await page.close();
  } finally {
    if (browser) await browser.close();
  }

  return results;
}
```
🛠️ Refactor suggestion

Define specific types for `task` to enhance type safety

The `processTask` method currently accepts `task: any`, which reduces type safety and may lead to runtime errors. Defining an interface for the task structure improves code readability and allows TypeScript to catch issues at compile time.

Example:

```diff
+ interface ScrapingTask {
+   taskId: string;
+   urls: string[];
+   config: any; // Replace `any` with a more specific type if possible
+ }

- private async processTask(task: any) {
+ private async processTask(task: ScrapingTask) {
```
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 3
♻️ Duplicate comments (1)
maxun-core/src/utils/scraping-consumer.ts (1)
6-17
: 🛠️ Refactor suggestionAvoid polluting the global Window interface.
The concern about extending the global Window interface remains valid as it can lead to naming conflicts and unexpected behavior.
Consider using a dedicated namespace or interface as previously suggested:
- declare global { - interface Window { - scrape: (selector: string | null) => Record<string, string>[]; - scrapeSchema: (schema: Record<string, { selector: string; tag: string; attribute: string }>) => Record<string, any>; - scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record<string, any>[]; - scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[]; - scrollDown: (pages?: number) => void; - scrollUp: (pages?: number) => void; - } - } + export interface ScraperWindow extends Window { + scrape: (selector: string | null) => Record<string, string>[]; + scrapeSchema: (schema: Record<string, { selector: string; tag: string; attribute: string }>) => Record<string, any>; + scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record<string, any>[]; + scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[]; + scrollDown: (pages?: number) => void; + scrollUp: (pages?: number) => void; + }
🧹 Nitpick comments (4)
maxun-core/src/utils/scraping-consumer.ts (4)
Lines 23-29: Add TypeScript interfaces for workflow tracking.
Define explicit interfaces for workflow tracking to improve type safety and code maintainability.

interface WorkflowStats {
  startTime: number;
  totalTasks: number;
  processedTasks: number;
  totalItems: number;
}

interface ProcessedWorkflows {
  [workflowId: string]: Set<string>;
}
Lines 56-59: Consider increasing partition concurrency for better parallelization.
The current setting of 4 concurrent partitions might be a bottleneck for bulk scraping. Consider making this configurable based on system resources and load (note the env var must be parsed, since kafkajs expects a number here):

- partitionsConsumedConcurrently: 4,
+ partitionsConsumedConcurrently: parseInt(process.env.KAFKA_CONCURRENT_PARTITIONS || '4'),
Lines 150-161: Document browser launch configuration.
The browser launch configuration includes important security settings that should be documented.

 browser = await chromium.launch({
   headless: true,
   args: [
+    // Disable automation detection
     "--disable-blink-features=AutomationControlled",
+    // Allow cross-origin requests
     "--disable-web-security",
+    // Disable process isolation for better performance
     "--disable-features=IsolateOrigins,site-per-process",
     "--disable-site-isolation-trials",
+    // Security settings for containerized environments
     "--disable-extensions",
     "--no-sandbox",
     "--disable-dev-shm-usage",
   ]
 });
Lines 202-212: Make workflow cleanup configuration more flexible.
The cleanup interval is hardcoded to one hour. Consider making this configurable.

- const ONE_HOUR = 60 * 60 * 1000;
+ const CLEANUP_INTERVAL = parseInt(process.env.WORKFLOW_CLEANUP_INTERVAL || '3600000');
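A possible cleanup loop built on that variable; this is only a sketch, and the workflowStats and processedWorkflows names are assumptions based on the interfaces suggested above:

// Hypothetical periodic pruning of per-workflow state
const CLEANUP_INTERVAL = parseInt(process.env.WORKFLOW_CLEANUP_INTERVAL || '3600000');
const workflowStats: Record<string, WorkflowStats> = {};
const processedWorkflows: ProcessedWorkflows = {};

setInterval(() => {
  const now = Date.now();
  for (const [workflowId, stats] of Object.entries(workflowStats)) {
    if (now - stats.startTime > CLEANUP_INTERVAL) {
      delete workflowStats[workflowId];      // drop progress counters
      delete processedWorkflows[workflowId]; // drop the deduplication set
    }
  }
}, CLEANUP_INTERVAL);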
for (const url of task.urls) {
  try {
    await page.goto(url, {
      waitUntil: 'networkidle',
      timeout: 30000
    });

    await page.waitForTimeout(1000);

    const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config);

    // Filter out already scraped items
    const newResults = pageResults.filter(item => {
      const uniqueKey = JSON.stringify(item);
      if (scrapedItems.has(uniqueKey)) return false; // Ignore if already scraped
      scrapedItems.add(uniqueKey); // Mark as scraped
      return true;
    });

    allResults = allResults.concat(newResults);
    console.log(`Results so far (${task.taskId}): ${allResults.length}`);
  } catch (error) {
    console.error(`Error processing URL ${url}:`, error);
  }
}
🛠️ Refactor suggestion
Optimize parallel processing of URLs.
The current implementation processes URLs sequentially. Consider processing URLs in parallel batches for better performance.
private async processUrlBatch(urls: string[], context: BrowserContext, task: any) {
  const results = await Promise.all(
    urls.map(async (url) => {
      const page = await context.newPage();
      try {
        await this.ensureScriptsLoaded(page);
        await page.goto(url, { waitUntil: 'networkidle', timeout: 30000 });
        await page.waitForTimeout(1000);
        return await page.evaluate((cfg) => window.scrapeList(cfg), task.config);
      } catch (error) {
        console.error(`Error processing URL ${url}:`, error);
        return [];
      } finally {
        await page.close();
      }
    })
  );
  return results.flat();
}
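The sketch above opens every URL at once via Promise.all; a hedged wrapper that actually processes in bounded batches (BATCH_SIZE is an assumption, tune it to the host):

// Hypothetical driver: at most BATCH_SIZE pages are open concurrently
private async processAllUrls(urls: string[], context: BrowserContext, task: any) {
  const BATCH_SIZE = 5;
  const allResults: any[] = [];
  for (let i = 0; i < urls.length; i += BATCH_SIZE) {
    const batch = urls.slice(i, i + BATCH_SIZE);
    allResults.push(...(await this.processUrlBatch(batch, context, task)));
  }
  return allResults;
}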
const task = JSON.parse(message.value!.toString());
const workflowId = task.workflowId;
Add proper error handling for JSON parsing.
The current implementation might throw unhandled exceptions for malformed messages.
- const task = JSON.parse(message.value!.toString());
- const workflowId = task.workflowId;
+ let task;
+ try {
+ task = JSON.parse(message.value?.toString() || '');
+ if (!task?.workflowId) {
+ throw new Error('Invalid task format: missing workflowId');
+ }
+ } catch (error) {
+ console.error('Failed to parse task:', error);
+ throw error;
+ }
+ const workflowId = task.workflowId;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let task;
try {
  task = JSON.parse(message.value?.toString() || '');
  if (!task?.workflowId) {
    throw new Error('Invalid task format: missing workflowId');
  }
} catch (error) {
  console.error('Failed to parse task:', error);
  throw error;
}
const workflowId = task.workflowId;
private async handleError(message: any, error: Error) {
  const retryCount = parseInt(message.headers['retry-count'] || '0');
  const task = JSON.parse(message.value!.toString());

  if (retryCount < 3) {
    await this.producer.send({
      topic: kafkaConfig.topics.SCRAPING_TASKS,
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          'workflow-id': task.workflowId,
          'retry-count': (retryCount + 1).toString(),
          'error': error.message
        }
      }]
    });
  } else {
    await this.producer.send({
      topic: kafkaConfig.topics.SCRAPING_DLQ,
      messages: [{
        key: message.key,
        value: message.value,
        headers: {
          'workflow-id': task.workflowId,
          'final-error': error.message
        }
      }]
    });
  }
}
Improve error handling type safety and header access.
The error handling code lacks proper typing and safe header access.
- private async handleError(message: any, error: Error) {
+ interface KafkaMessage {
+ headers: Record<string, Buffer | null>;
+ key: Buffer | null;
+ value: Buffer | null;
+ }
+
+ private async handleError(message: KafkaMessage, error: Error) {
+ const headers = message.headers || {};
+ const retryCount = parseInt(headers['retry-count']?.toString() || '0');
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 5
♻️ Duplicate comments (1)
maxun-core/src/interpret.ts (1)
Lines 567-736 (🛠️ Refactor suggestion): Modularize the pagination navigation logic.
The navigation logic is complex and could benefit from being split into smaller, focused functions for better maintainability and testing.
Additionally, consider these improvements:
- Add retry mechanism for failed tasks
- Implement backoff strategy for navigation failures
- Add metrics collection for performance monitoring
Apply this diff to add these improvements:
+interface NavigationResult {
+  success: boolean;
+  url: string;
+  error?: Error;
+}
+
+private async navigateWithRetry(page: Page, url: string, maxRetries = 3): Promise<NavigationResult> {
+  let lastError: Error | undefined;
+  for (let attempt = 1; attempt <= maxRetries; attempt++) {
+    try {
+      await page.goto(url);
+      await page.waitForLoadState('networkidle');
+      return { success: true, url };
+    } catch (error) {
+      lastError = error;
+      if (attempt < maxRetries) {
+        const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
+        await page.waitForTimeout(delay);
+      }
+    }
+  }
+  return { success: false, url, error: lastError };
+}
+
+private collectMetrics(workflowId: string, metrics: Record<string, number>) {
+  console.log(`Metrics for workflow ${workflowId}:`, metrics);
+  // TODO: Send metrics to monitoring system
+}
+
 private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
   if (config.limit > 10000 && config.pagination.type === 'clickNext') {
     console.time('parallel-scraping');
+    const metrics = {
+      totalPages: 0,
+      failedNavigations: 0,
+      retries: 0,
+    };
     const workflowId = `workflow-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
     console.log(`Starting workflow with ID: ${workflowId}`);
🧹 Nitpick comments (4)
maxun-core/src/scripts/start-consumer.ts (2)
Lines 12-20: Enhance error logging.
The error handling could be improved by including more details in the error log.
Apply this diff to enhance error logging:
 try {
   console.log('Starting scraping consumer...');
   await consumer.start();
   console.log('Consumer is running and waiting for tasks...');
 } catch (error) {
-  console.error('Failed to start consumer:', error);
+  console.error('Failed to start consumer:', {
+    error: error instanceof Error ? error.message : String(error),
+    stack: error instanceof Error ? error.stack : undefined
+  });
   process.exit(1);
 }
Line 22: Enhance error handling in main function invocation.
The error handling in the main function invocation could be improved by adding proper error details and cleanup.
Apply this diff to enhance error handling:
-main().catch(console.error);
+main().catch((error) => {
+  console.error('Fatal error:', {
+    error: error instanceof Error ? error.message : String(error),
+    stack: error instanceof Error ? error.stack : undefined
+  });
+  process.exit(1);
+});
maxun-core/src/scripts/setup-kafka.ts (1)
Line 23: Enhance error handling in setupKafka invocation.
The error handling in the setupKafka function invocation could be improved by adding proper error details and cleanup.
Apply this diff to enhance error handling:
-setupKafka().catch(console.error);
+setupKafka().catch((error) => {
+  console.error('Fatal error during Kafka setup:', {
+    error: error instanceof Error ? error.message : String(error),
+    stack: error instanceof Error ? error.stack : undefined
+  });
+  process.exit(1);
+});
maxun-core/src/interpret.ts (1)
Line 853: Use proper logging mechanism.
Replace direct console.log with the class's logging mechanism for consistency.
Apply this diff:
- console.log(`Current scroll height: ${currentHeight}`);
+ this.log(`Current scroll height: ${currentHeight}`, Level.LOG);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
maxun-core/src/interpret.ts (6 hunks)
maxun-core/src/scripts/setup-kafka.ts (1 hunks)
maxun-core/src/scripts/start-consumer.ts (1 hunks)
🔇 Additional comments (4)
maxun-core/src/scripts/start-consumer.ts (1)
Lines 1-2: LGTM! The import statement is correctly formatted and uses an appropriate relative path.
maxun-core/src/scripts/setup-kafka.ts (1)
Lines 1-2: LGTM! The import statement is correctly formatted and uses an appropriate relative path.
maxun-core/src/interpret.ts (2)
Lines 19-23: LGTM! The new imports are correctly formatted and necessary for implementing the parallel processing capabilities.
Line 47: Add missing property to InterpreterOptions interface.
The serializableCallback property is used in the code but not defined in the provided InterpreterOptions interface.
async function main() {
  const consumer = new ScrapingConsumer();

  // Handle graceful shutdown
  process.on('SIGINT', async () => {
    console.log('Shutting down consumer...');
    process.exit(0);
  });
🛠️ Refactor suggestion
Improve graceful shutdown handling.
The current SIGINT handler immediately exits without cleaning up resources or waiting for in-progress tasks to complete. This could lead to data loss or inconsistent state.
Apply this diff to implement proper cleanup:
process.on('SIGINT', async () => {
console.log('Shutting down consumer...');
+ try {
+ await consumer.stop();
+ console.log('Consumer stopped successfully');
+ } catch (error) {
+ console.error('Error during shutdown:', error);
+ }
process.exit(0);
});
Committable suggestion skipped: line range outside the PR's diff.
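The consumer.stop() call above assumes ScrapingConsumer exposes such a method; a minimal sketch of one, assuming the class wraps kafkajs consumer and producer instances (field names are assumptions):

// Hypothetical stop() on ScrapingConsumer
async stop(): Promise<void> {
  await this.consumer.disconnect(); // halts eachMessage processing
  if (this.producer) {
    await this.producer.disconnect();
  }
}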
async function setupKafka() {
  const manager = new KafkaManager();

  try {
    console.log('Initializing Kafka manager...');
    await manager.initialize();
    console.log('Kafka setup completed successfully');

    // Keep monitoring for a while to verify setup
    setTimeout(async () => {
      await manager.cleanup();
      process.exit(0);
    }, 10000);

  } catch (error) {
    console.error('Failed to setup Kafka:', error);
    process.exit(1);
  }
}
🛠️ Refactor suggestion
Improve Kafka setup monitoring.
The current implementation has several issues:
- Hardcoded timeout duration
- Forced cleanup after timeout regardless of setup status
- Missing configuration validation
Apply this diff to improve the implementation:
+const KAFKA_SETUP_TIMEOUT_MS = 10000; // Move to config
+
async function setupKafka() {
const manager = new KafkaManager();
try {
console.log('Initializing Kafka manager...');
+ // Validate Kafka configuration
+ if (!manager.validateConfig()) {
+ throw new Error('Invalid Kafka configuration');
+ }
+
await manager.initialize();
console.log('Kafka setup completed successfully');
- // Keep monitoring for a while to verify setup
- setTimeout(async () => {
- await manager.cleanup();
- process.exit(0);
- }, 10000);
+ // Monitor setup status
+ const setupTimeout = new Promise((_, reject) => {
+ setTimeout(() => reject(new Error('Kafka setup verification timeout')),
+ KAFKA_SETUP_TIMEOUT_MS);
+ });
+
+ try {
+ await Promise.race([
+ manager.verifySetup(),
+ setupTimeout
+ ]);
+ console.log('Kafka setup verified successfully');
+ await manager.cleanup();
+ process.exit(0);
+ } catch (error) {
+ throw new Error(`Kafka setup verification failed: ${error.message}`);
+ }
} catch (error) {
console.error('Failed to setup Kafka:', error);
process.exit(1);
}
}
Committable suggestion skipped: line range outside the PR's diff.
private kafka: Kafka;

private producer: any;

private async initializeKafka() {
  this.producer = this.kafka.producer({
    allowAutoTopicCreation: true,
    idempotent: true
  });
  await this.producer.connect();
}
🛠️ Refactor suggestion
Improve Kafka producer initialization.
The current implementation has several issues:
- Hardcoded producer options
- Missing error handling
- No cleanup handling
Apply this diff to improve the implementation:
private kafka: Kafka;
private producer: any;
+private readonly producerConfig = {
+ allowAutoTopicCreation: true,
+ idempotent: true,
+ // Add other configurable options
+};
private async initializeKafka() {
- this.producer = this.kafka.producer({
- allowAutoTopicCreation: true,
- idempotent: true
- });
- await this.producer.connect();
+ try {
+ this.producer = this.kafka.producer(this.producerConfig);
+ await this.producer.connect();
+ console.log('Kafka producer connected successfully');
+ } catch (error) {
+ console.error('Failed to initialize Kafka producer:', error);
+ throw error;
+ }
}
+
+private async cleanup() {
+ if (this.producer) {
+ try {
+ await this.producer.disconnect();
+ console.log('Kafka producer disconnected successfully');
+ } catch (error) {
+ console.error('Error disconnecting Kafka producer:', error);
+ }
+ }
+}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private kafka: Kafka;
private producer: any;
private readonly producerConfig = {
  allowAutoTopicCreation: true,
  idempotent: true,
  // Add other configurable options
};

private async initializeKafka() {
  try {
    this.producer = this.kafka.producer(this.producerConfig);
    await this.producer.connect();
    console.log('Kafka producer connected successfully');
  } catch (error) {
    console.error('Failed to initialize Kafka producer:', error);
    throw error;
  }
}

private async cleanup() {
  if (this.producer) {
    try {
      await this.producer.disconnect();
      console.log('Kafka producer disconnected successfully');
    } catch (error) {
      console.error('Error disconnecting Kafka producer:', error);
    }
  }
}
this.kafka = new Kafka({
  clientId: kafkaConfig.clientId,
  brokers: kafkaConfig.brokers
});
this.initializeKafka();
this.options = {
  maxRepeats: 5,
  maxConcurrency: 5,
  maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
🛠️ Refactor suggestion
Improve constructor initialization.
The current implementation has several issues:
- Kafka initialization is not awaited
- maxWorkers calculation logic is embedded in the constructor
- Missing error handling for Kafka initialization
Apply this diff to improve the implementation:
+private calculateMaxWorkers(): number {
+ return Math.max(1, Math.min(os.cpus().length - 1, 4));
+}
+
constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
super();
this.workflow = workflow.workflow;
this.initializedWorkflow = null;
this.kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers
});
- this.initializeKafka();
+ // Initialize Kafka asynchronously
+ this.initializeKafka().catch(error => {
+ console.error('Failed to initialize Kafka:', error);
+ throw error;
+ });
this.options = {
maxRepeats: 5,
maxConcurrency: 5,
- maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
+ maxWorkers: this.calculateMaxWorkers(),
serializableCallback: (data) => {
log(JSON.stringify(data), Level.WARN);
},
Committable suggestion skipped: line range outside the PR's diff.
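Because a constructor cannot await, an alternative to the .catch workaround is a static async factory; a sketch under the assumption that the class is named Interpreter and keeps the same constructor signature:

// Hypothetical factory: callers write await Interpreter.create(...) instead of new
static async create(workflow: WorkflowFile, options?: Partial<InterpreterOptions>): Promise<Interpreter> {
  const interpreter = new Interpreter(workflow, options);
  await interpreter.initializeKafka(); // connection errors now surface to the caller
  return interpreter;
}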
private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
  // Create a map to store our workflow's results
  const resultsMap = new Map<string, any[]>();

  // Extract the workflow ID from the first task - all tasks in this batch will share the same workflow ID
  const workflowId = tasks[0].workflowId;
  console.log(`Waiting for results from workflow: ${workflowId}`);

  // Create a Set of task IDs for quick lookup - these are the only tasks we care about
  const expectedTaskIds = new Set(tasks.map(task => task.taskId));

  // Create a consumer specifically for this workflow
  const resultConsumer = this.kafka.consumer({
    groupId: `scraping-group-results-${workflowId}`,
    maxWaitTimeInMs: 1000,
    maxBytesPerPartition: 2097152 // 2MB
  });

  try {
    await resultConsumer.connect();
    console.log('Result consumer connected successfully');

    await resultConsumer.subscribe({
      topic: kafkaConfig.topics.SCRAPING_RESULTS,
      fromBeginning: true
    });
    console.log('Result consumer subscribed to topic successfully');

    return new Promise((resolve, reject) => {
      let isRunning = true;

      resultConsumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          if (!isRunning) return;

          try {
            const result = JSON.parse(message.value!.toString());

            // Verify both task ID and workflow ID match
            if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
              // Store this task's results
              if (!resultsMap.has(result.taskId)) {
                resultsMap.set(result.taskId, result.data);
                console.log(`Received results for task ${result.taskId}. ` +
                  `Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`);
              }

              // Check if we have all our workflow's results
              if (resultsMap.size === tasks.length) {
                isRunning = false;

                // Sort tasks by their numeric index (extract number from task ID)
                const sortedTasks = [...tasks].sort((a, b) => {
                  const aIndex = parseInt(a.taskId.split('-').pop() || '0');
                  const bIndex = parseInt(b.taskId.split('-').pop() || '0');
                  return aIndex - bIndex;
                });

                // Combine results in the sorted task order
                const allResults = sortedTasks
                  .map(task => {
                    const taskResults = resultsMap.get(task.taskId);
                    if (!taskResults) {
                      console.warn(`Missing results for task ${task.taskId} in workflow ${workflowId}`);
                      return [];
                    }
                    return taskResults;
                  })
                  .flat();

                console.log(`Successfully collected all results from workflow ${workflowId}`);

                resolve(allResults);
              }
            }
          } catch (error) {
            console.error(`Error processing message in workflow ${workflowId}:`, error);
            reject(error);
          }
        }
      });

      // // Add a timeout to prevent hanging
      // const timeout = setTimeout(() => {
      //   if (isRunning) {
      //     isRunning = false;
      //     console.error(`Timeout waiting for results from workflow ${workflowId}. ` +
      //       `Received ${resultsMap.size} of ${tasks.length} expected results.`);
      //     reject(new Error(`Timeout waiting for results from workflow ${workflowId}`));
      //   }
      // }, 30000); // 30 second timeout
    });

  } catch (error) {
    console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
    throw error;
  }
}
Improve result collection reliability.
The current implementation has several issues:
- Commented-out timeout code
- Missing proper consumer cleanup
- No handling of partial results
Apply this diff to improve the implementation:
+interface ScrapingResult {
+ workflowId: string;
+ taskId: string;
+ data: any[];
+}
+
+const RESULT_COLLECTION_TIMEOUT_MS = 30000;
+
private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
const resultsMap = new Map<string, any[]>();
const workflowId = tasks[0].workflowId;
console.log(`Waiting for results from workflow: ${workflowId}`);
const expectedTaskIds = new Set(tasks.map(task => task.taskId));
const resultConsumer = this.kafka.consumer({
groupId: `scraping-group-results-${workflowId}`,
maxWaitTimeInMs: 1000,
maxBytesPerPartition: 2097152
});
try {
await resultConsumer.connect();
console.log('Result consumer connected successfully');
await resultConsumer.subscribe({
topic: kafkaConfig.topics.SCRAPING_RESULTS,
fromBeginning: true
});
console.log('Result consumer subscribed to topic successfully');
return new Promise((resolve, reject) => {
let isRunning = true;
+ const timeout = setTimeout(() => {
+ if (isRunning) {
+ isRunning = false;
+ const error = new Error(
+ `Timeout waiting for results from workflow ${workflowId}. ` +
+ `Received ${resultsMap.size} of ${tasks.length} expected results.`
+ );
+ reject(error);
+ }
+ }, RESULT_COLLECTION_TIMEOUT_MS);
resultConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!isRunning) return;
try {
- const result = JSON.parse(message.value!.toString());
+ const result = JSON.parse(message.value!.toString()) as ScrapingResult;
if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
if (!resultsMap.has(result.taskId)) {
resultsMap.set(result.taskId, result.data);
console.log(
`Received results for task ${result.taskId}. ` +
`Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`
);
}
if (resultsMap.size === tasks.length) {
isRunning = false;
+ clearTimeout(timeout);
const sortedTasks = [...tasks].sort((a, b) => {
const aIndex = parseInt(a.taskId.split('-').pop() || '0');
const bIndex = parseInt(b.taskId.split('-').pop() || '0');
return aIndex - bIndex;
});
const allResults = sortedTasks
.map(task => resultsMap.get(task.taskId) || [])
.flat();
console.log(`Successfully collected all results from workflow ${workflowId}`);
+ await resultConsumer.disconnect();
resolve(allResults);
}
}
} catch (error) {
console.error(`Error processing message in workflow ${workflowId}:`, error);
+ clearTimeout(timeout);
+ await resultConsumer.disconnect();
reject(error);
}
}
});
});
} catch (error) {
console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
+ await resultConsumer.disconnect();
throw error;
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
interface ScrapingResult {
  workflowId: string;
  taskId: string;
  data: any[];
}

const RESULT_COLLECTION_TIMEOUT_MS = 30000;

private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
  // Create a map to store our workflow's results
  const resultsMap = new Map<string, any[]>();

  // Extract the workflow ID from the first task - all tasks in this batch will share the same workflow ID
  const workflowId = tasks[0].workflowId;
  console.log(`Waiting for results from workflow: ${workflowId}`);

  // Create a Set of task IDs for quick lookup - these are the only tasks we care about
  const expectedTaskIds = new Set(tasks.map(task => task.taskId));

  // Create a consumer specifically for this workflow
  const resultConsumer = this.kafka.consumer({
    groupId: `scraping-group-results-${workflowId}`,
    maxWaitTimeInMs: 1000,
    maxBytesPerPartition: 2097152 // 2MB
  });

  try {
    await resultConsumer.connect();
    console.log('Result consumer connected successfully');

    await resultConsumer.subscribe({
      topic: kafkaConfig.topics.SCRAPING_RESULTS,
      fromBeginning: true
    });
    console.log('Result consumer subscribed to topic successfully');

    return new Promise((resolve, reject) => {
      let isRunning = true;

      const timeout = setTimeout(() => {
        if (isRunning) {
          isRunning = false;
          const error = new Error(
            `Timeout waiting for results from workflow ${workflowId}. ` +
            `Received ${resultsMap.size} of ${tasks.length} expected results.`
          );
          reject(error);
        }
      }, RESULT_COLLECTION_TIMEOUT_MS);

      resultConsumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          if (!isRunning) return;

          try {
            const result = JSON.parse(message.value!.toString()) as ScrapingResult;

            // Verify both task ID and workflow ID match
            if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
              // Store this task's results
              if (!resultsMap.has(result.taskId)) {
                resultsMap.set(result.taskId, result.data);
                console.log(
                  `Received results for task ${result.taskId}. ` +
                  `Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`
                );
              }

              // Check if we have all our workflow's results
              if (resultsMap.size === tasks.length) {
                isRunning = false;
                clearTimeout(timeout);

                // Sort tasks by their numeric index (extract number from task ID)
                const sortedTasks = [...tasks].sort((a, b) => {
                  const aIndex = parseInt(a.taskId.split('-').pop() || '0');
                  const bIndex = parseInt(b.taskId.split('-').pop() || '0');
                  return aIndex - bIndex;
                });

                // Combine results in the sorted task order
                const allResults = sortedTasks
                  .map(task => resultsMap.get(task.taskId) || [])
                  .flat();

                console.log(`Successfully collected all results from workflow ${workflowId}`);
                await resultConsumer.disconnect();
                resolve(allResults);
              }
            }
          } catch (error) {
            console.error(`Error processing message in workflow ${workflowId}:`, error);
            clearTimeout(timeout);
            await resultConsumer.disconnect();
            reject(error);
          }
        }
      });
    });
  } catch (error) {
    console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
    await resultConsumer.disconnect();
    throw error;
  }
}
Actionable comments posted: 0
♻️ Duplicate comments (3)
maxun-core/src/interpret.ts (3)
Lines 81-88 (⚠️ Potential issue): Improve Kafka initialization and error handling.
The current implementation has several issues:
- Kafka initialization in the constructor is not awaited
- Missing error handling for initialization failures
- No cleanup handling for the Kafka producer
This issue was previously identified in a past review. Please refer to the suggested improvements in that review comment.
Also applies to: 93-97
Lines 596-682 (🛠️ Refactor suggestion): Modularize the pagination navigation logic.
The navigation logic is complex and should be split into smaller, focused functions for better maintainability and testing.
This issue was previously identified in a past review. Please refer to the suggested improvements in that review comment.
Lines 738-836 (⚠️ Potential issue): Improve result collection reliability and resource cleanup.
The current implementation has several issues:
- Timeout handling is commented out
- Missing proper consumer cleanup
- No handling of partial results
This issue was previously identified in a past review. Please refer to the suggested improvements in that review comment.
🧹 Nitpick comments (3)
maxun-core/src/interpret.ts (3)
Lines 19-22: Consider using a more specific import from the os module.
Instead of importing the entire os module, import only the required cpus function for better tree-shaking.

-import os from 'os';
+import { cpus } from 'os';
Lines 567-568: Extract magic numbers as named constants.
The threshold value of 10,000 should be defined as a named constant at the class level for better maintainability (declared as an instance readonly here so that this. access type-checks):

+private readonly PARALLEL_SCRAPING_THRESHOLD = 10000;
+private readonly NAVIGATION_TIMEOUT = 30000;
 private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
-  if (config.limit > 10000 && config.pagination.type === 'clickNext') {
+  if (config.limit > this.PARALLEL_SCRAPING_THRESHOLD && config.pagination.type === 'clickNext') {
Lines 689-725: Add memory management for large datasets.
The current implementation stores all URLs in memory before processing. For large datasets, this could lead to memory issues.
Consider implementing a streaming approach or processing URLs in chunks:
private async* urlGenerator(page: Page, config: any): AsyncGenerator<string> {
  let currentUrl = page.url();
  while (true) {
    yield currentUrl;
    const nextUrl = await this.getNextUrl(page, config);
    if (!nextUrl) break;
    currentUrl = nextUrl;
  }
}
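A hedged consumption sketch (getNextUrl, dispatchTasks, and CHUNK_SIZE are assumptions): draining the generator in fixed-size chunks keeps only one chunk of URLs in memory at a time:

// Hypothetical: enqueue scraping work in chunks of CHUNK_SIZE URLs
private async dispatchUrlsInChunks(page: Page, config: any): Promise<void> {
  const CHUNK_SIZE = 50;
  let chunk: string[] = [];
  for await (const url of this.urlGenerator(page, config)) {
    chunk.push(url);
    if (chunk.length >= CHUNK_SIZE) {
      await this.dispatchTasks(chunk); // hypothetical helper that produces Kafka tasks
      chunk = [];
    }
  }
  if (chunk.length > 0) await this.dispatchTasks(chunk);
}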
Actionable comments posted: 2
🧹 Nitpick comments (3)
docker-compose.yml (3)
Lines 42-57: LGTM! Consider using more specific volume names.
The Zookeeper configuration follows best practices with proper health checks and environment variables. However, consider making the volume names more specific to avoid potential conflicts in larger deployments.
- zookeeper_data:/var/lib/zookeeper/data
- zookeeper_log:/var/lib/zookeeper/log
+ maxun_zookeeper_data:/var/lib/zookeeper/data
+ maxun_zookeeper_log:/var/lib/zookeeper/log
Lines 58-83: Review security configuration for production deployment.
While the Kafka configuration is suitable for development, consider these security aspects for production:
- The PLAINTEXT protocol is insecure for production use
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" could lead to unauthorized topic creation
- Single broker setup with replication factor of 1 offers no redundancy
For production deployment:
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ # Use SASL_SSL for production
+ # KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL
  KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ # Disable for production
+ # KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
Lines 141-144: Add newline at end of file.
Add a newline character at the end of the file to comply with POSIX standards.
 zookeeper_data:
 zookeeper_log:
+
🧰 Tools
🪛 yamllint (1.35.1)
[error] 144-144: no new line character at the end of file
(new-line-at-end-of-file)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
docker-compose.yml (3 hunks)
🧰 Additional context used
🪛 yamllint (1.35.1)
docker-compose.yml
[error] 144-144: no new line character at the end of file
(new-line-at-end-of-file)
kafka-ui:
  image: provectuslabs/kafka-ui:latest
  container_name: kafka-ui
  ports:
    - "${KAFKA_UI_PORT:-9080}:8080"
  environment:
    KAFKA_CLUSTERS_0_NAME: maxun-cluster
    KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
  depends_on:
    - kafka
|
🛠️ Refactor suggestion
Pin the kafka-ui image version.
Using the latest tag can lead to unexpected behavior when the image is updated. Pin to a specific version for reproducible builds.
- image: provectuslabs/kafka-ui:latest
+ image: provectuslabs/kafka-ui:v0.7.1
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
kafka-ui:
  image: provectuslabs/kafka-ui:v0.7.1
  container_name: kafka-ui
  ports:
    - "${KAFKA_UI_PORT:-9080}:8080"
  environment:
    KAFKA_CLUSTERS_0_NAME: maxun-cluster
    KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
  depends_on:
    - kafka
@@ -63,6 +117,7 @@ services:
       - postgres
       - redis
       - minio
+      - kafka
Add Kafka configuration for backend service.
The backend service now depends on Kafka but lacks the necessary environment variables for connection configuration.
Add these environment variables to the backend service:
environment:
+ KAFKA_BROKERS: kafka:9092
+ KAFKA_CLIENT_ID: maxun-backend
+ KAFKA_CONSUMER_GROUP_ID: maxun-scraping-group
Committable suggestion skipped: line range outside the PR's diff.
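A sketch of how the backend could consume these variables; the kafkaConfig shape mirrors the object referenced throughout the PR, but the exact field names are assumptions:

// Hypothetical kafkaConfig built from the environment variables above
export const kafkaConfig = {
  clientId: process.env.KAFKA_CLIENT_ID || 'maxun-backend',
  brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
  consumerGroupId: process.env.KAFKA_CONSUMER_GROUP_ID || 'maxun-scraping-group',
};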
Batching and Parallelization to bulk scrape data
Summary by CodeRabbit
New Features
Improvements
Technical Updates