Skip to content

Commit

Permalink
search indexing, fix for cron definition for seq polling
Browse files Browse the repository at this point in the history
  • Loading branch information
markledwich2 committed Apr 12, 2020
1 parent 6e50c3b commit bf7c35e
Show file tree
Hide file tree
Showing 20 changed files with 436 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Expand Up @@ -6,5 +6,5 @@
"debug.internalConsoleOptions": "neverOpen",
"dotnet-test-explorer.testProjectPath": "/App/Tests",
"python.dataScience.jupyterServerURI": " http://localhost:8888/?token=e272f55131dfdbf050b519e59b2e77600b76178603e5e338",
"python.pythonPath": "/opt/python/latest/bin/python3.8"
"python.pythonPath": "C:\\Users\\mark\\AppData\\Local\\Programs\\Python\\Python38-32\\python.exe"
}
8 changes: 4 additions & 4 deletions App/Mutuo.Etl/Pipe/Pipes.cs
Expand Up @@ -38,23 +38,23 @@ public static class Pipes {
/// <summary>Executes pipe's on the items (logger, no result)</summary>
public static async Task RunPipe<TIn>(
this IEnumerable<TIn> items, Func<IReadOnlyCollection<TIn>, ILogger, Task> transform, IPipeCtx ctx, PipeRunCfg runCfg, ILogger log) =>
await RunPipeMethod<object>(items.Cast<object>(), transform.Method, ctx, runCfg, log);
await RunPipeMethod<object>(items.Cast<object>().ToArray(), transform.Method, ctx, runCfg, log);

/// <summary>Executes pipe's on the items (logger, result)</summary>
public static async Task<IReadOnlyCollection<(PipeRunMetadata Metadata, TOut OutState)>> RunPipe<TIn, TOut>(
this IEnumerable<TIn> items, Func<IReadOnlyCollection<TIn>, ILogger, Task<TOut>> transform, IPipeCtx ctx, PipeRunCfg runCfg, ILogger log)
where TOut : class =>
await RunPipeMethod<TOut>(items.Cast<object>(), transform.Method, ctx, runCfg, log);
await RunPipeMethod<TOut>(items.Cast<object>().ToArray(), transform.Method, ctx, runCfg, log);

/// <summary>Executes pipe's on the items (No logger, result)</summary>
public static async Task<IReadOnlyCollection<(PipeRunMetadata Metadata, TOut OutState)>> RunPipe<TIn, TOut>(
this IEnumerable<TIn> items, Func<IReadOnlyCollection<TIn>, Task<TOut>> transform, IPipeCtx ctx, PipeRunCfg runCfg, ILogger log) where TOut : class =>
await RunPipeMethod<TOut>(items.Cast<object>(), transform.Method, ctx, runCfg, log);
await RunPipeMethod<TOut>(items.Cast<object>().ToArray(), transform.Method, ctx, runCfg, log);

/// <summary>Runs a pipe to process a list of work in batches on multiple containers. The transform is used to provide
/// strong typing, but may not actually be run locally.</summary>
static async Task<IReadOnlyCollection<(PipeRunMetadata Metadata, TOut OutState)>> RunPipeMethod<TOut>(
this IEnumerable<object> items, MethodInfo method, IPipeCtx ctx, PipeRunCfg runCfg, ILogger log) {
this IReadOnlyCollection<object> items, MethodInfo method, IPipeCtx ctx, PipeRunCfg runCfg, ILogger log) {
var isPipe = method.GetCustomAttribute<PipeAttribute>() != null;
if (!isPipe) throw new InvalidOperationException($"given transform '{method.Name}' must be a pipe");
var pipeNme = method.Name;
Expand Down
15 changes: 10 additions & 5 deletions App/Mutuo.Tools/SchemaTool.cs
Expand Up @@ -8,6 +8,7 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Schema.Generation;
using Newtonsoft.Json.Serialization;
using SysExtensions.Collections;
using SysExtensions.IO;
using SysExtensions.Threading;

Expand All @@ -16,19 +17,20 @@ namespace Mutuo.Tools {
public static class SchemaTool {
/// <summary>Save a schema file for a type</summary>
[DisplayName("schema")]
public static async Task GenerateSchema(string types, DirectoryInfo dir) {
public static async Task GenerateSchema(string types, DirectoryInfo dir, bool ignoreRequired) {
// build proj
var shell = new Shell(o => o.WorkingDirectory(dir.FullName));

var run = shell.Run("dotnet", "build");
await run.StandardOutput.PipeToAsync(Console.Out);
var res = await run.Task;
if (!res.Success) throw new InvalidOperationException($"build failed: {res.StandardError}");
if (!res.Success) throw new InvalidOperationException($"build failed: {res.StandardError}");

var projPath = dir.FullName.AsPath();
var projFile = projPath.Files("*.csproj", false).FirstOrDefault() ?? throw new InvalidOperationException("Can't find project");
var latestAssembly = projPath.Files($"{projFile.FileNameWithoutExtension}.dll", true)
.OrderByDescending(f => f.FileInfo().LastWriteTime).FirstOrDefault() ?? throw new InvalidOperationException("Can't find built assembly");
.OrderByDescending(f => f.FileInfo().LastWriteTime).FirstOrDefault() ??
throw new InvalidOperationException("Can't find built assembly");

var a = Assembly.LoadFrom(latestAssembly.FullPath);
await types.Split("|").BlockAction(async type => {
Expand All @@ -42,6 +44,9 @@ public static class SchemaTool {
GenerationProviders = {new StringEnumGenerationProvider()}
};
var schema = g.Generate(t);
if (ignoreRequired)
foreach (var s in schema.AsEnumerable().WithDescendants(s => s.Properties.Values.Concat(s.Items)))
s.Required.Clear();
await File.WriteAllTextAsync($"{dir.FullName}/{t.Name}.schema.json", schema.ToString());
});
}
Expand Down
170 changes: 158 additions & 12 deletions App/SysExtensions/Collections/EnumerableExtensions.cs
@@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
Expand Down Expand Up @@ -33,27 +34,172 @@ public static IEnumerable<T> NotNull<T>(this IEnumerable<T> items)

/// <summary>Flattens a sequence of sequences into a single sequence, preserving order</summary>
public static IEnumerable<T> SelectMany<T>(this IEnumerable<IEnumerable<T>> items) {
  return items.SelectMany(inner => inner);
}

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size) {
using var enumerator = source.GetEnumerator();
while (enumerator.MoveNext())
yield return TakeIEnumerator(enumerator, size);
/// <summary>Lazily returns the given items followed by all of their descendants, in breadth-first order</summary>
/// <param name="items">The root items to start from</param>
/// <param name="children">Returns the direct children of an item. It is called only after that item has been yielded</param>
public static IEnumerable<T> WithDescendants<T>(this IEnumerable<T> items, Func<T, IEnumerable<T>> children) {
  // FIFO queue: everything already queued is yielded before any newly discovered child
  var frontier = new Queue<T>(items);
  while (frontier.Count != 0) {
    var node = frontier.Dequeue();
    yield return node;
    var kids = children(node);
    foreach (var kid in kids)
      frontier.Enqueue(kid);
  }
}

/// <summary>Copies the source into a queue, then yields arrays built by dequeuing up to <paramref name="size"/> items at
///   a time until the queue is empty</summary>
public static IEnumerable<T[]> BatchGreedy<T>(this IEnumerable<T> source, int size) {
  var remaining = source.ToQueue();
  while (remaining.Count != 0) {
    var batch = remaining.Dequeue(size).ToArray();
    yield return batch;
  }
}

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size, int maxBatches) {
var items = source as IReadOnlyCollection<T> ?? source.ToArray();
return items.Batch(Math.Max(items.Count / maxBatches, size));
/// <summary>Splits <paramref name="items"/> into consecutive batches of <paramref name="batchSize"/> items, enlarging the
///   batch size when needed so that no more than <paramref name="maxBatches"/> batches are produced</summary>
/// <param name="items">The items to split. Its count is used to work out the batch size</param>
/// <param name="batchSize">The minimum size of each batch (the final batch may hold fewer)</param>
/// <param name="maxBatches">The maximum number of batches to produce. Must be greater than zero</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxBatches"/> is zero or negative</exception>
public static IEnumerable<IReadOnlyCollection<T>> Batch<T>(this IReadOnlyCollection<T> items, int batchSize, int maxBatches) {
  if (maxBatches <= 0)
    throw new ArgumentOutOfRangeException(nameof(maxBatches), maxBatches, "maxBatches must be greater than zero");

  // Round up. Flooring (e.g. 10 items / 3 batches = 3) leaves a remainder that spills into an extra batch,
  // exceeding maxBatches. Written as div + mod to avoid overflow when maxBatches is very large.
  var sizeForMaxBatches = items.Count / maxBatches + (items.Count % maxBatches == 0 ? 0 : 1);
  return items.Batch(Math.Max(sizeForMaxBatches, batchSize));
}

/// <summary>Lazily splits <paramref name="items"/> into consecutive batches of <paramref name="batchSize"/> items. The
///   final batch holds whatever remains and may be smaller. An empty source yields no batches</summary>
public static IEnumerable<IReadOnlyCollection<T>> Batch<T>(this IEnumerable<T> items, int batchSize) {
  var b = new List<T>(batchSize);
  foreach (var item in items) {
    b.Add(item);
    if (b.Count != batchSize) continue;
    yield return b;
    // start a fresh list rather than clearing, callers may still hold the batch just returned
    b = new List<T>(batchSize);
  }
  if (b.Count > 0)
    yield return b;
}

/// <summary>Chunks the source by key using the default equality comparer for <typeparamref name="TKey"/></summary>
public static IEnumerable<IGrouping<TKey, TSource>> ChunkBy<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> keySelector) {
  var defaultComparer = EqualityComparer<TKey>.Default;
  return source.ChunkBy(keySelector, defaultComparer);
}

/// <summary>Groups the elements of <paramref name="source"/> into chunks of contiguous elements whose keys are equal
/// according to <paramref name="comparer"/>. Chunks are yielded lazily, in source order, and each chunk reads its
/// remaining elements from the shared source enumerator as it is enumerated.</summary>
/// <param name="source">The sequence to chunk. An empty sequence yields no chunks</param>
/// <param name="keySelector">Computes the key of an element</param>
/// <param name="comparer">Decides whether an element's key matches the key of the current chunk</param>
public static IEnumerable<IGrouping<TKey, TSource>> ChunkBy<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> keySelector,
IEqualityComparer<TKey> comparer) {
// Flag to signal end of source sequence.
const bool noMoreSourceElements = true;

// Iterator for the source sequence. It is shared with every Chunk created below.
// NOTE(review): this enumerator is never disposed in the code visible here - confirm whether that is intentional.
var enumerator = source.GetEnumerator();

// Move to the first element in the source sequence.
if (!enumerator.MoveNext()) yield break;

// Iterate through source sequence and create a copy of each Chunk.
// On each pass, the iterator advances to the first element of the next "Chunk"
// in the source sequence. This loop corresponds to the outer foreach loop that
// executes the query.
Chunk<TKey, TSource> current = null;
while (true) {
// Get the key for the current Chunk. The source iterator will churn through
// the source sequence until it finds an element with a key that doesn't match.
var key = keySelector(enumerator.Current);

// Make a new Chunk (group) object that initially has one GroupItem, which is a copy of the current source element.
current = new Chunk<TKey, TSource>(key, enumerator, value => comparer.Equals(key, keySelector(value)));

// Return the Chunk. A Chunk is an IGrouping<TKey,TSource>, which is the return value of the ChunkBy method.
// At this point the Chunk only has the first element in its source sequence. The remaining elements will be
// returned only when the client code foreach's over this chunk. See Chunk.GetEnumerator for more info.
yield return current;

// Check to see whether (a) the chunk has made a copy of all its source elements or
// (b) the iterator has reached the end of the source sequence. If the caller uses an inner
// foreach loop to iterate the chunk items, and that loop ran to completion,
// then the Chunk.GetEnumerator method will already have made
// copies of all chunk items before we get here. If the Chunk.GetEnumerator loop did not
// enumerate all elements in the chunk, we need to do it here to avoid corrupting the iterator
// for clients that may be calling us on a separate thread.
if (current.CopyAllChunkElements() == noMoreSourceElements) yield break;
}
}

static IEnumerable<T> TakeIEnumerator<T>(IEnumerator<T> source, int size) {
var i = 0;
do
yield return source.Current;
while (++i < size && source.MoveNext());
// from https://docs.microsoft.com/en-us/dotnet/csharp/linq/group-results-by-contiguous-keys
// A Chunk is a contiguous group of one or more source elements that have the same key. A Chunk
// has a key and a list of ChunkItem objects, which are copies of the elements in the source sequence.
// Elements are copied from the shared source enumerator on demand and kept in a singly linked list,
// so a chunk can be enumerated more than once.
class Chunk<TKey, TSource> : IGrouping<TKey, TSource> {
// A node in the singly linked list of elements copied into this chunk.
class ChunkItem {
public ChunkItem(TSource value) => Value = value;
public readonly TSource Value;
public ChunkItem Next;
}

// Source enumerator shared with the ChunkBy iterator. Set to null once this chunk is fully copied.
IEnumerator<TSource> enumerator;
// Returns true when a source element belongs to this chunk. Set to null once this chunk is fully copied.
Func<TSource, bool> predicate;
// First element of the chunk. Always present.
readonly ChunkItem head;
// Last element copied so far, or null once the whole chunk has been copied.
ChunkItem tail;
// True once the source enumerator has reported that there are no more elements.
internal bool isLastSourceElement;
// Guards the linked list and the enumerator while copying.
readonly object m_Lock;

// Creates a chunk for the given key whose first element is the enumerator's current element.
public Chunk(TKey key, IEnumerator<TSource> enumerator, Func<TSource, bool> predicate) {
Key = key;
this.enumerator = enumerator;
this.predicate = predicate;
head = new ChunkItem(enumerator.Current);
tail = head;
m_Lock = new object();
}

// Indicates that all chunk elements have been copied to the list of ChunkItems,
// and the source enumerator is either at the end, or else on an element with a new key.
// the tail of the linked list is set to null in the CopyNextChunkElement method if the
// key of the next element does not match the current chunk's key, or there are no more elements in the source.
bool DoneCopyingChunk => tail == null;

// Adds one ChunkItem to the current group
// REQUIRES: !DoneCopyingChunk, and the caller holds m_Lock
void CopyNextChunkElement() {
// Try to advance the iterator on the source sequence.
// If MoveNext returns false we are at the end, and isLastSourceElement is set to true
isLastSourceElement = !enumerator.MoveNext();

// If we are (a) at the end of the source, or (b) at the end of the current chunk
// then null out the enumerator and predicate for reuse with the next chunk.
if (isLastSourceElement || !predicate(enumerator.Current)) {
enumerator = null;
predicate = null;
}
else {
tail.Next = new ChunkItem(enumerator.Current);
}

// tail will be null if we are at the end of the chunk elements
// This check is made in DoneCopyingChunk.
tail = tail.Next;
}

// Called by the ChunkBy iterator when the caller asks for the next chunk.
// Copies any elements of this chunk that have not yet been copied from the source, then
// returns true if the source sequence was exhausted (there are no further chunks), false otherwise.
internal bool CopyAllChunkElements() {
while (true)
lock (m_Lock)
if (DoneCopyingChunk) // If isLastSourceElement is false,
// it signals to the outer iterator
// to continue iterating.
return isLastSourceElement;
else
CopyNextChunkElement();
}

// The key shared by every element in this chunk.
public TKey Key { get; }

// Invoked by the inner foreach loop. This method stays just one step ahead
// of the client requests. It adds the next element of the chunk only after
// the clients requests the last element in the list so far.
public IEnumerator<TSource> GetEnumerator() {
//Specify the initial element to enumerate.
var current = head;

// There should always be at least one ChunkItem in a Chunk.
while (current != null) {
// Yield the current item in the list.
yield return current.Value;

// Copy the next item from the source sequence,
// if we are at the end of our local list.
lock (m_Lock)
if (current == tail)
CopyNextChunkElement();

// Move to the next ChunkItem in the list.
current = current.Next;
}
}

// Non-generic enumeration forwards to the generic enumerator.
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}
}
7 changes: 7 additions & 0 deletions App/SysExtensions/Net/UriExtensions.cs
@@ -0,0 +1,7 @@
using System;

namespace SysExtensions.Net {
  /// <summary>Extension helpers for <see cref="Uri"/></summary>
  public static class UriExtensions {
    /// <summary>Creates a <see cref="UriBuilder"/> initialised from the given uri</summary>
    public static UriBuilder Build(this Uri uri) {
      var builder = new UriBuilder(uri);
      return builder;
    }
  }
}
4 changes: 2 additions & 2 deletions App/SysExtensions/Serialization/JsonExtensions.cs
Expand Up @@ -71,12 +71,12 @@ public static class JsonExtensions {
/// <summary>Converts the object with ToJToken and casts the result to a JObject</summary>
public static JObject ToJObject(this object o, JsonSerializerSettings settings = null, JsonLoadSettings loadSettings = null) {
  var token = ToJToken(o, settings, loadSettings);
  return (JObject) token;
}

public static string ToJson<T>(this T o, JsonSerializerSettings settings = null) where T : new() {
public static string ToJson<T>(this T o, JsonSerializerSettings settings = null) {
settings ??= DefaultSettings();
return JsonConvert.SerializeObject(o, settings);
}

public static Stream ToJsonStream<T>(this T o, JsonSerializerSettings settings = null) where T : new() {
public static Stream ToJsonStream<T>(this T o, JsonSerializerSettings settings = null) {
settings ??= DefaultSettings();
var ms = new MemoryStream();
using (var sw = new StreamWriter(ms, leaveOpen: true))
Expand Down
13 changes: 13 additions & 0 deletions App/SysExtensions/Text/HumanizeExtensions.cs
@@ -1,4 +1,7 @@
using System;
using System.Linq;
using Humanizer.Localisation;
using static Humanizer.Localisation.TimeUnit;

namespace SysExtensions.Text {
public static class HumanizeExtensions {
Expand All @@ -23,6 +26,16 @@ public static class HumanizeExtensions {
throw new ArgumentOutOfRangeException();
}
}

/// <summary>Formats a timespan as a short space-separated list of components, e.g. "1d 2h" or "2m 0s"</summary>
/// <param name="t">The timespan to format</param>
/// <param name="precision">The maximum number of components taken, starting from the first one kept</param>
/// <param name="minUnit">The smallest unit to show. Leading zero components larger than this are dropped</param>
public static string HumanizeShort(this TimeSpan t, int precision = 2, TimeUnit minUnit = Second) {
  // components ordered from the largest unit to the smallest
  var parts = new (int v, string s, TimeUnit u)[] {
    (t.Days, "d", Day),
    (t.Hours, "h", Hour),
    (t.Minutes, "m", Minute),
    (t.Seconds, "s", Second),
    (t.Milliseconds, "ms", Millisecond)
  };
  // drop leading zero components, but stop at minUnit so a zero timespan still shows one component
  var withoutLeadingZeros = parts.SkipWhile(p => p.v == 0 && p.u > minUnit);
  // precision is applied before filtering out units smaller than minUnit
  var shown = withoutLeadingZeros.Take(precision).Where(p => p.u >= minUnit);
  return shown.Join(" ", p => $"{p.v}{p.s}");
}
}

public class Speed {
Expand Down
6 changes: 1 addition & 5 deletions App/SysExtensions/Threading/BlockExtensions.cs
Expand Up @@ -60,11 +60,7 @@ public static class BlockExtensions {
}

static async Task ProduceAsync<T>(this IEnumerable<T> source, ITargetBlock<T> block) {
foreach (var item in source) {
var res = await block.SendAsync(item);
if (!res)
throw new InvalidOperationException("Unable to send item to target block");
}
foreach (var item in source) await block.SendAsync(item);
block.Complete();
}
}
Expand Down
7 changes: 4 additions & 3 deletions App/Tests/Tests.csproj
Expand Up @@ -3,12 +3,13 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable>
<LangVersion>8</LangVersion>
<OutputType>Library</OutputType>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="MSTest.TestAdapter" Version="2.1.0" />
<PackageReference Include="MSTest.TestFramework" Version="2.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.0-preview-20200318-01" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1" />
<PackageReference Include="Serilog.Sinks.Trace" Version="2.1.0" />
<PackageReference Include="Serilog.Sinks.Debug" Version="1.0.1" />
</ItemGroup>
Expand Down
19 changes: 17 additions & 2 deletions App/Tests/YTReaderTests.cs
@@ -1,5 +1,20 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using FluentAssertions;
using Humanizer;
using NUnit.Framework;
using SysExtensions.Text;

namespace Tests {
[TestClass] public class YtReaderTests { }
/// <summary>Tests for the text formatting extensions</summary>
public static class FormattingTests {
  /// <summary>Checks HumanizeShort output for zero, sub-minute, sub-second and multi-day timespans</summary>
  [Test]
  public static void TestTimestampHumanise() {
    // asserts the output when using the default precision and minimum unit
    void DefaultShouldBe(TimeSpan span, string expected) => span.HumanizeShort().Should().Be(expected);

    DefaultShouldBe(120.Seconds(), "2m 0s");
    DefaultShouldBe(0.Seconds(), "0s");
    DefaultShouldBe(12.Seconds(), "12s");
    DefaultShouldBe(TimeSpan.FromMilliseconds(2040), "2s");

    var dayAndABit = new TimeSpan(1, 2, 3, 4);
    DefaultShouldBe(dayAndABit, "1d 2h");
    dayAndABit.HumanizeShort(4).Should().Be("1d 2h 3m 4s");
  }
}
}

0 comments on commit bf7c35e

Please sign in to comment.