-
Notifications
You must be signed in to change notification settings - Fork 99
/
cli.rb
179 lines (156 loc) · 6.36 KB
/
cli.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# frozen_string_literal: true
require "thor"
require "json"
module Mihari
class CLI < Thor
desc "censys [QUERY]", "Censys IPv4 lookup by a given query"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::Censys, the query and the parsed
# command-line options to run_analyzer. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception
# notifier instead of propagating.
#
# @param query [String] search query given on the command line
def censys(query)
  with_error_handling { run_analyzer(Analyzers::Censys, query: query, options: options) }
end
desc "shodan [QUERY]", "Shodan host lookup by a given query"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::Shodan, the query and the parsed
# command-line options to run_analyzer. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception
# notifier instead of propagating.
#
# @param query [String] search query given on the command line
def shodan(query)
  with_error_handling { run_analyzer(Analyzers::Shodan, query: query, options: options) }
end
desc "onyphe [QUERY]", "Onyphe datascan lookup by a given query"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::Onyphe, the query and the parsed
# command-line options to run_analyzer. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception
# notifier instead of propagating.
#
# @param query [String] search query given on the command line
def onyphe(query)
  with_error_handling { run_analyzer(Analyzers::Onyphe, query: query, options: options) }
end
desc "urlscan [QUERY]", "urlscan lookup by a given query"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
method_option :target_type, type: :string, default: "url", desc: "target type to fetch from lookup results (target type should be 'url', 'domain' or 'ip')"
# Thor command: hands Analyzers::Urlscan, the query and the parsed
# command-line options (including target_type, which defaults to "url")
# to run_analyzer. The call is wrapped in with_error_handling, so a
# StandardError is passed to the exception notifier instead of propagating.
#
# @param query [String] search query given on the command line
def urlscan(query)
  with_error_handling { run_analyzer(Analyzers::Urlscan, query: query, options: options) }
