Skip to content

Commit

Permalink
update to latest MBrace.Core
Browse files Browse the repository at this point in the history
  • Loading branch information
eiriktsarpalis committed Jul 18, 2016
1 parent 96e2243 commit 0402b76
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 63 deletions.
2 changes: 1 addition & 1 deletion MBrace.AWS.sln
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.25123.0
VisualStudioVersion = 14.0.25420.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".paket", ".paket", "{63297B98-5CED-492C-A5B7-A5B4F73CF142}"
ProjectSection(SolutionItems) = preProject
Expand Down
5 changes: 3 additions & 2 deletions paket.dependencies
Expand Up @@ -4,8 +4,8 @@ source https://nuget.org/api/v2

nuget NUnit ~> 2.0
nuget NUnit.Runners ~> 2.0
nuget MBrace.Core ~> 1.2.6
nuget MBrace.Runtime ~> 1.2.6
nuget MBrace.Core ~> 1.3.0
nuget MBrace.Runtime ~> 1.3.0
nuget MBrace.Tests
nuget FSharp.Compiler.Service
nuget MathNet.Numerics ~> 3.7.0
Expand All @@ -18,6 +18,7 @@ nuget AWSSDK.S3 ~> 3.0
nuget AWSSDK.DynamoDBv2 ~> 3.0
nuget AWSSDK.SQS ~> 3.0
nuget Argu ~> 3.0
nuget Unquote
nuget FSharp.AWS.DynamoDB prerelease

nuget Owin version_in_path: true
Expand Down
47 changes: 28 additions & 19 deletions paket.lock
Expand Up @@ -2,20 +2,23 @@ REDIRECTS: ON
FRAMEWORK: >= NET45
NUGET
remote: https://www.nuget.org/api/v2
Argu (3.0)
AWSSDK.Core (3.1.8.1)
AWSSDK.DynamoDBv2 (3.1.5.1)
AWSSDK.Core (>= 3.1.8 < 3.2)
AWSSDK.S3 (3.1.8)
AWSSDK.Core (>= 3.1.8 < 3.2)
AWSSDK.SQS (3.1.0.11)
AWSSDK.Core (>= 3.1.8 < 3.2)
specs:
Argu (3.2)
AWSSDK.Core (3.1.9)
AWSSDK.DynamoDBv2 (3.1.5.2)
AWSSDK.Core (>= 3.1.9 < 3.2)
AWSSDK.S3 (3.1.8.1)
AWSSDK.Core (>= 3.1.9 < 3.2)
AWSSDK.SQS (3.1.0.12)
AWSSDK.Core (>= 3.1.9 < 3.2)
FsCheck (2.5)
FSharp.Core (>= 3.1.2.5)
FSharp.AWS.DynamoDB (0.4.1-beta)
AWSSDK.DynamoDBv2 (>= 3.1.4)
Unquote (>= 3.1.1)
FSharp.Compiler.Service (5.0)
FSharp.Compiler.Service (6.0)
System.Collections.Immutable (>= 1.2)
System.Reflection.Metadata (>= 1.4.1-beta-24227-04)
FSharp.Core (4.0.0.1)
FsPickler (2.1)
FsPickler.Json (2.1)
Expand All @@ -27,21 +30,22 @@ NUGET
MathNet.Numerics (3.7.1)
MathNet.Numerics.MKL.Win-x64 (1.8)
MathNet.Numerics (>= 2.4)
MBrace.Core (1.2.6)
MBrace.Flow (1.2.6)
MBrace.Core (1.3)
MBrace.Flow (1.3)
FSharp.Core (>= 3.0)
MBrace.Core (1.2.6)
MBrace.Core (1.3)
Streams (>= 0.4 < 0.5)
MBrace.Runtime (1.2.6)
MBrace.Runtime (1.3)
FsPickler (>= 2.1 < 2.2)
FsPickler.Json (>= 2.1 < 2.2)
MBrace.Core (1.2.6)
MBrace.Core (1.3)
Vagabond (>= 0.13 < 0.14)
MBrace.Tests (1.2.6)
MBrace.Tests (1.3)
FsCheck (>= 2.0.1)
MBrace.Core (1.2.6)
MBrace.Flow (1.2.6)
MBrace.Core (1.3)
MBrace.Flow (1.3)
NUnit (>= 2.6 < 3.0)
Unquote (>= 3.0 < 4.0)
Microsoft.Owin (3.0.1) - version_in_path: true
Owin (>= 1.0)
Microsoft.Owin.Host.SystemWeb (3.0.1)
Expand All @@ -53,6 +57,9 @@ NUGET
NUnit.Runners (2.6.4)
Owin (1.0) - version_in_path: true
Streams (0.4.1)
System.Collections.Immutable (1.2)
System.Reflection.Metadata (1.4.1-beta-24227-04)
System.Collections.Immutable (>= 1.2)
Unquote (3.1.1)
Vagabond (0.13)
FsPickler (>= 2.1 < 3.0)
Expand All @@ -61,7 +68,8 @@ NUGET
GROUP Build
NUGET
remote: https://www.nuget.org/api/v2
FAKE (4.29.2)
specs:
FAKE (4.31.1)
Microsoft.Bcl (1.1.10) - framework: net10, net11, net20, net30, net35, net40, net40-full
Microsoft.Bcl.Build (>= 1.0.14)
Microsoft.Bcl.Build (1.0.21) - import_targets: false, framework: net10, net11, net20, net30, net35, net40, net40-full
Expand All @@ -74,5 +82,6 @@ NUGET
SourceLink.Fake (1.1)
GITHUB
remote: fsharp/FAKE
modules/Octokit/Octokit.fsx (c56456abac6b744c3bb95b217687db19fd19b367)
specs:
modules/Octokit/Octokit.fsx (40dd0ff66fc379474450f93fd1ba545f6cf6c121)
Octokit (>= 0.20)
1 change: 0 additions & 1 deletion src/MBrace.AWS/Runtime/Entities/WorkerId.fs
Expand Up @@ -3,7 +3,6 @@
open System

