Skip to content

Commit

Permalink
Updated code to handle maximum query length limitation in darktrace. (#…
Browse files Browse the repository at this point in the history
…1259)

* Updated code to handle maximum query length limitation in darktrace.
  • Loading branch information
thangaraj-ramesh authored and delliott90 committed Jan 24, 2023
1 parent f3af8eb commit 4624dba
Show file tree
Hide file tree
Showing 4 changed files with 690 additions and 3 deletions.
1 change: 1 addition & 0 deletions stix_shifter_modules/darktrace/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -865,3 +865,4 @@ ping
### Observations
- Darktrace does not support >= and <=, so the same is achieved by < and > operators by increasing and decreasing the corresponding values.
- Query will return those records in the specified timeframe which satisfy the applied search filters. If there are no records in the given timeframe, search filter won't be applied and "Invalid parameter" error will be returned.
- It is observed that Darktrace API returns 'timed out' error if translated query string length is more than MAX_QUERY_LENGTH. To avoid this error, query is split at 'OR' conditions and multiple requests are made to darktrace to get the result. However, the part of query joined by 'AND' operator won't be split and 'timed out' error is returned if the length of that part is beyond MAX_QUERY_LENGTH.
1 change: 1 addition & 0 deletions stix_shifter_modules/darktrace/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pyparsing==3.0.9
252 changes: 249 additions & 3 deletions stix_shifter_modules/darktrace/stix_translation/query_constructor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import re
from os import path
from pyparsing import nestedExpr, White
from datetime import datetime, timedelta
from stix_shifter_utils.utils import logger
from stix_shifter_utils.stix_translation.src.patterns.pattern_objects import ObservationExpression, \
Expand All @@ -12,6 +13,8 @@
STOP_TIME = datetime.utcnow()
CONFIG_MAP_PATH = "json/config_map.json"
DEFAULT_LIMIT = 10000
MAX_QUERY_LENGTH = 2400
TIMESTAMP_LENGTH = 80


class FileNotFoundException(Exception):
Expand Down Expand Up @@ -338,10 +341,22 @@ def parse_expression(self, pattern: Pattern):
if self.options['result_limit'] > DEFAULT_LIMIT:
self.options['result_limit'] = DEFAULT_LIMIT

query = {"search": darktrace_query, "fields": [],
"timeframe": "custom", "time": {"from": start, "to": stop}, "size": self.options['result_limit']}
darktrace_queries = []

self.qualified_queries.append(query)
# Query length exceed the max query limit will split the query
if len(darktrace_query) > MAX_QUERY_LENGTH:
obj = QuerySeparator()
darktrace_queries = obj.split_query(darktrace_query)

# change single query into list
if darktrace_query and not darktrace_queries:
darktrace_queries = [darktrace_query]

for darktrace_query in darktrace_queries:
query = {"search": darktrace_query, "fields": [],
"timeframe": "custom", "time": {"from": start, "to": stop}, "size": self.options['result_limit']}

self.qualified_queries.append(query)


def translate_pattern(pattern: Pattern, data_model_mapping, options):
Expand All @@ -355,3 +370,234 @@ def translate_pattern(pattern: Pattern, data_model_mapping, options):
translated_query_strings = QueryStringPatternTranslator(pattern, data_model_mapping, options)
queries = translated_query_strings.qualified_queries
return queries


class QuerySeparator:
"""
split the translated query based on query length
"""

def split_query(self, query):
"""
split the query based on query length limit.
Between two statements if OR is present it will split if AND is present it wont split.
param query: str
return queries: list
"""
try:
queries = []
# Based on brackets converting the translated query into nested list
parsed_query = nestedExpr(opener="(", closer=")", ignoreExpr=White('')).parseString(query).asList()
parsed_query = parsed_query[0]

# Check valid parsing
if len(parsed_query) != 3:
return [query]

# process the each statement its need to split as separate query or not
statement_1 = self.split_sub_query(parsed_query[0])
statement_2 = self.split_sub_query(parsed_query[2])
operator = parsed_query[1]

if str(statement_1).count('Unable to split') or str(statement_2).count('Unable to split'):
return [query]

# statement is greater than query length limit and it was split.
if isinstance(statement_1, list) or isinstance(statement_2, list):

# Try to combine the split query
statement_1 = self.combine_list_query(statement_1)
statement_2 = self.combine_list_query(statement_2)

# AND operator is processed if only its contains timestamp then will attach the timestamp in each query
queries = self.attach_timestamp(statement_1, statement_2, operator)

if str(queries).count('Unable to split'):
return [query]

# statements are lesser than query length limit and it wont split.
elif statement_1 and statement_2:
combined_statement = statement_1 + ' ' + operator + ' ' + statement_2
if len(combined_statement) > MAX_QUERY_LENGTH:
if operator == 'AND':
queries.append(query)
else:
queries.append(statement_1)
queries.append(statement_2)
else:
queries.append(combined_statement)
else:
queries.append(query)

return queries
except Exception as e:
logger.info("Unable to split the query. Error occurred {}".format(str(e)))
return [query]

def split_sub_query(self, parsed_query):
"""
Iterate the query from list and based on operator separate the query or combine the query.
param query: str
return queries: list or str
"""