end
desc "virustotal [IP|DOMAIN]", "VirusTotal resolutions lookup by a given ip or domain"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::VirusTotal, the given indicator (as the
# analyzer query) and the parsed command-line options to run_analyzer.
# The call is wrapped in with_error_handling, so a StandardError is passed
# to the exception notifier instead of propagating.
#
# @param indiactor [String] ip or domain given on the command line
def virustotal(indiactor)
  with_error_handling { run_analyzer(Analyzers::VirusTotal, query: indiactor, options: options) }
end
desc "securitytrails [IP|DOMAIN|EMAIL]", "SecurityTrails lookup by a given ip, domain or email"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::SecurityTrails, the given indicator (as
# the analyzer query) and the parsed command-line options to run_analyzer.
# The call is wrapped in with_error_handling, so a StandardError is passed
# to the exception notifier instead of propagating.
#
# @param indiactor [String] ip, domain or email given on the command line
def securitytrails(indiactor)
  with_error_handling { run_analyzer(Analyzers::SecurityTrails, query: indiactor, options: options) }
end
# "st" is registered as a short alias for the securitytrails command.
map "st" => :securitytrails
desc "securitytrails_domain_feed [REGEXP]", "SecurityTrails new domain feed lookup by a given regexp"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
method_option :type, type: :string, default: "registered", desc: "A type of domain feed ('all', 'new' or 'registered')"
# Thor command: hands Analyzers::SecurityTrailsDomainFeed, the regexp (as
# the analyzer query) and the parsed command-line options (including type,
# which defaults to "registered") to run_analyzer. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception
# notifier instead of propagating.
#
# @param regexp [String] regular expression given on the command line
def securitytrails_domain_feed(regexp)
  with_error_handling { run_analyzer(Analyzers::SecurityTrailsDomainFeed, query: regexp, options: options) }
end
# "st_domain_feed" is registered as a short alias for this command.
map "st_domain_feed" => :securitytrails_domain_feed
desc "crtsh [QUERY]", "crt.sh lookup by a given query"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::Crtsh, the query and the parsed
# command-line options to run_analyzer. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception
# notifier instead of propagating.
#
# @param query [String] search query given on the command line
def crtsh(query)
  with_error_handling { run_analyzer(Analyzers::Crtsh, query: query, options: options) }
end
desc "dnpedia [QUERY]", "DNPedia domain lookup by a given query"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::DNPedia, the query and the parsed
# command-line options to run_analyzer. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception
# notifier instead of propagating.
#
# @param query [String] search query given on the command line
def dnpedia(query)
  with_error_handling { run_analyzer(Analyzers::DNPedia, query: query, options: options) }
end
desc "circl [DOMAIN|SHA1]", "CIRCL passive DNS/SSL lookup by a given domain / SHA1 certificate fingerprint"
method_option :title, type: :string, desc: "title"
method_option :description, type: :string, desc: "description"
method_option :tags, type: :array, desc: "tags"
# Thor command: hands Analyzers::CIRCL, the query and the parsed
# command-line options to run_analyzer. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception
# notifier instead of propagating.
#
# @param query [String] domain or SHA1 fingerprint given on the command line
def circl(query)
  with_error_handling { run_analyzer(Analyzers::CIRCL, query: query, options: options) }
end
desc "import_from_json", "Give a JSON input via STDIN"
# Thor command: builds an Analyzers::Basic from a JSON document and runs it.
#
# The document is taken from +input+ when given, otherwise from one line
# read from STDIN. It must decode to a JSON object that has "title",
# "description" and "artifacts" keys; "tags" is optional and defaults to
# an empty array. The ArgumentErrors raised here are StandardErrors, so
# with_error_handling passes them to the exception notifier.
#
# @param input [String, nil] JSON text; one line is read from STDIN when nil
def import_from_json(input = nil)
  with_error_handling do
    # STDIN.gets returns nil at end of input; safe navigation lets that case
    # reach the "Input not found" error instead of failing on nil.chomp.
    json = input || STDIN.gets&.chomp
    raise ArgumentError, "Input not found: please give an input in a JSON format" unless json

    json = parse_as_json(json)
    # parse_as_json returns nil for malformed text, and valid JSON can decode
    # to an Array or scalar; reject anything that is not an object before the
    # key check so the user gets the format error below.
    unless json.is_a?(Hash) && valid_json?(json)
      raise ArgumentError, "Invalid input format: an input JSON data should have title, description and artifacts key"
    end

    basic = Analyzers::Basic.new(
      title: json["title"],
      description: json["description"],
      artifacts: json["artifacts"],
      tags: json["tags"] || []
    )
    basic.run
  end
end
desc "alerts", "Show the alerts on TheHive"
method_option :limit, default: 5, desc: "Number of alerts to show (or 'all' to show all the alerts)"
# Thor command: builds an AlertViewer with the "limit" option, fetches its
# list and prints it as pretty-printed JSON. The call is wrapped in
# with_error_handling, so a StandardError is passed to the exception notifier.
#
# NOTE(review): no :type is declared for :limit while its default is an
# Integer; user-supplied values presumably arrive as strings — confirm
# AlertViewer accepts both.
def alerts
  with_error_handling do
    listed = AlertViewer.new(limit: options["limit"]).list
    puts JSON.pretty_generate(listed)
  end
end
desc "status", "Show the current configuration status"
# Thor command: prints the result of Status.check as pretty-printed JSON.
# The call is wrapped in with_error_handling, so a StandardError is passed
# to the exception notifier.
def status
  with_error_handling do
    report = Status.check
    puts JSON.pretty_generate(report)
  end
end
no_commands do
# Runs the given block and returns its value. When the block raises a
# StandardError, the error is handed to a new Notifiers::ExceptionNotifier
# and the result of #notify is returned instead of re-raising.
#
# @yield the work to run under error handling
def with_error_handling
  yield
rescue StandardError => error
  Notifiers::ExceptionNotifier.new.notify(error)
end
# Parses JSON text, returning nil instead of raising when it cannot be parsed.
#
# @param input [String, nil] JSON text
# @return [Object, nil] the decoded value, or nil when +input+ is nil,
#   is not string-like, or is not well-formed JSON
def parse_as_json(input)
  return nil if input.nil?

  JSON.parse(input)
rescue JSON::ParserError, TypeError
  # TypeError is what JSON.parse raises for input it cannot convert to a
  # String; treat it like malformed text so callers see one failure value.
  nil
end
# Checks that parsed JSON data is an object containing the "title",
# "description" and "artifacts" keys.
#
# @param json [Object] parsed JSON data; anything other than a Hash is invalid
# @return [true, false]
def valid_json?(json)
  # A failed parse yields nil and valid JSON may decode to an Array or a
  # scalar; none of those respond to #key?, so report them as invalid
  # rather than raising NoMethodError.
  return false unless json.is_a?(Hash)

  %w[title description artifacts].all? { |key| json.key?(key) }
end
# Instantiates +analyzer_class+ with +query+ as its positional argument and
# the options (keys converted to symbols) as keyword arguments, then runs it.
#
# @param analyzer_class [Class] analyzer to instantiate
# @param query [String] query forwarded to the analyzer
# @param options [Hash] options forwarded as keyword arguments
# @return whatever the analyzer's #run returns
def run_analyzer(analyzer_class, query:, options:)
  keyword_options = symbolize_hash_keys(options)
  analyzer_class.new(query, **keyword_options).run
end
# Returns a new Hash whose keys are the keys of +hash+ converted with
# #to_sym; values are carried over unchanged.
#
# @param hash [Hash] source hash
# @return [Hash] copy with symbol keys
def symbolize_hash_keys(hash)
  hash.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
end
end
end