In [1]:
import errno
import logging
import os
import shutil
import stat
import uuid
from typing import IO, Any, Callable, Iterator, Optional, Union

In [2]:
# Source: toil/jobStores/abstractJobStore.py
def readFile(self, jobStoreFileID: str, localFilePath: str, symlink: bool = False) -> None:
    return self.read_file(jobStoreFileID, localFilePath, symlink)

In [3]:
# Source: cactus/shared/common.py
def readGlobalFileWithoutCache(fileStore, jobStoreID):
    """Reads a jobStoreID into a file and returns it, without touching
    the cache.
    Works around toil issue #1532.
    """
    f = fileStore.getLocalTempFile()
    fileStore.jobStore.readFile(jobStoreID, f)
    return f

In [4]:
# Source: cactus/preprocessor/cactus_preprocessor.py
def run(self, fileStore):
    chunkList = [readGlobalFileWithoutCache(fileStore, fileID) for fileID in self.chunkIDList]

    #Docker expects paths relative to the work dir
    chunkList = [os.path.basename(chunk) for chunk in chunkList]
    outSequencePath = fileStore.getLocalTempFile()
    cactus_call(outfile=outSequencePath, stdin_string=" ".join(chunkList),
                parameters=["fasta_merge"])
    return fileStore.writeGlobalFile(outSequencePath)

In [5]:
# Source: toil/job.py
# Source: toil/worker.py

In [6]:
# Source: toil/lib/io.py            
def atomic_tmp_file(final_path: str) -> str:
    """Return a tmp file name to use with atomic_install.  This will be in the
    same directory as final_path. The temporary file will have the same extension
    as finalPath.  It the final path is in /dev (/dev/null, /dev/stdout), it is
    returned unchanged and atomic_tmp_install will do nothing."""
    final_dir = os.path.dirname(os.path.normpath(final_path))  # can be empty
    if final_dir == '/dev':
        return final_path
    final_basename = os.path.basename(final_path)
    final_ext = os.path.splitext(final_path)[1]
    base_name = f"{final_basename}.{uuid.uuid4()}.tmp{final_ext}"
    return os.path.join(final_dir, base_name)


def atomic_install(tmp_path, final_path) -> None:
    """atomic install of tmp_path as final_path"""
    if os.path.dirname(os.path.normpath(final_path)) != '/dev':
        os.rename(tmp_path, final_path)


def AtomicFileCreate(final_path: str, keep: bool = False) -> Iterator[str]:
    """Context manager to create a temporary file.  Entering returns path to
    the temporary file in the same directory as finalPath.  If the code in
    context succeeds, the file renamed to its actually name.  If an error
    occurs, the file is not installed and is removed unless keep is specified.
    """
    tmp_path = atomic_tmp_file(final_path)
    try:
        yield tmp_path
        atomic_install(tmp_path, final_path)
    except Exception:
        if not keep:
            try:
                os.unlink(tmp_path)
            except Exception:
                pass
        raise

            
def atomic_copy(src_path: str, dest_path: str, executable: Optional[bool] = None) -> None:
    """Copy a file using posix atomic creations semantics."""
    if executable is None:
        executable = os.stat(src_path).st_mode & stat.S_IXUSR != 0
    with AtomicFileCreate(dest_path) as dest_path_tmp:
        shutil.copyfile(src_path, dest_path_tmp)
        if executable:
            os.chmod(dest_path_tmp, os.stat(dest_path_tmp).st_mode | stat.S_IXUSR)

            
            
# Source: toil/jobStores/fileJobStore.py
logger = logging.getLogger(__name__)
def _get_file_path_from_id(jobStoreDir, jobStoreFileID):
    """
    :param str jobStoreFileID: The ID of a file

    :rtype : string, string is the absolute path that that file should
             appear at on disk, under either self.jobsDir if it is to be
             cleaned up with a job, or self.filesDir otherwise.
    """

    # We just make the file IDs paths under the job store overall.
    absPath = os.path.join(jobStoreDir, jobStoreFileID)

    # Don't validate here, we are called by the validation logic

    return absPath


def _check_job_store_file_id(jobStoreDir, jobStoreFileID):
    """
    :raise NoSuchFileException: if the file with ID jobStoreFileID does
                                not exist or is not a file
    """
    if not file_exists(jobStoreDir, jobStoreFileID):
        raise NoSuchFileException(jobStoreFileID)

            