open MBrace.Runtime
open MBrace.Runtime.Utils.PerformanceMonitor
open MBrace.AWS.Runtime.Utilities

open FSharp.AWS.DynamoDB
Expand Down
4 changes: 2 additions & 2 deletions src/MBrace.AWS/Runtime/WorkerManager.fs
Expand Up @@ -125,7 +125,7 @@ type WorkerManager private (clusterId : ClusterId, logger : ISystemLogger) =

member this.GetAvailableWorkers() = this.GetAvailableWorkers()

member __.SubmitPerformanceMetrics(workerId : IWorkerId, perf : Utils.PerformanceMonitor.PerformanceInfo) = async {
member __.SubmitPerformanceMetrics(workerId : IWorkerId, perf : PerformanceInfo) = async {
let! _ = getTable().UpdateItemAsync(getKey workerId, updatePerfMetrics perf)
return ()
}
Expand All @@ -151,7 +151,7 @@ type WorkerManager private (clusterId : ClusterId, logger : ISystemLogger) =
ProcessorCount = info.ProcessorCount
HeartbeatInterval = info.HeartbeatInterval
HeartbeatThreshold = info.HeartbeatThreshold
PerformanceInfo = Utils.PerformanceMonitor.PerformanceInfo.Empty
PerformanceInfo = PerformanceInfo.Empty
}

let! key = getTable().PutItemAsync(record)
Expand Down
3 changes: 1 addition & 2 deletions tests/MBrace.AWS.Tests/CloudTests.fs
Expand Up @@ -23,14 +23,13 @@ type ``AWS Cloud Tests`` (config : Configuration, localWorkers : int) =

