
Recipe File Watcher

Mark Lauter edited this page May 10, 2026 · 4 revisions

Recipe: File Watcher

A console app that watches a folder, runs every new file through a pipeline, and routes the file to processed/ or failed/ based on the outcome. The pipeline shape — validate, parse, process, archive — is the same shape you'd reach for in any inbound-document workflow: an FTP drop, a vendor export, a daily batch. This recipe builds that as a Plumber pipeline driven by FileSystemWatcher.

What we're building

inbox/      <- new files land here
processed/  <- successful runs end up here
failed/     <- failed runs end up here, with a sidecar .error file

A long-running console app watches inbox/. Every time a file appears, the watcher pushes its path onto a channel. A single consumer reads from the channel, runs each file through a Plumber pipeline, and the pipeline's outermost middleware moves the file to the right destination. Ctrl-C cancels the in-flight run and exits cleanly; files not yet processed stay in inbox/ for the next start.

Project setup

dotnet new console -n FileWatcherWorker
cd FileWatcherWorker
dotnet add package MSL.Plumber.Pipeline
dotnet add package Microsoft.Extensions.Logging.Console

That's the full dependency list. No host, no EF, no HTTP — just the pipeline library and a console logger.

Request and response

The request is the file path plus its FileInfo snapshot at the moment the watcher saw it. The response is a small result record so the caller knows whether anything went wrong:

namespace FileWatcherWorker;

internal record FileRequest(string Path, FileInfo Info);

internal record ProcessingResult(bool Success, string? ErrorMessage);

The pipeline

Four middleware: archive (registered first, so it wraps the other three), validate, parse, and process. Each one handles a single concern.

ValidateFileMiddleware

FileSystemWatcher has a habit of firing before the upstream process has finished writing. The validation middleware is the place to check size, extension, and whether the file is still locked. Files that fail validation get short-circuited with a ProcessingResult so the archive middleware can route them appropriately:

using Microsoft.Extensions.Logging;
using Plumber;

namespace FileWatcherWorker;

internal sealed class ValidateFileMiddleware(
    RequestMiddleware<FileRequest, ProcessingResult> next,
    ILogger<ValidateFileMiddleware> logger)
{
    private static readonly string[] AcceptedExtensions = [".csv", ".json", ".xml"];
    private const long MaxBytes = 50 * 1024 * 1024;

    public async Task InvokeAsync(RequestContext<FileRequest, ProcessingResult> context)
    {
        var info = context.Request.Info;
        info.Refresh();

        if (!info.Exists)
        {
            context.Response = new ProcessingResult(false, "file disappeared before processing");
            return;
        }

        if (!AcceptedExtensions.Contains(info.Extension, StringComparer.OrdinalIgnoreCase))
        {
            context.Response = new ProcessingResult(false, $"unsupported extension {info.Extension}");
            return;
        }

        if (info.Length > MaxBytes)
        {
            context.Response = new ProcessingResult(false, $"file exceeds {MaxBytes:N0} bytes");
            return;
        }

        if (!await IsReadableAsync(info, context.CancellationToken))
        {
            context.Response = new ProcessingResult(false, "file is still locked");
            return;
        }

        await next(context);
    }

    private static async Task<bool> IsReadableAsync(FileInfo info, CancellationToken cancellationToken)
    {
        // try a few times — the writer may still be flushing
        for (var attempt = 0; attempt < 5; ++attempt)
        {
            try
            {
                await using var stream = info.Open(FileMode.Open, FileAccess.Read, FileShare.Read);
                return true;
            }
            catch (IOException)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken);
            }
        }

        return false;
    }
}

ValidateFileMiddleware retries before giving up: five attempts spaced 200 ms apart give the writer roughly a second to finish flushing. Raise the count or the delay for large or slow uploads. The delay honors the cancellation token, so a Ctrl-C during the wait still tears down promptly.
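How reliable the lock probe is depends on the platform: Windows enforces FileShare, while on Unix-like systems .NET emulates it with advisory locks that a non-.NET writer may never take. A common complement is to wait until the file's length stops changing. The sketch below is a hypothetical WaitForStableSizeAsync helper (not part of Plumber or this recipe) that ValidateFileMiddleware could call next to IsReadableAsync; the interval and check count are assumptions to tune.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Plumber: true once two consecutive samples of the
// file's length match; false if the file disappears or never settles within maxChecks.
static async Task<bool> WaitForStableSizeAsync(
    string path, TimeSpan interval, int maxChecks, CancellationToken cancellationToken = default)
{
    var info = new FileInfo(path);
    long previousLength = -1;

    for (var check = 0; check < maxChecks; ++check)
    {
        info.Refresh(); // FileInfo caches its state, so re-read it before every sample
        if (!info.Exists)
        {
            return false;
        }

        if (info.Length == previousLength)
        {
            return true; // no growth since the previous sample
        }

        previousLength = info.Length;
        await Task.Delay(interval, cancellationToken);
    }

    return false;
}

// usage: a file that is already fully written settles on the second sample
var sample = Path.Combine(Path.GetTempPath(), $"stable-{Guid.NewGuid():N}.json");
await File.WriteAllTextAsync(sample, "{}");
var settled = await WaitForStableSizeAsync(sample, TimeSpan.FromMilliseconds(50), maxChecks: 5);
Console.WriteLine($"settled: {settled}");
File.Delete(sample);
```

A size check can't tell a finished file from a writer that paused, so it works best alongside the lock probe rather than instead of it.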

ParseMiddleware

Read the file, parse it into a typed payload, stash the payload in context.Data so downstream middleware can consume it without re-reading the file:

using System.Text.Json;
using Microsoft.Extensions.Logging;
using Plumber;

namespace FileWatcherWorker;

internal static class DataKeys
{
    public const string Payload = "payload";
}

internal sealed class ParseMiddleware(
    RequestMiddleware<FileRequest, ProcessingResult> next,
    ILogger<ParseMiddleware> logger)
{
    public async Task InvokeAsync(RequestContext<FileRequest, ProcessingResult> context)
    {
        var path = context.Request.Path;
        try
        {
            await using var stream = File.OpenRead(path);
            var payload = await JsonSerializer.DeserializeAsync<Payload>(
                stream, cancellationToken: context.CancellationToken);

            if (payload is null)
            {
                context.Response = new ProcessingResult(false, "payload deserialized as null");
                return;
            }

            context.Data[DataKeys.Payload] = payload;
            await next(context);
        }
        catch (JsonException ex)
        {
            logger.LogWarning(ex, "parse failed for {Path}", path);
            context.Response = new ProcessingResult(false, $"parse failed: {ex.Message}");
        }
    }
}

internal record Payload(string Id, string[] Items);

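The Payload shape is a stand-in; your real workflow will have its own DTO. The pattern is the same: parse, stash, hand off. Two things to adjust for real input. First, JsonSerializer matches property names case-sensitively by default, so camelCase keys such as "id" won't bind to Id unless you pass JsonSerializerOptions with PropertyNameCaseInsensitive = true (or JsonSerializerDefaults.Web). Second, this middleware only understands JSON, so .csv and .xml files that pass validation fail here and land in failed/ until you add a parser for each accepted extension.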
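To make the stash-and-retrieve idea concrete without depending on Plumber's API, here is a minimal sketch that uses a Dictionary<string, object?> as a stand-in for context.Data. TryGetTyped<T> is a hypothetical helper showing what a type-safe lookup has to check (the key exists and the stored value is a T); it is not Plumber's implementation of TryGetValue<T>.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: succeeds only when the key exists AND the stored value is a T.
static bool TryGetTyped<T>(IReadOnlyDictionary<string, object?> store, string key, out T? value)
{
    if (store.TryGetValue(key, out var raw) && raw is T typed)
    {
        value = typed;
        return true;
    }

    value = default;
    return false;
}

// stand-in for context.Data: the parse step stashes, the process step reads back
var contextData = new Dictionary<string, object?>
{
    ["payload"] = new[] { "a", "b" },
};

if (TryGetTyped<string[]>(contextData, "payload", out var items))
{
    Console.WriteLine($"found {items!.Length} item(s)");
}

// a wrong type or a missing key returns false instead of throwing
Console.WriteLine(TryGetTyped<int>(contextData, "payload", out _));
Console.WriteLine(TryGetTyped<string[]>(contextData, "missing", out _));
```

Sharing a constant such as DataKeys.Payload between the writer and the reader keeps the two sides from drifting apart on the key name.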

ProcessMiddleware

The actual work. This is where you'd post to an API, transform the data, write to a database — whatever the workflow is for. The middleware reads the payload back out of context.Data using TryGetValue<T> for the type-safe path:

using Microsoft.Extensions.Logging;
using Plumber;

namespace FileWatcherWorker;

internal sealed class ProcessMiddleware(
    RequestMiddleware<FileRequest, ProcessingResult> next,
    ILogger<ProcessMiddleware> logger)
{
    public async Task InvokeAsync(RequestContext<FileRequest, ProcessingResult> context)
    {
        if (!context.TryGetValue<Payload>(DataKeys.Payload, out var payload))
        {
            context.Response = new ProcessingResult(false, "payload missing from context");
            return;
        }

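        // simulated work: replace with the real call
        await Task.Delay(TimeSpan.FromMilliseconds(50), context.CancellationToken);
        logger.LogInformation("processed {Id} ({ItemCount} item(s))",
            payload.Id, payload.Items.Length);

        // record success; without this, ArchiveMiddleware sees a null response
        // and routes the file to failed/ with "no response"
        context.Response = new ProcessingResult(true, null);

        await next(context);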
    }
}

Expensive or fallible work belongs here. Wrap retries, circuit breakers, or compensating actions inside this middleware as your workflow demands.
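One way to wrap retries is a small helper with exponential backoff around the real call. This is a sketch under two assumptions: the transient failures you care about surface as IOException (swap in whatever your client actually throws), and cancellation should never be retried, which matches how ArchiveMiddleware treats OperationCanceledException. RetryAsync is a hypothetical name.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs `operation` up to maxAttempts times, doubling the delay after
// each IOException. Any other exception, including OperationCanceledException, propagates.
static async Task<T> RetryAsync<T>(
    Func<CancellationToken, Task<T>> operation,
    int maxAttempts,
    TimeSpan initialDelay,
    CancellationToken cancellationToken = default)
{
    var delay = initialDelay;
    for (var attempt = 1; ; ++attempt)
    {
        try
        {
            return await operation(cancellationToken);
        }
        catch (IOException) when (attempt < maxAttempts)
        {
            // transient failure: wait, then try again with a longer delay
            await Task.Delay(delay, cancellationToken);
            delay *= 2;
        }
    }
}

// usage: an operation that fails twice, then succeeds
var calls = 0;
var result = await RetryAsync(
    async ct =>
    {
        calls++;
        await Task.Delay(1, ct); // stand-in for a network call
        if (calls < 3)
        {
            throw new IOException("simulated transient failure");
        }

        return "posted";
    },
    maxAttempts: 5,
    initialDelay: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"{result} after {calls} attempt(s)");
```

The exception filter lets the final failure escape untouched, so ArchiveMiddleware's catch still records the real error message in the .error sidecar.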

ArchiveMiddleware

Outermost middleware. Whatever happens downstream (success, validation failure, parse failure, processing exception), this middleware moves the file to the appropriate folder, turning any unhandled exception into a failed context.Response on the way:

using Microsoft.Extensions.Logging;
using Plumber;

namespace FileWatcherWorker;

internal sealed class ArchiveMiddleware
{
    private readonly RequestMiddleware<FileRequest, ProcessingResult> next;
    private readonly ILogger<ArchiveMiddleware> logger;
    private readonly string processedDir;
    private readonly string failedDir;

    // next and logger come from the pipeline; the folder paths are the extra
    // arguments passed to Use<ArchiveMiddleware>(processedDir, failedDir)
    public ArchiveMiddleware(
        RequestMiddleware<FileRequest, ProcessingResult> next,
        ILogger<ArchiveMiddleware> logger,
        string processedDir,
        string failedDir)
    {
        this.next = next;
        this.logger = logger;
        this.processedDir = processedDir;
        this.failedDir = failedDir;
        Directory.CreateDirectory(processedDir);
        Directory.CreateDirectory(failedDir);
    }

    public async Task InvokeAsync(RequestContext<FileRequest, ProcessingResult> context)
    {
        try
        {
            await next(context);
        }
        catch (OperationCanceledException)
        {
            // honor cancellation: leave the file where it is so the next run picks it up
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "processing threw for {Path}", context.Request.Path);
            context.Response = new ProcessingResult(false, ex.Message);
        }

        var result = context.Response ?? new ProcessingResult(false, "no response");
        var destination = result.Success ? processedDir : failedDir;
        var target = Path.Combine(destination, Path.GetFileName(context.Request.Path));

        try
        {
            File.Move(context.Request.Path, target, overwrite: true);
            if (!result.Success && result.ErrorMessage is not null)
            {
                await File.WriteAllTextAsync(
                    target + ".error", result.ErrorMessage, context.CancellationToken);
            }
        }
        catch (IOException ex)
        {
            logger.LogError(ex, "failed to move {Path} to {Target}", context.Request.Path, target);
        }
    }
}

A few choices to call out:

  • The middleware is registered first in the chain, so its try/catch wraps everything downstream. Any unhandled exception turns into a ProcessingResult(false, ...) rather than crashing the pipeline.
  • OperationCanceledException is allowed to propagate. Cancellation means "shut down": the file stays in inbox/ so the next run picks it up.
  • A failed file gets a sidecar .error file with the message. That's enough for an operator to triage without digging through logs.
  • One constructor that takes everything the middleware needs: the pipeline supplies next and logger, and the folder paths come from Use<ArchiveMiddleware>(processedDir, failedDir). Requiring the paths up front means the middleware can never run with null folders.
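File.Move(..., overwrite: true) means a second upload with the same name replaces the copy archived earlier (and, for failures, its .error sidecar). If you'd rather keep every copy, one option is to stamp the archived name with the processing time. ArchiveName is a hypothetical pure helper, written that way so it's easy to test; ArchiveMiddleware would use it when building target. The stamp has millisecond resolution, so two same-named files archived within the same millisecond would still collide.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical helper: stamps the archived file name with the UTC processing time so a
// later upload with the same name doesn't overwrite the earlier copy in processed/ or failed/.
static string ArchiveName(string sourcePath, DateTimeOffset processedAt)
{
    var stem = Path.GetFileNameWithoutExtension(sourcePath);
    var extension = Path.GetExtension(sourcePath); // includes the leading '.', or "" if none
    var stamp = processedAt.UtcDateTime.ToString("yyyyMMdd'T'HHmmssfff'Z'", CultureInfo.InvariantCulture);
    return $"{stem}.{stamp}{extension}";
}

// usage: the path ArchiveMiddleware would move a successful file to
var target = Path.Combine("processed", ArchiveName(Path.Combine("inbox", "sample.json"), DateTimeOffset.UtcNow));
Console.WriteLine(target);
```

Keeping the original extension last means tools that dispatch on extension still recognize the archived file.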

Wiring the pipeline

The pipeline assembly is the same CreateBuilder + Configure shape the README and tutorial use:

using Microsoft.Extensions.Logging;
using Plumber;

namespace FileWatcherWorker;

internal static class Pipeline
{
    public static RequestHandlerBuilder<FileRequest, ProcessingResult> CreateBuilder(string[] args) =>
        RequestHandlerBuilder.Create<FileRequest, ProcessingResult>(args)
            .AddJsonFile("appsettings.json", optional: true)
            .ConfigureLogging(logging => logging
                .SetMinimumLevel(LogLevel.Information)
                .AddSimpleConsole(o => o.SingleLine = true));

    public static RequestHandler<FileRequest, ProcessingResult> Configure(
        RequestHandler<FileRequest, ProcessingResult> handler,
        string processedDir,
        string failedDir) =>
        handler
            .Use<ArchiveMiddleware>(processedDir, failedDir)
            .Use<ValidateFileMiddleware>()
            .Use<ParseMiddleware>()
            .Use<ProcessMiddleware>();

    public static RequestHandler<FileRequest, ProcessingResult> Build(
        string[] args, string processedDir, string failedDir) =>
        Configure(CreateBuilder(args).Build(), processedDir, failedDir);
}

ArchiveMiddleware is registered first because it wraps the whole chain in its try/catch and decides where the file ends up. Validate, parse, and process follow in the natural order.

The watcher

FileSystemWatcher is event-driven. Pipeline runs are asynchronous and (for this recipe) sequential. The decoupling layer is a Channel<string> — the watcher pushes file paths onto the channel as fast as events fire; a single consumer reads off the channel and feeds them to the handler one at a time.

using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Plumber;

namespace FileWatcherWorker;

internal sealed class InboxWatcher : IDisposable
{
    private readonly FileSystemWatcher watcher;
    private readonly Channel<string> channel;
    private readonly ILogger<InboxWatcher> logger;

    public InboxWatcher(string inboxPath, ILogger<InboxWatcher> logger)
    {
        Directory.CreateDirectory(inboxPath);
        this.logger = logger;
        channel = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false,
        });

        watcher = new FileSystemWatcher(inboxPath)
        {
            NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite,
            EnableRaisingEvents = false,
        };
        watcher.Created += OnFileEvent;
        watcher.Renamed += OnFileEvent;
    }

    public ChannelReader<string> Reader => channel.Reader;

    public void Start()
    {
        // pick up files that landed before we started
        foreach (var existing in Directory.EnumerateFiles(watcher.Path))
        {
            channel.Writer.TryWrite(existing);
        }

        watcher.EnableRaisingEvents = true;
    }

    public void Stop()
    {
        watcher.EnableRaisingEvents = false;
        channel.Writer.TryComplete();
    }

    private void OnFileEvent(object sender, FileSystemEventArgs e)
    {
        if (!channel.Writer.TryWrite(e.FullPath))
        {
            logger.LogWarning("dropped event for {Path}", e.FullPath);
        }
    }

    public void Dispose()
    {
        watcher.Created -= OnFileEvent;
        watcher.Renamed -= OnFileEvent;
        watcher.Dispose();
    }
}

A few notes:

  • The channel is unbounded with SingleReader = true so the consumer side can read with no contention. Bound it if you'd rather apply backpressure to a chatty watcher.
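  • The watcher subscribes to Created and Renamed. Some upload patterns write to a temp file and rename it into place; Renamed delivers the final name. Created still fires for the temp file, though, and since ValidateFileMiddleware rejects an extension like .tmp, ArchiveMiddleware will try to move the half-written file to failed/. If your uploaders work this way, restrict the watcher with FileSystemWatcher.Filters (for example *.json) and apply the same filter to the startup scan.
  • On startup, the watcher scans the inbox for files that landed while the process was down, so a cold start drains the backlog. A file that arrives between the scan and EnableRaisingEvents = true is missed until the next restart; enabling events before the scan closes that window at the cost of occasional duplicate paths on the channel.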
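The first note above suggests bounding the channel. Here is what that changes, as a minimal sketch with an arbitrary capacity of 2: with BoundedChannelFullMode.Wait, TryWrite returns false once the buffer is full, which is the case InboxWatcher.OnFileEvent already logs as a dropped event. FileSystemWatcher can't actually be slowed down, and its documentation advises keeping event handlers short, so blocking the handler to wait for space is risky; a dropped path is instead recovered by the startup scan on the next run. BoundedChannelFullMode.DropOldest is the other common choice: TryWrite always succeeds and the oldest queued path is discarded.

```csharp
using System;
using System.Threading.Channels;

// Capacity 2 is only for illustration; size it to the backlog you're willing to hold.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity: 2)
{
    SingleReader = true,
    SingleWriter = false,
    FullMode = BoundedChannelFullMode.Wait, // TryWrite returns false when full
});

// No reader is draining, so the third write finds the buffer full.
foreach (var path in new[] { "inbox/a.json", "inbox/b.json", "inbox/c.json" })
{
    var queued = channel.Writer.TryWrite(path);
    Console.WriteLine($"{path}: {(queued ? "queued" : "dropped")}");
}
```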

Concurrency: one file at a time

For most file workflows, processing one file at a time is the right default. It's predictable, it avoids contention on shared resources (databases, downstream APIs), and it makes "what's the current state" easy to reason about.

The consumer loop reads from the channel and awaits each pipeline run sequentially:

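using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Plumber;

namespace FileWatcherWorker;

internal static class Consumer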
{
    public static async Task RunAsync(
        ChannelReader<string> reader,
        RequestHandler<FileRequest, ProcessingResult> handler,
        ILogger logger,
        CancellationToken cancellationToken)
    {
        await foreach (var path in reader.ReadAllAsync(cancellationToken))
        {
            try
            {
                var info = new FileInfo(path);
                var result = await handler.InvokeAsync(new FileRequest(path, info), cancellationToken);
                logger.LogInformation("{Path}: success={Success}", path, result?.Success);
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "consumer caught unexpected error for {Path}", path);
            }
        }
    }
}

If you do need parallel processing, layer a SemaphoreSlim over the loop and start each pipeline run on a Task. Keep the bound small — file workflows usually stress the destination, not the local CPU.
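Here is a minimal sketch of that parallel variant. A plain async delegate stands in for handler.InvokeAsync so the sketch runs without Plumber; ConsumeInParallelAsync and its parameter names are hypothetical. The semaphore caps in-flight runs, the loop pauses while the cap is reached, and the finally block waits for every started run before the semaphore is disposed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical parallel consumer: at most maxConcurrency calls to `process` run at once.
// Like Consumer.RunAsync, `process` is expected to catch and log its own failures.
static async Task ConsumeInParallelAsync(
    ChannelReader<string> reader,
    Func<string, CancellationToken, Task> process,
    int maxConcurrency,
    CancellationToken cancellationToken)
{
    using var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    var running = new List<Task>();

    try
    {
        await foreach (var path in reader.ReadAllAsync(cancellationToken))
        {
            await gate.WaitAsync(cancellationToken); // pauses the loop while the cap is reached
            running.RemoveAll(task => task.IsCompleted); // keep the list short in a long-running worker
            running.Add(RunOneAsync(path));
        }
    }
    finally
    {
        // wait for in-flight runs (even on cancellation) before the semaphore is disposed
        await Task.WhenAll(running);
    }

    async Task RunOneAsync(string item)
    {
        try
        {
            await process(item, cancellationToken);
        }
        finally
        {
            gate.Release();
        }
    }
}

// usage: six queued paths, a cap of 2, and a delegate standing in for a pipeline run
var channel = Channel.CreateUnbounded<string>();
for (var i = 0; i < 6; ++i)
{
    channel.Writer.TryWrite($"inbox/file{i}.json");
}
channel.Writer.Complete();

var sync = new object();
var inFlight = 0;
var peak = 0;
var processed = 0;

await ConsumeInParallelAsync(
    channel.Reader,
    async (path, ct) =>
    {
        lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
        await Task.Delay(TimeSpan.FromMilliseconds(20), ct); // simulated pipeline run
        lock (sync) { inFlight--; processed++; }
    },
    maxConcurrency: 2,
    cancellationToken: CancellationToken.None);

Console.WriteLine($"processed {processed} file(s), peak concurrency {peak}");
```

Because runs now overlap, the order in which files finish (and the order of the log lines) no longer matches the order they arrived.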

Graceful shutdown

Console.CancelKeyPress cancels the token and stops the watcher, which completes the channel writer. The in-flight pipeline run sees the canceled token and stops, the consumer exits without starting anything else still queued, and the handler is disposed when the top-level program returns:

using FileWatcherWorker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

var inboxDir = args.FirstOrDefault() ?? "inbox";
var processedDir = "processed";
var failedDir = "failed";

using var loggerFactory = LoggerFactory.Create(b => b
    .SetMinimumLevel(LogLevel.Information)
    .AddSimpleConsole(o => o.SingleLine = true));
var watcherLogger = loggerFactory.CreateLogger<InboxWatcher>();
var consumerLogger = loggerFactory.CreateLogger("Consumer");

using var handler = Pipeline.Build(args, processedDir, failedDir);
using var watcher = new InboxWatcher(inboxDir, watcherLogger);

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cts.Cancel();
    watcher.Stop();
};

watcher.Start();
consumerLogger.LogInformation("watching {Path}", inboxDir);

try
{
    await Consumer.RunAsync(watcher.Reader, handler, consumerLogger, cts.Token);
}
catch (OperationCanceledException)
{
    consumerLogger.LogInformation("shutdown complete");
}

return 0;

The shutdown sequence:

  1. User presses Ctrl-C.
  2. e.Cancel = true keeps the runtime from killing the process.
  3. cts.Cancel() cancels the consumer loop and any in-flight pipeline run.
  4. watcher.Stop() disables the file system watcher and completes the channel writer.
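  5. Consumer.RunAsync exits with OperationCanceledException, either from the in-flight run or from the await foreach's next cancellation check, and the catch block logs "shutdown complete". Paths still queued in the channel are left for the next start.
  6. The using declarations dispose the cancellation source, watcher, handler, and logger factory (reverse order of declaration) when the program returns.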

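Files that were partway through processing when cancellation hit stay in inbox/, and so, in general, do paths still waiting in the channel: ArchiveMiddleware lets OperationCanceledException propagate, so no move happens, and the consumer stops reading. The startup scan picks them up on the next run.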
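The two ways a ChannelReader loop can end behave differently, which matters if you'd rather finish queued files before exiting. In the minimal sketch below, completing the writer lets ReadAllAsync yield everything still buffered and then end normally, while a canceled token ends the loop with OperationCanceledException. ReadUntilDoneAsync is a hypothetical stand-in for Consumer.RunAsync. A two-stage shutdown (the first Ctrl-C calls only watcher.Stop() so the loop drains, a second one cancels) is one way to apply this; that scheme is an assumption, not part of the recipe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Reads until the channel says "no more": either the writer completed (normal exit)
// or the token was canceled (OperationCanceledException).
static async Task<List<string>> ReadUntilDoneAsync(ChannelReader<string> reader, CancellationToken cancellationToken)
{
    var seen = new List<string>();
    await foreach (var path in reader.ReadAllAsync(cancellationToken))
    {
        seen.Add(path);
    }

    return seen; // reached only after the writer completed and the buffer emptied
}

// 1. Completion: everything already buffered is read, then the loop ends normally.
var completed = Channel.CreateUnbounded<string>();
completed.Writer.TryWrite("inbox/a.json");
completed.Writer.TryWrite("inbox/b.json");
completed.Writer.Complete();
var drained = await ReadUntilDoneAsync(completed.Reader, CancellationToken.None);
Console.WriteLine($"drained {drained.Count} path(s)");

// 2. Cancellation: the writer never completes, so only the token ends the loop.
var open = Channel.CreateUnbounded<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
try
{
    await ReadUntilDoneAsync(open.Reader, cts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("canceled while waiting for more paths");
}
```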

Testing

The pipeline is testable independently of the watcher. Build a handler in the test, hand it a FileRequest pointing at a test fixture, and assert on the response and on what happened to the file:

using FileWatcherWorker;
using Plumber.Testing;
using Xunit;

public sealed class PipelineTests
{
    [Fact]
    public async Task ValidJsonFileMovesToProcessedAsync()
    {
        var temp = Directory.CreateTempSubdirectory();
        var inbox = Path.Combine(temp.FullName, "inbox");
        var processed = Path.Combine(temp.FullName, "processed");
        var failed = Path.Combine(temp.FullName, "failed");
        Directory.CreateDirectory(inbox);

        var path = Path.Combine(inbox, "sample.json");
        await File.WriteAllTextAsync(path, """{"Id":"abc","Items":["a","b"]}""");

        using var factory = new PlumberApplicationFactory<FileRequest, ProcessingResult>(
            args => Pipeline.CreateBuilder(args),
            handler => Pipeline.Configure(handler, processed, failed));

        var result = await factory.InvokeAsync(new FileRequest(path, new FileInfo(path)));

        Assert.True(result?.Success);
        Assert.True(File.Exists(Path.Combine(processed, "sample.json")));
        Assert.False(File.Exists(path));
    }
}

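The watcher is a thin shell over a channel; integration-test it against a real temp folder and assert that events make it onto the reader. Because the recipe declares its types internal, the test project also needs access to them, for example through an InternalsVisibleTo entry in the worker project that names your test assembly. Full guidance on testing patterns lives in Testing.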

Tested against

  • .NET 10
  • MSL.Plumber.Pipeline 3.x
  • Microsoft.Extensions.Logging.Console 10.x

See also