def file_exists(jobStoreDir, file_id):
    absPath = _get_file_path_from_id(jobStoreDir, file_id)

    if (not absPath.startswith(jobStoreDir)):
        # Don't even look for it, it is out of bounds.
        raise NoSuchFileException(file_id)

    try:
        st = os.stat(absPath)
    except os.error:
        return False
    if not stat.S_ISREG(st.st_mode):
        raise NoSuchFileException(file_id)
    return True


def read_file(jobStoreDir, file_id, local_path, symlink=False):
    _check_job_store_file_id(jobStoreDir, file_id)
    jobStoreFilePath = _get_file_path_from_id(jobStoreDir, file_id)
    localDirPath = os.path.dirname(local_path)
    executable = getattr(file_id, 'executable', False)

    if not symlink and os.path.islink(local_path):
        # We had a symlink and want to clobber it with a hardlink or copy.
        os.unlink(local_path)

    if os.path.exists(local_path) and os.path.samefile(jobStoreFilePath, local_path):
        # The files are already the same: same name, hardlinked, or
        # symlinked. There is nothing to do, and trying to shutil.copyfile
        # one over the other will fail.
        return

    if symlink:
        # If the reader will accept a symlink, so always give them one.
        # There's less that can go wrong.
        try:
            os.symlink(jobStoreFilePath, local_path)
            # It worked!
            return
        except OSError as e:
            # For the list of the possible errno codes, see: https://linux.die.net/man/2/symlink
            if e.errno == errno.EEXIST:
                # Overwrite existing file, emulating shutil.copyfile().
                os.unlink(local_path)
                # It would be very unlikely to fail again for same reason but possible
                # nonetheless in which case we should just give up.
                os.symlink(jobStoreFilePath, local_path)
                # Now we succeeded and don't need to copy
                return
            elif e.errno == errno.EPERM:
                # On some filesystems, the creation of symbolic links is not possible.
                # In this case, we try to make a hard link.
                pass
            else:
                logger.error(f"Unexpected OSError when reading file '{jobStoreFilePath}' from job store")
                raise

    # If we get here, symlinking isn't an option.
    # Make sure we are working with the real source path, in case it is a
    # symlinked import.
    jobStoreFilePath = os.path.realpath(jobStoreFilePath)

    if os.stat(jobStoreFilePath).st_dev == os.stat(localDirPath).st_dev:
        # It is possible that we can hard link the file.
        # Note that even if the device numbers match, we can end up trying
        # to create a "cross-device" link.
        try:
            os.link(jobStoreFilePath, local_path)
            # It worked!
            return
        except OSError as e:
            # For the list of the possible errno codes, see: https://linux.die.net/man/2/link
            if e.errno == errno.EEXIST:
                # Overwrite existing file, emulating shutil.copyfile().
                os.unlink(local_path)
                # It would be very unlikely to fail again for same reason but possible
                # nonetheless in which case we should just give up.
                try:
                    os.link(jobStoreFilePath, local_path)
                    # Now we succeeded and don't need to copy
                    return
                except:
                    pass
            elif e.errno == errno.EXDEV:
                # It's a cross-device link even though it didn't appear to be.
                # Just keep going and hit the file copy case.
                pass
            elif e.errno == errno.EPERM:
                # On some filesystems, hardlinking could be disallowed by permissions.
                # In this case, we also fall back to making a complete copy.
                pass
            elif e.errno == errno.ELOOP:
                # Too many symbolic links were encountered. Just keep going and hit the
                # file copy case.
                pass
            elif e.errno == errno.EMLINK:
                # The maximum number of links to file is reached. Just keep going and
                # hit the file copy case.
                pass
            else:
                logger.error(f"Unexpected OSError when reading file '{jobStoreFilePath}' from job store")
                raise

    print("Gunnaaaa make a copayyyyy")
    # atomic_copy(jobStoreFilePath, local_path, executable=executable)

In [7]:
js = "/scratch1/kdeweese/latissima/genome_stats/test_jobstore"
# tmp = "test_tmp"
file_id = "test1.out"
local_path = "test_tmp/test2.out"
read_file(js, file_id, local_path)

Gunnaaaa make a copayyyyy
