Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve download_folder functionality #6

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
args: ["--fix=lf"]

- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.12.0
hooks:
- id: isort
args:
Expand Down
84 changes: 65 additions & 19 deletions gh_folder_download/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from os import makedirs
from os.path import exists, join
from pathlib import Path
Expand All @@ -8,7 +9,7 @@
from github.ContentFile import ContentFile
from github.Repository import Repository
from rich import print
from typer import Option, Typer
from typer import Option, Typer, confirm
from wget import download as wget_download

app = Typer()
Expand All @@ -26,6 +27,18 @@ def download_command(
token: Optional[str] = Option(None, help="GitHub token"),
force: bool = Option(False, help="Remove existing output folder if it exists"),
) -> None:
"""
Downloads a repository from a given URL and saves it to the specified output folder.

Args:
url (str): The URL of the repository to download.
output (Path): The folder where the repository will be saved.
token (Optional[str]): The GitHub token to access private repositories.
force (bool): Whether to remove the existing output folder if it already exists.

Returns:
None
"""
org, repo, branch, path = parse_github_url(url)
github = Github(token)
repository = github.get_repo(f"{org}/{repo}")
Expand Down Expand Up @@ -82,30 +95,63 @@ def download_folder(
force: bool,
) -> None:
"""
Download all contents at server_path with commit tag sha in
the repository.
Downloads a folder from a repository.

Args:
repository (Repository): The repository to download from.
sha (str): The SHA of the commit or tree to download.
path (str): The path of the folder to download.
output (Path): The output path where the folder will be saved.
force (bool): If True, existing folder will be removed before downloading.

Returns:
None: This function does not return anything.
"""
fullpath = join(output, path)
existing_items = set(os.listdir(fullpath)) if os.path.exists(fullpath) else set()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from os import listdir, makedirs

...

existing_items = set(listdir(fullpath)) if exists(fullpath) else set()


# Get contents of the folder and assign to extra_items
contents = repository.get_dir_contents(path, ref=sha)
github_items = {content.name for content in contents}
extra_items = existing_items - github_items

if exists(fullpath):
if force:
if extra_items:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if `force` is true, the folder should be removed automatically; if it is false, the user should be asked whether they want to delete the folder and continue, or cancel the operation. So:

if force:
    ...
elif extra_items:
    ...
else:
    ...

# Display confirmation prompt
if confirm(
f"Folder contains extra items: {extra_items}. Do you want to continue?"
):
rmtree(fullpath)
else:
print("Operation cancelled.")
return
elif force:
rmtree(fullpath)
else:
print("Output folder already exists")
return

makedirs(fullpath)
contents = repository.get_dir_contents(path, ref=sha)
for content in contents:
print(f"Downloading {content.path}")
fullpath = join(output, content.path)
if content.type == "dir":
makedirs(fullpath)
download_folder(repository, sha, content.path, output, force)
else:
try:
file_content = repository.get_contents(content.path, ref=sha)
if not isinstance(file_content, ContentFile):
raise ValueError("Expected ContentFile")
wget_download(file_content.download_url, fullpath)
print("")
except (GithubException, OSError, ValueError) as exc:
print("Error processing %s: %s", content.path, exc)

# Separate files and directories
files = [content for content in contents if content.type == "file"]
directories = [content for content in contents if content.type == "dir"]

# First, download all files in the current folder
for file_content in files:
print(f"Downloading {file_content.path}")
fullpath = join(output, file_content.path)
try:
file_data = repository.get_contents(file_content.path, ref=sha)
if not isinstance(file_data, ContentFile):
raise ValueError("Expected ContentFile")
wget_download(file_data.download_url, fullpath)
print("")
except (GithubException, OSError, ValueError) as exc:
print(f"Error processing {file_content.path}: {exc}")

# Then, download sub-directories
for directory in directories:
print(f"Downloading directory {directory.path}")
download_folder(repository, sha, directory.path, output, force)
Loading
Loading