Skip to content

Commit

Permalink
Merge pull request #756 from BAngelique/backkend-elasticsearch-withtests
Browse files Browse the repository at this point in the history
Backend Elasticsearch with tests
  • Loading branch information
p-l- committed Aug 25, 2019
2 parents 5d47349 + 95279d3 commit 48dae1a
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 13 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dist: xenial
env:
# MaxMind (& utils)
- DB=maxmind
# Elasticsearch
- DB=elastic ELASTIC_VERSION=7.3.0
# PostgreSQL 10.1, 9.6.6, 9.5.10
- DB=postgres POSTGRES_VERSION=9.5.10
- DB=postgres POSTGRES_VERSION=9.6.6
Expand Down
25 changes: 21 additions & 4 deletions .travis/install_elastic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,30 @@
wget -q "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-$ELASTIC_VERSION-linux-x86_64.tar.gz" -O - | tar zxf -
export PATH="`pwd`/elasticsearch-$ELASTIC_VERSION/bin:$PATH"
PIP_INSTALL_OPTIONS=""
mkdir -p data/db
sudo mount -t tmpfs tmpfs data/db -o users,uid=travis,gid=travis,mode=0700
elasticsearch -d -h -E path.data=`pwd`/data/db
# Since we are going to run a MongoDB server that will use data/db, we
# need to use a different name for Elasticsearch
mkdir -p data/db_es
sudo mount -t tmpfs tmpfs data/db_es -o users,uid=travis,gid=travis,mode=0700
elasticsearch -d -E path.data=`pwd`/data/db_es

until nc -z localhost 9200 ; do echo Waiting for Elasticsearch; sleep 1; done
sleep 2

echo 'DB = "elastic://ivre@localhost:9200/ivre"' >> ~/.ivre.conf
echo 'DB_VIEW = "elastic://ivre@localhost:9200/ivre"' >> ~/.ivre.conf

curl http://127.0.0.1:9200

# We need a MongoDB server for the scan & nmap databases
MONGODB_VERSION=4.0.2 source ./.travis/install_mongo.sh

PYVERS=`python -c 'import sys;print("%d%d" % sys.version_info[:2])'`
if [ -f "requirements-mongo-$PYVERS.txt" ]; then
pip install -U $PIP_INSTALL_OPTIONS -r "requirements-mongo-$PYVERS.txt"
else
pip install -U $PIP_INSTALL_OPTIONS -r "requirements-mongo.txt"
fi

# We also need results since we will not run 30_nmap & 40_passive
# tests. This is used in 20_fake_nmap_passive
mkdir backup
wget -q --no-check-certificate -O - https://ivre.rocks/data/tests/backup_nmap_passive.tar.bz2 | tar jxf -
3 changes: 2 additions & 1 deletion ivre/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2673,7 +2673,8 @@ class MetaDB(object):
"flow": {"neo4j": ("neo4j", "Neo4jDBFlow"),
"mongodb": ("mongo", "MongoDBFlow"),
"postgresql": ("sql.postgres", "PostgresDBFlow")},
"view": {"http": ("http", "HttpDBView"),
"view": {"elastic": ("elastic", "ElasticDBView"),
"http": ("http", "HttpDBView"),
"mongodb": ("mongo", "MongoDBView"),
"postgresql": ("sql.postgres", "PostgresDBView")},
}
Expand Down
166 changes: 166 additions & 0 deletions ivre/db/elastic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-

# This file is part of IVRE.
# Copyright 2011 - 2019 Pierre LALET <pierre.lalet@cea.fr>
#
# IVRE is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# IVRE is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details.
#
# You should have received a copy of the GNU General Public License
# along with IVRE. If not, see <http://www.gnu.org/licenses/>.

"""This sub-module contains functions to interact with the ElasticSearch
databases.
"""

try:
from urllib.parse import unquote
except ImportError:
from urllib import unquote


from elasticsearch import Elasticsearch, helpers


from ivre.db import DB, DBActive, DBView
from ivre import utils


