Permalink
Browse files

initial tests for environment sanity and event processing, closes #338

  • Loading branch information...
jeffbryner committed Mar 28, 2016
1 parent 7fd56b8 commit a634497aaa551c3dbb4c438698a3dcaa17155ea5
Showing with 205 additions and 0 deletions.
  1. +71 −0 tests/conftest.py
  2. +8 −0 tests/setup.cfg
  3. +80 −0 tests/test_events.py
  4. +46 −0 tests/test_servers.py
View
@@ -0,0 +1,71 @@
import pytest
import tempfile
import os
import ConfigParser
def getConfig(optionname,thedefault,section,configfile):
"""read an option from a config file or set a default
send 'thedefault' as the data class you want to get a string back
i.e. 'True' will return a string
True will return a bool
1 will return an int
"""
#getConfig('something','adefaultvalue')
retvalue=thedefault
opttype=type(thedefault)
if os.path.isfile(configfile):
config = ConfigParser.ConfigParser()
config.readfp(open(configfile))
if config.has_option(section,optionname):
if opttype==bool:
retvalue=config.getboolean(section,optionname)
elif opttype==int:
retvalue=config.getint(section,optionname)
elif opttype==float:
retvalue=config.getfloat(section,optionname)
else:
retvalue=config.get(section,optionname)
return retvalue
@pytest.fixture
def options():
options=dict()
configFile='setup.cfg'
if pytest.config.inifile:
configFile=str(pytest.config.inifile)
options["esserver"]=getConfig('esserver','localhost:9200','mozdef',configFile)
options["loginput"]=getConfig('loginput','localhost:8080','mozdef',configFile)
options["webuiurl"]=getConfig('webuiurl','http://localhost/','mozdef',configFile)
options["kibanaurl"]=getConfig('kibanaurl','http://localhost:9090/','mozdef',configFile)
if pytest.config.option.verbose > 0:
options["verbose"]=True
print('Using options: \n\t%r' % options)
else:
options["verbose"]=False
return options
@pytest.fixture()
def cleandir():
newpath = tempfile.mkdtemp()
os.chdir(newpath)
def pytest_report_header(config):
if config.option.verbose > 0:
return ["reporting verbose test output"]
#def pytest_addoption(parser):
#parser.addoption("--esserver",
#action="store",
#default="localhost:9200",
#help="elastic search servers to use for testing")
#parser.addoption("--mozdefserver",
#action="store",
#default="localhost:8080",
#help="mozdef server to use for testing")
View
@@ -0,0 +1,8 @@
[pytest]
addopts = --capture=no
[mozdef]
esserver = elasticsearchservername:9200
loginput = mozdefservername:8080
webuiurl = https://mozdefservername/
kibanaurl = http://mozdefservername:9090/
View
@@ -0,0 +1,80 @@
import requests
import pytest
import json
import uuid
from datetime import datetime
from elasticsearch_dsl.connections import connections
from elasticsearch.helpers import scan
def test_basic_event_send(options):
server=options['loginput']
#create a sample test event
anevent=json.loads(r'''{
"category": "pytest",
"processid": "0",
"severity": "DEBUG",
"utctimestamp": "",
"hostname": "testhost.pytest.com",
"summary": "a test event for pytest from test_basic_event_send",
"eventsource": "pytest",
"details": {
"processid": "14148",
"hostname": "testvictim.pytest.com",
"program": "pytest",
"sourceipaddress": "10.1.2.3"
}
}''')
for i in range(0,5):
anevent['timestamp']=datetime.utcnow().isoformat()
anevent['details']['uuid']=str(uuid.uuid1())
if options["verbose"]:
print('sending {0}'.format(anevent))
r=requests.put(url="http://{0}/events".format(server),data=json.dumps(anevent))
if options["verbose"]:
print(r)
assert r.status_code == 200
def test_event_send_and_store(options):
inputServer=options['loginput']
esServer = options['esserver']
uuids=[]
#create a sample test event
anevent=json.loads(r'''{
"category": "pytest",
"processid": "0",
"severity": "DEBUG",
"utctimestamp": "",
"hostname": "testhost.pytest.com",
"summary": "a test event for pytest from test_basic_event_send",
"eventsource": "pytest",
"details": {
"processid": "14148",
"hostname": "testvictim.pytest.com",
"program": "pytest",
"sourceipaddress": "10.1.2.3"
}
}''')
#send events
for i in range(0,5):
anevent['timestamp']=datetime.utcnow().isoformat()
anevent['details']['uuid']=str(uuid.uuid1())
uuids.append(anevent['details']['uuid'])
if options["verbose"]:
print('sending {0}'.format(anevent))
r=requests.put(url="http://{0}/events".format(inputServer),data=json.dumps(anevent))
if options["verbose"]:
print(r)
assert r.status_code == 200
#search for events to have landed in ES
es=connections.create_connection(hosts=['{0}'.format(esServer)])
for u in uuids:
for hit in scan(es,
query={"query":{"match":{"details.uuid":"{0}".format(u)}}},
index="events",
doc_type="event"):
assert u == hit['_source']['details']['uuid']
View
@@ -0,0 +1,46 @@
import requests
import pytest
def test_elastic_search_server_exists(options):
server = options['esserver']
if options["verbose"]:
print('Testing connection to {0}'.format(server))
r=requests.get(url="http://{0}/".format(server))
if options["verbose"]:
print('\tReceived: {0}'.format(r.json()))
assert r.status_code==200
def test_mozdef_loginput_endpoint(options):
server=options['loginput']
if options["verbose"]:
print("Testing connection to {0}".format(server))
r=requests.get(url="http://{0}/test/".format(server))
if options["verbose"]:
print('\tReceived http status code: {0}'.format(r.status_code))
assert r.status_code==200
def test_mozdef_webui_endpoint(options):
server=options['webuiurl']
if options["verbose"]:
print("Testing connection to {0}".format(server))
r=requests.get(url="{0}".format(server),verify=False)
if options["verbose"]:
print('\tReceived http status code: {0}'.format(r.status_code))
assert r.status_code==200
def test_mozdef_kibana_endpoint(options):
server=options['kibanaurl']
if options["verbose"]:
print("Testing connection to {0}".format(server))
r=requests.get(url="{0}".format(server),verify=False)
if options["verbose"]:
print('\tReceived http status code: {0}'.format(r.status_code))
assert r.status_code==200

0 comments on commit a634497

Please sign in to comment.