# Concurrent Collections Examples for Deadline 2

This notebook demonstrates **meaningful** vs **trivial** usage of concurrent collections in C# applications.

## üéØ **Learning Objectives**
- Understand when concurrent collections provide real benefits
- Learn proper thread-safe collection usage patterns
- Avoid unnecessary concurrent collection overhead
- Implement concurrent collections that solve real threading problems

## ‚úÖ **Meaningful Concurrent Collections Usage Examples**

### 1. Producer-Consumer Pattern with ConcurrentQueue

In [None]:
// ‚úÖ MEANINGFUL: Multiple threads producing and consuming work items
public class WorkItemProcessor
{
    private readonly ConcurrentQueue<WorkItem> _workQueue = new();
    private readonly CancellationTokenSource _cancellationTokenSource = new();
    private readonly List<Task> _workers = new();
    
    public class WorkItem
    {
        public int Id { get; set; }
        public string Data { get; set; }
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    }
    
    public void StartProcessing(int workerCount = 3)
    {
        var token = _cancellationTokenSource.Token;
        
        // Start multiple worker threads consuming from the queue
        for (int i = 0; i < workerCount; i++)
        {
            var workerId = i;
            var worker = Task.Run(async () => await WorkerLoop(workerId, token), token);
            _workers.Add(worker);
        }
    }
    
    // Producer method - can be called from multiple threads
    public void EnqueueWork(WorkItem item)
    {
        _workQueue.Enqueue(item);
        Console.WriteLine($"Enqueued work item {item.Id} at {DateTime.Now:HH:mm:ss.fff}");
    }
    
    // Consumer loop - multiple workers process items concurrently
    private async Task WorkerLoop(int workerId, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Worker {workerId} started");
        
        while (!cancellationToken.IsCancellationRequested)
        {
            if (_workQueue.TryDequeue(out var workItem))
            {
                try
                {
                    Console.WriteLine($"Worker {workerId} processing item {workItem.Id}");
                    
                    // Simulate work processing
                    await ProcessWorkItem(workItem);
                    
                    Console.WriteLine($"Worker {workerId} completed item {workItem.Id}");
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Worker {workerId} failed processing item {workItem.Id}: {ex.Message}");
                }
            }
            else
            {
                // No work available, wait a bit before checking again
                await Task.Delay(100, cancellationToken);
            }
        }
        
        Console.WriteLine($"Worker {workerId} stopped");
    }
    
    private async Task ProcessWorkItem(WorkItem item)
    {
        // Simulate processing time
        await Task.Delay(Random.Shared.Next(500, 2000));
        
        // Simulate potential processing logic
        if (item.Data?.Contains("error") == true)
        {
            throw new InvalidOperationException("Simulated processing error");
        }
    }
    
    public void Stop()
    {
        _cancellationTokenSource.Cancel();
        Task.WaitAll(_workers.ToArray(), TimeSpan.FromSeconds(5));
    }
    
    public int GetQueueLength() => _workQueue.Count;
}

### 2. Thread-Safe Cache with ConcurrentDictionary