override __.Run (workflow : Cloud<'T>) =
session.Cluster.RunAsync (workflow)
|> Async.Catch
|> Async.RunSync

override __.Run (workflow : ICloudCancellationTokenSource -> #Cloud<'T>) =
async {
let cluster = session.Cluster
let cts = cluster.CreateCancellationTokenSource()
try return! cluster.RunAsync(workflow cts, cancellationToken = cts.Token) |> Async.Catch
try return! cluster.RunAsync(workflow cts, cancellationToken = cts.Token)
finally cts.Cancel()
} |> Async.RunSync

Expand Down
31 changes: 31 additions & 0 deletions tests/MBrace.AWS.Tests/MBrace.AWS.Tests.fsproj
Expand Up @@ -301,6 +301,37 @@
</ItemGroup>
</When>
</Choose>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2')">
<ItemGroup>
<Reference Include="System.Collections.Immutable">
<HintPath>..\..\packages\System.Collections.Immutable\lib\netstandard1.0\System.Collections.Immutable.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
</Choose>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And $(TargetFrameworkVersion) == 'v4.5'">
<ItemGroup>
<Reference Include="System.Reflection.Metadata">
<HintPath>..\..\packages\System.Reflection.Metadata\lib\portable-net45+win8\System.Reflection.Metadata.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2')">
<ItemGroup>
<Reference Include="System.Reflection.Metadata">
<HintPath>..\..\packages\System.Reflection.Metadata\lib\netstandard1.1\System.Reflection.Metadata.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
</Choose>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2')">
<ItemGroup>
Expand Down
47 changes: 26 additions & 21 deletions tests/MBrace.AWS.Tests/RuntimeTests.fs
Expand Up @@ -3,6 +3,7 @@
open System
open System.Threading

open Swensen.Unquote.Assertions
open NUnit.Framework

open MBrace.Core
Expand Down Expand Up @@ -37,19 +38,19 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =

[<Test>]
member __.``1. Runtime : Get worker count`` () =
run (Cloud.GetWorkerCount()) |> shouldEqual (session.Cluster.Workers |> Seq.length)
test <@ run (Cloud.GetWorkerCount()) = session.Cluster.Workers.Length @>

[<Test>]
member __.``1. Runtime : Get current worker`` () =
run Cloud.CurrentWorker |> shouldBe (fun _ -> true)
run Cloud.CurrentWorker |> ignore

[<Test>]
member __.``1. Runtime : Get task id`` () =
run (Cloud.GetCloudProcessId()) |> shouldBe (fun _ -> true)
run (Cloud.GetCloudProcessId()) |> ignore

[<Test>]
member __.``1. Runtime : Get work item id`` () =
run (Cloud.GetWorkItemId()) |> shouldBe (fun _ -> true)
run (Cloud.GetWorkItemId()) |> ignore

[<Test>]
member __.``1. Runtime : Worker Log Observable`` () =
Expand All @@ -59,14 +60,14 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
use d = worker.SystemLogs.Subscribe ra.Add
cluster.Run(cloud { return () }, target = worker)
System.Threading.Thread.Sleep 2000
ra.Count |> shouldBe (fun i -> i > 0)
test <@ ra.Count > 0 @>

[<Test>]
member __.``1. Runtime : Additional resources`` () =
let cluster = session.Cluster
let res = (42, "forty-two")
cluster.Run(Cloud.GetResource<int * string>(), additionalResources = resource { yield res })
|> shouldEqual res
let value = (42, "forty-two")
let result = cluster.Run(Cloud.GetResource<int * string>(), additionalResources = resource { yield value })
test <@ result = value @>

[<Test>]
member __.``1. Runtime : Cluster Log Observable`` () =
Expand All @@ -75,7 +76,7 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
use d = cluster.SystemLogs.Subscribe ra.Add
cluster.Run(Cloud.ParallelEverywhere(cloud { return 42 }) |> Cloud.Ignore)
System.Threading.Thread.Sleep 2000
ra.Count |> shouldBe (fun i -> i >= cluster.Workers.Length)
test <@ ra.Count >= cluster.Workers.Length @>

[<Test>]
member __.``1. Runtime : CloudProcess Log Observable`` () =
Expand All @@ -94,7 +95,8 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
let job = session.Cluster.CreateProcess(workflow)
use d = job.Logs.Subscribe(fun e -> ra.Add(e))
do job.Result
ra |> Seq.filter (fun e -> e.Message.Contains "Work item") |> Seq.length |> shouldEqual 2000
let logCount = ra |> Seq.filter (fun e -> e.Message.Contains "Work item") |> Seq.length
test <@ logCount = 2000 @>

[<Test>]
member __.``2. Fault Tolerance : Custom fault policy`` () =
Expand All @@ -107,7 +109,7 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
}, faultPolicy = FaultPolicy.NoRetry)
while not f.Value do Thread.Sleep 1000
session.Chaos()
Choice.protect (fun () -> t.Result) |> Choice.shouldFailwith<_, FaultException>)
raises<FaultException> <@ t.Result @>)

[<Test>]
member __.``2. Fault Tolerance : Custom fault policy nested`` () =
Expand All @@ -126,7 +128,7 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
let t = cluster.CreateProcess(computation (), faultPolicy = FaultPolicy.InfiniteRetries())
while not f.Value do Thread.Sleep 1000
session.Chaos()
Choice.protect (fun () -> t.Result) |> Choice.shouldFailwith<_, FaultException>)
raises<FaultException> <@ t.Result @>)

[<Test>]
member __.``2. Fault Tolerance : targeted workers`` () =
Expand All @@ -145,11 +147,11 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
let t = cluster.Run (wf ())
while not f.Value do Thread.Sleep 1000
session.Chaos()
Choice.protect(fun () -> t.Result) |> Choice.shouldFailwith<_, FaultException>)
raises<FaultException> <@ t.Result @>)

