-
Notifications
You must be signed in to change notification settings - Fork 0
/
htcondor-ingest.logstash.conf
151 lines (136 loc) · 3.53 KB
/
htcondor-ingest.logstash.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Input stage: consume events from a RabbitMQ queue.
# Every "${...}" value is a variable reference that Logstash substitutes when
# it loads the pipeline, so connection details and credentials are kept out
# of this file.
input {
  rabbitmq {
    # Broker endpoint (TLS enabled; 5671 is the registered AMQPS port).
    host    => "${RABBIT_HOST}"
    port    => 5671
    ssl     => true
    vhost   => "${RABBIT_VHOST}"

    # Credentials.
    user     => "${RABBIT_USER}"
    password => "${RABBIT_PASS}"

    # Queue to read from; declared durable.
    queue   => "${RABBIT_QUEUE}"
    durable => true
  }
}
# The filter section below is active (it is not commented out). It parses
# "File Transfer" log lines into structured fields before they are indexed.
# Filter stage: parse "File Transfer" log lines into structured fields.
#
# Steps, in order:
#   1. drop lines that start with '#'
#   2. grok the timestamp, optional stats type, action and trailing key/value text
#   3. on a successful match: tag the event, set @timestamp, expand the
#      key/value pairs into typed top-level fields, and enrich [dest]
#      (internal-IP tagging, geoip, reverse DNS, dest_domain)
#   4. for EVERY event: compute an MD5 [checksum], which the output stage
#      uses as the Elasticsearch document id
filter {
  # Drop lines that begin with '#'.
  # NOTE(review): an earlier note here asked to verify that the shipper
  # populates [message] -- still to be confirmed.
  if [message] =~ /^#/ {
    drop { }
  }

  grok {
    match => [
      "message", "%{DATESTAMP:date}.*File\sTransfer\s(\(%{DATA:stats_type}\):)?%{SPACE}%{DATA:action}\s+%{GREEDYDATA:key_value_pairs}"
    ]
  } # if 'grok' match fails, adds _grokparsefailure to [tags]

  # Did we match a message?
  if "_grokparsefailure" not in [tags] {
    # Add the peer stats tag
    if "peer stats" in [message] {
      mutate {
        add_tag => [ "peer_stats" ]
      }
    }

    # Add the upload or download tag
    if "File Transfer Upload" in [message] {
      mutate {
        add_tag => [ "upload" ]
      }
    }
    if "File Transfer Download" in [message] {
      mutate {
        add_tag => [ "download" ]
      }
    }

    # Fill @timestamp from the grokked [date] field.
    # NOTE(review): no timezone is configured here, so the date filter's
    # default applies -- confirm the log times are in the expected zone.
    date {
      match => [ "date", "MM/dd/yy HH:mm:ss"]
      target => "@timestamp"
      add_tag => [ "dated" ]
    }

    # Now we can try to extract key value pairs into the temporary [kv] field.
    # NOTE(review): the kv filter documents value_split as a set of
    # single-character delimiters rather than a literal string -- confirm
    # that ":\s" splits these lines as intended.
    if [key_value_pairs] {
      kv {
        source => "key_value_pairs"
        #field_split_pattern => "\d+ \w+"
        value_split => ":\s"
        target => "kv"
      }
    }

    # Promote every kv pair to a top-level field, converting numeric text to
    # a number. Values that do not start with a number, and values that are
    # not Strings, are stored unchanged instead of being coerced to 0 (or
    # raising inside the ruby filter).
    if [kv] {
      ruby {
        code => "
          event.get('kv').to_hash.each do |key, value|
            # Anything that is not a String (e.g. a list of values) is kept as-is.
            unless value.is_a?(String)
              event.set(key, value)
              next
            end

            # True when the text begins with an optionally signed number.
            starts_numeric = value.lstrip.index(/[-+]?[.]?[0-9]/) == 0

            if value.count('.') > 1 || !starts_numeric
              # More than one dot (e.g. an IP address) or no leading number: keep the text.
              event.set(key, value)
            elsif value.include?('.')
              event.set(key, value.to_f)
            else
              event.set(key, value.to_i)
            end
          end
        "
      }
    }

    # The intermediate fields are no longer needed once promoted.
    mutate {
      remove_field => [ "kv", "key_value_pairs" ]
    }

    if [dest] {
      # Tag private/loopback addresses so they are skipped by geoip.
      # 172.16.0.0/12 is included so that all RFC 1918 ranges are covered.
      cidr {
        add_tag => [ "internalIP" ]
        address => [ "%{dest}" ]
        network => [ "10.0.0.0/8", "127.0.0.1", "172.16.0.0/12", "192.168.0.0/16" ]
      }
      if "internalIP" not in [tags] {
        geoip {
          source => "dest"
          add_tag => [ "located" ]
          target => "[geoip]"
        }
      }

      # Copy the dest field to dest_hostname and reverse-resolve it in place
      mutate {
        copy => { "dest" => "dest_hostname" }
      }
      dns {
        reverse => [ "dest_hostname" ]
        action => "replace"
        add_tag => [ "dest_resolved" ]
      }

      # Build dest_domain from the last two labels of the resolved name:
      # split into labels, read the last two, then join the name back together.
      if "dest_resolved" in [tags] {
        mutate {
          split => { "dest_hostname" => "." }
          add_field => {"dest_domain" => "%{[dest_hostname][-2]}.%{[dest_hostname][-1]}"}
        }
        mutate {
          join => {"dest_hostname" => "." }
        }
      }
    } # end of [dest] check
  } # end of _grokparsefailure section

  # Compute the document checksum for every event, including ones grok could
  # not parse. The output stage uses %{checksum} as the document id; without a
  # checksum, every unparsed event would share the literal id "%{checksum}"
  # and overwrite the previous one.
  # The libraries are loaded once in init rather than on every event.
  # NOTE(review): event.to_hash is filtered by top-level key name, so
  # 'agent.hostname' only matches a field literally named that way; a nested
  # [agent][hostname] would not be included -- confirm which one is intended.
  ruby {
    init => "
      require 'openssl'
      require 'json'
    "
    code => "
      incl_keys = ['message', 'agent.hostname']
      rec_to_hash = event.to_hash.select {|k,v| incl_keys.include?(k)}
      rec_str = Hash[rec_to_hash.sort].to_json
      event.set('checksum',OpenSSL::Digest.hexdigest('md5',rec_str))
    "
  }
}
# Output stage: index events into Elasticsearch, one index per day.
output {
  elasticsearch {
    # Target cluster.
    hosts => ["localhost:9200"]

    # Daily index; the document id is taken from the [checksum] field, so an
    # event carrying the same checksum replaces the earlier document instead
    # of creating a duplicate. An event without [checksum] would be stored
    # under the literal, unsubstituted id text.
    index         => "htcondor-xfer-stats2-%{+YYYY.MM.dd}"
    document_id   => "%{checksum}"
    # NOTE(review): document_type is deprecated in recent releases of the
    # elasticsearch output plugin -- confirm against the deployed version.
    document_type => "log"

    # Install the index template from disk, replacing any existing template
    # of the same name.
    manage_template    => true
    template_name      => "htcondor-xfer-stats2"
    template           => "/etc/xfer-mapping.json"
    template_overwrite => true
  }

  # For debugging: uncomment to also print events to the Logstash log.
  #stdout { }
}