In [1]:
import ast
import astunparse
import csv
import numpy as np
from pybase64 import b64decode
from typing import List
from collections import defaultdict
from six.moves import zip_longest

In [2]:
class RedundantIfElseDetector(ast.NodeVisitor):
    '''
    Detects: 

    if cond:
        return True
    else:
        return False

    Advises:

    return cond
    '''
    def __init__(self, print_hint=False) -> None:
        super().__init__()
        self.print_hint = print_hint

    def _compose_hint(self, node: ast.If) -> None:
        correct_code_tree = ast.Return(value=node.test, kind=None)
        correct_code_str = astunparse.unparse(correct_code_tree)
        user_code_str = astunparse.unparse(node)
        self.hint = (
            "\n\nUse \n"
            f"{correct_code_str}"
            "\n\nInstead of \n"
            f"{user_code_str}"
        )
        if self.print_hint:
            print(self.hint)

    def visit_If(self, node: ast.If) -> None:
        body_if: List = node.body
        orelse: List = node.orelse

        if (
            len(body_if) == 1
            and isinstance(body_if[0], ast.Return)
            and isinstance(body_if[0].value, ast.Constant)
            and body_if[0].value.value is True

            and len(orelse) == 1
            and isinstance(orelse[0], ast.Return)
            and isinstance(orelse[0].value, ast.Constant)
            and orelse[0].value.value is False
        ):
            self.detected = True
            self._compose_hint(node)

    def detect_and_show_hint(self, code: str) -> None:
        self.user_code = None
        self.hint = None
        self.detected = False
        
        try:
            parsed = ast.parse(code)
        except Exception:
            # ignore solutions where the code can not be executed (due to the user indentation errors - tabs, incorrect spacing) 
            return

        self.visit(parsed)

        if self.detected:
            self.user_code = code

In [3]:
class Runner:
    def __init__(self, detector) -> None:
        self.detector = detector
        self.task_ids = []
        self.user_codes = []
        self.hints = []
        self.counter_task_solutions = defaultdict(lambda: 0)
        self.counter_task_errors = defaultdict(lambda: 0)

    def run(self):
        task_order = 0
        task_ids = []
        with open('./solutions-ipython.csv') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=';')
            next(csv_reader)
    
            for idx, row in enumerate(csv_reader):
                task_id = row[1]
                solution_encoded = row[2]
                solution_decoded = b64decode(solution_encoded).decode("utf-8")
                
                # self.counter_task_solutions[task_id] += 1
                self.counter_task_solutions[task_order] += 1
                self.detector.detect_and_show_hint(code=solution_decoded)

                if self.detector.detected:
                    # self.counter_task_errors[task_id] += 1
                    self.counter_task_errors[task_order] += 1
                    self.task_ids.append(task_id)
                    self.user_codes.append(self.detector.user_code)
                    self.hints.append(self.detector.hint)

                if task_id not in task_ids:
                    task_ids.append(task_id)
                    task_order += 1

        print(f"Total number of detected errors: {len(self.hints)}")

    def increment_counter(self, counter, task_id):
        if task_id not in counter.keys():
            counter[task_id] = 1
        else:
            counter[task_id] += 1
    
    def display_detected_error(self, error_idx):
        print(
            "User code: \n",
            self.user_codes[error_idx],
        )
        print("\n------------------\n")
        print(
            "Hint: \n",
            self.hints[error_idx],
        )


In [4]:
runner_redundant_if_else_detector = Runner(detector=RedundantIfElseDetector())
runner_redundant_if_else_detector.run()

Total number of detected errors: 3168


In [5]:
runner_redundant_if_else_detector.display_detected_error(error_idx=3000)

User code: 
 def is_triangle(a, b, c):
    if (a+b>c)and(a+c>b)and(c+b>a):
        return True
    else: return False


------------------

Hint: 
 

Use 

return (((a + b) > c) and ((a + c) > b) and ((c + b) > a))


Instead of 

if (((a + b) > c) and ((a + c) > b) and ((c + b) > a)):
    return True
else:
    return False