In [None]:
// ‚úÖ MEANINGFUL: Shared cache accessed by multiple threads with complex operations
public class ThreadSafeCache<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, CacheEntry<TValue>> _cache = new();
    private readonly TimeSpan _defaultExpiration;
    private readonly Timer _cleanupTimer;
    
    public class CacheEntry<T>
    {
        public T Value { get; set; }
        public DateTime ExpiresAt { get; set; }
        public int AccessCount { get; set; }
        public DateTime LastAccessed { get; set; }
    }
    
    public ThreadSafeCache(TimeSpan defaultExpiration)
    {
        _defaultExpiration = defaultExpiration;
        
        // Cleanup expired entries every minute
        _cleanupTimer = new Timer(CleanupExpiredEntries, null, 
            TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
    }
    
    // Thread-safe get or add operation
    public async Task<TValue> GetOrAddAsync(TKey key, Func<TKey, Task<TValue>> valueFactory, 
        TimeSpan? expiration = null)
    {
        var now = DateTime.UtcNow;
        var expiresAt = now.Add(expiration ?? _defaultExpiration);
        
        // Try to get existing value
        if (_cache.TryGetValue(key, out var existingEntry))
        {
            if (existingEntry.ExpiresAt > now)
            {
                // Update access statistics atomically
                _cache.AddOrUpdate(key, existingEntry, (k, entry) =>
                {
                    entry.AccessCount++;
                    entry.LastAccessed = now;
                    return entry;
                });
                
                return existingEntry.Value;
            }
            else
            {
                // Entry expired, remove it
                _cache.TryRemove(key, out _);
            }
        }
        
        // Create new value
        var newValue = await valueFactory(key);
        var newEntry = new CacheEntry<TValue>
        {
            Value = newValue,
            ExpiresAt = expiresAt,
            AccessCount = 1,
            LastAccessed = now
        };
        
        // Add or update atomically (handles race conditions)
        var addedEntry = _cache.AddOrUpdate(key, newEntry, (k, existing) =>
        {
            // If another thread added a value while we were creating ours,
            // check if theirs is still valid
            if (existing.ExpiresAt > now)
            {
                existing.AccessCount++;
                existing.LastAccessed = now;
                return existing;
            }
            return newEntry;
        });
        
        return addedEntry.Value;
    }
    
    public bool TryGetValue(TKey key, out TValue value)
    {
        value = default(TValue);
        
        if (_cache.TryGetValue(key, out var entry))
        {
            if (entry.ExpiresAt > DateTime.UtcNow)
            {
                value = entry.Value;
                
                // Update access statistics
                _cache.AddOrUpdate(key, entry, (k, existing) =>
                {
                    existing.AccessCount++;
                    existing.LastAccessed = DateTime.UtcNow;
                    return existing;
                });
                
                return true;
            }
            else
            {
                // Remove expired entry
                _cache.TryRemove(key, out _);
            }
        }
        
        return false;
    }
    
    public void Remove(TKey key)
    {
        _cache.TryRemove(key, out _);
    }
    
    public void Clear()
    {
        _cache.Clear();
    }
    
    public CacheStatistics GetStatistics()
    {
        var entries = _cache.Values.ToList();
        var now = DateTime.UtcNow;
        
        return new CacheStatistics
        {
            TotalEntries = entries.Count,
            ExpiredEntries = entries.Count(e => e.ExpiresAt <= now),
            TotalAccesses = entries.Sum(e => e.AccessCount),
            AverageAccesses = entries.Count > 0 ? entries.Average(e => e.AccessCount) : 0
        };
    }
    
    private void CleanupExpiredEntries(object state)
    {
        var now = DateTime.UtcNow;
        var expiredKeys = _cache
            .Where(kvp => kvp.Value.ExpiresAt <= now)
            .Select(kvp => kvp.Key)
            .ToList();
        
        foreach (var key in expiredKeys)
        {
            _cache.TryRemove(key, out _);
        }
        
        if (expiredKeys.Count > 0)
        {
            Console.WriteLine($"Cleaned up {expiredKeys.Count} expired cache entries");
        }
    }
    
    public class CacheStatistics
    {
        public int TotalEntries { get; set; }
        public int ExpiredEntries { get; set; }
        public int TotalAccesses { get; set; }
        public double AverageAccesses { get; set; }
    }
    
    public void Dispose()
    {
        _cleanupTimer?.Dispose();
        _cache.Clear();
    }
}

### 3. Thread-Safe Event Aggregator with ConcurrentBag

In [None]:
// ‚úÖ MEANINGFUL: Multiple threads adding events, background thread processing
public class ThreadSafeEventAggregator
{
    private readonly ConcurrentBag<EventEntry> _events = new();
    private readonly Timer _processingTimer;
    private readonly object _processingLock = new();
    
    public class EventEntry
    {
        public string EventType { get; set; }
        public object Data { get; set; }
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;
        public string Source { get; set; }
        public Guid CorrelationId { get; set; } = Guid.NewGuid();
    }
    
    public ThreadSafeEventAggregator(TimeSpan processingInterval)
    {
        _processingTimer = new Timer(ProcessEvents, null, processingInterval, processingInterval);
    }
    
