
feat: parallelization #352

Status: Open. RohitR311 wants to merge 22 commits into the develop base branch.
Conversation

@RohitR311 (Contributor) commented Jan 14, 2025

Adds batching and parallelization to bulk-scrape data.

Summary by CodeRabbit

  • New Features

    • Introduced parallel processing capabilities for web scraping pagination.
    • Added Kafka configuration for task and result management.
    • Implemented a new class for managing Kafka interactions.
    • Added a new class for consuming scraping tasks from Kafka.
    • Introduced a setup function for initializing Kafka.
    • Added new services for Zookeeper and Kafka in Docker configuration.
  • Improvements

    • Enhanced performance metrics tracking for scraping operations.
    • Optimized scraping process to handle large datasets more efficiently.
    • Expanded TypeScript compilation to include all source files.
  • Technical Updates

    • Added comprehensive performance and progress tracking interfaces.

@RohitR311 marked this pull request as draft on January 14, 2025, 14:44

coderabbitai bot commented Jan 14, 2025

Walkthrough

The pull request introduces substantial enhancements to the Maxun core scraping system, focusing on parallel processing capabilities and improved pagination handling. Key changes include the addition of the handleParallelPagination and waitForScrapingResults methods in the Interpreter class, updates to the InterpreterOptions interface to include maxWorkers, and several new TypeScript interfaces for worker configurations and performance metrics. Additionally, a Kafka configuration is established for managing tasks and results, along with new classes for managing Kafka interactions and processing scraping tasks.

Changes

  • maxun-core/src/interpret.ts: Added handleParallelPagination and waitForScrapingResults methods; updated InterpreterOptions to include maxWorkers; modified handlePagination to invoke the new method under specific conditions.
  • maxun-core/src/types/worker.ts: Introduced new interfaces: WorkerConfig, SharedState, WorkerProgressData, PerformanceMetrics, and GlobalMetrics.
  • maxun-core/tsconfig.json: Updated include path from "src" to "src/**/*" to compile all nested files.
  • maxun-core/src/config/kafka.ts: Added kafkaConfig with configurations for Kafka broker, topics, and consumer group.
  • maxun-core/src/utils/kafka-manager.ts: Introduced KafkaManager class for managing Kafka producers and consumers, including methods for initialization, topic management, and metrics reporting.
  • maxun-core/src/utils/scraping-consumer.ts: Added ScrapingConsumer class for integrating Kafka messaging with web scraping, including message handling and error management.
  • maxun-core/src/scripts/setup-kafka.ts: Introduced setupKafka function to initialize KafkaManager and handle setup process.
  • maxun-core/src/scripts/start-consumer.ts: Added main function to manage the initialization of ScrapingConsumer and graceful shutdown.
  • docker-compose.yml: Added services for zookeeper, kafka, and kafka-ui, along with new volume declarations for data management.

Possibly related PRs

Suggested reviewers

  • amhsirak

Poem

🐰 In the fields of code, we hop and play,
Parallel paths lead the scraping way.
Kafka's whispers guide our quest,
Workers unite, doing their best.
Pagination flows, swift and bright,
A rabbit's code takes glorious flight! 🚀


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@amhsirak changed the title from "feat: bulk scraping using parallelization" to "feat: parallelization" on Jan 14, 2025
@amhsirak added the Type: Feature label on Jan 14, 2025
@amhsirak marked this pull request as ready for review on January 17, 2025, 09:49
@coderabbitai bot left a comment

Actionable comments posted: 11

🔭 Outside diff range comments (2)
maxun-core/src/interpret.ts (2)

Line range hint 463-714: Optimize handleParallelPagination method

The handleParallelPagination method contains nested loops and multiple asynchronous operations that could be optimized for better performance and reliability.

Consider the following improvements (a sketch of the first two follows the list):

  • Use proper error handling to catch and manage exceptions during page navigation and scraping.
  • Avoid tight loops without delays, which can cause high CPU usage.
  • Ensure that shared resources like visitedUrls are accessed in a thread-safe manner.
  • Replace console.log statements with a proper logging mechanism.
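A minimal sketch of the first two points, assuming a Playwright Page and an injected log function; safeNavigate and POLL_DELAY_MS are illustrative names, not part of this PR:

import { Page } from 'playwright';

// Illustrative helper: wraps navigation in error handling and adds a small
// delay so the surrounding pagination loop does not spin at full CPU.
const POLL_DELAY_MS = 500; // assumed value, tune as needed

async function safeNavigate(page: Page, url: string, log: (msg: string) => void): Promise<boolean> {
  try {
    await page.goto(url, { waitUntil: 'networkidle', timeout: 30000 });
    return true;
  } catch (error) {
    log(`Navigation to ${url} failed: ${(error as Error).message}`);
    return false;
  } finally {
    // Throttle the loop instead of retrying immediately.
    await page.waitForTimeout(POLL_DELAY_MS);
  }
}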

Line range hint 829-855: Validate action applicability and selection logic

In getMatchingActionId, actions are selected based on applicability, but there might be scenarios where no action is applicable, leading to undefined behavior.

Ensure that the method handles cases where no matching action is found and that calling code checks for undefined or invalid action IDs.
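One way to make the missing-action case explicit, sketched against an assumed ActionDefinition shape rather than the PR's actual workflow types:

// Assumed shape for illustration only.
interface ActionDefinition {
  id: string;
  isApplicable: (url: string) => boolean;
}

function getMatchingActionId(actions: ActionDefinition[], url: string): string | undefined {
  return actions.find(action => action.isApplicable(url))?.id;
}

// Caller side: fail loudly instead of continuing with an invalid id.
// const actionId = getMatchingActionId(actions, page.url());
// if (actionId === undefined) throw new Error(`No applicable action for ${page.url()}`);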

🧹 Nitpick comments (12)
maxun-core/src/utils/worker.ts (3)

55-65: Avoid potential race conditions when posting progress

When posting progress to the parent thread, the progress data may not reflect the latest state if other asynchronous operations are modifying shared state.

Consider using locks or other synchronization mechanisms when accessing and updating shared state to prevent race conditions.


67-70: Set appropriate navigation timeout

The page.goto function uses a timeout of 30,000 milliseconds. Depending on network conditions and page complexity, this might be insufficient or excessive.

Review whether the timeout duration is appropriate for your use case. You could make it configurable via the WorkerConfig.
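A sketch of threading the timeout through the config; the navigationTimeout field and its default are assumptions, not something the PR defines:

import { Page } from 'playwright';

// Hypothetical extension of WorkerConfig with a per-worker timeout.
interface WorkerConfigWithTimeout {
  navigationTimeout?: number; // milliseconds
}

const DEFAULT_NAVIGATION_TIMEOUT_MS = 30_000;

async function navigateWithConfig(page: Page, url: string, config: WorkerConfigWithTimeout) {
  return page.goto(url, {
    waitUntil: 'networkidle',
    timeout: config.navigationTimeout ?? DEFAULT_NAVIGATION_TIMEOUT_MS,
  });
}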


95-104: Optimize duplicate filtering logic

The duplicate filtering in lines 95-104 compares JSON strings of items, which can be inefficient for large datasets.

Consider using a hash function to generate a unique identifier for each item. This can improve performance and reduce memory usage.

Apply this diff to implement hashing:

+ const crypto = require('crypto');
  
  const newResults = pageResults.filter(item => {
-   const uniqueKey = JSON.stringify(item);
+   const uniqueKey = crypto.createHash('sha256').update(JSON.stringify(item)).digest('hex');
    
    // Check against local duplicates
    if (scrapedItems.has(uniqueKey)) return false;
maxun-core/src/utils/worker-pool.ts (3)

70-75: Consider more informative error handling in handleWorkerError

The handleWorkerError function logs an error message but could include more detailed information.

Include stack traces or additional context in the error logs to facilitate debugging.
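For example, the handler could log the error name, stack, and worker context; this is a sketch that assumes the worker index is available at the call site:

function handleWorkerError(error: Error, workerIndex: number): void {
  console.error(`[worker ${workerIndex}] ${error.name}: ${error.message}`);
  if (error.stack) {
    console.error(error.stack);
  }
  // The pool could also record the failure, e.g.
  // this.updateWorkerMetrics(workerIndex, { status: 'failed' });
}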


219-235: Prevent duplicated error handling for worker

The worker.on('error') and the catch block both handle errors. This might result in duplicated error logs or conflicting behavior.

Ensure that errors are handled consistently and consider consolidating error handling to avoid duplication.


226-232: Handle non-zero exit codes gracefully

When a worker exits with a non-zero code, an error is thrown with a generic message.

Provide more detailed error messages, possibly including the worker ID and exit code, to aid in troubleshooting.
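A hedged variant of the existing exit listener with more context in the message; workerResults and config come from the surrounding pool code quoted later in this review:

worker.on('exit', (code) => {
  if (code === 0) {
    resolve(workerResults);
  } else {
    // Identify which worker failed and how far it got.
    reject(new Error(
      `Worker ${config.workerIndex} stopped with exit code ${code} after collecting ${workerResults.length} items`
    ));
  }
});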

maxun-core/src/interpret.ts (3)

552-721: Address redundant code and potential bugs in pagination handling

There appears to be duplicated logic between handleParallelPagination and handlePagination. Additionally, certain conditions and error cases may not be adequately handled.

  • Consolidate common logic between the two methods to avoid duplication.
  • Review and test all pagination types (scrollDown, scrollUp, clickNext, clickLoadMore) for correctness.
  • Ensure that edge cases, such as pages without a next button or infinite scrolling that reaches the end, are properly managed.

Line range hint 722-870: Improve runLoop method for better maintainability

The runLoop method is lengthy and handles many responsibilities, making it hard to read and maintain.

Consider refactoring the method by:

  • Extracting smaller helper functions for repeated logic.
  • Simplifying control flow, especially within the while loop.
  • Adding comments or documentation for complex sections.
🧰 Tools
🪛 Biome (1.9.4)

[error] 738-738: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.

The declaration is defined in this switch clause:

Unsafe fix: Wrap the declaration in a block.

(lint/correctness/noSwitchDeclarations)


Line range hint 774-785: Ensure stability when applying ad-blocker

Within runLoop, the application of the ad-blocker is wrapped in a try-catch block, but the catch block only logs the error.

If the ad-blocker is critical for the operation, consider implementing a retry mechanism or failing gracefully if it cannot be applied.

maxun-core/src/types/worker.ts (2)

1-13: Enhance type safety and add documentation for the WorkerConfig interface.

The interface would benefit from:

  1. Stronger typing for the fields property instead of any
  2. JSDoc comments explaining the purpose of each field
  3. Validation constraints for numeric fields

Consider this improvement:

+/**
+ * Configuration for a worker thread in the parallel scraping system.
+ */
 export interface WorkerConfig {
+    /** Zero-based index of the worker thread */
     workerIndex: number;
+    /** Starting index for this worker's batch */
     startIndex: number;
+    /** Ending index for this worker's batch */
     endIndex: number;
+    /** Number of items to process in each batch */
     batchSize: number;
     pageUrls: string[];
     listSelector: string;
-    fields: any;
+    fields: Record<string, {
+        selector: string;
+        attribute?: string;
+        transform?: (value: string) => any;
+    }>;
     pagination: {
         type: string;
         selector: string;
     };
 }

30-48: Standardize time and memory usage types.

Consider using standard types for better consistency:

  1. Use Date objects for time-related fields
  2. Use NodeJS.MemoryUsage type for memory metrics (like in GlobalMetrics)
 export interface PerformanceMetrics {
-    startTime: number;
-    endTime: number;
+    startTime: Date;
+    endTime: Date;
     duration: number;
     pagesProcessed: number;
     itemsScraped: number;
     failedPages: number;
     averageTimePerPage: number;
-    memoryUsage: {
-        heapUsed: number;
-        heapTotal: number;
-        external: number;
-        rss: number;
-    };
+    memoryUsage: NodeJS.MemoryUsage;
     cpuUsage: {
         user: number;
         system: number;
     };
 }
maxun-core/tsconfig.json (1)

10-10: Consider a more specific include pattern.

While "src/**/*" works, it might include test files or other non-source files. Consider using multiple patterns to be more specific:

-  "include": ["src/**/*"],
+  "include": [
+    "src/**/*.ts",
+    "src/**/*.tsx",
+    "src/**/*.json"
+  ],
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9669a34 and e0b52c1.

📒 Files selected for processing (5)
  • maxun-core/src/interpret.ts (7 hunks)
  • maxun-core/src/types/worker.ts (1 hunks)
  • maxun-core/src/utils/worker-pool.ts (1 hunks)
  • maxun-core/src/utils/worker.ts (1 hunks)
  • maxun-core/tsconfig.json (1 hunks)
🧰 Additional context used
🪛 Biome (1.9.4)
maxun-core/src/utils/worker-pool.ts

[error] 203-233: Promise executor functions should not be async.

(lint/suspicious/noAsyncPromiseExecutor)

maxun-core/src/utils/worker.ts

[error] 122-122: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)


[error] 129-129: The catch clause that only rethrows the original error is useless.

An unnecessary catch clause can be confusing.
Unsafe fix: Remove the catch clause.

(lint/complexity/noUselessCatch)

🔇 Additional comments (8)
maxun-core/src/utils/worker.ts (3)

6-19: Review browser launch arguments for security implications

The initializeBrowser function sets various command-line arguments that disable security features like web security and site isolation trials. While these settings may be necessary for certain scraping tasks, they can expose the browser to security risks.

Please ensure that disabling these features is intentional and does not introduce security vulnerabilities. Consider the security implications of each argument:

  • --disable-web-security: Disables same-origin policy, which can expose sensitive data.
  • --disable-site-isolation-trials: Disables site isolation, potentially affecting browser stability and security.

If not strictly necessary, avoid disabling these features.


21-36: Ensure scripts are correctly loaded in the page

The ensureScriptsLoaded function checks for the existence of certain functions on the window object and loads the scraper.js script if they're missing. However, it only adds the script without verifying if it successfully loaded.

Consider adding validation after loading the script to ensure it was successfully injected. Additionally, handle cases where the script might fail to load due to network issues or incorrect paths.
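One possible shape for a post-injection check, assuming scraper.js is meant to expose window.scrapeList as the PR's global declarations suggest; the script path is illustrative:

import * as path from 'path';
import { Page } from 'playwright';

async function ensureScriptsLoaded(page: Page): Promise<void> {
  const isLoaded = () =>
    page.evaluate(() => typeof (window as any).scrapeList === 'function');

  if (await isLoaded()) return;

  // Path is an example; use the location the build actually emits.
  await page.addScriptTag({ path: path.join(__dirname, '..', 'browserSide', 'scraper.js') });

  if (!(await isLoaded())) {
    throw new Error('scraper.js was injected but window.scrapeList is still undefined');
  }
}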


131-133: Ensure proper resource cleanup in finally block

The finally block closes the page and browser. Ensure that these operations are handled correctly even if exceptions occur during closure.

Consider adding try-catch blocks around await page.close() and await browser.close() to handle any errors during cleanup.
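A minimal version of the guarded cleanup, assuming page and browser are the nullable handles from the surrounding function:

} finally {
  // Guard each close so a failure in one does not skip the other.
  if (page) {
    await page.close().catch(err => console.error('Failed to close page:', err));
  }
  if (browser) {
    await browser.close().catch(err => console.error('Failed to close browser:', err));
  }
}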

maxun-core/src/utils/worker-pool.ts (1)

259-264: Ensure proper termination of workers

In the cleanup method, worker termination is initiated but potential errors during termination are not handled.

Consider adding error handling to worker.terminate() calls to ensure that all workers are properly terminated, even if errors occur.
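For instance, terminations could be awaited and individually guarded; this assumes this.workers holds the pool's Worker instances as described elsewhere in this review:

// Sketch of a cleanup that tolerates individual termination failures.
await Promise.all(
  this.workers.map((worker, index) =>
    worker.terminate().catch(err =>
      console.error(`Failed to terminate worker ${index}:`, err)
    )
  )
);
this.workers = [];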

maxun-core/src/interpret.ts (2)

96-96: Initialize WorkerPool with correct maxWorkers

The WorkerPool is initialized with this.options.maxWorkers but there may be inconsistencies if this.options is not properly set.

Ensure that this.options is correctly initialized and that maxWorkers has a valid value before creating the WorkerPool.


Line range hint 85-91: Handle potential NaN in maxWorkers default value

The calculation for maxWorkers uses os.cpus().length - 1, which could possibly result in zero or negative numbers if os.cpus() returns an empty array.

Add validation to ensure that maxWorkers is at least 1:

- maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
+ maxWorkers: Math.max(1, Math.min(os.cpus().length > 0 ? os.cpus().length - 1 : 1, 4)),
maxun-core/src/types/worker.ts (2)

20-28: Well-structured progress tracking interface!

The interface provides comprehensive progress tracking with appropriate types and good separation of concerns.


50-59: Well-designed metrics aggregation interface!

The interface makes good use of NodeJS built-in types and provides comprehensive system-wide metrics.

Comment on lines 44 to 134
try {
browser = await initializeBrowser();
const context = await browser.newContext();
page = await context.newPage();
await ensureScriptsLoaded(page);

for (const [pageIndex, pageUrl] of config.pageUrls.entries()) {
const pageStartTime = Date.now();

try {
// Report progress to main thread
parentPort?.postMessage({
type: 'progress',
data: {
workerId: config.workerIndex,
currentUrl: pageUrl,
processedUrls: pageIndex,
totalUrls: config.pageUrls.length,
timeElapsed: Date.now() - pageStartTime,
scrapedItems: results.length
}
});

const navigationResult = await page.goto(pageUrl, {
waitUntil: 'networkidle',
timeout: 30000
});

if (!navigationResult) continue;

await page.waitForLoadState('networkidle').catch(() => {});

const scrapeConfig = {
listSelector: config.listSelector,
fields: config.fields,
pagination: config.pagination,
limit: config.endIndex - config.startIndex - results.length
};

const pageResults = await page.evaluate(
(cfg) => window.scrapeList(cfg),
scrapeConfig
);

// Filter out duplicates
const newResults = pageResults.filter(item => {
const uniqueKey = JSON.stringify(item);

// Check against local duplicates
if (scrapedItems.has(uniqueKey)) return false;

// Check against shared state results
const isDuplicate = sharedState.results.some(
existingItem => JSON.stringify(existingItem) === uniqueKey
);

if (isDuplicate) return false;
scrapedItems.add(uniqueKey);
sharedState.results.push(item);
sharedState.totalScraped++;
return true;
});

results.push(...newResults);

if (results.length >= config.batchSize) break;

await page.waitForTimeout(1000);

} catch (error) {
parentPort?.postMessage({
type: 'error',
data: {
workerId: config.workerIndex,
url: pageUrl,
error: error.message
}
});
continue;
}
}

return results;

} catch (error) {
throw error;
} finally {
if (page) await page.close();
if (browser) await browser.close();
}
}

⚠️ Potential issue

Handle potential exceptions during scraping

In the scrapeBatch function, several asynchronous operations could throw exceptions, such as launching the browser, creating contexts, and navigating to pages.

Ensure that all asynchronous operations are properly wrapped in try-catch blocks to prevent unhandled promise rejections. Specifically, consider the following (a sketch of the first point follows the list):

  • Adding error handling when initializing the browser and context.
  • Verifying that page.goto successfully navigated to the URL before proceeding.
  • Handling timeouts and navigation errors more gracefully.
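A sketch of the first point, isolating browser and context start-up errors from per-page errors; the function name and launch options are assumptions:

import { chromium, Browser, Page } from 'playwright';

async function startBrowser(): Promise<{ browser: Browser; page: Page }> {
  let browser: Browser | null = null;
  try {
    browser = await chromium.launch({ headless: true });
    const context = await browser.newContext();
    const page = await context.newPage();
    return { browser, page };
  } catch (error) {
    // Clean up the partially initialized browser before surfacing the failure.
    if (browser) await browser.close().catch(() => {});
    throw new Error(`Failed to initialize browser: ${(error as Error).message}`);
  }
}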
🧰 Tools
🪛 Biome (1.9.4)

[error] 122-122: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)


[error] 129-129: The catch clause that only rethrows the original error is useless.

An unnecessary catch clause can be confusing.
Unsafe fix: Remove the catch clause.

(lint/complexity/noUselessCatch)

Comment on lines 114 to 123
parentPort?.postMessage({
type: 'error',
data: {
workerId: config.workerIndex,
url: pageUrl,
error: error.message
}
});
continue;
}

🛠️ Refactor suggestion

Improve error reporting for failed page processing

In the catch block, when an error occurs while processing a page, the worker posts an error message but continues to the next page.

Consider adding more context to the error messages, such as the specific error type or stack trace. This can aid in debugging.

Also, evaluate whether continuing to the next page is the desired behavior or if certain errors should halt the worker.
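The error payload could carry the error name and stack alongside the message, for example (the extra fields are assumptions, not part of the PR's message contract):

parentPort?.postMessage({
  type: 'error',
  data: {
    workerId: config.workerIndex,
    url: pageUrl,
    error: error.message,
    errorName: error.name,   // assumed extra field
    stack: error.stack       // assumed extra field
  }
});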

🧰 Tools
🪛 Biome (1.9.4)

[error] 122-122: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

Comment on lines 141 to 156
scrapeBatch(config, sharedState)
.then(results => {
parentPort?.postMessage({
type: 'complete',
data: results
});
})
.catch(error => {
parentPort?.postMessage({
type: 'error',
data: {
workerId: config.workerIndex,
error: error.message
}
});
});

⚠️ Potential issue

Handle uncaught exceptions in worker initialization

In the worker initialization section, if scrapeBatch throws an unhandled exception, it could cause the worker to exit unexpectedly.

Ensure that all exceptions in scrapeBatch are caught and appropriately handled. Modify the .catch block to include error logging or additional error handling as needed.
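A sketch of a stricter .catch block; exiting with a non-zero code is an assumed policy that lets the pool's exit listener observe the failure:

.catch(error => {
  // Log locally as well as reporting upward, so the crash is never silent.
  console.error(`Worker ${config.workerIndex} failed:`, error);
  parentPort?.postMessage({
    type: 'error',
    data: { workerId: config.workerIndex, error: error.message, stack: error.stack }
  });
  process.exit(1); // in a worker thread this stops only the worker
});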

error: error.message
}
});
continue;

🛠️ Refactor suggestion

Remove unnecessary continue statement

The continue statement at line 122 is unnecessary because it's the last statement in the loop iteration.

Apply this diff to remove the redundant continue statement:

-       continue;
🧰 Tools
🪛 Biome (1.9.4)

[error] 122-122: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

Comment on lines 128 to 130
} catch (error) {
  throw error;
} finally {

🛠️ Refactor suggestion

Remove useless catch clause that only rethrows

The catch block at lines 128-130 only rethrows the caught error without adding any additional handling.

Apply this diff to remove the unnecessary catch clause:

-   } catch (error) {
-       throw error;
    } finally {
🧰 Tools
🪛 Biome (1.9.4)

[error] 129-129: The catch clause that only rethrows the original error is useless.

An unnecessary catch clause can be confusing.
Unsafe fix: Remove the catch clause.

(lint/complexity/noUselessCatch)

Comment on lines 279 to 281
public isActive(): boolean {
  return this.browser !== null && !this.isShuttingDown;
}

⚠️ Potential issue

Adjust isActive method to reflect correct state

Similar to getActiveWorkerCount, the isActive method relies on this.browser, which is not relevant in this context.

Update the method to reflect whether there are active workers and the pool is not shutting down:

- return this.browser !== null && !this.isShuttingDown;
+ return this.workers.length > 0 && !this.isShuttingDown;

Comment on lines 276 to 281
  return this.browser ? 1 : 0;
}

public isActive(): boolean {
  return this.browser !== null && !this.isShuttingDown;
}

⚠️ Potential issue

Correct getActiveWorkerCount method logic

The getActiveWorkerCount method returns 1 if this.browser is truthy, which might not accurately reflect the number of active workers.

Since this.browser is not used in this class, and the workers are stored in this.workers, update the method to return the actual count of active workers:

- return this.browser ? 1 : 0;
+ return this.workers.length;

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 203 to 237
new Promise<any[]>(async (resolve, reject) => {
if (this.isShuttingDown) {
reject(new Error('Worker pool is shutting down'));
return;
}

const worker = this.createWorker(config, sharedState);
this.workers.push(worker);

let workerResults: any[] = [];

worker.on('message', (message) => {
if (message.type === 'complete') {
workerResults = message.data;
}
});

worker.on('error', (error) => {
errors.push(error);
this.updateWorkerMetrics(config.workerIndex, { status: 'failed' });
reject(error);
});

worker.on('exit', (code) => {
if (code === 0) {
resolve(workerResults);
} else {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
}).catch(error => {
console.error('Worker error:', error);
return [];
})
);

🛠️ Refactor suggestion

Avoid using async functions as Promise executors

The function passed to new Promise at lines 203-233 is marked as async, which can lead to unexpected behavior.

Apply this diff to remove the async keyword from the Promise executor function:

- new Promise<any[]>(async (resolve, reject) => {
+ new Promise<any[]>((resolve, reject) => {

According to best practices, the Promise constructor should not be passed an async function.

🧰 Tools
🪛 Biome (1.9.4)

[error] 203-233: Promise executor functions should not be async.

(lint/suspicious/noAsyncPromiseExecutor)

@@ -39,6 +43,7 @@ declare global {
interface InterpreterOptions {
maxRepeats: number;
maxConcurrency: number;
maxWorkers: number;

⚠️ Potential issue

Add missing property to InterpreterOptions interface

The serializableCallback property is used in the code but not defined in the provided InterpreterOptions interface.

Ensure that serializableCallback is correctly defined in the interface to maintain type safety.

Comment on lines +15 to +18
export interface SharedState {
totalScraped: number;
results: any[];
}

💡 Codebase verification

Thread safety and type safety concerns are valid issues that need addressing

The codebase shows:

  • Multiple workers concurrently access and modify SharedState without synchronization
  • results: any[] is used consistently, lacking type safety
  • No thread-safe mechanisms are implemented for shared state access

Suggested improvements (a sketch of the first follows the list):

  1. Implement proper synchronization using worker_threads features like SharedArrayBuffer or message-based state updates
  2. Add generic type parameter as suggested in the original review
  3. Consider using an intermediary class to manage thread-safe state access
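A rough sketch of the message-based alternative from point 1, where workers only report items and the main thread is the single writer of the shared state:

// Worker side: report new items instead of mutating shared memory.
parentPort?.postMessage({ type: 'items', data: newResults });

// Main thread side: the pool owns SharedState and applies updates serially.
worker.on('message', (message) => {
  if (message.type === 'items') {
    sharedState.results.push(...message.data);
    sharedState.totalScraped += message.data.length;
  }
});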
🔗 Analysis chain

Consider thread safety and type safety for SharedState.

The shared state between workers needs careful consideration:

  1. The results array should have a more specific type than any[]
  2. Thread safety should be ensured when multiple workers access this state

Consider using a thread-safe data structure or implementing proper synchronization mechanisms. Also, define a generic type parameter:

-export interface SharedState {
+export interface SharedState<T = unknown> {
     totalScraped: number;
-    results: any[];
+    results: T[];
 }
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find all files importing or using SharedState
rg "SharedState" -l

# Look for actual usage patterns of SharedState
rg "SharedState" -A 5 -B 5

# Check for any thread synchronization patterns
rg -i "sync|mutex|lock|atomic" maxun-core/src/

# Look for type definitions related to worker results
ast-grep --pattern 'type $_ = $_' maxun-core/src/types/

Length of output: 12076

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
maxun-core/src/interpret.ts (4)

456-457: Add error handling for parallel pagination.

While the parallel pagination implementation is good, consider adding try-catch block here to handle potential failures gracefully.

-          const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
-          await this.options.serializableCallback(scrapeResults);
+          try {
+            const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
+            await this.options.serializableCallback(scrapeResults);
+          } catch (error) {
+            this.log(`Parallel pagination failed: ${error.message}`, Level.ERROR);
+            // Fallback to regular pagination
+            const scrapeResults = await this.handlePagination(page, config);
+            await this.options.serializableCallback(scrapeResults);
+          }

549-551: Consider environment-specific CPU core limits.

The CPU core calculation could be made more configurable:

  1. The hard-coded maximum of 4 workers might not be optimal for all environments
  2. Consider adding environment variables for fine-tuning
-      const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4));
+      const MAX_WORKERS = process.env.MAX_WORKERS ? parseInt(process.env.MAX_WORKERS, 10) : 4;
+      const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, MAX_WORKERS));

614-623: Improve navigation timeout handling.

The hard-coded 30000ms timeout might not be suitable for all network conditions. Consider making it configurable and adding retry logic.

+      const NAVIGATION_TIMEOUT = process.env.NAVIGATION_TIMEOUT ? parseInt(process.env.NAVIGATION_TIMEOUT, 10) : 30000;
+      const MAX_RETRIES = process.env.MAX_RETRIES ? parseInt(process.env.MAX_RETRIES, 10) : 3;
+      
+      let retries = 0;
+      while (retries < MAX_RETRIES) {
         try {
           await Promise.race([
             Promise.all([
-              page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
+              page.waitForNavigation({ waitUntil: 'networkidle', timeout: NAVIGATION_TIMEOUT }),
               nextButton.click()
             ]),
             Promise.all([
-              page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
+              page.waitForNavigation({ waitUntil: 'networkidle', timeout: NAVIGATION_TIMEOUT }),
               nextButton.dispatchEvent('click')
             ])
           ]);
+          break;
         } catch (error) {
+          retries++;
+          if (retries === MAX_RETRIES) {
+            throw error;
+          }
+          await page.waitForTimeout(1000 * retries); // Exponential backoff
         }
+      }

545-668: Add memory management for large-scale scraping.

The current implementation stores all URLs in memory, which could be problematic for very large datasets.

Consider implementing:

  1. Batch processing of URLs
  2. Stream processing of results
  3. Periodic cleanup of collected data

Example approach:

private async* urlGenerator(page: Page, config: any): AsyncGenerator<string> {
  const batchSize = 1000;
  const urls: string[] = [];
  
  while (true) {
    urls.push(page.url());
    
    if (urls.length >= batchSize) {
      for (const url of urls) {
        yield url;
      }
      urls.length = 0; // Clear the array
    }
    
    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }
  
  // Yield remaining URLs
  for (const url of urls) {
    yield url;
  }
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e0b52c1 and b2e8332.

📒 Files selected for processing (1)
  • maxun-core/src/interpret.ts (4 hunks)
🔇 Additional comments (1)
maxun-core/src/interpret.ts (1)

19-19: LGTM!

The os module import is correctly added for CPU core detection.

Comment on lines +571 to +657
try {
while (true) {
pageUrls.push(page.url())

if (pageUrls.length >= estimatedPages) {
console.log('Reached estimated number of pages. Stopping pagination.');
break;
}

let checkButton = null;
let workingSelector = null;

for (let i = 0; i < availableSelectors.length; i++) {
const selector = availableSelectors[i];
try {
// Wait for selector with a short timeout
checkButton = await page.waitForSelector(selector, { state: 'attached' });
if (checkButton) {
workingSelector = selector;
break;
}
} catch (error) {
console.log(`Selector failed: ${selector}`);
}
}

if(!workingSelector) {
break;
}

const nextButton = await page.$(workingSelector);
if (!nextButton) {
break;
}

const selectorIndex = availableSelectors.indexOf(workingSelector!);
availableSelectors = availableSelectors.slice(selectorIndex);

const previousUrl = page.url();
visitedUrls.push(previousUrl);

try {
// Try both click methods simultaneously
await Promise.race([
Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
nextButton.click()
]),
Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
nextButton.dispatchEvent('click')
])
]);
} catch (error) {
// Verify if navigation actually succeeded
const currentUrl = page.url();
if (currentUrl === previousUrl) {
console.log("Previous URL same as current URL. Navigation failed.");
}
}

const currentUrl = page.url();
if (visitedUrls.includes(currentUrl)) {
console.log(`Detected navigation to a previously visited URL: ${currentUrl}`);

// Extract the current page number from the URL
const match = currentUrl.match(/\d+/);
if (match) {
const currentNumber = match[0];
// Use visitedUrls.length + 1 as the next page number
const nextNumber = visitedUrls.length + 1;

// Create new URL by replacing the current number with the next number
const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString());

console.log(`Navigating to constructed URL: ${nextUrl}`);

// Navigate to the next page
await Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle' }),
page.goto(nextUrl)
]);
}
}

await page.waitForTimeout(1000);
}

🛠️ Refactor suggestion

Modularize the pagination navigation logic.

The navigation logic is complex and could benefit from being split into smaller, focused functions for better maintainability and testing.

Consider extracting these functionalities:

  1. URL collection logic
  2. Navigation handling
  3. URL construction for pagination

Example refactor for URL collection:

private async collectPageUrls(page: Page, config: any): Promise<string[]> {
  const pageUrls: string[] = [];
  const visitedUrls: Set<string> = new Set();
  
  while (true) {
    const currentUrl = page.url();
    if (visitedUrls.has(currentUrl)) {
      break;
    }
    
    pageUrls.push(currentUrl);
    visitedUrls.add(currentUrl);
    
    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }
  
  return pageUrls;
}

@coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
maxun-core/src/config/kafka.ts (1)

4-8: Good implementation of topic configuration with DLQ.

The topic naming is clear and follows a consistent pattern. The inclusion of a Dead Letter Queue (DLQ) demonstrates good error handling practices.

Consider adding these enhancements (a sketch of the first two follows the list):

  1. Topic name prefixing for multi-environment deployments
  2. Topic configuration for partitions and retention
  3. Documentation for the DLQ handling strategy
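A sketch of points 1 and 2; the prefix variable, partition counts, and retention values are assumptions to be tuned per environment:

// Hypothetical environment-aware topic definitions for admin.createTopics().
const topicPrefix = process.env.KAFKA_TOPIC_PREFIX || 'dev';

const topicDefinitions = [
  { topic: `${topicPrefix}-scraping-tasks`, numPartitions: 10, configEntries: [{ name: 'retention.ms', value: '604800000' }] },   // 7 days
  { topic: `${topicPrefix}-scraping-results`, numPartitions: 10, configEntries: [{ name: 'retention.ms', value: '604800000' }] }, // 7 days
  { topic: `${topicPrefix}-scraping-dlq`, numPartitions: 1, configEntries: [{ name: 'retention.ms', value: '2592000000' }] }      // 30 days
];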
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b2e8332 and 5c6f478.

📒 Files selected for processing (1)
  • maxun-core/src/config/kafka.ts (1 hunks)

Comment on lines +1 to +4
export const kafkaConfig = {
    clientId: 'maxun-scraper',
    brokers: ['localhost:29092'],
    topics: {

⚠️ Potential issue

Use environment variables for Kafka configuration.

The broker address and client ID are hardcoded, which limits deployment flexibility and poses security risks. Consider using environment variables for configuration.

Apply this diff to make the configuration more flexible:

+import { config } from 'dotenv';
+
+config();
+
 export const kafkaConfig = {
-    clientId: 'maxun-scraper',
-    brokers: ['localhost:29092'],
+    clientId: process.env.KAFKA_CLIENT_ID || 'maxun-scraper',
+    brokers: (process.env.KAFKA_BROKERS || 'localhost:29092').split(','),

Also, consider adding SSL/authentication configuration for production environments:

    // Add these properties to kafkaConfig
    ssl: process.env.KAFKA_SSL === 'true',
    sasl: process.env.KAFKA_SASL === 'true' ? {
      mechanism: process.env.KAFKA_SASL_MECHANISM,
      username: process.env.KAFKA_USERNAME,
      password: process.env.KAFKA_PASSWORD,
    } : undefined,

Comment on lines +9 to +10
    consumerGroup: 'scraping-group'
};

🛠️ Refactor suggestion

Add environment separation for consumer groups.

The consumer group should be environment-aware to prevent cross-environment message processing.

Apply this diff:

-    consumerGroup: 'scraping-group'
+    consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`

Comment on lines +1 to +10
export const kafkaConfig = {
    clientId: 'maxun-scraper',
    brokers: ['localhost:29092'],
    topics: {
        SCRAPING_TASKS: 'scraping-tasks',
        SCRAPING_RESULTS: 'scraping-results',
        SCRAPING_DLQ: 'scraping-dlq'
    },
    consumerGroup: 'scraping-group'
};

💡 Codebase verification

Remove unused Kafka configuration

The Kafka configuration appears to be unused in the codebase. There are no imports of this configuration, no Kafka client instantiations, and no other Kafka-related code present. Consider removing this unused configuration to maintain a clean codebase.

🔗 Analysis chain

Verify Kafka configuration in the codebase.

Let's ensure the configuration is properly utilized across the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for proper usage of Kafka configuration and potential security issues

# Check for hardcoded Kafka configurations
echo "Checking for hardcoded Kafka configurations..."
rg -g '!*.{json,md}' -i 'localhost:29092'

# Check for proper import usage
echo "Checking kafka config imports..."
rg "from ['|\"].*config/kafka" -A 3

# Check for potential security-sensitive Kafka operations
echo "Checking for direct Kafka operations..."
ast-grep --pattern 'new Kafka($$$)'

Length of output: 451


Script:

#!/bin/bash
# Search for Kafka-related code and configurations

# Check for usage of kafkaConfig
echo "Checking for kafkaConfig usage..."
rg "kafkaConfig"

# Check for usage of topic names
echo "Checking for topic name usage..."
rg "scraping-tasks|scraping-results|scraping-dlq"

# Check for any Kafka-related imports or code
echo "Checking for Kafka-related code..."
rg -i "kafka" -g "!*.{json,md,txt}" -A 3

Length of output: 844

@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
maxun-core/src/utils/kafka-manager.ts (4)

45-57: Optimize admin client usage in startMetricsReporting

Currently, a new admin client is created and disconnected every 5 seconds within the metrics reporting interval. This could lead to unnecessary overhead and resource consumption. Consider creating the admin client once and reusing it to improve performance.

Apply this diff to reuse the admin client:

  1. Import Admin from kafkajs:

    - import { Kafka, Consumer, Producer } from 'kafkajs';
    + import { Kafka, Consumer, Producer, Admin } from 'kafkajs';
  2. Add a private property for the admin client:

      private kafka: Kafka;
      private producer: Producer;
      private consumer: Consumer;
      private metricsInterval: NodeJS.Timeout | null = null;
    + private admin: Admin;
  3. Initialize the admin client in the constructor:

        super();
        this.kafka = new Kafka({
          clientId: kafkaConfig.clientId,
          brokers: kafkaConfig.brokers
        });
    +   this.admin = this.kafka.admin();
  4. Update createTopics() to use the existing admin client:

    -   const admin = this.kafka.admin();
    +   const admin = this.admin;
        await admin.createTopics({
          topics: [
            { topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 },
            { topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 },
            { topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 }
          ]
        });
    -   await admin.disconnect();
  5. Modify startMetricsReporting() to reuse the admin client and avoid disconnecting it each time:

        this.metricsInterval = setInterval(async () => {
    -     const admin = this.kafka.admin();
    +     const admin = this.admin;
          const metrics = await admin.fetchTopicMetadata({
            topics: [
              kafkaConfig.topics.SCRAPING_TASKS,
              kafkaConfig.topics.SCRAPING_RESULTS
            ]
          });
          this.emit('metrics', metrics);
    -     await admin.disconnect();
        }, 5000);
      }
  6. Disconnect the admin client in the cleanup() method:

        if (this.metricsInterval) {
          clearInterval(this.metricsInterval);
        }
        await this.producer.disconnect();
        await this.consumer.disconnect();
    +   await this.admin.disconnect();
      }

This refactor reduces the overhead of repeatedly creating and disconnecting the admin client.


25-30: Add error handling in the initialize method

The initialize method performs several asynchronous operations that could fail (connecting the producer, consumer, and creating topics). To enhance robustness, consider wrapping these operations in a try/catch block to handle potential errors gracefully.

Apply this diff to implement error handling:

+    try {
       await this.producer.connect();
       await this.consumer.connect();
       await this.createTopics();
       this.startMetricsReporting();
+    } catch (error) {
+      // Handle errors appropriately, e.g., log and rethrow or exit the process
+      console.error('Error during KafkaManager initialization:', error);
+      throw error;
+    }
     }

32-42: Handle potential errors in createTopics method

Errors might occur during topic creation (e.g., network issues or insufficient permissions). Wrapping the code in a try/catch block allows for proper error handling and logging.

Apply this diff to add error handling:

+    try {
       const admin = this.admin;
       await admin.createTopics({
         topics: [
           { topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 },
           { topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 },
           { topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 }
         ]
       });
     } catch (error) {
+      // Handle errors, e.g., log the error
+      console.error('Error creating Kafka topics:', error);
+      throw error;
+    }
     }

59-65: Ensure proper cleanup with error handling in cleanup method

Disconnection operations may fail, preventing complete resource cleanup. To enhance reliability, wrap the disconnection logic in a try/catch block.

Apply this diff to handle errors during cleanup:

     if (this.metricsInterval) {
       clearInterval(this.metricsInterval);
     }
+    try {
       await this.producer.disconnect();
       await this.consumer.disconnect();
       await this.admin.disconnect();
+    } catch (error) {
+      // Handle errors, e.g., log the error
+      console.error('Error during KafkaManager cleanup:', error);
+    }
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c6f478 and 5becc84.

📒 Files selected for processing (1)
  • maxun-core/src/utils/kafka-manager.ts (1 hunks)

Comment on lines +45 to +56
this.metricsInterval = setInterval(async () => {
  const admin = this.kafka.admin();
  const metrics = await admin.fetchTopicMetadata({
    topics: [
      kafkaConfig.topics.SCRAPING_TASKS,
      kafkaConfig.topics.SCRAPING_RESULTS
    ]
  });

  this.emit('metrics', metrics);
  await admin.disconnect();
}, 5000);

🛠️ Refactor suggestion

Add error handling in startMetricsReporting

The asynchronous function inside setInterval may throw exceptions (e.g., network errors during fetchTopicMetadata). Unhandled exceptions can cause the interval to stop executing. Wrap the interval's logic in a try/catch block to ensure continuous metrics reporting.

Apply this diff to implement error handling:

     this.metricsInterval = setInterval(async () => {
+      try {
         const admin = this.admin;
         const metrics = await admin.fetchTopicMetadata({
           topics: [
             kafkaConfig.topics.SCRAPING_TASKS,
             kafkaConfig.topics.SCRAPING_RESULTS
           ]
         });
         this.emit('metrics', metrics);
+      } catch (error) {
+        // Handle errors, e.g., log the error
+        console.error('Error fetching Kafka metrics:', error);
+      }
       }, 5000);
     }

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (4)
maxun-core/src/utils/scraping-consumer.ts (4)

125-151: Define types for message to improve type safety

The handleError method accepts message: any, which reduces type safety. Defining the message type enhances code reliability and helps prevent errors.

Example:

+ import { KafkaMessage } from 'kafkajs';

- private async handleError(message: any, error: Error) {
+ private async handleError(message: KafkaMessage, error: Error) {

98-114: Collect and report failed URLs for better error handling

Currently, errors encountered during URL processing are logged but not returned. Collecting failed URLs provides visibility into issues and allows for retries or analysis later on.

Example:

+ const failedUrls: string[] = [];

for (const url of task.urls) {
  try {
    await page.goto(url, {...});

    const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config);
    results.push(...pageResults);
  } catch (error) {
    console.error(`Error processing URL ${url}:`, error);
+   failedUrls.push(url);
  }
}

- // Existing code to send results
+ // Modify the results to include failed URLs
+ await this.producer.send({
+   topic: kafkaConfig.topics.SCRAPING_RESULTS,
+   messages: [{
+     key: task.taskId,
+     value: JSON.stringify({
+       taskId: task.taskId,
+       data: results,
+       failedUrls: failedUrls
+     })
+   }]
+ });

36-66: Gracefully handle consumer shutdown and errors

The start method runs the consumer but doesn't handle potential shutdown signals or errors that may require disconnecting the consumer and producer. Implementing cleanup logic ensures that resources are released properly.

Example:

async start() {
  try {
    await this.consumer.connect();
    await this.producer.connect();
    // Rest of the code
  } catch (error) {
    console.error('Error in consumer:', error);
  } finally {
    await this.consumer.disconnect();
    await this.producer.disconnect();
  }
}

45-45: Make partitionsConsumedConcurrently configurable for flexibility

The number of partitions consumed concurrently is hardcoded to 3, which may not be optimal in all scenarios. Making this value configurable allows for tuning based on the deployment environment.

Example:

- partitionsConsumedConcurrently: 3,
+ partitionsConsumedConcurrently: kafkaConfig.partitionsConsumedConcurrently || 3,

And add this to your kafkaConfig:

partitionsConsumedConcurrently: number;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5becc84 and 5e8a6d1.

📒 Files selected for processing (1)
  • maxun-core/src/utils/scraping-consumer.ts (1 hunks)
🔇 Additional comments (2)
maxun-core/src/utils/scraping-consumer.ts (2)

76-119: ⚠️ Potential issue

Ensure the browser is always closed to prevent resource leaks

In the processTask method, if an error occurs before the browser is initialized, browser remains null, and await browser.close() is not called. This can lead to resource leaks. Initialize browser after launching to ensure it's always closed in the finally block.

Example:

- let browser: Browser | null = null;
+ let browser: Browser;

try {
+   browser = await chromium.launch({...});
    // Rest of the code
} finally {
  if (browser) await browser.close();
}

Likely invalid or redundant comment.


68-73: ⚠️ Potential issue

Ensure scripts are loaded after navigation for reliability

The ensureScriptsLoaded method checks and loads scripts before navigation, which may not guarantee that scripts are available in the page context after navigation. Move the script injection to occur after page navigation to ensure scripts are correctly loaded.

Example:

for (const url of task.urls) {
  try {
    await page.goto(url, {...});

-   await this.ensureScriptsLoaded(page);

+   await this.ensureScriptsLoaded(page);

    const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config);
    results.push(...pageResults);
  } catch (error) {
    console.error(`Error processing URL ${url}:`, error);
  }
}

Likely invalid or redundant comment.

Comment on lines +6 to +17
declare global {
  interface Window {
    scrape: (selector: string | null) => Record<string, string>[];
    scrapeSchema: (
      schema: Record<string, { selector: string; tag: string; attribute: string }>
    ) => Record<string, any>;
    scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record<string, any>[];
    scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[];
    scrollDown: (pages?: number) => void;
    scrollUp: (pages?: number) => void;
  }
}

🛠️ Refactor suggestion

Avoid polluting the global Window interface

Extending the global Window interface can lead to conflicts and unexpected behavior. Consider encapsulating these functions or using a unique namespace to prevent polluting the global scope.

Example:

- declare global {
-   interface Window {
-     scrape: ...;
-     // Other methods
-   }
- }

+ export interface ScraperWindow extends Window {
+   scrape: ...;
+   // Other methods
+ }

+ // In your code where you use `window`
+ (window as ScraperWindow).scrape(...);

Comment on lines 124 to 151
private async handleError(message: any, error: Error) {
const retryCount = parseInt(message.headers['retry-count'] || '0');

if (retryCount < 3) {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_TASKS,
messages: [{
key: message.key,
value: message.value,
headers: {
'retry-count': (retryCount + 1).toString(),
'error': error.message
}
}]
});
} else {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_DLQ,
messages: [{
key: message.key,
value: message.value,
headers: {
'final-error': error.message
}
}]
});
}
}

⚠️ Potential issue

Safely access message.headers to prevent runtime errors

In the handleError method, message.headers may be null or undefined, leading to runtime errors when accessing message.headers['retry-count']. Ensure that message.headers is defined before accessing its properties.

Example:

- const retryCount = parseInt(message.headers['retry-count'] || '0');
+ const headers = message.headers || {};
+ const retryCount = parseInt(headers['retry-count'] || '0');
📝 Committable suggestion


Suggested change
private async handleError(message: any, error: Error) {
const retryCount = parseInt(message.headers['retry-count'] || '0');
if (retryCount < 3) {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_TASKS,
messages: [{
key: message.key,
value: message.value,
headers: {
'retry-count': (retryCount + 1).toString(),
'error': error.message
}
}]
});
} else {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_DLQ,
messages: [{
key: message.key,
value: message.value,
headers: {
'final-error': error.message
}
}]
});
}
}
private async handleError(message: any, error: Error) {
const headers = message.headers || {};
const retryCount = parseInt(headers['retry-count'] || '0');
if (retryCount < 3) {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_TASKS,
messages: [{
key: message.key,
value: message.value,
headers: {
'retry-count': (retryCount + 1).toString(),
'error': error.message
}
}]
});
} else {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_DLQ,
messages: [{
key: message.key,
value: message.value,
headers: {
'final-error': error.message
}
}]
});
}
}

Comment on lines 75 to 122
private async processTask(task: any) {
let browser: Browser | null = null;
const results: any[] = [];

try {
browser = await chromium.launch({
headless: true,
args: [
"--disable-blink-features=AutomationControlled",
"--disable-web-security",
"--disable-features=IsolateOrigins,site-per-process",
"--disable-site-isolation-trials",
"--disable-extensions",
"--no-sandbox",
"--disable-dev-shm-usage",
]
});

const context = await browser.newContext();
const page = await context.newPage();

await this.ensureScriptsLoaded(page);

for (const url of task.urls) {
try {
await page.goto(url, {
waitUntil: 'networkidle',
timeout: 30000
});

const pageResults = await page.evaluate(
(cfg) => window.scrapeList(cfg),
task.config
);

results.push(...pageResults);
} catch (error) {
console.error(`Error processing URL ${url}:`, error);
}
}

await page.close();
} finally {
if (browser) await browser.close();
}

return results;
}

🛠️ Refactor suggestion

Define specific types for task to enhance type safety

The processTask method currently accepts task: any, which reduces type safety and may lead to runtime errors. Defining an interface for the task structure improves code readability and allows TypeScript to catch issues at compile time.

Example:

+ interface ScrapingTask {
+   taskId: string;
+   urls: string[];
+   config: any; // Replace `any` with a more specific type if possible
+ }

- private async processTask(task: any) {
+ private async processTask(task: ScrapingTask) {

Committable suggestion skipped: line range outside the PR's diff.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
maxun-core/src/utils/scraping-consumer.ts (1)

6-17: 🛠️ Refactor suggestion

Avoid polluting the global Window interface.

The concern about extending the global Window interface remains valid as it can lead to naming conflicts and unexpected behavior.

Consider using a dedicated namespace or interface as previously suggested:

- declare global {
-   interface Window {
-     scrape: (selector: string | null) => Record<string, string>[];
-     scrapeSchema: (schema: Record<string, { selector: string; tag: string; attribute: string }>) => Record<string, any>;
-     scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record<string, any>[];
-     scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[];
-     scrollDown: (pages?: number) => void;
-     scrollUp: (pages?: number) => void;
-   }
- }

+ export interface ScraperWindow extends Window {
+   scrape: (selector: string | null) => Record<string, string>[];
+   scrapeSchema: (schema: Record<string, { selector: string; tag: string; attribute: string }>) => Record<string, any>;
+   scrapeList: (config: { listSelector: string; fields: any; limit?: number; pagination: any }) => Record<string, any>[];
+   scrapeListAuto: (listSelector: string) => { selector: string; innerText: string }[];
+   scrollDown: (pages?: number) => void;
+   scrollUp: (pages?: number) => void;
+ }
🧹 Nitpick comments (4)
maxun-core/src/utils/scraping-consumer.ts (4)

23-29: Add TypeScript interfaces for workflow tracking.

Define explicit interfaces for workflow tracking to improve type safety and code maintainability.

interface WorkflowStats {
  startTime: number;
  totalTasks: number;
  processedTasks: number;
  totalItems: number;
}

interface ProcessedWorkflows {
  [workflowId: string]: Set<string>;
}
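
A small sketch of how the two interfaces could be used together for deduplication and progress tracking; the in-memory maps and the trackTask helper below are illustrative only, not part of the PR:

const workflowStats = new Map<string, WorkflowStats>();
const processedWorkflows: ProcessedWorkflows = {};

// Returns false when the task was already processed for this workflow.
function trackTask(workflowId: string, taskId: string, itemCount: number): boolean {
  const seen = processedWorkflows[workflowId] ?? (processedWorkflows[workflowId] = new Set<string>());
  if (seen.has(taskId)) return false;
  seen.add(taskId);

  const stats = workflowStats.get(workflowId) ??
    { startTime: Date.now(), totalTasks: 0, processedTasks: 0, totalItems: 0 };
  stats.processedTasks += 1;
  stats.totalItems += itemCount;
  workflowStats.set(workflowId, stats);
  return true;
}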

56-59: Consider increasing partition concurrency for better parallelization.

The current setting of 4 concurrent partitions might be a bottleneck for bulk scraping. Consider making this configurable based on system resources and load.

- partitionsConsumedConcurrently: 4,
+ partitionsConsumedConcurrently: parseInt(process.env.KAFKA_CONCURRENT_PARTITIONS || '4', 10),

150-161: Document browser launch configuration.

The browser launch configuration includes important security settings that should be documented.

  browser = await chromium.launch({
    headless: true,
    args: [
+     // Disable automation detection
      "--disable-blink-features=AutomationControlled",
+     // Allow cross-origin requests
      "--disable-web-security",
+     // Disable process isolation for better performance
      "--disable-features=IsolateOrigins,site-per-process",
      "--disable-site-isolation-trials",
+     // Security settings for containerized environments
      "--disable-extensions",
      "--no-sandbox",
      "--disable-dev-shm-usage",
    ]
  });

202-212: Make workflow cleanup configuration more flexible.

The cleanup interval is hardcoded to one hour. Consider making this configurable.

- const ONE_HOUR = 60 * 60 * 1000;
+ const CLEANUP_INTERVAL = parseInt(process.env.WORKFLOW_CLEANUP_INTERVAL || '3600000');
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e8a6d1 and 73c2cc3.

📒 Files selected for processing (1)
  • maxun-core/src/utils/scraping-consumer.ts (1 hunks)

Comment on lines +168 to +192
for (const url of task.urls) {
try {
await page.goto(url, {
waitUntil: 'networkidle',
timeout: 30000
});

await page.waitForTimeout(1000);

const pageResults = await page.evaluate((cfg) => window.scrapeList(cfg), task.config);

// Filter out already scraped items
const newResults = pageResults.filter(item => {
const uniqueKey = JSON.stringify(item);
if (scrapedItems.has(uniqueKey)) return false; // Ignore if already scraped
scrapedItems.add(uniqueKey); // Mark as scraped
return true;
});

allResults = allResults.concat(newResults);
console.log(`Results so far (${task.taskId}): ${allResults.length}`);
} catch (error) {
console.error(`Error processing URL ${url}:`, error);
}
}

🛠️ Refactor suggestion

Optimize parallel processing of URLs.

The current implementation processes URLs sequentially. Consider processing URLs in parallel batches for better performance.

private async processUrlBatch(urls: string[], context: BrowserContext, task: any) {
  const results = await Promise.all(
    urls.map(async (url) => {
      const page = await context.newPage();
      try {
        await this.ensureScriptsLoaded(page);
        await page.goto(url, { waitUntil: 'networkidle', timeout: 30000 });
        await page.waitForTimeout(1000);
        return await page.evaluate((cfg) => window.scrapeList(cfg), task.config);
      } catch (error) {
        console.error(`Error processing URL ${url}:`, error);
        return [];
      } finally {
        await page.close();
      }
    })
  );
  return results.flat();
}
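
If a helper like this were adopted, the calling code could feed it fixed-size slices of task.urls so only a few pages are open at a time; the batch size of 5 is an arbitrary placeholder:

const BATCH_SIZE = 5; // placeholder; tune to available memory and CPU
let allResults: any[] = [];
for (let i = 0; i < task.urls.length; i += BATCH_SIZE) {
  const batch = task.urls.slice(i, i + BATCH_SIZE);
  const batchResults = await this.processUrlBatch(batch, context, task);
  allResults = allResults.concat(batchResults);
  console.log(`Processed ${Math.min(i + BATCH_SIZE, task.urls.length)} of ${task.urls.length} URLs`);
}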

Comment on lines +61 to +62
const task = JSON.parse(message.value!.toString());
const workflowId = task.workflowId;

⚠️ Potential issue

Add proper error handling for JSON parsing.

The current implementation might throw unhandled exceptions for malformed messages.

- const task = JSON.parse(message.value!.toString());
- const workflowId = task.workflowId;
+ let task;
+ try {
+   task = JSON.parse(message.value?.toString() || '');
+   if (!task?.workflowId) {
+     throw new Error('Invalid task format: missing workflowId');
+   }
+ } catch (error) {
+   console.error('Failed to parse task:', error);
+   throw error;
+ }
+ const workflowId = task.workflowId;
📝 Committable suggestion


Suggested change
const task = JSON.parse(message.value!.toString());
const workflowId = task.workflowId;
let task;
try {
task = JSON.parse(message.value?.toString() || '');
if (!task?.workflowId) {
throw new Error('Invalid task format: missing workflowId');
}
} catch (error) {
console.error('Failed to parse task:', error);
throw error;
}
const workflowId = task.workflowId;

Comment on lines +214 to +244
private async handleError(message: any, error: Error) {
const retryCount = parseInt(message.headers['retry-count'] || '0');
const task = JSON.parse(message.value!.toString());

if (retryCount < 3) {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_TASKS,
messages: [{
key: message.key,
value: message.value,
headers: {
'workflow-id': task.workflowId,
'retry-count': (retryCount + 1).toString(),
'error': error.message
}
}]
});
} else {
await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_DLQ,
messages: [{
key: message.key,
value: message.value,
headers: {
'workflow-id': task.workflowId,
'final-error': error.message
}
}]
});
}
}

⚠️ Potential issue

Improve error handling type safety and header access.

The error handling code lacks proper typing and safe header access.

- private async handleError(message: any, error: Error) {
+ interface KafkaMessage {
+   headers: Record<string, Buffer | null>;
+   key: Buffer | null;
+   value: Buffer | null;
+ }
+ 
+ private async handleError(message: KafkaMessage, error: Error) {
+   const headers = message.headers || {};
+   const retryCount = parseInt(headers['retry-count']?.toString() || '0');

Committable suggestion skipped: line range outside the PR's diff.
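
Since the committable version is cut off above, here is a self-contained sketch of the safe header read it describes. kafkajs delivers header values as Buffers (or strings), so they should be normalized before parsing; the helper name is made up for illustration:

function readRetryCount(headers: Record<string, Buffer | string | undefined> | undefined): number {
  const raw = headers?.['retry-count'];
  const text = Buffer.isBuffer(raw) ? raw.toString('utf8') : (raw ?? '0');
  const parsed = parseInt(text, 10);
  return Number.isNaN(parsed) ? 0 : parsed;
}

// Inside handleError:
// const retryCount = readRetryCount(message.headers);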

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
maxun-core/src/interpret.ts (1)

567-736: 🛠️ Refactor suggestion

Modularize the pagination navigation logic.

The navigation logic is complex and could benefit from being split into smaller, focused functions for better maintainability and testing.

Additionally, consider these improvements:

  1. Add retry mechanism for failed tasks
  2. Implement backoff strategy for navigation failures
  3. Add metrics collection for performance monitoring

Apply this diff to add these improvements:

+interface NavigationResult {
+    success: boolean;
+    url: string;
+    error?: Error;
+}
+
+private async navigateWithRetry(page: Page, url: string, maxRetries = 3): Promise<NavigationResult> {
+    let lastError: Error | undefined;
+    for (let attempt = 1; attempt <= maxRetries; attempt++) {
+        try {
+            await page.goto(url);
+            await page.waitForLoadState('networkidle');
+            return { success: true, url };
+        } catch (error) {
+            lastError = error;
+            if (attempt < maxRetries) {
+                const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
+                await page.waitForTimeout(delay);
+            }
+        }
+    }
+    return { success: false, url, error: lastError };
+}
+
+private collectMetrics(workflowId: string, metrics: Record<string, number>) {
+    console.log(`Metrics for workflow ${workflowId}:`, metrics);
+    // TODO: Send metrics to monitoring system
+}
+
 private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
     if (config.limit > 10000 && config.pagination.type === 'clickNext') {
         console.time('parallel-scraping');
+        const metrics = {
+            totalPages: 0,
+            failedNavigations: 0,
+            retries: 0,
+        };
 
         const workflowId = `workflow-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
         console.log(`Starting workflow with ID: ${workflowId}`);
🧹 Nitpick comments (4)
maxun-core/src/scripts/start-consumer.ts (2)

12-20: Enhance error logging.

The error handling could be improved by including more details in the error log.

Apply this diff to enhance error logging:

 try {
     console.log('Starting scraping consumer...');
     await consumer.start();
     console.log('Consumer is running and waiting for tasks...');
 } catch (error) {
-    console.error('Failed to start consumer:', error);
+    console.error('Failed to start consumer:', {
+        error: error instanceof Error ? error.message : String(error),
+        stack: error instanceof Error ? error.stack : undefined
+    });
     process.exit(1);
 }

22-22: Enhance error handling in main function invocation.

The error handling in the main function invocation could be improved by adding proper error details and cleanup.

Apply this diff to enhance error handling:

-main().catch(console.error);
+main().catch((error) => {
+    console.error('Fatal error:', {
+        error: error instanceof Error ? error.message : String(error),
+        stack: error instanceof Error ? error.stack : undefined
+    });
+    process.exit(1);
+});
maxun-core/src/scripts/setup-kafka.ts (1)

23-23: Enhance error handling in setupKafka invocation.

The error handling in the setupKafka function invocation could be improved by adding proper error details and cleanup.

Apply this diff to enhance error handling:

-setupKafka().catch(console.error);
+setupKafka().catch((error) => {
+    console.error('Fatal error during Kafka setup:', {
+        error: error instanceof Error ? error.message : String(error),
+        stack: error instanceof Error ? error.stack : undefined
+    });
+    process.exit(1);
+});
maxun-core/src/interpret.ts (1)

853-853: Use proper logging mechanism.

Replace direct console.log with the class's logging mechanism for consistency.

Apply this diff:

-          console.log(`Current scroll height: ${currentHeight}`);
+          this.log(`Current scroll height: ${currentHeight}`, Level.LOG);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 73c2cc3 and a931a13.

📒 Files selected for processing (3)
  • maxun-core/src/interpret.ts (6 hunks)
  • maxun-core/src/scripts/setup-kafka.ts (1 hunks)
  • maxun-core/src/scripts/start-consumer.ts (1 hunks)
🔇 Additional comments (4)
maxun-core/src/scripts/start-consumer.ts (1)

1-2: LGTM!

The import statement is correctly formatted and uses an appropriate relative path.

maxun-core/src/scripts/setup-kafka.ts (1)

1-2: LGTM!

The import statement is correctly formatted and uses an appropriate relative path.

maxun-core/src/interpret.ts (2)

19-23: LGTM!

The new imports are correctly formatted and necessary for implementing the parallel processing capabilities.


47-47: Add missing property to InterpreterOptions interface.

The serializableCallback property is used in the code but not defined in the provided InterpreterOptions interface.

Comment on lines +3 to +10
async function main() {
const consumer = new ScrapingConsumer();

// Handle graceful shutdown
process.on('SIGINT', async () => {
console.log('Shutting down consumer...');
process.exit(0);
});

🛠️ Refactor suggestion

Improve graceful shutdown handling.

The current SIGINT handler immediately exits without cleaning up resources or waiting for in-progress tasks to complete. This could lead to data loss or inconsistent state.

Apply this diff to implement proper cleanup:

 process.on('SIGINT', async () => {
     console.log('Shutting down consumer...');
+    try {
+        await consumer.stop();
+        console.log('Consumer stopped successfully');
+    } catch (error) {
+        console.error('Error during shutdown:', error);
+    }
     process.exit(0);
 });

Committable suggestion skipped: line range outside the PR's diff.
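
The diff above assumes ScrapingConsumer exposes a stop() method. If it does not yet, a minimal version could simply disconnect the underlying kafkajs clients; the this.consumer and this.producer field names are assumptions about the class internals:

public async stop(): Promise<void> {
  try {
    await this.consumer.disconnect(); // stops fetching and leaves the consumer group
  } finally {
    await this.producer.disconnect(); // closes the producer connection used for retries/DLQ
  }
}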

Comment on lines +3 to +21
async function setupKafka() {
const manager = new KafkaManager();

try {
console.log('Initializing Kafka manager...');
await manager.initialize();
console.log('Kafka setup completed successfully');

// Keep monitoring for a while to verify setup
setTimeout(async () => {
await manager.cleanup();
process.exit(0);
}, 10000);

} catch (error) {
console.error('Failed to setup Kafka:', error);
process.exit(1);
}
}

🛠️ Refactor suggestion

Improve Kafka setup monitoring.

The current implementation has several issues:

  1. Hardcoded timeout duration
  2. Forced cleanup after timeout regardless of setup status
  3. Missing configuration validation

Apply this diff to improve the implementation:

+const KAFKA_SETUP_TIMEOUT_MS = 10000; // Move to config
+
 async function setupKafka() {
     const manager = new KafkaManager();
     
     try {
         console.log('Initializing Kafka manager...');
+        // Validate Kafka configuration
+        if (!manager.validateConfig()) {
+            throw new Error('Invalid Kafka configuration');
+        }
+
         await manager.initialize();
         console.log('Kafka setup completed successfully');
         
-        // Keep monitoring for a while to verify setup
-        setTimeout(async () => {
-            await manager.cleanup();
-            process.exit(0);
-        }, 10000);
+        // Monitor setup status
+        const setupTimeout = new Promise((_, reject) => {
+            setTimeout(() => reject(new Error('Kafka setup verification timeout')), 
+                KAFKA_SETUP_TIMEOUT_MS);
+        });
+
+        try {
+            await Promise.race([
+                manager.verifySetup(),
+                setupTimeout
+            ]);
+            console.log('Kafka setup verified successfully');
+            await manager.cleanup();
+            process.exit(0);
+        } catch (error) {
+            throw new Error(`Kafka setup verification failed: ${error.message}`);
+        }
     } catch (error) {
         console.error('Failed to setup Kafka:', error);
         process.exit(1);
     }
 }

Committable suggestion skipped: line range outside the PR's diff.
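
The suggestion references manager.validateConfig() and manager.verifySetup(), which are not shown anywhere in the PR. As one possible shape, verifySetup() could use the kafkajs admin client to confirm the expected topics exist (a sketch, assuming KafkaManager keeps a this.kafka instance):

public async verifySetup(): Promise<void> {
  const admin = this.kafka.admin();
  await admin.connect();
  try {
    const existing = await admin.listTopics();
    const expected = Object.values(kafkaConfig.topics);
    const missing = expected.filter((topic) => !existing.includes(topic));
    if (missing.length > 0) {
      throw new Error(`Missing Kafka topics: ${missing.join(', ')}`);
    }
  } finally {
    await admin.disconnect();
  }
}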

Comment on lines +77 to +88
private kafka: Kafka;

private producer: any;

private async initializeKafka() {
this.producer = this.kafka.producer({
allowAutoTopicCreation: true,
idempotent: true
});
await this.producer.connect();
}


🛠️ Refactor suggestion

Improve Kafka producer initialization.

The current implementation has several issues:

  1. Hardcoded producer options
  2. Missing error handling
  3. No cleanup handling

Apply this diff to improve the implementation:

 private kafka: Kafka;
 private producer: any;
+private readonly producerConfig = {
+    allowAutoTopicCreation: true,
+    idempotent: true,
+    // Add other configurable options
+};
 
 private async initializeKafka() {
-    this.producer = this.kafka.producer({
-      allowAutoTopicCreation: true,
-      idempotent: true
-    });
-    await this.producer.connect();
+    try {
+        this.producer = this.kafka.producer(this.producerConfig);
+        await this.producer.connect();
+        console.log('Kafka producer connected successfully');
+    } catch (error) {
+        console.error('Failed to initialize Kafka producer:', error);
+        throw error;
+    }
 }
+
+private async cleanup() {
+    if (this.producer) {
+        try {
+            await this.producer.disconnect();
+            console.log('Kafka producer disconnected successfully');
+        } catch (error) {
+            console.error('Error disconnecting Kafka producer:', error);
+        }
+    }
+}
📝 Committable suggestion


Suggested change
private kafka: Kafka;
private producer: any;
private async initializeKafka() {
this.producer = this.kafka.producer({
allowAutoTopicCreation: true,
idempotent: true
});
await this.producer.connect();
}
private kafka: Kafka;
private producer: any;
private readonly producerConfig = {
allowAutoTopicCreation: true,
idempotent: true,
// Add other configurable options
};
private async initializeKafka() {
try {
this.producer = this.kafka.producer(this.producerConfig);
await this.producer.connect();
console.log('Kafka producer connected successfully');
} catch (error) {
console.error('Failed to initialize Kafka producer:', error);
throw error;
}
}
private async cleanup() {
if (this.producer) {
try {
await this.producer.disconnect();
console.log('Kafka producer disconnected successfully');
} catch (error) {
console.error('Error disconnecting Kafka producer:', error);
}
}
}

Comment on lines +93 to +101
this.kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers
});
this.initializeKafka();
this.options = {
maxRepeats: 5,
maxConcurrency: 5,
maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),

🛠️ Refactor suggestion

Improve constructor initialization.

The current implementation has several issues:

  1. Kafka initialization is not awaited
  2. maxWorkers calculation logic is embedded in the constructor
  3. Missing error handling for Kafka initialization

Apply this diff to improve the implementation:

+private calculateMaxWorkers(): number {
+    return Math.max(1, Math.min(os.cpus().length - 1, 4));
+}
+
 constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
     super();
     this.workflow = workflow.workflow;
     this.initializedWorkflow = null;
     this.kafka = new Kafka({
         clientId: kafkaConfig.clientId,
         brokers: kafkaConfig.brokers
     });
-    this.initializeKafka();
+    // Initialize Kafka asynchronously
+    this.initializeKafka().catch(error => {
+        console.error('Failed to initialize Kafka:', error);
+        throw error;
+    });
     this.options = {
         maxRepeats: 5,
         maxConcurrency: 5,
-        maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
+        maxWorkers: this.calculateMaxWorkers(),
         serializableCallback: (data) => { 
             log(JSON.stringify(data), Level.WARN);
         },

Committable suggestion skipped: line range outside the PR's diff.
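
Because a constructor cannot await, another option worth noting alongside the diff above is a static async factory that completes Kafka initialization before returning the instance. This is a sketch, not the PR's approach, and it assumes the constructor no longer kicks off initializeKafka() itself:

static async create(workflow: WorkflowFile, options?: Partial<InterpreterOptions>): Promise<Interpreter> {
  const interpreter = new Interpreter(workflow, options);
  await interpreter.initializeKafka(); // connection errors surface to the caller instead of being swallowed
  return interpreter;
}

// Usage:
// const interpreter = await Interpreter.create(workflow, { maxWorkers: 4 });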

Comment on lines +738 to +836
private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
// Create a map to store our workflow's results
const resultsMap = new Map<string, any[]>();

// Extract the workflow ID from the first task - all tasks in this batch will share the same workflow ID
const workflowId = tasks[0].workflowId;
console.log(`Waiting for results from workflow: ${workflowId}`);

// Create a Set of task IDs for quick lookup - these are the only tasks we care about
const expectedTaskIds = new Set(tasks.map(task => task.taskId));

// Create a consumer specifically for this workflow
const resultConsumer = this.kafka.consumer({
groupId: `scraping-group-results-${workflowId}`,
maxWaitTimeInMs: 1000,
maxBytesPerPartition: 2097152 // 2MB
});

try {
await resultConsumer.connect();
console.log('Result consumer connected successfully');

await resultConsumer.subscribe({
topic: kafkaConfig.topics.SCRAPING_RESULTS,
fromBeginning: true
});
console.log('Result consumer subscribed to topic successfully');

return new Promise((resolve, reject) => {
let isRunning = true;

resultConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!isRunning) return;

try {
const result = JSON.parse(message.value!.toString());

// Verify both task ID and workflow ID match
if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
// Store this task's results
if (!resultsMap.has(result.taskId)) {
resultsMap.set(result.taskId, result.data);
console.log(`Received results for task ${result.taskId}. ` +
`Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`);
}

// Check if we have all our workflow's results
if (resultsMap.size === tasks.length) {
isRunning = false;

// Sort tasks by their numeric index (extract number from task ID)
const sortedTasks = [...tasks].sort((a, b) => {
const aIndex = parseInt(a.taskId.split('-').pop() || '0');
const bIndex = parseInt(b.taskId.split('-').pop() || '0');
return aIndex - bIndex;
});

// Combine results in the sorted task order
const allResults = sortedTasks
.map(task => {
const taskResults = resultsMap.get(task.taskId);
if (!taskResults) {
console.warn(`Missing results for task ${task.taskId} in workflow ${workflowId}`);
return [];
}
return taskResults;
})
.flat();

console.log(`Successfully collected all results from workflow ${workflowId}`);

resolve(allResults);
}
}
} catch (error) {
console.error(`Error processing message in workflow ${workflowId}:`, error);
reject(error);
}
}
});

// // Add a timeout to prevent hanging
// const timeout = setTimeout(() => {
// if (isRunning) {
// isRunning = false;
// console.error(`Timeout waiting for results from workflow ${workflowId}. ` +
// `Received ${resultsMap.size} of ${tasks.length} expected results.`);
// reject(new Error(`Timeout waiting for results from workflow ${workflowId}`));
// }
// }, 30000); // 30 second timeout
});

} catch (error) {
console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
throw error;
}
}


⚠️ Potential issue

Improve result collection reliability.

The current implementation has several issues:

  1. Commented-out timeout code
  2. Missing proper consumer cleanup
  3. No handling of partial results

Apply this diff to improve the implementation:

+interface ScrapingResult {
+    workflowId: string;
+    taskId: string;
+    data: any[];
+}
+
+const RESULT_COLLECTION_TIMEOUT_MS = 30000;
+
 private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
     const resultsMap = new Map<string, any[]>();
     const workflowId = tasks[0].workflowId;
     console.log(`Waiting for results from workflow: ${workflowId}`);
     
     const expectedTaskIds = new Set(tasks.map(task => task.taskId));
     
     const resultConsumer = this.kafka.consumer({ 
         groupId: `scraping-group-results-${workflowId}`,
         maxWaitTimeInMs: 1000,
         maxBytesPerPartition: 2097152
     });
 
     try {
         await resultConsumer.connect();
         console.log('Result consumer connected successfully');
         
         await resultConsumer.subscribe({ 
             topic: kafkaConfig.topics.SCRAPING_RESULTS,
             fromBeginning: true 
         });
         console.log('Result consumer subscribed to topic successfully');
 
         return new Promise((resolve, reject) => {
             let isRunning = true;
+            const timeout = setTimeout(() => {
+                if (isRunning) {
+                    isRunning = false;
+                    const error = new Error(
+                        `Timeout waiting for results from workflow ${workflowId}. ` +
+                        `Received ${resultsMap.size} of ${tasks.length} expected results.`
+                    );
+                    reject(error);
+                }
+            }, RESULT_COLLECTION_TIMEOUT_MS);
 
             resultConsumer.run({
                 eachMessage: async ({ topic, partition, message }) => {
                     if (!isRunning) return;
                     
                     try {
-                        const result = JSON.parse(message.value!.toString());
+                        const result = JSON.parse(message.value!.toString()) as ScrapingResult;
                         
                         if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
                             if (!resultsMap.has(result.taskId)) {
                                 resultsMap.set(result.taskId, result.data);
                                 console.log(
                                     `Received results for task ${result.taskId}. ` +
                                     `Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`
                                 );
                             }
 
                             if (resultsMap.size === tasks.length) {
                                 isRunning = false;
+                                clearTimeout(timeout);
                                 
                                 const sortedTasks = [...tasks].sort((a, b) => {
                                     const aIndex = parseInt(a.taskId.split('-').pop() || '0');
                                     const bIndex = parseInt(b.taskId.split('-').pop() || '0');
                                     return aIndex - bIndex;
                                 });
 
                                 const allResults = sortedTasks
                                     .map(task => resultsMap.get(task.taskId) || [])
                                     .flat();
 
                                 console.log(`Successfully collected all results from workflow ${workflowId}`);
                                 
+                                await resultConsumer.disconnect();
                                 resolve(allResults);
                             }
                         }
                     } catch (error) {
                         console.error(`Error processing message in workflow ${workflowId}:`, error);
+                        clearTimeout(timeout);
+                        await resultConsumer.disconnect();
                         reject(error);
                     }
                 }
             });
         });
     } catch (error) {
         console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
+        await resultConsumer.disconnect();
         throw error;
     }
 }
📝 Committable suggestion


Suggested change
private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
// Create a map to store our workflow's results
const resultsMap = new Map<string, any[]>();
// Extract the workflow ID from the first task - all tasks in this batch will share the same workflow ID
const workflowId = tasks[0].workflowId;
console.log(`Waiting for results from workflow: ${workflowId}`);
// Create a Set of task IDs for quick lookup - these are the only tasks we care about
const expectedTaskIds = new Set(tasks.map(task => task.taskId));
// Create a consumer specifically for this workflow
const resultConsumer = this.kafka.consumer({
groupId: `scraping-group-results-${workflowId}`,
maxWaitTimeInMs: 1000,
maxBytesPerPartition: 2097152 // 2MB
});
try {
await resultConsumer.connect();
console.log('Result consumer connected successfully');
await resultConsumer.subscribe({
topic: kafkaConfig.topics.SCRAPING_RESULTS,
fromBeginning: true
});
console.log('Result consumer subscribed to topic successfully');
return new Promise((resolve, reject) => {
let isRunning = true;
resultConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!isRunning) return;
try {
const result = JSON.parse(message.value!.toString());
// Verify both task ID and workflow ID match
if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
// Store this task's results
if (!resultsMap.has(result.taskId)) {
resultsMap.set(result.taskId, result.data);
console.log(`Received results for task ${result.taskId}. ` +
`Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`);
}
// Check if we have all our workflow's results
if (resultsMap.size === tasks.length) {
isRunning = false;
// Sort tasks by their numeric index (extract number from task ID)
const sortedTasks = [...tasks].sort((a, b) => {
const aIndex = parseInt(a.taskId.split('-').pop() || '0');
const bIndex = parseInt(b.taskId.split('-').pop() || '0');
return aIndex - bIndex;
});
// Combine results in the sorted task order
const allResults = sortedTasks
.map(task => {
const taskResults = resultsMap.get(task.taskId);
if (!taskResults) {
console.warn(`Missing results for task ${task.taskId} in workflow ${workflowId}`);
return [];
}
return taskResults;
})
.flat();
console.log(`Successfully collected all results from workflow ${workflowId}`);
resolve(allResults);
}
}
} catch (error) {
console.error(`Error processing message in workflow ${workflowId}:`, error);
reject(error);
}
}
});
// // Add a timeout to prevent hanging
// const timeout = setTimeout(() => {
// if (isRunning) {
// isRunning = false;
// console.error(`Timeout waiting for results from workflow ${workflowId}. ` +
// `Received ${resultsMap.size} of ${tasks.length} expected results.`);
// reject(new Error(`Timeout waiting for results from workflow ${workflowId}`));
// }
// }, 30000); // 30 second timeout
});
} catch (error) {
console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
throw error;
}
}
interface ScrapingResult {
workflowId: string;
taskId: string;
data: any[];
}
const RESULT_COLLECTION_TIMEOUT_MS = 30000;
private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
// Create a map to store our workflow's results
const resultsMap = new Map<string, any[]>();
// Extract the workflow ID from the first task - all tasks in this batch will share the same workflow ID
const workflowId = tasks[0].workflowId;
console.log(`Waiting for results from workflow: ${workflowId}`);
// Create a Set of task IDs for quick lookup - these are the only tasks we care about
const expectedTaskIds = new Set(tasks.map(task => task.taskId));
// Create a consumer specifically for this workflow
const resultConsumer = this.kafka.consumer({
groupId: `scraping-group-results-${workflowId}`,
maxWaitTimeInMs: 1000,
maxBytesPerPartition: 2097152 // 2MB
});
try {
await resultConsumer.connect();
console.log('Result consumer connected successfully');
await resultConsumer.subscribe({
topic: kafkaConfig.topics.SCRAPING_RESULTS,
fromBeginning: true
});
console.log('Result consumer subscribed to topic successfully');
return new Promise((resolve, reject) => {
let isRunning = true;
const timeout = setTimeout(() => {
if (isRunning) {
isRunning = false;
const error = new Error(
`Timeout waiting for results from workflow ${workflowId}. ` +
`Received ${resultsMap.size} of ${tasks.length} expected results.`
);
reject(error);
}
}, RESULT_COLLECTION_TIMEOUT_MS);
resultConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!isRunning) return;
try {
const result = JSON.parse(message.value!.toString()) as ScrapingResult;
// Verify both task ID and workflow ID match
if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
// Store this task's results
if (!resultsMap.has(result.taskId)) {
resultsMap.set(result.taskId, result.data);
console.log(
`Received results for task ${result.taskId}. ` +
`Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`
);
}
// Check if we have all our workflow's results
if (resultsMap.size === tasks.length) {
isRunning = false;
clearTimeout(timeout);
// Sort tasks by their numeric index (extract number from task ID)
const sortedTasks = [...tasks].sort((a, b) => {
const aIndex = parseInt(a.taskId.split('-').pop() || '0');
const bIndex = parseInt(b.taskId.split('-').pop() || '0');
return aIndex - bIndex;
});
// Combine results in the sorted task order
const allResults = sortedTasks
.map(task => resultsMap.get(task.taskId) || [])
.flat();
console.log(`Successfully collected all results from workflow ${workflowId}`);
await resultConsumer.disconnect();
resolve(allResults);
}
}
} catch (error) {
console.error(`Error processing message in workflow ${workflowId}:`, error);
clearTimeout(timeout);
await resultConsumer.disconnect();
reject(error);
}
}
});
});
} catch (error) {
console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
await resultConsumer.disconnect();
throw error;
}
}

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (3)
maxun-core/src/interpret.ts (3)

81-88: ⚠️ Potential issue

Improve Kafka initialization and error handling.

The current implementation has several issues:

  1. Kafka initialization in the constructor is not awaited
  2. Missing error handling for initialization failures
  3. No cleanup handling for the Kafka producer

This issue was previously identified in a past review. Please refer to the suggested improvements in that review comment.

Also applies to: 93-97


596-682: 🛠️ Refactor suggestion

Modularize the pagination navigation logic.

The navigation logic is complex and should be split into smaller, focused functions for better maintainability and testing.

This issue was previously identified in a past review. Please refer to the suggested improvements in that review comment.


738-836: ⚠️ Potential issue

Improve result collection reliability and resource cleanup.

The current implementation has several issues:

  1. Timeout handling is commented out
  2. Missing proper consumer cleanup
  3. No handling of partial results

This issue was previously identified in a past review. Please refer to the suggested improvements in that review comment.

🧹 Nitpick comments (3)
maxun-core/src/interpret.ts (3)

19-22: Consider using a more specific import from the os module.

Instead of importing the entire os module, import only the required cpus function for better tree-shaking.

-import os from 'os'; 
+import { cpus } from 'os';

567-568: Extract magic numbers as named constants.

The threshold value of 10,000 should be defined as a named constant at the class level for better maintainability.

+private static readonly PARALLEL_SCRAPING_THRESHOLD = 10000;
+private static readonly NAVIGATION_TIMEOUT = 30000;

 private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
-  if (config.limit > 10000 && config.pagination.type === 'clickNext') {
+  if (config.limit > Interpreter.PARALLEL_SCRAPING_THRESHOLD && config.pagination.type === 'clickNext') {

689-725: Add memory management for large datasets.

The current implementation stores all URLs in memory before processing. For large datasets, this could lead to memory issues.

Consider implementing a streaming approach or processing URLs in chunks:

private async* urlGenerator(page: Page, config: any): AsyncGenerator<string> {
  let currentUrl = page.url();
  while (true) {
    yield currentUrl;
    const nextUrl = await this.getNextUrl(page, config);
    if (!nextUrl) break;
    currentUrl = nextUrl;
  }
}
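
Consuming that generator in fixed-size chunks would let URL discovery and task dispatch overlap without ever materializing the full URL list; the chunk size and the dispatchScrapingTasks helper below are hypothetical:

const CHUNK_SIZE = 50; // placeholder
let chunk: string[] = [];
for await (const url of this.urlGenerator(page, config)) {
  chunk.push(url);
  if (chunk.length === CHUNK_SIZE) {
    await this.dispatchScrapingTasks(chunk, config); // hypothetical helper that publishes one batch of tasks
    chunk = [];
  }
}
if (chunk.length > 0) {
  await this.dispatchScrapingTasks(chunk, config);
}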
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a931a13 and 5411484.

📒 Files selected for processing (1)
  • maxun-core/src/interpret.ts (6 hunks)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
docker-compose.yml (3)

42-57: LGTM! Consider using more specific volume names.

The Zookeeper configuration follows best practices with proper health checks and environment variables. However, consider making the volume names more specific to avoid potential conflicts in larger deployments.

- zookeeper_data:/var/lib/zookeeper/data
- zookeeper_log:/var/lib/zookeeper/log
+ maxun_zookeeper_data:/var/lib/zookeeper/data
+ maxun_zookeeper_log:/var/lib/zookeeper/log

58-83: Review security configuration for production deployment.

While the Kafka configuration is suitable for development, consider these security aspects for production:

  1. PLAINTEXT protocol is insecure for production use
  2. KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" could lead to unauthorized topic creation
  3. Single broker setup with replication factor of 1 offers no redundancy

For production deployment:

  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ # Use SASL_SSL for production
+ # KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL
  KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
+ # Disable for production
+ # KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"

141-144: Add newline at end of file.

Add a newline character at the end of the file to comply with POSIX standards.

  zookeeper_data:
  zookeeper_log:
+ 
🧰 Tools
🪛 yamllint (1.35.1)

[error] 144-144: no new line character at the end of file

(new-line-at-end-of-file)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5411484 and 7650794.

📒 Files selected for processing (1)
  • docker-compose.yml (3 hunks)
🧰 Additional context used
🪛 yamllint (1.35.1)
docker-compose.yml

[error] 144-144: no new line character at the end of file

(new-line-at-end-of-file)

Comment on lines +84 to +95
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "${KAFKA_UI_PORT:-9080}:8080"
environment:
KAFKA_CLUSTERS_0_NAME: maxun-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
depends_on:
- kafka


🛠️ Refactor suggestion

Pin the kafka-ui image version.

Using the latest tag can lead to unexpected behavior when the image is updated. Pin to a specific version for reproducible builds.

- image: provectuslabs/kafka-ui:latest
+ image: provectuslabs/kafka-ui:v0.7.1
📝 Committable suggestion


Suggested change
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "${KAFKA_UI_PORT:-9080}:8080"
environment:
KAFKA_CLUSTERS_0_NAME: maxun-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
depends_on:
- kafka
kafka-ui:
image: provectuslabs/kafka-ui:v0.7.1
container_name: kafka-ui
ports:
- "${KAFKA_UI_PORT:-9080}:8080"
environment:
KAFKA_CLUSTERS_0_NAME: maxun-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
depends_on:
- kafka

@@ -63,6 +117,7 @@ services:
- postgres
- redis
- minio
- kafka

⚠️ Potential issue

Add Kafka configuration for backend service.

The backend service now depends on Kafka but lacks the necessary environment variables for connection configuration.

Add these environment variables to the backend service:

  environment:
+   KAFKA_BROKERS: kafka:9092
+   KAFKA_CLIENT_ID: maxun-backend
+   KAFKA_CONSUMER_GROUP_ID: maxun-scraping-group

Committable suggestion skipped: line range outside the PR's diff.