In [6]:
class RedundantElifDetector(ast.NodeVisitor):
    '''
    Detects: 

    if cond_1:
        body_1
    ...
    elif cond_n:
        body_n


    Advises:

    if cond_1:
        body_1
    ...
    else:
        body_n


    Bonus:
    
    Removes unreacheable code

    '''
    def __init__(self, print_hint=False) -> None:
        super().__init__()
        self.print_hint = print_hint

    def _check_ops_2_branches(self, ops):
        self.error_detected_2_branches = False
        if len(ops) == 2:
            op_sets = [
                set([ast.Eq, ast.NotEq]),
                set([ast.GtE, ast.Lt]),
                set([ast.LtE, ast.Gt]),
            ]    
            for op_set in op_sets:
                if (set([type(op) for op in ops]) == op_set):
                    self.error_detected_2_branches = True
                    break
        
    def _check_ops_3_branches(self, ops):
        self.error_detected_3_branches = False
        if len(ops) == 3:
            op_set = set([ast.Gt, ast.Lt, ast.Eq])
            self.error_detected_3_branches = (set([type(op) for op in ops]) == op_set)

    def _compose_hint(self, correct_code_tree, user_code_tree):
        correct_code_str = astunparse.unparse(correct_code_tree)
        user_code_str = astunparse.unparse(user_code_tree)
        self.hint = (
            "\n\nUse \n"
            f"{correct_code_str}"
            "\n\nInstead of \n"
            f"{user_code_str}"
        )
        if self.print_hint:
            print(self.hint)

    def _compose_hint_ast_2_branches(self, if_node: ast.If):
        correct_code_tree = ast.If(
            test=if_node.test,
            body=if_node.body,
            orelse=if_node.orelse[0].body, 
        )
        self._compose_hint(correct_code_tree, if_node)

    def _compose_hint_ast_3_branches(self, if_node: ast.If):
        correct_code_tree = ast.If(
            test=if_node.test,
            body=if_node.body,
            orelse=[ast.If(
                test=if_node.orelse[0].test,
                body=if_node.orelse[0].body,
                orelse=if_node.orelse[0].orelse[0].body,
            )],
        )
        self._compose_hint(correct_code_tree, if_node)

    def _parse_if_node_level(self, if_node: ast.If):
        parsed_success = False
        left, comparators, ops, orelse = None, None, None, None
        
        if isinstance(if_node.test, ast.Compare):
            compare: ast.Compare = if_node.test
            left: str = astunparse.unparse(compare.left)
            comparators: str = astunparse.unparse(compare.comparators)
            ops: List = compare.ops
            orelse: List = if_node.orelse
            parsed_success = True

        return parsed_success, left, comparators, ops, orelse

    def _detect_error_2_branches(self, if_node: ast.If):
        check_next_branch = False
        left_2, comparators_2, ops, orelse_2 = None, None, None, None

        parsed_success, left_1, comparators_1, ops_1, orelse_1 = self._parse_if_node_level(if_node)

        if parsed_success and len(orelse_1) == 1 and isinstance(orelse_1[0], ast.If):
            elif_node: ast.If = orelse_1[0]
            parsed_success, left_2, comparators_2, ops_2, orelse_2 = self._parse_if_node_level(elif_node)

            if parsed_success and left_1 == left_2 and comparators_1 == comparators_2:
                ops = ops_1 + ops_2
                self._check_ops_2_branches(ops)
                check_next_branch = True
                
                if self.error_detected_2_branches:
                    self._compose_hint_ast_2_branches(if_node)
        
        return check_next_branch, left_2, comparators_2, ops, orelse_2
    
    def _detect_error_3_branches(self, if_node: ast.If, left_2, comparators_2, ops, orelse_2):
        if len(orelse_2) == 1 and isinstance(orelse_2[0], ast.If):
            elif_node: ast.If = orelse_2[0]
            parsed_success, left_3, comparators_3, ops_3, _ = self._parse_if_node_level(elif_node)

            if parsed_success and left_3 == left_2 and comparators_2 == comparators_3:
                ops = ops + ops_3
                self._check_ops_3_branches(ops)

                if self.error_detected_3_branches:
                    self._compose_hint_ast_3_branches(if_node)
    
    def visit_If(self, if_node: ast.If) -> None:
        check_next_branch, left_2, comparators_2, ops, orelse_2 = self._detect_error_2_branches(if_node)

        if self.error_detected_2_branches:
            return

        if check_next_branch:
            self._detect_error_3_branches(if_node, left_2, comparators_2, ops, orelse_2)    

    def detect_and_show_hint(self, code: str) -> None:
        self.user_code = None
        self.hint = None
        self.error_detected_2_branches = False
        self.error_detected_3_branches = False

        try:
            parsed = ast.parse(code)
        except Exception:
            # ignore solutions where the code can not be executed (due to the user indentation errors - tabs, incorrect spacing) 
            return

        self.visit(parsed)

        self.detected = self.error_detected_2_branches or self.error_detected_3_branches

        if self.detected:
            self.user_code = code

