141 changes: 141 additions & 0 deletions examples/buffering-strategies-demo.mjs
@@ -0,0 +1,141 @@
#!/usr/bin/env bun

// Buffering Strategies Demo - Batching and windowing

import { $ } from '../src/$.mjs';

console.log('πŸ“¦ Buffering Strategies Demo - Batch Processing & Windows\n');

// Example 1: Batch processing with size-based batching
console.log('πŸ—‚οΈ Example 1: Size-based batching');
console.log('Processing data in batches of 3 items...\n');

try {
const batched = $`seq 1 10`.batch(3);

console.log('Batch processing results:');
let batchNum = 1;
for await (const batch of batched) {
const items = batch.data.map(chunk => chunk.data.toString().trim());
console.log(`β”œβ”€ Batch ${batchNum++}: [${items.join(', ')}] (${batch.size} items)`);
}
console.log('βœ… Batch processing completed\n');

} catch (error) {
console.error('❌ Batch demo failed:', error.message);
}
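
// Note: the loop above assumes each batch emitted by .batch(n) is shaped like
// { data: Chunk[], size: number }; the exact shape is defined by the library API.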

// Example 2: Time-based batching
console.log('⏱️ Example 2: Time-based batching');
console.log('Batching with 500ms time window...\n');

try {
// Generate data with delays to demonstrate time-based batching
// bash (not plain sh) is used here because {1..8} brace expansion is a bashism
const timeBasedBatched = $`bash -c 'for i in {1..8}; do echo "Message $i"; sleep 0.2; done'`
.batch(5, 500); // Max 5 items OR 500ms timeout

console.log('Time-based batch results:');
let batchNum = 1;
for await (const batch of timeBasedBatched) {
const items = batch.data.map(chunk => chunk.data.toString().trim());
const timestamp = new Date(batch.timestamp).toLocaleTimeString();
console.log(`β”œβ”€ Batch ${batchNum++} [${timestamp}]: [${items.join(', ')}] (${batch.size} items)`);
}
console.log('βœ… Time-based batching completed\n');

} catch (error) {
console.error('❌ Time-based batch demo failed:', error.message);
}
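
// Note: with .batch(maxItems, timeoutMs) the demo assumes a batch is flushed when
// either limit is reached first, and that each batch carries a `timestamp` field
// recording when it was emitted.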

// Example 3: Sliding window processing
console.log('πŸͺŸ Example 3: Sliding window analysis');
console.log('Analyzing data with a sliding window of 3 items...\n');

try {
const windowed = $`seq 1 8`.slidingWindow(3);

console.log('Sliding window analysis:');
let windowNum = 1;
for await (const window of windowed) {
const items = window.data.map(chunk => chunk.data.toString().trim());
const sum = items.reduce((acc, val) => acc + parseInt(val), 0);
const avg = (sum / items.length).toFixed(1);
console.log(`β”œβ”€ Window ${windowNum++}: [${items.join(', ')}] β†’ sum: ${sum}, avg: ${avg}`);
}
console.log('βœ… Sliding window analysis completed\n');

} catch (error) {
console.error('❌ Sliding window demo failed:', error.message);
}
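
// Note: .slidingWindow(n) is assumed to emit overlapping windows of the most
// recent n chunks, advancing one chunk at a time, each shaped like { data: Chunk[] }.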

// Example 4: Advanced batch processing with transforms
console.log('⚑ Example 4: Advanced batch processing with transforms');
console.log('Combining batching with map/filter operations...\n');

try {
const advancedBatched = $`seq 1 12`
.map(line => parseInt(line.trim()) * 2) // Double each number
.filter(line => parseInt(line) > 10) // Filter > 10
.batch(3); // Batch in groups of 3

console.log('Advanced batch processing:');
let batchNum = 1;
for await (const batch of advancedBatched) {
const items = batch.data.map(chunk => chunk.data.toString().trim());
const numbers = items.map(x => parseInt(x));
const sum = numbers.reduce((a, b) => a + b, 0);
console.log(`β”œβ”€ Batch ${batchNum++}: [${items.join(', ')}] β†’ sum: ${sum}`);
}
console.log('βœ… Advanced processing completed\n');

} catch (error) {
console.error('❌ Advanced demo failed:', error.message);
}
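
// Note: this pipeline assumes map/filter callbacks see one line at a time and
// that their return values are re-emitted as string chunks, which is why the
// loop parses the numbers back out of batch.data.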

// Example 5: Real-world use case - Log aggregation
console.log('🏭 Example 5: Real-world log aggregation');
console.log('Aggregating log entries by time windows...\n');

try {
const logAggregation = $`bash -c 'for i in {1..15}; do
level=$([ $((i % 3)) -eq 0 ] && echo "ERROR" || echo "INFO")
echo "[$level] $(date +%H:%M:%S.%3N) Event $i occurred"
sleep 0.1
done'`
.batch(4, 800) // Batch every 4 logs or 800ms
.map(async batch => {
const logs = batch.data.map(chunk => chunk.data.toString().trim());
const errorCount = logs.filter(log => log.includes('ERROR')).length;
const infoCount = logs.filter(log => log.includes('INFO')).length;

return {
timestamp: new Date(batch.timestamp).toLocaleTimeString(),
totalLogs: logs.length,
errorCount,
infoCount,
errorRate: (errorCount / logs.length * 100).toFixed(1)
};
});

console.log('Log aggregation results:');
let aggregateNum = 1;
for await (const chunk of logAggregation) {
const summary = JSON.parse(chunk.data.toString());
console.log(`β”œβ”€ Aggregate ${aggregateNum++} [${summary.timestamp}]:`);
console.log(` β”œβ”€ Total logs: ${summary.totalLogs}`);
console.log(` β”œβ”€ Errors: ${summary.errorCount}, Info: ${summary.infoCount}`);
console.log(` └─ Error rate: ${summary.errorRate}%`);
}
console.log('βœ… Log aggregation completed\n');

} catch (error) {
console.error('❌ Log aggregation demo failed:', error.message);
}
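
// Note: this example assumes an async map callback may return a plain object
// that the library serializes to JSON before emitting - hence the
// JSON.parse(chunk.data.toString()) in the loop above.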

console.log('🎯 Buffering strategies demo completed successfully!');
console.log('πŸ’‘ Key features demonstrated:');
console.log(' β€’ Size-based batching for bulk processing');
console.log(' β€’ Time-based batching for real-time aggregation');
console.log(' β€’ Sliding window analysis for trend monitoring');
console.log(' β€’ Advanced pipeline combinations');
console.log(' β€’ Real-world log aggregation patterns');
34 changes: 34 additions & 0 deletions examples/debug-stream-processing.mjs
@@ -0,0 +1,34 @@
#!/usr/bin/env bun

import { $ } from '../src/$.mjs';

console.log('πŸ” Debug Stream Processing\n');

// Test basic stream() functionality
console.log('1. Testing basic stream()');
const basic = $`printf "1\\n2\\n3\\n"`;

for await (const chunk of basic.stream()) {
console.log('Basic chunk:', {
type: chunk.type,
data: JSON.stringify(chunk.data.toString()),
preview: chunk.data.toString().slice(0, 20)
});
}

console.log('\n2. Testing map() functionality');
const mapped = $`printf "1\\n2\\n3\\n"`.map(line => {
console.log('Map input:', JSON.stringify(line));
const num = parseInt(line.trim());
const result = num * 2;
console.log('Map output:', result);
return result;
});

for await (const chunk of mapped) {
console.log('Mapped chunk:', {
type: chunk.type,
data: JSON.stringify(chunk.data.toString()),
originalData: chunk.originalData ? JSON.stringify(chunk.originalData.toString()) : 'none'
});
}
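
// Note: this debug script assumes stream chunks are shaped like
// { type, data, originalData? }, where type distinguishes stdout/stderr, data is
// a Buffer, and originalData (when present) holds the pre-transform bytes.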
121 changes: 121 additions & 0 deletions examples/stream-analytics-demo.mjs
@@ -0,0 +1,121 @@
#!/usr/bin/env bun

// Stream Analytics Demo - Real-time monitoring and metrics

import { $ } from '../src/$.mjs';

console.log('πŸš€ Stream Analytics Demo - Real-time Processing Engine\n');

// Example 1: Log monitoring with analytics
console.log('πŸ“Š Example 1: Real-time log analytics');
console.log('Generating sample log data with errors and response times...\n');

try {
const stats = await $`sh -c 'for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do
if [ $((i % 4)) -eq 0 ]; then
echo "ERROR: Failed to process request $i - response time: $((100 + i * 20))ms"
else
echo "INFO: Request $i processed successfully - response time: $((200 + i * 15))ms"
fi
sleep 0.1
done'`
.analyze({
errorRate: line => line.includes('ERROR'),
responseTime: line => {
const match = line.match(/response time: (\d+)ms/);
return match ? parseInt(match[1]) : null;
},
throughput: true,
customMetrics: {
requestCount: (line, analytics) => {
const match = line.match(/Request (\d+)/);
return match ? parseInt(match[1]) : undefined;
},
severity: (line) => {
if (line.includes('ERROR')) return 'high';
if (line.includes('WARN')) return 'medium';
return 'low';
}
}
});

console.log('Real-time analytics results:');
let chunkCount = 0;
for await (const chunk of stats) {
if (chunk.analytics && ++chunkCount % 5 === 0) { // Show every 5th analytics update
console.log(`β”œβ”€ Chunk ${chunkCount}:`, {
errorRate: chunk.analytics.errorRate,
avgResponseTime: Math.round(chunk.analytics.avgResponseTime),
throughputRate: Math.round(chunk.analytics.throughputRate),
elapsedTime: chunk.analytics.elapsedTime,
customMetrics: Object.keys(chunk.analytics.customMetrics)
});
}
}
console.log('βœ… Log analytics completed\n');

} catch (error) {
console.error('❌ Analytics demo failed:', error.message);
}
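
// Note: .analyze(options) is assumed to attach an `analytics` object to each
// chunk, exposing the built-in errorRate/avgResponseTime/throughputRate/
// elapsedTime fields plus anything declared under customMetrics.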

// Example 2: Stream transforms - processing pipeline
console.log('πŸ”„ Example 2: Stream transformation pipeline');
console.log('Processing data through map β†’ filter β†’ reduce operations...\n');

try {
// Create a data processing pipeline
const pipeline = $`seq 1 10`.map(line => {
const num = parseInt(line.trim());
return `processed_${num * 2}`;
}).filter(line => {
const num = parseInt(line.split('_')[1]);
return num > 10; // Only keep numbers > 10
});

console.log('Filtered and mapped results:');
for await (const chunk of pipeline) {
console.log(`β”œβ”€ ${chunk.data.toString().trim()}`);
}

// Demonstrate reduce operation
const sum = await $`seq 1 5`.reduce((acc, line) => {
return acc + parseInt(line.trim());
}, 0).aggregate();

console.log(`└─ Sum of 1-5: ${sum}\n`);

} catch (error) {
console.error('❌ Transform demo failed:', error.message);
}
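
// Note: .reduce(fn, init) is assumed to fold the stream line by line, with
// .aggregate() awaiting the final accumulated value instead of streaming
// intermediate results.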

// Example 3: Stream splitting
console.log('🌿 Example 3: Stream splitting based on content');
console.log('Splitting mixed log levels into separate streams...\n');

try {
const mixedLogs = $`sh -c 'echo "INFO: System started"; echo "ERROR: Connection failed"; echo "WARN: Low memory"; echo "INFO: User logged in"; echo "ERROR: Database timeout"'`;

const split = mixedLogs.split(line => line.includes('ERROR'));

console.log('Error stream:');
for await (const chunk of split.matched) {
console.log(`🚨 ${chunk.data.toString().trim()}`);
}

console.log('\nInfo/Warning stream:');
for await (const chunk of split.unmatched) {
console.log(`ℹ️ ${chunk.data.toString().trim()}`);
}
console.log('');

} catch (error) {
console.error('❌ Splitting demo failed:', error.message);
}
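
// Note: .split(predicate) is assumed to return { matched, unmatched } async
// iterables, routing each chunk according to the predicate's result.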

console.log('🎯 Stream analytics demo completed successfully!');
console.log('πŸ’‘ Key features demonstrated:');
console.log(' β€’ Real-time error rate monitoring');
console.log(' β€’ Response time analysis');
console.log(' β€’ Custom metrics collection');
console.log(' β€’ Stream transformations (map/filter/reduce)');
console.log(' β€’ Content-based stream splitting');
69 changes: 69 additions & 0 deletions examples/stream-merging-demo.mjs
@@ -0,0 +1,69 @@
#!/usr/bin/env bun

// Stream Merging Demo - Combining multiple data streams

import { $, merge } from '../src/$.mjs';

console.log('πŸ”€ Stream Merging Demo - Multi-source Processing\n');

// Example 1: Basic stream merging
console.log('πŸ“‘ Example 1: Merging multiple log sources');
console.log('Combining output from different system monitors...\n');

try {
// bash is required here for {1..5} brace expansion and $RANDOM, both bashisms
const systemMonitor = $`bash -c 'for i in {1..5}; do echo "SYSTEM: CPU usage at $((60 + RANDOM % 40))%"; sleep 0.2; done'`;
const networkMonitor = $`bash -c 'for i in {1..5}; do echo "NETWORK: $((100 + RANDOM % 200))ms latency to server"; sleep 0.15; done'`;
const diskMonitor = $`bash -c 'for i in {1..5}; do echo "DISK: $((40 + RANDOM % 60))% usage on /var"; sleep 0.25; done'`;

const merged = merge(systemMonitor, networkMonitor, diskMonitor);

console.log('Combined monitoring streams:');
for await (const chunk of merged) {
const source = ['System', 'Network', 'Disk'][chunk.streamIndex];
const timestamp = new Date(chunk.timestamp).toLocaleTimeString();
console.log(`[${timestamp}] ${source}: ${chunk.data.toString().trim()}`);
}
console.log('βœ… Stream merging completed\n');

} catch (error) {
console.error('❌ Merging demo failed:', error.message);
}
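
// Note: merge() is assumed to interleave chunks as they arrive, tagging each
// with `streamIndex` (the source's position in the merge() call) and a
// `timestamp`, which the loop above relies on.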

// Example 2: Merging with analytics
console.log('πŸ“Š Example 2: Analytics across merged streams');
console.log('Analyzing combined data from multiple sources...\n');

try {
// bash again, for {1..N} brace expansion
const errorLogs = $`bash -c 'for i in {1..8}; do echo "Service A: ERROR - Connection timeout"; sleep 0.1; done'`;
const infoLogs = $`bash -c 'for i in {1..12}; do echo "Service B: INFO - Request processed"; sleep 0.08; done'`;
const warnLogs = $`bash -c 'for i in {1..6}; do echo "Service C: WARN - Memory usage high"; sleep 0.15; done'`;

const mergedWithAnalytics = merge(errorLogs, infoLogs, warnLogs)
.analyze({
errorRate: (line, streamIndex) => line.includes('ERROR')
});

console.log('Merged stream analytics:');
let count = 0;
for await (const chunk of mergedWithAnalytics) {
if (chunk.analytics && ++count % 8 === 0) { // Show every 8th update
const analytics = chunk.analytics;
console.log(`β”œβ”€ Update ${count}:`);
console.log(` β”œβ”€ Stream distribution: [${analytics.streamCounts.join(', ')}]`);
console.log(` β”œβ”€ Total messages: ${analytics.totalCount}`);
console.log(` β”œβ”€ Error rates: [${analytics.errorRates.join(', ')}]`);
console.log(` └─ Dominant stream: ${analytics.dominantStream} (${['Error', 'Info', 'Warn'][analytics.dominantStream]})`);
}
}
console.log('βœ… Analytics completed\n');

} catch (error) {
console.error('❌ Analytics demo failed:', error.message);
}
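
// Note: .analyze() on a merged stream is assumed to expose per-stream fields
// (streamCounts, errorRates, dominantStream) alongside totalCount, as consumed
// in the loop above.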

console.log('🎯 Stream merging demo completed successfully!');
console.log('πŸ’‘ Key features demonstrated:');
console.log(' β€’ Multi-stream merging with timestamps');
console.log(' β€’ Stream-aware analytics');
console.log(' β€’ Real-time monitoring of multiple sources');
console.log(' β€’ Distribution analysis across streams');
10 changes: 8 additions & 2 deletions package.json
@@ -1,7 +1,7 @@
{
"name": "command-stream",
"version": "0.7.1",
"description": "Modern $ shell utility library with streaming, async iteration, and EventEmitter support, optimized for Bun runtime",
"version": "0.8.0",
"description": "Advanced $ shell utility library with real-time stream processing, analytics engine, transforms, and async iteration, optimized for Bun runtime",
"type": "module",
"main": "src/$.mjs",
"exports": {
@@ -29,6 +29,12 @@
"shell",
"command",
"streaming",
"analytics",
"transforms",
"real-time",
"map",
"filter",
"reduce",
"async",
"iteration",
"eventemitter",