<hr>

***Version: 1001.1492024.qut.cs.tnl***

***Sk Tanzir Mehedi, PhD Student, QUT***

***Supervisory Team: Prof. Raja Jurdak & Dr Chadni Islam***
<hr>

**----Start of Step 2----**

## Malicious Packages Summary From Files

In [85]:
import zipfile
import tarfile
import io

def _is_setup_py(member_name):
    """Return True when the archive member's file name is exactly ``setup.py``."""
    # Compare the last path component so that e.g. "pkg/not_setup.py" is not
    # mistaken for a setup script (a plain endswith() check would accept it).
    return member_name.rsplit('/', 1)[-1] == 'setup.py'


def count_setup_py_in_nested_zip(zip_file_path):
    """Count the inner archives of a nested zip that ship a ``setup.py``.

    The outer zip is expected to be laid out as
    ``<root>/<package>/<version>/<archive>`` where ``<archive>`` is a
    ``.tar.gz`` or ``.zip`` file. Only the first archive found under each
    version folder is opened; its type is decided from its magic bytes, not
    from its extension. Progress and a final summary are printed.

    Args:
        zip_file_path: Location of the outer zip, passed to ``zipfile.ZipFile``.

    Returns:
        The number of inspected inner archives containing a file named
        ``setup.py``. Returns 0 when the outer zip has no folder entries.
    """
    setup_py_count = 0
    outer_folder_count = 0
    package_folders_count = 0
    version_folders_count = 0
    tar_files_count = 0
    no_tar_files_count = 0
    inner_tar_read_count = 0
    no_setup_py_count = 0

    with zipfile.ZipFile(zip_file_path, 'r') as outer_zip:
        # namelist() builds a fresh list on every call; take one snapshot
        # instead of rebuilding it inside the nested loops below.
        all_names = outer_zip.namelist()
        directories = [name for name in all_names if name.endswith('/')]

        package_folders = []
        if directories:
            # The first directory entry is treated as the outermost folder.
            folder_name = directories[0]
            outer_folder_count += 1
            print(f"Found outer folder: {folder_name}")

            # Package folders sit exactly one level below the outer folder.
            package_depth = folder_name.count('/') + 1
            package_folders = [name for name in directories
                               if name.startswith(folder_name)
                               and name.count('/') == package_depth]
            package_folders_count += len(package_folders)
            print(f"Package folders found: {package_folders}")
        else:
            # Previously this case crashed with IndexError.
            print("No folder entries found in the outer zip. Nothing to scan.")

        for package_folder in package_folders:
            # Every directory below the package folder counts as a version folder.
            version_folders = [name for name in directories
                               if name.startswith(package_folder)
                               and name != package_folder]
            version_folders_count += len(version_folders)
            print(f"Version folders found in {package_folder}: {version_folders}")

            for version_folder in version_folders:
                tar_files = [name for name in all_names
                             if name.startswith(version_folder)
                             and name.endswith(('.tar.gz', '.zip'))]

                if not tar_files:
                    no_tar_files_count += 1
                    print(f"No compressed files found in {version_folder}. Skipping.")
                    continue

                tar_files_count += len(tar_files)
                print(f"Compressed files found in {version_folder}: {tar_files}")

                tar_package_name = tar_files[0]
                inner_tar_read_count += 1
                print(f"Reading compressed file: {tar_package_name}")

                # Best-effort per archive: one broken file must not abort the scan.
                try:
                    with outer_zip.open(tar_package_name) as inner_file:
                        payload = inner_file.read()

                    signature = payload[:2]
                    if signature == b'\x1f\x8b':  # GZIP signature
                        try:
                            with tarfile.open(fileobj=io.BytesIO(payload), mode='r:gz') as tar:
                                member_names = tar.getnames()
                        except tarfile.ReadError:
                            print(f"Error reading tar.gz file: {tar_package_name}. Skipping.")
                            continue
                    elif signature == b'PK':  # ZIP signature
                        try:
                            with zipfile.ZipFile(io.BytesIO(payload), 'r') as inner_zip:
                                member_names = inner_zip.namelist()
                        except zipfile.BadZipFile:
                            print(f"Error reading zip file: {tar_package_name}. Skipping.")
                            continue
                    else:
                        print(f"Unknown file type for {tar_package_name}. Skipping.")
                        continue

                    setup_py_path = [name for name in member_names if _is_setup_py(name)]
                    if setup_py_path:
                        setup_py_count += 1
                        print(f"Found setup.py at: {setup_py_path[0]}")
                    else:
                        no_setup_py_count += 1
                        print(f"No setup.py found in {tar_package_name}")
                except Exception as e:
                    print(f"Error processing file {tar_package_name}: {e}")

    # Print the final counts for each event
    print("\nSummary:")
    print(f"Total outer folders found: {outer_folder_count}")
    print(f"Total package folders found: {package_folders_count}")
    print(f"Total version folders found: {version_folders_count}")
    print(f"Total compressed files found: {tar_files_count}")
    print(f"Total no compressed files found: {no_tar_files_count}")
    print(f"Total inner compressed files read: {inner_tar_read_count}")
    print(f"Total setup.py files found: {setup_py_count}")
    print(f"Total no setup.py files found: {no_setup_py_count}")

    return setup_py_count