In [7]:
runner_redundant_elif_detector = Runner(detector=RedundantElifDetector())
runner_redundant_elif_detector.run()

Total number of detected errors: 309


In [8]:
runner_redundant_elif_detector.display_detected_error(error_idx=230)

User code: 
 def check(p, k):
    if p==4*k:
        print("OK")
    elif p > 4*k:
        print("Prebyva:", p-4*k)
    elif p < 4*k:
        print("Chybi:", abs(p-4*k))
    else:
        print("Incorrect Ex")

------------------

Hint: 
 

Use 

if (p == (4 * k)):
    print('OK')
elif (p > (4 * k)):
    print('Prebyva:', (p - (4 * k)))
else:
    print('Chybi:', abs((p - (4 * k))))


Instead of 

if (p == (4 * k)):
    print('OK')
elif (p > (4 * k)):
    print('Prebyva:', (p - (4 * k)))
elif (p < (4 * k)):
    print('Chybi:', abs((p - (4 * k))))
else:
    print('Incorrect Ex')



In [9]:
class DuplicatedSegment:
    def __init__(self, segment_idx: int, line_idxs: List[int]):
        self.segment_idx = segment_idx
        self.line_idxs = line_idxs
        self.extended = False

class DuplicatedCodeDetector:
    def _compare_ast(self, left, right):
        """Compare two abstract syntax trees.
        Return `None` if they are equal, and raise an exception otherwise.
        """
        assert type(left) == type(right)
    
        if isinstance(left, ast.AST):
            left_fields = ast.iter_fields(left)
            right_fields = ast.iter_fields(right)
            for left_field, right_field in zip_longest(
                left_fields, right_fields, fillvalue=""
            ):
                left_name, left_values = left_field
                right_name, right_values = right_field
                assert left_name == right_name
                self._compare_ast(left_values, right_values)

        elif isinstance(left, list):
            for left_child, right_child in zip_longest(left, right, fillvalue=""):
                self._compare_ast(left_child, right_child)

        else:
            assert left == right
    
    def _get_child_nodes(self, node: ast.AST) -> None:
        self._nodes.extend(node.body)
        for n in ast.iter_child_nodes(node):
            if hasattr(n, "body"):
                self._get_child_nodes(n)

    def _get_duplicity_matrix_base(self, root_node: ast.AST) -> None:
        self._get_child_nodes(root_node)
        self._nodes.sort(key=lambda n: n.lineno)

    def _compute_duplicity_matrix_entry(self, node_1, node_2):
        identical_asts = True
        try:
            self._compare_ast(left=node_1, right=node_2)
        except Exception:
            identical_asts = False
        
        return float(identical_asts)
    
    def _compute_duplicity_matrix(self):
        n_nodes = len(self._nodes)
        self._duplicity_matrix = np.zeros((n_nodes, n_nodes), dtype=np.float32)
        for i in range(n_nodes):
            for j in range(n_nodes):
                self._duplicity_matrix[i, j] = self._compute_duplicity_matrix_entry(
                    self._nodes[i],
                    self._nodes[j]
                ) if i != j else 1.0

    def _analyze_duplicity_matrix(self):
        n_nodes = len(self._nodes)

        for i in range(n_nodes):
            for j in range(i+1, n_nodes):
                if self._duplicity_matrix[i][j] == 1.0:
                    appended = self._append_line_idxs_to_segment(i, j)
                    if not appended:
                        self._create_new_segment(i, j)

    def _append_line_idxs_to_segment(self, i, j):
        appended = False      
        for segment in self._duplicite_code_segments:
            if self._nodes[i].lineno in segment.line_idxs:
                segment.line_idxs.append(self._nodes[j].lineno)
                appended = True        
            elif self._nodes[i].lineno - 1 in segment.line_idxs and self._nodes[j].lineno - 1 in segment.line_idxs:
                segment.line_idxs.append(self._nodes[i].lineno)
                segment.line_idxs.append(self._nodes[j].lineno)
                segment.extended = True
                appended = True
        
        return appended

    def _remove_short_segments(self):
        duplicite_code_segments = []
        for segment in self._duplicite_code_segments:
            if segment.extended:
                duplicite_code_segments.append(segment)

        self._duplicite_code_segments = duplicite_code_segments

    def _create_new_segment(self, i, j):
        segment = DuplicatedSegment(
            segment_idx=len(self._duplicite_code_segments),
            line_idxs=[self._nodes[i].lineno, self._nodes[j].lineno],
        )
        self._duplicite_code_segments.append(segment)

    def _compose_hint(self, code):
        code_strings = code.split('\n')
        self.hint: List[str] = []
        for lineno, code_string in enumerate(code_strings):
            string_is_part_of_duplicated_segment = False
            segment_idx = None
            for segment in self._duplicite_code_segments:
                if lineno + 1 in segment.line_idxs:
                    string_is_part_of_duplicated_segment = True
                    segment_idx = segment.segment_idx
    
            if string_is_part_of_duplicated_segment:
                code_string += f"               : {segment_idx}"
    
            self.hint.append(code_string)
    
    def _check_detected(self):
        self.detected = len(self._duplicite_code_segments) > 0
    
    def detect_and_show_hint(self, code: str) -> None:
        self.user_code = None
        self.hint = None
        self.detected = False

        self._nodes: List[ast.AST] = []
        self._duplicity_matrix: np.ndarray = None
        self._duplicite_code_segments: List[DuplicatedSegment] = []

        try:
            parsed = ast.parse(code)
        except Exception:
            # ignore solutions where the code can not be executed (due to the user indentation errors - tabs, incorrect spacing) 
            return

        self._get_duplicity_matrix_base(parsed)
        self._compute_duplicity_matrix()
        self._analyze_duplicity_matrix()
        self._remove_short_segments()
        self._check_detected()
            
        if self.detected:
            self.user_code = code
            self._compose_hint(code)