    // Thread-safe event addition - can be called from any thread
    public void AddEvent(string eventType, object data, string source = null)
    {
        var eventEntry = new EventEntry
        {
            EventType = eventType,
            Data = data,
            Source = source ?? Thread.CurrentThread.Name ?? "Unknown"
        };
        
        _events.Add(eventEntry);
        
        Console.WriteLine($"Event '{eventType}' added from {eventEntry.Source} at {eventEntry.Timestamp:HH:mm:ss.fff}");
    }
    
    // Batch processing of events (called by timer on background thread)
    private void ProcessEvents(object state)
    {
        // Prevent multiple processing threads from running simultaneously
        if (!Monitor.TryEnter(_processingLock))
        {
            Console.WriteLine("Processing already in progress, skipping this cycle");
            return;
        }
        
        try
        {
            // Get snapshot of current events
            var eventsToProcess = _events.ToArray();
            
            if (eventsToProcess.Length == 0)
            {
                return;
            }
            
            Console.WriteLine($"Processing {eventsToProcess.Length} events...");
            
            // Clear the bag by creating a new one (ConcurrentBag doesn't have Clear)
            // Note: This is a simplification - in real scenarios you might want more sophisticated clearing
            var processedEvents = new List<EventEntry>();
            
            // Process events by type
            var eventGroups = eventsToProcess.GroupBy(e => e.EventType);
            
            foreach (var group in eventGroups)
            {
                try
                {
                    ProcessEventGroup(group.Key, group.ToList());
                    processedEvents.AddRange(group);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error processing event group '{group.Key}': {ex.Message}");
                }
            }
            
            // Remove processed events from the bag
            // Note: ConcurrentBag doesn't support removal, so we'll recreate it with unprocessed events
            var unprocessedEvents = eventsToProcess.Except(processedEvents);
            
            // This is a limitation of ConcurrentBag - consider using ConcurrentQueue for better removal support
            Console.WriteLine($"Processed {processedEvents.Count} events, {unprocessedEvents.Count()} remain");
        }
        finally
        {
            Monitor.Exit(_processingLock);
        }
    }
    
    private void ProcessEventGroup(string eventType, List<EventEntry> events)
    {
        switch (eventType.ToLower())
        {
            case "user_action":
                ProcessUserActions(events);
                break;
            case "system_error":
                ProcessSystemErrors(events);
                break;
            case "performance_metric":
                ProcessPerformanceMetrics(events);
                break;
            default:
                ProcessGenericEvents(events);
                break;
        }
    }
    
    private void ProcessUserActions(List<EventEntry> events)
    {
        var actionCounts = events.GroupBy(e => e.Data?.ToString())
            .ToDictionary(g => g.Key, g => g.Count());
        
        Console.WriteLine($"User actions processed: {string.Join(", ", actionCounts.Select(kv => $"{kv.Key}: {kv.Value}"))}");
    }
    
    private void ProcessSystemErrors(List<EventEntry> events)
    {
        var errorSources = events.GroupBy(e => e.Source).Select(g => g.Key);
        Console.WriteLine($"System errors from sources: {string.Join(", ", errorSources)}");
        
        // In real implementation, you might send alerts, log to external systems, etc.
    }
    
    private void ProcessPerformanceMetrics(List<EventEntry> events)
    {
        var avgTimestamp = events.Average(e => e.Timestamp.Ticks);
        Console.WriteLine($"Performance metrics batch processed, average timestamp: {new DateTime((long)avgTimestamp):HH:mm:ss}");
    }
    
    private void ProcessGenericEvents(List<EventEntry> events)
    {
        Console.WriteLine($"Processed {events.Count} generic events");
    }
    
    public int GetPendingEventCount() => _events.Count;
    
    public void Dispose()
    {
        _processingTimer?.Dispose();
        
        // Process any remaining events
        ProcessEvents(null);
    }
}

## ‚ùå **Trivial/Incorrect Concurrent Collections Usage Examples**

### Anti-patterns to avoid:

In [None]:
// ‚ùå TRIVIAL: Using concurrent collection in single-threaded scenario
public class BadSingleThreadedUsage
{
    private readonly ConcurrentDictionary<string, int> _data = new();
    
