Skip to content

Commit

Permalink
fix: add back the license legacy cmd (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
yeisonvargasf committed Jan 20, 2024
1 parent dd43359 commit 69dab0b
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 248 deletions.
65 changes: 64 additions & 1 deletion safety/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from safety.alerts import alert
from safety.auth import auth, inject_session, proxy_options, auth_options
from safety.auth.models import Organization
from safety.scan.constants import CLI_MAIN_INTRODUCTION, CLI_DEBUG_HELP, CLI_DISABLE_OPTIONAL_TELEMETRY_DATA_HELP, \
from safety.scan.constants import CLI_LICENSES_COMMAND_HELP, CLI_MAIN_INTRODUCTION, CLI_DEBUG_HELP, CLI_DISABLE_OPTIONAL_TELEMETRY_DATA_HELP, \
DEFAULT_EPILOG, DEFAULT_SPINNER, CLI_CHECK_COMMAND_HELP, CLI_CHECK_UPDATES_HELP, CLI_CONFIGURE_HELP, CLI_GENERATE_HELP, \
CLI_CONFIGURE_PROXY_TIMEOUT, CLI_CONFIGURE_PROXY_REQUIRED, CLI_CONFIGURE_ORGANIZATION_ID, CLI_CONFIGURE_ORGANIZATION_NAME, \
CLI_CONFIGURE_SAVE_TO_SYSTEM, CLI_CONFIGURE_PROXY_HOST_HELP, CLI_CONFIGURE_PROXY_PORT_HELP, CLI_CONFIGURE_PROXY_PROTOCOL_HELP, \
Expand Down Expand Up @@ -308,6 +308,69 @@ def check(ctx, db, full_report, stdin, files, cache, ignore, ignore_unpinned_req
output_exception(exception, exit_code_output=exit_code)


def clean_license_command(f):
"""
Main entry point for validation.
"""
@wraps(f)
def inner(ctx, *args, **kwargs):
# TODO: Remove this soon, for now it keeps a legacy behavior
kwargs.pop("key", None)
kwargs.pop('proxy_protocol', None)
kwargs.pop('proxy_host', None)
kwargs.pop('proxy_port', None)

return f(ctx, *args, **kwargs)

return inner


@cli.command(cls=SafetyCLILegacyCommand, utility_command=True, help=CLI_LICENSES_COMMAND_HELP)
@proxy_options
@auth_options(stage=False)
@click.option("--db", default="",
help="Path to a local license database. Default: empty")
@click.option('--output', "-o", type=click.Choice(['screen', 'text', 'json', 'bare'], case_sensitive=False),
default='screen')
@click.option("--cache", default=0,
help='Whether license database file should be cached.'
'Default: 0 seconds')
@click.option("files", "--file", "-r", multiple=True, type=click.File(),
help="Read input from one (or multiple) requirement files. Default: empty")
@click.pass_context
@clean_license_command
def license(ctx, db, output, cache, files):
"""
Find the open source licenses used by your Python dependencies.
"""
LOG.info('Running license command')
packages = get_packages(files, False)
licenses_db = {}

SafetyContext().params = ctx.params

try:
licenses_db = safety.get_licenses(session=ctx.obj.auth.client, db_mirror=db, cached=cache,
telemetry=ctx.obj.config.telemetry_enabled)
except SafetyError as e:
LOG.exception('Expected SafetyError happened: %s', e)
output_exception(e, exit_code_output=False)
except Exception as e:
LOG.exception('Unexpected Exception happened: %s', e)
exception = e if isinstance(e, SafetyException) else SafetyException(info=e)
output_exception(exception, exit_code_output=False)

filtered_packages_licenses = get_packages_licenses(packages=packages, licenses_db=licenses_db)

announcements = []
if not db:
announcements = safety.get_announcements(session=ctx.obj.auth.client, telemetry=ctx.obj.config.telemetry_enabled)

output_report = SafetyFormatter(output=output).render_licenses(announcements, filtered_packages_licenses)

click.secho(output_report, nl=True)


@cli.command(cls=SafetyCLILegacyCommand, utility_command=True, help=CLI_GENERATE_HELP)
@click.option("--path", default=".", help=CLI_GENERATE_PATH)
@click.argument('name', required=True)
Expand Down
2 changes: 1 addition & 1 deletion safety/cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ def invoke(self, ctx):
# Workaround for legacy check options, that now are global options
subcommand_args = set(args)
PROXY_HOST_OPTIONS = set(["--proxy-host", "-ph"])
if "check" in ctx.protected_args and (bool(PROXY_HOST_OPTIONS.intersection(subcommand_args) or "--key" in subcommand_args)) :
if "check" in ctx.protected_args or "license" in ctx.protected_args and (bool(PROXY_HOST_OPTIONS.intersection(subcommand_args) or "--key" in subcommand_args)) :
proxy_options, key = self.parse_legacy_args(args)
if proxy_options:
ctx.params.update(proxy_options)
Expand Down
4 changes: 2 additions & 2 deletions safety/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def get_exit_code(self):

class InvalidCredentialError(DatabaseFetchError):

def __init__(self, credential: Optional[str] = None, message="Your authentication credential '{credential}' is invalid. See {link}.", reason=None):
def __init__(self, credential: Optional[str] = None, message="Your authentication credential{credential}is invalid. See {link}.", reason=None):
self.credential = credential
self.link = 'https://bit.ly/3OY2wEI'
self.message = message.format(credential=self.credential, link=self.link) if self.credential else message.format(link=self.link)
self.message = message.format(credential=f" '{self.credential}' ", link=self.link) if self.credential else message.format(credential=' ', link=self.link)
info = f" Reason: {reason}"
self.message = self.message + (info if reason else "")
super().__init__(self.message)
Expand Down
69 changes: 0 additions & 69 deletions safety/reqs_scan.py

This file was deleted.

175 changes: 1 addition & 174 deletions safety/safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,11 +986,7 @@ def review(*, report=None, params=None):

@sync_safety_context
def get_licenses(*, session=None, db_mirror=False, cached=0, telemetry=True):
# key = key if key else os.environ.get("SAFETY_API_KEY", False)
key = session.api_key

if not key and not db_mirror:
raise InvalidCredentialError(message="The API-KEY was not provided.")

if db_mirror:
mirrors = [db_mirror]
else:
Expand Down Expand Up @@ -1175,172 +1171,3 @@ def save_report(path: str, default_name: str, report: str):

with open(save_at, 'w+') as report_file:
report_file.write(report)


import os
import subprocess

def walklevel(path, depth = 1, deny_list = []):
"""It works just like os.walk, but you can pass it a level parameter
that indicates how deep the recursion will go.
If depth is 1, the current directory is listed.
If depth is 0, nothing is returned.
If depth is -1 (or less than 0), the full depth is walked.
"""

# If depth is negative, just walk
# Not using yield from for python2 compat
# and copy dirs to keep consistant behavior for depth = -1 and depth = inf
if depth < 0:
for root, dirs, files in os.walk(path):
yield root, dirs[:], files
return
elif depth == 0:
return

# path.count(os.path.sep) is safe because
# - On Windows "\\" is never allowed in the name of a file or directory
# - On UNIX "/" is never allowed in the name of a file or directory
# - On MacOS a literal "/" is quitely translated to a ":" so it is still
# safe to count "/".
base_depth = path.rstrip(os.path.sep).count(os.path.sep)
for root, dirs, files in os.walk(path):
for idx, directory in enumerate(dirs):
if f"{root}{directory}" in deny_list:
# print(f"Not scanning {root}{directory}")
del dirs[idx]
yield root, dirs[:], files
cur_depth = root.count(os.path.sep)
if base_depth + depth <= cur_depth:
del dirs[:]

def scan_directory(directory, timeout=None, max_depth=0, current_path=None):
virtual_envs = []
python_interpreters = []
requirements_files = []

deny_list = {
# Windows directories
"C:\\Windows",
"C:\\Program Files",
"C:\\Program Files (x86)",
"C:\\ProgramData",

# Linux and macOS directories
"/usr",
"/usr/local",
"/opt",
"/var",
"/etc",
"/Library",
"/System",
"/Applications",
"~/Library",
"/proc",
"/dev"
}

go_up = ''

for root, dirs, files in walklevel(directory, max_depth, deny_list):
# Skip symbolic links, /proc, and /dev directories
# if os.path.islink(root) or root.startswith('/proc') or root.startswith('/dev'):
# continue

status = f'Scanning: {root.strip()}...'
found = f'Python items found: {len(python_interpreters) + len(requirements_files)}'
status_pad = ' ' * (get_terminal_size().columns - len(status))
found_pad = ' ' * (get_terminal_size().columns - len(found))

click.echo('{}{}\r{}'.format(go_up, f"{status}{status_pad}\n", f"{found}{found_pad}"), nl=False, err=True)

if not go_up:
go_up = "\033[F"

# Look for Python interpreters and requirements
for file_name in files:
file_path = os.path.join(root, file_name)

if file_name.endswith('requirements.txt'):
file_name_randomizer = round(random.random()*100000000)
filepath = f"{root}/{str(file_name)}"
requirements_files.append(file_path)
os.system(f"safety check -r {filepath} --cache 100 --output json >> {current_path}/requirements/{file_name_randomizer}-scan.json")

if file_name.startswith('python'):
try:

p = file_path

# Check if the path is a symbolic link
# if os.path.islink(file_path):
# p = os.path.realpath(file_path)
# if os.path.islink(p):
# p = os.path.realpath(p)
# if os.path.islink(p):
# p = os.path.realpath(p)

output = subprocess.check_output(
[file_path, '--version', '--version'],
stderr=subprocess.STDOUT, timeout=timeout)
result = output.decode('utf-8')

if result.startswith('Python'):
output = subprocess.check_output([p, '-m', 'pip', 'freeze'], stderr=subprocess.STDOUT, timeout=timeout)
requirements += output.decode('utf-8') + '\n'

# Command to run
cmd = ['safety', 'check', '--cache', '100', '--output', 'json' '--stdin']

if not requirements:
continue

# click.secho("Safety check is running...")
# Run the command and pass the string as stdin
result = subprocess.run(cmd, input=requirements, text=True, capture_output=True)
file_name_randomizer = round(random.random()*100000000)
with open(f"{current_path}/environments/{file_name_randomizer}-scan.json", "w") as outfile:
outfile.write(result.stdout)


# output = subprocess.check_output(
# [file_path, '--version', '--version'],
# stderr=subprocess.STDOUT, timeout=timeout)
# result = output.decode('utf-8')

# if result.startswith('Python'):
# parts = result.split()
# # Extract the version number
# version = parts[1]

# # Extract the date and time
# date = re.findall(r'\(.*?\)', result)[0].strip('()')

# # Extract the compiler information
# compiler = re.findall(r'\[.*?\]', result)[0].strip('[]')

python_interpreters.append(file_path)
except Exception:
pass

requirements = ""

click.secho(f"Results are save in {current_path}/...")

# for interp in python_interpreters:
# output = subprocess.check_output(
# [interp, '-m', 'pip', 'freeze'], stderr=subprocess.STDOUT, timeout=timeout)
# requirements += output.decode('utf-8') + '\n'

# # Command to run
# cmd = ['safety', 'check', '--stdin']

# if not requirements:
# click.secho("No packages found.")

# click.secho("Safety check is running...")
# # Run the command and pass the string as stdin
# result = subprocess.run(cmd, input=requirements, text=True, capture_output=True)

# click.secho(result.stdout)
# click.secho(result.stderr)
2 changes: 2 additions & 0 deletions safety/scan/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

CLI_CHECK_COMMAND_HELP = "\\[deprecated] Find vulnerabilities at target files or enviroments. Now replaced by [bold]safety scan[/bold], and will be unsupported beyond 1 May 2024." \
"\n[bold]Example: safety check -r requirements.txt[/bold]"
CLI_LICENSES_COMMAND_HELP = "\\[deprecated] Find licenses at target files or enviroments. This command will be replaced by [bold]safety scan[/bold], and will be unsupported beyond 1 May 2024." \
"\n[bold]Example: safety license -r requirements.txt[/bold]"


CLI_ALERT_COMMAND_HELP = "\\[deprecated] Create GitHub pull requests or GitHub issues using a `safety check` json report file. Being replaced by newer features." \
Expand Down
8 changes: 8 additions & 0 deletions safety/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,14 @@ def get_packages_licenses(*, packages=None, licenses_db=None):
pkg_name = canonicalize_name(pkg.name)
# packages may have different licenses depending their version.
pkg_licenses = packages_licenses_db.get(pkg_name, [])
if not pkg.version:
for req in pkg.requirements:
if is_pinned_requirement(req.specifier):
pkg.version = next(iter(req.specifier)).version
break

if not pkg.version:
continue
version_requested = parse_version(pkg.version)
license_id = None
license_name = None
Expand Down
Loading

0 comments on commit 69dab0b

Please sign in to comment.