In [7]:
import os
import pandas as pd
# Split the final dataset into one CSV per vulnerability category.

# CWE ID -> vulnerability category, built from per-category groups.
# Insertion order decides the order in which rows are collected below.
cwe_category_mapping = {
    cwe_id: category
    for category, cwe_ids in (
        ('LOWV', ('CWE-23', 'CWE-134', 'CWE-470')),
        ('LOIS', ('CWE-502', 'CWE-113', 'CWE-601', 'CWE-78', 'CWE-79')),
        ('ITV', ('CWE-918', 'CWE-319')),
    )
    for cwe_id in cwe_ids
}

# One list of per-CWE dataframes for every category
category_dfs = {category: [] for category in ('LOWV', 'LOIS', 'ITV')}

# Path to the final dataset
file_path = '/Users/obiedaananbeh/Desktop/Repo/VulDediction/DataSet/final_dataset.csv'

try:
    final_df = pd.read_csv(file_path)

    # Collect the rows of each CWE ID under its category (empty matches are skipped)
    for cwe_id, category in cwe_category_mapping.items():
        filtered_df = final_df[final_df['CWE ID'] == cwe_id]
        if filtered_df.empty:
            continue
        category_dfs[category].append(filtered_df)

    # Merge the collected frames per category and write one CSV for each
    for category, frames in category_dfs.items():
        if not frames:
            continue
        combined_df = pd.concat(frames, ignore_index=True)
        output_path = f'/Users/obiedaananbeh/Desktop/Repo/VulDediction/Apply refactoring technique/DataSet/{category}.csv'
        combined_df.to_csv(output_path, index=False)
        print(f"{category}: {len(combined_df)} rows saved to {output_path}")
except Exception as e:
    # Any failure (read, filter, write) is reported rather than raised
    print(f"Error processing the final dataset: {str(e)}")


LOWV: 3000 rows saved to /Users/obiedaananbeh/Desktop/Repo/VulDediction/Apply refactoring technique/DataSet/LOWV.csv
LOIS: 5000 rows saved to /Users/obiedaananbeh/Desktop/Repo/VulDediction/Apply refactoring technique/DataSet/LOIS.csv
ITV: 2000 rows saved to /Users/obiedaananbeh/Desktop/Repo/VulDediction/Apply refactoring technique/DataSet/ITV.csv


In [12]:
# apply the TrustChain Verification Refactoring (TCVR) algorithm 
import pandas as pd
import re

def extract_interaction_details(code_snippet):
    """
    Detect which socket-style interactions appear in a code snippet.

    Args:
        code_snippet: Source text to scan (a string).

    Returns:
        dict with three boolean flags:
            'read'    - a ``getInputStream()`` call is present
            'write'   - a ``getOutputStream()`` call is present
            'connect' - ``socket.connect`` or ``new Socket`` is present
    """
    # The parentheses are escaped so they match a literal call. Unescaped,
    # "()" is an empty regex group and the pattern would also match unrelated
    # identifiers such as "getInputStreamReader".
    socket_ops = {
        'read': bool(re.search(r'getInputStream\s*\(\s*\)', code_snippet)),
        'write': bool(re.search(r'getOutputStream\s*\(\s*\)', code_snippet)),
        'connect': bool(re.search(r'socket\.connect|new Socket', code_snippet))
    }
    return socket_ops

def construct_verification(interaction_details):
    """
    Assemble the verification snippet for the detected interactions.

    Looks up the 'connect', 'read' and 'write' flags (in that order) and
    returns the matching Java snippets joined by a newline. An empty string
    is returned when no flag is set.
    """
    tls_handshake = """
        // Verify server certificate and establish SSL connection
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustStore.getCustomTrustManagers(), new SecureRandom());
        SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
        SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(host, port);
        """

    integrity_check = """
        // Verify data integrity before reading
        if (!verifyMessageIntegrity(inputStream)) {
            throw new SecurityException("Message integrity verification failed");
        }
        """

    signed_write = """
        // Sign outgoing data
        signAndWriteData(outputStream, data);
        """

    # Flag name -> snippet, listed in the order the snippets must appear
    snippet_table = (
        ('connect', tls_handshake),
        ('read', integrity_check),
        ('write', signed_write),
    )

    return "\n".join(
        snippet for flag, snippet in snippet_table if interaction_details[flag]
    )

