In [61]:
import sys, os, time, subprocess
from memory_profiler import memory_usage

# Get duration
def format_duration(duration):
    """
    Format the elapsed time since `t0` in hours, minutes, and seconds.
    
    Adapted from @john-fouhy:
    https://stackoverflow.com/questions/538666/python-format-timedelta-to-string
    """
    hours, remainder = divmod(int(duration), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"

# Format header for printing
def format_header(text, line_character="=", n=None):
    if n is None:
        n = len(text)
    line = n*line_character
    return "{}\n{}\n{}".format(line, text, line)

def format_memory(B, unit="infer", return_units=True):
    """
    Return the given bytes as a human-readable string in KB, MB, GB, or TB.
    1 KB = 1024 Bytes

    Adapted from the following source (@whereisalext):
    https://stackoverflow.com/questions/12523586/python-format-size-application-converting-b-to-kb-mb-gb-tb/52379087
    """
    KB = 1024
    MB = KB ** 2  # 1,048,576
    GB = KB ** 3  # 1,073,741,824
    TB = KB ** 4  # 1,099,511,627,776

    def format_with_unit(size, unit_name):
        return f"{size:.2f} {unit_name}" if return_units else size

    unit = unit.lower()
    if unit != "infer":
        unit = unit.lower()
        if unit == "b":
            return format_with_unit(B, "B")
        elif unit == "kb":
            return format_with_unit(B / KB, "KB")
        elif unit == "mb":
            return format_with_unit(B / MB, "MB")
        elif unit == "gb":
            return format_with_unit(B / GB, "GB")
        elif unit == "tb":
            return format_with_unit(B / TB, "TB")
        else:
            raise ValueError(f"Unknown unit: {unit}")
    else:
        if B < KB:
            return format_with_unit(B, "B")
        elif KB <= B < MB:
            return format_with_unit(B / KB, "KB")
        elif MB <= B < GB:
            return format_with_unit(B / MB, "MB")
        elif GB <= B < TB:
            return format_with_unit(B / GB, "GB")
        else:
            return format_with_unit(B / TB, "TB")
    
class RunShellCommand(object):
    """
    Parameters: 
        cmd:str command to be executed
        name:str name associated with command [Default: None]
        shell_executable:str path to executable [Default: /bin/bash]
        
    Usage: 
        cmd = RunShellCommand("time (sleep 5 & echo 'Hello World')", name="Demo")
        cmd.run()
        cmd
        # ================================================
        # RunShellCommand(name:Demo)
        # ================================================
        # (/bin/bash)$ time (sleep 5 & echo 'Hello World')
        # ________________________________________________
        # Properties:
        #     - stdout: 61.00 B
        #     - stderr: 91.00 B
        #     - returncode: 0
        #     - peak memory: 37.22 B
        #     - duration: 00:00:05

    """

    def __init__(
        self, 
        command:str, 
        name:str=None, 
        shell_executable:str="/bin/bash",
        ):

        if isinstance(command, str):
            command = [command]
        command = " ".join(list(filter(bool, map(str, command))))
        self.command = command
        self.name = name
        self.shell_executable = shell_executable
        self.executed = False
        
    def run(self, encoding="utf-8", **popen_kws):
        def execute_command(encoding):
            # Execute the process
            self.process_ = subprocess.Popen(
                self.command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                executable=self.shell_executable,
                **popen_kws,
            )
            # Wait until process is complete and return stdout/stderr
            self.stdout_, self.stderr_ = self.process_.communicate()
            self.returncode_ = self.process_.returncode
            
            if encoding:
                self.stdout_ = self.stdout_.decode(encoding)
                self.stderr_ = self.stderr_.decode(encoding)

        # Measure memory usage
        t0 = time.time()
        self.memory_usage_ = memory_usage((execute_command, (encoding,)))
        self.duration_ = time.time() - t0

        self.peak_memory_ = max(self.memory_usage_)
        self.executed = True

        return self.stdout_, self.stderr_

    def __repr__(self):
        name_text = "{}(name:{})".format(self.__class__.__name__, self.name)
        command_text = "({})$ {}".format(self.shell_executable, self.command)
        n = max(len(name_text), len(command_text))
        pad = 4
        fields = [
            format_header(name_text,line_character="=", n=n),
            *format_header(command_text, line_character="_", n=n).split("\n")[1:],
            ]
        if self.executed:
            fields += [
            "Properties:",
            pad*" " + "- stdout: {}".format(format_memory(sys.getsizeof(self.stdout_))),
            pad*" " + "- stderr: {}".format(format_memory(sys.getsizeof(self.stderr_))),
            pad*" " + "- returncode: {}".format(self.returncode_),
            pad*" " + "- peak memory: {}".format(format_memory(self.peak_memory_)),
            pad*" " + "- duration: {}".format(format_duration(self.duration_)),
            ]
        return "\n".join(fields)
    

cmd = RunShellCommand("echo $PATH", name="Demo")
cmd.run()
# ================================================
# RunShellCommand(name:Demo)
# ================================================
# (/bin/bash)$ time (sleep 5 & echo 'Hello World')
# ________________________________________________
# Properties:
#     - stdout: 61.00 B
#     - stderr: 91.00 B
#     - returncode: 0
#     - peak memory: 37.22 B
#     - duration: 00:00:05


'/Users/jolespin/miniconda3/envs/soothsayer_env/bin:/Users/jolespin/miniconda3/condabin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/Applications/iTerm.app/Contents/Resources/utilities\n'

In [78]:
with open("/Users/jolespin/Test/test.tsv", "r") as f:
    # first_line = f.readline().strip()
    # print(first_line)
    for i, line in enumerate(f):
        print(line)

        if i == 2:
            break


id_node	id_cluster	intra-cluster_connectivity	representative

OceanDNA-a1339	PSLC-4293	99.75	True

OceanDNA-a1340	PSLC-4293	99.75	False



In [154]:
#!/usr/bin/env python
import sys, os, time, gzip, bz2, subprocess, pickle
from memory_profiler import memory_usage

# Get file object
def open_file_reader(filepath:str, compression="auto", binary=False):
    """

    Args:
        filepath (str): path/to/file
        compression (str, optional): {None, gzip, bz2}. Defaults to "auto".

    Returns:
        file object
    """
    if compression == "auto":
        compression = None
        ext = filepath.split(".")[-1].lower()
        if ext == "gz":
            compression = "gzip"
        elif ext == "bz2":
            compression = "bz2"

    if binary:
        if not compression:
            return open(filepath, "rb")
        else:
            # GZIP compression
            if compression == "gzip":
                return gzip.open(filepath, "rb")
            # BZ2 compression
            elif compression == "bz2":
                return bz2.open(filepath, "rb")
    else:
        if not compression:
            return open(filepath, "r")
        else:
            # GZIP compression
            if compression == "gzip":
                return gzip.open(filepath, "rt")
                # f.read1 = f.read # Hack from https://github.com/kislyuk/eight/issues/1
            # BZ2 compression
            elif compression == "bz2":
                return bz2.open(filepath, "rt")
            
# Get file object
def open_file_writer(filepath:str, compression="auto", binary=False):
    """

    Args:
        filepath (str): path/to/file
        compression (str, optional): {None, gzip, bz2}. Defaults to "auto".

    Returns:
        file object
    """
    if compression == "auto":
        compression = None
        ext = filepath.split(".")[-1].lower()
        if ext == "gz":
            compression = "gzip"
        elif ext == "bz2":
            compression = "bz2"

    if binary:
        if not compression:
            return open(filepath, "wb")
        else:
            # GZIP compression
            if compression == "gzip":
                return gzip.open(filepath, "wb")
            # BZ2 compression
            elif compression == "bz2":
                return bz2.open(filepath, "wb")
    else:
        if not compression:
            return open(filepath, "w")
        else:
            # GZIP compression
            if compression == "gzip":
                return gzip.open(filepath, "wt")
                # f.read1 = f.read # Hack from https://github.com/kislyuk/eight/issues/1
            # BZ2 compression
            elif compression == "bz2":
                return bz2.open(filepath, "wt")

# Read pickle
def read_pickle(filepath, compression="auto"):
    with open_file_reader(filepath, compression=compression, binary=True) as f:
        return pickle.load(f)
    
def write_pickle(obj, filepath, compression="auto"):
    with open_file_writer(filepath, compression=compression, binary=True) as f:
        return pickle.dump(obj, f)

# Get duration
def format_duration(duration):
    """
    Format the elapsed time since `t0` in hours, minutes, and seconds.
    
    Adapted from @john-fouhy:
    https://stackoverflow.com/questions/538666/python-format-timedelta-to-string
    """
    hours, remainder = divmod(int(duration), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02}"

# Format header for printing
def format_header(text, line_character="=", n=None):
    if n is None:
        n = len(text)
    line = n*line_character
    return "{}\n{}\n{}".format(line, text, line)

# Format memory
def format_memory(B, unit="auto", return_units=True):
    """
    Return the given bytes as a human-readable string in KB, MB, GB, or TB.
    1 KB = 1024 Bytes

    Adapted from the following source (@whereisalext):
    https://stackoverflow.com/questions/12523586/python-format-size-application-converting-b-to-kb-mb-gb-tb/52379087
    """
    KB = 1024
    MB = KB ** 2  # 1,048,576
    GB = KB ** 3  # 1,073,741,824
    TB = KB ** 4  # 1,099,511,627,776

    def format_with_unit(size, unit_name):
        return f"{size:.2f} {unit_name}" if return_units else size

    unit = unit.lower()
    if unit != "auto":
        unit = unit.lower()
        if unit == "b":
            return format_with_unit(B, "B")
        elif unit == "kb":
            return format_with_unit(B / KB, "KB")
        elif unit == "mb":
            return format_with_unit(B / MB, "MB")
        elif unit == "gb":
            return format_with_unit(B / GB, "GB")
        elif unit == "tb":
            return format_with_unit(B / TB, "TB")
        else:
            raise ValueError(f"Unknown unit: {unit}")
    else:
        if B < KB:
            return format_with_unit(B, "B")
        elif KB <= B < MB:
            return format_with_unit(B / KB, "KB")
        elif MB <= B < GB:
            return format_with_unit(B / MB, "MB")
        elif GB <= B < TB:
            return format_with_unit(B / GB, "GB")
        else:
            return format_with_unit(B / TB, "TB")
    
class RunShellCommand(object):
    """
    Args: 
        command:str command to be executed
        name:str name associated with command [Default: None]
        shell_executable:str path to executable [Default: /bin/bash]
        
    Usage: 
        cmd = RunShellCommand("time (sleep 5 & echo 'Hello World')", name="Demo")
        cmd.run()
        cmd
        # ================================================
        # RunShellCommand(name:Demo)
        # ================================================
        # (/bin/bash)$ time (sleep 5 & echo 'Hello World')
        # ________________________________________________
        # Properties:
        #     - stdout: 61.00 B
        #     - stderr: 91.00 B
        #     - returncode: 0
        #     - peak memory: 37.22 B
        #     - duration: 00:00:05

    """

    def __init__(
        self, 
        command:str, 
        name:str=None, 
        shell_executable:str="/bin/bash",
        ):

        if isinstance(command, str):
            command = [command]
        command = " ".join(list(filter(bool, map(str, command))))
        self.command = command
        self.name = name
        self.shell_executable = shell_executable
        self.executed = False
        
    def run(self, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", **popen_kws):
        def execute_command(encoding, stdout, stderr):
            # Execute the process
            self.process_ = subprocess.Popen(
                self.command,
                shell=True,
                stdout=stdout,
                stderr=stderr,
                executable=self.shell_executable,
                **popen_kws,
            )
            # Wait until process is complete and return stdout/stderr
            self.stdout_, self.stderr_ = self.process_.communicate()
            self.returncode_ = self.process_.returncode
            
            # Encode
            if encoding:
                if self.stdout_:
                    self.stdout_ = self.stdout_.decode(encoding)
                if self.stderr_:
                    self.stderr_ = self.stderr_.decode(encoding)

        # I/O
        self.redirect_stdout = None
        if isinstance(stdout, str):
            self.redirect_stdout = stdout
            stdout = open(stdout, "wb")

        self.redirect_stderr = None
        if isinstance(stderr, str):
            self.redirect_stderr = stderr
            stderr = open(stderr, "wb")

        # Measure memory usage
        t0 = time.time()
        self.memory_usage_ = memory_usage((execute_command, (encoding, stdout, stderr,)), max_iterations=1)
        self.duration_ = time.time() - t0

        # # Flush
        # if hasattr(stdout, "flush"):
        #     stdout.flush()
        # if hasattr(stderr, "flush"):
        #     stderr.flush()
            
        # Close
        if hasattr(stdout, "close"):
            stdout.close()
        if hasattr(stderr, "close"):
            stderr.close()

        self.peak_memory_ = max(self.memory_usage_)
        self.executed = True

        return self

    def __repr__(self):
        name_text = "{}(name:{})".format(self.__class__.__name__, self.name)
        command_text = "({})$ {}".format(self.shell_executable, self.command)
        n = max(len(name_text), len(command_text))
        pad = 4
        fields = [
            format_header(name_text,line_character="=", n=n),
            *format_header(command_text, line_character="_", n=n).split("\n")[1:],
            ]
        if self.executed:
            fields += [
            "Properties:",
            ]
            # stdout
            if self.redirect_stdout:
                fields += [
                pad*" " + "- stdout({}): {}".format(
                    self.redirect_stdout,
                    format_memory(os.stat(self.redirect_stdout).st_size),
                )
                ]
            else:
                fields += [
                pad*" " + "- stdout: {}".format(format_memory(sys.getsizeof(self.stdout_))),
                ]
            # stderr
            if self.redirect_stderr:
                fields += [
                pad*" " + "- stderr({}): {}".format(
                    self.redirect_stderr,
                    format_memory(os.stat(self.redirect_stderr).st_size),
                )
                ]
            else:
                fields += [
                pad*" " + "- stderr: {}".format(format_memory(sys.getsizeof(self.stderr_))),
                ]

            fields += [
            pad*" " + "- returncode: {}".format(self.returncode_),
            pad*" " + "- peak memory: {}".format(format_memory(self.peak_memory_)),
            pad*" " + "- duration: {}".format(format_duration(self.duration_)),
            ]
        return "\n".join(fields)


In [155]:
cmd = RunShellCommand("time (sleep 1 & echo 'Hello World')", name="Demo")
cmd.run(stdout="test_out.txt", stderr="test_err.txt")
# ================================================
# RunShellCommand(name:Demo)
# ================================================
# (/bin/bash)$ time (sleep 1 & echo 'Hello World')
# ________________________________________________
# Properties:
#     - stdout(test_out.txt): 36.00 B
#     - stderr(test_err.txt): 126.00 B
#     - returncode: 0
#     - peak memory: 27.65 B
#     - duration: 00:00:01

RunShellCommand(name:Demo)
(/bin/bash)$ time (sleep 1 & echo 'Hello World')
________________________________________________
Properties:
    - stdout(test_out.txt): 12.00 B
    - stderr(test_err.txt): 42.00 B
    - returncode: 0
    - peak memory: 36.15 B
    - duration: 00:00:00

In [150]:
%%bash
cat test_out.txt
# Hello World
# Hello World
# Hello World

Hello World
Hello World
Hello World


In [156]:
with open("test_out.txt", "rt") as f:
    print(f.read())

Hello World



In [158]:
import json
d = {"a":0}
with open("test.json", "wt") as f:
    json.dump(d, f)
with open("test.json", "rt") as f:
    d2 = json.load(f)
d2

{'a': 0}

In [174]:
import logging
import sys

def build_logger(logger_name=__name__, stream=sys.stdout):
    # Create a logger object
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)  # Set the logging level
    
    # Create a stream handler to output logs to stdout
    stream_handler = logging.StreamHandler(stream)
    stream_handler.setLevel(logging.DEBUG)  # Set the level for the handler
    
    # Create a formatter and set it to the handler
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    stream_handler.setFormatter(formatter)
    
    # Add the handler to the logger
    logger.addHandler(stream_handler)

    return logger
    
def reset_logger(logger):
    # Remove all existing handlers
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        handler.close()
    
    # Set a new handler (for example, to output to stdout)
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)
    
    # Optionally set a new level
    logger.setLevel(logging.DEBUG)
    
