From 0a561dc11a142cb17f80f81c3480cb99b0164dda Mon Sep 17 00:00:00 2001 From: Agustin Isasmendi Date: Fri, 14 Nov 2025 17:42:00 +0100 Subject: [PATCH 1/5] fix(cli): terminal cursor disappears after aborting scan with ctrl+c --- CHANGELOG.md | 4 +- src/scanoss/filecount.py | 75 +++--- src/scanoss/scanner.py | 371 +++++++++++++------------- src/scanoss/scanners/folder_hasher.py | 48 ++-- src/scanoss/scanners/scanner_hfh.py | 35 +-- src/scanoss/threadedscanning.py | 10 + 6 files changed, 275 insertions(+), 268 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a19331d0..5cc984f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] -### Added -- Upcoming changes... +### Fixed +- Fixed terminal cursor disappearing after aborting scan with Ctrl+C ## [1.40.1] - 2025-10-29 ### Changed diff --git a/src/scanoss/filecount.py b/src/scanoss/filecount.py index a2f43b1b..87b8df75 100644 --- a/src/scanoss/filecount.py +++ b/src/scanoss/filecount.py @@ -26,6 +26,7 @@ import os import pathlib import sys +from contextlib import nullcontext from progress.spinner import Spinner @@ -105,48 +106,46 @@ def count_files(self, scan_dir: str) -> bool: """ success = True if not scan_dir: - raise Exception(f'ERROR: Please specify a folder to scan') + raise Exception('ERROR: Please specify a folder to scan') if not os.path.exists(scan_dir) or not os.path.isdir(scan_dir): raise Exception(f'ERROR: Specified folder does not exist or is not a folder: {scan_dir}') self.print_msg(f'Searching {scan_dir} for files to count...') - spinner = None - if not self.quiet and self.isatty: - spinner = Spinner('Searching ') - file_types = {} - file_count = 0 - file_size = 0 - for root, dirs, files in os.walk(scan_dir): - self.print_trace(f'U Root: {root}, Dirs: {dirs}, Files {files}') - dirs[:] = 
self.__filter_dirs(dirs) # Strip out unwanted directories - filtered_files = self.__filter_files(files) # Strip out unwanted files - self.print_trace(f'F Root: {root}, Dirs: {dirs}, Files {filtered_files}') - for file in filtered_files: # Cycle through each filtered file - path = os.path.join(root, file) - f_size = 0 - try: - f_size = os.stat(path).st_size - except Exception as e: - self.print_trace(f'Ignoring missing symlink file: {file} ({e})') # broken symlink - if f_size > 0: # Ignore broken links and empty files - file_count = file_count + 1 - file_size = file_size + f_size - f_suffix = pathlib.Path(file).suffix - if not f_suffix or f_suffix == '': - f_suffix = 'no_suffix' - self.print_trace(f'Counting {path} ({f_suffix} - {f_size})..') - fc = file_types.get(f_suffix) - if not fc: - fc = [1, f_size] - else: - fc[0] = fc[0] + 1 - fc[1] = fc[1] + f_size - file_types[f_suffix] = fc - if spinner: - spinner.next() - # End for loop - if spinner: - spinner.finish() + spinner_ctx = Spinner('Searching ') if (not self.quiet and self.isatty) else nullcontext() + + with spinner_ctx as spinner: + file_types = {} + file_count = 0 + file_size = 0 + for root, dirs, files in os.walk(scan_dir): + self.print_trace(f'U Root: {root}, Dirs: {dirs}, Files {files}') + dirs[:] = self.__filter_dirs(dirs) # Strip out unwanted directories + filtered_files = self.__filter_files(files) # Strip out unwanted files + self.print_trace(f'F Root: {root}, Dirs: {dirs}, Files {filtered_files}') + for file in filtered_files: # Cycle through each filtered file + path = os.path.join(root, file) + f_size = 0 + try: + f_size = os.stat(path).st_size + except Exception as e: + self.print_trace(f'Ignoring missing symlink file: {file} ({e})') # broken symlink + if f_size > 0: # Ignore broken links and empty files + file_count = file_count + 1 + file_size = file_size + f_size + f_suffix = pathlib.Path(file).suffix + if not f_suffix or f_suffix == '': + f_suffix = 'no_suffix' + self.print_trace(f'Counting 
{path} ({f_suffix} - {f_size})..') + fc = file_types.get(f_suffix) + if not fc: + fc = [1, f_size] + else: + fc[0] = fc[0] + 1 + fc[1] = fc[1] + f_size + file_types[f_suffix] = fc + if spinner: + spinner.next() + # End for loop self.print_stderr(f'Found {file_count:,.0f} files with a total size of {file_size / (1 << 20):,.2f} MB.') if file_types: csv_dict = [] diff --git a/src/scanoss/scanner.py b/src/scanoss/scanner.py index 63803e54..6e5d147b 100644 --- a/src/scanoss/scanner.py +++ b/src/scanoss/scanner.py @@ -26,6 +26,7 @@ import json import os import sys +from contextlib import nullcontext from pathlib import Path from typing import Any, Dict, List, Optional @@ -363,62 +364,60 @@ def scan_folder(self, scan_dir: str) -> bool: # noqa: PLR0912, PLR0915 operation_type='scanning', ) self.print_msg(f'Searching {scan_dir} for files to fingerprint...') - spinner = None - if not self.quiet and self.isatty: - spinner = Spinner('Fingerprinting ') - save_wfps_for_print = not self.no_wfp_file or not self.threaded_scan - wfp_list = [] - scan_block = '' - scan_size = 0 - queue_size = 0 - file_count = 0 # count all files fingerprinted - wfp_file_count = 0 # count number of files in each queue post - scan_started = False - - to_scan_files = file_filters.get_filtered_files_from_folder(scan_dir) - for to_scan_file in to_scan_files: - if self.threaded_scan and self.threaded_scan.stop_scanning(): - self.print_stderr('Warning: Aborting fingerprinting as the scanning service is not available.') - break - self.print_debug(f'Fingerprinting {to_scan_file}...') - if spinner: - spinner.next() - abs_path = Path(scan_dir, to_scan_file).resolve() - wfp = self.winnowing.wfp_for_file(str(abs_path), to_scan_file) - if wfp is None or wfp == '': - self.print_debug(f'No WFP returned for {to_scan_file}. 
Skipping.') - continue - if save_wfps_for_print: - wfp_list.append(wfp) - file_count += 1 - if self.threaded_scan: - wfp_size = len(wfp.encode('utf-8')) - # If the WFP is bigger than the max post size and we already have something stored in the scan block, - # add it to the queue - if scan_block != '' and (wfp_size + scan_size) >= self.max_post_size: - self.threaded_scan.queue_add(scan_block) - queue_size += 1 - scan_block = '' - wfp_file_count = 0 - scan_block += wfp - scan_size = len(scan_block.encode('utf-8')) - wfp_file_count += 1 - # If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue # noqa: E501 - if wfp_file_count > self.post_file_count or scan_size >= self.max_post_size: - self.threaded_scan.queue_add(scan_block) - queue_size += 1 - scan_block = '' - wfp_file_count = 0 - if not scan_started and queue_size > self.nb_threads: # Start scanning if we have something to do - scan_started = True - if not self.threaded_scan.run(wait=False): - self.print_stderr('Warning: Some errors encounted while scanning. 
Results might be incomplete.') - success = False - # End for loop - if self.threaded_scan and scan_block != '': - self.threaded_scan.queue_add(scan_block) # Make sure all files have been submitted - if spinner: - spinner.finish() + spinner_ctx = Spinner('Fingerprinting ') if (not self.quiet and self.isatty) else nullcontext() + + with spinner_ctx as spinner: + save_wfps_for_print = not self.no_wfp_file or not self.threaded_scan + wfp_list = [] + scan_block = '' + scan_size = 0 + queue_size = 0 + file_count = 0 # count all files fingerprinted + wfp_file_count = 0 # count number of files in each queue post + scan_started = False + + to_scan_files = file_filters.get_filtered_files_from_folder(scan_dir) + for to_scan_file in to_scan_files: + if self.threaded_scan and self.threaded_scan.stop_scanning(): + self.print_stderr('Warning: Aborting fingerprinting as the scanning service is not available.') + break + self.print_debug(f'Fingerprinting {to_scan_file}...') + if spinner: + spinner.next() + abs_path = Path(scan_dir, to_scan_file).resolve() + wfp = self.winnowing.wfp_for_file(str(abs_path), to_scan_file) + if wfp is None or wfp == '': + self.print_debug(f'No WFP returned for {to_scan_file}. 
Skipping.') + continue + if save_wfps_for_print: + wfp_list.append(wfp) + file_count += 1 + if self.threaded_scan: + wfp_size = len(wfp.encode('utf-8')) + # If the WFP is bigger than the max post size and we already have something stored in the scan block, + # add it to the queue + if scan_block != '' and (wfp_size + scan_size) >= self.max_post_size: + self.threaded_scan.queue_add(scan_block) + queue_size += 1 + scan_block = '' + wfp_file_count = 0 + scan_block += wfp + scan_size = len(scan_block.encode('utf-8')) + wfp_file_count += 1 + # If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue # noqa: E501 + if wfp_file_count > self.post_file_count or scan_size >= self.max_post_size: + self.threaded_scan.queue_add(scan_block) + queue_size += 1 + scan_block = '' + wfp_file_count = 0 + if not scan_started and queue_size > self.nb_threads: # Start scanning if we have something to do + scan_started = True + if not self.threaded_scan.run(wait=False): + self.print_stderr('Warning: Some errors encounted while scanning. 
Results might be incomplete.') + success = False + # End for loop + if self.threaded_scan and scan_block != '': + self.threaded_scan.queue_add(scan_block) # Make sure all files have been submitted if file_count > 0: if save_wfps_for_print: # Write a WFP file if no threading is requested @@ -631,63 +630,61 @@ def scan_files(self, files: []) -> bool: # noqa: PLR0912, PLR0915 skip_extensions=self.skip_extensions, operation_type='scanning', ) - spinner = None - if not self.quiet and self.isatty: - spinner = Spinner('Fingerprinting ') - save_wfps_for_print = not self.no_wfp_file or not self.threaded_scan - wfp_list = [] - scan_block = '' - scan_size = 0 - queue_size = 0 - file_count = 0 # count all files fingerprinted - wfp_file_count = 0 # count number of files in each queue post - scan_started = False - - to_scan_files = file_filters.get_filtered_files_from_files(files) - for file in to_scan_files: - if self.threaded_scan and self.threaded_scan.stop_scanning(): - self.print_stderr('Warning: Aborting fingerprinting as the scanning service is not available.') - break - self.print_debug(f'Fingerprinting {file}...') - if spinner: - spinner.next() - wfp = self.winnowing.wfp_for_file(file, file) - if wfp is None or wfp == '': - self.print_debug(f'No WFP returned for {file}. 
Skipping.') - continue - if save_wfps_for_print: - wfp_list.append(wfp) - file_count += 1 - if self.threaded_scan: - wfp_size = len(wfp.encode('utf-8')) - # If the WFP is bigger than the max post size and we already have something stored in the scan block, add it to the queue # noqa: E501 - if scan_block != '' and (wfp_size + scan_size) >= self.max_post_size: - self.threaded_scan.queue_add(scan_block) - queue_size += 1 - scan_block = '' - wfp_file_count = 0 - scan_block += wfp - scan_size = len(scan_block.encode('utf-8')) - wfp_file_count += 1 - # If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue # noqa: E501 - if wfp_file_count > self.post_file_count or scan_size >= self.max_post_size: - self.threaded_scan.queue_add(scan_block) - queue_size += 1 - scan_block = '' - wfp_file_count = 0 - if not scan_started and queue_size > self.nb_threads: # Start scanning if we have something to do - scan_started = True - if not self.threaded_scan.run(wait=False): - self.print_stderr( - 'Warning: Some errors encounted while scanning. Results might be incomplete.' 
- ) - success = False + spinner_ctx = Spinner('Fingerprinting ') if (not self.quiet and self.isatty) else nullcontext() + + with spinner_ctx as spinner: + save_wfps_for_print = not self.no_wfp_file or not self.threaded_scan + wfp_list = [] + scan_block = '' + scan_size = 0 + queue_size = 0 + file_count = 0 # count all files fingerprinted + wfp_file_count = 0 # count number of files in each queue post + scan_started = False + + to_scan_files = file_filters.get_filtered_files_from_files(files) + for file in to_scan_files: + if self.threaded_scan and self.threaded_scan.stop_scanning(): + self.print_stderr('Warning: Aborting fingerprinting as the scanning service is not available.') + break + self.print_debug(f'Fingerprinting {file}...') + if spinner: + spinner.next() + wfp = self.winnowing.wfp_for_file(file, file) + if wfp is None or wfp == '': + self.print_debug(f'No WFP returned for {file}. Skipping.') + continue + if save_wfps_for_print: + wfp_list.append(wfp) + file_count += 1 + if self.threaded_scan: + wfp_size = len(wfp.encode('utf-8')) + # If the WFP is bigger than the max post size and we already have something stored in the scan block, add it to the queue # noqa: E501 + if scan_block != '' and (wfp_size + scan_size) >= self.max_post_size: + self.threaded_scan.queue_add(scan_block) + queue_size += 1 + scan_block = '' + wfp_file_count = 0 + scan_block += wfp + scan_size = len(scan_block.encode('utf-8')) + wfp_file_count += 1 + # If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue # noqa: E501 + if wfp_file_count > self.post_file_count or scan_size >= self.max_post_size: + self.threaded_scan.queue_add(scan_block) + queue_size += 1 + scan_block = '' + wfp_file_count = 0 + if not scan_started and queue_size > self.nb_threads: # Start scanning if we have something to do + scan_started = True + if not self.threaded_scan.run(wait=False): + self.print_stderr( + 'Warning: Some errors encounted 
while scanning. Results might be incomplete.' + ) + success = False - # End for loop - if self.threaded_scan and scan_block != '': - self.threaded_scan.queue_add(scan_block) # Make sure all files have been submitted - if spinner: - spinner.finish() + # End for loop + if self.threaded_scan and scan_block != '': + self.threaded_scan.queue_add(scan_block) # Make sure all files have been submitted if file_count > 0: if save_wfps_for_print: # Write a WFP file if no threading is requested @@ -778,73 +775,72 @@ def scan_wfp_file(self, file: str = None) -> bool: # noqa: PLR0912, PLR0915 self.print_debug(f'Found {file_count} files to process.') raw_output = '{\n' file_print = '' - bar = None - if not self.quiet and self.isatty: - bar = Bar('Scanning', max=file_count) - bar.next(0) - with open(wfp_file) as f: - for line in f: - if line.startswith(WFP_FILE_START): - if file_print: - wfp += file_print # Store the WFP for the current file - cur_size = len(wfp.encode('utf-8')) - file_print = line # Start storing the next file - cur_files += 1 - batch_files += 1 - else: - file_print += line # Store the rest of the WFP for this file - l_size = cur_size + len(file_print.encode('utf-8')) - # Hit the max post size, so sending the current batch and continue processing - if l_size >= self.max_post_size and wfp: - self.print_debug( - f'Sending {batch_files} ({cur_files}) of' - f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the ScanOSS API.' 
- ) - if self.debug and cur_size > self.max_post_size: - Scanner.print_stderr(f'Warning: Post size {cur_size} greater than limit {self.max_post_size}') - scan_resp = self.scanoss_api.scan(wfp, max_component['name']) # Scan current WFP and store - if bar: - bar.next(batch_files) - if scan_resp is not None: - for key, value in scan_resp.items(): - raw_output += ' "%s":%s,' % (key, json.dumps(value, indent=2)) - for v in value: - if hasattr(v, 'get'): - if v.get('id') != 'none': - vcv = '%s:%s:%s' % (v.get('vendor'), v.get('component'), v.get('version')) - components[vcv] = components[vcv] + 1 if vcv in components else 1 - if max_component['hits'] < components[vcv]: - max_component['name'] = v.get('component') - max_component['hits'] = components[vcv] - else: - Scanner.print_stderr(f'Warning: Unknown value: {v}') - else: - success = False - batch_files = 0 - wfp = '' - if file_print: - wfp += file_print # Store the WFP for the current file - if wfp: - self.print_debug( - f'Sending {batch_files} ({cur_files}) of' - f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the ScanOSS API.' 
- ) - scan_resp = self.scanoss_api.scan(wfp, max_component['name']) # Scan current WFP and store + bar_ctx = Bar('Scanning', max=file_count) if (not self.quiet and self.isatty) else nullcontext() + + with bar_ctx as bar: if bar: - bar.next(batch_files) - first = True - if scan_resp is not None: - for key, value in scan_resp.items(): - if first: - raw_output += ' "%s":%s' % (key, json.dumps(value, indent=2)) - first = False + bar.next(0) + with open(wfp_file) as f: + for line in f: + if line.startswith(WFP_FILE_START): + if file_print: + wfp += file_print # Store the WFP for the current file + cur_size = len(wfp.encode('utf-8')) + file_print = line # Start storing the next file + cur_files += 1 + batch_files += 1 else: - raw_output += ',\n "%s":%s' % (key, json.dumps(value, indent=2)) - else: - success = False + file_print += line # Store the rest of the WFP for this file + l_size = cur_size + len(file_print.encode('utf-8')) + # Hit the max post size, so sending the current batch and continue processing + if l_size >= self.max_post_size and wfp: + self.print_debug( + f'Sending {batch_files} ({cur_files}) of' + f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the ScanOSS API.' 
+ ) + if self.debug and cur_size > self.max_post_size: + Scanner.print_stderr(f'Warning: Post size {cur_size} greater than limit {self.max_post_size}') + scan_resp = self.scanoss_api.scan(wfp, max_component['name']) # Scan current WFP and store + if bar: + bar.next(batch_files) + if scan_resp is not None: + for key, value in scan_resp.items(): + raw_output += ' "%s":%s,' % (key, json.dumps(value, indent=2)) + for v in value: + if hasattr(v, 'get'): + if v.get('id') != 'none': + vcv = '%s:%s:%s' % (v.get('vendor'), v.get('component'), v.get('version')) + components[vcv] = components[vcv] + 1 if vcv in components else 1 + if max_component['hits'] < components[vcv]: + max_component['name'] = v.get('component') + max_component['hits'] = components[vcv] + else: + Scanner.print_stderr(f'Warning: Unknown value: {v}') + else: + success = False + batch_files = 0 + wfp = '' + if file_print: + wfp += file_print # Store the WFP for the current file + if wfp: + self.print_debug( + f'Sending {batch_files} ({cur_files}) of' + f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the ScanOSS API.' 
+ ) + scan_resp = self.scanoss_api.scan(wfp, max_component['name']) # Scan current WFP and store + if bar: + bar.next(batch_files) + first = True + if scan_resp is not None: + for key, value in scan_resp.items(): + if first: + raw_output += ' "%s":%s' % (key, json.dumps(value, indent=2)) + first = False + else: + raw_output += ',\n "%s":%s' % (key, json.dumps(value, indent=2)) + else: + success = False raw_output += '\n}' - if bar: - bar.finish() if self.output_format == 'plain': self.__log_result(raw_output) elif self.output_format == 'cyclonedx': @@ -1052,19 +1048,16 @@ def wfp_folder(self, scan_dir: str, wfp_file: str = None): ) wfps = '' self.print_msg(f'Searching {scan_dir} for files to fingerprint...') - spinner = None - if not self.quiet and self.isatty: - spinner = Spinner('Fingerprinting ') - - to_fingerprint_files = file_filters.get_filtered_files_from_folder(scan_dir) - for file in to_fingerprint_files: - if spinner: - spinner.next() - abs_path = Path(scan_dir, file).resolve() - self.print_debug(f'Fingerprinting {file}...') - wfps += self.winnowing.wfp_for_file(str(abs_path), file) - if spinner: - spinner.finish() + spinner_ctx = Spinner('Fingerprinting ') if (not self.quiet and self.isatty) else nullcontext() + + with spinner_ctx as spinner: + to_fingerprint_files = file_filters.get_filtered_files_from_folder(scan_dir) + for file in to_fingerprint_files: + if spinner: + spinner.next() + abs_path = Path(scan_dir, file).resolve() + self.print_debug(f'Fingerprinting {file}...') + wfps += self.winnowing.wfp_for_file(str(abs_path), file) if wfps: if wfp_file: self.print_stderr(f'Writing fingerprints to {wfp_file}') diff --git a/src/scanoss/scanners/folder_hasher.py b/src/scanoss/scanners/folder_hasher.py index eb4bd726..549a7c18 100644 --- a/src/scanoss/scanners/folder_hasher.py +++ b/src/scanoss/scanners/folder_hasher.py @@ -157,38 +157,38 @@ def _build_root_node( # Sort the files by name to ensure the hash is the same for the same folder 
filtered_files.sort() - bar = Bar('Hashing files...', max=len(filtered_files)) - full_file_path = '' - for file_path in filtered_files: - try: - file_path_obj = Path(file_path) if isinstance(file_path, str) else file_path - full_file_path = file_path_obj if file_path_obj.is_absolute() else root / file_path_obj + bar_ctx = Bar('Hashing files...', max=len(filtered_files)) - self.base.print_debug(f'\nHashing file {str(full_file_path)}') + with bar_ctx as bar: + full_file_path = '' + for file_path in filtered_files: + try: + file_path_obj = Path(file_path) if isinstance(file_path, str) else file_path + full_file_path = file_path_obj if file_path_obj.is_absolute() else root / file_path_obj - file_bytes = full_file_path.read_bytes() - key = CRC64.get_hash_buff(file_bytes) - key_str = ''.join(f'{b:02x}' for b in key) - rel_path = str(full_file_path.relative_to(root)) + self.base.print_debug(f'\nHashing file {str(full_file_path)}') - file_item = DirectoryFile(rel_path, key, key_str) + file_bytes = full_file_path.read_bytes() + key = CRC64.get_hash_buff(file_bytes) + key_str = ''.join(f'{b:02x}' for b in key) + rel_path = str(full_file_path.relative_to(root)) - current_node = root_node - for part in Path(rel_path).parent.parts: - child_path = str(Path(current_node.path) / part) - if child_path not in current_node.children: - current_node.children[child_path] = DirectoryNode(child_path) - current_node = current_node.children[child_path] - current_node.files.append(file_item) + file_item = DirectoryFile(rel_path, key, key_str) - root_node.files.append(file_item) + current_node = root_node + for part in Path(rel_path).parent.parts: + child_path = str(Path(current_node.path) / part) + if child_path not in current_node.children: + current_node.children[child_path] = DirectoryNode(child_path) + current_node = current_node.children[child_path] + current_node.files.append(file_item) - except Exception as e: - self.base.print_debug(f'Skipping file {full_file_path}: {str(e)}') + 
root_node.files.append(file_item) - bar.next() + except Exception as e: + self.base.print_debug(f'Skipping file {full_file_path}: {str(e)}') - bar.finish() + bar.next() return root_node def _hash_calc_from_node(self, node: DirectoryNode, current_depth: int = 1) -> dict: diff --git a/src/scanoss/scanners/scanner_hfh.py b/src/scanoss/scanners/scanner_hfh.py index 7ac64630..8d4a2849 100644 --- a/src/scanoss/scanners/scanner_hfh.py +++ b/src/scanoss/scanners/scanner_hfh.py @@ -110,6 +110,19 @@ def __init__( # noqa: PLR0913 self.min_accepted_score = min_accepted_score self.use_grpc = use_grpc + def _execute_grpc_scan(self, hfh_request: Dict) -> None: + """ + Execute folder hash scan. + + Args: + hfh_request: Request dictionary for the gRPC call + """ + try: + self.scan_results = self.client.folder_hash_scan(hfh_request, self.use_grpc) + except Exception as e: + self.base.print_stderr(f'Error during folder hash scan: {e}') + self.scan_results = None + def scan(self) -> Optional[Dict]: """ Scan the provided directory using the folder hashing algorithm. 
@@ -124,25 +137,17 @@ def scan(self) -> Optional[Dict]: 'min_accepted_score': self.min_accepted_score, } - spinner = Spinner('Scanning folder...') - stop_spinner = False + spinner_ctx = Spinner('Scanning folder...') + + with spinner_ctx as spinner: + grpc_thread = threading.Thread(target=self._execute_grpc_scan, args=(hfh_request,)) + grpc_thread.start() - def spin(): - while not stop_spinner: + while grpc_thread.is_alive(): spinner.next() time.sleep(0.1) - spinner_thread = threading.Thread(target=spin) - spinner_thread.start() - - try: - response = self.client.folder_hash_scan(hfh_request, self.use_grpc) - if response: - self.scan_results = response - finally: - stop_spinner = True - spinner_thread.join() - spinner.finish() + grpc_thread.join() return self.scan_results diff --git a/src/scanoss/threadedscanning.py b/src/scanoss/threadedscanning.py index e9784e3a..d0a5cad7 100644 --- a/src/scanoss/threadedscanning.py +++ b/src/scanoss/threadedscanning.py @@ -22,6 +22,7 @@ THE SOFTWARE. """ +import atexit import os import queue import sys @@ -77,6 +78,8 @@ def __init__( if nb_threads > MAX_ALLOWED_THREADS: self.print_msg(f'Warning: Requested threads too large: {nb_threads}. 
Reducing to {MAX_ALLOWED_THREADS}') self.nb_threads = MAX_ALLOWED_THREADS + # Register cleanup to ensure progress bar is finished on exit + atexit.register(self.complete_bar) @staticmethod def __count_files_in_wfp(wfp: str): @@ -101,6 +104,13 @@ def complete_bar(self): if self.bar: self.bar.finish() + def __del__(self): + """Ensure progress bar is cleaned up when object is destroyed""" + try: + self.complete_bar() + except Exception: + pass # Ignore errors during cleanup + def set_bar(self, bar: Bar) -> None: """ Set the Progress Bar to display progress while scanning From 6242fdb6d12920aeb02f2e62b143768ebadbd3a6 Mon Sep 17 00:00:00 2001 From: Agustin Isasmendi Date: Thu, 13 Nov 2025 16:31:05 +0100 Subject: [PATCH 2/5] feat(policies): add support for license source filtering in copyleft inspection --- CHANGELOG.md | 11 + src/scanoss/cli.py | 14 + src/scanoss/constants.py | 3 + .../policy_check/scanoss/copyleft.py | 8 +- .../inspection/utils/scan_result_processor.py | 33 ++- tests/test_policy_inspect.py | 254 ++++++++++++++++++ 6 files changed, 311 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cc984f0..de72021d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
## [Unreleased] + +### Added +- Added `--license-sources` (`-ls`) option to copyleft inspection + - Filter which license sources to check (component_declared, license_file, file_header, file_spdx_tag, scancode) + - Supports both `-ls source1 source2` and `-ls source1 -ls source2` syntax + +### Changed +- Copyleft inspection now defaults to component-level licenses only (component_declared, license_file) + - Reduces noise from file-level license detections (file_header, scancode) + - Use `-ls` to override and check specific sources + ### Fixed - Fixed terminal cursor disappearing after aborting scan with Ctrl+C diff --git a/src/scanoss/cli.py b/src/scanoss/cli.py index 824c5133..0a3c1ae0 100644 --- a/src/scanoss/cli.py +++ b/src/scanoss/cli.py @@ -55,6 +55,7 @@ from .components import Components from .constants import ( DEFAULT_API_TIMEOUT, + DEFAULT_COPYLEFT_LICENSE_SOURCES, DEFAULT_HFH_DEPTH, DEFAULT_HFH_MIN_ACCEPTED_SCORE, DEFAULT_HFH_RANK_THRESHOLD, @@ -64,6 +65,7 @@ DEFAULT_TIMEOUT, MIN_TIMEOUT, PYTHON_MAJOR_VERSION, + VALID_LICENSE_SOURCES, ) from .csvoutput import CsvOutput from .cyclonedx import CycloneDx @@ -699,6 +701,17 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 p.add_argument('--exclude', help='Licenses to exclude from analysis (comma-separated list)') p.add_argument('--explicit', help='Use only these specific licenses for analysis (comma-separated list)') + # License source filtering + for p in [p_inspect_raw_copyleft, p_inspect_legacy_copyleft]: + p.add_argument( + '-ls', '--license-sources', + action='extend', + nargs='+', + choices=VALID_LICENSE_SOURCES, + help=f'Specify which license sources to check for copyleft violations. Each license object in scan results ' + f'has a source field indicating its origin. 
Default: {", ".join(DEFAULT_COPYLEFT_LICENSE_SOURCES)}', + ) + # Common options for (legacy) copyleft and undeclared component inspection for p in [p_inspect_raw_copyleft, p_inspect_raw_undeclared, p_inspect_legacy_copyleft, p_inspect_legacy_undeclared]: p.add_argument('-i', '--input', nargs='?', help='Path to scan results file to analyse') @@ -1752,6 +1765,7 @@ def inspect_copyleft(parser, args): include=args.include, # Additional licenses to check exclude=args.exclude, # Licenses to ignore explicit=args.explicit, # Explicit license list + license_sources=args.license_sources, # License sources to check (list) ) # Execute inspection and exit with appropriate status code status, _ = i_copyleft.run() diff --git a/src/scanoss/constants.py b/src/scanoss/constants.py index 989f2008..68216595 100644 --- a/src/scanoss/constants.py +++ b/src/scanoss/constants.py @@ -17,3 +17,6 @@ DEFAULT_HFH_DEPTH = 1 DEFAULT_HFH_RECURSIVE_THRESHOLD = 0.8 DEFAULT_HFH_MIN_ACCEPTED_SCORE = 0.15 + +VALID_LICENSE_SOURCES = ['component_declared', 'license_file', 'file_header', 'file_spdx_tag', 'scancode'] +DEFAULT_COPYLEFT_LICENSE_SOURCES = ['component_declared', 'license_file'] diff --git a/src/scanoss/inspection/policy_check/scanoss/copyleft.py b/src/scanoss/inspection/policy_check/scanoss/copyleft.py index 08694854..a56c39b9 100644 --- a/src/scanoss/inspection/policy_check/scanoss/copyleft.py +++ b/src/scanoss/inspection/policy_check/scanoss/copyleft.py @@ -26,6 +26,8 @@ from dataclasses import dataclass from typing import Dict, List +from scanoss.constants import DEFAULT_COPYLEFT_LICENSE_SOURCES + from ...policy_check.policy_check import PolicyCheck, PolicyOutput, PolicyStatus from ...utils.markdown_utils import generate_jira_table, generate_table from ...utils.scan_result_processor import ScanResultProcessor @@ -63,6 +65,7 @@ def __init__( # noqa: PLR0913 include: str = None, exclude: str = None, explicit: str = None, + license_sources: list = None, ): """ Initialise the Copyleft class. 
@@ -77,6 +80,7 @@ def __init__( # noqa: PLR0913 :param include: Licenses to include in the analysis :param exclude: Licenses to exclude from the analysis :param explicit: Explicitly defined licenses + :param license_sources: List of license sources to check """ super().__init__( debug, trace, quiet, format_type, status, name='Copyleft Policy', output=output @@ -85,6 +89,7 @@ def __init__( # noqa: PLR0913 self.filepath = filepath self.output = output self.status = status + self.license_sources = license_sources or DEFAULT_COPYLEFT_LICENSE_SOURCES self.results_processor = ScanResultProcessor( self.debug, self.trace, @@ -92,7 +97,8 @@ def __init__( # noqa: PLR0913 self.filepath, include, exclude, - explicit) + explicit, + self.license_sources) def _json(self, components: list[Component]) -> PolicyOutput: """ diff --git a/src/scanoss/inspection/utils/scan_result_processor.py b/src/scanoss/inspection/utils/scan_result_processor.py index 22333b5d..75960eab 100644 --- a/src/scanoss/inspection/utils/scan_result_processor.py +++ b/src/scanoss/inspection/utils/scan_result_processor.py @@ -71,11 +71,13 @@ def __init__( # noqa: PLR0913 include: str = None, exclude: str = None, explicit: str = None, + license_sources: list = None, ): super().__init__(debug, trace, quiet) self.result_file_path = result_file_path self.license_util = LicenseUtil() self.license_util.init(include, exclude, explicit) + self.license_sources = license_sources self.results = self._load_input_file() def get_results(self) -> Dict[str, Any]: @@ -162,9 +164,11 @@ def _append_license_to_component(self, self.print_debug(f'WARNING: Results missing licenses. 
Skipping: {new_component}') return - licenses_order_by_source_priority = self._get_licenses_order_by_source_priority(new_component['licenses']) + # Select licenses based on configuration (filtering or priority mode) + selected_licenses = self._select_licenses(new_component['licenses']) + # Process licenses for this component - for license_item in licenses_order_by_source_priority: + for license_item in selected_licenses: if license_item.get('name'): spdxid = license_item['name'] source = license_item.get('source') @@ -309,19 +313,26 @@ def convert_components_to_list(self, components: dict): component['licenses'] = [] return results_list - def _get_licenses_order_by_source_priority(self,licenses_data): + def _select_licenses(self, licenses_data): """ - Select licenses based on source priority: - 1. component_declared (highest priority) - 2. license_file - 3. file_header - 4. scancode (lowest priority) + Select licenses based on configuration. + + Two modes: + - Filtering mode: If license_sources specified, filter to those sources + - Priority mode: Otherwise, use original priority-based selection - If any high-priority source is found, return only licenses from that source. - If none found, return all licenses. + Args: + licenses_data: List of license dictionaries - Returns: list with ordered licenses by source. 
+ Returns: + Filtered list of licenses based on configuration """ + # Filtering mode, when license_sources is explicitly provided + if self.license_sources: + sources_to_include = set(self.license_sources) | {'unknown'} + return [lic for lic in licenses_data + if lic.get('source') in sources_to_include or lic.get('source') is None] + # Define priority order (highest to lowest) priority_sources = ['component_declared', 'license_file', 'file_header', 'scancode'] diff --git a/tests/test_policy_inspect.py b/tests/test_policy_inspect.py index 0ccf7886..a3161a45 100644 --- a/tests/test_policy_inspect.py +++ b/tests/test_policy_inspect.py @@ -28,6 +28,7 @@ import unittest from unittest.mock import Mock, patch +from scanoss.constants import DEFAULT_COPYLEFT_LICENSE_SOURCES, VALID_LICENSE_SOURCES from src.scanoss.inspection.policy_check.dependency_track.project_violation import ( DependencyTrackProjectViolationPolicyCheck, ) @@ -389,6 +390,259 @@ def test_copyleft_policy_jira_markdown_output(self): self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) self.assertEqual(expected_details_output, results) + ## Copyleft License Source Filtering Tests ## + + def test_copyleft_policy_default_license_sources(self): + """ + Test default behavior: should use DEFAULT_COPYLEFT_LICENSE_SOURCES + (component_declared and license_file) + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft(filepath=input_file_name, format_type='json') + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find components with copyleft from component_declared or license_file + # Expected: 5 PURL@version entries (scanner.c x2, engine x2, wfp x1) + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + self.assertEqual(len(details['components']), 5) + + # Verify all components have licenses from default sources + for component in 
details['components']: + for license in component['licenses']: + self.assertIn(license['source'], DEFAULT_COPYLEFT_LICENSE_SOURCES) + + def test_copyleft_policy_license_sources_none(self): + """ + Test explicit None: should use DEFAULT_COPYLEFT_LICENSE_SOURCES + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft(filepath=input_file_name, format_type='json', license_sources=None) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should behave same as default + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + self.assertEqual(len(details['components']), 5) + + # Verify all components have licenses from default sources + for component in details['components']: + for license in component['licenses']: + self.assertIn(license['source'], DEFAULT_COPYLEFT_LICENSE_SOURCES) + + + def test_copyleft_policy_license_sources_component_declared_only(self): + """ + Test filtering to component_declared source only + Should find GPL-2.0-only from component_declared + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=['component_declared'] + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find 5 PURL@version entries from component_declared + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + self.assertEqual(len(details['components']), 5) + + # All licenses should be from component_declared + for component in details['components']: + for license in component['licenses']: + self.assertEqual(license['source'], 'component_declared') + + def test_copyleft_policy_license_sources_license_file_only(self): + """ + Test filtering to license_file source only + Should 
find GPL-2.0-only from license_file (engine and wfp) + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=['license_file'] + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find engine and wfp (2 components with license_file) + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + self.assertEqual(len(details['components']), 2) + + # Verify components are engine and wfp + purls = [comp['purl'] for comp in details['components']] + self.assertIn('pkg:github/scanoss/engine', purls) + self.assertIn('pkg:github/scanoss/wfp', purls) + + # All licenses should be from license_file + for component in details['components']: + for license in component['licenses']: + self.assertEqual(license['source'], 'license_file') + + def test_copyleft_policy_license_sources_file_header_only(self): + """ + Test filtering to file_header source only + file_header only has BSD-2-Clause and Zlib (not copyleft) + Should find no copyleft licenses + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=['file_header'] + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find no copyleft (file_header only has BSD and Zlib) + self.assertEqual(status, PolicyStatus.POLICY_SUCCESS.value) + self.assertEqual(details, {}) + + def test_copyleft_policy_license_sources_multiple_sources(self): + """ + Test using multiple license sources + Should find copyleft from component_declared and scancode + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = 
os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=['component_declared', 'scancode'] + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find components from both sources + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + self.assertGreaterEqual(len(details['components']), 3) + + # Verify licenses are from specified sources + for component in details['components']: + for license in component['licenses']: + self.assertIn(license['source'], ['component_declared', 'scancode']) + + def test_copyleft_policy_license_sources_all_valid_sources(self): + """ + Test using all valid license sources + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=VALID_LICENSE_SOURCES + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find all copyleft licenses from any source + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + self.assertGreaterEqual(len(details['components']), 3) + + def test_copyleft_policy_license_sources_with_markdown_output(self): + """ + Test license source filtering works with markdown output + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='md', + license_sources=['license_file'] + ) + status, policy_output = copyleft.run() + + # Should generate markdown table + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + self.assertIn('### Copyleft Licenses', policy_output.details) + self.assertIn('Component', policy_output.details) + self.assertIn('License', policy_output.details) + self.assertIn('2 
component(s) with copyleft licenses were found', policy_output.summary) + + def test_copyleft_policy_license_sources_with_include_filter(self): + """ + Test license_sources works with include filter + Filter to scancode source and include only GPL-2.0-or-later + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=['scancode'], + include='GPL-2.0-or-later' + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find only GPL-2.0-or-later from scancode + self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) + if details: # May be empty if no matches + for component in details.get('components', []): + for license in component['licenses']: + self.assertEqual(license['spdxid'], 'GPL-2.0-or-later') + self.assertEqual(license['source'], 'scancode') + + def test_copyleft_policy_license_sources_with_exclude_filter(self): + """ + Test license_sources works with exclude filter + Use component_declared but exclude GPL-2.0-only + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + file_name = 'result.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=['component_declared'], + exclude='GPL-2.0-only' + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should exclude GPL-2.0-only, leaving nothing (all component_declared are GPL-2.0-only) + self.assertEqual(status, PolicyStatus.POLICY_SUCCESS.value) + self.assertEqual(details, {}) + + def test_copyleft_policy_license_sources_no_copyleft_file(self): + """ + Test license_sources with result-no-copyleft.json + Should return success even with license_sources specified + """ + script_dir = os.path.dirname(os.path.abspath(__file__)) + 
file_name = 'result-no-copyleft.json' + input_file_name = os.path.join(script_dir, 'data', file_name) + copyleft = Copyleft( + filepath=input_file_name, + format_type='json', + license_sources=['component_declared'] + ) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) + + # Should find no copyleft + self.assertEqual(status, PolicyStatus.POLICY_SUCCESS.value) + self.assertEqual(details, {}) + self.assertIn('0 component(s) with copyleft licenses were found', policy_output.summary) + def test_inspect_license_summary(self): script_dir = os.path.dirname(os.path.abspath(__file__)) file_name = 'result.json' From 63ae916c34df758c967950dbf65183c5d0777e9f Mon Sep 17 00:00:00 2001 From: Agustin Isasmendi Date: Mon, 17 Nov 2025 11:17:42 +0100 Subject: [PATCH 3/5] feat(licenses): integrate OSADL copyleft data --- CHANGELOG.md | 9 +- LICENSE | 12 +- README.md | 5 + src/scanoss/data/osadl-copyleft.json | 133 +++++++++++++++++ src/scanoss/inspection/utils/license_utils.py | 127 ++++++++-------- src/scanoss/osadl.py | 135 ++++++++++++++++++ tests/test_osadl.py | 102 +++++++++++++ tests/test_policy_inspect.py | 17 +-- 8 files changed, 459 insertions(+), 81 deletions(-) create mode 100644 src/scanoss/data/osadl-copyleft.json create mode 100644 src/scanoss/osadl.py create mode 100644 tests/test_osadl.py diff --git a/CHANGELOG.md b/CHANGELOG.md index de72021d..532ba071 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,12 +13,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Supports both `-ls source1 source2` and `-ls source1 -ls source2` syntax ### Changed +- **Switched to OSADL authoritative copyleft license data** + - Copyleft detection now uses [OSADL (Open Source Automation Development Lab)](https://www.osadl.org/) checklist data + - Adds missing `-or-later` license variants (GPL-2.0-or-later, GPL-3.0-or-later, LGPL-2.1-or-later, etc.) 
+ - Expands copyleft coverage from 21 to 32 licenses + - Custom include/exclude/explicit filters still use legacy behavior for backward compatibility + - Dataset attribution added to README (CC-BY-4.0 license) + - Copyleft inspection now defaults to component-level licenses only (component_declared, license_file) - Reduces noise from file-level license detections (file_header, scancode) - Use `-ls` to override and check specific sources ### Fixed -- Fixed terminal cursor disappearing after aborting scan with Ctrl+C +- Fixed the terminal cursor disappearing after aborting scan with Ctrl+C ## [1.40.1] - 2025-10-29 ### Changed diff --git a/LICENSE b/LICENSE index 23bdec93..47ae1a0f 100644 --- a/LICENSE +++ b/LICENSE @@ -18,4 +18,14 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. \ No newline at end of file +THE SOFTWARE. + +=============================================================================== + +In addition, this repository includes an unmodified copy of the OSADL copyleft +license checklist data (src/scanoss/data/osadl-copyleft.json), which is +licensed under the Creative Commons Attribution 4.0 International license +(CC-BY-4.0) by the Open Source Automation Development Lab (OSADL) eG. + +The OSADL data file contains its own license header with full attribution +information. diff --git a/README.md b/README.md index f5d1bfb2..ececd218 100644 --- a/README.md +++ b/README.md @@ -135,3 +135,8 @@ Details of major changes to the library can be found in [CHANGELOG.md](CHANGELOG ## Background Details about the Winnowing algorithm used for scanning can be found [here](WINNOWING.md). + +## Dataset License Notice +This application is licensed under the MIT License. 
In addition, it includes an unmodified copy of the OSADL copyleft license dataset ([osadl-copyleft.json](src/scanoss/data/osadl-copyleft.json)) which is licensed under the [Creative Commons Attribution 4.0 International license (CC-BY-4.0)](https://creativecommons.org/licenses/by/4.0/) by the [Open Source Automation Development Lab (OSADL) eG](https://www.osadl.org/). + +**Attribution:** A project by the Open Source Automation Development Lab (OSADL) eG. Original source: [https://www.osadl.org/fileadmin/checklists/copyleft.json](https://www.osadl.org/fileadmin/checklists/copyleft.json) diff --git a/src/scanoss/data/osadl-copyleft.json b/src/scanoss/data/osadl-copyleft.json new file mode 100644 index 00000000..6cc2e1a7 --- /dev/null +++ b/src/scanoss/data/osadl-copyleft.json @@ -0,0 +1,133 @@ +{ + "title": "OSADL Open Source License Obligations Checklist (https:\/\/www.osadl.org\/Checklists)", + "license": "Creative Commons Attribution 4.0 International license (CC-BY-4.0)", + "attribution": "A project by the Open Source Automation Development Lab (OSADL) eG. For further information about the project see the description at www.osadl.org\/checklists.", + "copyright": "(C) 2017 - 2024 Open Source Automation Development Lab (OSADL) eG and contributors, info@osadl.org", + "disclaimer": "The checklists and particularly the copyleft data have been assembled with maximum diligence and care; however, the authors do not warrant nor can be held liable in any way for its correctness, usefulness, merchantibility or fitness for a particular purpose as far as permissible by applicable law. Anyone who uses the information does this on his or her sole responsibility. 
For any individual legal advice, it is recommended to contact a lawyer.", + "timeformat": "%Y-%m-%dT%H:%M:%S%z", + "timestamp": "2025-10-30T11:23:00+0000", + "copyleft": + { + "0BSD": "No", + "AFL-2.0": "No", + "AFL-2.1": "No", + "AFL-3.0": "No", + "AGPL-3.0-only": "Yes", + "AGPL-3.0-or-later": "Yes", + "Apache-1.0": "No", + "Apache-1.1": "No", + "Apache-2.0": "No", + "APSL-2.0": "Yes (restricted)", + "Artistic-1.0": "No", + "Artistic-1.0-Perl": "No", + "Artistic-2.0": "No", + "Bitstream-Vera": "No", + "blessing": "No", + "BlueOak-1.0.0": "No", + "BSD-1-Clause": "No", + "BSD-2-Clause": "No", + "BSD-2-Clause-Patent": "No", + "BSD-3-Clause": "No", + "BSD-3-Clause-Open-MPI": "No", + "BSD-4-Clause": "No", + "BSD-4-Clause-UC": "No", + "BSD-4.3TAHOE": "No", + "BSD-Source-Code": "No", + "BSL-1.0": "No", + "bzip2-1.0.5": "No", + "bzip2-1.0.6": "No", + "CC-BY-2.5": "No", + "CC-BY-3.0": "No", + "CDDL-1.0": "Yes (restricted)", + "CDDL-1.1": "Yes (restricted)", + "CPL-1.0": "Yes", + "curl": "No", + "ECL-1.0": "No", + "ECL-2.0": "No", + "EFL-2.0": "No", + "EPL-1.0": "Yes", + "EPL-2.0": "Yes (restricted)", + "EUPL-1.1": "Yes", + "EUPL-1.2": "Yes", + "FSFAP": "No", + "FSFUL": "No", + "FSFULLR": "No", + "FSFULLRWD": "No", + "FTL": "No", + "GPL-1.0-only": "Yes", + "GPL-1.0-or-later": "Yes", + "GPL-2.0-only": "Yes", + "GPL-2.0-only WITH Classpath-exception-2.0": "Yes (restricted)", + "GPL-2.0-or-later": "Yes", + "GPL-3.0-only": "Yes", + "GPL-3.0-or-later": "Yes", + "HPND": "No", + "IBM-pibs": "No", + "ICU": "No", + "IJG": "No", + "ImageMagick": "No", + "Info-ZIP": "No", + "IPL-1.0": "Yes", + "ISC": "No", + "JasPer-2.0": "No", + "LGPL-2.0-only": "Yes (restricted)", + "LGPL-2.0-or-later": "Yes (restricted)", + "LGPL-2.1-only": "Yes (restricted)", + "LGPL-2.1-or-later": "Yes (restricted)", + "LGPL-3.0-only": "Yes (restricted)", + "LGPL-3.0-or-later": "Yes (restricted)", + "Libpng": "No", + "libpng-2.0": "No", + "libtiff": "No", + "LicenseRef-scancode-bsla-no-advert": "No", + 
"LicenseRef-scancode-info-zip-2003-05": "No", + "LicenseRef-scancode-ppp": "No", + "Minpack": "No", + "MirOS": "No", + "MIT": "No", + "MIT-0": "No", + "MIT-CMU": "No", + "MPL-1.1": "Yes (restricted)", + "MPL-2.0": "Yes (restricted)", + "MPL-2.0-no-copyleft-exception": "Yes (restricted)", + "MS-PL": "Questionable", + "MS-RL": "Yes (restricted)", + "NBPL-1.0": "No", + "NCSA": "No", + "NTP": "No", + "OFL-1.1": "Yes (restricted)", + "OGC-1.0": "No", + "OLDAP-2.8": "No", + "OpenSSL": "Questionable", + "OSL-3.0": "Yes", + "PHP-3.01": "No", + "PostgreSQL": "No", + "PSF-2.0": "No", + "Python-2.0": "No", + "Qhull": "No", + "RSA-MD": "No", + "Saxpath": "No", + "SGI-B-2.0": "No", + "Sleepycat": "Yes", + "SMLNJ": "No", + "Spencer-86": "No", + "SSH-OpenSSH": "No", + "SSH-short": "No", + "SunPro": "No", + "Ubuntu-font-1.0": "Yes (restricted)", + "Unicode-3.0": "No", + "Unicode-DFS-2015": "No", + "Unicode-DFS-2016": "No", + "Unlicense": "No", + "UPL-1.0": "No", + "W3C": "No", + "W3C-19980720": "No", + "W3C-20150513": "No", + "WTFPL": "No", + "X11": "No", + "XFree86-1.1": "No", + "Zlib": "No", + "zlib-acknowledgement": "No", + "ZPL-2.0": "No" + } +} \ No newline at end of file diff --git a/src/scanoss/inspection/utils/license_utils.py b/src/scanoss/inspection/utils/license_utils.py index beb7dd09..247cb06f 100644 --- a/src/scanoss/inspection/utils/license_utils.py +++ b/src/scanoss/inspection/utils/license_utils.py @@ -22,96 +22,89 @@ THE SOFTWARE. 
""" -from ...scanossbase import ScanossBase +from scanoss.osadl import Osadl -DEFAULT_COPYLEFT_LICENSES = { - 'agpl-3.0-only', - 'artistic-1.0', - 'artistic-2.0', - 'cc-by-sa-4.0', - 'cddl-1.0', - 'cddl-1.1', - 'cecill-2.1', - 'epl-1.0', - 'epl-2.0', - 'gfdl-1.1-only', - 'gfdl-1.2-only', - 'gfdl-1.3-only', - 'gpl-1.0-only', - 'gpl-2.0-only', - 'gpl-3.0-only', - 'lgpl-2.1-only', - 'lgpl-3.0-only', - 'mpl-1.1', - 'mpl-2.0', - 'sleepycat', - 'watcom-1.0', -} +from ...scanossbase import ScanossBase class LicenseUtil(ScanossBase): """ A utility class for handling software licenses, particularly copyleft licenses. - This class provides functionality to initialize, manage, and query a set of - copyleft licenses. It also offers a method to generate URLs for license information. + Uses OSADL (Open Source Automation Development Lab) authoritative copyleft data + with optional include/exclude/explicit filters. """ BASE_SPDX_ORG_URL = 'https://spdx.org/licenses' - BASE_OSADL_URL = 'https://www.osadl.org/fileadmin/checklists/unreflicenses' def __init__(self, debug: bool = False, trace: bool = True, quiet: bool = False): super().__init__(debug, trace, quiet) - self.default_copyleft_licenses = set(DEFAULT_COPYLEFT_LICENSES) - self.copyleft_licenses = set() + self.osadl = Osadl(debug=debug) + self.include_licenses = set() + self.exclude_licenses = set() + self.explicit_licenses = set() def init(self, include: str = None, exclude: str = None, explicit: str = None): """ - Initialize the set of copyleft licenses based on user input. - - This method allows for customization of the copyleft license set by: - - Setting an explicit list of licenses - - Including additional licenses to the default set - - Excluding specific licenses from the default set + Initialize copyleft license filters. 
- :param include: Comma-separated string of licenses to include - :param exclude: Comma-separated string of licenses to exclude - :param explicit: Comma-separated string of licenses to use exclusively + :param include: Comma-separated licenses to mark as copyleft (in addition to OSADL) + :param exclude: Comma-separated licenses to mark as NOT copyleft (override OSADL) + :param explicit: Comma-separated licenses to use exclusively (ignore OSADL) """ - if self.debug: - self.print_stderr(f'Include Copyleft licenses: ${include}') - self.print_stderr(f'Exclude Copyleft licenses: ${exclude}') - self.print_stderr(f'Explicit Copyleft licenses: ${explicit}') - if explicit: - explicit = explicit.strip() + # Reset previous filters so init() can be safely called multiple times + self.include_licenses.clear() + self.exclude_licenses.clear() + self.explicit_licenses.clear() + + # Parse explicit list (if provided, ignore OSADL completely) if explicit: - exp = [item.strip().lower() for item in explicit.split(',')] - self.copyleft_licenses = set(exp) - self.print_debug(f'Copyleft licenses: ${self.copyleft_licenses}') + self.explicit_licenses = {lic.strip().lower() for lic in explicit.split(',') if lic.strip()} + self.print_debug(f'Explicit copyleft licenses: {self.explicit_licenses}') return - # If no explicit licenses were set, set default ones - self.copyleft_licenses = self.default_copyleft_licenses.copy() - if include: - include = include.strip() + + # Parse include list (mark these as copyleft in addition to OSADL) if include: - inc = [item.strip().lower() for item in include.split(',')] - self.copyleft_licenses.update(inc) - if exclude: - exclude = exclude.strip() + self.include_licenses = {lic.strip().lower() for lic in include.split(',') if lic.strip()} + self.print_debug(f'Include licenses: {self.include_licenses}') + + # Parse exclude list (mark these as NOT copyleft, overriding OSADL) if exclude: - inc = [item.strip().lower() for item in exclude.split(',')] - for lic in 
inc: - self.copyleft_licenses.discard(lic) - self.print_debug(f'Copyleft licenses: ${self.copyleft_licenses}') + self.exclude_licenses = {lic.strip().lower() for lic in exclude.split(',') if lic.strip()} + self.print_debug(f'Exclude licenses: {self.exclude_licenses}') def is_copyleft(self, spdxid: str) -> bool: """ - Check if a given license is considered copyleft. + Check if a license is copyleft. + + Logic: + 1. If explicit list provided → check if license in explicit list + 2. If license in include list → return True + 3. If license in exclude list → return False + 4. Otherwise → use OSADL authoritative data - :param spdxid: The SPDX identifier of the license to check - :return: True if the license is copyleft, False otherwise + :param spdxid: SPDX license identifier + :return: True if copyleft, False otherwise """ - return spdxid.lower() in self.copyleft_licenses + if not spdxid: + return False + + spdxid_lc = spdxid.lower() + + # Explicit mode: use only the explicit list + if self.explicit_licenses: + return spdxid_lc in self.explicit_licenses + + # Include filter: if license in include list, force copyleft=True + if spdxid_lc in self.include_licenses: + return True + + # Exclude filter: if license in exclude list, force copyleft=False + if spdxid_lc in self.exclude_licenses: + return False + + # No filters matched, use OSADL authoritative data + return self.osadl.is_copyleft(spdxid) def get_spdx_url(self, spdxid: str) -> str: """ @@ -122,14 +115,6 @@ def get_spdx_url(self, spdxid: str) -> str: """ return f'{self.BASE_SPDX_ORG_URL}/{spdxid}.html' - def get_osadl_url(self, spdxid: str) -> str: - """ - Generate the URL for the OSADL (Open Source Automation Development Lab) page of a license. 
- - :param spdxid: The SPDX identifier of the license - :return: The URL of the OSADL page for the given license - """ - return f'{self.BASE_OSADL_URL}/{spdxid}.txt' # diff --git a/src/scanoss/osadl.py b/src/scanoss/osadl.py new file mode 100644 index 00000000..165f625e --- /dev/null +++ b/src/scanoss/osadl.py @@ -0,0 +1,135 @@ +""" +SPDX-License-Identifier: MIT + + Copyright (c) 2025, SCANOSS + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +""" + +import json +import sys + +import importlib_resources + + +class Osadl: + """ + OSADL data accessor class. + + Provides access to OSADL (Open Source Automation Development Lab) authoritative + checklist data for license analysis. + + Data is loaded once at class level and shared across all instances for efficiency. 
+ + Data source: https://www.osadl.org/fileadmin/checklists/copyleft.json + License: CC-BY-4.0 + """ + + _shared_copyleft_data = {} + _data_loaded = False + + def __init__(self, debug: bool = False): + """ + Initialize the Osadl class. + Data is loaded once at class level and shared across all instances. + """ + self.debug = debug + self._load_copyleft_data() + + @staticmethod + def print_stderr(*args, **kwargs): + """ + Print the given message to STDERR + """ + print(*args, file=sys.stderr, **kwargs) + + def print_debug(self, *args, **kwargs): + """ + Print debug message if enabled + """ + if self.debug: + self.print_stderr(*args, **kwargs) + + def _load_copyleft_data(self) -> bool: + """ + Load the embedded OSADL copyleft JSON file into class-level shared data. + Data is loaded only once and shared across all instances. + + :return: True if successful, False otherwise + """ + if Osadl._data_loaded: + return True + + # OSADL copyleft license checklist from: https://www.osadl.org/Checklists + # Data source: https://www.osadl.org/fileadmin/checklists/copyleft.json + # License: CC-BY-4.0 (Creative Commons Attribution 4.0 International) + # Copyright: (C) 2017 - 2024 Open Source Automation Development Lab (OSADL) eG + try: + f_name = importlib_resources.files(__name__) / 'data/osadl-copyleft.json' + with importlib_resources.as_file(f_name) as f: + with open(f, 'r', encoding='utf-8') as file: + data = json.load(file) + except Exception as e: + self.print_stderr(f'ERROR: Problem loading OSADL copyleft data: {e}') + return False + + # Process copyleft data + copyleft = data.get('copyleft', {}) + if not copyleft: + self.print_stderr('ERROR: No copyleft data found in OSADL JSON') + return False + + # Store in class-level shared dictionary + for lic_id, status in copyleft.items(): + # Normalize license ID (lowercase) for consistent lookup + lic_id_lc = lic_id.lower() + Osadl._shared_copyleft_data[lic_id_lc] = status + + Osadl._data_loaded = True + self.print_debug(f'Loaded 
{len(Osadl._shared_copyleft_data)} OSADL copyleft entries') + return True + + def is_copyleft(self, spdx_id: str) -> bool: + """ + Check if a license is copyleft according to OSADL data. + + Returns True for both strong copyleft ("Yes") and weak/restricted copyleft ("Yes (restricted)"). + + :param spdx_id: SPDX license identifier + :return: True if copyleft, False otherwise + """ + if not spdx_id: + return False + + # Normalize lookup + spdx_id_lc = spdx_id.lower() + # Use class-level shared data + status = Osadl._shared_copyleft_data.get(spdx_id_lc) + + if not status: + self.print_debug(f'No OSADL copyleft data for license: {spdx_id}') + return False + + # Consider both "Yes" and "Yes (restricted)" as copyleft (case-insensitive) + return status.lower().startswith('yes') + + +# +# End of Osadl Class +# diff --git a/tests/test_osadl.py b/tests/test_osadl.py new file mode 100644 index 00000000..6e0b929f --- /dev/null +++ b/tests/test_osadl.py @@ -0,0 +1,102 @@ +""" +SPDX-License-Identifier: MIT + + Copyright (c) 2025, SCANOSS + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +""" +import unittest + +from scanoss.osadl import Osadl + + +class TestOsadl(unittest.TestCase): + """ + Test the Osadl class + """ + + def test_initialization(self): + """Test basic initialization - data is loaded at class level""" + osadl = Osadl() + self.assertIsNotNone(osadl) + self.assertTrue(Osadl._data_loaded) + self.assertGreater(len(Osadl._shared_copyleft_data), 0) + + def test_initialization_with_debug(self): + """Test initialization with debug enabled""" + osadl = Osadl(debug=True) + self.assertTrue(osadl.debug) + + def test_is_copyleft_gpl_2_0_only(self): + """Test GPL-2.0-only is copyleft""" + osadl = Osadl() + self.assertTrue(osadl.is_copyleft('GPL-2.0-only')) + + def test_is_copyleft_gpl_2_0_or_later(self): + """Test GPL-2.0-or-later is copyleft""" + osadl = Osadl() + self.assertTrue(osadl.is_copyleft('GPL-2.0-or-later')) + + def test_is_not_copyleft_mit(self): + """Test MIT is not copyleft""" + osadl = Osadl() + self.assertFalse(osadl.is_copyleft('MIT')) + + def test_is_copyleft_case_insensitive_license_id(self): + """Test license ID lookup is case-insensitive""" + osadl = Osadl() + self.assertTrue(osadl.is_copyleft('gpl-2.0-only')) + self.assertTrue(osadl.is_copyleft('GPL-2.0-ONLY')) + self.assertTrue(osadl.is_copyleft('Gpl-2.0-Only')) + + def test_is_copyleft_unknown_license(self): + """Test unknown license returns False""" + osadl = Osadl() + self.assertFalse(osadl.is_copyleft('Unknown-License')) + + def test_is_copyleft_empty_string(self): + """Test empty string returns False""" + osadl = Osadl() + self.assertFalse(osadl.is_copyleft('')) + + def test_is_copyleft_none(self): + """Test None returns False""" + osadl = Osadl() + self.assertFalse(osadl.is_copyleft(None)) + + def 
test_multiple_instances_share_data(self):
+        """Test that multiple instances share the same class-level data"""
+        osadl1 = Osadl()
+        osadl2 = Osadl()
+
+        # Both instances should see data loaded by first instance
+        result1 = osadl1.is_copyleft('GPL-2.0-only')
+        self.assertTrue(result1)
+        self.assertTrue(Osadl._data_loaded)
+
+        # Second instance uses the same class-level shared data
+        result2 = osadl2.is_copyleft('MIT')
+        self.assertFalse(result2)
+
+        # Verify both instances reference the same class-level data
+        self.assertIs(osadl1._shared_copyleft_data, osadl2._shared_copyleft_data)
+
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
diff --git a/tests/test_policy_inspect.py b/tests/test_policy_inspect.py
index a3161a45..27c6ef41 100644
--- a/tests/test_policy_inspect.py
+++ b/tests/test_policy_inspect.py
@@ -579,7 +579,7 @@ def test_copyleft_policy_license_sources_with_markdown_output(self):
     def test_copyleft_policy_license_sources_with_include_filter(self):
         """
         Test license_sources works with include filter
-        Filter to scancode source and include only GPL-2.0-or-later
+        Filter to scancode source and include MIT (normally not copyleft)
         """
         script_dir = os.path.dirname(os.path.abspath(__file__))
         file_name = 'result.json'
