Permalink
Browse files

Improve base emulator architecture (#149)

* new base architecture

* Update cmd_exec emulator

* Update lfi emulator

* Update rfi emulator

* update attack_param to be passed

* Update sqli emulator

* Update xss emulator

* Update xss emulator

* add all emulators and remove comments

* fix sqli post error

* fix base

* support multiple param attacks

* fix tests

* make some class variables

* put comments

* update comments
  • Loading branch information...
rnehra01 authored and afeena committed Jun 13, 2017
1 parent 9f7adbd commit 6471d69c560b580b21106282f895f50021dc4310
View
@@ -6,15 +6,7 @@
from tanner.emulators import lfi, rfi, sqli, xss, cmd_exec
from tanner.utils import patterns
class BaseHandler:
# Reference patterns
patterns = {
patterns.RFI_ATTACK: dict(name='rfi', order=2),
patterns.LFI_ATTACK: dict(name='lfi', order=2),
patterns.XSS_ATTACK: dict(name='xss', order=3)
}
def __init__(self, base_dir, db_name, loop=None):
self.emulators = {
'rfi': rfi.RfiEmulator(base_dir, loop),
@@ -23,57 +15,66 @@ def __init__(self, base_dir, db_name, loop=None):
'sqli': sqli.SqliEmulator(db_name, base_dir),
'cmd_exec': cmd_exec.CmdExecEmulator()
}
self.get_emulators = ['sqli', 'rfi', 'lfi', 'xss', 'cmd_exec']
self.post_emulators = ['sqli', 'rfi', 'lfi', 'xss', 'cmd_exec']
async def handle_post(self, session, data):
def extract_get_data(self, path):
"""
Return all the GET parameter
:param path (str): The URL path from which GET parameters are to be extracted
:return: A MultiDictProxy object containg name and value of parameters
"""
path = urllib.parse.unquote(path)
encodings = [('&&', '%26%26'), (';', '%3B')]
for value, encoded_value in encodings:
path = path.replace(value, encoded_value)
get_data = yarl.URL(path).query
return get_data
async def get_emulation_result(self, session, data, target_emulators):
"""
Return emulation result for the vulnerabilty of highest order
:param session (Session object): Current active session
:param data (MultiDictProxy object): Data to be checked
:param target_emulator (list): Emulators against which data is to be checked
:return: A dict object containing name, order and paylod to be injected for vulnerability
"""
detection = dict(name='unknown', order=0)
xss_result = await self.emulators['xss'].handle(None, session, data)
if xss_result:
detection = {'name': 'xss', 'order': 2, 'payload': xss_result}
else:
sqli_data = self.emulators['sqli'].check_post_data(data)
if sqli_data:
sqli_result = await self.emulators['sqli'].handle(sqli_data, session, 1)
detection = {'name': 'sqli', 'order': 2, 'payload': sqli_result}
else:
cmd_exec_data = await self.emulators['cmd_exec'].check_post_data(data)
if cmd_exec_data:
cmd_exec_results = await self.emulators['cmd_exec'].handle(cmd_exec_data[0][1], session)
detection = {'name': 'cmd_exec', 'order': 3, 'payload': cmd_exec_results}
attack_params = {}
for param_id, param_value in data.items():
for emulator in target_emulators:
possible_detection = self.emulators[emulator].scan(param_value)
if possible_detection:
if detection['order'] < possible_detection['order']:
detection = possible_detection
if emulator not in attack_params:
attack_params[emulator] = []
attack_params[emulator].append(dict(id= param_id, value= param_value))
if detection['name'] in self.emulators:
emulation_result = await self.emulators[detection['name']].handle(attack_params[detection['name']], session)
detection['payload'] = emulation_result
return detection
async def handle_post(self, session, data):
post_data = data['post_data']
detection = await self.get_emulation_result(session, post_data, self.post_emulators)
return detection
async def handle_get(self, session, path):
get_data = self.extract_get_data(path)
detection = dict(name='unknown', order=0)
# dummy for wp-content
if re.match(patterns.WORD_PRESS_CONTENT, path):
detection = {'name': 'wp-content', 'order': 1}
if re.match(patterns.INDEX, path):
detection = {'name': 'index', 'order': 1}
path = urllib.parse.unquote(path)
query = yarl.URL(path).query
for name, value in query.items():
for pattern, patter_details in self.patterns.items():
if pattern.match(value):
if detection['order'] < patter_details['order']:
detection = patter_details
attack_value = value
if detection['order'] <= 1:
cmd_exec = await self.emulators['cmd_exec'].check_get_data(path)
if cmd_exec:
detection = {'name': 'cmd_exec', 'order': 3}
attack_value = cmd_exec[0][1]
else:
sqli = self.emulators['sqli'].check_get_data(path)
if sqli:
detection = {'name': 'sqli', 'order': 2}
attack_value = path
if detection['name'] in self.emulators:
emulation_result = await self.emulators[detection['name']].handle(attack_value, session)
detection['payload'] = emulation_result
possible_detection = await self.get_emulation_result(session, get_data, self.get_emulators)
if possible_detection and detection['order'] < possible_detection['order'] :
detection = possible_detection
return detection
@@ -87,4 +88,4 @@ def __init__(self, base_dir, db_name, loop=None):
async def handle(self, data, session, path):
detection = await self.emulate(data, session, path)
return detection
return detection
@@ -75,25 +75,13 @@ def __init__(self):
except docker.errors.APIError as server_error:
self.logger.error('Error while removing container %s', server_error)
async def check_post_data(self, data):
cmd_data = []
for (param_id, param_value) in data['post_data'].items():
if patterns.CMD_ATTACK.match(param_value):
cmd_data.append((param_id, param_value))
return cmd_data
async def check_get_data(self, path):
cmd_data = []
query = yarl.URL(path).query_string
params = query.split('&')
for param in params:
if len(param.split('=')) == 2:
param_id, param_value = param.split('=')
if patterns.CMD_ATTACK.match(param_value):
cmd_data.append((param_id, param_value))
return cmd_data
def scan(self, value):
detection = None
if patterns.CMD_ATTACK.match(value):
detection = dict(name= 'cmd_exec', order= 3)
return detection
async def handle(self, value, session= None):
async def handle(self, attack_params, session= None):
container = await self.create_attacker_env(session)
result = await self.get_cmd_exec_results(container, value)
result = await self.get_cmd_exec_results(container, attack_params[0]['value'])
return result
View
@@ -50,10 +50,15 @@ def setup_or_update_vdocs(self):
with open(filename, 'w') as vd:
vd.write(value)
def scan(self, value):
detection = None
if patterns.LFI_ATTACK.match(value):
detection = dict(name= 'lfi', order= 2)
return detection
async def handle(self, path, session=None):
async def handle(self, attack_params, session=None):
if not self.whitelist:
self.available_files()
file_path = self.get_file_path(path)
file_path = self.get_file_path(attack_params[0]['value'])
result = self.get_lfi_result(file_path)
return result
View
@@ -87,8 +87,14 @@ def download_file_ftp(self, url):
await session.close()
return rfi_result
async def handle(self, path, session=None):
result = await self.get_rfi_result(path)
def scan(self, value):
detection = None
if patterns.RFI_ATTACK.match(value):
detection = dict(name= 'rfi', order= 2)
return detection
async def handle(self, attack_params, session=None):
result = await self.get_rfi_result(attack_params[0]['value'])
if not result or 'stdout' not in result:
return ''
else:
View
@@ -16,41 +16,23 @@ def __init__(self, db_name, working_dir):
self.query_map = None
@staticmethod
def check_sqli(path):
payload = bytes(path, 'utf-8')
def scan(self, value):
print(value)
detection = None
payload = bytes(value, 'utf-8')
sqli = pylibinjection.detect_sqli(payload)
return int(sqli['sqli'])
if int(sqli['sqli']):
detection = dict(name= 'sqli', order= 2)
return detection
def check_post_data(self, data):
sqli_data = []
for (param, value) in data['post_data'].items():
sqli = self.check_sqli(value)
if sqli:
sqli_data.append((param, value))
return sqli_data
def check_get_data(self, path):
request_query = urllib.parse.urlparse(path).query
parsed_queries = urllib.parse.parse_qsl(request_query)
for query in parsed_queries:
sqli = self.check_sqli(query[1])
return sqli
@staticmethod
def prepare_get_query(path):
query = urllib.parse.urlparse(path).query
parsed_query = urllib.parse.parse_qsl(query)
return parsed_query
def map_query(self, query):
def map_query(self, attack_value):
db_query = None
param = query[0][0]
param_value = query[0][1].replace('\'', ' ')
param = attack_value['id']
param_value = attack_value['value'].replace('\'', ' ')
tables = []
for table, columns in self.query_map.items():
for column in columns:
if query[0][0] == column['name']:
if param == column['name']:
tables.append(dict(table_name=table, column=column))
if tables:
@@ -61,24 +43,22 @@ def map_query(self, query):
return db_query
async def get_sqli_result(self, query, attacker_db):
db_query = self.map_query(query)
async def get_sqli_result(self, attack_value, attacker_db):
db_query = self.map_query(attack_value)
if db_query is None:
result = 'You have an error in your SQL syntax; check the manual\
that corresponds to your MySQL server version for the\
right syntax to use near {} at line 1'.format(query[0][0])
right syntax to use near {} at line 1'.format(attack_value['id'])
else:
execute_result = await self.sqli_emulator.execute_query(db_query, attacker_db)
if isinstance(execute_result, list):
execute_result = ' '.join([str(x) for x in execute_result])
result = dict(value=execute_result, page='/index.html')
return result
async def handle(self, path, session, post_request=0):
async def handle(self, attack_params, session):
if self.query_map is None:
self.query_map = await self.sqli_emulator.setup_db(self.query_map)
if not post_request:
path = self.prepare_get_query(path)
attacker_db = await self.sqli_emulator.create_attacker_db(session)
result = await self.get_sqli_result(path, attacker_db)
result = await self.get_sqli_result(attack_params[0], attacker_db)
return result
View
@@ -6,27 +6,25 @@
class XssEmulator:
@staticmethod
def extract_xss_data(data):
value = ''
if 'post_data' in data:
for field, val in data['post_data'].items():
val = urllib.parse.unquote(val)
xss = re.match(patterns.HTML_TAGS, val)
if xss:
value += val if not value else '\n' + val
return value
def scan(self, value):
detection = None
if patterns.XSS_ATTACK.match(value):
detection = dict(name= 'xss', order= 3)
return detection
def get_xss_result(self, session, val):
def get_xss_result(self, session, attack_params):
result = None
injectable_page = None
value = ''
if session:
injectable_page = self.set_xss_page(session)
if injectable_page is None:
injectable_page = '/index.html'
if val:
result = dict(value=val,
page=injectable_page)
for param in attack_params:
value += param['value'] if not value else '\n' + param['value']
result = dict(value=value,
page=injectable_page)
return result
@staticmethod
@@ -37,9 +35,7 @@ def set_xss_page(session):
injectable_page = page['path']
return injectable_page
async def handle(self, value, session, raw_data=None):
async def handle(self, attack_params, session):
xss_result = None
if not value:
value = self.extract_xss_data(raw_data)
xss_result = self.get_xss_result(session, value)
xss_result = self.get_xss_result(session, attack_params)
return xss_result
Oops, something went wrong.

0 comments on commit 6471d69

Please sign in to comment.