logger = build_logger("FUCK")


In [178]:
# reset_logger(logger)
logger.info("suh dude")
logger.error("fuh dude")

2024-07-26 13:53:28,222 - FUCK - INFO - suh dude
2024-07-26 13:53:28,225 - FUCK - ERROR - fuh dude


In [179]:
logger.info("FUCK")

2024-07-26 13:53:37,689 - FUCK - INFO - FUCK


In [181]:
any(["a"])

True

In [196]:
from memory_profiler import memory_usage
import functools

def profile_peak_memory(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Measure memory usage
        mem_usage = memory_usage((func, args, kwargs), max_usage=True, retval=True, max_iterations=1)
        peak_memory, result = mem_usage[0], mem_usage[1]
        print(f"Peak memory usage for {func.__name__}: {peak_memory:.2f} MiB")
        return result
    return wrapper
    
@profile_peak_memory
def func():
    with open("test.txt", "w") as f:
        print("FUCK", file=f)
    return "yea boi"

result = func()
print(result)

Peak memory usage for func: 19.54 MiB
yea boi


In [189]:
%%bash
cat test.txt

FUCK


In [221]:
import functools
from memory_profiler import memory_usage

def profile_peak_memory(func, stream=sys.stdout):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Measure memory usage
        mem_usage = memory_usage((func, args, kwargs), max_usage=True, retval=True, max_iterations=1)
        peak_memory, result = mem_usage[0], mem_usage[1]
        print(f"Peak memory usage for {func.__name__}: {format_memory(peak_memory)}", file=stream)
        return result
    return wrapper

# Example usage
@profile_peak_memory(stream=sys.stderr)
def example_function(n):
    
    return np.power(np.random.normal(size=(n,n)), 5)

result = example_function(10)


TypeError: profile_peak_memory() missing 1 required positional argument: 'func'

In [228]:
import hashlib
import os



get_md5hash_from_directory(".")

{'./utility_sandbox.ipynb': 'b6767ad254d8b1edd0967fc287f1e7db',
 './test.txt': '8921c550080d6f4341d101acfe6ee205',
 './.ipynb_checkpoints/test_err-checkpoint.txt': '70e50c46e00f00541417d1f5d82d4eee',
 './.ipynb_checkpoints/test_out-checkpoint.txt': 'e59ff97941044f85df5297e1c302d260',
 './.ipynb_checkpoints/utility_sandbox-checkpoint.ipynb': 'b6767ad254d8b1edd0967fc287f1e7db',
 './.ipynb_checkpoints/prototype-checkpoint.ipynb': 'b6fca670b679b6c769aeeea0e78f8122'}

In [209]:
example_function.peak_memory

832.9765625

In [218]:
import genopype as gp

In [229]:
gp.get_directory_tree(".", ascii=True)

'/\n|__ .ipynb_checkpoints/\n|   |__ prototype-checkpoint.ipynb\n|   |__ test_err-checkpoint.txt\n|   |__ test_out-checkpoint.txt\n|   |__ utility_sandbox-checkpoint.ipynb\n|__ test.txt\n|__ utility_sandbox.ipynb'

In [230]:
IsADirectoryError("FUCK")

IsADirectoryError('FUCK')

In [232]:
subprocess.CalledProcessError("FUCK")

TypeError: __init__() missing 1 required positional argument: 'cmd'

In [234]:
pd.MultiIndex.from_tuples()

TypeError: from_tuples() missing 1 required positional argument: 'tuples'