Hi, I'm looking for good examples of how to implement the following task:

    private static K<M, Unit> StreamBalances(
        Channel<ValidatorBalance> channel,
        Network network,
        string stateId,
        IEnumerable<string>? pubKeys = null) =>
        from options in Json.Map(t => t.Options)
        from _1 in Log<RT>.debug("Requesting {network} validator balances: {state}", network.Name, stateId)
        from uri in GetValidatorBalancesUri(network, stateId, pubKeys)
        from stream in Http<M, RT>.GetStream(uri)
        from res in stream
            .StreamFirstPropertyArray<ValidatorBalance>("data", options)
            .StreamToChannel(channel)
        select res;

    public static SourceT<M, ValidatorBalance> GetBalances(Network network, IEnumerable<ulong> slots) =>
        from baseUri in GetBaseUri(network)
        from channel in M.Pure(Channel.CreateUnbounded<ValidatorBalance>(new() { SingleWriter = false }))
        from _ in
            slots
                .Select(s => StreamBalances(channel, network, s.ToString()))
                .ProcessParallel<M, ValidatorBalance, RT>(channel, 5)
                .Fork()
        from source in SourceT.lift<M, ValidatorBalance>(channel)
        select source;

    public static IO<Unit> ProcessParallel<M, T, RT>(
        this IEnumerable<K<M, Unit>> computations,
        Channel<T> channel,
        int parallelism = 1) where M : MonadUnliftIO<M> =>
        IO.liftAsync(async env =>
        {
            await Parallel.ForEachAsync(
                computations,
                new ParallelOptions { MaxDegreeOfParallelism = parallelism, CancellationToken = env.Token },
                async (comp, ct) =>
                {
                    // TODO: Unsafe cast, remove
                    var c = comp as Eff<RT, Unit>;
                    // TODO: How to execute awaited computation here to get Task/ValueTask to await ??
                    return;
                });
            channel.Writer.Complete();
            return unit;
        });

I can't find an elegant way to unlift the computation and execute it. Any ideas? Thanks.
Replies: 1 comment 6 replies
There's no way to execute a generic MonadLiftIO, and to Run an Eff<RT> you'd need access to an instance of the runtime, which you wouldn't have in imperative async code.

If you do depend on the computation actually being an Eff, then you could/should remove a lot of the generic type params throughout the whole call stack, and you can just do

    public static Eff<RT, Unit> ProcessParallel<RT>(
        this IEnumerable<Eff<RT, Unit>> computations,
        int parallelism = 1) =>
        Eff.lift<RT, Unit>(
            async (rt) =>
            {
                await Parallel.ForEachAsync(
                    computations,
                    new ParallelOptions { MaxDegreeOfParallelism = parallelism },
                    async (eff, ct) =>
                    {
                        await eff.RunIO(rt).RunAsync(EnvIO.New(token: ct));
                    });
                return unit;
            });

Otherwise, I would start with this

    public static K<M, Unit> ProcessParallel<M>(
        this IEnumerable<K<M, Unit>> computations,
        int parallelism = 1) where M : MonadUnliftIO<M>
    {
        //todo something that will probably involve messing with a mutable collection internally
    }

and try to fill in the blanks from there. It's an interesting problem, but I personally don't have the time for it at the moment. The business with passing around and closing the
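
One detail in the question's `ProcessParallel` may be worth isolating: `channel.Writer.Complete()` runs only if `Parallel.ForEachAsync` returns normally, so a failing computation would leave readers of the channel waiting forever. Below is a minimal sketch of that fan-in pattern using only .NET BCL types (no language-ext), assuming .NET 6 or later. The names `ChannelFanIn` and `ProduceAllAsync` are hypothetical, and each producer is modelled as a plain delegate rather than a `K<M, Unit>`, so this shows the channel handling only, not the unlifting.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelFanIn
{
    // Hypothetical helper: runs the producers with bounded parallelism and
    // completes the channel exactly once, whether they succeed or fail.
    public static async Task ProduceAllAsync<T>(
        IEnumerable<Func<ChannelWriter<T>, CancellationToken, ValueTask>> producers,
        Channel<T> channel,
        int parallelism,
        CancellationToken token = default)
    {
        Exception? failure = null;
        try
        {
            await Parallel.ForEachAsync(
                producers,
                new ParallelOptions { MaxDegreeOfParallelism = parallelism, CancellationToken = token },
                (produce, ct) => produce(channel.Writer, ct));
        }
        catch (Exception e)
        {
            // Remember the failure so the finally block can hand it to readers.
            failure = e;
            throw;
        }
        finally
        {
            // Always close the writer; a non-null exception faults the reader side.
            channel.Writer.TryComplete(failure);
        }
    }

    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>(
            new UnboundedChannelOptions { SingleWriter = false });

        // Ten trivial producers, each writing its own index to the shared channel.
        var producers = Enumerable.Range(0, 10).Select(i =>
            new Func<ChannelWriter<int>, CancellationToken, ValueTask>(
                (writer, ct) => writer.WriteAsync(i, ct)));

        // Start producing in the background, then drain the channel.
        var producing = ProduceAllAsync(producers, channel, parallelism: 5);

        var sum = 0;
        await foreach (var item in channel.Reader.ReadAllAsync())
            sum += item;

        await producing;
        Console.WriteLine(sum); // prints 45
    }
}
```

Keeping the completion in a `finally` block means the consumer loop always terminates: normally when all producers finish, or with the original exception when one of them throws.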