Skip to content

Commit

Permalink
Merge pull request #121 from mitre/master
Browse files Browse the repository at this point in the history
Merge metricbeat code into dev
  • Loading branch information
awest1339 committed Apr 17, 2018
2 parents 7c72a86 + 22eb1ff commit 3a898b2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -28,6 +28,7 @@ sqlalchemy-utils
sqlalchemy-datatables
#Required by storage modules
elasticsearch>=5.0.0,<6.0.0
elasticsearch-curator
pymongo
#Required for distributed
celery
Expand Down
18 changes: 18 additions & 0 deletions storage/elasticsearch_storage.py
Expand Up @@ -7,6 +7,8 @@
from datetime import datetime
from uuid import uuid4

import curator

from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import TransportError

Expand Down Expand Up @@ -71,6 +73,8 @@ class ElasticSearchStorage(storage.Storage):
'port': 9200,
'index': 'multiscanner_reports',
'doc_type': 'report',
'metricbeat_enabled': True,
'metricbeat_rollover_days': 7,
}

def setup(self):
Expand Down Expand Up @@ -506,3 +510,17 @@ def delete_by_task_id(self, task_id):

def teardown(self):
    # No-op: the Elasticsearch client held by this storage module does not
    # require an explicit shutdown step.
    pass

def delete_index(self, index_prefix, days):
    '''
    Delete indices whose names start with index_prefix and whose
    name-embedded date (format %Y.%m.%d) is equal to or older than
    the given number of days.

    Args:
        index_prefix (str): Prefix of the index names to consider
            (e.g. 'metricbeat').
        days (int): Age threshold in days; matching indices at least
            this old are deleted.

    Returns:
        bool: True when the delete action completed, False on any error.
    '''
    try:
        ilo = curator.IndexList(self.es)
        ilo.filter_by_regex(kind='prefix', value=index_prefix)
        ilo.filter_by_age(source='name', direction='older',
                          timestring='%Y.%m.%d', unit='days',
                          unit_count=days)
        curator.DeleteIndices(ilo).do_action()
        # Explicit success value: callers test `ret is False`, so make the
        # success path unambiguous instead of implicitly returning None.
        return True
    except Exception:
        # TODO: log the exception instead of swallowing it silently
        return False
50 changes: 50 additions & 0 deletions utils/celery_worker.py
Expand Up @@ -29,6 +29,7 @@
sys.path.insert(0, os.path.join(MS_WD, 'analytics'))

import common
import elasticsearch_storage
import multiscanner
import sql_driver as database
from ssdeep_analytics import SSDeepAnalytic
Expand Down Expand Up @@ -78,12 +79,24 @@

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    '''
    Register the periodic Celery tasks once the app is configured.
    '''
    # Run ssdeep match analytic
    # Executes every morning at 2:00 a.m.
    sender.add_periodic_task(
        crontab(hour=2, minute=0),
        ssdeep_compare_celery.s(),
    )

    # Delete old metricbeat indices
    # Executes every morning at 3:00 a.m.
    storage_conf_path = multiscanner.common.get_config_path(multiscanner.CONFIG, 'storage')
    storage_conf = multiscanner.common.parse_config(storage_conf_path)
    metricbeat_enabled = storage_conf.get('metricbeat_enabled', True)
    if metricbeat_enabled:
        sender.add_periodic_task(
            crontab(hour=3, minute=0),
            # Pass the rollover age explicitly: metricbeat_rollover declares a
            # required 'days' parameter, so scheduling the signature with no
            # arguments would raise TypeError every time the beat fires.
            metricbeat_rollover.s(storage_conf.get('metricbeat_rollover_days', 7)),
        )


class MultiScannerTask(Task):
'''
Expand Down Expand Up @@ -203,6 +216,43 @@ def ssdeep_compare_celery():
ssdeep_analytic.ssdeep_compare()


@app.task()
def metricbeat_rollover(days=None, config=multiscanner.CONFIG):
    '''
    Clean up old Elastic Beats (metricbeat) indices.

    Args:
        days: Delete metricbeat indices older than this many days. When
            falsy, falls back to the 'metricbeat_rollover_days' setting in
            the storage config. Defaults to None so the task can be
            scheduled without arguments.
        config: Path to the multiscanner config file.
    '''
    # Initialized up front so the finally block cannot raise NameError if
    # config loading or StorageHandler construction fails below.
    storage_handler = None
    try:
        # Get the storage config
        storage_conf_path = multiscanner.common.get_config_path(config, 'storage')
        storage_handler = multiscanner.storage.StorageHandler(configfile=storage_conf_path)
        storage_conf = multiscanner.common.parse_config(storage_conf_path)

        metricbeat_enabled = storage_conf.get('metricbeat_enabled', True)

        if not metricbeat_enabled:
            logger.debug('Metricbeat logging not enabled, exiting...')
            return

        if not days:
            days = storage_conf.get('metricbeat_rollover_days')
        if not days:
            raise NameError("name 'days' is not defined, check storage.ini for 'metricbeat_rollover_days' setting")

        # Find Elastic storage
        for handler in storage_handler.loaded_storage:
            if isinstance(handler, elasticsearch_storage.ElasticSearchStorage):
                ret = handler.delete_index(index_prefix='metricbeat', days=days)

                if ret is False:
                    logger.warning('Metricbeat Roller failed')
                else:
                    logger.info('Metricbeat indices older than {} days deleted'.format(days))
    except Exception as e:
        logger.warning(e)
    finally:
        if storage_handler is not None:
            storage_handler.close()


# Script entry point: start the Celery worker process.
if __name__ == '__main__':
    logger.debug('Initializing celery worker...')
    app.start()

0 comments on commit 3a898b2

Please sign in to comment.