class ElasticDB(DB):
    """Base class for Elasticsearch-backed IVRE databases.

    Parses the connection URL (credentials, hosts, index prefix and
    extra parameters), lazily creates the client, and provides the
    filters shared by all purposes.
    """

    # filters
    # matches every document
    flt_empty = {'match_all': {}}

    def __init__(self, url):
        """`url` is a parsed URL such as
        elastic://user:pass@host:9200/prefix?param=value

        The path component (minus the leading '/') is used as the
        index name prefix (default: "ivre-").
        """
        super(ElasticDB, self).__init__()
        self.username = ''
        self.password = ''
        self.hosts = None
        if '@' in url.netloc:
            username, hostname = url.netloc.split('@', 1)
            if ':' in username:
                # credentials may be URL-quoted in the URL
                self.username, self.password = (unquote(val) for val in
                                                username.split(':', 1))
            else:
                self.username = unquote(username)
            if hostname:
                self.hosts = [hostname]
        elif url.netloc:
            self.hosts = [url.netloc]
        index_prefix = url.path.lstrip('/')
        if index_prefix:
            self.index_prefix = index_prefix + '-'
        else:
            self.index_prefix = 'ivre-'
        # extra parameters from the query string, e.g.
        # ?indexname_hosts=xxx; a key without '=' maps to None
        self.params = dict(x.split('=', 1) if '=' in x else (x, None)
                           for x in url.query.split('&') if x)

    def init(self):
        """Initializes the mappings: drops then re-creates every index
        declared by the subclass (`.indexes` / `.mappings`).
        """
        for idxnum, mapping in enumerate(self.mappings):
            idxname = self.indexes[idxnum]
            self.db_client.indices.delete(
                index=idxname,
                # ignore "index does not exist" errors
                ignore=[400, 404],
            )
            self.db_client.indices.create(
                index=idxname,
                body={"mappings": {"properties": mapping}},
                ignore=400,
            )

    @property
    def db_client(self):
        """The DB connection (Elasticsearch client), created on first
        access and cached.
        """
        try:
            return self._db_client
        except AttributeError:
            self._db_client = Elasticsearch(
                hosts=self.hosts,
                http_auth=(self.username, self.password)
            )
            return self._db_client

    @property
    def server_info(self):
        """Server information, fetched once and cached."""
        try:
            return self._server_info
        except AttributeError:
            self._server_info = self.db_client.info()
            return self._server_info

    @staticmethod
    def to_binary(data):
        """Encode binary data (bytes) as a Base64 str for storage."""
        return utils.encode_b64(data).decode()

    @staticmethod
    def from_binary(data):
        """Decode a value produced by `.to_binary()` back to bytes."""
        return utils.decode_b64(data.encode())

    @staticmethod
    def searchnonexistent():
        """Filter matching no record at all."""
        return {"match": {"_id": 0}}

    @classmethod
    def searchhost(cls, addr, neg=False):
        """Filters (if `neg` == True, filters out) one particular host
        (IP address).
        """
        res = {"match": {"addr": addr}}
        if neg:
            # FIX: `neg` used to be silently ignored
            return {"bool": {"must_not": [res]}}
        return res

    @classmethod
    def searchhosts(cls, hosts, neg=False):
        """Filters (if `neg` == True, filters out) several hosts (IP
        addresses) at once.
        """
        # FIX: this used to be an unimplemented stub silently
        # returning None; a `terms` query matches any of the values.
        res = {"terms": {"addr": list(hosts)}}
        if neg:
            return {"bool": {"must_not": [res]}}
        return res


