In [None]:
# Enable IPython's autoreload extension in mode 2, so imported modules are
# reloaded before each cell executes and edits to source files are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
from pathlib import Path
from hashlib import sha256

def hash_file(filepath: Path, chunk_size: int = 4096) -> str:
    """Hash a file's contents using sha256 and return the hash hex digest.

    The file is read in fixed-size chunks so that arbitrarily large files can
    be hashed without loading them fully into memory.

    Args:
        filepath: Location of the file to hash (anything ``open`` accepts).
        chunk_size: Number of bytes to read per iteration. Must be positive.

    Returns:
        The hexadecimal sha256 digest of the file's bytes.

    Raises:
        ValueError: If ``chunk_size`` is not a positive integer.
        OSError: If the file cannot be opened or read.
    """
    # A zero-sized read returns b"" immediately, which would silently yield the
    # digest of an empty file; reject it instead of returning a wrong hash.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}.")

    # Named `hasher` rather than `hash` to avoid shadowing the builtin.
    hasher = sha256()
    with open(filepath, "rb") as f:
        while chunk := f.read(chunk_size):
            hasher.update(chunk)
    return hasher.hexdigest()


In [None]:
# Initialize the structure of the file tree
# User provides the root directory (or an s3-compatible bucket)
# Under that root directory, we create:
# - a data/ directory to store files, using the pattern: data/<hash>/<filename>
# - a datasets/ directory to store datasets, which are versioned collections of files, using the pattern: datasets/<dataset_name>/<version_hash>/commit.json, where `commit.json` is a JSON file recording the hashes of the files in that version of the dataset, the parent commit hash, and the commit message.


In [None]:
from pyprojroot import here
def initialize_file_tree(root_dir: Path):
    """Create the on-disk skeleton of the data store under ``root_dir``.

    Ensures that ``root_dir`` itself and its ``data`` and ``datasets``
    subdirectories exist. Directories that already exist are left untouched,
    so the function can be called repeatedly.
    """
    root_dir.mkdir(parents=True, exist_ok=True)
    for subdirectory in ("data", "datasets"):
        (root_dir / subdirectory).mkdir(parents=True, exist_ok=True)


# Create the example store at <here()>/data/dummy; the demo cells below use
# this same directory as their root_dir.
initialize_file_tree(here() / "data" / "dummy")

In [None]:
from dataclasses import dataclass, field
from typing import List
import json5 as json
import shutil


@dataclass
class GitData:
    """Entry point to a git-like data store rooted at ``root_dir``.

    Datasets live in ``<root_dir>/datasets/<dataset_name>``; this class looks
    them up or creates them and hands back ``Dataset`` objects.
    """

    root_dir: Path

    def get_dataset(self, dataset_name: str) -> "Dataset":
        """Return the existing dataset called ``dataset_name``.

        Raises:
            ValueError: If no directory exists for that dataset name.
        """
        if (self.root_dir / "datasets" / dataset_name).exists():
            return Dataset(self.root_dir, dataset_name)
        raise ValueError(f"Dataset {dataset_name} does not exist.")

    def create_dataset(self, dataset_name: str) -> "Dataset":
        """Return the dataset called ``dataset_name``, creating its directory if needed."""
        target_dir = self.root_dir / "datasets" / dataset_name
        # Only attempt creation when nothing is present at the target path.
        if not target_dir.exists():
            target_dir.mkdir(parents=True, exist_ok=True)
        return Dataset(root_dir=self.root_dir, dataset_name=dataset_name)