    // This method is only called from one thread, concurrent collection is unnecessary
    public void ProcessSingleThreaded()
    {
        for (int i = 0; i < 100; i++)
        {
            _data.TryAdd($"key{i}", i); // Could just use Dictionary<string, int>
        }
        
        foreach (var kvp in _data)
        {
            Console.WriteLine($"{kvp.Key}: {kvp.Value}");
        }
    }
}

// ‚ùå UNNECESSARY: Using concurrent collection when simple locking would suffice
public class BadLockingAlternative
{
    private readonly ConcurrentQueue<string> _messages = new();
    
    // If you only have 2-3 operations and they're simple, regular collection + lock is often better
    public void AddMessage(string message)
    {
        _messages.Enqueue(message); // Could use Queue<string> + lock for simple scenarios
    }
    
    public string GetMessage()
    {
        _messages.TryDequeue(out var message);
        return message;
    }
}

// ‚ùå MISUSE: Using ConcurrentBag when order matters
public class BadOrderedUsage
{
    private readonly ConcurrentBag<int> _orderedNumbers = new();
    
    public void AddNumbersInOrder()
    {
        // ConcurrentBag doesn't guarantee order!
        for (int i = 1; i <= 10; i++)
        {
            _orderedNumbers.Add(i);
        }
    }
    
    public void PrintInOrder()
    {
        // This won't print in order!
        foreach (var number in _orderedNumbers)
        {
            Console.WriteLine(number);
        }
    }
}

// ‚ùå PERFORMANCE ANTI-PATTERN: Using concurrent collections with excessive operations
public class BadPerformanceUsage
{
    private readonly ConcurrentDictionary<int, string> _cache = new();
    
    public void PerformanceKiller()
    {
        // This creates unnecessary overhead for simple operations
        for (int i = 0; i < 1000000; i++)
        {
            // Each of these operations has more overhead than regular Dictionary
            _cache.TryAdd(i, $"value{i}");
            _cache.TryGetValue(i, out var value);
            _cache.TryRemove(i, out var removed);
        }
    }
}

## üîß **Best Practices for Concurrent Collections**

### 1. Choosing the Right Concurrent Collection

In [None]:
// ‚úÖ GOOD: Choosing the right collection for the use case
public class CollectionChoiceExamples
{
    // Use ConcurrentQueue for FIFO scenarios
    private readonly ConcurrentQueue<Order> _orderQueue = new();
    
    // Use ConcurrentStack for LIFO scenarios
    private readonly ConcurrentStack<UndoAction> _undoStack = new();
    
    // Use ConcurrentDictionary for key-value lookup with updates
    private readonly ConcurrentDictionary<string, UserSession> _activeSessions = new();
    
    // Use ConcurrentBag for unordered collection of items
    private readonly ConcurrentBag<LogEntry> _logEntries = new();
    
    // Use BlockingCollection for producer-consumer with blocking
    private readonly BlockingCollection<WorkItem> _workItems = new(100); // Bounded capacity
    
    public void DemonstrateUsage()
    {
        // Queue: First in, first out
        _orderQueue.Enqueue(new Order { Id = 1 });
        if (_orderQueue.TryDequeue(out var order))
        {
            Console.WriteLine($"Processing order {order.Id}");
        }
        
        // Stack: Last in, first out
        _undoStack.Push(new UndoAction { Description = "Delete item" });
        if (_undoStack.TryPop(out var undoAction))
        {
            Console.WriteLine($"Undoing: {undoAction.Description}");
        }
        
        // Dictionary: Key-value with complex operations
        _activeSessions.AddOrUpdate("user123", 
            new UserSession { UserId = "user123", LoginTime = DateTime.UtcNow },
            (key, existing) => { existing.LastActivity = DateTime.UtcNow; return existing; });
        
        // Bag: Unordered collection
        _logEntries.Add(new LogEntry { Message = "Something happened", Timestamp = DateTime.UtcNow });
        
        // BlockingCollection: Producer-consumer with blocking
        Task.Run(() =>
        {
            // Consumer - this will block until items are available
            foreach (var item in _workItems.GetConsumingEnumerable())
            {
                Console.WriteLine($"Processing work item: {item.Id}");
            }
        });
    }
    
