-
Notifications
You must be signed in to change notification settings - Fork 820
/
TasksStorageActions.cs
121 lines (107 loc) · 4.19 KB
/
TasksStorageActions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//-----------------------------------------------------------------------
// <copyright file="TasksStorageActions.cs" company="Hibernating Rhinos LTD">
// Copyright (c) Hibernating Rhinos LTD. All rights reserved.
// </copyright>
//-----------------------------------------------------------------------
using System;
using log4net;
using Raven.Database.Impl;
using Raven.Database.Storage;
using Raven.Database.Tasks;
using Raven.Json.Linq;
using Raven.Storage.Managed.Impl;
using System.Linq;
using Raven.Database.Extensions;
namespace Raven.Storage.Managed
{
public class TasksStorageActions : ITasksStorageActions
{
private readonly TableStorage storage;
private readonly IUuidGenerator generator;
private readonly ILog logger = LogManager.GetLogger(typeof(TasksStorageActions));
public TasksStorageActions(TableStorage storage, IUuidGenerator generator)
{
this.storage = storage;
this.generator = generator;
}
public void AddTask(Task task, DateTime addedAt)
{
storage.Tasks.Put(new RavenJObject
{
{"index", task.Index},
{"id", generator.CreateSequentialUuid().ToByteArray()},
{"time", addedAt},
{"type", task.Type},
{"mergable", task.SupportsMerging}
}, task.AsBytes());
}
public bool HasTasks
{
get { return ApproximateTaskCount > 0; }
}
public long ApproximateTaskCount
{
get { return storage.Tasks.Count; }
}
public Task GetMergedTask(out int countOfMergedTasks)
{
foreach (var readResult in storage.Tasks)
{
Task task;
try
{
task = Task.ToTask(readResult.Key.Value<string>("type"), readResult.Data());
}
catch (Exception e)
{
logger.ErrorFormat(e, "Could not create instance of a task: {0}", readResult.Key);
continue;
}
MergeSimilarTasks(task, readResult.Key.Value<byte[]>("id"), out countOfMergedTasks);
storage.Tasks.Remove(readResult.Key);
return task;
}
countOfMergedTasks = 0;
return null;
}
private void MergeSimilarTasks(Task task, byte [] taskId, out int taskCount)
{
taskCount = 1;
if (task.SupportsMerging == false)
return;
var keyForTaskToTryMergings = storage.Tasks["ByIndexAndType"].SkipTo(new RavenJObject
{
{"index", task.Index},
{"type", task.Type},
})
.Where(x => new Guid(x.Value<byte[]>("id")) != new Guid(taskId))
.TakeWhile(x =>
StringComparer.InvariantCultureIgnoreCase.Equals(x.Value<string>("index"), task.Index) &&
StringComparer.InvariantCultureIgnoreCase.Equals(x.Value<string>("type"), task.Type)
);
foreach (var keyForTaskToTryMerging in keyForTaskToTryMergings)
{
var readResult = storage.Tasks.Read(keyForTaskToTryMerging);
if(readResult == null)
continue;
Task existingTask;
try
{
existingTask = Task.ToTask(readResult.Key.Value<string>("type"), readResult.Data());
}
catch (Exception e)
{
logger.ErrorFormat(e, "Could not create instance of a task: {0}", readResult.Key);
storage.Tasks.Remove(keyForTaskToTryMerging);
continue;
}
if (task.TryMerge(existingTask) == false)
continue;
storage.Tasks.Remove(keyForTaskToTryMerging);
taskCount += 1;
if (task.SupportsMerging == false)
return;
}
}
}
}