In [10]:
runner_duplicated_code_detector = Runner(detector=DuplicatedCodeDetector())
runner_duplicated_code_detector.run()

Total number of detected errors: 369


In [11]:
runner_duplicated_code_detector.hints[2]

['def animals(n):',
 '\tfor i in range(n):',
 '   \t \tprint("sob")',
 '\tfor i in range(n):               : 0',
 '   \t \tprint("los")               : 0',
 '\tfor i in range(n):               : 0',
 '   \t \tprint("los")               : 0']

In [12]:
import plotly.graph_objects as go
import math

In [13]:
class VisualizerBarChart:
    def __init__(self, detector_name, counter_task_solutions, counter_task_errors):
        self.detector_name = detector_name
        self.counter_task_solutions = counter_task_solutions
        self.counter_task_errors = counter_task_errors

    def _process_data(self):
        self.task_order = []
        self.num_errors = []
        self.error_rates = []
        self.hover_texts = []
        for task_order_idx in sorted(self.counter_task_solutions.keys()):
            n_errors = self.counter_task_errors[task_order_idx]
            n_solutions = self.counter_task_solutions[task_order_idx]            
            error_rate = round(n_errors / n_solutions, 4)
            self.task_order.append(task_order_idx)
            self.num_errors.append(n_errors)
            self.error_rates.append(error_rate)
            self.hover_texts.append(
                f"Task_order: {task_order_idx} <br>"
                f"Number of detected errors: {n_errors} <br>"
                f"Number of solutions: {n_solutions} <br>"
                f"Error rate: {error_rate} <br>"
            )

    def display_visualization(self):
        self._process_data()
        fig = go.Figure(
            layout=go.Layout(
                    title=dict(
                        text=f"<b> {self.detector_name}: Number of errors v. Task order </b>",
                        x=0.5,
                    ),
                    xaxis=dict(
                        title="Task order",
                        type="linear",
                        dtick=3,
                    ),
                    yaxis=dict(
                        title="Number of errors",
                    ),
                ),
            data=[
                go.Bar(
                    x=self.task_order,
                    y=self.error_rates,
                    text=self.hover_texts,
                )
            ]
        )
        fig.show()
        return fig