    public class Order { public int Id { get; set; } }
    public class UndoAction { public string Description { get; set; } }
    public class UserSession { public string UserId { get; set; } public DateTime LoginTime { get; set; } public DateTime LastActivity { get; set; } }
    public class LogEntry { public string Message { get; set; } public DateTime Timestamp { get; set; } }
    public class WorkItem { public int Id { get; set; } }
}

### 2. Performance Considerations

In [None]:
// ‚úÖ GOOD: Performance-aware concurrent collection usage
public class PerformanceAwareUsage
{
    private readonly ConcurrentDictionary<string, ExpensiveObject> _cache;
    
    public PerformanceAwareUsage()
    {
        // Initialize with appropriate concurrency level and capacity
        _cache = new ConcurrentDictionary<string, ExpensiveObject>(
            concurrencyLevel: Environment.ProcessorCount,
            capacity: 1000);
    }
    
    // Use GetOrAdd to avoid race conditions and minimize object creation
    public ExpensiveObject GetOrCreateExpensiveObject(string key)
    {
        return _cache.GetOrAdd(key, k => 
        {
            Console.WriteLine($"Creating expensive object for key: {k}");
            return new ExpensiveObject(k);
        });
    }
    
    // Use AddOrUpdate for complex update logic
    public void UpdateOrCreateObject(string key, Func<ExpensiveObject> factory, 
        Func<ExpensiveObject, ExpensiveObject> updateFunction)
    {
        _cache.AddOrUpdate(key, 
            addValue: factory(),
            updateValueFactory: (k, existing) => updateFunction(existing));
    }
    
    // Batch operations for better performance
    public void BatchUpdate(Dictionary<string, ExpensiveObject> updates)
    {
        // Process updates in parallel for better performance
        Parallel.ForEach(updates, kvp =>
        {
            _cache.AddOrUpdate(kvp.Key, kvp.Value, (k, existing) => kvp.Value);
        });
    }
    
    // Efficient cleanup of expired items
    public void CleanupExpiredItems(TimeSpan maxAge)
    {
        var cutoff = DateTime.UtcNow - maxAge;
        var keysToRemove = new List<string>();
        
        // First, identify keys to remove (avoid modifying during enumeration)
        foreach (var kvp in _cache)
        {
            if (kvp.Value.CreatedAt < cutoff)
            {
                keysToRemove.Add(kvp.Key);
            }
        }
        
        // Then remove them
        foreach (var key in keysToRemove)
        {
            _cache.TryRemove(key, out _);
        }
        
        Console.WriteLine($"Removed {keysToRemove.Count} expired items");
    }
    
    public class ExpensiveObject
    {
        public string Key { get; }
        public DateTime CreatedAt { get; }
        public byte[] Data { get; }
        
        public ExpensiveObject(string key)
        {
            Key = key;
            CreatedAt = DateTime.UtcNow;
            
            // Simulate expensive initialization
            Thread.Sleep(10);
            Data = new byte[1024]; // 1KB of data
            Random.Shared.NextBytes(Data);
        }
    }
}

## üìù **Key Takeaways**

### ‚úÖ **Use Concurrent Collections When:**
- Multiple threads actually access the collection simultaneously
- You need thread-safe operations without explicit locking
- Performance under contention is important
- You're implementing producer-consumer patterns

### ‚ùå **Don't Use Concurrent Collections When:**
- Only one thread accesses the collection
- You can easily use simple locking with regular collections
- The overhead outweighs the benefits
- Order is critical (for ConcurrentBag)

### üîß **Best Practices:**
1. **Choose the right collection** for your access pattern
2. **Initialize with appropriate capacity** and concurrency level
3. **Use atomic operations** like GetOrAdd, AddOrUpdate when possible
4. **Avoid excessive operations** in tight loops if not necessary
5. **Consider memory usage** and cleanup strategies

### üéØ **Collection Selection Guide:**
- **ConcurrentQueue**: FIFO, producer-consumer scenarios
- **ConcurrentStack**: LIFO, undo operations
- **ConcurrentDictionary**: Key-value lookup with updates
- **ConcurrentBag**: Unordered collection, parallel processing
- **BlockingCollection**: Producer-consumer with blocking/bounded capacity