From 8064b17d479686b01c538a90459f1366598eede3 Mon Sep 17 00:00:00 2001 From: Oleksandr Pavlov Date: Mon, 17 Apr 2023 18:34:01 +0300 Subject: [PATCH] data clean up for scheduler --- Examples/BrownsfashionScraper/Worker.cs | 3 +- .../WebReaper.ConsoleApplication/Program.cs | 4 ++- .../ScrapingWorker.cs | 3 +- .../ScrapingWorker.cs | 3 +- README.md | 14 +++++++- .../ScraperTests.cs | 9 +++-- WebReaper/Builders/ScraperEngineBuilder.cs | 30 ++++++++++++---- .../Concrete/FileVisitedLinkedTracker.cs | 1 - .../Concrete/RedisVisitedLinkTracker.cs | 4 +-- .../Loaders/Abstract/BrowserPageLoader.cs | 2 +- .../Core/Scheduler/Abstract/IScheduler.cs | 2 ++ .../Concrete/AzureServiceBusScheduler.cs | 28 +++++++++++++-- .../Core/Scheduler/Concrete/FileScheduler.cs | 34 ++++++++++++++----- .../Scheduler/Concrete/InMemoryScheduler.cs | 2 ++ .../Core/Scheduler/Concrete/RedisScheduler.cs | 19 ++++++++++- WebReaper/Core/ScraperEngine.cs | 8 +++-- WebReaper/Logging/ColorConsoleLogger.cs | 5 +-- 17 files changed, 133 insertions(+), 38 deletions(-) diff --git a/Examples/BrownsfashionScraper/Worker.cs b/Examples/BrownsfashionScraper/Worker.cs index 7b659d5..c138969 100644 --- a/Examples/BrownsfashionScraper/Worker.cs +++ b/Examples/BrownsfashionScraper/Worker.cs @@ -70,12 +70,13 @@ public override async Task StartAsync(CancellationToken cancellationToken) }) .WriteToCsvFile("result.csv", true) + .WithParallelismDegree(10) .BuildAsync(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await _scraper.RunAsync(10, stoppingToken); + await _scraper.RunAsync(stoppingToken); } } } \ No newline at end of file diff --git a/Examples/WebReaper.ConsoleApplication/Program.cs b/Examples/WebReaper.ConsoleApplication/Program.cs index 91e8cd9..f4687ec 100644 --- a/Examples/WebReaper.ConsoleApplication/Program.cs +++ b/Examples/WebReaper.ConsoleApplication/Program.cs @@ -15,9 +15,11 @@ }) .WriteToJsonFile("output.json", dataCleanupOnStart: true) .TrackVisitedLinksInFile("visited.txt", dataCleanupOnStart: true) + .WithTextFileScheduler("jobs.txt", "currentJob.txt", dataCleanupOnStart: true) .LogToConsole() - .PageCrawlLimit(10) + .PageCrawlLimit(500) .HeadlessMode(true) + .WithParallelismDegree(30) .BuildAsync(); await engine.RunAsync(); diff --git a/Examples/WebReaper.DistributedScraperWorkerService/ScrapingWorker.cs b/Examples/WebReaper.DistributedScraperWorkerService/ScrapingWorker.cs index 6fc3e0e..13f8006 100644 --- a/Examples/WebReaper.DistributedScraperWorkerService/ScrapingWorker.cs +++ b/Examples/WebReaper.DistributedScraperWorkerService/ScrapingWorker.cs @@ -52,12 +52,13 @@ public override async Task StartAsync(CancellationToken cancellationToken) "Rutracker", true) .WithAzureServiceBusScheduler(azureSBConnectionString, queue) + .WithParallelismDegree(10) .BuildAsync(); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await engine.RunAsync(10, stoppingToken); + await engine.RunAsync(stoppingToken); } } diff --git a/Examples/WebReaper.ScraperWorkerService/ScrapingWorker.cs b/Examples/WebReaper.ScraperWorkerService/ScrapingWorker.cs index c21d0f8..98396e0 100644 --- a/Examples/WebReaper.ScraperWorkerService/ScrapingWorker.cs +++ b/Examples/WebReaper.ScraperWorkerService/ScrapingWorker.cs @@ -47,6 +47,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) .TrackVisitedLinksInRedis("localhost:6379", "rutracker-visited-links") .WriteToRedis("localhost:6379", "rutracker-audiobooks", true) .WithRedisConfigStorage("localhost:6379", "rutracker-scraper-config") + .WithParallelismDegree(20) .BuildAsync(); } @@ -78,7 +79,7 @@ private static async Task ParseTorrentStats(Metadata meta, JObject result) protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await _engine.RunAsync(20, stoppingToken); + await _engine.RunAsync( stoppingToken); } } diff --git a/README.md b/README.md index 31b36fc..f3a91fd 100644 --- a/README.md +++ b/README.md @@ -174,12 +174,24 @@ This dataCleanupOnStart parameter is present for all sinks, e.g. MongoDbSink, Re ### How to clean visited links from the previous web scrapping run +To clean up the list of visited links just pass true for dataCleanupOnStart parameter: + ```C# var engine = await new ScraperEngineBuilder() .Get("https://www.reddit.com/r/dotnet/") .TrackVisitedLinksInFile("visited.txt", dataCleanupOnStart: true) ``` +### How to clean job queue from the previous web scraping run + +Job queue is a queue of tasks schedules for web scraper. To clean up the job queue pass the dataCleanupOnStart parameter set to true. + +```C# +var engine = await new ScraperEngineBuilder() + .Get("https://www.reddit.com/r/dotnet/") + .WithTextFileScheduler("jobs.txt", "currentJob.txt", dataCleanupOnStart: true) +``` + ### Distributed web scraping with Serverless approach In the Examples folder you can find the project called WebReaper.AzureFuncs. It demonstrates the use of WebReaper with @@ -208,7 +220,7 @@ Finally, it iterates through these new jobs and sends them the the Job queue. Out of the box there are 4 sinks you can send your parsed data to: ConsoleSink, CsvFileSink, JsonFileSink, CosmosSink ( Azure Cosmos database). -You can easly add your own by implementing the IScraperSink interface: +You can easily add your own by implementing the IScraperSink interface: ```C# public interface IScraperSink diff --git a/WebReaper.Tests/WebReaper.IntegrationTests/ScraperTests.cs b/WebReaper.Tests/WebReaper.IntegrationTests/ScraperTests.cs index d476736..32b2908 100644 --- a/WebReaper.Tests/WebReaper.IntegrationTests/ScraperTests.cs +++ b/WebReaper.Tests/WebReaper.IntegrationTests/ScraperTests.cs @@ -65,9 +65,10 @@ public async Task SimpleTest() }) .WithLogger(new TestOutputLogger(output)) .Subscribe(x => result.Add(x)) + .WithParallelismDegree(1) .BuildAsync(); - _ = engine.RunAsync(1); + _ = engine.RunAsync(); await Task.Delay(10000); @@ -90,9 +91,10 @@ public async Task SimpleTestWithProxy() .WithLogger(new TestOutputLogger(this.output)) .WithProxies(new WebShareProxyProvider()) .Subscribe(x => result.Add(x)) + .WithParallelismDegree(2) .BuildAsync(); - _ = scraper.RunAsync(2); + _ = scraper.RunAsync(); await Task.Delay(30000); @@ -121,9 +123,10 @@ public async Task SimpleTestWithSPA() }) .WithLogger(new TestOutputLogger(this.output)) .Subscribe(x => result.Add(x)) + .WithParallelismDegree(10) .BuildAsync(); - _ = engine.RunAsync(10); + _ = engine.RunAsync(); await Task.Delay(20000); diff --git a/WebReaper/Builders/ScraperEngineBuilder.cs b/WebReaper/Builders/ScraperEngineBuilder.cs index a3ea21c..d5593ca 100644 --- a/WebReaper/Builders/ScraperEngineBuilder.cs +++ b/WebReaper/Builders/ScraperEngineBuilder.cs @@ -28,6 +28,7 @@ namespace WebReaper.Builders; public class ScraperEngineBuilder { private IVisitedLinkTracker _visitedLinksTracker = new InMemoryVisitedLinkTracker(); + private int _parallelismDegree = 8; private ConfigBuilder ConfigBuilder { get; } = new(); private SpiderBuilder SpiderBuilder { get; } = new(); @@ -222,21 +223,30 @@ public ScraperEngineBuilder WithScheduler(IScheduler scheduler) return this; } - public ScraperEngineBuilder WithAzureServiceBusScheduler(string connectionString, string queueName) + public ScraperEngineBuilder WithAzureServiceBusScheduler( + string connectionString, + string queueName, + bool dataCleanupOnStart = false) { - Scheduler = new AzureServiceBusScheduler(connectionString, queueName); + Scheduler = new AzureServiceBusScheduler(connectionString, queueName, dataCleanupOnStart); return this; } - public ScraperEngineBuilder WithTextFileScheduler(string fileName, string currentJobPositionFileName) + public ScraperEngineBuilder WithTextFileScheduler( + string fileName, + string currentJobPositionFileName, + bool dataCleanupOnStart = false) { - Scheduler = new FileScheduler(fileName, currentJobPositionFileName, Logger); + Scheduler = new FileScheduler(fileName, currentJobPositionFileName, Logger, dataCleanupOnStart); return this; } - public ScraperEngineBuilder WithRedisScheduler(string connectionString, string queueName) + public ScraperEngineBuilder WithRedisScheduler( + string connectionString, + string queueName, + bool dataCleanupOnStart = false) { - Scheduler = new RedisScheduler(connectionString, queueName, Logger); + Scheduler = new RedisScheduler(connectionString, queueName, Logger, dataCleanupOnStart); return this; } @@ -301,6 +311,12 @@ public ScraperEngineBuilder PostProcess(Func action) return this; } + public ScraperEngineBuilder WithParallelismDegree(int parallelismDegree) + { + _parallelismDegree = parallelismDegree; + return this; + } + public async Task BuildAsync() { await _visitedLinksTracker.Initialization; @@ -312,6 +328,6 @@ public async Task BuildAsync() await ConfigStorage.CreateConfigAsync(config); - return new ScraperEngine(ConfigStorage, Scheduler, spider, Logger); + return new ScraperEngine(_parallelismDegree, ConfigStorage, Scheduler, spider, Logger); } } \ No newline at end of file diff --git a/WebReaper/Core/LinkTracker/Concrete/FileVisitedLinkedTracker.cs b/WebReaper/Core/LinkTracker/Concrete/FileVisitedLinkedTracker.cs index 1218b67..212a0a4 100644 --- a/WebReaper/Core/LinkTracker/Concrete/FileVisitedLinkedTracker.cs +++ b/WebReaper/Core/LinkTracker/Concrete/FileVisitedLinkedTracker.cs @@ -1,5 +1,4 @@ using System.Collections.Concurrent; -using Microsoft.Azure.Amqp.Framing; using WebReaper.Core.LinkTracker.Abstract; namespace WebReaper.Core.LinkTracker.Concrete; diff --git a/WebReaper/Core/LinkTracker/Concrete/RedisVisitedLinkTracker.cs b/WebReaper/Core/LinkTracker/Concrete/RedisVisitedLinkTracker.cs index 1a85608..52ee4cb 100644 --- a/WebReaper/Core/LinkTracker/Concrete/RedisVisitedLinkTracker.cs +++ b/WebReaper/Core/LinkTracker/Concrete/RedisVisitedLinkTracker.cs @@ -8,6 +8,8 @@ public class RedisVisitedLinkTracker : RedisBase, IVisitedLinkTracker private readonly string _redisKey; public bool DataCleanupOnStart { get; set; } + + public Task Initialization { get; } public RedisVisitedLinkTracker(string connectionString, string redisKey, bool dataCleanupOnStart = false) : base(connectionString) @@ -17,8 +19,6 @@ public RedisVisitedLinkTracker(string connectionString, string redisKey, bool da DataCleanupOnStart = dataCleanupOnStart; } - public Task Initialization { get; } - private async Task InitializeAsync() { if (!DataCleanupOnStart) diff --git a/WebReaper/Core/Loaders/Abstract/BrowserPageLoader.cs b/WebReaper/Core/Loaders/Abstract/BrowserPageLoader.cs index d3bc54e..dcaa846 100644 --- a/WebReaper/Core/Loaders/Abstract/BrowserPageLoader.cs +++ b/WebReaper/Core/Loaders/Abstract/BrowserPageLoader.cs @@ -18,7 +18,7 @@ public abstract class BrowserPageLoader PageActionType.ScrollToEnd, async (page, data) => await page.EvaluateExpressionAsync("window.scrollTo(0, document.body.scrollHeight);") }, - { PageActionType.Wait, async (page, data) => await Task.Delay((int)data.First()) }, + { PageActionType.Wait, async (page, data) => await Task.Delay(Convert.ToInt32(data.First())) }, { PageActionType.WaitForNetworkIdle, async (page, data) => await page.WaitForNetworkIdleAsync() }, { PageActionType.Click, async (page, data) => await page.ClickAsync((string)data.First()) } }; diff --git a/WebReaper/Core/Scheduler/Abstract/IScheduler.cs b/WebReaper/Core/Scheduler/Abstract/IScheduler.cs index 4e4c952..00e8a55 100644 --- a/WebReaper/Core/Scheduler/Abstract/IScheduler.cs +++ b/WebReaper/Core/Scheduler/Abstract/IScheduler.cs @@ -4,6 +4,8 @@ namespace WebReaper.Core.Scheduler.Abstract; public interface IScheduler { + public bool DataCleanupOnStart { get; set; } + Task AddAsync(Job job, CancellationToken cancellationToken = default); Task AddAsync(IEnumerable jobs, CancellationToken cancellationToken = default); IAsyncEnumerable GetAllAsync(CancellationToken cancellationToken = default); diff --git a/WebReaper/Core/Scheduler/Concrete/AzureServiceBusScheduler.cs b/WebReaper/Core/Scheduler/Concrete/AzureServiceBusScheduler.cs index 3043e9f..1d04419 100644 --- a/WebReaper/Core/Scheduler/Concrete/AzureServiceBusScheduler.cs +++ b/WebReaper/Core/Scheduler/Concrete/AzureServiceBusScheduler.cs @@ -3,27 +3,49 @@ using Newtonsoft.Json; using WebReaper.Core.Scheduler.Abstract; using WebReaper.Domain; +using Azure.Messaging.ServiceBus.Administration; namespace WebReaper.Core.Scheduler.Concrete; public class AzureServiceBusScheduler : IScheduler, IAsyncDisposable { + private readonly string _queueName; private readonly ServiceBusClient _client; private readonly ServiceBusReceiver _receiver; private readonly ServiceBusSender _sender; + private readonly ServiceBusAdministrationClient _adminClient; - public AzureServiceBusScheduler(string serviceBusConnectionString, string queueName) + public bool DataCleanupOnStart { get; set; } + + public Task Initialization { get; } + + public AzureServiceBusScheduler(string serviceBusConnectionString, string queueName, bool dataCleanupOnStart = false) { + _queueName = queueName; + DataCleanupOnStart = dataCleanupOnStart; _client = new ServiceBusClient(serviceBusConnectionString); - _receiver = _client.CreateReceiver(queueName, new ServiceBusReceiverOptions + _receiver = _client.CreateReceiver(_queueName, new ServiceBusReceiverOptions { PrefetchCount = 10 }); - _sender = _client.CreateSender(queueName); + _sender = _client.CreateSender(_queueName); + + _adminClient = new ServiceBusAdministrationClient(serviceBusConnectionString); + + Initialization = InitializeAsync(); + } + + private async Task InitializeAsync() + { + if (DataCleanupOnStart) + { + await _adminClient.DeleteQueueAsync(_queueName); + await _adminClient.CreateQueueAsync(_queueName); + } } public async ValueTask DisposeAsync() diff --git a/WebReaper/Core/Scheduler/Concrete/FileScheduler.cs b/WebReaper/Core/Scheduler/Concrete/FileScheduler.cs index 2a9600e..32abda2 100644 --- a/WebReaper/Core/Scheduler/Concrete/FileScheduler.cs +++ b/WebReaper/Core/Scheduler/Concrete/FileScheduler.cs @@ -3,7 +3,6 @@ using Newtonsoft.Json; using WebReaper.Core.Scheduler.Abstract; using WebReaper.Domain; -using static System.IO.File; namespace WebReaper.Core.Scheduler.Concrete; @@ -15,14 +14,31 @@ public class FileScheduler : IScheduler private readonly SemaphoreSlim _semaphore = new(1, 1); private long _currentJobPosition; + public Task Initialization { get; } - public FileScheduler(string fileName, string currentJobPositionFileName, ILogger logger) + public bool DataCleanupOnStart { get; set; } + + public FileScheduler(string fileName, string currentJobPositionFileName, ILogger logger, bool dataCleanupOnStart = false) { + DataCleanupOnStart = dataCleanupOnStart; _fileName = fileName; _currentJobPositionFileName = currentJobPositionFileName; _logger = logger; - if (Exists(_currentJobPositionFileName)) - _currentJobPosition = int.Parse(ReadAllText(_currentJobPositionFileName)); + + Initialization = InitializeAsync(); + } + + private async Task InitializeAsync() + { + if (DataCleanupOnStart) + { + File.Delete(_fileName); + File.Delete(_currentJobPositionFileName); + return; + } + + if (File.Exists(_currentJobPositionFileName)) + _currentJobPosition = int.Parse(await File.ReadAllTextAsync(_currentJobPositionFileName)); } public async IAsyncEnumerable GetAllAsync( @@ -39,7 +55,7 @@ public FileScheduler(string fileName, string currentJobPositionFileName, ILogger for (var i = 0; i < _currentJobPosition; i++) { _logger.LogInformation("Skipping {Count} line", i); - await sr.ReadLineAsync(); + await sr.ReadLineAsync(cancellationToken); } while (true) @@ -48,7 +64,7 @@ public FileScheduler(string fileName, string currentJobPositionFileName, ILogger string? jobLine; try { - jobLine = await sr.ReadLineAsync(); + jobLine = await sr.ReadLineAsync(cancellationToken); } finally { @@ -64,7 +80,7 @@ public FileScheduler(string fileName, string currentJobPositionFileName, ILogger _logger.LogInformation("Writing current job position to file"); - await WriteAllTextAsync(_currentJobPositionFileName, $"{_currentJobPosition++}", cancellationToken); + await File.WriteAllTextAsync(_currentJobPositionFileName, $"{_currentJobPosition++}", cancellationToken); _logger.LogInformation("Deserializing the job and returning it to consumer"); @@ -80,7 +96,7 @@ public async Task AddAsync(Job job, CancellationToken cancellationToken = defaul await _semaphore.WaitAsync(cancellationToken); try { - await AppendAllTextAsync(_fileName, SerializeToJson(job) + Environment.NewLine, cancellationToken); + await File.AppendAllTextAsync(_fileName, SerializeToJson(job) + Environment.NewLine, cancellationToken); } finally { @@ -97,7 +113,7 @@ public async Task AddAsync(IEnumerable jobs, CancellationToken cancellation await _semaphore.WaitAsync(cancellationToken); try { - await AppendAllLinesAsync(_fileName, serializedJobs, cancellationToken); + await File.AppendAllLinesAsync(_fileName, serializedJobs, cancellationToken); } finally { diff --git a/WebReaper/Core/Scheduler/Concrete/InMemoryScheduler.cs b/WebReaper/Core/Scheduler/Concrete/InMemoryScheduler.cs index 1fd7c8f..d02e972 100644 --- a/WebReaper/Core/Scheduler/Concrete/InMemoryScheduler.cs +++ b/WebReaper/Core/Scheduler/Concrete/InMemoryScheduler.cs @@ -13,6 +13,8 @@ public IAsyncEnumerable GetAllAsync(CancellationToken cancellationToken = d return _jobChannel.Reader.ReadAllAsync(cancellationToken); } + public bool DataCleanupOnStart { get; set; } + public async Task AddAsync(Job job, CancellationToken cancellationToken = default) { await _jobChannel.Writer.WriteAsync(job, cancellationToken); diff --git a/WebReaper/Core/Scheduler/Concrete/RedisScheduler.cs b/WebReaper/Core/Scheduler/Concrete/RedisScheduler.cs index aa87910..669261c 100644 --- a/WebReaper/Core/Scheduler/Concrete/RedisScheduler.cs +++ b/WebReaper/Core/Scheduler/Concrete/RedisScheduler.cs @@ -12,10 +12,27 @@ public class RedisScheduler : RedisBase, IScheduler private readonly ILogger _logger; private readonly string _queueName; - public RedisScheduler(string connectionString, string queueName, ILogger logger) : base(connectionString) + public bool DataCleanupOnStart { get; set; } + + public Task Initialization { get; } + + public RedisScheduler(string connectionString, string queueName, ILogger logger, bool dataCleanupOnStart = false) : base(connectionString) { + DataCleanupOnStart = dataCleanupOnStart; _queueName = queueName; _logger = logger; + + Initialization = InitializeAsync(); + } + + private async Task InitializeAsync() + { + if (!DataCleanupOnStart) + return; + + var db = Redis.GetDatabase(); + + await db.KeyDeleteAsync(_queueName); } public async IAsyncEnumerable GetAllAsync( diff --git a/WebReaper/Core/ScraperEngine.cs b/WebReaper/Core/ScraperEngine.cs index 9205108..8abdad8 100644 --- a/WebReaper/Core/ScraperEngine.cs +++ b/WebReaper/Core/ScraperEngine.cs @@ -12,11 +12,13 @@ namespace WebReaper.Core; public class ScraperEngine { public ScraperEngine( + int parallelismDegree, IScraperConfigStorage configStorage, IScheduler jobScheduler, ISpider spider, ILogger logger) { + ParallelismDegree = parallelismDegree; ArgumentNullException.ThrowIfNull(configStorage); ArgumentNullException.ThrowIfNull(jobScheduler); ArgumentNullException.ThrowIfNull(spider); @@ -30,8 +32,10 @@ public class ScraperEngine private IScheduler Scheduler { get; } private ISpider Spider { get; } private ILogger Logger { get; } + + private int ParallelismDegree { get; } - public async Task RunAsync(int parallelismDegree = 8, CancellationToken cancellationToken = default) + public async Task RunAsync(CancellationToken cancellationToken = default) { Logger.LogInformation("Start {class}.{method}", nameof(ScraperEngine), nameof(RunAsync)); @@ -49,7 +53,7 @@ public async Task RunAsync(int parallelismDegree = 8, CancellationToken cancella config.PageActions), cancellationToken); } - var options = new ParallelOptions { MaxDegreeOfParallelism = parallelismDegree }; + var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismDegree }; try { diff --git a/WebReaper/Logging/ColorConsoleLogger.cs b/WebReaper/Logging/ColorConsoleLogger.cs index e380fbd..7bad2bb 100644 --- a/WebReaper/Logging/ColorConsoleLogger.cs +++ b/WebReaper/Logging/ColorConsoleLogger.cs @@ -37,10 +37,7 @@ public bool IsEnabled(LogLevel logLevel) var originalColor = Console.ForegroundColor; Console.ForegroundColor = LogLevelToColorMap[logLevel]; - Console.Write($"[ {logLevel} ] "); - - Console.ForegroundColor = LogLevelToColorMap[logLevel]; - Console.Write($"{formatter(state, exception)}"); + Console.WriteLine($"[{logLevel}] {formatter(state, exception)}"); if (exception != null) Console.WriteLine($"\n\n{exception}");