def build_verified_execution(code_snippet):
    """
    Build a secured variant of a code snippet with verification mechanisms.

    The snippet is scanned for socket interactions, plain socket calls are
    rewritten to their SSL counterparts, and the verification snippet is
    inserted immediately before the first line that originally contained a
    socket operation ('socket.' or 'new Socket'). A fixed security header is
    prepended to the result.

    Args:
        code_snippet: Source text of the snippet to transform (a string).

    Returns:
        The security header, a newline, and the transformed snippet.
    """
    interaction_details = extract_interaction_details(code_snippet)
    verifications = construct_verification(interaction_details)

    # Base security imports and configurations (outer whitespace stripped)
    secure_code = """
    import javax.net.ssl.*;
    import java.security.*;
    
    // Initialize security components
    private static final TrustManager[] trustStore = createTrustStore();
    private static final KeyStore keyStore = loadKeyStore();
    """.strip()

    # Find the insertion anchor on the ORIGINAL snippet. The substitutions
    # below rewrite 'new Socket(' and 'socket.get*Stream()', so searching for
    # those tokens afterwards would usually find nothing and the verification
    # snippet would be silently dropped.
    anchor_index = next(
        (
            index
            for index, line in enumerate(code_snippet.split('\n'))
            if 'socket.' in line or 'new Socket' in line
        ),
        None,
    )

    # Replace plain socket operations with secure versions
    if interaction_details['connect']:
        code_snippet = re.sub(
            r'new Socket\((.*?)\)',
            r'sslSocketFactory.createSocket(\1)',
            code_snippet
        )

    if interaction_details['read'] or interaction_details['write']:
        code_snippet = re.sub(
            r'socket\.(getInputStream|getOutputStream)\(\)',
            r'sslSocket.\1()',
            code_snippet
        )

    # Neither substitution can add or remove a newline ('.' does not match
    # '\n' here and the replacements contain none), so the anchor index
    # computed above still points at the same line.
    if verifications and anchor_index is not None:
        lines = code_snippet.split('\n')
        lines.insert(anchor_index, verifications)
        code_snippet = '\n'.join(lines)

    return secure_code + "\n" + code_snippet

def apply_tcvr(df):
    """
    Run the TCVR transformation over every row of a dataframe.

    Each value of the 'Code Snippet' column is passed through
    build_verified_execution and the result is stored in a 'code_fix'
    column on the same dataframe, which is then returned.
    """
    snippets = df['Code Snippet']
    df['code_fix'] = snippets.apply(build_verified_execution)
    return df

# Read the input CSV file
df = pd.read_csv('/Users/obiedaananbeh/Desktop/Repo/VulDediction/Apply refactoring technique/DataSet/ITV.csv')

# Apply TCVR
df_secured = apply_tcvr(df)

# Save the results to a new CSV file; the path is named once so the
# confirmation message below always reports the file that was written.
tcvr_output_path = 'ITV_with_fixes.csv'
df_secured.to_csv(tcvr_output_path, index=False)

print(f"TCVR transformation completed. Results saved to '{tcvr_output_path}'")

TCVR transformation completed. Results saved to 'secured_code.csv'


In [13]:
# apply Output Safety Refactoring (OSR)
import pandas as pd
import re
from typing import Dict, List, Optional

