
Add Cookie support for attacks (#152)

* Add cookie support

* fix tests

* Remove unused parameter
rnehra01 authored and afeena committed Jun 17, 2017
1 parent 6471d69 commit 4df7fcbea6403711146a8a767e6a861c5e9da0e4
Showing with 41 additions and 24 deletions.
  1. +22 −10 tanner/emulators/base.py
  2. +0 −1 tanner/emulators/sqli.py
  3. +1 −1 tanner/server.py
  4. +18 −12 tanner/tests/test_base.py
@@ -17,6 +17,7 @@ def __init__(self, base_dir, db_name, loop=None):
}
self.get_emulators = ['sqli', 'rfi', 'lfi', 'xss', 'cmd_exec']
self.post_emulators = ['sqli', 'rfi', 'lfi', 'xss', 'cmd_exec']
self.cookie_emulators = ['sqli']
def extract_get_data(self, path):
"""
@@ -43,7 +44,7 @@ def extract_get_data(self, path):
attack_params = {}
for param_id, param_value in data.items():
for emulator in target_emulators:
possible_detection = self.emulators[emulator].scan(param_value)
possible_detection = self.emulators[emulator].scan(param_value) if param_value else None
if possible_detection:
if detection['order'] < possible_detection['order']:
detection = possible_detection
@@ -63,29 +64,40 @@ def extract_get_data(self, path):
detection = await self.get_emulation_result(session, post_data, self.post_emulators)
return detection
async def handle_get(self, session, path):
async def handle_cookies(self, session, data):
cookies = data['cookies']
detection = await self.get_emulation_result(session, cookies, self.cookie_emulators)
return detection
async def handle_get(self, session, data):
path = data['path']
get_data = self.extract_get_data(path)
detection = dict(name='unknown', order=0)
# dummy for wp-content
if re.match(patterns.WORD_PRESS_CONTENT, path):
detection = {'name': 'wp-content', 'order': 1}
if re.match(patterns.INDEX, path):
detection = {'name': 'index', 'order': 1}
possible_detection = await self.get_emulation_result(session, get_data, self.get_emulators)
if possible_detection and detection['order'] < possible_detection['order'] :
detection = possible_detection
# check attacks against get parameters
possible_get_detection = await self.get_emulation_result(session, get_data, self.get_emulators)
if possible_get_detection and detection['order'] < possible_get_detection['order'] :
detection = possible_get_detection
# check attacks against cookie values
possible_cookie_detection = await self.handle_cookies(session, data)
if possible_cookie_detection and detection['order'] < possible_cookie_detection['order'] :
detection = possible_cookie_detection
return detection
async def emulate(self, data, session, path):
async def emulate(self, data, session):
if data['method'] == 'POST':
detection = await self.handle_post(session, data)
else:
detection = await self.handle_get(session, path)
detection = await self.handle_get(session, data)
return detection
async def handle(self, data, session, path):
detection = await self.emulate(data, session, path)
async def handle(self, data, session):
detection = await self.emulate(data, session)
return detection
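Review bot: The cookie scan is only reached from `handle_get`, so a POST request goes through `handle_post` and its cookies are never inspected, which is a detection gap for the honeypot. The body of `handle_post` starts outside this hunk, and its visible lines scan only `post_data`. Running the check once in `emulate` after the method dispatch would cover both paths: `cookie_detection = await self.handle_cookies(session, data)` followed by `if cookie_detection and detection['order'] < cookie_detection['order']: detection = cookie_detection`. Separately, `cookies = data['cookies']` raises KeyError when the key is absent, and the `data.items()` loop in the second hunk (presumably `get_emulation_result`) fails on None. Unless the caller already guarantees the key, `cookies = data.get('cookies') or {}` would keep a cookieless request from aborting the handler.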
@@ -17,7 +17,6 @@ def __init__(self, db_name, working_dir):
self.query_map = None
def scan(self, value):
print(value)
detection = None
payload = bytes(value, 'utf-8')
sqli = pylibinjection.detect_sqli(payload)
@@ -53,7 +53,7 @@ def _make_response(msg):
)
self.logger.info('Requested path %s', path)
await self.dorks.extract_path(path, self.redis_client)
detection = await self.base_handler.handle(data, session, path)
detection = await self.base_handler.handle(data, session)
session.set_attack_type(path, detection["name"])
response_msg = self._make_response(msg=dict(detection=detection, sess_uuid=session.get_uuid()))
@@ -21,7 +21,8 @@ def mock_lfi_scan(value):
self.handler.emulators['lfi'].scan = mock_lfi_scan
def test_handle_sqli(self):
path = '/index.html?id=1 UNION SELECT 1'
data = dict(path= '/index.html?id=1 UNION SELECT 1',
cookies= {'sess_uuid': '9f82e5d0e6b64047bba996222d45e72c'})
async def mock_sqli_handle(path, session):
return 'sqli_test_payload'
@@ -33,13 +34,14 @@ def mock_sqli_scan(value):
self.handler.emulators['sqli'].handle = mock_sqli_handle
self.handler.emulators['sqli'].scan = mock_sqli_scan
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, path))
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, data))
assert_detection = {'name': 'sqli', 'order': 2, 'payload': 'sqli_test_payload'}
self.assertDictEqual(detection, assert_detection)
def test_handle_xss(self):
path = '/index.html?id=<script>alert(1);</script>'
data = dict(path= '/index.html?id=<script>alert(1);</script>',
cookies= {'sess_uuid': '9f82e5d0e6b64047bba996222d45e72c'})
async def mock_xss_handle(path, session):
return 'xss_test_payload'
@@ -51,13 +53,14 @@ def mock_xss_scan(value):
self.handler.emulators['xss'].handle = mock_xss_handle
self.handler.emulators['xss'].scan = mock_xss_scan
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, path))
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, data))
assert_detection = {'name': 'xss', 'order': 3, 'payload': 'xss_test_payload'}
self.assertDictEqual(detection, assert_detection)
def test_handle_lfi(self):
path = '/index.html?file=/etc/passwd'
data = dict(path= '/index.html?file=/etc/passwd',
cookies= {'sess_uuid': '9f82e5d0e6b64047bba996222d45e72c'})
async def mock_lfi_handle(attack_value, session):
return 'lfi_test_payload'
@@ -69,29 +72,32 @@ def mock_lfi_scan(value):
self.handler.emulators['lfi'].handle = mock_lfi_handle
self.handler.emulators['lfi'].scan = mock_lfi_scan
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, path))
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, data))
assert_detection = {'name': 'lfi', 'order': 2, 'payload': 'lfi_test_payload'}
self.assertDictEqual(detection, assert_detection)
def test_handle_index(self):
path = '/index.html'
data = dict(path= '/index.html',
cookies= {'sess_uuid': '9f82e5d0e6b64047bba996222d45e72c'})
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, path))
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, data))
assert_detection = detection = {'name': 'index', 'order': 1}
self.assertDictEqual(detection, assert_detection)
def test_handle_wp_content(self):
path = '/wp-content'
data = dict(path= '/wp-content',
cookies= {'sess_uuid': '9f82e5d0e6b64047bba996222d45e72c'})
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, path))
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, data))
assert_detection = detection = {'name': 'wp-content', 'order': 1}
self.assertDictEqual(detection, assert_detection)
def test_handle_rfi(self):
path = '/index.html?file=http://attack.php'
data = dict(path= '/index.html?file=http://attack.php',
cookies= {'sess_uuid': '9f82e5d0e6b64047bba996222d45e72c'})
async def mock_rfi_handle(path, session):
return 'rfi_test_payload'
@@ -103,7 +109,7 @@ def mock_rfi_scan(value):
self.handler.emulators['rfi'].handle = mock_rfi_handle
self.handler.emulators['rfi'].scan = mock_rfi_scan
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, path))
detection = self.loop.run_until_complete(self.handler.handle_get(self.session, data))
assert_detection = {'name': 'rfi', 'order': 2, 'payload': 'rfi_test_payload'}
self.assertDictEqual(detection, assert_detection)
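Review bot: `assert_detection = detection = {'name': 'index', 'order': 1}` in `test_handle_index`, and the same pattern in `test_handle_wp_content`, rebinds `detection` to the expected dict. `assertDictEqual` then compares the literal with itself and passes whatever `handle_get` returns; drop the inner assignment, e.g. `assert_detection = {'name': 'index', 'order': 1}`. Also, every updated test passes only a benign `sess_uuid` cookie, so the new `handle_cookies` path is never asserted to detect anything. A test with a plain path and a flagged value in a cookie, expecting `name == 'sqli'`, would cover the feature this commit adds. The bodies of the `mock_*_scan` helpers are outside the visible hunks, so check whether they key on the value before relying on them for that test.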
