In [None]:
# %%writefile ./clean_ipynb.py
from json import dump, load
from multiprocessing import cpu_count
from multiprocessing.dummy import Pool
from pathlib import Path
from subprocess import run

from autoflake import fix_code
from black import (
    DEFAULT_LINE_LENGTH,
    PY36_VERSIONS,
    FileMode,
    NothingChanged,
    format_file_contents,
)
from isort import SortImports

pool = Pool(cpu_count())

def remove_duplicate_cells(cells):
    cell_set_strings = []
    cell_set = []
    for e in cells:
        if e["source"] in cell_set_strings:
            continue
        else:
            cell_set_strings.append(e["source"])
            cell_set.append(e)
    return cell_set


def remove_empty_cells(cells):
    cell_set = []
    for e in cells:
        if len(e["source"]) <= 2:
            continue
        else:
            cell_set.append(e)
    return cell_set


def remove_magics(source):
    # check for '%' in first token of each line
    non_magic_source = []
    # magics, as well as source queries
    invalid_source = ["%", "?"]
    for e in source.split("\n"):
        if len(e) == 0:
            continue
        if e[0] in invalid_source:
            continue
        else:
            non_magic_source.append(e)
    return "\n".join(non_magic_source)

def clean_python_code(python_source, isort=True, black=True, autoflake=True):
    # run source code string through autoflake, isort, and black
    formatted_source = python_source

    if autoflake:
        formatted_source = fix_code(
            formatted_source,
            expand_star_imports=True,
            remove_all_unused_imports=True,
            remove_duplicate_keys=True,
            remove_unused_variables=True,
        )

    if isort:
        formatted_source = SortImports(file_contents=formatted_source).output

    if black:
        mode = FileMode(
            target_versions=PY36_VERSIONS,
            line_length=DEFAULT_LINE_LENGTH,
            is_pyi=False,
            string_normalization=True,
        )
        try:
            formatted_source = format_file_contents(
                formatted_source, fast=True, mode=mode
            )
        except NothingChanged:
            pass
    return formatted_source


def clear_ipynb_output(ipynb_file_path):
    # clear cell outputs, reset cell execution count of each cell in a jupyer notebook
    run(
        (
            "jupyter",
            "nbconvert",
            "--ClearOutputPreprocessor.enabled=True",
            "--inplace",
            ipynb_file_path,
        ),
        check=True,
    )


def clean_ipynb_cell(cell_dict):
    # clean a single cell within a jupyter notebook
    if cell_dict["cell_type"] == "code":
        try:
            clean_lines = clean_python_code("".join(cell_dict["source"])).split("\n")

            if len(clean_lines) == 1 and clean_lines[0] == "":
                clean_lines = []
            else:
                clean_lines[:-1] = [
                    clean_line + "\n" for clean_line in clean_lines[:-1]
                ]
            cell_dict["source"] = clean_lines
            return cell_dict

        except:
            # return original cell dict otherwise
            return cell_dict
    else:
        return cell_dict


def clean_ipynb(
    ipynb_file_path, clear_output=True, autoflake=True, isort=True, black=True
):
    # load, clean and write .ipynb source in-place, back to original file
    if clear_output:
        clear_ipynb_output(ipynb_file_path)

    with open(ipynb_file_path) as ipynb_file:
        ipynb_dict = load(ipynb_file)

    # mulithread the map operation
    processed_cells = pool.map(clean_ipynb_cell, ipynb_dict["cells"])
    ipynb_dict["cells"] = processed_cells

    with open(ipynb_file_path, "w") as ipynb_file:
        dump(ipynb_dict, ipynb_file, indent=1)
        ipynb_file.write("\n")


def create_file(file_path, contents):
    file_path.touch()
    file_path.open("w", encoding="utf-8").write(contents)


def clean_py(py_file_path, autoflake=True, isort=True, black=True):
    # load, clean and write .py source, write cleaned file back to disk
    with open(py_file_path, "r") as file:
        source = file.read()

    clean_lines = clean_python_code("".join(source))
    create_file(Path(py_file_path), clean_lines)


In [None]:
# %%writefile ../cli.py
import glob
from pathlib import Path

import plac
from wasabi import Printer

from .clean_ipynb import clean_ipynb, clean_py

msg = Printer()


@plac.annotations(
    path=("File or dir to clean", "positional", None, str),
    py=("Apply to .py source", "option", None, bool),
    ipynb=("Apply to .ipynb source", "option", None, bool),
    autoflake=("Apply autoflake to source", "option", None, bool),
    isort=("Apply isort to source", "option", None, bool),
    black=("Apply black to source", "option", None, bool),
)
def main(path, py=True, ipynb=True, autoflake=True, isort=True, black=True):
    path = Path(path)
    if not path.exists():
        raise ValueError("Provide a valid path to a file or directory")

    if path.is_dir():
        # recursively apply to all .py source within dir
        msg.info(f"Recursively cleaning directory: {path}")
        if py:
            for e in glob.iglob(path.as_posix() + "/**/*.py", recursive=True):
                try:
                    msg.info(f"Cleaning file: {e}")
                    clean_py(e, autoflake, isort, black)
                except:
                    msg.fail(f"Unable to clean file: {e}")
        if ipynb:
            # recursively apply to all .ipynb source within dir
            for e in glob.iglob(path.as_posix() + "/**/*.ipynb", recursive=True):
                try:
                    msg.info(f"Cleaning file: {e}")
                    clean_ipynb(e, autoflake, isort, black)
                except:
                    msg.fail(f"Unable to clean file: {e}")

    if path.is_file():
        msg.info(f"Cleaning file: {path}")

        if path.suffix not in [".py", ".ipynb"]:
            # valid extensions
            raise ValueError("Ensure valid .py or .ipynb path is provided")

            if py and path.suffix == ".py":
                clean_py(path, autoflake, isort, black)

            if ipynb and path.suffix == ".ipynb":
                clean_ipynb(path, autoflake, isort, black)


def main_wrapper():
    plac.call(main)


In [None]:
import shutil


def delete_copy_dir(source, destination):
    # copy dir, delete if destination already exists
    if Path(destination).exists():
        shutil.rmtree(destination)
    shutil.copytree(source, destination)


delete_copy_dir("./dev_source/", "./dev_source_test")