class OSRRefactorer:
    """
    Output Safety Refactoring (OSR) for code that launches external processes
    through ``ProcessExecutor.getInstance(...)``.

    ``apply_osr`` looks for such calls, dispatches to a per-process fixer that
    rewrites the call site, and prepends a fixed security-context and
    input-validation preamble to the code.
    """

    def __init__(self):
        # 'pattern' captures the argument of ProcessExecutor.getInstance(...).
        # 'fixes' is keyed by the bare process constant, i.e. the last dotted
        # component of that argument.
        self.sanitization_patterns = {
            'command_injection': {
                'pattern': r'ProcessExecutor\.getInstance\((.*?)\)',
                'fixes': {
                    'LIBRE_OFFICE': self._fix_libre_office_command,
                    'OCR_MY_PDF': self._fix_ocr_command,
                    'PYTHON_OPENCV': self._fix_opencv_command,
                    'CALIBRE': self._fix_calibre_command,
                    'GHOSTSCRIPT': self._fix_ghostscript_command
                }
            }
        }

    def _fix_libre_office_command(self, code: str) -> str:
        """
        Apply security fixes for LibreOffice command execution.

        Every span from ``ProcessExecutor.getInstance(ProcessExecutor.Processes.LIBRE_OFFICE)``
        up to the next ``command)`` is replaced by a secure-executor wrapper.

        Args:
            code: Source text to rewrite.

        Returns:
            The rewritten source text (unchanged when nothing matches).
        """
        security_wrapper = """
        // Create a secure command executor
        class SecureCommandExecutor {
            private final ProcessExecutor executor;
            private final CommandValidator validator;
            
            public SecureCommandExecutor() {
                this.executor = ProcessExecutor.getInstance(ProcessExecutor.Processes.LIBRE_OFFICE);
                this.validator = new CommandValidator();
            }
            
            public ProcessExecutorResult execute(List<String> command) throws SecurityException {
                // Validate and sanitize each command component
                List<String> sanitizedCommand = new ArrayList<>();
                for (String component : command) {
                    String sanitized = validator.sanitizeInput(component);
                    validator.validateComponent(sanitized);
                    sanitizedCommand.add(sanitized);
                }
                
                // Execute in restricted environment
                return executor.runCommandWithRestrictions(sanitizedCommand);
            }
        }
        
        // Use the secure executor
        SecureCommandExecutor executor = new SecureCommandExecutor();
        return executor.execute(command);
        """

        # A callable replacement inserts the wrapper verbatim; a plain string
        # would be parsed as a replacement template (backslash escapes).
        return re.sub(
            r'ProcessExecutor\.getInstance\(ProcessExecutor\.Processes\.LIBRE_OFFICE\).*?command\)',
            lambda _match: security_wrapper,
            code,
            flags=re.DOTALL
        )

    def _fix_ocr_command(self, code: str) -> str:
        """
        Apply security fixes for OCR command execution.

        Every span from ``ProcessExecutor.getInstance(ProcessExecutor.Processes.OCR_MY_PDF)``
        up to the next ``command)`` is replaced by a secure command-builder wrapper.

        Args:
            code: Source text to rewrite.

        Returns:
            The rewritten source text (unchanged when nothing matches).
        """
        security_wrapper = """
        // Create secure OCR command builder
        class SecureOCRCommandBuilder {
            private final List<String> command = new ArrayList<>();
            private final PathValidator pathValidator = new PathValidator();
            
            public List<String> buildCommand(Path input, Path output, OCROptions options) {
                // Validate paths
                pathValidator.validatePath(input);
                pathValidator.validatePath(output);
                
                // Build command with sanitized inputs
                command.add("ocrmypdf");
                command.add("--verbose");
                command.add(sanitizeArg(options.getVerbosity()));
                
                // Add sanitized options
                if (options.hasSidecar()) {
                    command.add("--sidecar");
                    command.add(pathValidator.sanitizePath(options.getSidecarPath()));
                }
                
                return Collections.unmodifiableList(command);
            }
        }
        
        // Use secure builder
        SecureOCRCommandBuilder builder = new SecureOCRCommandBuilder();
        List<String> sanitizedCommand = builder.buildCommand(inputPath, outputPath, options);
        return ProcessExecutor.getInstance(ProcessExecutor.Processes.OCR_MY_PDF)
                            .runCommandWithSecurityContext(sanitizedCommand);
        """

        # Callable replacement: insert the wrapper verbatim (see LibreOffice fixer).
        return re.sub(
            r'ProcessExecutor\.getInstance\(ProcessExecutor\.Processes\.OCR_MY_PDF\).*?command\)',
            lambda _match: security_wrapper,
            code,
            flags=re.DOTALL
        )

    def _fix_opencv_command(self, code: str) -> str:
        """
        Placeholder for OpenCV command fixes.

        No rewrite is implemented yet; the code is returned unchanged so the
        caller always receives a string instead of None.
        """
        return code

    def _fix_calibre_command(self, code: str) -> str:
        """
        Placeholder for Calibre command fixes.

        No rewrite is implemented yet; the code is returned unchanged so the
        caller always receives a string instead of None.
        """
        return code

    def _fix_ghostscript_command(self, code: str) -> str:
        """
        Placeholder for Ghostscript command fixes.

        No rewrite is implemented yet; the code is returned unchanged so the
        caller always receives a string instead of None.
        """
        return code

    def apply_osr(self, code: str) -> str:
        """
        Apply Output Safety Refactoring to the given code.

        Args:
            code: Source text to refactor.

        Returns:
            The security-context preamble, the input-validation preamble and
            the (possibly rewritten) code, concatenated in that order.
        """
        # Create base security context
        security_context = """
        // Add security context
        class SecurityContext {
            private final Map<String, String> restrictedEnv;
            private final Set<String> allowedCommands;
            
            public SecurityContext() {
                this.restrictedEnv = new HashMap<>();
                this.allowedCommands = new HashSet<>();
                initializeSecurityContext();
            }
            
            private void initializeSecurityContext() {
                // Set minimal environment
                restrictedEnv.put("PATH", System.getenv("PATH"));
                // Add allowed commands
                allowedCommands.addAll(Arrays.asList("unoconv", "ocrmypdf", "python"));
            }
            
            public boolean validateCommand(List<String> command) {
                return command.stream()
                    .allMatch(this::isAllowedCommand);
            }
            
            private boolean isAllowedCommand(String cmd) {
                return allowedCommands.contains(cmd) ||
                       cmd.startsWith("--") || // Allow flags
                       cmd.matches("^[a-zA-Z0-9/_.-]+$"); // Allow safe paths
            }
        }
        """

        # Add input validation
        input_validation = """
        class InputValidator {
            public static void validateInput(String input) {
                if (input == null || input.isEmpty()) {
                    throw new IllegalArgumentException("Input cannot be null or empty");
                }
                if (input.contains("..")) {
                    throw new SecurityException("Path traversal attempt detected");
                }
                // Add more validation as needed
            }
        }
        """

        # Apply security fixes based on patterns
        modified_code = code
        for pattern_info in self.sanitization_patterns.values():
            fixes = pattern_info['fixes']
            # Each fixer rewrites every occurrence in one pass, so it must run
            # at most once: a second run would match inside the wrapper text
            # that the first run inserted.
            applied = set()
            for match in re.finditer(pattern_info['pattern'], code):
                # The captured argument is normally qualified, e.g.
                # "ProcessExecutor.Processes.LIBRE_OFFICE"; the fixes table is
                # keyed by the bare constant, so keep only the last component.
                process_type = match.group(1).strip().rsplit('.', 1)[-1]
                if process_type in fixes and process_type not in applied:
                    applied.add(process_type)
                    modified_code = fixes[process_type](modified_code)

        # Add security context and validation
        return security_context + input_validation + modified_code

