
fix pipe compose and rethink the log error model #378

Merged: 24 commits from fix-pipe-compose into master, Dec 20, 2018

Conversation

@lust4life lust4life commented Dec 14, 2018

  • fix pipe compose

    Events.build here means dispatching each event/message to all pipes, not chaining them.

  • use Message as the error model for logging errors

    This can express more situations and carry more info. If an error occurs, we can print the message to see whether the registry was closed, a timeout occurred, or the log partially succeeded (and which targets/messages failed).

  • support timeouts the right way

    Each put of a message into a buffer can have its own waiting time; on timeout, we give up putting the message and return a timeout error message to the client. This does not, however, give putting a message to all targets transactional semantics.

TODO:

  • add a logging error handler for the registry, configurable by users to check their targets' pressure and the pipe processing's health
  • fix LoggerModule, exposing a friendly API to help clients handle logging errors
  • fix/add tests for these refactoring changes
  • fix errors in other projects affected by these breaking changes

Fixes #294, #303
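The per-put timeout idea above can be sketched as a Hopac choice. This is a minimal illustration, assuming Hopac: `Ch.give` stands in for the real RingBuffer put, and a plain string stands in for the error Message.

```fsharp
// Minimal sketch, assuming Hopac. Ch.give stands in for RingBuffer.put;
// the string error stands in for the error Message.
open Hopac
open Hopac.Infixes

// Race the buffer put against a timeout; whichever alternative commits
// first wins, so a full buffer yields an Error instead of blocking forever.
let putWithTimeout (buffer: Ch<'msg>) (msg: 'msg) (waitFor: System.TimeSpan) =
  (Ch.give buffer msg ^->. Ok ())
  <|> (timeOut waitFor ^->. Error "timed out putting message to buffer")
```

Because both branches are Alts, the choice composes with the rest of the pipeline without adding blocking waits at the call site.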

@lust4life lust4life self-assigned this Dec 14, 2018
@lust4life lust4life requested a review from haf December 14, 2018 05:53
@lust4life (Contributor, Author):

@haf hi,
I'm making some changes to the logging error model and trying to offer something like a waiting time for putting messages into buffers. Can you check whether this makes sense?

Also, I'm a little bit confused about this logAck method's implementation: it starts logging immediately. Did we do this on purpose?

  /// Special case: e.g. Fatal messages.
  let logAck (logger: Logger) level messageFactory: Promise<unit> =
    let ack = IVar ()
    let inner =
      logger.logWithAck (true, level) messageFactory ^=> function
        | Ok promise ->
          Job.start (promise ^=> IVar.fill ack)
        | Result.Error Rejected ->
          IVar.fill ack ()
        | Result.Error (BufferFull target) ->
          //let e = exn (sprintf "logWithAck (true, _) should have waited for the RingBuffer(s) to accept the Message. Target(%s)" target)
          //IVar.fillFailure ack e
          IVar.fill ack ()
    start inner
    ack :> Promise<_>

Or how about this?

  /// log start when user read the promise returned by this method
  let logAckCold (logger: Logger) level messageFactory: Promise<unit> =
    logger.logWithAck (true, level) messageFactory
    >>=* function
    | Ok promise -> promise
    | Result.Error e -> Promise.unit

@haf (Member) left a comment:

We need to discuss the timeouts.

#294 and #303 are nicely done generally!

Alt.always (Result.Error e))
)
|> Seq.Con.mapJob (fun msg -> next msg |> PipeResult.orDefault LogResult.success)
>>= IVar.fill putAllPromises
haf (Member):

Why not just bind reduce as the callback of Seq.Con.mapJob's return value? Why start in parallel and join the result with a promise?

lust4life (Contributor, Author):

Because if we bind reduce as the callback of Con.mapJob, it chains these jobs until we return the alt. And that alt will be used with Alt.prepareJob, which means it blocks until all alts have finished (Result.sequence), then reduces their results to commit an Alt.always here.

#r "Hopac"
#r "Hopac.Core"
open Hopac
open Hopac.Infixes

let slow = Alt.prepareJob (fun _ ->
  timeOutMillis 5000 >>-. Alt.always ("slow")
)

let slowWithIVar = Alt.prepareJob (fun _ ->
  let v = IVar ()
  timeOutMillis 5000 
  >>= fun _ -> IVar.fill v ("slow") 
  |> Job.start
  >>-. v
)

let quick = timeOutMillis 500 ^->. "quick"

let test = (slow <|> quick) ^-> printfn "%s"
let testWithIVar = (slowWithIVar <|> quick) ^-> printfn "%s"

run test

run testWithIVar

I wrote a test and it gives this:

val slow : Hopac.Alt<string>
val slowWithIVar : Hopac.Alt<string>
val quick : Hopac.Alt<string>
val test : Hopac.Alt<unit>
val testWithIVar : Hopac.Alt<unit>

> run test;;
Binding session to '/Users/lust/.nuget/packages/hopac/0.4.1/lib/netstandard2.0/Hopac.Platform.dll'...
slow
val it : unit = ()

> run testWithIVar;;
quick
val it : unit = ()

src/Logary/Configuration/Ticker.fs (outdated, resolved)
@@ -275,25 +275,8 @@ and SpanInfo =
static member formatId (id: Guid) =
id.ToString("n")

/// Why was the message/metric/event not logged?
[<Struct>]
type LogError =
haf (Member):

Thing is, if we change this, there's a whole heap of work to do in the Facade as well. I'm OK with keeping this very simple, like now, because only a few cases actually happen:

  • target fails terminally due to the target's state (e.g. out of disk)
  • target fails grey, e.g. due to too-slow processing
  • connectivity broken / DNS lookup failure, etc., all inside the Logary target; those results may be cached, so a service restart is needed
  • rate of logging > rate of sending, dropping in part
  • misconfiguration

We don't really want timeouts in the pipeline if we can avoid them, because every added timeout is a work item that Hopac's scheduler has to track. If the Hopac scheduler's queue can't keep up, the buffers fill up, which means we run out of memory and crash (since we still feed messages into the buffer pipeline); that was the reason for the refactor this summer, away from 'log simple' having a timeout alternative, towards this.

lust4life (Contributor, Author):

There is a little difference here. In the old design, the message was not dropped when the timeout alternative committed, since we have more than one target. But now, on timeout, the message is not appended to the buffer, and we return a timeout error message to the client. If we configure a global logging error handler on the registry, we can surface this situation, then adjust the target buffer length or speed up the processing.

Since we have a timeout protocol, users can apply backpressure with some balance: no need to wait forever (logging is not as important as the business code), but we still give the buffer some time in case it can't catch up briefly.

src/Logary/DataModel.fs (outdated, resolved)
src/Logary/Internals/NullLogger.fs (resolved)
false

let private printDotOnOverflow success =
if not success then System.Console.Error.Write '.' else ()
haf (Member):

This is/was also a good signal that another Hopac job had crashed the scheduling of tasks by crashing one of the worker threads. If we don't print something here, we'll never be able to see this error case in the logs, since Logary is by then indisposed.

lust4life (Contributor, Author):

          msg 
          |> Target.logAllReduce putBufferTimeOut targets
          |> Alt.afterFun (fun result ->
            start (Job.thunk (fun _ ->
              try
                conf.logResultHandler result
              with
              | e ->
                 eprintfn "%A" e ))

This has been implemented by the registry logging error handler here: https://github.com/logary/logary/blob/fix-pipe-compose/src/Logary/Configuration/Config.fs#L49

 logResultHandler = function
   | Result.Error error ->
     System.Console.Error.Write (MessageWriter.singleLineNoContext.format error)
   | _ -> ()

src/Logary/Registry.fs (resolved)
src/Logary/Registry.fs (outdated, resolved)
src/Logary/TargetModule.fs (outdated, resolved)
else
RingBuffer.put x.api.requests targetMsg ^->. Result.Ok (upcast ack)
<|>
timeOut (putBufferTimeOut.ToTimeSpan()) ^-> fun _ -> LogError.timeOutToPutBuffer x.name putBufferTimeOut.TotalSeconds
haf (Member):

This is what caused our cascading failures of logging high-throughput at tradera.com. We can't go back to it.

lust4life (Contributor, Author):

Do we have a perf test to check this situation? Like this one: #322

haf (Member):

Yes, let's benchmark it.

@haf (Member) commented Dec 14, 2018

logAck:

Also, I'm a little bit confused about this logAck method's implementation: it starts logging immediately. Did we do this on purpose?

I'm not sure without reading its call-sites. I'll have to go through them. But generally, you're right; it's a hot function and should probably not be unless a call site specifically requires it to be.

@haf (Member) commented Dec 14, 2018

This PR, given that it changes public API, needs to update the facade, and the facade adapter to match.

I think you've done a good job, but I would prefer to have the bool in place, and not change the public API; we can make the wait duration configurable with an environment variable instead.

@lust4life (Contributor, Author):
Yes, you're right; I had noticed it will affect the facade and its adapter. Maybe we can add an extension method that puts this duration into the message context instead of changing the original public API: if waitForBuffers is true, we can check whether this timeout duration is present.
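That idea might look something like the following. This is a hypothetical sketch only: the context key, the `Msg` record shape, and both function names are made up for illustration, not the actual Logary API.

```fsharp
// Hypothetical sketch: carry the buffer-wait duration inside the message
// context, so the public logging API signature stays unchanged.
module MessageEx =
  [<Literal>]
  let WaitForBuffersTimeoutKey = "_logary.waitForBuffersTimeout"

  // Assumed shape: a message carrying a string-keyed context map.
  type Msg = { context: Map<string, obj> }

  // Attach the per-message wait duration without touching the API surface.
  let setBufferTimeout (d: System.TimeSpan) (msg: Msg) =
    { msg with context = msg.context |> Map.add WaitForBuffersTimeoutKey (box d) }

  // When waitForBuffers is true, the registry could consult this value.
  let tryGetBufferTimeout (msg: Msg) =
    msg.context
    |> Map.tryFind WaitForBuffersTimeoutKey
    |> Option.map unbox<System.TimeSpan>
```

The design choice here is that only callers who opt in pay for the timeout; everyone else's messages simply lack the context key.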

@lust4life (Contributor, Author):
@haf I fixed the adapter and facade. In the facade I changed LogError to string. Since the facade is in beta, do I need to add a v5 version and an adapter for v4 (LogError)? Or can I simply change LogError to string as a hotfix? Or should we keep the LogError DU in the facade (meaning no facade change) and change the adapter to handle this situation?

In Logary, this refactor uses Message instead of LogError in order to express more content in complex scenarios (e.g. pipeline processing): a DU cannot express every situation, and a message can carry more context info for the user to debug/repair.

@lust4life lust4life changed the title WIP: fix pipe compose and rethink the log error model fix pipe compose and rethink the log error model Dec 15, 2018
@haf (Member) commented Dec 15, 2018

This error on appveyor looks a bit like a race condition... Will you check it out, or should I?

[09:38:36 ERR] logary/Registry/flush/shutdown timeout and logging after shutdown with no blocking failed in 00:00:00.3260000. 
should have no shutdown ack target. Actual value was ["mockTarget"] but had expected it to be [].
  C:\projects\logary\src\Logary.Tests\Registry.fs(203,1): Logary.Tests.Registry.x2yJ@203-641.Invoke(String x)
 <Expecto>
[09:38:36 INF] EXPECTO! 480 tests run in 00:00:07.6237606 for logary - 469 passed, 10 ignored, 1 failed, 0 errored.  <Expecto>
[09:38:36 INF] EXPECTO?! Summary...

@haf (Member) commented Dec 15, 2018

@haf I fixed the adapter and facade. In the facade I changed LogError to string. Since the facade is in beta, do I need to add a v5 version and an adapter for v4 (LogError)? Or can I simply change LogError to string as a hotfix?

I think we can keep it v4 based on that it's in beta. For my own use-case it's easy enough to switch.

@haf (Member) commented Dec 15, 2018

In Logary, this refactor uses Message instead of LogError in order to express more content in complex scenarios (e.g. pipeline processing): a DU cannot express every situation, and a message can carry more context info for the user to debug/repair.

Sure

examples/Libryy/LoggingV4.fs (outdated, resolved)
@lust4life (Contributor, Author) commented Dec 17, 2018

This error on appveyor looks a bit like a race condition... Will you check it out, or should I?

I really have no idea what is going on here; I don't know whether it has anything to do with a Hopac scheduling timeout.

Since we run the timeOut here first and then push the flush request to the target (https://github.com/logary/logary/blob/fix-pipe-compose/src/Logary/Registry.fs#L172-L174), do you have any idea?

@lust4life (Contributor, Author):
@haf can you help release a beta version, so that I can fill the gap between https://github.com/logary/AspNetCore and the newest Logary?

@lust4life (Contributor, Author):
@haf

> Yes, let's benchmark it.

Fixed Logary.PerfTest and tried some perf tests.

[three benchmark result screenshots]

@haf (Member) commented Dec 20, 2018

Let's make the CI pass, so I can merge it. I'll look at this ASAP; sorry for the delay.

@lust4life (Contributor, Author):
It seems that:

  • We should encourage users to use xxxWithBP: if the targets' processing rate can keep up (or the ring buffer is long enough), there is almost no cost. If the targets can't keep up, a user-defined timeout applies some backpressure on users.

  • Use xxxWithAck if the user wants to be sure of getting the ack from the backend targets' processing.

  • I don't think we should wrap an asynchronous API in a sync way, e.g. logSimple/logWith.

    The reason is: if users have a Job/Task, they can decide how to start it:

    • start it from the current thread (blocking until the real job begins, but with no need to wait for the result), e.g. startAsTask in TPL or start in Hopac
    • start it from a background thread, also non-blocking, e.g. queueAsTask in TPL or queue in Hopac

And if we do expose logSimple, I think we should use queue instead of start, since start rearranges the worker thread's work-stack: if the worker thread is busy and users log messages at a high rate, this becomes slower. queue only appends a work item to the work-stack, which means a constant rate (the perf results above show this); the drawback is that the message factory isn't invoked at the user's call site. So we should leave this decision to users.
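The start-vs-queue contrast can be shown directly with Hopac's two entry points. A sketch, assuming Hopac; only the scheduling behaviour of `start` and `queue` differs:

```fsharp
// Sketch, assuming Hopac. Both functions accept a Job<unit>; the
// difference is scheduling: `start` may run the job eagerly on the
// calling worker's work-stack, while `queue` merely appends it for a
// worker to pick up later, at a roughly constant cost per call.
open Hopac

let logJob = Job.thunk (fun () -> printfn "message dispatched")

start logJob   // eager: can interleave with the caller's current work
queue logJob   // enqueue only: constant-cost at the call site
```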

I think start here may be the real reason for the slow perf, not the Alt timeout.

What do you think?

@haf (Member) commented Dec 20, 2018

Sounds well reasoned to me.

I don't think we should wrap an asynchronous api in a sync way;

But often, when users don't have access to a synchronisation context, they don't want to care; not exposing this forces something on the user and raises questions they may not need to understand.

We could make configuration available that changes the default behaviour when the ring buffer is full, though.

@haf haf merged commit 86401e9 into master Dec 20, 2018
@haf haf deleted the fix-pipe-compose branch December 20, 2018 15:43
@haf (Member) commented Dec 27, 2018

Releasing this as beta-29 now.

Successfully merging this pull request may close these issues:

  • Some FsMessageTemplate tests failing