This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Heka is ok when maxprocs = 1, but it stop after 50-60 seconds when I set maxprocs = 4, why? #1954

Closed
wangfeiping opened this issue Jun 23, 2016 · 2 comments


wangfeiping commented Jun 23, 2016

Heka works fine with maxprocs = 1, but it stops handling messages after 50-60 seconds when I set maxprocs = 4. Why?
Thanks.

### toml

```toml
maxprocs = 4
base_dir = "/apps/heka-0_10_0-linux-amd64/share/heka"
share_dir = "/apps/heka-0_10_0-linux-amd64/share/heka"

[log_input]
type="UdpInput"
address = ":514"

[ts]
type = "TokenSplitter"
delimiter = '\n'

[pd]
type = "PayloadRegexDecoder"
match_regex = '(\S*\n)'

[t_input]
type = "TcpInput"
address = ":514"
splitter = "ts"
decoder = "pd"
use_tls = false

[log_filter]
type = "SandboxFilter"
filename = "log_filter_counter.lua"
ticker_interval = 1
message_matcher = "Logger == 'log_input'"
output_limit = 65600

[PayloadEncoder]
append_newlines = false

[FileOutput]
type = "FileOutput"
message_matcher = "Fields[payload_name]=='local_report'"
path = "/apps/logdata/local-160.log"
flush_count = 1000
flush_operator = "OR"
encoder = "PayloadEncoder"

[local_filter]
type = "SandboxFilter"
filename = "local_log_filter.lua"
ticker_interval = 1
message_matcher = "Fields[payload_name]=='local_report'"
output_limit = 65600

[KO]
type = "KafkaOutput"
message_matcher = "Fields[payload_name]=='logFilterCounter'"
partitioner = "RoundRobin"
topic = "LOGBUF30"
addrs = ["10.19.35.160:9092"]
encoder = "ProtobufEncoder"
```

### lua

```lua
require "string"
require "os"
require "table"
local toolbox = require("toolbox")

local tabconcat = table.concat

local counter = 0
local INFLUX_LEVEL = ""

function process_message ()
    local payload = read_message("Payload")
    add_to_payload(payload)

    inject_payload("txt", "logFilterCounter")

    counter = counter + 1
    return 0
end

function timer_event(ns)
    influxReport()
    counter = 0
end

function appendInfluxPrefix()
    add_to_payload("<INFLUX> ")
    local dts = toolbox.getOsDatetimeString()
    add_to_payload(dts)
    add_to_payload(" ")
end

function influxReport()
    appendInfluxPrefix()
    local _, t = toolbox.getOsTime(nil, 0)
    local pd = {
                INFLUX_LEVEL,
                ",vip=",
                ",ip=",
                ",topic=LOGBUF",
                " size=", counter,
                ",value=1 ",
                t, "000000000"
    }
    local post_data = tabconcat(pd);
    add_to_payload(post_data, "\n")
    inject_payload("txt", "local_report")
end
```

### heka logs

```
2016/06/23 14:25:35 Pre-loading: [KO]
2016/06/23 14:25:35 Pre-loading: [pd]
2016/06/23 14:25:35 Pre-loading: [log_filter]
2016/06/23 14:25:35 Pre-loading: [ts]
2016/06/23 14:25:35 Pre-loading: [log_input]
2016/06/23 14:25:35 Pre-loading: [local_filter]
2016/06/23 14:25:35 Pre-loading: [PayloadEncoder]
2016/06/23 14:25:35 Pre-loading: [t_input]
2016/06/23 14:25:35 Pre-loading: [FileOutput]
2016/06/23 14:25:35 Pre-loading: [TcpInput]
2016/06/23 14:25:35 Pre-loading: [ProtobufDecoder]
2016/06/23 14:25:35 Loading: [ProtobufDecoder]
2016/06/23 14:25:35 Pre-loading: [ProtobufEncoder]
2016/06/23 14:25:35 Loading: [ProtobufEncoder]
2016/06/23 14:25:35 Pre-loading: [TokenSplitter]
2016/06/23 14:25:35 Loading: [TokenSplitter]
2016/06/23 14:25:35 Pre-loading: [HekaFramingSplitter]
2016/06/23 14:25:35 Loading: [HekaFramingSplitter]
2016/06/23 14:25:35 Pre-loading: [NullSplitter]
2016/06/23 14:25:35 Loading: [NullSplitter]
2016/06/23 14:25:35 Loading: [pd]
2016/06/23 14:25:35 Loading: [PayloadEncoder]
2016/06/23 14:25:35 Loading: [ts]
2016/06/23 14:25:35 Loading: [log_input]
2016/06/23 14:25:35 Loading: [t_input]
2016/06/23 14:25:35 Loading: [TcpInput]
2016/06/23 14:25:35 Loading: [log_filter]
2016/06/23 14:25:35 Loading: [local_filter]
2016/06/23 14:25:35 Loading: [KO]
2016/06/23 14:25:36 Loading: [FileOutput]
2016/06/23 14:25:36 Starting hekad...
2016/06/23 14:25:36 Output started: KO
2016/06/23 14:25:36 Output started: FileOutput
2016/06/23 14:25:36 Filter started: log_filter
2016/06/23 14:25:36 Filter started: local_filter
2016/06/23 14:25:36 MessageRouter started.
2016/06/23 14:25:36 Input started: log_input
2016/06/23 14:25:36 Input started: t_input
2016/06/23 14:25:36 Input started: TcpInput
2016/06/23 14:30:36 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:30:36 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:30:36 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:30:36 Diagnostics:    log_filter: 32
2016/06/23 14:30:36 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:30:36
2016/06/23 14:30:36 Diagnostics:    KO: 30
2016/06/23 14:30:36
2016/06/23 14:31:06 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:31:06 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:31:06 Diagnostics:    log_filter: 32
2016/06/23 14:31:06
2016/06/23 14:31:06 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:31:06 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:31:06 Diagnostics:    KO: 30
2016/06/23 14:31:06
2016/06/23 14:31:36 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:31:36 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:31:36 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:31:36 Diagnostics:    KO: 30
2016/06/23 14:31:36
2016/06/23 14:31:36 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:31:36 Diagnostics:    log_filter: 32
2016/06/23 14:31:36
2016/06/23 14:32:06 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:32:06 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:32:06 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:32:06 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:32:06 Diagnostics:    KO: 30
2016/06/23 14:32:06
2016/06/23 14:32:06 Diagnostics:    log_filter: 32
2016/06/23 14:32:06
2016/06/23 14:32:36 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:32:36 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:32:36 Diagnostics:    log_filter: 32
2016/06/23 14:32:36
2016/06/23 14:32:36 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:32:36 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:32:36 Diagnostics:    KO: 30
```

wangfeiping changed the title from "Heka stop handle messages when maxprocs > 1 ?" to "Heka is ok when maxprocs = 1, but it stop after 50-60 seconds when I set maxprocs = 4, why?" on Jun 23, 2016

michaelgibson (Contributor) commented

Increasing maxprocs (your thread count) probably increased the number of packs in flight in Heka's pipeline.
When Heka runs out of available packs, it blocks until some are freed.
You may need to increase "plugin_chansize" (default: 30), "poolsize" (default: 100), or both, to allow more packs to exist in the daemon.

http://hekad.readthedocs.io/en/latest/config/index.html#global-configuration-options
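
As a rough sketch of that suggestion, the global options could be raised in the `[hekad]` section of the config. The values 200 and 60 below are assumptions chosen only for illustration, not tuned recommendations; suitable numbers depend on message volume and on how many plugins hold packs at once.

```toml
[hekad]
maxprocs = 4
# Illustrative values only (assumed, not tuned).
# Defaults are poolsize = 100 and plugin_chansize = 30.
poolsize = 200
plugin_chansize = 60
```

After a restart, the "packs have been idle" diagnostics in the log are one way to check whether packs are still being held by log_filter and KO.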

wangfeiping (Author) commented

@michaelgibson Ok, I see, thanks!
