
Snare tanner communication (#184)

* Make different type of detection

* Make emulator compatible with the new structure

* fix tests

* fix typo
rnehra01 authored and afeena committed Aug 16, 2017
1 parent d063e77 commit b3d5ec066e8f0e224a272c6c0827f0c62adb30e8
@@ -58,11 +58,12 @@ def extract_get_data(self, path):
if emulator not in attack_params:
attack_params[emulator] = []
attack_params[emulator].append(dict(id=param_id, value=param_value))
if detection['name'] in self.emulators:
emulation_result = await self.emulators[detection['name']].handle(attack_params[detection['name']], session)
detection['payload'] = emulation_result
if emulation_result:
detection['payload'] = emulation_result
return detection
async def handle_post(self, session, data):
@@ -113,11 +114,18 @@ def set_injectable_page(session):
else:
detection = await self.handle_get(session, data)
if 'payload' in detection and type(detection['payload']) is dict:
injectable_page = self.set_injectable_page(session)
if injectable_page is None:
injectable_page = '/index.html'
detection['payload']['page'] = injectable_page
if 'payload' not in detection:
detection['type'] = 1
elif 'payload' in detection:
if 'status_code' not in detection['payload']:
detection['type'] = 2
if detection['payload']['page']:
injectable_page = self.set_injectable_page(session)
if injectable_page is None:
injectable_page = '/index.html'
detection['payload']['page'] = injectable_page
else:
detection['type'] = 3
return detection
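Review bot: The new classification block drops the old `type(detection['payload']) is dict` guard, so `'status_code' not in detection['payload']` and `detection['payload']['page']` now assume every payload is a dict that carries a `page` key (nesting is inferred here, since the rendering lost the indentation). The emulators touched in this commit do return such dicts, but `handle_post` and any other producer of `payload` lie outside the visible hunks, so a string payload or a dict without `page` would raise here instead of being classified. I would restore an `isinstance(detection['payload'], dict)` check ahead of the key lookups and read the flag with `if detection['payload'].get('page'):`. The bare values 1, 2 and 3 assigned to `detection['type']` also need a comment or named constants saying what each one tells the consumer, and `elif 'payload' in detection:` can be a plain `else:`. The enclosing method starts above the hunk.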
@@ -17,7 +17,7 @@ def __init__(self):
async def get_cmd_exec_results(self, container, cmd):
execute_result = await self.helper.execute_cmd(container, cmd)
result = dict(value= execute_result, page= '/index.html')
result = dict(value=execute_result, page=True)
return result
def scan(self, value):
@@ -29,4 +29,4 @@ def scan(self, value):
async def handle(self, attack_params, session= None):
container = await self.create_attacker_env(session)
result = await self.get_cmd_exec_results(container, attack_params[0]['value'])
return result
return result
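Review bot: `page` now carries two meanings: `get_cmd_exec_results` returns `page=True` as a flag, and the caller's classification block later overwrites that same key with a path string such as `/index.html`. Nothing in the emulator states this contract. A short docstring on `get_cmd_exec_results` would make it explicit, saying that `page` is a boolean (apparently "place `value` into a served page") and that the caller substitutes the actual path. The same applies to the `dict(value=..., page=...)` returns in the other emulator hunks of this commit.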
@@ -35,5 +35,6 @@ def scan(self, value):
result = None
container = await self.setup_virtual_env()
if container:
result = await self.get_lfi_result(container, attack_params[0]['value'])
lfi_result = await self.get_lfi_result(container, attack_params[0]['value'])
result = dict(value=lfi_result, page=False)
return result
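Review bot: When `setup_virtual_env()` returns nothing, this handler still returns `None`, which the caller's new `if emulation_result:` check turns into a detection with no payload, so an emulator backend failure looks the same as a request that was never emulated. The other emulators in this commit signal failure differently: the `get_injection_result` handler returns `dict(status_code=504)` and the `get_rfi_result` handler returns `dict(value='', page=True)`. Picking one convention for "backend unavailable" and logging it would keep the failure visible to whoever monitors the sensor; if the 504 response is the intended one, that could be `result = dict(status_code=504)` as the initial value here. The method's signature is above the hunk.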
@@ -31,5 +31,5 @@ def scan(self, value):
async def handle(self, attack_params, session=None):
result = await self.get_injection_result(attack_params[0]['value'])
if not result or 'stdout' not in result:
return ''
return result['stdout']
return dict(status_code=504)
return dict(value=result['stdout'], page=False)
@@ -96,6 +96,6 @@ def scan(self, value):
async def handle(self, attack_params, session=None):
result = await self.get_rfi_result(attack_params[0]['value'])
if not result or 'stdout' not in result:
return ''
return dict(value='', page=True)
else:
return result['stdout']
return dict(value=result['stdout'], page=False)
@@ -52,7 +52,7 @@ def map_query(self, attack_value):
execute_result = await self.sqli_emulator.execute_query(db_query, attacker_db)
if isinstance(execute_result, list):
execute_result = ' '.join([str(x) for x in execute_result])
result = dict(value=execute_result)
result = dict(value=execute_result, page=True)
return result
async def handle(self, attack_params, session):
@@ -18,7 +18,7 @@ def get_xss_result(self, session, attack_params):
value = ''
for param in attack_params:
value += param['value'] if not value else '\n' + param['value']
result = dict(value=value)
result = dict(value=value, page=True)
return result
async def handle(self, attack_params, session):
@@ -13,14 +13,14 @@ def setUp(self):
def test_handle_abspath_lfi(self):
attack_params = [dict(id= 'foo', value= '/etc/passwd')]
result = self.loop.run_until_complete(self.handler.handle(attack_params))
self.assertIn('root:x:0:0:root:/root:/bin/sh', result)
self.assertIn('root:x:0:0:root:/root:/bin/sh', result['value'])
def test_handle_relative_path_lfi(self):
attack_params = [dict(id= 'foo', value= '../../../../../etc/passwd')]
result = self.loop.run_until_complete(self.handler.handle(attack_params))
self.assertIn('root:x:0:0:root:/root:/bin/sh', result)
self.assertIn('root:x:0:0:root:/root:/bin/sh', result['value'])
def test_handle_missing_lfi(self):
attack_params = [dict(id= 'foo', value= '../../../../../etc/bar')]
result = self.loop.run_until_complete(self.handler.handle(attack_params))
self.assertIn('No such file or directory', result)
self.assertIn('No such file or directory', result['value'])
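Review bot: The updated assertions index `result['value']` but never check the new `page` key, so the `page=False` that the LFI handler now returns is not pinned by any of these three tests, whereas the SQLi and XSS tests in this commit compare the whole dict. Adding `self.assertFalse(result['page'])` to each test would cover it. Because the handler returns `None` when the container cannot be set up, a `self.assertIsNotNone(result)` before the subscript would also turn an environment failure into a readable assertion failure rather than a `TypeError`.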
@@ -47,7 +47,7 @@ def test_get_sqli_result(self):
self.handler.sqli_emulator = mock.Mock()
self.handler.sqli_emulator.execute_query = mock_execute_query
assert_result = dict(value="[1, 'name', 'email@mail.com', 'password'] [1, '2', '3', '4']")
assert_result = dict(value="[1, 'name', 'email@mail.com', 'password'] [1, '2', '3', '4']", page=True)
result = self.loop.run_until_complete(self.handler.get_sqli_result(attack_value, 'foo.db'))
self.assertEqual(assert_result, result)
@@ -57,4 +57,4 @@ def test_get_sqli_result_error(self):
that corresponds to your MySQL server version for the\
right syntax to use near foo at line 1'
result = self.loop.run_until_complete(self.handler.get_sqli_result(attack_value, 'foo.db'))
self.assertEqual(assert_result, result)
self.assertEqual(assert_result, result)
@@ -24,5 +24,5 @@ def test_xss(self):
attack_params = [dict(id= 'foo', value= '<script>alert(\'xss\');</script>')]
xss = self.loop.run_until_complete(self.handler.handle(attack_params, None))
assert_result = dict(value=attack_params[0]['value'])
assert_result = dict(value=attack_params[0]['value'], page=True)
self.assertDictEqual(xss, assert_result)