In [14]:
visualizer_duplicated_code_detector = VisualizerBarChart(
    detector_name="Duplicated code detector",
    counter_task_solutions=runner_duplicated_code_detector.counter_task_solutions,
    counter_task_errors=runner_duplicated_code_detector.counter_task_errors,
    )

fig_duplicated_code_detector = visualizer_duplicated_code_detector.display_visualization()

In [15]:
class VisualizerScatterChart:
    def __init__(self, detector_name, counter_task_solutions, counter_task_errors):
        self.detector_name = detector_name
        self.counter_task_solutions = counter_task_solutions
        self.counter_task_errors = counter_task_errors
    
    def _process_data(self):
        self.task_ids = []
        self.num_errors = []
        self.error_rates = []
        self.hover_texts = []
        for (task_id, n_errors), n_solutions in zip(self.counter_task_errors.items(), self.counter_task_solutions.values()):
            error_rate = round(n_errors / n_solutions, 4)
            self.task_ids.append(task_id)
            self.num_errors.append(n_errors)
            self.error_rates.append(error_rate)
            self.hover_texts.append(
                f"Task_id: {task_id} <br>"
                f"Number of detected errors: {n_errors} <br>"
                f"Number of solutions: {n_solutions} <br>"
                f"Error rate: {error_rate} <br>"
            )
        self.min_task_id = min(self.task_ids)
        self.max_task_id = max(self.task_ids)
        max_num_errors = max(self.num_errors)
        self.y_axis_dtick = max_num_errors // 5
        max_error_rate = max(self.error_rates)
        self.colorbar_dtick = round(max_error_rate / 5, 2)
        self.sizes = [100 * round(math.sqrt(e_r / max_error_rate), 4)for e_r in self.error_rates]


    def display_visualization(self):
        self._process_data()
        fig = go.Figure(
            layout=go.Layout(
                    title=dict(
                        text=f"<b> {self.detector_name}: Number of errors v. Task ID </b>",
                        x=0.5,
                    ),
                    xaxis=dict(
                        title="Task ID",
                        type="linear",
                        dtick=5,
                    ),
                    yaxis=dict(
                        title="Number of errors",
                        dtick=self.y_axis_dtick,
                    ),
                ),
            data=[
                go.Scatter(
                    x=self.task_ids,
                    y=self.num_errors,
                    mode='markers',
                    marker=dict(
                        color=self.error_rates,
                        size=self.sizes,
                        colorbar=dict(
                            title="Error rate",
                            dtick=self.colorbar_dtick,
                        ),
                    ),
                    text=self.hover_texts,
                )
            ]
        )
        fig.show()
        return fig

In [16]:
visualizer_redundant_if_else_detector = VisualizerScatterChart(
    detector_name="Redundant if else detector",
    counter_task_solutions=runner_redundant_if_else_detector.counter_task_solutions,
    counter_task_errors=runner_redundant_if_else_detector.counter_task_errors,
    )

fig_redundant_if_else_detector = visualizer_redundant_if_else_detector.display_visualization()

In [17]:
visualizer_redundant_elif_detector = VisualizerScatterChart(
    detector_name="Redundant elif detector",
    counter_task_solutions=runner_redundant_elif_detector.counter_task_solutions,
    counter_task_errors=runner_redundant_elif_detector.counter_task_errors,
    )

fig_redundant_elif_detector = visualizer_redundant_elif_detector.display_visualization()

In [20]:
import plotly.io as pio

pio.write_html(fig_redundant_if_else_detector, file='redundant_if_else_detector.html', auto_open=True)
pio.write_html(fig_redundant_elif_detector, file='redundant_elif_detector.html', auto_open=True)