@dataclass
class DatasetCommit:
    """The definition of a commit is the state of the dataset as checked in by a user.

    The state of the dataset is defined by the files that are present in the dataset.
    This is represented by hashing all of the files in the dataset.
    The source of truth for the DatasetCommit
    is stored in the commit.json file of each commit hash.

    A DatasetCommit is always associated with a Dataset.
    """

    root_dir: Path
    dataset_name: str
    version_hash: str = field(default="")
    commit_message: str = field(default="")
    def __post_init__(self):
        self.data_dir = self.root_dir / "data"
        self.dataset_dir = self.root_dir / "datasets" / self.dataset_name

        # Special case behavior is needed if this is the first commit,
        # in which case


    def _read_json(self) -> dict:
        with open(self.dataset_dir / self.version_hash / "commit.json", "r+") as f:
            return json.loads(f.read())

    def _file_hashes(self) -> List[str]:
        try:
            return self._read_json()["file_hashes"]
        except FileNotFoundError:
            return []


    def _file_dict(self) -> dict:
        # Return a dictionary of the form {filename: file_hash_dir}
        file_dict = {}
        for file_hash in self._file_hashes():
            filepath = sorted((self.data_dir / file_hash).glob("*"))[0]
            file_dict[filepath.name] = filepath
        return file_dict

    def _commit_message(self) -> str:
        try:
            return self._read_json()["commit_message"]
        except FileNotFoundError:
            return ""

    def parent_hash(self) -> str:
        try:
            return self._read_json()["parent_hash"]
        except FileNotFoundError:
            return ""

    def create(
        self,
        commit_message: str,
        add_files: str | Path | List[str | Path] = None,
        remove_files: str | Path | List[str | Path] = None,
    ) -> str:
        # First off, calculate the new state of the dataset,
        # i.e. which files are going to be present.

        hash_hexes = []
        # For each file, create the hash directory and copy the file to it
        if add_files is not None:
            if isinstance(add_files, Path):
                add_files = [add_files]
            for filepath in add_files:
                hash_hex = hash_file(Path(filepath))
                hash_hexes.append(hash_hex)
                file_hash_dir = self.data_dir / hash_hex
                file_hash_dir.mkdir(parents=True, exist_ok=True)
                # Copy the file, not move, to the hash directory.
                shutil.copy(filepath, file_hash_dir / filepath.name)

        hash_hexes.extend(self._file_hashes())

        # Finally, compute the hash of the removed files
        # and remove them from the hash_hexes
        if remove_files is not None:
            if isinstance(remove_files, (str, Path)):
                remove_files = [remove_files]

            for filename in remove_files:
                hash_hex = self._file_dict()[str(filename)].parent.name
                hash_hexes.remove(hash_hex)

        # Sort the hash_hexes so that the order of the files in the dataset
        # is deterministic.
        hash_hexes = sorted(hash_hexes)

        # Create a new version of the dataset by hashing
        # the concatenation of the hash_hexes and the commit message.
        hash_concat = "\n".join(hash_hexes) + "\n" + commit_message
        hasher = sha256()
        hasher.update(hash_concat.encode("utf-8"))
        version_hash = hasher.hexdigest()

        # Write the new version of the dataset to the dataset directory.
        dataset_version_dir = self.dataset_dir / version_hash
        dataset_version_dir.mkdir(parents=True, exist_ok=True)
        with open(dataset_version_dir / "commit.json", "w+") as f:
            info = {
                "file_hashes": hash_hexes,
                "parent_hash": self.parent_hash(),
                "commit_message": commit_message,
            }
            f.write(json.dumps(info))
        return DatasetCommit(
            root_dir=self.root_dir,
            dataset_name=self.dataset_name,
            version_hash=version_hash,
            commit_message=commit_message,
        )


# A DatasetCommit with an empty version hash is the empty commit (no commit.json, no files).
# This is a special case used as the starting point for the first real commit.
commit1 = DatasetCommit(root_dir=here() / "data" / "dummy", dataset_name="test_dataset")
print(commit1._file_dict())
print(commit1.version_hash)
# Derive a commit that adds MANIFEST.in to the empty commit.
commit2 = commit1.create(commit_message="test from empty commit", add_files=[here() / "MANIFEST.in"])
print(commit2._file_dict())
print(commit2.version_hash)
# Derive a further commit that removes MANIFEST.in again (matched by file name).
commit3 = commit2.create(commit_message="test adding from existing commit", remove_files=["MANIFEST.in"])
print(commit3._file_dict())
print(commit3.version_hash)

