Cut down ginormous methods in DebianParser
jajajasalu2 committed Jul 5, 2019
1 parent c24f115 commit 857ef1a
Showing 4 changed files with 130 additions and 67 deletions.
115 changes: 48 additions & 67 deletions patchfinder/parsers/debian_parser.py
@@ -3,12 +3,9 @@
import re
import shutil
import tarfile
-import urllib.error
-import urllib.request
import urllib.parse
import patchfinder.settings as settings
import patchfinder.utils as utils
-from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

@@ -34,6 +31,7 @@ def __init__(self):
self.file_end_block = re.compile(r'^CVE')
self.fixed_packages = []
self.package_paths = []
+self.patches = []


def parse(self, vuln_id):
@@ -43,12 +41,19 @@ def parse(self, vuln_id):
         and retrieved. The debian/patches folder in these packages is checked
         for patches that are relevant to the vulnerability. A list of patches
         found is returned.
+
+        Args:
+            vuln_id: The vulnerability ID to find patches for (e.g. a CVE ID)
+        Returns:
+            A list of patches found
         """
+        self._clean()
         self.set_context(vuln_id)
         self.find_fixed_packages()
         self.retrieve_packages()
-        return self.extract_patches()
+        self.extract_patches()
+        return self.patches
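
For context, a minimal sketch of how the refactored parse() is driven (the import path follows this repo's layout; the CVE ID is illustrative):

    from patchfinder.parsers.debian_parser import DebianParser

    parser = DebianParser()
    # parse() resets state via the new _clean() helper (added at the
    # bottom of this file), runs the three pipeline stages, and returns
    # the accumulated self.patches list.
    patches = parser.parse('CVE-2016-4796')
    for patch in patches:
        print(patch['patch_link'], patch['reaching_path'])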


def set_context(self, vuln_id):
@@ -84,7 +89,6 @@ def find_fixed_packages(self):

logger.info("Looking for fixed packages...")
utils.download_item(self.cve_list_url, self.cve_file)
-vuln_found = 0
logger.info("Looking for %s in %s", self.vuln_id, self.cve_file)
pkg_vers = utils.parse_raw_file(self.cve_file, self.file_start_block,
self.file_end_block, self.pkg_ver_line)
@@ -104,48 +108,30 @@ def retrieve_packages(self):
"""

         for package in self.fixed_packages:
+            pkg = package['package']
+            ver = package['version']
             snapshot_url = 'https://snapshot.debian.org/package/{pkg}/{ver}/' \
-                    .format(pkg=package['package'],
-                            ver=package['version'])
-            logger.info("Looking for package %s version %s in %s",
-                        package['package'],
-                        package['version'],
-                        snapshot_url)
-
-            try:
-                snapshot_html = urllib.request.urlopen(snapshot_url)
-            except urllib.error.HTTPError as e:
-                raise Exception("Error opening {url}".format(url=snapshot_url))
-            logger.info("Crawled %s", snapshot_url)
-
-            soup = BeautifulSoup(snapshot_html, 'html.parser')
-            quoted_package = urllib.parse.quote(package['package'])
-            quoted_version = urllib.parse.quote(package['version'])
+                    .format(pkg=pkg,
+                            ver=ver)
             find_pkg = re.compile(r'/({pkg}_{ver}\.(debian\.tar\..+|diff\..+'
-                                  r'))$'.format(pkg=quoted_package,
-                                                ver=quoted_version))
-            pkg_url = soup.find('a', href=find_pkg)
-            assert pkg_url, "Couldn't find package {pkg} {ver} on {url}" \
-                    .format(pkg=package['package'],
-                            ver=package['version'],
-                            url=snapshot_url)
+                                  r'))$'.format(pkg=urllib.parse.quote(pkg),
+                                                ver=urllib.parse.quote(ver)))
+            pkg_url = utils.parse_web_page(snapshot_url, 'a', href=find_pkg)
+            if not pkg_url:
+                continue

             pkg_url = urllib.parse.urljoin('https://snapshot.debian.org/',
                                            pkg_url['href'])
-            pkg_name = find_pkg.search(pkg_url)
-            utils.download_item(pkg_url,
-                                os.path.join(settings.DOWNLOAD_DIRECTORY,
-                                             pkg_name.group(1)))
-
-            self.package_paths.append({'path':
-                                       os.path.join(settings.DOWNLOAD_DIRECTORY,
-                                                    pkg_name.group(1)),
+            pkg_name = find_pkg.search(pkg_url).group(1)
+            pkg_path = os.path.join(settings.DOWNLOAD_DIRECTORY, pkg_name)
+            pkg_ext_path = os.path.join(settings.DOWNLOAD_DIRECTORY,
+                                        pkg + '_' + ver)
+
+            utils.download_item(pkg_url, pkg_path)
+            self.package_paths.append({'path': pkg_path,
                                        'source': pkg_url,
-                                       'ext_path':
-                                       os.path.join(settings.DOWNLOAD_DIRECTORY,
-                                                    package['package'] +
-                                                    '_' +
-                                                    package['version'])})
+                                       'ext_path': pkg_ext_path})
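
As a sanity check on the filename regex built above, a small sketch; the package and version mirror the mock tarball added under tests/, and the href is hypothetical:

    import re
    import urllib.parse

    pkg, ver = 'openjpeg2', '2.1.1-1'
    find_pkg = re.compile(r'/({pkg}_{ver}\.(debian\.tar\..+|diff\..+'
                          r'))$'.format(pkg=urllib.parse.quote(pkg),
                                        ver=urllib.parse.quote(ver)))
    # Hypothetical href of the kind snapshot.debian.org serves:
    href = ('/archive/debian/20160601T000000Z/pool/main/o/openjpeg2/'
            'openjpeg2_2.1.1-1.debian.tar.xz')
    print(find_pkg.search(href).group(1))  # openjpeg2_2.1.1-1.debian.tar.xz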


def extract_patches(self):
@@ -156,37 +142,32 @@ def extract_patches(self):
If found, the relevant patches are determined w/r/t the vuln id.
"""

-        patches = []
         for package in self.package_paths:
-            logger.info("Looking for patches in %s", package['path'])
-            if tarfile.is_tarfile(package['path']):
-                tar = tarfile.open(package['path'])
-                try:
-                    if 'debian' in tar.getnames():
-                        logger.info("debian folder found in %s", package['path'])
-                        tar.extractall(package['ext_path'])
-                finally:
-                    tar.close()
-                logger.info("Contents extracted to %s", package['ext_path'])
-                patch_folder = os.path.join(package['ext_path'],
-                                            'debian/patches/')
+            pkg_path = package['path']
+            pkg_ext_path = package['ext_path']
+            pkg_source = package['source']
+            logger.info("Looking for patches in %s", pkg_path)
+            if tarfile.is_tarfile(pkg_path):
+                if not utils.member_in_tarfile(pkg_path, 'debian'):
+                    continue
+                with tarfile.open(pkg_path) as tar:
+                    tar.extractall(pkg_ext_path)
+
+                logger.info("Contents extracted to %s", pkg_ext_path)
+
+                patch_directory = os.path.join(pkg_ext_path, 'debian/patches/')
                 try:
-                    if not os.path.isdir(patch_folder):
-                        continue
-                    logger.info("Looking for patches in %s", patch_folder)
-                    for f in os.listdir(patch_folder):
-                        if f.find(self.vuln_id) is not -1:
-                            logger.info("Patch found: %s", f)
-                            patches.append({'patch_link':
-                                            os.path.join(patch_folder, f),
-                                            'reaching_path':
-                                            package['source']})
+                    # Materialize the generator before pkg_ext_path is
+                    # deleted below; find_in_directory yields lazily.
+                    files = list(utils.find_in_directory(patch_directory,
+                                                         self.vuln_id))
                 finally:
-                    logging.info("Deleting %s", package['ext_path'])
-                    shutil.rmtree(package['ext_path'])
-        return patches
+                    logger.info("Deleting %s", pkg_ext_path)
+                    shutil.rmtree(pkg_ext_path)
+
+                for f in files:
+                    self.patches.append({'patch_link': f,
+                                         'reaching_path': pkg_source})


+    def _clean(self):
+        self.fixed_packages = []
+        self.package_paths = []
+        self.patches = []
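
Taken together, the two utils helpers introduced below make the extract step testable in isolation; a sketch using the mock tarball added in this commit (the CVE ID and target directory are illustrative):

    import tarfile
    from patchfinder import utils

    tar_file = './tests/mocks/openjpeg2_2.1.1-1.debian.tar.xz'

    # Gate extraction on the archive actually containing a debian member.
    if utils.member_in_tarfile(tar_file, 'debian'):
        with tarfile.open(tar_file) as tar:
            tar.extractall('/tmp/openjpeg2_2.1.1-1')

    # Scan the extracted patch directory for files that name the CVE.
    for patch in utils.find_in_directory('/tmp/openjpeg2_2.1.1-1/debian/patches/',
                                         'CVE-2016-4796'):
        print(patch)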
60 changes: 60 additions & 0 deletions patchfinder/utils.py
@@ -1,7 +1,10 @@
import os
import re
import logging
+import tarfile
import urllib.request
+import urllib.error
+from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

@@ -35,6 +38,19 @@ def parse_raw_file(file_name, start_block, end_block, search_params):
f.close()


+def parse_web_page(url, tag, **search_params):
+    """Fetch url and return the first tag matching search_params."""
+    try:
+        html = urllib.request.urlopen(url)
+    except urllib.error.HTTPError as e:
+        raise Exception("Error opening {url}".format(url=url)) from e
+    logger.info("Crawled %s", url)
+    soup = BeautifulSoup(html, 'html.parser')
+
+    # Currently returns only one item; use find_all for multiple.
+    search_results = soup.find(tag, **search_params)
+    return search_results
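
A quick sketch of how retrieve_packages consumes this helper (the URL shape matches the parser; package and version are illustrative):

    import re
    from patchfinder import utils

    find_pkg = re.compile(r'/(openjpeg2_2\.1\.1-1\.(debian\.tar\..+|diff\..+))$')
    link = utils.parse_web_page(
        'https://snapshot.debian.org/package/openjpeg2/2.1.1-1/',
        'a', href=find_pkg)
    if link is not None:
        print(link['href'])  # the matching <a> tag, per BeautifulSoup's find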


def download_item(url, save_as, overwrite=False):
"""Download an item
@@ -55,3 +71,47 @@
os.makedirs(parent_dir)
urllib.request.urlretrieve(url, save_as)
logger.info("Downloaded %s...", url)


+def member_in_tarfile(tar_file, member):
+    """Determine if member is a member of a tarfile
+    Args:
+        tar_file: The path to the tarfile
+        member: Name of the member to be searched for
+    Returns:
+        True if member is a member of the tarfile, False otherwise
+    """
+    tar = tarfile.open(tar_file)
+    try:
+        if member in tar.getnames():
+            logger.info("%s found in %s", member, tar_file)
+            return True
+    finally:
+        tar.close()
+    return False


+# NOTE: This method could use a recursive and regex based search
+def find_in_directory(directory, file_name):
+    """Look for a file in a directory
+    All files whose names contain file_name are yielded.
+    Args:
+        directory: The path to the directory
+        file_name: Name of the file to be searched for
+    Yields:
+        Paths of files with file_name in their names
+    """
+    if not os.path.isdir(directory):
+        logger.info("Can't find %s", directory)
+        return
+    logger.info("Looking for %s in %s", file_name, directory)
+    for f in os.listdir(directory):
+        if file_name in f:
+            logger.info("Found: %s", f)
+            yield os.path.join(directory, f)
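
The NOTE above suggests a recursive, regex-based search; one possible shape of that follow-up, not part of this commit:

    import os
    import re

    def find_in_directory_recursive(directory, pattern):
        """Walk directory recursively, yielding paths of files whose
        names match the regex pattern."""
        regex = re.compile(pattern)
        for root, _dirs, files in os.walk(directory):
            for f in files:
                if regex.search(f):
                    yield os.path.join(root, f)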
Binary file added tests/mocks/openjpeg2_2.1.1-1.debian.tar.xz
Binary file not shown.
22 changes: 22 additions & 0 deletions tests/test_utils.py
@@ -1,3 +1,4 @@
+import os
import re
import unittest
import unittest.mock as mock
@@ -23,6 +24,14 @@ def test_parse_raw_file_with_debian_params(self):
self.assertEqual(len(match.groups()), 2)


+    def test_parse_web_page(self):
+        url = 'file://' + os.path.abspath('./tests/mocks/3.html')
+        href = 'https://bugzilla.redhat.com/show_bug.cgi?id=1317826'
+        regex = re.compile(r'/show_bug\.cgi\?id=\d{7}$')
+        search_results = utils.parse_web_page(url, 'a', href=regex)
+        self.assertEqual(search_results['href'], href)


@mock.patch('patchfinder.utils.urllib.request')
@mock.patch('patchfinder.utils.os')
def test_download_item_file_exists(self, mock_os, mock_urllib_request):
@@ -32,6 +41,7 @@ def test_download_item_file_exists(self, mock_os, mock_urllib_request):
utils.download_item(file_url, file_name)
mock_os.path.isfile.assert_called_with(file_name)
mock_os.path.split.assert_not_called()
+mock_urllib_request.urlretrieve.assert_not_called()


@mock.patch('patchfinder.utils.urllib.request')
@@ -47,3 +57,15 @@ def test_download_item_file_not_exists(self, mock_os, mock_urllib_request):
mock_os.path.split.assert_called_with(file_name)
mock_os.makedirs.assert_called_once()
mock_urllib_request.urlretrieve.assert_called_with(file_url, file_name)


+    def test_find_in_directory(self):
+        files = list(utils.find_in_directory('./tests/mocks', 'mock'))
+        self.assertIn('./tests/mocks/mock_debian_cve_list', files)
+        self.assertIn('./tests/mocks/mock_file', files)
+
+    def test_member_in_tarfile(self):
+        tar_file = './tests/mocks/openjpeg2_2.1.1-1.debian.tar.xz'
+        self.assertTrue(utils.member_in_tarfile(tar_file, 'debian'))
+        self.assertFalse(utils.member_in_tarfile(tar_file, 'deb'))
