In [1]:
from google.colab import drive
import os
import pandas as pd

# Mount Google Drive
drive.mount('/content/drive')
# Load DataFrame from the CSV file
csv_file_path = '/content/drive/My Drive/Projet 2CS/DATASET/dataset.csv'
df = pd.read_csv(csv_file_path)

# Now df contains the DataFrame with the data from the CSV file
print("DataFrame loaded successfully from:", csv_file_path)


Mounted at /content/drive
DataFrame loaded successfully from: /content/drive/My Drive/Projet 2CS/DATASET/dataset.csv


In [5]:
!pip install astor
!pip install deap

Collecting astor
  Downloading astor-0.8.1-py2.py3-none-any.whl (27 kB)
Installing collected packages: astor
Successfully installed astor-0.8.1
Collecting deap
  Downloading deap-1.4.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.4/135.4 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: deap
Successfully installed deap-1.4.1


In [45]:
import ast
import astor

def replace_expression_condition(code, line_number, old_expression, new_expression):
    # Parse the code into an abstract syntax tree (AST)
    tree = ast.parse(code)
    print('old_expression : ', old_expression)
    print('new_expression : ', new_expression)
    # Define a visitor to traverse the AST and perform replacements
    class ReplaceExpression(ast.NodeTransformer):
        def visit_If(self, node):
            self.generic_visit(node)
            # Check if the node corresponds to the specified line number
            if getattr(node, 'lineno', None) == line_number:
                try:
                    old_expr_ast = ast.parse(old_expression, mode='eval').body
                    new_expr_ast = ast.parse(new_expression, mode='eval').body
                except SyntaxError as e:
                    print("SyntaxError:", e)
                    return node

                if self.compare_expr(node.test, old_expr_ast):
                    node.test = new_expr_ast

            return node

        def visit_While(self, node):
            self.generic_visit(node)
            # Check if the node corresponds to the specified line number
            if getattr(node, 'lineno', None) == line_number:
                try:
                    old_expr_ast = ast.parse(old_expression, mode='eval').body
                    new_expr_ast = ast.parse(new_expression, mode='eval').body
                except SyntaxError as e:
                    print("SyntaxError:", e)
                    return node

                if self.compare_expr(node.test, old_expr_ast):
                    node.test = new_expr_ast

            return node

        def compare_expr(self, expr1, expr2):
            # Compare the string representations of the AST nodes
            return ast.dump(expr1) == ast.dump(expr2)

    # Instantiate the visitor and traverse the AST
    transformer = ReplaceExpression()
    transformed_tree = transformer.visit(tree)

    # Generate Python code from the modified AST
    modified_code = astor.to_source(transformed_tree)
    print(modified_code)
    return modified_code


In [10]:
import subprocess
import re

def evaluate_program(erroneous_program):
    # Execute the erroneous program as a subprocess
    result = subprocess.run(['python', '-c', erroneous_program], capture_output=True, text=True)

    # Initialize counters
    total_tests = 0
    total_failed_tests = 0

    # Extract information from the stderr
    stderr_output = result.stderr

    # Use regex to find the number of tests ran
    match = re.search(r'Ran (\d+) tests', stderr_output)
    if match:
        total_tests = int(match.group(1))

    # Use regex to find the number of failures and errors
    match_failures = re.search(r'FAILED \((failures=(\d+))?(, )?(errors=(\d+))?\)', stderr_output)
    if match_failures:
        failures = match_failures.group(2)
        errors = match_failures.group(5)
        if failures:
            total_failed_tests += int(failures)
        if errors:
            total_failed_tests += int(errors)

    # Calculate the number of successful tests
    successful_tests = total_tests - total_failed_tests
    failed_tests = total_failed_tests

    print('total_tests : ', total_tests)
    print('failed_tests (including errors) : ', total_failed_tests)
    print('successful_tests : ', successful_tests)

    return total_tests, failed_tests, successful_tests

In [7]:
from pyparsing import Forward, Literal, Word, alphas, nums, alphanums

# Define the grammar for the prefix expression
expr = Forward()
identifier = Word(alphas + '_', alphanums + '_')
operand = Word(nums) | (Literal('-').suppress() + Word(nums))
op = Literal('+') | Literal('-') | Literal('*') | Literal('/')| Literal('<=')| Literal('>=')| Literal('<') | Literal('==') | Literal('!=') | Literal('>')
open_paren = Literal("(").suppress()
close_paren = Literal(")").suppress()
comma = Literal(",").suppress()
expr <<= op + open_paren + expr + comma + expr + close_paren | identifier | operand

# Define the infix notation with the correct operator precedence
def infix_action(tokens):
    if len(tokens) == 1:
        return tokens[0]
    else:
        return f"({tokens[1]} {tokens[0]} {tokens[2]})"

expr.setParseAction(infix_action)