In [None]:
def test_commit_creation():
    """Read an existing commit by hash and create descendant commits from it.

    After each step the commit's file hashes, commit message and file
    dictionary are printed; nothing is asserted.
    """

    def report(commit):
        # Print the three views of a commit in a fixed order.
        for accessor in (commit._file_hashes, commit._commit_message, commit._file_dict):
            print(accessor())

    head = DatasetCommit(
        root_dir=here() / "data" / "dummy",
        dataset_name="test_dataset",
        version_hash="3273234a22aca83f0846473a3b6e5fe5d5ceef1d8733f23947da44c43f4d49d3",
    )
    print(head.version_hash)
    report(head)

    # Add a file on top of the existing commit.
    head = head.create(commit_message="test commit", add_files=[here() / "MANIFEST.in"])
    report(head)

    # Remove that file again, passing the name as a plain string.
    head = head.create(commit_message="test_remove", remove_files="MANIFEST.in")
    report(head)

    # Add a different file on top of the removal commit.
    head = head.create(
        commit_message="test add pre-commit config",
        add_files=[here() / ".pre-commit-config.yaml"],
    )
    report(head)


# Run the smoke test right away so its printed output appears under this cell.
test_commit_creation()

In [None]:
@dataclass
class Dataset:
    """A class for storing data in a git-like structure."""
    root_dir: Path
    dataset_name: str

    def __post_init__(self) -> None:
        self.data_dir = self.root_dir / "data"
        self.dataset_dir = self.root_dir / "datasets" / self.dataset_name

        self.commit = DatasetCommit(self.root_dir, self.dataset_name, self.current_version_hash())

        metadata_path = self.dataset_dir / "metadata.json"
        if not metadata_path.exists():
            # Create an empty metadata file.
            metadata = {"current_version_hash": None}
            with open(metadata_path, "w+") as f:
                f.write(json.dumps(metadata))

    def current_version_hash(self):
        """Return the hash of the current version of the dataset."""
        with open(self.dataset_dir / "metadata.json", "r") as f:
            metadata = json.loads(f.read())
        return metadata["current_version_hash"]

    def commit(self, commit_message: str, files: Path | List[Path]) -> str:
        """Commit data files to the dataset."""

        # Update metadata.json with current hash.
        with open(self.dataset_dir / "metadata.json", "r") as f:
            metadata = json.loads(f.read())
        metadata["current_version_hash"] = version_hash
        with open(self.dataset_dir / "metadata.json", "w+") as f:
            f.write(json.dumps(metadata))

    def metadata(self):
        """Return the metadata for the dataset."""
        with open(self.dataset_dir / "metadata.json", "r") as f:
            return json.loads(f.read())

    def checkout(self, version_hash: str) -> None:
        """Checkout a version of the dataset."""


In [None]:
# Open the example store, make sure the "test_dataset" dataset exists,
# and commit a single file (passed as a bare Path) to it.
datasets = GitData(here() / "data" / "dummy")

dataset = datasets.create_dataset("test_dataset")
dataset.commit("First commit", here() / "README.md")

In [None]:
# Second commit: passes a list of files, which includes README.md again.
dataset.commit("Committing more files", [here() / "README.md", here() / "mkdocs.yaml"])


In [None]:
# Dataset.checkout takes the hash of the version to check out as a required
# argument; pass the dataset's current version hash.
dataset.checkout(dataset.current_version_hash())

In [None]:
# NOTE(review): the Dataset class in this notebook defines no `files` attribute,
# so this expression raises AttributeError as written. `current_version_hash`
# is also a method and is not called here. Confirm what this cell should display.
dataset.files, dataset.current_version_hash