class ElasticDBActive(ElasticDB, DBActive):
    """Elasticsearch-backed storage for active (scan) results."""

    # one mapping per index, in the same order as `.indexes`
    mappings = [
        {
            "addr": {"type": "ip"},
        },
    ]
    # position of the hosts index within `.indexes` / `.mappings`
    index_hosts = 0

    def store_or_merge_host(self, host):
        """Subclasses decide whether to merge with an existing record."""
        raise NotImplementedError

    def store_host(self, host):
        """Inserts `host` as a new document in the hosts index."""
        # consistency fix: use `.index_hosts` rather than a hard-coded 0
        self.db_client.index(index=self.indexes[self.index_hosts],
                             body=host)

    def count(self, flt):
        """Returns the number of records matching the filter `flt`."""
        # size=0: we only need the hit count, not the documents
        return self.db_client.search(
            body={"query": flt},
            index=self.indexes[self.index_hosts],
            size=0
        )['hits']['total']['value']

    def get(self, spec, **kargs):
        """Queries the active index; yields matching records as dicts,
        with the Elasticsearch document id reported as `_id`.
        """
        for rec in helpers.scan(self.db_client, query={"query": spec},
                                index=self.indexes[self.index_hosts],
                                ignore_unavailable=True):
            yield dict(rec['_source'], _id=rec['_id'])


class ElasticDBView(ElasticDBActive, DBView):
    """Elasticsearch-backed "view" purpose.

    The hosts index name can be overridden with the `indexname_hosts`
    URL parameter (default: "hosts"), prefixed by `.index_prefix`.
    """

    def __init__(self, url):
        super(ElasticDBView, self).__init__(url)
        indexname = self.params.pop('indexname_hosts', 'hosts')
        self.indexes = ['%s%s' % (self.index_prefix, indexname)]

    def store_or_merge_host(self, host):
        """Merges `host` into an existing record when possible,
        otherwise stores it as a new document.
        """
        if not self.merge_host(host):
            self.store_host(host)
30 changes: 28 additions & 2 deletions ivre/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,25 @@ def nmap_record_to_view(rec):
"""Convert an nmap result in view.
"""
if '_id' in rec:
del rec['_id']
if 'scanid' in rec:
del rec['scanid']
if 'source' in rec:
if not rec['source']:
rec['source'] = []
elif not isinstance(rec['source'], list):
rec['source'] = [rec['source']]
for port in rec.get('ports', []):
for script in port.get('scripts', []):
if 'masscan' in script and 'raw' in script['masscan']:
script['masscan']['raw'] = db.nmap.from_binary(
script['masscan']['raw']
)
if 'screendata' in script:
script['screendata'] = db.nmap.from_binary(
script['screendata']
)
return rec


Expand Down Expand Up @@ -382,6 +394,20 @@ def next_record(rec, updt):
return updt
return db.view.merge_host_docs(rec, updt)
next_recs = []

def prepare_record(rec):
    """Encode binary script fields with db.view.to_binary() so the
    record can be stored in the view backend; mutates `rec` in place
    and returns it.
    """
    for port in rec.get('ports', []):
        for script in port.get('scripts', []):
            if 'masscan' in script and 'raw' in script['masscan']:
                raw = script['masscan']['raw']
                script['masscan']['raw'] = db.view.to_binary(raw)
            if 'screendata' in script:
                data = script['screendata']
                script['screendata'] = db.view.to_binary(data)
    return rec

# We cannot use a `for itr in itrs` loop here because itrs is
# modified in the loop.
i = 0
Expand Down Expand Up @@ -419,8 +445,8 @@ def next_record(rec, updt):
next_addrs[i] = next_recs[i]['addr']
i += 1
if next_addrs and cur_addr not in next_addrs:
yield cur_rec
yield prepare_record(cur_rec)
cur_rec = None
cur_addr = min(next_addrs)
if cur_rec is not None:
yield cur_rec
yield prepare_record(cur_rec)
4 changes: 4 additions & 0 deletions requirements-elastic.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
elasticsearch
pycrypto
future
bottle
39 changes: 33 additions & 6 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,15 @@ def init_nmap_db(self):
stdin=open(os.devnull))[0], 0)
self.assertEqual(RUN(["ivre", "scancli", "--count"])[1], b"0\n")

def test_20_fake_nmap_passive(self):
    """For Elasticsearch backend: insert results in MongoDB nmap & passive
    purposes to feed Elasticsearch view.
    """
    # Only relevant when testing the Elasticsearch backend; no-op for
    # every other DATABASE value.
    if DATABASE != "elastic":
        return
    # Restore pre-built nmap & passive results into the MongoDB "ivre"
    # database; ../backup/ is presumably populated by
    # .travis/install_elastic.sh -- TODO confirm against the CI setup.
    subprocess.check_call(["mongorestore", "--db", "ivre", "../backup/"])

