Skip to content

Commit

Permalink
Merge pull request #386 from pyupio/fix/safety-non-cli-support
Browse files Browse the repository at this point in the history
Safety support as a package
  • Loading branch information
yeisonvargasf committed Jun 23, 2022
2 parents 6d78db8 + 3a5d425 commit 7ac6461
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 91 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![Travis](https://img.shields.io/travis/pyupio/safety.svg)](https://travis-ci.org/pyupio/safety)
[![Updates](https://pyup.io/repos/github/pyupio/safety/shield.svg)](https://pyup.io/repos/github/pyupio/safety/)

Safety checks your installed Python dependencies for known security vulnerabilities and suggests the proper remediations for vulnerabilities detected. Safety can be run on developer machines, in CI/CD pipelines and on production systems.
Safety checks Python dependencies for known security vulnerabilities and suggests the proper remediations for vulnerabilities detected. Safety can be run on developer machines, in CI/CD pipelines and on production systems.

By default it uses the open Python vulnerability database [Safety DB](https://github.com/pyupio/safety-db), which is **licensed for non-commercial use only**.

Expand Down
58 changes: 42 additions & 16 deletions safety/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@

LOG = logging.getLogger(__name__)


@click.group()
@click.option('--debug/--no-debug', default=False)
@click.option('--telemetry/--disable-telemetry', default=True)
@click.version_option(version=get_safety_version())
@click.pass_context
def cli(ctx, debug, telemetry):
"""
Safety checks Python dependencies for known security vulnerabilities and suggests the proper
remediations for vulnerabilities detected. Safety can be run on developer machines, in CI/CD pipelines and
on production systems.
"""
ctx.telemetry = telemetry
level = logging.CRITICAL
if debug:
Expand All @@ -47,17 +53,21 @@ def cli(ctx, debug, telemetry):
with_values={"output": ['json', 'bare'], "json": [True, False], "bare": [True, False]},
help='Full reports include a security advisory (if available). Default: --short-report')
@click.option("--cache", is_flag=False, flag_value=60, default=0,
help="Cache requests to the vulnerability database locally. Default: 0 seconds")
@click.option("--stdin/--no-stdin", default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["files"],
help="Read input from stdin. Default: --no-stdin")
@click.option("files", "--file", "-r", multiple=True, type=click.File(), cls=MutuallyExclusiveOption, mutually_exclusive=["stdin"],
help="Cache requests to the vulnerability database locally. Default: 0 seconds",
hidden=True)
@click.option("--stdin", default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["files"],
help="Read input from stdin.", is_flag=True, show_default=True)
@click.option("files", "--file", "-r", multiple=True, type=click.File(), cls=MutuallyExclusiveOption,
mutually_exclusive=["stdin"],
help="Read input from one (or multiple) requirement files. Default: empty")
@click.option("--ignore", "-i", multiple=True, type=str, default=[], callback=transform_ignore,
help="Ignore one (or multiple) vulnerabilities by ID. Default: empty")
@click.option('--json/--no-json', default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["output", "bare"],
with_values={"output": ['screen', 'text', 'bare', 'json'], "bare": [True, False]}, callback=json_alias)
@click.option('--bare/--not-bare', default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["output", "json"],
with_values={"output": ['screen', 'text', 'bare', 'json'], "json": [True, False]}, callback=bare_alias)
@click.option('--json', default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["output", "bare"],
with_values={"output": ['screen', 'text', 'bare', 'json'], "bare": [True, False]}, callback=json_alias,
hidden=True, is_flag=True, show_default=True)
@click.option('--bare', default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["output", "json"],
with_values={"output": ['screen', 'text', 'bare', 'json'], "json": [True, False]}, callback=bare_alias,
hidden=True, is_flag=True, show_default=True)
@click.option('--output', "-o", type=click.Choice(['screen', 'text', 'json', 'bare'], case_sensitive=False),
default='screen', callback=active_color_if_needed, envvar='SAFETY_OUTPUT')
@click.option("--proxy-protocol", "-pr", type=click.Choice(['http', 'https']), default='https', cls=DependentOption, required_options=['proxy_host'],
Expand All @@ -70,15 +80,19 @@ def cli(ctx, debug, telemetry):
help="Output standard exit codes. Default: --exit-code")
@click.option("--policy-file", type=SafetyPolicyFile(), default='.safety-policy.yml',
help="Define the policy file to be used")
@click.option("--save-json", default="", help="Path to where output file will be placed. Default: empty")
@click.option("--save-json", default="", help="Path to where output file will be placed, if the path is a directory, "
"Safety will use safety-report.json as filename. Default: empty")
@click.pass_context
def check(ctx, key, db, full_report, stdin, files, cache, ignore, output, json, bare, proxy_protocol, proxy_host, proxy_port,
exit_code, policy_file, save_json):
"""
Find vulnerabilities in Python dependencies at the target provided.
"""
LOG.info('Running check command')

try:
packages = get_packages(files, stdin)
ctx.obj = packages
proxy_dictionary = get_proxy_dict(proxy_protocol, proxy_host, proxy_port)

announcements = []
Expand All @@ -89,14 +103,15 @@ def check(ctx, key, db, full_report, stdin, files, cache, ignore, output, json,
ignore_severity_rules = None
ignore, ignore_severity_rules, exit_code = get_processed_options(policy_file, ignore,
ignore_severity_rules, exit_code)
ctx.continue_on_error = not exit_code
ctx.ignore_severity_rules = ignore_severity_rules

is_env_scan = not stdin and not files
params = {'stdin': stdin, 'files': files, 'policy_file': policy_file, 'continue_on_error': not exit_code,
'ignore_severity_rules': ignore_severity_rules}
LOG.info('Calling the check function')
vulns, db_full = safety.check(packages=packages, key=key, db_mirror=db, cached=cache, ignore_vulns=ignore,
ignore_severity_rules=ignore_severity_rules, proxy=proxy_dictionary,
include_ignored=True, is_env_scan=is_env_scan, telemetry=ctx.parent.telemetry)
include_ignored=True, is_env_scan=is_env_scan, telemetry=ctx.parent.telemetry,
params=params)
LOG.debug('Vulnerabilities returned: %s', vulns)
LOG.debug('full database returned is None: %s', db_full is None)

Expand All @@ -117,11 +132,15 @@ def check(ctx, key, db, full_report, stdin, files, cache, ignore, output, json,
LOG.info('All vulnerabilities found (ignored and Not ignored): %s', len(vulns))

if save_json:
default_name = 'safety-report.json'
json_report = output_report

if output != 'json':
json_report = SafetyFormatter(output='json').render_vulnerabilities(announcements, vulns, remediations,
full_report, packages)
if os.path.isdir(save_json):
save_json = os.path.join(save_json, default_name)

with open(save_json, 'w+') as output_json_file:
output_json_file.write(json_report)

Expand Down Expand Up @@ -152,6 +171,9 @@ def check(ctx, key, db, full_report, stdin, files, cache, ignore, output, json,
help="Read input from an insecure report file. Default: empty")
@click.pass_context
def review(ctx, full_report, output, file):
"""
Show an output from a previous exported JSON report.
"""
LOG.info('Running check command')
announcements = safety.get_announcements(key=None, proxy=None, telemetry=ctx.parent.telemetry)
report = {}
Expand All @@ -166,7 +188,8 @@ def review(ctx, full_report, output, file):
exception = e if isinstance(e, SafetyException) else SafetyException(info=e)
output_exception(exception, exit_code_output=True)

vulns, remediations, packages = safety.review(report)
params = {'file': file}
vulns, remediations, packages = safety.review(report, params=params)

output_report = SafetyFormatter(output=output).render_vulnerabilities(announcements, vulns, remediations,
full_report, packages)
Expand Down Expand Up @@ -197,6 +220,9 @@ def review(ctx, full_report, output, file):
help="Proxy protocol (https or http) --proxy-protocol")
@click.pass_context
def license(ctx, key, db, output, cache, files, proxyprotocol, proxyhost, proxyport):
"""
Find the open source licenses used by your Python dependencies.
"""
LOG.info('Running license command')
packages = get_packages(files, False)
ctx.obj = packages
Expand Down Expand Up @@ -230,7 +256,7 @@ def license(ctx, key, db, output, cache, files, proxyprotocol, proxyhost, proxyp
@click.argument('name')
@click.pass_context
def generate(ctx, name, path):
"""create a basic supported file type.
"""Create a boilerplate supported file type.
NAME is the name of the file type to generate. Valid values are: policy_file
"""
Expand Down Expand Up @@ -270,7 +296,7 @@ def generate(ctx, name, path):
@click.argument('name')
@click.pass_context
def validate(ctx, name, path):
"""verify a supported file type.
"""Verify the validity of a supported file type.
NAME is the name of the file type to validate. Valid values are: policy_file
"""
Expand Down
48 changes: 23 additions & 25 deletions safety/output_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import click

from safety.constants import RED, YELLOW
from safety.util import get_safety_version, Package, get_terminal_size
from safety.util import get_safety_version, Package, get_terminal_size, SafetyContext

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -136,7 +136,7 @@ def format_vulnerability(vulnerability, full_mode, only_text=False, columns=get_
{'value': vulnerability.advisory.replace('\n', '')}]}
]

if click.get_current_context().params.get('key', False):
if SafetyContext().key:
fixed_version_line = {'words': [
{'style': {'bold': True}, 'value': 'Fixed versions: '},
{'value': ', '.join(vulnerability.fixed_versions) if vulnerability.fixed_versions else 'No known fix'}
Expand Down Expand Up @@ -357,12 +357,13 @@ def format_long_text(text, color='', columns=get_terminal_size().columns, start_


def get_printable_list_of_scanned_items(scanning_target):
context = click.get_current_context()
context = SafetyContext()

result = []
scanned_items_data = []

if scanning_target == 'environment':
locations = set([pkg.found for pkg in context.obj if isinstance(pkg, Package)])
locations = set([pkg.found for pkg in context.packages if isinstance(pkg, Package)])

for path in locations:
result.append([{'styled': False, 'value': '-> ' + path}])
Expand All @@ -374,7 +375,7 @@ def get_printable_list_of_scanned_items(scanning_target):
scanned_items_data.append(msg)

elif scanning_target == 'stdin':
scanned_stdin = [pkg.name for pkg in context.obj if isinstance(pkg, Package)]
scanned_stdin = [pkg.name for pkg in context.packages if isinstance(pkg, Package)]
value = 'No found packages in stdin'
scanned_items_data = [value]

Expand Down Expand Up @@ -435,8 +436,9 @@ def build_report_brief_section(columns=None, primary_announcement=None, report_t


def build_report_for_review_vuln_report(as_dict=False):
report_from_file = click.get_current_context().review
packages = click.get_current_context().obj
ctx = SafetyContext()
report_from_file = ctx.review
packages = ctx.packages

if as_dict:
return report_from_file
Expand Down Expand Up @@ -509,15 +511,15 @@ def build_scanned_count_sentence(packages):


def add_warnings_if_needed(brief_info):
ctx = click.get_current_context()
ctx = SafetyContext()
warnings = []

if ctx.obj:
if hasattr(ctx, 'continue_on_error') and ctx.continue_on_error:
if ctx.packages:
if ctx.params.get('continue_on_error', False):
warnings += [[{'style': True,
'value': '* Continue-on-error is enabled, so returning successful (0) exit code in all cases.'}]]

if hasattr(ctx, 'ignore_severity_rules') and ctx.ignore_severity_rules and not is_using_api_key():
if ctx.params.get('ignore_severity_rules', False) and not is_using_api_key():
warnings += [[{'style': True,
'value': '* Could not filter by severity, please upgrade your account to include severity data.'}]]

Expand All @@ -528,18 +530,18 @@ def add_warnings_if_needed(brief_info):
def get_report_brief_info(as_dict=False, report_type=1, **kwargs):
LOG.info('get_report_brief_info: %s, %s, %s', as_dict, report_type, kwargs)

context = click.get_current_context()
context = SafetyContext()

packages = [pkg for pkg in context.obj if isinstance(pkg, Package)]
packages = [pkg for pkg in context.packages if isinstance(pkg, Package)]
brief_data = {}
command = context.command.name
command = context.command

if command == 'review':
review = build_report_for_review_vuln_report(as_dict)
return review

key = context.params.get('key', False)
db = context.params.get('db', False)
key = context.key
db = context.db_mirror

scanning_types = {'check': {'name': 'Vulnerabilities', 'action': 'Scanning dependencies', 'scanning_target': 'environment'}, # Files, Env or Stdin
'license': {'name': 'Licenses', 'action': 'Scanning licenses', 'scanning_target': 'environment'}, # Files or Env
Expand All @@ -552,14 +554,14 @@ def get_report_brief_info(as_dict=False, report_type=1, **kwargs):
scanning_types[command]['scanning_target'] = target
break

scanning_target = scanning_types.get(context.command.name, {}).get('scanning_target', '')
scanning_target = scanning_types.get(context.command, {}).get('scanning_target', '')
brief_data['scan_target'] = scanning_target
scanned_items, data = get_printable_list_of_scanned_items(scanning_target)
brief_data['scanned'] = data
nl = [{'style': False, 'value': ''}]

action_executed = [
{'style': True, 'value': scanning_types.get(context.command.name, {}).get('action', '')},
{'style': True, 'value': scanning_types.get(context.command, {}).get('action', '')},
{'style': False, 'value': ' in your '},
{'style': True, 'value': scanning_target + ':'},
]
Expand Down Expand Up @@ -618,7 +620,7 @@ def get_report_brief_info(as_dict=False, report_type=1, **kwargs):
brief_info = [[{'style': False, 'value': 'Safety '},
{'style': True, 'value': 'v' + get_safety_version()},
{'style': False, 'value': ' is scanning for '},
{'style': True, 'value': scanning_types.get(context.command.name, {}).get('name', '')},
{'style': True, 'value': scanning_types.get(context.command, {}).get('name', '')},
{'style': True, 'value': '...'}] + safety_policy_used, action_executed
] + [nl] + scanned_items + [nl] + [using_sentence] + [scanned_count_sentence] + [timestamp]

Expand Down Expand Up @@ -650,15 +652,11 @@ def build_primary_announcement(primary_announcement, columns=None, only_text=Fal


def is_using_api_key():
context = click.get_current_context()
review_used_api_key = context.review.get('api_key', False) if hasattr(context,
'review') and context.review else False
return bool(context.params.get('key', None)) or review_used_api_key
return bool(SafetyContext().key)


def is_using_a_safety_policy_file():
context = click.get_current_context()
return bool(context.params.get('policy_file', None))
return bool(SafetyContext().params.get('policy_file', None))


def should_add_nl(output, found_vulns):
Expand Down

0 comments on commit 7ac6461

Please sign in to comment.