In [12]:
# Root directory that is walked for .ipynb / .py files (passed to
# process_files_recursively below).
folder_path = '/Users/siyuanzhou/Nutstore Files/developer/2017intercultural/sub-project/14.0NLP/code/'
# Output path handed to flowchart.render(). Despite the name, it is used as
# the output file stem: the rendered file is this path plus '.pdf'.
save_dir = '/Users/siyuanzhou/Nutstore Files/developer/2017intercultural/sub-project/14.0NLP/output/resultsflow6'

In [13]:
import os
import re
import nbformat
import graphviz
import ast

In [14]:
def _resolve_string_operand(node, known):
    """Return the string denoted by ``node``, or None if it cannot be resolved.

    ``node`` resolves when it is a string literal, or a bare name that is
    already a key of ``known``.
    """
    if isinstance(node, ast.Constant) and isinstance(node.value, str):
        return node.value
    if isinstance(node, ast.Name):
        return known.get(node.id)
    return None


def parse_variable_assignments(code):
    """Collect simple string-valued variable assignments from Python source.

    Only assignments to a single plain name (``x = ...``) are considered.
    A value is recorded when it is

    * a string literal, or
    * a binary ``+`` or ``%`` expression whose two operands are each a
      string literal or a name that has already been recorded.

    ``+`` is resolved by concatenation and ``%`` by real %-formatting.
    Anything else (numbers, calls, f-strings, tuple targets, ...) is ignored,
    and a previously recorded value is kept if a later assignment to the
    same name cannot be resolved.

    Args:
        code: Python source text.

    Returns:
        dict mapping variable name to its resolved string value.

    Raises:
        SyntaxError: if ``code`` is not valid Python (from ``ast.parse``).
    """
    assignments = {}
    # A name operand can only be resolved if ast.walk has already yielded the
    # assignment that defines it.
    for node in ast.walk(ast.parse(code)):
        is_simple_assignment = (
            isinstance(node, ast.Assign)
            and len(node.targets) == 1
            and isinstance(node.targets[0], ast.Name)
        )
        if not is_simple_assignment:
            continue

        var_name = node.targets[0].id
        value = node.value

        # ast.Constant replaces the deprecated ast.Str, which no longer exists
        # on Python 3.12+.
        if isinstance(value, ast.Constant):
            if isinstance(value.value, str):
                assignments[var_name] = value.value
            continue

        if not (isinstance(value, ast.BinOp)
                and isinstance(value.op, (ast.Add, ast.Mod))):
            continue

        left = _resolve_string_operand(value.left, assignments)
        right = _resolve_string_operand(value.right, assignments)
        if left is None or right is None:
            continue

        if isinstance(value.op, ast.Add):
            assignments[var_name] = left + right
        else:
            try:
                assignments[var_name] = left % right
            except (TypeError, ValueError):
                # The format string does not accept a single string argument;
                # the real statement would fail too, so record nothing.
                continue
    return assignments

def clean_code(code):
    """Drop Jupyter magic and shell-escape lines from a block of code.

    A line is discarded when its first non-whitespace character is ``%``
    (magic command) or ``!`` (shell command). The surviving lines are joined
    back together with single newline characters.
    """
    skipped_prefixes = ('%', '!')
    kept = [
        text
        for text in code.splitlines()
        if not text.strip().startswith(skipped_prefixes)
    ]
    return '\n'.join(kept)

def replace_variables_in_path_and_trim(path, variables):
    """Substitute known variables in a path expression and return its base name.

    Args:
        path: Text of the first call argument, e.g. ``folder + 'out.csv``
            (outer quotes may already have been stripped by the caller).
        variables: dict mapping variable name to its string value.

    Returns:
        The base file name of the resolved path, with quotes, ``+`` signs
        and all whitespace removed.
    """
    original_path = path
    if variables:
        # Replace whole identifiers only, and in a single pass: a variable
        # named ``d`` must not touch the ``d`` in ``data.csv``, and text
        # inserted for one variable must not be rewritten by another.
        name_pattern = re.compile(
            r"\b(?:" + "|".join(re.escape(name) for name in variables) + r")\b"
        )
        path = name_pattern.sub(lambda m: variables[m.group(0)], path)
    if path == original_path and '+' in path:
        # Nothing could be substituted: drop an unresolved ``name + '`` prefix
        # so only the literal part of the concatenation is left.
        path = re.sub(r"[\w\s]*\+\s*['\"]", "", path)
    # Strip leftover quotes of either kind, '+' operators and whitespace.
    path = re.sub(r"[\'\"+\s]+", "", path)
    return os.path.basename(path)