# Driver for the summary cell: point zip_file_path at the outer dataset zip,
# then run the scan. The call prints per-folder progress plus a summary and
# returns the number of inner archives in which a setup.py was found.
zip_file_path = 'D:/Final Version/Dataset/pypi_malregistry.zip'
setup_py_count = count_setup_py_in_nested_zip(zip_file_path)

Found outer folder: pypi_malregistry-main/
Package folders found: ['pypi_malregistry-main/10Cent10/', 'pypi_malregistry-main/10Cent11/', 'pypi_malregistry-main/11cent/', 'pypi_malregistry-main/12cent/', 'pypi_malregistry-main/1337c/', 'pypi_malregistry-main/1337test/', 'pypi_malregistry-main/1337z/', 'pypi_malregistry-main/13cent/', 'pypi_malregistry-main/14cent/', 'pypi_malregistry-main/15cent/', 'pypi_malregistry-main/16cent/', 'pypi_malregistry-main/1inch/', 'pypi_malregistry-main/2022-requests/', 'pypi_malregistry-main/282828282828282828/', 'pypi_malregistry-main/3m-promo-link-gen/', 'pypi_malregistry-main/3web-py/', 'pypi_malregistry-main/3web/', 'pypi_malregistry-main/4123/', 'pypi_malregistry-main/90456984689490856/', 'pypi_malregistry-main/AadhaarCrypt/', 'pypi_malregistry-main/Ailyboostbot/', 'pypi_malregistry-main/Ailynitro/', 'pypi_malregistry-main/BeaitifulSoop/', 'pypi_malregistry-main/BeaotifulSoup/', 'pypi_malregistry-main/BeaufifulSoup/', 'pypi_malregistry-main/Beautifi

## Collect Malicious Packages Details From Files

In [7]:
import zipfile
import tarfile
import io
import pandas as pd
import configparser
import re

# Control characters that must be stripped before writing to Excel
# (everything below 0x20 except tab, line feed and carriage return).
ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')


def remove_illegal_characters(value):
    """Return *value* as a string that is safe to store in an Excel cell.

    Non-string values are converted with ``str()`` and returned as-is;
    strings have every character matched by ``ILLEGAL_CHARACTERS_RE`` removed.
    """
    if not isinstance(value, str):
        # Stringify everything else so each cell holds text.
        return str(value)
    return ILLEGAL_CHARACTERS_RE.sub('', value)

def safe_decode(byte_data):
    """Decode *byte_data* as UTF-8, falling back to Latin-1 when that fails.

    Latin-1 maps every byte value to a character, so the fallback itself
    cannot raise ``UnicodeDecodeError``.
    """
    try:
        text = byte_data.decode('utf-8')
    except UnicodeDecodeError:
        text = byte_data.decode('latin-1')
    return text

def is_valid_config(content):
    """Report whether *content* can be parsed by ``configparser.ConfigParser``.

    Args:
        content: Text of an INI-style file (for example a ``setup.cfg``).

    Returns:
        True when parsing succeeds, False when configparser rejects the text
        for any reason (missing section header, duplicate section or option,
        malformed line, ...).
    """
    parser = configparser.ConfigParser()
    try:
        parser.read_string(content)
    except configparser.Error:
        # configparser.Error is the base class of every parsing failure, so a
        # duplicate section/option no longer escapes as an uncaught exception.
        return False
    return True

# Archive extensions removed from the end of a file name before it is split.
_ARCHIVE_SUFFIXES = ('.tar.gz', '.tgz', '.zip')


def extract_name_version_from_filename(tar_package_name):
    """Derive ``(name, version)`` from an archive path such as ``.../pkg-1.0.tar.gz``.

    The file name is split at its LAST hyphen, so hyphenated project names
    (``3m-promo-link-gen-1.0.tar.gz``) keep their full name. Used as a
    fallback when PKG-INFO is missing.

    Args:
        tar_package_name: Path of the archive inside the outer zip, using
            ``/`` as separator.

    Returns:
        A ``(name, version)`` tuple of strings, or ``('', '')`` when the file
        name contains no hyphen.
    """
    filename = tar_package_name.split('/')[-1]

    # Strip the extension only at the end of the name (a bare replace() would
    # also remove an occurrence in the middle).
    for suffix in _ARCHIVE_SUFFIXES:
        if filename.endswith(suffix):
            filename = filename[:-len(suffix)]
            break

    name, separator, version = filename.rpartition('-')
    if not separator:
        return '', ''  # Default to empty if the filename structure is not as expected
    return name, version

# PKG-INFO header name -> metadata column it fills.
_PKG_INFO_FIELDS = {
    "Name": "Malicious Package Name",
    "Version": "Malicious Package Version",
    "Metadata-Version": "Metadata-Version",
    "Summary": "Summary",
    "Author": "Author",
    "Author-email": "Author-email",
    "License": "License",
    "Home-page": "Home-page",
    "Keywords": "Keywords",
    "Description": "Description",
    "Platform": "Platform",
    "Classifier": "Classifier",
    "Documentation": "Documentation",
    "Description-Content-Type": "Description-Content-Type",
}

# (file name fragment, metadata column) for the egg-info text files that are
# captured; the first fragment found in a member path decides the column.
_EGG_INFO_FIELDS = (
    ("dependency_links.txt", "dependency links"),
    ("requires.txt", "requires"),
    ("SOURCES.txt", "SOURCES"),
    ("top_level.txt", "top_level"),
)


def _open_first_match(tar, names, needle):
    """Open the first member whose path contains *needle*.

    Returns whatever ``tar.extractfile`` returns for that member (``None``
    for non-file members), or ``None`` when no path matches.
    """
    for name in names:
        if needle in name:
            return tar.extractfile(name)
    return None


def _setup_kwarg_value(line):
    """Return the text after the first ``=`` in *line*, without quotes or a trailing comma."""
    raw = line.split('=')[1].strip().rstrip(',').strip()
    return raw.replace('"', '').replace("'", "")


def extract_metadata(tar, tar_package_name):
    """Collect one row of metadata about the package held in an open tar archive.

    Args:
        tar: Open ``tarfile.TarFile``; only ``getnames()`` and
            ``extractfile()`` are used.
        tar_package_name: Path of the archive inside the outer zip. Stored in
            the ``Path`` column and used to derive the default package name
            and version.

    Returns:
        A dict with a fixed set of keys (one per output column). Text columns
        default to ``''``, ``"... Exists"`` flags default to ``'N'`` and the
        ``modules`` / ``test_modules`` columns default to empty lists.

    Notes:
        Members are located by substring match on their path and the first
        match is used. Repeated PKG-INFO headers (e.g. ``Classifier``) keep
        only their last value.
    """
    # Try to get package name and version from file name first
    default_name, default_version = extract_name_version_from_filename(tar_package_name)

    metadata = {
        "Path": tar_package_name,  # Path of the archive inside the outer zip
        "Malicious Package Name": default_name,  # Default from filename
        "Malicious Package Version": default_version,  # Default from filename
        "PKG-INFO": 'N',  # Default to not found
        "Metadata-Version": '',
        "Summary": '',
        "Author": '',
        "Author-email": '',
        "License": '',
        "Home-page": '',
        "Keywords": '',
        "Description": '',
        "Platform": '',
        "Classifier": '',
        "Documentation": '',
        "Description-Content-Type": '',
        "Setup.py Exists": 'N',
        "setup.py": '',
        "README.md Exists": 'N',
        "README Content": '',
        "LICENSE Exists": 'N',
        "MANIFEST.in Exists": 'N',
        "requirements.txt Exists": 'N',
        "setup.cfg Exists": 'N',
        "tag_build": '',
        "tag_date": '',
        "pyproject.toml Exists": 'N',
        "pyproject.toml Content": '',
        "dependency links": '',
        "requires": '',
        "SOURCES": '',
        "top_level": '',
        "my_package Exists": 'N',
        "subpackage Exists": 'N',
        "modules": [],
        "tests Exists": 'N',
        "test_modules": [],
        "Reason for Skipping": ''  # Filled in by the caller for skipped archives
    }

    # The member list is consulted many times below; fetch it once.
    names = tar.getnames()

    # Process PKG-INFO file
    pkg_info_path = next((name for name in names if 'PKG-INFO' in name), None)
    if pkg_info_path is not None:
        metadata["PKG-INFO"] = 'Y'
        pkg_info_file = tar.extractfile(pkg_info_path)
        if pkg_info_file:  # extractfile() returns None for non-file members
            try:
                pkg_info = safe_decode(pkg_info_file.read())
                for line in pkg_info.splitlines():
                    # Split on the FIRST colon only, so values that contain a
                    # colon themselves (URLs in Home-page, "Name: a: b") are
                    # kept whole instead of being cut at the second colon.
                    header, separator, value = line.partition(":")
                    column = _PKG_INFO_FIELDS.get(header) if separator else None
                    if column is not None:
                        metadata[column] = value.strip()
            except Exception as e:
                # Best-effort: a damaged PKG-INFO must not lose the whole row.
                print(f"Error reading PKG-INFO: {e}")

    # Process setup.py
    setup_py_file = _open_first_match(tar, names, 'setup.py')
    if setup_py_file:
        metadata["Setup.py Exists"] = 'Y'
        setup_py_content = safe_decode(setup_py_file.read())
        metadata["setup.py"] = setup_py_content  # Keep the full script text
        if not metadata["Malicious Package Name"]:
            # Neither the file name nor PKG-INFO gave a name: fall back to a
            # crude line scan of setup.py. The version is written to the real
            # version column (it used to land in a stray "Version" key) and
            # only when no version is known yet.
            need_version = not metadata["Malicious Package Version"]
            for line in setup_py_content.splitlines():
                if "name=" in line:
                    metadata["Malicious Package Name"] = _setup_kwarg_value(line)
                if need_version and "version=" in line:
                    metadata["Malicious Package Version"] = _setup_kwarg_value(line)

    # Process README.md
    readme_file = _open_first_match(tar, names, 'README.md')
    if readme_file:
        metadata["README.md Exists"] = 'Y'
        metadata["README Content"] = safe_decode(readme_file.read())

    # Presence-only checks: LICENSE, MANIFEST.in, requirements.txt
    for needle, column in (
        ('LICENSE', "LICENSE Exists"),
        ('MANIFEST.in', "MANIFEST.in Exists"),
        ('requirements.txt', "requirements.txt Exists"),
    ):
        if _open_first_match(tar, names, needle):
            metadata[column] = 'Y'

    # Process setup.cfg and extract tag_build and tag_date using configparser
    setup_cfg_file = _open_first_match(tar, names, 'setup.cfg')
    if setup_cfg_file:
        metadata["setup.cfg Exists"] = 'Y'
        setup_cfg_content = safe_decode(setup_cfg_file.read())
        config = configparser.ConfigParser()
        try:
            config.read_string(setup_cfg_content)
            if 'egg_info' in config:
                metadata["tag_build"] = config.get('egg_info', 'tag_build', fallback='')
                metadata["tag_date"] = config.get('egg_info', 'tag_date', fallback='')
        except configparser.Error as e:
            # Any malformed setup.cfg (missing header, duplicate section or
            # option, bad interpolation) leaves the tag columns as they are
            # instead of aborting the extraction.
            print(f"Could not parse setup.cfg in {tar_package_name}: {e}")

    # Process pyproject.toml
    pyproject_toml_file = _open_first_match(tar, names, 'pyproject.toml')
    if pyproject_toml_file:
        metadata["pyproject.toml Exists"] = 'Y'
        metadata["pyproject.toml Content"] = safe_decode(pyproject_toml_file.read())

    # Process egg-info files (dependency_links, requires, SOURCES, top_level).
    # Only members that map to a column are opened; later matches overwrite
    # earlier ones.
    for egg_file in names:
        if 'egg-info/' not in egg_file:
            continue
        column = next((col for needle, col in _EGG_INFO_FIELDS if needle in egg_file), None)
        if column is None:
            continue
        egg_info_file = tar.extractfile(egg_file)
        if egg_info_file:
            metadata[column] = safe_decode(egg_info_file.read()).strip()

    # Check for a directory literally named my_package/ and what it contains
    if any('my_package/' in name for name in names):
        metadata["my_package Exists"] = 'Y'
        if any('my_package/subpackage/' in name for name in names):
            metadata["subpackage Exists"] = 'Y'
        metadata["modules"] = [name for name in names
                               if 'my_package/' in name and name.endswith('.py')]

    # Check for the existence of tests/ and list its Python files
    if any('tests/' in name for name in names):
        metadata["tests Exists"] = 'Y'
        metadata["test_modules"] = [name for name in names
                                    if 'tests/' in name and name.endswith('.py')]

    return metadata

def count_setup_py_in_nested_zip(zip_file_path, output_path='MaliciousPackagesDetailsFromFiles.xlsx'):
    """Extract metadata from every inner ``.tar.gz`` of a nested zip into an Excel file.

    The outer zip is expected to be laid out as
    ``<root>/<package>/<version>/<name>-<version>.tar.gz``. For each version
    folder the first ``.tar.gz`` is opened in memory and passed to
    ``extract_metadata``. Archives that cannot be processed are still listed,
    with the error text in the ``Reason for Skipping`` column. Rows are sorted
    case-insensitively by package name before being written.

    Note: this function has the same name as the summary function defined in
    the earlier notebook cell and replaces it once this cell has run.

    Args:
        zip_file_path: Location of the outer zip, passed to ``zipfile.ZipFile``.
        output_path: Where the Excel workbook is written. Defaults to the
            file name the notebook has always used.

    Returns:
        None. Results are written to *output_path* and progress is printed.
    """
    import zlib  # local import: only needed to name zlib.error below

    # Failures that mean "this one archive is unusable" rather than "the
    # program is broken". Corrupt gzip data can surface as zlib.error or
    # OSError; extractfile() raises KeyError for a link whose target is missing.
    recoverable_errors = (
        tarfile.TarError,
        zipfile.BadZipFile,
        configparser.Error,
        zlib.error,
        EOFError,
        OSError,
        UnicodeDecodeError,
        KeyError,
    )

    data = []  # One dict per package archive
    skip_count = 0  # Archives that could not be processed

    with zipfile.ZipFile(zip_file_path, 'r') as outer_zip:
        # namelist() builds a fresh list on every call; take one snapshot
        # instead of rebuilding it inside the nested loops below.
        all_names = outer_zip.namelist()
        directories = [name for name in all_names if name.endswith('/')]

        package_folders = []
        if directories:
            # The first directory entry is treated as the outermost folder.
            folder_name = directories[0]
            print(f"Found outer folder: {folder_name}")

            # Package folders sit exactly one level below the outer folder.
            package_depth = folder_name.count('/') + 1
            package_folders = [name for name in directories
                               if name.startswith(folder_name)
                               and name.count('/') == package_depth]
        else:
            # Previously this case crashed with IndexError.
            print("No folder entries found in the outer zip. Nothing to scan.")

        for package_folder in package_folders:
            version_folders = [name for name in directories
                               if name.startswith(package_folder)
                               and name != package_folder]

            for version_folder in version_folders:
                tar_files = [name for name in all_names
                             if name.startswith(version_folder)
                             and name.endswith('.tar.gz')]
                if not tar_files:
                    continue

                tar_package_name = tar_files[0]
                print(f"Reading inner tar file: {tar_package_name}")

                try:
                    with outer_zip.open(tar_package_name) as inner_tar_file:
                        # Load the archive into memory so tarfile gets a seekable stream.
                        with tarfile.open(fileobj=io.BytesIO(inner_tar_file.read()), mode='r') as tar:
                            data.append(extract_metadata(tar, tar_package_name))
                except recoverable_errors as e:
                    print(f"Skipping file {tar_package_name}, encountered an error during processing.")
                    # Keep a row for the archive so it still shows up in the report.
                    name, version = extract_name_version_from_filename(tar_package_name)
                    data.append({
                        "Malicious Package Name": name,
                        "Malicious Package Version": version,
                        "Path": tar_package_name,
                        "Reason for Skipping": str(e)
                    })
                    skip_count += 1

    # Create DataFrame, remove illegal characters, and save to Excel
    df = pd.DataFrame(data)
    print(f"DataFrame shape before export: {df.shape}")

    # Check for missing or empty columns
    missing_columns = df.columns[df.isna().all()].tolist()
    print(f"Empty columns: {missing_columns}")

    # Stringify every cell and strip characters Excel rejects
    df = df.apply(lambda x: x.map(remove_illegal_characters))

    # Sort by the package-name column explicitly. Sorting by row position 0
    # depended on whichever key the first row's dict listed first (normally
    # "Path"), not on the package name.
    name_column = "Malicious Package Name"
    if name_column in df.columns:
        df_sorted = df.sort_values(
            by=name_column,
            key=lambda names: names.str.lower(),
            kind="stable",
            ignore_index=True,
        )
    else:
        df_sorted = df  # nothing was collected

    # Write the sorted DataFrame to an Excel file
    df_sorted.to_excel(output_path, index=False)

    # Report the file that was actually written (the old message named a different file).
    print(f"Metadata extracted and saved to {output_path} with {df_sorted.shape[1]} columns")
    print(f"Total packages: {df_sorted.shape[0]}")  # Print total package count
    print(f"Total corrupted packages: {skip_count}")  # Print skipped package count
    print(f"Metadata found packages: {df_sorted.shape[0]-skip_count}")  # Print metadata found package count

# Driver for the details cell: point zip_file_path at the outer dataset zip,
# then run the extraction. The call prints progress for every inner archive
# and writes the collected metadata to an Excel workbook; it returns nothing.
zip_file_path = 'D:/Final Version/Dataset/pypi_malregistry.zip'
count_setup_py_in_nested_zip(zip_file_path)

Found outer folder: pypi_malregistry-main/
Reading inner tar file: pypi_malregistry-main/10Cent10/999.0.4/10Cent10-999.0.4.tar.gz
Reading inner tar file: pypi_malregistry-main/10Cent11/999.0.4/10Cent11-999.0.4.tar.gz
Reading inner tar file: pypi_malregistry-main/11cent/999.0.0/11Cent-999.0.0.tar.gz
Reading inner tar file: pypi_malregistry-main/11cent/999.0.1/11Cent-999.0.1.tar.gz
Reading inner tar file: pypi_malregistry-main/11cent/999.0.2/11Cent-999.0.2.tar.gz
Reading inner tar file: pypi_malregistry-main/11cent/999.0.3/11Cent-999.0.3.tar.gz
Reading inner tar file: pypi_malregistry-main/11cent/999.0.4/11Cent-999.0.4.tar.gz
Reading inner tar file: pypi_malregistry-main/12cent/999.0.0/12Cent-999.0.0.tar.gz
Reading inner tar file: pypi_malregistry-main/12cent/999.0.1/12Cent-999.0.1.tar.gz
Reading inner tar file: pypi_malregistry-main/1337c/4.4.7/1337c-4.4.7.tar.gz
Reading inner tar file: pypi_malregistry-main/1337test/1/1337test-1.tar.gz
Reading inner tar file: pypi_malregistry-main/1337

**----End of Step 2----**