Skip to content

Latest commit

 

History

History
244 lines (156 loc) · 11.4 KB

tail2kafka-config.org

File metadata and controls

244 lines (156 loc) · 11.4 KB

tail2kafka 的所有配置都是lua文件(lua非常简单,甚至都可能意识不到这是个lua)。这些lua文件放到同一个目录,启动时以这个目录为参数。这些lua文件中,有一个名称固定的 main.lua 用来指定全局配置,其余配置,每个lua文件指定一个数据文件。

可以参考源码 blackboxtest/tail2kafka 中的一些配置。

main.lua

hostshell

必配项,string

当多台机器发送数据到kafka时,可能需要标识数据的来源。例如多台web服务器发送access_log到kafka。可以用hostshell指定一行shell命令,获取机器名,例如IP或者hostname。注意:如果使用了自动分区,该机器名必须能够解析出IP。

例如: ~hostshell = “hostname”~ ,tail2kafka会执行 hostname 命令,该命令的输出作为机器名。

pidfile

必配项,string

指定tail2kafka的pid文件,pid文件用于停止或重新加载配置文件

例如: ~pidfile = “/var/run/tail2kafka.pid”~

brokers

必配项,string

指定kafka的机器和端口,多个机器和端口用逗号分号。

例如: ~brokers = “127.0.0.1:9092”~

partition

可选项,int

指定kafka的partition,也可以再各个数据文件的配置中指定。

kafka_global

必选项,table

librdkafka全局配置,参考源码 blackboxtest/tail2kafkalibrdkafka

kafka_topic

可选项,table

librdkafka的topic配置

polllimit

可选项, int, 默认值 polllimit=100

当文件写入相当频繁,可以转成轮训模式,参数用来指定轮训的间隔,单位是毫秒。

rotatedelay

可选项,int,默认值 -1,关闭,单位是秒

以nginx为例,当想rotate文件时,先把文件mv走,然后给nginx发送 USR1 信号,重新打开文件。这里有个问题,文件mv走时,tail2kafka感知到了,然后去读新文件,但是在mv走之后,nginx重新打开文件之前,nginx仍然会往老文件写日志。 rotatedelay 指定tail2kafka 感知到文件mv时,延迟几秒钟打开新文件,防止数据丢失。

这个值不能设置太大,如果太大,则新文件中的数据会有大的延迟,建议关闭或者设置在10s以内。

pingbackurl

可选项,字符串,默认值 “”,关闭

pingbackurl 是一个http地址,tail2kafka会把关键事件以GET请求的形式发送这个地址。例如配置了 pingbackurl=http://server/pingback/tail2kafka 则发生文件rotate时会请求 http://server/pingback/tail2kafka?event=ROTATE&file=/data/logs/access_2018-03-21-04_10.142.113.65_log&size=891010&md5=4b04460d5b0a8b79a5a7a7b78e55aecf 这里file是rotate后老文件的名称,size是文件大小,md5是文件的md5。可以通过日志了解更多事件信息。

libdir

可选项,字符串,默认值 /var/lib/tail2kafka

用于存放fileoff,topic的历史文件等一些运行信息。

logdir

可选项,字符串,默认值 /var/log/tail2kafka

数据源文件lua

部分配置可以同时出现在main.lua和数据源文件lua,后者会覆盖前者,如果后者没有指定,会继承前者。

topic

必填项,string

指定kafka的topic

例如: ~topic = “cron”~

fileAlias

可选项,string,默认和topic值相同

一个文件可以被发往多个topic,当kafka不可用时,需要记录尚未发送数据的文件列表,使用 fileAlias 作为文件列表的文件名。

file

必填项,string,例如: ~file = “/var/log/message”~

指定要发往kafka的源数据文件,tail2kafka可以检测到3种文件rotate的情况,并在rotate后重新打开文件,从0开始读取。三种rotate情况:

  • 文件被清空,例如: truncate --size 0 /tmp/log 不推荐,这种rotate方式可能丢数据
  • 文件被改名,例如: /tmp/log /tmp/log.1
  • 文件被删除,例如: unlink /tmp/log 不推荐,这种rotate方式可能丢数据
  • 文件名自身带时间,建议至少分钟级别,例如:=basic.%Y-%m-%d_%H-%M.log= ,这种格式的文件,需要设置 fileWithTimeFormat=true

fileWithTimeFormat

可选项,boolean,默认值 fileWithTimeFormat=false

当文件名自身带时间时,设置为true。tail2kafka会跟踪时间变化。

startpos

可选项,string,默认值 startpos=log_start

每往kafka发送一条消息,会在fileoff中记录消息在相应文件中的位置。当tail2kafka重启启动,或者reload时会使用这个文件, startpos 指定了获取文件开始位置的策略,有4个可选值。

名称含义
log_start优先使用fileoff中记录的文件位置,如果fileoff没找到,从头开始
log_end优先使用fileoff中记录的文件位置,如果fileoff没找到,从最后一行开始
start从头开始,忽略fileoff中的值
end从最后一行开始

autocreat

可选项,boolean,默认 autocreat = false

默认情况,当file指定的文件不存在时,tail2kafka会启动失败,如果指定 autocreat 为 true,可以自动创建不存在的文件。

fileOwner

可选项,字符串

autocreat 为true时,自动创建文件,默认文件的owner和tail2kafka的运行用户相同,通过fileOwner改变,以免写文件的进程无法写入。例如:某些时候 nginx 以 nobody 的身份写入log。

md5sum

可选项,boolean,默认值 true

