Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add back the license legacy cmd #498

Merged
merged 1 commit into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion safety/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from safety.alerts import alert
from safety.auth import auth, inject_session, proxy_options, auth_options
from safety.auth.models import Organization
from safety.scan.constants import CLI_MAIN_INTRODUCTION, CLI_DEBUG_HELP, CLI_DISABLE_OPTIONAL_TELEMETRY_DATA_HELP, \
from safety.scan.constants import CLI_LICENSES_COMMAND_HELP, CLI_MAIN_INTRODUCTION, CLI_DEBUG_HELP, CLI_DISABLE_OPTIONAL_TELEMETRY_DATA_HELP, \
DEFAULT_EPILOG, DEFAULT_SPINNER, CLI_CHECK_COMMAND_HELP, CLI_CHECK_UPDATES_HELP, CLI_CONFIGURE_HELP, CLI_GENERATE_HELP, \
CLI_CONFIGURE_PROXY_TIMEOUT, CLI_CONFIGURE_PROXY_REQUIRED, CLI_CONFIGURE_ORGANIZATION_ID, CLI_CONFIGURE_ORGANIZATION_NAME, \
CLI_CONFIGURE_SAVE_TO_SYSTEM, CLI_CONFIGURE_PROXY_HOST_HELP, CLI_CONFIGURE_PROXY_PORT_HELP, CLI_CONFIGURE_PROXY_PROTOCOL_HELP, \
Expand Down Expand Up @@ -308,6 +308,69 @@ def check(ctx, db, full_report, stdin, files, cache, ignore, ignore_unpinned_req
output_exception(exception, exit_code_output=exit_code)


def clean_license_command(f):
"""
Main entry point for validation.
"""
@wraps(f)
def inner(ctx, *args, **kwargs):
# TODO: Remove this soon, for now it keeps a legacy behavior
kwargs.pop("key", None)
kwargs.pop('proxy_protocol', None)
kwargs.pop('proxy_host', None)
kwargs.pop('proxy_port', None)

return f(ctx, *args, **kwargs)

return inner


@cli.command(cls=SafetyCLILegacyCommand, utility_command=True, help=CLI_LICENSES_COMMAND_HELP)
@proxy_options
@auth_options(stage=False)
@click.option("--db", default="",
help="Path to a local license database. Default: empty")
@click.option('--output', "-o", type=click.Choice(['screen', 'text', 'json', 'bare'], case_sensitive=False),
default='screen')
@click.option("--cache", default=0,
help='Whether license database file should be cached.'
'Default: 0 seconds')
@click.option("files", "--file", "-r", multiple=True, type=click.File(),
help="Read input from one (or multiple) requirement files. Default: empty")
@click.pass_context
@clean_license_command
def license(ctx, db, output, cache, files):
"""
Find the open source licenses used by your Python dependencies.
"""
LOG.info('Running license command')
packages = get_packages(files, False)
licenses_db = {}

SafetyContext().params = ctx.params

try:
licenses_db = safety.get_licenses(session=ctx.obj.auth.client, db_mirror=db, cached=cache,
telemetry=ctx.obj.config.telemetry_enabled)
except SafetyError as e:
LOG.exception('Expected SafetyError happened: %s', e)
output_exception(e, exit_code_output=False)
except Exception as e:
LOG.exception('Unexpected Exception happened: %s', e)
exception = e if isinstance(e, SafetyException) else SafetyException(info=e)
output_exception(exception, exit_code_output=False)

filtered_packages_licenses = get_packages_licenses(packages=packages, licenses_db=licenses_db)

announcements = []
if not db:
announcements = safety.get_announcements(session=ctx.obj.auth.client, telemetry=ctx.obj.config.telemetry_enabled)

output_report = SafetyFormatter(output=output).render_licenses(announcements, filtered_packages_licenses)

click.secho(output_report, nl=True)


@cli.command(cls=SafetyCLILegacyCommand, utility_command=True, help=CLI_GENERATE_HELP)
@click.option("--path", default=".", help=CLI_GENERATE_PATH)
@click.argument('name', required=True)
Expand Down
2 changes: 1 addition & 1 deletion safety/cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ def invoke(self, ctx):
# Workaround for legacy check options, that now are global options
subcommand_args = set(args)
PROXY_HOST_OPTIONS = set(["--proxy-host", "-ph"])
if "check" in ctx.protected_args and (bool(PROXY_HOST_OPTIONS.intersection(subcommand_args) or "--key" in subcommand_args)) :
if "check" in ctx.protected_args or "license" in ctx.protected_args and (bool(PROXY_HOST_OPTIONS.intersection(subcommand_args) or "--key" in subcommand_args)) :
proxy_options, key = self.parse_legacy_args(args)
if proxy_options:
ctx.params.update(proxy_options)
Expand Down
4 changes: 2 additions & 2 deletions safety/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def get_exit_code(self):

class InvalidCredentialError(DatabaseFetchError):

def __init__(self, credential: Optional[str] = None, message="Your authentication credential '{credential}' is invalid. See {link}.", reason=None):
def __init__(self, credential: Optional[str] = None, message="Your authentication credential{credential}is invalid. See {link}.", reason=None):
self.credential = credential
self.link = 'https://bit.ly/3OY2wEI'
self.message = message.format(credential=self.credential, link=self.link) if self.credential else message.format(link=self.link)
self.message = message.format(credential=f" '{self.credential}' ", link=self.link) if self.credential else message.format(credential=' ', link=self.link)
info = f" Reason: {reason}"
self.message = self.message + (info if reason else "")
super().__init__(self.message)
Expand Down
69 changes: 0 additions & 69 deletions safety/reqs_scan.py

This file was deleted.

175 changes: 1 addition & 174 deletions safety/safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,11 +986,7 @@ def review(*, report=None, params=None):

@sync_safety_context
def get_licenses(*, session=None, db_mirror=False, cached=0, telemetry=True):
# key = key if key else os.environ.get("SAFETY_API_KEY", False)
key = session.api_key

if not key and not db_mirror:
raise InvalidCredentialError(message="The API-KEY was not provided.")

if db_mirror:
mirrors = [db_mirror]
else:
Expand Down Expand Up @@ -1175,172 +1171,3 @@ def save_report(path: str, default_name: str, report: str):

with open(save_at, 'w+') as report_file:
report_file.write(report)


import os
import subprocess

def walklevel(path, depth = 1, deny_list = []):
"""It works just like os.walk, but you can pass it a level parameter
that indicates how deep the recursion will go.
If depth is 1, the current directory is listed.
If depth is 0, nothing is returned.
If depth is -1 (or less than 0), the full depth is walked.
"""

# If depth is negative, just walk
# Not using yield from for python2 compat
# and copy dirs to keep consistant behavior for depth = -1 and depth = inf
if depth < 0:
for root, dirs, files in os.walk(path):
yield root, dirs[:], files
return
elif depth == 0:
return

# path.count(os.path.sep) is safe because
# - On Windows "\\" is never allowed in the name of a file or directory
# - On UNIX "/" is never allowed in the name of a file or directory
# - On MacOS a literal "/" is quitely translated to a ":" so it is still
# safe to count "/".
base_depth = path.rstrip(os.path.sep).count(os.path.sep)
for root, dirs, files in os.walk(path):
for idx, directory in enumerate(dirs):
if f"{root}{directory}" in deny_list:
# print(f"Not scanning {root}{directory}")
del dirs[idx]
yield root, dirs[:], files
cur_depth = root.count(os.path.sep)
if base_depth + depth <= cur_depth:
del dirs[:]

def scan_directory(directory, timeout=None, max_depth=0, current_path=None):
virtual_envs = []
python_interpreters = []
requirements_files = []

deny_list = {
# Windows directories
"C:\\Windows",
"C:\\Program Files",
"C:\\Program Files (x86)",
"C:\\ProgramData",

# Linux and macOS directories
"/usr",
"/usr/local",
"/opt",
"/var",
"/etc",
"/Library",
"/System",
"/Applications",
"~/Library",
"/proc",
"/dev"
}

go_up = ''

for root, dirs, files in walklevel(directory, max_depth, deny_list):
# Skip symbolic links, /proc, and /dev directories
# if os.path.islink(root) or root.startswith('/proc') or root.startswith('/dev'):
# continue

status = f'Scanning: {root.strip()}...'
found = f'Python items found: {len(python_interpreters) + len(requirements_files)}'
status_pad = ' ' * (get_terminal_size().columns - len(status))
found_pad = ' ' * (get_terminal_size().columns - len(found))

click.echo('{}{}\r{}'.format(go_up, f"{status}{status_pad}\n", f"{found}{found_pad}"), nl=False, err=True)

if not go_up:
go_up = "\033[F"

# Look for Python interpreters and requirements
for file_name in files:
file_path = os.path.join(root, file_name)

if file_name.endswith('requirements.txt'):
file_name_randomizer = round(random.random()*100000000)
filepath = f"{root}/{str(file_name)}"
requirements_files.append(file_path)
os.system(f"safety check -r {filepath} --cache 100 --output json >> {current_path}/requirements/{file_name_randomizer}-scan.json")

if file_name.startswith('python'):
try:

p = file_path

# Check if the path is a symbolic link
# if os.path.islink(file_path):
# p = os.path.realpath(file_path)
# if os.path.islink(p):
# p = os.path.realpath(p)
# if os.path.islink(p):
# p = os.path.realpath(p)

output = subprocess.check_output(
[file_path, '--version', '--version'],
stderr=subprocess.STDOUT, timeout=timeout)
result = output.decode('utf-8')

if result.startswith('Python'):
output = subprocess.check_output([p, '-m', 'pip', 'freeze'], stderr=subprocess.STDOUT, timeout=timeout)
requirements += output.decode('utf-8') + '\n'

# Command to run
cmd = ['safety', 'check', '--cache', '100', '--output', 'json' '--stdin']

if not requirements:
continue

# click.secho("Safety check is running...")
# Run the command and pass the string as stdin
result = subprocess.run(cmd, input=requirements, text=True, capture_output=True)
file_name_randomizer = round(random.random()*100000000)
with open(f"{current_path}/environments/{file_name_randomizer}-scan.json", "w") as outfile:
outfile.write(result.stdout)


# output = subprocess.check_output(
# [file_path, '--version', '--version'],
# stderr=subprocess.STDOUT, timeout=timeout)
# result = output.decode('utf-8')

# if result.startswith('Python'):
# parts = result.split()
# # Extract the version number
# version = parts[1]

# # Extract the date and time
# date = re.findall(r'\(.*?\)', result)[0].strip('()')

# # Extract the compiler information
# compiler = re.findall(r'\[.*?\]', result)[0].strip('[]')

python_interpreters.append(file_path)
except Exception:
pass

requirements = ""

click.secho(f"Results are save in {current_path}/...")

# for interp in python_interpreters:
# output = subprocess.check_output(
# [interp, '-m', 'pip', 'freeze'], stderr=subprocess.STDOUT, timeout=timeout)
# requirements += output.decode('utf-8') + '\n'

# # Command to run
# cmd = ['safety', 'check', '--stdin']

# if not requirements:
# click.secho("No packages found.")

# click.secho("Safety check is running...")
# # Run the command and pass the string as stdin
# result = subprocess.run(cmd, input=requirements, text=True, capture_output=True)

# click.secho(result.stdout)
# click.secho(result.stderr)
2 changes: 2 additions & 0 deletions safety/scan/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

CLI_CHECK_COMMAND_HELP = "\\[deprecated] Find vulnerabilities at target files or enviroments. Now replaced by [bold]safety scan[/bold], and will be unsupported beyond 1 May 2024." \
"\n[bold]Example: safety check -r requirements.txt[/bold]"
CLI_LICENSES_COMMAND_HELP = "\\[deprecated] Find licenses at target files or enviroments. This command will be replaced by [bold]safety scan[/bold], and will be unsupported beyond 1 May 2024." \
"\n[bold]Example: safety license -r requirements.txt[/bold]"


CLI_ALERT_COMMAND_HELP = "\\[deprecated] Create GitHub pull requests or GitHub issues using a `safety check` json report file. Being replaced by newer features." \
Expand Down
8 changes: 8 additions & 0 deletions safety/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,14 @@ def get_packages_licenses(*, packages=None, licenses_db=None):
pkg_name = canonicalize_name(pkg.name)
# packages may have different licenses depending their version.
pkg_licenses = packages_licenses_db.get(pkg_name, [])
if not pkg.version:
for req in pkg.requirements:
if is_pinned_requirement(req.specifier):
pkg.version = next(iter(req.specifier)).version
break

if not pkg.version:
continue
version_requested = parse_version(pkg.version)
license_id = None
license_name = None
Expand Down
Loading