def test_30_nmap(self):

#
Expand Down Expand Up @@ -3657,6 +3666,8 @@ def test_50_view(self):
view_count = 0
# Count passive results
self.assertEqual(RUN(["ivre", "db2view", "passive"])[0], 0)
if DATABASE == 'elastic':
time.sleep(ELASTIC_INSERT_TEMPO)
ret, out, _ = RUN(["ivre", "view", "--count"])
self.assertEqual(ret, 0)
view_count = int(out)
Expand All @@ -3666,13 +3677,17 @@ def test_50_view(self):
stdin=open(os.devnull))[0], 0)
# Count active results
self.assertEqual(RUN(["ivre", "db2view", "nmap"])[0], 0)
if DATABASE == 'elastic':
time.sleep(ELASTIC_INSERT_TEMPO)
ret, out, _ = RUN(["ivre", "view", "--count"])
self.assertEqual(ret, 0)
view_count = int(out)
self.assertGreater(view_count, 0)
self.check_value("view_count_active", view_count)
# Count merged results
self.assertEqual(RUN(["ivre", "db2view", "passive"])[0], 0)
if DATABASE == 'elastic':
time.sleep(ELASTIC_INSERT_TEMPO)
ret, out, _ = RUN(["ivre", "view", "--count"])
self.assertEqual(ret, 0)
view_count = int(out)
Expand All @@ -3690,18 +3705,25 @@ def test_50_view(self):
self.assertEqual(ret, 0)
self.assertTrue(not err)
self.assertEqual(len(out.splitlines()), view_count)
# SHORT
res, out, err = RUN(['ivre', 'view', '--short'])
self.assertEqual(res, 0)
self.assertTrue(not err)
self.assertEqual(len(out.splitlines()), view_count)
# GNMAP
ret, out, err = RUN(["ivre", "view", "--gnmap"])
self.assertEqual(ret, 0)
self.assertTrue(not err)
count = sum(1 for line in out.splitlines() if b'Status: Up' in line)
self.check_value("view_gnmap_up_count", count)

if DATABASE == "elastic":
# Support for Elasticsearch is experimental and lacks a
# lot of functionalities. The next test fails for lack of
# .distinct() method.
return

# SHORT
res, out, err = RUN(['ivre', 'view', '--short'])
self.assertEqual(res, 0)
self.assertTrue(not err)
self.assertEqual(len(out.splitlines()), view_count)

# Filters
self.check_view_top_value("view_ssh_top_port", "port:ssh")
self.check_view_top_value("view_http_top_content_type",
Expand Down Expand Up @@ -4221,6 +4243,8 @@ def test_90_cleanup(self):
"utils"],
"neo4j": ["30_nmap", "40_passive", "50_view", "53_nmap_delete",
"54_passive_delete", "90_cleanup", "scans", "utils"],
"elastic": ["30_nmap", "40_passive", "53_nmap_delete", "54_passive_delete",
"60_flow", "90_cleanup", "scans", "utils"],
"maxmind": ["30_nmap", "40_passive", "50_view", "53_nmap_delete",
"54_passive_delete", "60_flow", "90_cleanup", "scans"],
}
Expand Down Expand Up @@ -4277,7 +4301,7 @@ def my_parse_args():


def parse_env():
global DATABASE
global DATABASE, ELASTIC_INSERT_TEMPO
DATABASE = os.getenv("DB")
for test in DATABASES.get(DATABASE, []):
test = "test_%s" % test
Expand All @@ -4288,6 +4312,9 @@ def parse_env():
getattr(IvreTests, test),
),
)
# Elasticsearch insertion is asynchronous, this hack "ensures" the
# data has been inserted before continuing.
ELASTIC_INSERT_TEMPO = int(os.getenv("ES_INSERT_TEMPO", 1))


if __name__ == '__main__':
Expand Down

0 comments on commit 48dae1a

Please sign in to comment.