# Example usage:
prefix_expr = "+(mm1234, +(+(var123   ,    *(var,   xyz)),   fgdh))"
prefix_expr = "<=(mm1234,fgdh)"
infix_expr = expr.parseString(prefix_expr)[0]
print("Infix expression:", infix_expr)

Infix expression: (mm1234 <= fgdh)


In [122]:
# Assuming df is your DataFrame containing the dataset
erroneous_program = df.iloc[18]['Content']
print(erroneous_program)

lines = erroneous_program.split('\n')
for i, line in enumerate(lines, start=1):
    print(f"Line {i}: {line}")


import unittest

class BubbleSortWrongWhile2:
    @staticmethod
    def bubble_sort(tab):
        tabb = tab.copy()
        i = 0
        j = len(tabb) - 1
        aux = 0
        fini = 0
        while fini == 1:  # while (fini == 0)
            fini = 1
            i = 0
            while i < j:
                if tabb[i] > tabb[i + 1]:
                    aux = tabb[i]
                    tabb[i] = tabb[i + 1]
                    tabb[i + 1] = aux
                    fini = 0
                i = i + 1
            j = j - 1
        cpt = 0
        for k in range(len(tab) - 1):
            if tabb[k] > tabb[k + 1]:
                cpt = cpt + 1
        return cpt

class BubbleSortTest(unittest.TestCase):
    def test_0(self):
        result = BubbleSortWrongWhile2.bubble_sort([1, 2, 3, 4, 5])
        self.assertEqual(result, 0)

    def test_1(self):
        result = BubbleSortWrongWhile2.bubble_sort([5, 4, 3, 2, 1])
        self.assertEqual(result, 0)

    def test_2(self):
        r

In [123]:
import ast
import keyword
import builtins

def extract_variables_constants(erroneous_program):
    tree = ast.parse(erroneous_program)
    variables = set()
    constants = set()

    # Collect function names to exclude them from variables
    function_names = {node.name for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)}

    # Collect names defined at the module level (e.g., imported modules, classes)
    module_level_names = set()
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            module_level_names.add(node.name)
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                module_level_names.add(alias.name)

    # Add built-in function names to exclude them from variables
    builtins_names = set(dir(builtins))

    # Collect names defined within test methods
    test_method_names = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name.startswith("test_"):
            # Exclude variables defined within test methods
            for child_node in ast.walk(node):
                if isinstance(child_node, ast.Name) and not isinstance(child_node.ctx, ast.Store):
                    test_method_names.add(child_node.id)


    for node in ast.walk(tree):
        if isinstance(node, ast.Name) and not isinstance(node.ctx, ast.Store):
            # Check if the name is not a keyword, not a function name, not a built-in,
            # not a module-level name, and not a name used in test cases
            if node.id not in keyword.kwlist and node.id not in function_names \
               and node.id not in module_level_names and node.id not in builtins_names \
               and node.id not in test_method_names:
                variables.add(node.id)
        elif isinstance(node, ast.Constant) and node.value != '__main__' :  # For Python 3.8+
            if isinstance(node.value, (int, float, str)):
                constants.add(node.value)
        elif isinstance(node, ast.Num):  # For compatibility with Python 3.7 and below
            constants.add(node.n)

    return list(variables), list(constants)


variables,_ = extract_variables_constants(erroneous_program)
print(variables)
print(len(variables))


['cpt', 'k', 'j', 'tabb', 'fini', 'aux', 'i', 'tab']
8


In [125]:
import operator
import random
import numpy as np
import math
import csv  # Import the CSV module
from deap import algorithms, base, creator, tools, gp
import operator
import random
import numpy as np
import math
import csv
import time
import deap
from deap import algorithms, base, creator, tools, gp

# Define global variables
max_generations = 1
population_size = 100
output_directory = "/content/drive/My Drive/Projet 2CS/Statistics/BubbleSort/"
logbook =''


# Define the primitive set for the symbolic regression problem
pset = gp.PrimitiveSet("MAIN", arity= len(variables))  # Set arity equal to the number of variables  # Increase arity to 2 for x and y
pset.addPrimitive(operator.lt, arity=2)  # Less than operator (<)
pset.addPrimitive(operator.le, arity=2)  # Less than or equal to operator (<=)
pset.addPrimitive(operator.eq, arity=2)  # Equal to operator (==)
pset.addPrimitive(operator.ne, arity=2)  # Not equal to operator (!=)
pset.addPrimitive(operator.gt, arity=2)  # Greater than operator (>)
pset.addPrimitive(operator.ge, arity=2)  # Greater than or equal to operator (>=)
pset.addEphemeralConstant("rand101", lambda: random.randint(-10,10))

creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)

toolbox = base.Toolbox()
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=2)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)