def extract_operations_from_code(code, file_operations):
    """Find pandas-style read/write calls in code and record the files involved.

    The text between the parentheses of every ``read_csv(...)``,
    ``read_json(...)``, ``to_csv(...)`` and ``to_json(...)`` call is captured
    with a regular expression (up to the first closing parenthesis). The
    first comma-separated argument is taken as the path expression, variables
    found by ``parse_variable_assignments`` are substituted, and the
    resulting base name is added to ``file_operations``.

    Args:
        code: Python source text.
        file_operations: dict with ``'inputs'`` and ``'outputs'`` sets; it is
            updated in place.

    Returns:
        The same ``file_operations`` dict that was passed in.

    Raises:
        SyntaxError: if ``code`` cannot be parsed (raised before any entry
            is added).
    """
    variables = parse_variable_assignments(code)
    patterns = {
        "read_csv": r"read_csv\((.*?)\)",
        "read_json": r"read_json\((.*?)\)",
        "to_csv": r"to_csv\((.*?)\)",
        "to_json": r"to_json\((.*?)\)"
    }
    for operation, pattern in patterns.items():
        # read_* calls consume a file, to_* calls produce one.
        bucket = 'inputs' if 'read_' in operation else 'outputs'
        for match in re.findall(pattern, code, re.DOTALL):
            file_name = match.split(',')[0].strip().strip('"').strip("'")
            file_name = replace_variables_in_path_and_trim(file_name, variables)
            if not file_name:
                # e.g. ``df.to_csv()`` has no path argument; recording an
                # empty name would create a nameless file entry.
                continue
            file_operations[bucket].add(file_name)
    return file_operations

def extract_file_operations_from_ipynb(nb_path):
    """Collect the files read and written by the code cells of a notebook.

    The notebook at ``nb_path`` is loaded as nbformat version 4. Each code
    cell is stripped of magic/shell lines and scanned for file operations;
    cells that still fail to parse as Python are skipped.

    Returns:
        dict with ``'inputs'`` and ``'outputs'`` sets of file names.
    """
    with open(nb_path, 'r', encoding='utf-8') as handle:
        notebook = nbformat.read(handle, as_version=4)

    collected = {'inputs': set(), 'outputs': set()}
    code_cells = (c for c in notebook['cells'] if c['cell_type'] == 'code')
    for cell in code_cells:
        source = clean_code(cell['source'])
        try:
            collected = extract_operations_from_code(source, collected)
        except SyntaxError:
            # Non-standard code can survive the cleaning step; ignore the cell.
            continue
    return collected

def extract_file_operations_from_py(py_path):
    """Collect the files read and written by a Python script.

    Args:
        py_path: Path of the ``.py`` file, read as UTF-8.

    Returns:
        dict with ``'inputs'`` and ``'outputs'`` sets of file names. Both
        sets are empty when the script cannot be parsed.
    """
    with open(py_path, 'r', encoding='utf-8') as py_file:
        code = py_file.read()
    file_operations = {'inputs': set(), 'outputs': set()}
    try:
        file_operations = extract_operations_from_code(code, file_operations)
    except SyntaxError:
        # Mirror the notebook path, which tolerates unparsable code: one
        # broken script should not abort the scan of a whole directory tree.
        print(f"Skipping unparsable Python file: {py_path}")
    return file_operations

def create_flowchart(file_ops):
    """Build a left-to-right Graphviz digraph of notebooks/scripts and their files.

    Args:
        file_ops: dict mapping a source file name to a dict with
            ``'inputs'`` and ``'outputs'`` collections of data file names.

    Returns:
        A ``graphviz.Digraph`` in which every source file is a box, every
        data file is a cylinder, and edges are labelled ``input`` (data file
        to source) or ``output`` (source to data file).
    """
    dot = graphviz.Digraph(comment='Data Analysis Workflow', graph_attr={'rankdir': 'LR'})
    for nb_name, ops in file_ops.items():
        # Strip only the extension for the label; splitting on the first '.'
        # would truncate names such as '1.0_clean.ipynb' to '1'.
        dot.node(nb_name, os.path.splitext(nb_name)[0], shape='box')
        # Sort so the generated graph source is the same on every run.
        for input_file in sorted(ops['inputs']):
            dot.node(input_file, input_file, shape='cylinder')
            dot.edge(input_file, nb_name, label='input')
        for output_file in sorted(ops['outputs']):
            dot.node(output_file, output_file, shape='cylinder')
            dot.edge(nb_name, output_file, label='output')
    return dot

def process_files_recursively(folder_path):
    """Scan a directory tree and extract file operations from notebooks and scripts.

    Args:
        folder_path: Root directory to walk.

    Returns:
        dict mapping each ``.ipynb`` / ``.py`` file's base name to its
        ``{'inputs': set, 'outputs': set}`` result. Because the key is the
        base name only, files sharing a name in different sub-directories
        overwrite one another (last one visited wins).
    """
    file_ops_dict = {}
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            if file.endswith('.ipynb'):
                try:
                    file_ops_dict[file] = extract_file_operations_from_ipynb(file_path)
                except ValueError:
                    # json.JSONDecodeError is a ValueError subclass, and
                    # nbformat is understood to report unparsable notebook
                    # JSON with its own ValueError subclass (confirm for the
                    # installed nbformat version). Catching ValueError covers
                    # both without relying on a `json` import.
                    print(f"Error decoding JSON for file: {file_path}")
            elif file.endswith('.py'):
                file_ops_dict[file] = extract_file_operations_from_py(file_path)
    return file_ops_dict

# Main execution logic: scan every .ipynb / .py file under folder_path and
# collect the data files each one reads and writes.
file_ops_dict = process_files_recursively(folder_path)

# Build the workflow graph and render it as a PDF at save_dir + '.pdf'.
# cleanup=True is passed to graphviz's render(); see its documentation for
# which intermediate files that removes.
flowchart = create_flowchart(file_ops_dict)
flowchart.render(save_dir, format='pdf', cleanup=True)

'/Users/siyuanzhou/Nutstore Files/developer/2017intercultural/sub-project/14.0NLP/output/resultsflow6.pdf'