In [170]:
#|default_exp haircut

# Load Execution Trace File
Load the execution.txt file that contains the execution trace.

In [183]:
#|export

import pandas as pd
import numpy as np
from pathlib import Path

In [1]:
# Define the file path
file_path = 'execution.txt'

# Load the execution trace file
try:
    with open(file_path, 'r') as file:
        execution_trace = file.read()
except FileNotFoundError:
    print(repr(file_path) + " not found. Please check the file path and try again.")


In [7]:
execution_trace.split('\n')

['[...]/django/django/db/models/manager.py:184   call         => __get__(self=<django.db.models.manager.ManagerDescriptor object at 0x107354690>, instance=None, cls=<django.db.models.base.ModelBase object at 0x1070929d0>)',
 '[...]/django/django/db/models/manager.py:185   line            if instance is not None:',
 '[...]/django/django/db/models/manager.py:190   line            if cls._meta.abstract:',
 '[...]/django/django/db/models/manager.py:195   line            if cls._meta.swapped:',
 '[...]/django/django/db/models/options.py:404   call            => swapped(self=<django.db.models.options.Options object at 0x107275390>)',
 '[...]/django/django/db/models/options.py:413   line               if self.swappable:',
 '[...]/django/django/db/models/options.py:430   line               return None',
 '[...]/django/django/db/models/options.py:430   return          <= swapped: None',
 '[...]/django/django/db/models/manager.py:204   line            return cls._meta.managers_map[self.manager.n

# Parse Execution Trace
Parse the execution trace to identify the directories and files involved in the execution.

In [181]:
#|export

# Importing necessary library
import re
import os
from functools import wraps

def ftransform(func):
    @wraps(func)
    def wrapper(args):
        return FTransform(func(args))
    return wrapper

class FTransform:
    def __init__(self, file) -> None:
        self.file = file

    def apply(self, f):
        return f(self.file)

def split_meta_data_and_code_for_line(line):
    meta_data = line[:60]
    code = line[60:]
    return meta_data, code

@ftransform
def split_meta_data_and_code(file):
    return [split_meta_data_and_code_for_line(line) for line in file]

In [407]:
df = pd.DataFrame(FTransform(execution_trace.split('\n')).apply(split_meta_data_and_code).file, columns=['meta_data', 'code'])

In [408]:
df

Unnamed: 0,meta_data,code
0,[...]/django/django/db/models/manager.py:184 ...,=> __get__(self=<django.db.models.manager.Mana...
1,[...]/django/django/db/models/manager.py:185 ...,if instance is not None:
2,[...]/django/django/db/models/manager.py:190 ...,if cls._meta.abstract:
3,[...]/django/django/db/models/manager.py:195 ...,if cls._meta.swapped:
4,[...]/django/django/db/models/options.py:404 ...,=> swapped(self=<django.db.models.options.O...
...,...,...
1355,[...]go/django/db/models/sql/compiler.py:1496 ...,"for row in map(list, rows):"
1356,[...]go/django/db/models/sql/compiler.py:1496 ...,<= apply_converters: None
1357,[...]rn/django/django/db/models/query.py:121 ...,<= __iter__: None
1358,[...]rn/django/django/db/models/query.py:1927 ...,if self._prefetch_related_lookups and no...


In [391]:
df['indent'] = df['code'].apply(lambda x: len(x) - len(x.lstrip()))

In [392]:
df['diff'] = df.indent.diff() 

In [393]:
df['line_no'] = df['meta_data'].apply(lambda x: x.split(':')[-1].split(' ')[0])

In [394]:
df['file_path'] = df['meta_data'].apply(lambda x: x.split(':')[0])

In [410]:
df = df[:-1]

In [411]:
df['line_type'] = df['meta_data'].apply(lambda meta:  list(filter(lambda x: x, meta.split(" ")))[-1]) 

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['line_type'] = df['meta_data'].apply(lambda meta:  list(filter(lambda x: x, meta.split(" ")))[-1])


In [412]:
df

Unnamed: 0,meta_data,code,line_type
0,[...]/django/django/db/models/manager.py:184 ...,=> __get__(self=<django.db.models.manager.Mana...,call
1,[...]/django/django/db/models/manager.py:185 ...,if instance is not None:,line
2,[...]/django/django/db/models/manager.py:190 ...,if cls._meta.abstract:,line
3,[...]/django/django/db/models/manager.py:195 ...,if cls._meta.swapped:,line
4,[...]/django/django/db/models/options.py:404 ...,=> swapped(self=<django.db.models.options.O...,call
...,...,...,...
1354,[...]go/django/db/models/sql/compiler.py:1502 ...,=> apply_converters(self=<django.db.m...,call
1355,[...]go/django/db/models/sql/compiler.py:1496 ...,"for row in map(list, rows):",line
1356,[...]go/django/db/models/sql/compiler.py:1496 ...,<= apply_converters: None,return
1357,[...]rn/django/django/db/models/query.py:121 ...,<= __iter__: None,return


In [397]:
df['line_no'] = df['line_no'].astype(int)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['line_no'] = df['line_no'].astype(int)


In [10]:
df['execution_order'] = df.index

In [406]:
df

Unnamed: 0,meta_data,code,indent,diff,line_no,file_path,line_type
0,[...]/django/django/db/models/manager.py:184 ...,=> __get__(self=<django.db.models.manager.Mana...,0,,184,[...]/django/django/db/models/manager.py,call
1,[...]/django/django/db/models/manager.py:185 ...,if instance is not None:,3,3.0,185,[...]/django/django/db/models/manager.py,call
2,[...]/django/django/db/models/manager.py:190 ...,if cls._meta.abstract:,3,0.0,190,[...]/django/django/db/models/manager.py,call
3,[...]/django/django/db/models/manager.py:195 ...,if cls._meta.swapped:,3,0.0,195,[...]/django/django/db/models/manager.py,call
4,[...]/django/django/db/models/options.py:404 ...,=> swapped(self=<django.db.models.options.O...,3,0.0,404,[...]/django/django/db/models/options.py,call
...,...,...,...,...,...,...,...
1354,[...]go/django/db/models/sql/compiler.py:1502 ...,=> apply_converters(self=<django.db.m...,9,0.0,1502,[...]go/django/db/models/sql/compiler.py,call
1355,[...]go/django/db/models/sql/compiler.py:1496 ...,"for row in map(list, rows):",12,3.0,1496,[...]go/django/db/models/sql/compiler.py,call
1356,[...]go/django/db/models/sql/compiler.py:1496 ...,<= apply_converters: None,9,-3.0,1496,[...]go/django/db/models/sql/compiler.py,call
1357,[...]rn/django/django/db/models/query.py:121 ...,<= __iter__: None,6,-3.0,121,[...]rn/django/django/db/models/query.py,call


In [398]:
df = df[['execution_order', 'file_path', 'line_no', 'indent', 'diff', 'code']]

KeyError: "['execution_order'] not in index"

In [12]:
df_sorted = df.sort_values(by=['file_path', 'execution_order'])

In [13]:
df['has_complete_if'] = df['code'].apply(lambda x: x.strip().endswith(':') and x.strip().startswith('if'))

In [14]:
next_line_will_jump = df['line_no'].diff().shift(-1) > 1

In [15]:
next_line_new_file = df['file_path'].shift(-1) != df['file_path']

In [16]:
next_line_not_has_call = ~df['code'].shift(-1).str.contains('=>', na=False)

In [17]:
df['prune if'] = df['has_complete_if'] & (next_line_new_file | next_line_will_jump) & next_line_not_has_call

In [18]:
df = df[~df['prune if']]

In [19]:
df.drop(columns=['has_complete_if', 'prune if'], inplace=True)

In [20]:
df.head(20)

Unnamed: 0,execution_order,file_path,line_no,indent,diff,code
0,0,[...]/django/django/db/models/manager.py,184,0,,=> __get__(self=<django.db.models.manager.Mana...
3,3,[...]/django/django/db/models/manager.py,195,3,0.0,if cls._meta.swapped:
4,4,[...]/django/django/db/models/options.py,404,3,0.0,=> swapped(self=<django.db.models.options.O...
6,6,[...]/django/django/db/models/options.py,430,6,0.0,return None
7,7,[...]/django/django/db/models/options.py,430,3,-3.0,<= swapped: None
8,8,[...]/django/django/db/models/manager.py,204,3,0.0,return cls._meta.managers_map[self.manager....
9,9,[...]/django/django/db/models/manager.py,204,0,-3.0,<= __get__: <django.db.models.manager.Manager ...
10,10,[...]/django/django/db/models/manager.py,157,0,0.0,=> all(self=<django.db.models.manager.Manager ...
11,11,[...]/django/django/db/models/manager.py,164,3,3.0,return self.get_queryset()
12,12,[...]/django/django/db/models/manager.py,150,3,0.0,=> get_queryset(self=<django.db.models.mana...


In [146]:
df.to_csv('execution_trace.csv', index=False)

In [67]:
df_sorted.to_csv('execution_trace.csv', index=False)

# To code

In [231]:
test = {'class1': {}}

test.setdefault('a', [])

{'class1': {}, 'a': []}

In [456]:
#|export

class RegexNoMatchError(Exception):
    pass

class Line:
    def __init__(self, line_no, line_code, line_type):
        self.line_no = line_no
        self.line_code = line_code
        self.line_type = line_type

    def tab(self):
        self.line_code = '\t' + self.line_code

class StackItem:
    def __init__(self, class_name, func_name, line, skip=False) -> None:
        self.class_name = class_name
        self.func_name = func_name
        self.skip = skip
        self.visited_lines = [line]
    
    def add_visit(self, line):
        self.visited_lines.append(line)

    @property
    def visited_line_nos(self):
        return [line.line_no for line in self.visited_lines]

class BlockStackItem(StackItem):
    '''This is a stateful stack item'''
    def __init__(self, parent_class_name, parent_func_name, block_type, definition_line):
        super().__init__(parent_class_name, parent_func_name, definition_line)
        self.block_type = block_type
        self._lines = [definition_line]

    def add_line(self, line):
        self._lines.append(line)

    def add_line_many(self, lines):
        self._lines.extend(lines)

    def tab_lines(self):
        for line in self._lines:
            line.tab()

    @property
    def lines(self):
        return self._lines

    @property
    def line_nos(self):
        return [line.line_no for line in self._lines]

    @property
    def line_codes(self):
        return [line.line_code for line in self._lines]

class LineDictOrdered:
    def __init__(self) -> None:
        self._lines = {}

    def sort(self):
        self._lines = dict(sorted(self.lines.items()))

    def add_line(self, line: Line):
        line_no = line.line_no
        line_code = line.line_code

        self._lines[line_no] = line_code

    def add_line_many(self, lines: list[Line]):
        for line in lines:
            self.add_line(line)

    def to_list(self):
        self.sort()
        return list(self._lines.values())

    @property
    def lines(self):
        return self._lines
    

class TraceCompiler:
    classes= {}
    _stack = []
    _line_log = []

    def __init__(self, trace_path, output_path = "haircutted"):
        self._df = None
        self._trace_path = trace_path
        self._output_path = output_path

    def pre_process(self):
        try:
            with open(self._trace_path, 'r') as file:
                execution_trace = file.read()
        except FileNotFoundError:
            print(repr(file_path) + " not found. Please check the file path and try again.")


        df = pd.DataFrame(FTransform(execution_trace.split('\n')).apply(split_meta_data_and_code).file, columns=['meta_data', 'code'])
        # Remove empty row
        df = df[:-1]

        df['indent'] = df['code'].apply(lambda x: len(x) - len(x.lstrip()))
        df['diff'] = df.indent.diff()
        df['line_no'] = df['meta_data'].apply(lambda x: x.split(':')[-1].split(' ')[0])
        df['file_path'] = df['meta_data'].apply(lambda x: x.split(':')[0])
        df['line_type'] = df['meta_data'].apply(lambda meta:  list(filter(lambda x: x, meta.split(" ")))[-1]) 


        df['line_no'] = df['line_no'].astype(int)
        df['execution_order'] = df.index

        # Rearrange columns
        df = df[['execution_order', 'file_path', 'line_no', 'indent', 'diff', 'line_type' ,'code']]

        # #######################################
        # # Removing ifs that evaluate to false #
        # #######################################

        # df['has_complete_if'] = df['code'].apply(lambda x: x.strip().endswith(':') and x.strip().startswith('if'))
        # next_line_will_jump = df['line_no'].diff().shift(-1) > 1
        # next_line_new_file = df['file_path'].shift(-1) != df['file_path']
        # next_line_not_has_call = ~df['code'].shift(-1).str.contains('=>', na=False)

        # df['prune if'] = df['has_complete_if'] & (next_line_new_file | next_line_will_jump) & next_line_not_has_call
        # df = df[~df['prune if']]
        # df.drop(columns=['has_complete_if', 'prune if'], inplace=True)
        self._df = df

    def _get_func_name(self, code_snippet):
        match = re.match(r"=>\s+(\w+)", code_snippet)

        if match is None:
            raise RegexNoMatchError(f"Could not find function name in {code_snippet}")

        return match.group(1)


    def _get_self_class_name(self, code_snippet):
        match = re.match(r"=>(.*)\((self|cls)=<(\S*).*\)", code_snippet)
        
        if match is None:
            raise RegexNoMatchError(f"Could not find class name in {code_snippet}")

        return match.group(3)

    def _get_inline_class_name_from_func_name(self, func_name, code_snippet):
        match =  re.match(r".* (\S*)\." + func_name + r"\(\S*\)", code_snippet)
        
        # if match is None:
        #     match = 
        
        if match is None:
            raise RegexNoMatchError(f"Could not find class name in {code_snippet}")

        return match.group(1)

    def _get_class_name(self, code_snippet):
        try:
            return self._get_self_class_name(code_snippet)
        except RegexNoMatchError:
            pass

        try:
            previous_line = self._line_log[-2].line_code
            return self._get_inline_class_name_from_func_name(self._get_func_name(code_snippet), previous_line)
        except RegexNoMatchError:
            pass

        try:
            # Just use func name as class anme
            return self._get_func_name(code_snippet)
        except RegexNoMatchError:
            raise

    def _add_line(self, class_name, func_name, line):
        self.classes[class_name].setdefault(func_name, LineDictOrdered()).add_line(line)

    def _add_line_many(self, class_name, func_name, lines):
        self.classes[class_name].setdefault(func_name, LineDictOrdered()).add_line_many(lines)

    def _compile_func_calls(self, line):

        # Skip lambda functions TODO: might want to handle this better in the future
        if "genexpr" in line.line_code or "listcomp" in line.line_code or "lambda" in line.line_code:
            self._stack.append(StackItem(None, 'lambda', line, skip=True))
            return

        # Skip decorators TODO:might want to handle this better in the future
        if "=> inner(" in line.line_code:
            self._stack.append(StackItem(None, 'lambda', line, skip=True))
            return

        func_name = self._get_func_name(line.line_code)
        class_name = self._get_class_name(line.line_code)
        line = Line(line.line_no, f"def {line.line_code[3:]}:", line.line_type)

        self._stack.append(StackItem(class_name, func_name, line))

        if class_name not in self.classes:
            self.classes[class_name] = {}
        
        self._add_line(class_name, func_name, line)

    def _compile_line_in_block(self, line: Line):
        # For lines that are in a block within the function OR in a nested block
        if self._check_is_in_block():
            line.tab()
            self._stack[-1].add_line(line)
            return
        
        class_name = self._stack[-1].class_name
        func_name = self._stack[-1].func_name

        self._add_line(class_name, func_name, line)

    def _compile_func_return(self):
        self._stack.pop()

    def _compile_block_return(self):
        block = self._stack.pop()

        if len(block.lines) <= 1:
            if self._check_is_in_block():
                self._compile_block_return()
            return

        if self._check_is_in_block():
            block.tab_lines()
            self._stack[-1].add_line_many(block.lines)
        
            # Recurse until all existing blocks are closed
            # TODO: This assumes that if there is a line jump, then all previous nested blocks are escaped
            self._compile_block_return()
        
    
        self._add_line_many(block.class_name, block.func_name, block.lines)

    def _compile_block_entry(self, line: Line):
        parent_block = self._stack[-1]

        parent_class_name, parent_func_name = parent_block.class_name, parent_block.func_name
        block_type = line.line_code.split(" ")[0]

        self._stack.append(BlockStackItem(parent_class_name, parent_func_name, block_type, line))

    def _compile_line(self, line: Line):

        if line.line_type == "return":
            # Before closing the function, make sure all unclosed blocks are closed
            if self._check_is_in_block():
                self._compile_block_return()

            self._compile_func_return()
            return

        if line.line_type == "call":
            self._compile_func_calls(line)
            return

        if line.line_type == "line":
            if self._check_line_visited(line):
                # this is to deal with return statements that are in a block that span multiple lines
                # the multi-line return statement will be read back on forth, ignore when it reads back
                return

            self._stack[-1].add_visit(line)
            
            if self._check_is_in_block() and self._check_block_line_jump(line):
                # TODO: if -> nested if -> nested else - the code in nested else will not be added to top level if (as it is considered a line jump)
                self._compile_block_return()
                # Do not return has we have not yet processed the current line
                # Just only know that current line is no longer in block


            # Skip functions marked to skip, eg. lambda functions
            if self._stack[-1].skip:
                return

            if line.line_code.startswith(("if", "elif", "else", "for", "with", 'while', 'try', 'except', 'finally')) and line.line_code.endswith(":"):
                self._compile_block_entry(line)
                return
            
            self._compile_line_in_block(line)

    def _check_block_line_jump(self, line):
        if len(self._stack[-1].visited_line_nos) < 2:
            return False
        
        return self._stack[-1].visited_line_nos[-1] - self._stack[-1].visited_line_nos[-2] > 1

    def _check_global_line_jump(self):
        return self._line_log[-1].line_no - self._line_log[-2].line_no > 1

    def _check_line_visited(self, line):
        return line.line_no in self._stack[-1].visited_line_nos

    def _check_is_in_block(self):
        if len(self._stack) == 0:
            return False

        return isinstance(self._stack[-1], BlockStackItem)

    def _check_line_in_block(self, line_no):
        return line_no in self._stack[-1].line_nos

    def _log_line(self, line):
        "History of last 5 lines processed. Useful to track back to find more info abt called function"
        if len(self._line_log) > 10:
                self._line_log = self._line_log[-10:]
        self._line_log.append(line)

    def compile(self):
        for index, row in self._df.iterrows():
            line_no = row["line_no"]
            line_type = row["line_type"]
            line_code = row["code"].strip()

            line = Line(line_no, line_code, line_type)

            self._log_line(line)
            self._compile_line(line)

    def _build_class(self, class_name):
        return f"class {class_name}:"

    def _build_func(self, func_code):
        func_def = func_code[0]
        func_body = func_code[1:]

        return "\n".join(['\t' + func_def] + ['\t\t' + line for line in func_body])

    def _build_file(self, class_name, func_dict):
        code = []

        code.append(self._build_class(class_name))
        for func_code in func_dict.values():
            func_code = func_code.to_list()
            code.append(self._build_func(func_code))
            code.append('\n')

        return "\n".join(code)

    def build(self):
        for class_info, func_dict in self.classes.items():
            class_info = class_info.split(".")

            if len(class_info) < 3:
                class_path = Path("external")

            if len(class_info) < 2:
                class_file = Path(class_info[-1] + ".py")
            
            if len(class_info) >= 3:   
                class_name = class_info[-1]
                class_file = Path(class_info[-2] + ".py")
                class_path = Path("/".join(class_info[:-2]))

            os.makedirs(self._output_path / class_path, exist_ok=True)

            p = self._output_path / class_path / class_file
            
            if p.exists():
                with p.open(mode="a") as file:
                    file.write(self._build_file(class_name, func_dict))
            else:
                with p.open(mode="w") as file:
                    file.write(self._build_file(class_name, func_dict))

In [457]:
tc = TraceCompiler("execution.txt")

In [458]:
tc.pre_process()

In [459]:
tc.compile()


In [462]:
tc.build()

In [461]:
tc.classes['django.db.models.sql.query.Query']['get_initial_alias'].lines

{1066: 'def get_initial_alias(self=<django.db.models.sql.query.Query object at 0x107704590>):',
 1074: 'elif self.model:',
 1075: '\talias = self.join(self.base_table_class(self.get_meta().db_table, None))',
 1078: 'return alias',
 1071: 'if self.alias_map:',
 1072: '\talias = self.base_table',
 1073: '\tself.ref_alias(alias)'}

In [129]:
tc.classes

{'django.db.models.manager.ManagerDescriptor': {'__get__': ['def __get__(self=<django.db.models.manager.ManagerDescriptor object at 0x107354690>, instance=None, cls=<django.db.models.base.ModelBase object at 0x1070929d0>):',
   'if cls._meta.swapped:',
   'return cls._meta.managers_map[self.manager.name]']},
 'django.db.models.options.Options': {'swapped': ['def swapped(self=<django.db.models.options.Options object at 0x107275390>):',
   'return None']},
 'django.db.models.manager.Manager': {'all': ['def all(self=<django.db.models.manager.Manager object at 0x1073fb650>):',
   'return self.get_queryset()'],
  'get_queryset': ['def get_queryset(self=<django.db.models.manager.Manager object at 0x1073fb650>):',
   'return self._queryset_class(model=self.model, using=self._db, hints=self._hints)']},
 'django.db.models.query.QuerySet': {'__init__': ['def __init__(self=<django.db.models.query.QuerySet object at 0x107d1edd0>, model=<django.db.models.base.ModelBase object at 0x1070929d0>, query

In [122]:
os.makedirs("django/db/models", exist_ok=True)

In [156]:
# Define a function that takes the functions dictionary and the file name as arguments
def write_functions(functions, file_name):
  # Open the file in write mode
  with open(file_name, "w") as f:
    # Loop through the functions dictionary
    for function_name, function_code in functions.items():
      # Write the function code to the file with a newline
      f.write(function_code + "\n")

In [160]:
with open('source.py', 'w') as file:
    file.write(trace_to_code(df))

In [295]:
#|export

if __name__ == "__main__":
    tc = TraceCompiler("test.txt")
    tc.pre_process()
    tc.compile()

# Create Directory Structure
Based on the parsed execution trace, create the directory structure.

In [3]:
# Define a function to create the directory structure
def create_directory_structure(directories):
    # Iterate over all directories
    for directory in directories:
        # Try to create the directory
        try:
            os.makedirs(directory)
        # If the directory already exists, pass
        except FileExistsError:
            pass

# Call the function with the directories list
create_directory_structure(directories)

# Generate Code Files
Generate the respective code files in the appropriate directories as per the execution trace.

In [4]:
# Define a function to create the code files
def create_code_files(files):
    # Iterate over all files
    for file in files:
        # Try to create the file
        try:
            with open(file, 'w') as f:
                pass
        # If the file already exists, pass
        except FileExistsError:
            pass

# Call the function with the files list
create_code_files(files)

# Export

In [463]:
import nbdev
nbdev.export.nb_export('haircut.ipynb', '.')
print('Export successful')

Export successful