[<Test>]
member __.``2. Fault Tolerance : fault data`` () =
session.Cluster.Run(Cloud.TryGetFaultData()) |> shouldBe Option.isNone
test <@ session.Cluster.Run(Cloud.TryGetFaultData()) |> Option.isNone @>

repeat repeats (fun () ->
let cluster = session.Cluster
Expand All @@ -164,7 +166,7 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =

while not f.Value do Thread.Sleep 1000
session.Chaos()
t.Result |> shouldBe (function Some { NumberOfFaults = 1 } -> true | _ -> false))
raises<FaultException> <@ match t.Result with Some { NumberOfFaults = 1 } -> true | _ -> false @>)

[<Test>]
member __.``2. Fault Tolerance : protected parallel workflows`` () =
Expand All @@ -185,9 +187,12 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =

while f.Value < localWorkers do Thread.Sleep 1000
session.Chaos()
cloudProcess.Result
|> Array.forall (function FaultException _ -> true | _ -> false)
|> shouldEqual true)

test
<@
cloudProcess.Result
|> Array.forall (function FaultException _ -> true | _ -> false)
@>)

[<Test>]
member __.``2. Fault Tolerance : map/reduce`` () =
Expand All @@ -201,7 +206,7 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
while not f.Value do Thread.Sleep 1000
do Thread.Sleep 1000
session.Chaos()
t.Result |> shouldEqual 100)
test <@ t.Result = 100 @>)


[<Test>]
Expand All @@ -221,8 +226,8 @@ type ``AWS Runtime Tests`` (config : Configuration, localWorkers : int) =
let t = runtime.CreateProcess (wf (), faultPolicy = FaultPolicy.NoRetry)
while not f.Value do Thread.Sleep 1000
session.Chaos()
Choice.protect(fun () -> t.Result) |> Choice.shouldFailwith<_, FaultException>
t.Status |> shouldEqual CloudProcessStatus.Faulted)
raises<FaultException> <@ t.Result @>
test <@ t.Status = CloudProcessStatus.Faulted @>)


[<Category("Standalone Cluster")>]
Expand Down
32 changes: 17 additions & 15 deletions tests/MBrace.AWS.Tests/StoreTests/FileStoreTests.fs
@@ -1,6 +1,8 @@
namespace MBrace.AWS.Tests.Store

open System.IO

open Swensen.Unquote.Assertions
open NUnit.Framework

open Amazon
Expand Down Expand Up @@ -51,7 +53,7 @@ type ``Local S3 FileStore Tests`` () =
let stream = store.BeginWrite file |> run
for i in 1 .. 17 do stream.Write(largeBuf, 0, 1024 * 1024)
stream.Close()
store.GetFileSize file |> run |> shouldEqual (17L * 1024L * 1024L)
test <@ store.GetFileSize file |> run = 17L * 1024L * 1024L @>
finally
store.DeleteFile file |> run

Expand All @@ -65,20 +67,20 @@ type ``Local S3 FileStore Tests`` () =

try
use stream = store.BeginRead file |> run
stream.Length |> shouldEqual 100L
stream.Position |> shouldEqual 0L
stream.ReadByte() |> shouldEqual 0
stream.Position |> shouldEqual 1L

stream.Seek(50L, SeekOrigin.Begin) |> shouldEqual 50L
stream.Position |> shouldEqual 50L
stream.ReadByte() |> shouldEqual 50
stream.Position |> shouldEqual 51L

stream.Seek(9L, SeekOrigin.Current) |> shouldEqual 60L
stream.Position |> shouldEqual 60L
stream.ReadByte() |> shouldEqual 60
stream.Position |> shouldEqual 61L
test <@ stream.Length = 100L @>
test <@ stream.Position = 0L @>
test <@ stream.ReadByte() = 0 @>
test <@ stream.Position = 1L @>

test <@ stream.Seek(50L, SeekOrigin.Begin) = 50L @>
test <@ stream.Position = 50L @>
test <@ stream.ReadByte() = 50 @>
test <@ stream.Position = 51L @>

test <@ stream.Seek(9L, SeekOrigin.Current) = 60L @>
test <@ stream.Position = 60L @>
test <@ stream.ReadByte() = 60 @>
test <@ stream.Position = 61L @>

finally
store.DeleteFile file |> run
Expand Down

0 comments on commit 0402b76

Please sign in to comment.