@@ -588,18 +588,19 @@ def test_copyleft_policy_license_sources_with_include_filter(self):
             filepath=input_file_name,
             format_type='json',
             license_sources=['scancode'],
-            include='GPL-2.0-or-later'
+            include='MIT'
         )
         status, policy_output = copyleft.run()
         details = json.loads(policy_output.details)
 
-        # Should find only GPL-2.0-or-later from scancode
+        # Should find MIT (added via include) and any OSADL copyleft licenses
         self.assertEqual(status, PolicyStatus.POLICY_FAIL.value)
-        if details:  # May be empty if no matches
-            for component in details.get('components', []):
-                for license in component['licenses']:
-                    self.assertEqual(license['spdxid'], 'GPL-2.0-or-later')
-                    self.assertEqual(license['source'], 'scancode')
+        
self.assertGreater(len(details.get('components', [])), 0) + + # Verify all licenses are from scancode or unknown (always included) + for component in details.get('components', []): + for license in component['licenses']: + self.assertIn(license['source'], ['scancode', 'unknown']) def test_copyleft_policy_license_sources_with_exclude_filter(self): """ From ba806b637d0c089e2cee80eba28586637df57d69 Mon Sep 17 00:00:00 2001 From: Agustin Isasmendi Date: Mon, 17 Nov 2025 13:21:27 +0100 Subject: [PATCH 4/5] chore(linter): improve comment formatting --- src/scanoss/scanner.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/scanoss/scanner.py b/src/scanoss/scanner.py index 6e5d147b..cde0ad88 100644 --- a/src/scanoss/scanner.py +++ b/src/scanoss/scanner.py @@ -394,8 +394,8 @@ def scan_folder(self, scan_dir: str) -> bool: # noqa: PLR0912, PLR0915 file_count += 1 if self.threaded_scan: wfp_size = len(wfp.encode('utf-8')) - # If the WFP is bigger than the max post size and we already have something stored in the scan block, - # add it to the queue + # If the WFP is bigger than the max post size and we already have something + # stored in the scan block, add it to the queue if scan_block != '' and (wfp_size + scan_size) >= self.max_post_size: self.threaded_scan.queue_add(scan_block) queue_size += 1 @@ -404,7 +404,8 @@ def scan_folder(self, scan_dir: str) -> bool: # noqa: PLR0912, PLR0915 scan_block += wfp scan_size = len(scan_block.encode('utf-8')) wfp_file_count += 1 - # If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue # noqa: E501 + # If the scan request block (group of WFPs) is larger than the POST size + # or we have reached the file limit, add it to the queue if wfp_file_count > self.post_file_count or scan_size >= self.max_post_size: self.threaded_scan.queue_add(scan_block) queue_size += 1 @@ -413,7 +414,10 @@ def scan_folder(self, scan_dir: str) 
-> bool: # noqa: PLR0912, PLR0915 if not scan_started and queue_size > self.nb_threads: # Start scanning if we have something to do scan_started = True if not self.threaded_scan.run(wait=False): - self.print_stderr('Warning: Some errors encounted while scanning. Results might be incomplete.') + self.print_stderr( + 'Warning: Some errors encountered while scanning. ' + 'Results might be incomplete.' + ) success = False # End for loop if self.threaded_scan and scan_block != '': @@ -659,7 +663,8 @@ def scan_files(self, files: []) -> bool: # noqa: PLR0912, PLR0915 file_count += 1 if self.threaded_scan: wfp_size = len(wfp.encode('utf-8')) - # If the WFP is bigger than the max post size and we already have something stored in the scan block, add it to the queue # noqa: E501 + # If the WFP is bigger than the max post size and we already have something + # stored in the scan block, add it to the queue if scan_block != '' and (wfp_size + scan_size) >= self.max_post_size: self.threaded_scan.queue_add(scan_block) queue_size += 1 @@ -668,7 +673,8 @@ def scan_files(self, files: []) -> bool: # noqa: PLR0912, PLR0915 scan_block += wfp scan_size = len(scan_block.encode('utf-8')) wfp_file_count += 1 - # If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue # noqa: E501 + # If the scan request block (group of WFPs) is larger than the POST size + # or we have reached the file limit, add it to the queue if wfp_file_count > self.post_file_count or scan_size >= self.max_post_size: self.threaded_scan.queue_add(scan_block) queue_size += 1 @@ -678,7 +684,8 @@ def scan_files(self, files: []) -> bool: # noqa: PLR0912, PLR0915 scan_started = True if not self.threaded_scan.run(wait=False): self.print_stderr( - 'Warning: Some errors encounted while scanning. Results might be incomplete.' + 'Warning: Some errors encountered while scanning. ' + 'Results might be incomplete.' 
) success = False @@ -799,7 +806,9 @@ def scan_wfp_file(self, file: str = None) -> bool: # noqa: PLR0912, PLR0915 f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the ScanOSS API.' ) if self.debug and cur_size > self.max_post_size: - Scanner.print_stderr(f'Warning: Post size {cur_size} greater than limit {self.max_post_size}') + Scanner.print_stderr( + f'Warning: Post size {cur_size} greater than limit {self.max_post_size}' + ) scan_resp = self.scanoss_api.scan(wfp, max_component['name']) # Scan current WFP and store if bar: bar.next(batch_files) From 6db2eac5b8a4c09a9d2213425ca6839dab01993e Mon Sep 17 00:00:00 2001 From: Agustin Isasmendi Date: Mon, 17 Nov 2025 13:21:55 +0100 Subject: [PATCH 5/5] chore: bump version to 1.41.0 --- src/scanoss/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scanoss/__init__.py b/src/scanoss/__init__.py index 694457df..46c39ba1 100644 --- a/src/scanoss/__init__.py +++ b/src/scanoss/__init__.py @@ -22,4 +22,4 @@ THE SOFTWARE. """ -__version__ = '1.40.1' +__version__ = '1.41.0'