def process_csv(file_path: str) -> pd.DataFrame:
    """
    Load a CSV and add an OSR-refactored version of every code snippet.

    Args:
        file_path: Path of the CSV file; it must have a 'Code Snippet' column.

    Returns:
        The loaded dataframe with an extra 'code_fix' column holding the
        result of OSRRefactorer.apply_osr for each snippet.
    """
    frame = pd.read_csv(file_path)
    osr = OSRRefactorer()

    # The bound method is passed directly; no wrapper lambda is needed
    frame['code_fix'] = frame['Code Snippet'].apply(osr.apply_osr)
    return frame

# Example usage
if __name__ == "__main__":
    # File path from the provided dataset
    file_path = "/Users/obiedaananbeh/Desktop/Repo/VulDediction/Apply refactoring technique/DataSet/LOIS.csv"
    # Named once so the confirmation message reports the file actually written
    osr_output_path = "LOIS_fixes.csv"

    # Process the CSV and apply OSR
    try:
        result_df = process_csv(file_path)
        print("Successfully processed the code snippets.")
        print(f"Total rows processed: {len(result_df)}")

        # Save the results
        result_df.to_csv(osr_output_path, index=False)
        print(f"Results saved to '{osr_output_path}'")

    except Exception as e:
        # Any failure while reading, refactoring or writing is reported, not raised
        print(f"Error processing the file: {str(e)}")

Successfully processed the code snippets.
Total rows processed: 5000
Results saved to 'processed_code_with_fixes.csv'


In [15]:
import os
import pandas as pd

# Directory containing the CSV files
directory = '/Users/obiedaananbeh/Desktop/Repo/VulDediction/Apply refactoring technique/fix_Code'

# List to hold dataframes
dfs = []

# Iterate over all CSV files in the directory. os.listdir returns names in
# arbitrary order, so they are sorted to make the row order of the combined
# dataset reproducible across runs and machines.
for filename in sorted(os.listdir(directory)):
    if filename.endswith('.csv'):
        file_path = os.path.join(directory, filename)
        df = pd.read_csv(file_path)
        dfs.append(df)

# Fail with a clear message instead of pandas' "No objects to concatenate"
if not dfs:
    raise FileNotFoundError(f"No CSV files found in {directory}")

# Concatenate all dataframes
combined_df = pd.concat(dfs, ignore_index=True)

# Save the combined dataframe to a new CSV file
combined_df.to_csv('/Users/obiedaananbeh/Desktop/Repo/VulDediction/DataSet/dataSet_withFixes.csv', index=False)

print("All CSV files have been combined into 'dataSet_withFixes.csv'")

# Print the total number of rows in the combined dataframe
total_rows = len(combined_df)
print(f"Total number of rows in the combined dataframe: {total_rows}")

All CSV files have been combined into 'dataSet_withFixes.csv'
Total number of rows in the combined dataframe: 10000
