data clean up for scheduler
Oleksandr Pavlov committed Apr 17, 2023
1 parent 60c1d86 commit 8064b17
Showing 17 changed files with 133 additions and 38 deletions.
3 changes: 2 additions & 1 deletion Examples/BrownsfashionScraper/Worker.cs
@@ -70,12 +70,13 @@ public override async Task StartAsync(CancellationToken cancellationToken)

})
.WriteToCsvFile("result.csv", true)
.WithParallelismDegree(10)
.BuildAsync();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _scraper.RunAsync(10, stoppingToken);
await _scraper.RunAsync(stoppingToken);
}
}
}
4 changes: 3 additions & 1 deletion Examples/WebReaper.ConsoleApplication/Program.cs
@@ -15,9 +15,11 @@
})
.WriteToJsonFile("output.json", dataCleanupOnStart: true)
.TrackVisitedLinksInFile("visited.txt", dataCleanupOnStart: true)
.WithTextFileScheduler("jobs.txt", "currentJob.txt", dataCleanupOnStart: true)
.LogToConsole()
.PageCrawlLimit(10)
.PageCrawlLimit(500)
.HeadlessMode(true)
.WithParallelismDegree(30)
.BuildAsync();

await engine.RunAsync();
@@ -52,12 +52,13 @@ public override async Task StartAsync(CancellationToken cancellationToken)
"Rutracker",
true)
.WithAzureServiceBusScheduler(azureSBConnectionString, queue)
.WithParallelismDegree(10)
.BuildAsync();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await engine.RunAsync(10, stoppingToken);
await engine.RunAsync(stoppingToken);
}
}

3 changes: 2 additions & 1 deletion Examples/WebReaper.ScraperWorkerService/ScrapingWorker.cs
@@ -47,6 +47,7 @@ public override async Task StartAsync(CancellationToken cancellationToken)
.TrackVisitedLinksInRedis("localhost:6379", "rutracker-visited-links")
.WriteToRedis("localhost:6379", "rutracker-audiobooks", true)
.WithRedisConfigStorage("localhost:6379", "rutracker-scraper-config")
.WithParallelismDegree(20)
.BuildAsync();
}

@@ -78,7 +79,7 @@ private static async Task ParseTorrentStats(Metadata meta, JObject result)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _engine.RunAsync(20, stoppingToken);
await _engine.RunAsync(stoppingToken);
}
}

14 changes: 13 additions & 1 deletion README.md
@@ -174,12 +174,24 @@ This dataCleanupOnStart parameter is present for all sinks, e.g. MongoDbSink, Re

### How to clean visited links from the previous web scraping run

To clean up the list of visited links, just pass true for the dataCleanupOnStart parameter:

```C#
var engine = await new ScraperEngineBuilder()
.Get("https://www.reddit.com/r/dotnet/")
.TrackVisitedLinksInFile("visited.txt", dataCleanupOnStart: true)
```

### How to clean job queue from the previous web scraping run

The job queue is a queue of tasks scheduled for the web scraper. To clean up the job queue, pass the dataCleanupOnStart parameter set to true.

```C#
var engine = await new ScraperEngineBuilder()
.Get("https://www.reddit.com/r/dotnet/")
.WithTextFileScheduler("jobs.txt", "currentJob.txt", dataCleanupOnStart: true)
```
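
As a minimal sketch, the same flag can be passed to the Redis-backed scheduler (the connection string and queue name below are placeholders); the Azure Service Bus scheduler accepts the same dataCleanupOnStart parameter, as the ScraperEngineBuilder changes further down show:

```C#
var engine = await new ScraperEngineBuilder()
    .Get("https://www.reddit.com/r/dotnet/")
    .WithRedisScheduler("localhost:6379", "reddit-jobs", dataCleanupOnStart: true)
    .BuildAsync();
```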

### Distributed web scraping with Serverless approach

In the Examples folder you can find the project called WebReaper.AzureFuncs. It demonstrates the use of WebReaper with
@@ -208,7 +220,7 @@ Finally, it iterates through these new jobs and sends them to the Job queue.
Out of the box there are 4 sinks you can send your parsed data to: ConsoleSink, CsvFileSink, JsonFileSink, CosmosSink (
Azure Cosmos database).

You can easly add your own by implementing the IScraperSink interface:
You can easily add your own by implementing the IScraperSink interface:

```C#
public interface IScraperSink
```
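
The body of IScraperSink is collapsed in this diff view. As an illustrative sketch only (not part of this commit), a custom sink could look roughly like the code below; the EmitAsync name, its signature, and the WebReaper.Sinks.Abstract namespace are assumptions, so check the actual interface definition in the repository before implementing one.

```C#
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using WebReaper.Sinks.Abstract; // namespace of IScraperSink is assumed here

// Hypothetical custom sink: appends every scraped record to a text file as one line of JSON.
// The EmitAsync member is an assumption, since the interface body is collapsed above.
public class PlainTextFileSink : IScraperSink
{
    private readonly string _path;

    public PlainTextFileSink(string path) => _path = path;

    public async Task EmitAsync(JObject scrapedData, CancellationToken cancellationToken = default)
    {
        // Write the parsed record as compact JSON followed by a newline.
        await File.AppendAllTextAsync(
            _path,
            scrapedData.ToString(Formatting.None) + Environment.NewLine,
            cancellationToken);
    }
}
```
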
9 changes: 6 additions & 3 deletions WebReaper.Tests/WebReaper.IntegrationTests/ScraperTests.cs
@@ -65,9 +65,10 @@ public async Task SimpleTest()
})
.WithLogger(new TestOutputLogger(output))
.Subscribe(x => result.Add(x))
.WithParallelismDegree(1)
.BuildAsync();

_ = engine.RunAsync(1);
_ = engine.RunAsync();

await Task.Delay(10000);

@@ -90,9 +91,10 @@ public async Task SimpleTestWithProxy()
.WithLogger(new TestOutputLogger(this.output))
.WithProxies(new WebShareProxyProvider())
.Subscribe(x => result.Add(x))
.WithParallelismDegree(2)
.BuildAsync();

_ = scraper.RunAsync(2);
_ = scraper.RunAsync();

await Task.Delay(30000);

@@ -121,9 +123,10 @@ public async Task SimpleTestWithSPA()
})
.WithLogger(new TestOutputLogger(this.output))
.Subscribe(x => result.Add(x))
.WithParallelismDegree(10)
.BuildAsync();

_ = engine.RunAsync(10);
_ = engine.RunAsync();

await Task.Delay(20000);

30 changes: 23 additions & 7 deletions WebReaper/Builders/ScraperEngineBuilder.cs
@@ -28,6 +28,7 @@ namespace WebReaper.Builders;
public class ScraperEngineBuilder
{
private IVisitedLinkTracker _visitedLinksTracker = new InMemoryVisitedLinkTracker();
private int _parallelismDegree = 8;
private ConfigBuilder ConfigBuilder { get; } = new();
private SpiderBuilder SpiderBuilder { get; } = new();

@@ -222,21 +223,30 @@ public ScraperEngineBuilder WithScheduler(IScheduler scheduler)
return this;
}

public ScraperEngineBuilder WithAzureServiceBusScheduler(string connectionString, string queueName)
public ScraperEngineBuilder WithAzureServiceBusScheduler(
string connectionString,
string queueName,
bool dataCleanupOnStart = false)
{
Scheduler = new AzureServiceBusScheduler(connectionString, queueName);
Scheduler = new AzureServiceBusScheduler(connectionString, queueName, dataCleanupOnStart);
return this;
}

public ScraperEngineBuilder WithTextFileScheduler(string fileName, string currentJobPositionFileName)
public ScraperEngineBuilder WithTextFileScheduler(
string fileName,
string currentJobPositionFileName,
bool dataCleanupOnStart = false)
{
Scheduler = new FileScheduler(fileName, currentJobPositionFileName, Logger);
Scheduler = new FileScheduler(fileName, currentJobPositionFileName, Logger, dataCleanupOnStart);
return this;
}

public ScraperEngineBuilder WithRedisScheduler(string connectionString, string queueName)
public ScraperEngineBuilder WithRedisScheduler(
string connectionString,
string queueName,
bool dataCleanupOnStart = false)
{
Scheduler = new RedisScheduler(connectionString, queueName, Logger);
Scheduler = new RedisScheduler(connectionString, queueName, Logger, dataCleanupOnStart);
return this;
}

@@ -301,6 +311,12 @@ public ScraperEngineBuilder PostProcess(Func<Metadata, JObject, Task> action)
return this;
}

public ScraperEngineBuilder WithParallelismDegree(int parallelismDegree)
{
_parallelismDegree = parallelismDegree;
return this;
}

public async Task<ScraperEngine> BuildAsync()
{
await _visitedLinksTracker.Initialization;
@@ -312,6 +328,6 @@ public async Task<ScraperEngine> BuildAsync()

await ConfigStorage.CreateConfigAsync(config);

return new ScraperEngine(ConfigStorage, Scheduler, spider, Logger);
return new ScraperEngine(_parallelismDegree, ConfigStorage, Scheduler, spider, Logger);
}
}
@@ -1,5 +1,4 @@
using System.Collections.Concurrent;
using Microsoft.Azure.Amqp.Framing;
using WebReaper.Core.LinkTracker.Abstract;

namespace WebReaper.Core.LinkTracker.Concrete;
@@ -8,6 +8,8 @@ public class RedisVisitedLinkTracker : RedisBase, IVisitedLinkTracker
private readonly string _redisKey;

public bool DataCleanupOnStart { get; set; }

public Task Initialization { get; }

public RedisVisitedLinkTracker(string connectionString, string redisKey, bool dataCleanupOnStart = false)
: base(connectionString)
@@ -17,8 +19,6 @@ public RedisVisitedLinkTracker(string connectionString, string redisKey, bool da
DataCleanupOnStart = dataCleanupOnStart;
}

public Task Initialization { get; }

private async Task InitializeAsync()
{
if (!DataCleanupOnStart)
2 changes: 1 addition & 1 deletion WebReaper/Core/Loaders/Abstract/BrowserPageLoader.cs
@@ -18,7 +18,7 @@ public abstract class BrowserPageLoader
PageActionType.ScrollToEnd,
async (page, data) => await page.EvaluateExpressionAsync("window.scrollTo(0, document.body.scrollHeight);")
},
{ PageActionType.Wait, async (page, data) => await Task.Delay((int)data.First()) },
{ PageActionType.Wait, async (page, data) => await Task.Delay(Convert.ToInt32(data.First())) },
{ PageActionType.WaitForNetworkIdle, async (page, data) => await page.WaitForNetworkIdleAsync() },
{ PageActionType.Click, async (page, data) => await page.ClickAsync((string)data.First()) }
};
2 changes: 2 additions & 0 deletions WebReaper/Core/Scheduler/Abstract/IScheduler.cs
@@ -4,6 +4,8 @@ namespace WebReaper.Core.Scheduler.Abstract;

public interface IScheduler
{
public bool DataCleanupOnStart { get; set; }

Task AddAsync(Job job, CancellationToken cancellationToken = default);
Task AddAsync(IEnumerable<Job> jobs, CancellationToken cancellationToken = default);
IAsyncEnumerable<Job> GetAllAsync(CancellationToken cancellationToken = default);
28 changes: 25 additions & 3 deletions WebReaper/Core/Scheduler/Concrete/AzureServiceBusScheduler.cs
@@ -3,27 +3,49 @@
using Newtonsoft.Json;
using WebReaper.Core.Scheduler.Abstract;
using WebReaper.Domain;
using Azure.Messaging.ServiceBus.Administration;

namespace WebReaper.Core.Scheduler.Concrete;

public class AzureServiceBusScheduler : IScheduler, IAsyncDisposable
{
private readonly string _queueName;
private readonly ServiceBusClient _client;

private readonly ServiceBusReceiver _receiver;

private readonly ServiceBusSender _sender;
private readonly ServiceBusAdministrationClient _adminClient;

public AzureServiceBusScheduler(string serviceBusConnectionString, string queueName)
public bool DataCleanupOnStart { get; set; }

public Task Initialization { get; }

public AzureServiceBusScheduler(string serviceBusConnectionString, string queueName, bool dataCleanupOnStart = false)
{
_queueName = queueName;
DataCleanupOnStart = dataCleanupOnStart;
_client = new ServiceBusClient(serviceBusConnectionString);

_receiver = _client.CreateReceiver(queueName, new ServiceBusReceiverOptions
_receiver = _client.CreateReceiver(_queueName, new ServiceBusReceiverOptions
{
PrefetchCount = 10
});

_sender = _client.CreateSender(queueName);
_sender = _client.CreateSender(_queueName);

_adminClient = new ServiceBusAdministrationClient(serviceBusConnectionString);

Initialization = InitializeAsync();
}

private async Task InitializeAsync()
{
if (DataCleanupOnStart)
{
await _adminClient.DeleteQueueAsync(_queueName);
await _adminClient.CreateQueueAsync(_queueName);
}
}

public async ValueTask DisposeAsync()
34 changes: 25 additions & 9 deletions WebReaper/Core/Scheduler/Concrete/FileScheduler.cs
@@ -3,7 +3,6 @@
using Newtonsoft.Json;
using WebReaper.Core.Scheduler.Abstract;
using WebReaper.Domain;
using static System.IO.File;

namespace WebReaper.Core.Scheduler.Concrete;

@@ -15,14 +14,31 @@ public class FileScheduler : IScheduler

private readonly SemaphoreSlim _semaphore = new(1, 1);
private long _currentJobPosition;
public Task Initialization { get; }

public FileScheduler(string fileName, string currentJobPositionFileName, ILogger logger)
public bool DataCleanupOnStart { get; set; }

public FileScheduler(string fileName, string currentJobPositionFileName, ILogger logger, bool dataCleanupOnStart = false)
{
DataCleanupOnStart = dataCleanupOnStart;
_fileName = fileName;
_currentJobPositionFileName = currentJobPositionFileName;
_logger = logger;
if (Exists(_currentJobPositionFileName))
_currentJobPosition = int.Parse(ReadAllText(_currentJobPositionFileName));

Initialization = InitializeAsync();
}

private async Task InitializeAsync()
{
if (DataCleanupOnStart)
{
File.Delete(_fileName);
File.Delete(_currentJobPositionFileName);
return;
}

if (File.Exists(_currentJobPositionFileName))
_currentJobPosition = int.Parse(await File.ReadAllTextAsync(_currentJobPositionFileName));
}

public async IAsyncEnumerable<Job> GetAllAsync(
@@ -39,7 +55,7 @@ public FileScheduler(string fileName, string currentJobPositionFileName, ILogger
for (var i = 0; i < _currentJobPosition; i++)
{
_logger.LogInformation("Skipping {Count} line", i);
await sr.ReadLineAsync();
await sr.ReadLineAsync(cancellationToken);
}

while (true)
@@ -48,7 +64,7 @@ public FileScheduler(string fileName, string currentJobPositionFileName, ILogger
string? jobLine;
try
{
jobLine = await sr.ReadLineAsync();
jobLine = await sr.ReadLineAsync(cancellationToken);
}
finally
{
@@ -64,7 +80,7 @@ public FileScheduler(string fileName, string currentJobPositionFileName, ILogger

_logger.LogInformation("Writing current job position to file");

await WriteAllTextAsync(_currentJobPositionFileName, $"{_currentJobPosition++}", cancellationToken);
await File.WriteAllTextAsync(_currentJobPositionFileName, $"{_currentJobPosition++}", cancellationToken);

_logger.LogInformation("Deserializing the job and returning it to consumer");

@@ -80,7 +96,7 @@ public async Task AddAsync(Job job, CancellationToken cancellationToken = defaul
await _semaphore.WaitAsync(cancellationToken);
try
{
await AppendAllTextAsync(_fileName, SerializeToJson(job) + Environment.NewLine, cancellationToken);
await File.AppendAllTextAsync(_fileName, SerializeToJson(job) + Environment.NewLine, cancellationToken);
}
finally
{
@@ -97,7 +113,7 @@ public async Task AddAsync(IEnumerable<Job> jobs, CancellationToken cancellation
await _semaphore.WaitAsync(cancellationToken);
try
{
await AppendAllLinesAsync(_fileName, serializedJobs, cancellationToken);
await File.AppendAllLinesAsync(_fileName, serializedJobs, cancellationToken);
}
finally
{
2 changes: 2 additions & 0 deletions WebReaper/Core/Scheduler/Concrete/InMemoryScheduler.cs
@@ -13,6 +13,8 @@ public IAsyncEnumerable<Job> GetAllAsync(CancellationToken cancellationToken = d
return _jobChannel.Reader.ReadAllAsync(cancellationToken);
}

public bool DataCleanupOnStart { get; set; }

public async Task AddAsync(Job job, CancellationToken cancellationToken = default)
{
await _jobChannel.Writer.WriteAsync(job, cancellationToken);