def translate_expr(individual, variables):
    expr = str(individual)
    # Replace conditional operators
    expr = expr.replace("lt", "<")  # Less than
    expr = expr.replace("le", "<=") # Less than or equal to
    expr = expr.replace("eq", "==") # Equal to
    expr = expr.replace("ne", "!=") # Not equal to
    expr = expr.replace("gt", ">")  # Greater than
    expr = expr.replace("ge", ">=") # Greater than or equal to
    # Replace ephemeral constants with their values
    expr = expr.replace("rand101", str(random.randint(-10, 10)))
    # Replace ARG0, ARG1, ..., ARGn with the variable names
    for i, var in enumerate(variables):
        arg_name = "ARG{}".format(i)
        # Replace the argument name with the variable name
        expr = expr.replace(arg_name, var)
    return expr

def evalSymbReg(individual, variables):
    new_expression = translate_expr(individual, variables)
    new_expression = expr.parseString(str(new_expression))
    # Replace the erroneous expression in the erroneous code with the new expression
    erroneous_code = replace_expression_condition(erroneous_program, 11 , "fini == 1", str(new_expression)[2:-2].strip())
    signal.signal(signal.SIGALRM, timeout_handler)
    timeout_seconds = 1  # Adjust the timeout as needed

    try:
        # Start the timer. This line raises an alarm signal after the specified number of seconds
        signal.alarm(timeout_seconds)
        # Your evaluation logic
        total_tests, failed_tests, successful_tests = evaluate_program(erroneous_code)
        # Cancel the alarm if the function returns before the timeout
        signal.alarm(0)

        num_failed_tests = failed_tests
        return num_failed_tests,

    except TimeoutException:
        print("Evaluation timed out!")
        total_tests = 25
        return total_tests,

    finally:
        # Make sure to cancel the alarm in case of unexpected errors
        signal.alarm(0)


import signal
class TimeoutException(Exception):
    pass

def timeout_handler(signum, frame):
    raise TimeoutException

def main():
    start_time = time.time()
    random.seed(42)
    variables, _ = extract_variables_constants(erroneous_program)

    if len(variables) < 1:
        print("Erreur: Nombre incorrect de variables extraites.")
        return

    variables_list = list(variables)  # Convert the set to a list
    pop = toolbox.population(n=population_size)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    toolbox.register("evaluate", evalSymbReg, variables=variables_list)  # Pass the list of variables
    toolbox.register("select", tools.selTournament, tournsize=3)  # Register tournament selection operator
    toolbox.register("mate", gp.cxOnePoint)  # Register crossover operator
    toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)  # Register mutation operator
    toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)  # Register mutation operator

    pop = toolbox.population(n=population_size)
    logbook = algorithms.eaSimple(pop, toolbox, 0.5, 0.1, max_generations, stats=stats, halloffame=hof, verbose=True)
    execution_time = time.time() - start_time


    # Print the best individual

    metrics_file = output_directory + "BubbleSortWrongWhile2_evolution_metrics.csv"
    os.makedirs(os.path.dirname(metrics_file), exist_ok=True)
    with open(metrics_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["Generation", "Individus" , "Average Fitness", "Std Fitness", "Min Fitness", "Max Fitness", "Execution Time", "Best Individual"])
        best_individual = hof[0]
        best_expr_str = translate_expr(best_individual, variables_list)
        best_expr_str2 = expr.parseString(str(best_expr_str))
        best_expr_str2 = str(best_expr_str2)[2:-2].strip()
        for gen, record in enumerate(logbook[1:]):
          for i in range(len(record)):
              writer.writerow([record[i]['gen'], record[i]['nevals'], record[i]['avg'], record[i]['std'], record[i]['min'], record[i]['max'], execution_time, best_expr_str2])

    print("Metrics and best individuals saved successfully.")



if __name__ == "__main__":
    main()



[1;30;43mLe flux de sortie a été tronqué et ne contient que les 5000 dernières lignes.[0m
    unittest.main()

total_tests :  9
failed_tests (including errors) :  9
successful_tests :  0
old_expression :  fini == 1
new_expression :  (i == fini)
import unittest


class BubbleSortWrongWhile2:

    @staticmethod
    def bubble_sort(tab):
        tabb = tab.copy()
        i = 0
        j = len(tabb) - 1
        aux = 0
        fini = 0
        while i == fini:
            fini = 1
            i = 0
            while i < j:
                if tabb[i] > tabb[i + 1]:
                    aux = tabb[i]
                    tabb[i] = tabb[i + 1]
                    tabb[i + 1] = aux
                    fini = 0
                i = i + 1
            j = j - 1
        cpt = 0
        for k in range(len(tab) - 1):
            if tabb[k] > tabb[k + 1]:
                cpt = cpt + 1
        return cpt


class BubbleSortTest(unittest.TestCase):

    def test_0(self):
        result = BubbleSortWrongW