实时计算发送内容的md5,用于消费kafka时校验数据的完整性。这个md5不一定准,当tail2kafka发送重启或reload时,如果不是从文件开头读,md5值不准确。计算md5需要耗费cpu,一般情况影响有限。

partition

可选项,int,无默认值

指定kafka的partition,如果没有指定,使用main.lua中的配置。精心指定partition,可以实现均衡,但也很容易出错。 partition=-100 是一个特殊配置,把数据在多个分区间随机分布,且只能在每个lua配置中单独指定。

autoparti

可选项,boolean,默认 autoparti = false

如果autoparti为true,那么使用hostshell的配置对应的IP得到一个数对kafka的全部partition取模。这会导致partition不均衡,但是配置简单,适合数据源机器特别多的情况。

rotatedelay

含义同main.lua

rawcopy

可选项,boolean,默认 rawcopy = false

默认情况,逐行发送新增内容到kafka。一次发送一行。如果不需要逐行处理,可以设置 rawcopy 为 true,一次复制多行数据到kafka,可以提高性能。

注意 默认情况,一次发送一行,不包含换行符。一次发送多行时,只有最后一行没有换行符。处理kafka中的数据时,直接按换行符split就行。

filter

可选项,table,无默认值

tail2kafka内置了split功能,把数据行按照空格分隔成字段,通过filter指定字段的下标,然后拼接成行发送。相当于选择一行的某些字段发送,而不是整行发送。对于特别大的行,而行中的某些字段显然没有用,可以使用filter减少发送的内容。

注意 ~”“和[]~ 包围的字符,被当做一个字段处理。下标从1开始,负数下标倒着数。

例如: filter = {4, 5, 6, -3} ,行的内容为 127.0.0.1 - - [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.16.2.3 Basic ECC ,发送的内容为 2016-12-16T10:17:01 GET /logm HTTP/1.1 200 288

这里同时指定了 timeidx = 4 ,把时间转成了 iso8601 格式。

grep

可选项,function,无默认值

grep 是 filter 的增强版,是个lua函数。filter只能挑选制定的字段,不能改变字段的内容。grep 的输入是行split后的字段,输出是处理后的字段。

注意 如果返回 nil ,这行数据会被忽略。

如果指定了 withhost = true ,把主机名 (参考: hostshell )自动放到行首。

例如:

grep = function(fields)
   return {'[' .. fields[4] .. '] "' .. fields[5] .. '"', fields[6]}
end

那么发送的行为 zzyong [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200 这里指定了 withhost ,但是没有指定 timeidx

aggregate

可选项,function,无默认值

aggregate 是 grep 的增强版,aggregate的输出是一个 key + hash table ,用于做各种统计,例如统计状态码,错误数量等。同一时间字段的数据会被尽量合并,但因为日志不报证时间字段绝对递增,所以同一时间的数据可能分多次发送,尤其时单位是秒时,处理kafka中的数据时需要合并。

注意 必须配置 timeidxwithtime ,另外时间字段的精度(秒,分钟等)决定了聚合的粒度。为了能做到机器级别,可以配置 withhost 字段

例如:

aggregate = function(fields)
  local tbl = {}
  tbl["total"] = 1
  tbl["status_" .. fields[6] = 1
  return "user", tbl
end

那么发送到kafka类似 2016-12-16T10:17:01 zzyong user total=100 status_200=94 status_304=6 如果配置了 pkey=www ,那么同时会发送 2016-12-16T10:17:01 zzyong www total=190 status_200=174 status_304=16

这个是什么意思呢?它统计了user这个类别(可以是域名,日志文件名,或者某个业务)下的总请求量,http各个状态码的数量。如果配置了pkey,那么同时统计了这台机器的总请求量,各个状态码的数量。

这里时间字段是秒级的,所以统计也是秒级的。但是因为并发访问,可能出现

127.0.0.1 - - [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7"
127.0.0.1 - - [16/Dec/2016:10:17:02 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7"
127.0.0.1 - - [16/Dec/2016:10:17:01 +0800] "GET /logm HTTP/1.1" 200 288 "-" "curl/7.19.7"

这里时间字段不是绝对递增的,kafka 会收到两条 2016-12-16T10:17:01 的数据,处理数据时,需要把他们累加起来。

注意 如果返回 nil ,这行数据会被忽略。

pkey

可选项 string || int,无默认

配合 aggregate 使用,指定全局的统计类别。

transform

可选项 function 无默认值

输入是一行数据,transform操作这行数据,然后输出操作后的数据。 注意 如果返回 nil 忽略这行,如果返回空字符串,则使用源数据(也可以返回元数据,返回空算一种优化吧)。

transform = function(line)
  local s = string.sub(line, 1, 7);
  if s == "[error]" then return "";
  elseif s == "[warn]" then return "[error]" .. string.sub(line, 8)
  else return nil end
end

如果是=[error]= 开头的,原样发送,如果是 [warn] 开头的,用 [error] 替换然后发送,否则忽略。

timeidx

可选项 int 无默认值

指定时间字段的下标,主要配合 filter grep aggregate 使用。如果指定timeidx,时间从格式 28/Feb/2015:12:30:23 +0800 转成 2015-03-30T16:31:53

withtime

可选项 boolean 默认 withtime=false

如果 true ,会在发往kafka前添加时间字段。

withhost

可选项 boolean 默认 withhost=false

如果 true ,会在发往kafka前添加机器名。

autonl

可选项 boolean 默认 autonl=true

如果 true ,会在发往kafka的行尾添加换行