# iterate other than IN operator
if len(parsed_query) == 3:
statement_1 = parsed_query[0]
statement_2 = parsed_query[2]
operator = parsed_query[1]
queries = []
query_list = [statement_1, statement_2]
for index, statement in enumerate(query_list, start=0):
if any(isinstance(i, list) for i in statement):
statement = self.split_sub_query(statement)
elif isinstance(statement, list):
statement = ' '.join(map(str, statement))
if statement and len(statement) > MAX_QUERY_LENGTH:
statement = self.split_query_and_or_operator(statement)

if index == 0:
statement_1 = statement
else:
statement_2 = statement

if isinstance(statement_1, list) or isinstance(statement_2, list):
queries = self.attach_timestamp(statement_1, statement_2, operator)
return queries

combined_query = '(' + statement_1 + ') ' + operator + ' (' + statement_2 + ')'

if len(combined_query) > MAX_QUERY_LENGTH:
queries.append(statement_1)
queries.append(statement_2)
return queries
return combined_query
elif any(isinstance(i, list) for i in parsed_query):
# iterate for IN operator
return self.handle_translated_in_operator(parsed_query)
elif isinstance(parsed_query, list):
combined_query = '(' + ' '.join(map(str, parsed_query)) + ')'
if len(combined_query) > MAX_QUERY_LENGTH:
combined_query = self.split_query_and_or_operator(combined_query)
return combined_query
else:
return '(' + parsed_query + ')'

@staticmethod
def split_query_and_or_operator(parsed_query, query_len=MAX_QUERY_LENGTH):
""" Split the query based on AND/OR operator.
param query: str, MAX_QUERY_LENGTH: int
return queries: list or str """
start = 0
index = 0
result = []
combined_query = ''

while index < len(parsed_query):
if 'OR' in parsed_query[index:len(parsed_query)]:
index = parsed_query[index:len(parsed_query)].index('OR')
index = start + index + 2
elif 'AND' in parsed_query[index:len(parsed_query)]:
index = parsed_query[index:len(parsed_query)].index('AND')
index = start + index + 3
else:
index = len(parsed_query)
if len(combined_query + parsed_query[start: index]) > query_len:
if combined_query:
result.append('(' + combined_query.rstrip('OR').rstrip('AND') + ')')
if parsed_query[start: index]:
result.append('(' + parsed_query[start: index].lstrip('(').rstrip(')') + ')')
else:
combined_query += parsed_query[start: index]
combined_query = combined_query.lstrip('(').rstrip(')')
result.append('(' + combined_query + ')')
break
if len(combined_query + parsed_query[start: index]) > query_len and combined_query:
query = combined_query.rstrip('OR').rstrip('AND')
query = query.lstrip('(').rstrip(')').rstrip("'").rstrip(",")
if query:
result.append('(' + query + ')')
combined_query = ''
combined_query += parsed_query[start: index]
start = index
return result

@staticmethod
def combine_list_query(query_list):
""" Combine the query from list.
param query: list, return queries: list """

query = ''
queries = []

# exit the process for invalid arguments
if isinstance(query_list, str) or len(query_list) == 1:
return query_list

for index, each_query in enumerate(query_list, start=0):
if not query:
query = each_query
continue

combined_query = query + ' OR ' + each_query
if len(combined_query) > MAX_QUERY_LENGTH:
queries.append(query)
query = each_query
if len(query_list) - 1 == index:
queries.append(query)
continue

query = combined_query

if len(query_list) - 1 == index:
queries.append(query)
continue

return queries

@staticmethod
def attach_timestamp(statement_1, statement_2, operator):
"""Attach the timestamp in query"""
queries = []

if operator == 'AND':
if statement_2.count('@fields.epochdate') == 2 \
and len(statement_2) < TIMESTAMP_LENGTH:
if not (statement_2.count('(') and statement_2.count(')')):
statement_2 = '(' + statement_2 + ')'
queries = ['(' + row + ') AND ' + statement_2 for row in statement_1]
else:
return 'Unable to split'

if operator == 'OR':
queries += statement_1 if isinstance(statement_1, list) else [statement_1]
queries += statement_2 if isinstance(statement_2, list) else [statement_2]

return queries

def handle_translated_in_operator(self, parsed_query):
""" Process the translated query having IN operator in STIX pattern.
Params Query: List, return Query: List """
query = ''
queries = []
for row in parsed_query:
if isinstance(row, str):
query += row + ' '
elif isinstance(row, list):

combined_query = '(' + ' '.join(map(str, row)) + ')'

if combined_query.count('NOT') and any(isinstance(i, list) for i in row):
combined_query = self.split_sub_query(row)

query = query.lstrip('OR').lstrip('AND')

if isinstance(combined_query, list):
queries += [query + value for value in combined_query]
elif isinstance(combined_query, str) and len(combined_query) > MAX_QUERY_LENGTH:
separated = self.split_query_and_or_operator(combined_query, MAX_QUERY_LENGTH - len(query))
queries += [query + value for value in separated]
else:
queries += [query + combined_query]
query = ''
return queries
Loading

0 comments on commit 4624dba

Please sign in to comment.