<a href="https://colab.research.google.com/github/wongpinter/google-client/blob/master/Transfer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================= FORM ============================= #
#@markdown ← Install rclone

# Release channel handed to install_rclone(): 'stable' runs the install script
# without arguments, any other value runs it with the 'beta' argument.
build_version = 'stable'  # @param ["stable", "beta"]

# ================================================================ #

from IPython.display import clear_output, display, HTML
import subprocess

def install_rclone(build_version: str):
    """
    Install rclone (and fuse3), then copy the user's rclone configuration.

    The official install script is downloaded with curl and fed to
    ``sudo bash -s`` on stdin. ``subprocess.run`` does not interpret ``|``
    when given an argument list, so the download and the shell are run as two
    separate processes instead of a pseudo-pipeline.

    Args:
        build_version: ``'stable'`` installs the default build; any other
            value passes ``beta`` to the install script.

    On a failed step an ``[ERROR]`` banner is displayed and the function
    returns without calling ``configure_rclone()``.
    """
    clear_output()
    install_script_url = "https://rclone.org/install.sh"

    # `bash -s [args...]` reads the script from stdin and exposes args as $1...
    installer_cmd = ['sudo', 'bash', '-s']
    if build_version != 'stable':
        installer_cmd.append('beta')

    try:
        # -f: fail on HTTP errors instead of piping an error page into bash;
        # -sS: quiet but still report errors; -L: follow redirects.
        download = subprocess.run(
            ['curl', '-fsSL', install_script_url],
            check=True,
            stdout=subprocess.PIPE,
        )

        installer = subprocess.run(installer_cmd, input=download.stdout)
        # NOTE(review): rclone's install.sh appears to exit with status 3 when
        # the requested version is already installed; that is treated as
        # success so re-running the cell does not abort. Confirm against the
        # script if its exit codes change.
        if installer.returncode not in (0, 3):
            raise subprocess.CalledProcessError(installer.returncode, installer_cmd)

        subprocess.run(['sudo', 'apt-get', '-y', 'install', 'fuse3'], check=True)
    except subprocess.CalledProcessError as error:
        display(HTML(f'[ERROR]: ❌ {error}'))
        return

    configure_rclone()

def configure_rclone():
    """
    Put the rclone.conf stored in Google Drive where rclone looks for it.

    Creates /root/.config/rclone and copies
    /content/drive/MyDrive/rclone.conf into it. A failing step shows an
    [ERROR] banner; otherwise the output is cleared and a success notice is
    displayed.
    """
    config_dir = '/root/.config/rclone'
    steps = (
        ['mkdir', '-p', config_dir],
        ['cp', '/content/drive/MyDrive/rclone.conf', config_dir],
    )

    try:
        for argv in steps:
            subprocess.run(argv, check=True)
    except subprocess.CalledProcessError as error:
        display(HTML(f'[ERROR]: ❌ {error}'))
        return

    # Only reached when every step succeeded.
    clear_output()
    display(HTML('[NOTICE]: ✅ Successfully installed and configured rclone'))

# Cell entry point: install rclone for the channel selected in the form;
# install_rclone() calls configure_rclone() itself once installation succeeds.
if __name__ == '__main__':
    install_rclone(build_version)


In [None]:
# ============================= FORM ============================= #
# @markdown ← [Start] rclone
# Form inputs for this cell. They are read as module globals by
# set_environment_variables(), save_log_settings() and main() below.
# NOTE(review): several fields are annotated "# @ param" (with a space) rather
# than "# @param"; presumably those do not render as Colab form widgets —
# confirm whether that is intentional.
Mode = "Copy" # @param ["Copy", "Move", "Sync", "Checker", "Deduplicate", "Remove Empty Directories", "Empty Trash"]
Source = "/content/drive/MyDrive/GDFlix" # @param {type:"string"}
Destination = "mr_drive:" # @param {type:"string"}

#@markdown ---
#@markdown ⚙️ Global Configuration ⚙️
Extra_Arguments = "" # @param {type:"string"}
Compare = "Size & Mod-Time" # @param ["Size & Mod-Time", "Size & Checksum", "Only Mod-Time", "Only Size", "Only Checksum"]
Checkers = 5 # @param {type:"slider", min:1, max:40, step:1}
Transfers = 5 # @param {type:"slider", min:1, max:20, step:1}
Dry_Run = False # @ param {type:"boolean"}
Do_not_cross_filesystem_boundaries = False
Do_not_update_modtime_if_files_are_identical = False # @ param {type:"boolean"}
Google_Drive_optimization = False # @ param {type:"boolean"}
Large_amount_of_files_optimization = False # @ param {type:"boolean"}
# "Simple_Ouput" (sic) is the spelling set_environment_variables() reads;
# rename both places together if the typo is ever corrected.
Simple_Ouput = True # @ param {type:"boolean"}
Skip_all_files_that_exist = False # @ param {type:"boolean"}
Skip_files_that_are_newer_on_the_destination = False # @ param {type:"boolean"}
# "OFF" keeps rclone output in the cell; any other value makes
# get_stats_argument() switch rclone to a log file instead.
Output_Log_File = "OFF" # @ param ["OFF", "NOTICE", "INFO", "ERROR", "DEBUG"]

#@markdown ↪️ Sync Configuration ↩️
# The option labels are also the lookup keys in get_sync_mode_argument(),
# including the "transfering" spelling.
Sync_Mode = "Delete during transfer" # @param ["Delete during transfer", "Delete before transfering", "Delete after transfering"]
Track_Renames = False # @ param {type:"boolean"}

#@markdown 💞 Deduplicate Configuration 💞
Deduplicate_Mode = "Interactive" # @ param ["Interactive", "Skip", "First", "Newest", "Oldest", "Largest", "Rename"]
Deduplicate_Use_Trash = True # @ param {type:"boolean"}

#@markdown ---
# When true, main() clears the cell output after the run finishes.
automatically_clear_cell_output = False  # @ param {type: "boolean"}
# ================================================================ #

import os
import subprocess
from IPython.display import HTML, clear_output
import zipfile
from datetime import datetime

def set_environment_variables():
    """Export the form selections as rclone argument fragments in os.environ.

    Every fragment is stored under a name ending in "C" (sourceC, checkersC,
    ...). Options that are switched off are exported as empty strings.
    """
    # (environment name, form value, flag text exported when the value is truthy)
    switches = (
        ("skipnewC", Skip_files_that_are_newer_on_the_destination, "-u"),
        ("skipexistC", Skip_all_files_that_exist, "--ignore-existing"),
        ("nocrossfilesystemC", Do_not_cross_filesystem_boundaries, "--one-file-system"),
        ("noupdatemodtimeC", Do_not_update_modtime_if_files_are_identical, "--no-update-modtime"),
        ("filesoptimizeC", Large_amount_of_files_optimization, "--fast-list"),
        (
            "driveoptimizeC",
            Google_Drive_optimization,
            "--drive-chunk-size 32M --drive-acknowledge-abuse --drive-keep-revision-forever",
        ),
        ("dryrunC", Dry_Run, "-n"),
        ("trackrenamesC", Track_Renames, "--track-renames"),
        # Inverted: the flag is only needed when the trash must NOT be used.
        ("deduplicatetrashC", not Deduplicate_Use_Trash, "--drive-use-trash=false"),
    )

    fragments = {
        "bufferC": "--buffer-size 96M",
        "sourceC": Source,
        "destinationC": Destination,
        "transfersC": f"--transfers {Transfers}",
        "checkersC": f"--checkers {Checkers}",
        "compareC": get_compare_argument(Compare),
        "statsC": get_stats_argument(Output_Log_File, Simple_Ouput),
        "loglevelC": get_log_level_argument(Output_Log_File),
        "extraC": Extra_Arguments,
        "syncmodeC": get_sync_mode_argument(Sync_Mode),
        "deduplicateC": get_deduplicate_mode_argument(Deduplicate_Mode),
    }
    for env_name, enabled, flag_text in switches:
        fragments[env_name] = flag_text if enabled else ""

    for env_name, fragment in fragments.items():
        os.environ[env_name] = fragment

def get_compare_argument(compare):
    """Translate the "Compare" form choice into rclone comparison flags.

    Returns an empty string for "Size & Mod-Time" and for any label that is
    not listed below.
    """
    flags_by_choice = dict([
        ("Size & Checksum", "-c"),
        ("Only Mod-Time", "--ignore-size"),
        ("Only Size", "--size-only"),
        ("Only Checksum", "-c --ignore-size"),
    ])
    try:
        return flags_by_choice[compare]
    except KeyError:
        return ""

def get_stats_argument(log_file, simple_output):
    """Return the output-related rclone flags.

    With logging off ("OFF") the result is a verbose stats setting, on one
    line when *simple_output* is truthy. For any other *log_file* value the
    result redirects rclone's output to the fixed log file path.
    """
    if log_file == "OFF":
        if simple_output:
            return "-v --stats-one-line --stats=5s"
        return "-v --stats=5s"
    return "--log-file=/root/.rclone_log/rclone_log.txt"

def get_log_level_argument(log_file):
    """Return "--log-level <LEVEL>" for INFO, ERROR or DEBUG, else "".

    "OFF", "NOTICE" and unknown values produce no flag.
    """
    explicit_levels = {"INFO", "ERROR", "DEBUG"}
    if log_file in explicit_levels:
        return "--log-level " + log_file
    return ""

def get_sync_mode_argument(sync_mode):
    """Map the "Sync Mode" form label to its rclone --delete-* flag.

    Unknown labels yield an empty string. The labels keep the form's
    "transfering" spelling because they are matched literally.
    """
    labels = (
        "Delete during transfer",
        "Delete before transfering",
        "Delete after transfering",
    )
    flags = ("--delete-during", "--delete-before", "--delete-after")
    flag_for = dict(zip(labels, flags))
    return flag_for[sync_mode] if sync_mode in flag_for else ""

def get_deduplicate_mode_argument(deduplicate_mode):
    """Return the lower-case dedupe mode word for a known form label.

    Labels outside the supported set produce an empty string.
    """
    supported = {
        "Interactive", "Skip", "First", "Newest", "Oldest", "Largest", "Rename",
    }
    if deduplicate_mode not in supported:
        return ""
    # Each supported label maps to its own lower-case spelling.
    return deduplicate_mode.lower()

def execute_rclone(mode):
    """Run the rclone sub-command selected by *mode*.

    The command is assembled from the ``...C`` fragments that
    ``set_environment_variables()`` stores in ``os.environ``. Because the
    command is executed without a shell, ``$name`` placeholders are expanded
    here: the source and destination are passed through verbatim as single
    arguments (so paths with spaces survive), every other fragment is split
    with ``shlex.split`` and empty fragments disappear.

    Copy, Move, Sync, Checker and Deduplicate reference every option fragment
    that is exported, so the Transfers, Dry-Run, Compare, Sync-Mode etc. form
    settings take effect. For Deduplicate the mode word is placed before the
    path, which is the order ``rclone dedupe [mode] remote:path`` expects.

    Args:
        mode: One of the "Mode" form labels.

    Raises:
        ValueError: *mode* is not a known label, or a fragment (for example
            the extra arguments) contains unbalanced quotes.
        subprocess.CalledProcessError: rclone exited with a non-zero status.
    """
    import shlex  # local import: only needed for fragment splitting here

    # Option fragments shared by the commands that read/compare/transfer files.
    common = (
        "$transfersC $checkersC $statsC $loglevelC $compareC $skipnewC "
        "$skipexistC $nocrossfilesystemC $noupdatemodtimeC $bufferC "
        "$filesoptimizeC $driveoptimizeC $dryrunC $extraC"
    )
    templates = {
        "Copy": f"copy $sourceC $destinationC {common}",
        "Move": f"move $sourceC $destinationC --delete-empty-src-dirs {common}",
        "Sync": f"sync $sourceC $destinationC $syncmodeC $trackrenamesC {common}",
        "Checker": f"check $sourceC $destinationC {common}",
        "Deduplicate": f"dedupe $deduplicateC $sourceC $deduplicatetrashC {common}",
        "Remove Empty Directories": "rmdirs $sourceC $loglevelC $extraC",
        "Empty Trash": "cleanup $sourceC $loglevelC $extraC",
    }

    template = templates.get(mode)
    if template is None:
        raise ValueError(f"Unsupported rclone mode: {mode!r}")

    # Placeholders whose value is one argument, never a list of flags.
    verbatim = {"$sourceC", "$destinationC"}

    argv = ["rclone", "--config=/root/.config/rclone/rclone.conf"]
    for token in template.split():
        if not token.startswith("$"):
            argv.append(token)
            continue
        value = os.environ.get(token[1:], "")
        if token in verbatim:
            if value:
                argv.append(value)
        else:
            argv.extend(shlex.split(value))

    # rclone cannot open a log file inside a directory that does not exist yet.
    for arg in argv:
        if arg.startswith("--log-file="):
            log_parent = os.path.dirname(arg.split("=", 1)[1])
            if log_parent:
                os.makedirs(log_parent, exist_ok=True)

    subprocess.run(argv, check=True)

def save_log_settings():
    """Write the current form settings next to the rclone log and archive them.

    The settings are stored as ``<Mode>_settings.txt`` inside the log
    directory, after which ``compress_log_files()`` is called to zip the
    directory and offer it for download.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
    log_dir = "/root/.rclone_log/"
    log_file_path = f"{log_dir}rclone_log_{timestamp}.txt"
    settings_file_path = f"{log_dir}{Mode}_settings.txt"

    # 0o755 rather than 0o666: a directory needs the search (x) bit to be
    # usable; exist_ok already covers the "directory is there" case.
    os.makedirs(log_dir, mode=0o755, exist_ok=True)

    # Explicit encoding: the settings echo user-entered paths and arguments.
    with open(settings_file_path, "w", encoding="utf-8") as f:
        f.write(f"""Mode: {Mode}
Compare: {Compare}
Source: "{Source}"
Destination: "{Destination}"
Transfers: {Transfers}
Checkers: {Checkers}
Skip files that are newer on the destination: {Skip_files_that_are_newer_on_the_destination}
Skip all files that exist: {Skip_all_files_that_exist}
Do not cross filesystem boundaries: {Do_not_cross_filesystem_boundaries}
Do not update modtime if files are identical: {Do_not_update_modtime_if_files_are_identical}
Dry-Run: {Dry_Run}
Output Log Level: {Output_Log_File}
Extra Arguments: "{Extra_Arguments}"
Sync Mode: {Sync_Mode}
Track Renames: {Track_Renames}
Deduplicate Mode: {Deduplicate_Mode}
Deduplicate Use Trash: {Deduplicate_Use_Trash}""")

    compress_log_files(log_dir, log_file_path)

def compress_log_files(log_dir, log_file_path):
    """Zip everything under *log_dir*, reset the directory and offer the zip.

    Args:
        log_dir: Directory whose files are archived (paths inside the archive
            are relative to it). It is removed and recreated empty afterwards.
        log_file_path: Accepted for call compatibility; not used here.

    The archive is written to /root/rclone_log.zip. Sending it to the browser
    is best-effort: if the Colab download helper is unavailable or fails, the
    zip is left in place and a hint is displayed instead.
    """
    zip_path = "/root/rclone_log.zip"
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as log_zip:
        # The loop variable must not be called `files`: that name is needed
        # for the download helper below and used to be shadowed by this list.
        for root, _, filenames in os.walk(log_dir):
            for filename in filenames:
                file_path = os.path.join(root, filename)
                log_zip.write(file_path, os.path.relpath(file_path, log_dir))

    if os.path.exists(log_dir):
        subprocess.run(['rm', '-rf', log_dir], check=True)
        # Directories need the search (x) bit, hence 0o755 instead of 0o666.
        os.makedirs(log_dir, mode=0o755, exist_ok=True)

    try:
        # Imported here so the function still degrades gracefully (via the
        # except below) when it is run outside Google Colab.
        from google.colab import files as colab_files

        colab_files.download(zip_path)
        subprocess.run(['rm', '-f', zip_path], check=True)
        display(HTML("Sending log to your browser..."))
    except Exception:
        display(HTML("You can use file explorer to download the log file."))

def main():
    """Run the selected rclone operation and report the outcome in the cell.

    Exports the form settings, runs rclone, and — when a log level other than
    "OFF" is selected — stores and archives the settings/log afterwards. A
    failing rclone run shows an [ERROR] banner and stops the remaining steps.
    """
    clear_output()
    set_environment_variables()

    not_config = Mode != "Config"
    logging_enabled = Output_Log_File != "OFF" and not_config

    if logging_enabled:
        display(HTML("Logging enabled, rclone will no longer display any output on the terminal. Please wait until the cell stops by itself."))

    try:
        execute_rclone(Mode)
    except subprocess.CalledProcessError as e:
        display(HTML(f"[ERROR]: ❌ {e}"))
        return

    if logging_enabled:
        save_log_settings()

    if not_config:
        display(HTML("✅ Operation has been successfully completed."))

    if automatically_clear_cell_output:
        clear_output()

# Cell entry point: run the rclone operation configured in the form above.
if __name__ == '__main__':
    main()


In [None]:
# ============================= FORM ============================= #
# @markdown #### ⬅️ Execute rClone

# Settings for this cell, read as module globals by the populate_*() helpers,
# generate_cmd() and execute_rclone() below. Only the lines carrying an
# "@param" annotation are form fields; the rest are fixed in code.
Mode = "Sync"  # @param ["Move", "Copy", "Sync", "Verify", "Dedupe", "Clean Empty Dirs", "Empty Trash"]
Source = "mr_drive:sbToDrive"  # @param {type:"string"}
Destination = "tutor_drive:sbackup"  # @param {type:"string"}
Extra_Arguments = ""  # @param {type:"string"}
COPY_SHARED_FILES = False  # @param {type:"boolean"}
# Looked up in populate_compare_arg(); unrecognised values fall back to "-c".
Compare = "Size & Checksum"
# populate_gdrive_copy_arg() rebinds both to 10 / 80 when BRIDGE_TRANSFER is
# set and memGiB() reports less than 13.
TRANSFERS, CHECKERS = 20, 20
THROTTLE_TPS = True
BRIDGE_TRANSFER = True  # @param {type:"boolean"}
FAST_LIST = False  # @param {type:"boolean"}
OPTIMIZE_GDRIVE = True
SIMPLE_LOG = True
RECORD_LOGFILE = False  # @param {type:"boolean"}
SKIP_NEWER_FILE = False
SKIP_EXISTED = False
SKIP_UPDATE_MODTIME = False
ONE_FILE_SYSTEM = False
# populate_stats_arg() only compares this against "OFF": any other value adds
# -v (or -vv when SIMPLE_LOG is false) to the command.
LOG_LEVEL = "DEBUG"
# Must match a key in populate_sync_mode_arg(), including the "transfering"
# spelling.
SYNC_MODE = "Delete after transfering"
SYNC_TRACK_RENAME = True
# Read by populate_dedupe_mode_arg(); the "Dedupe" action string itself is
# fixed in populate_action_arg().
DEDUPE_MODE = "Largest"
# Not referenced by the code visible in this cell.
USE_TRASH = True
DRY_RUN = False  # @param {type:"boolean"}
# ================================================================ #

from os import path as _p

# First-run bootstrap: fetch the rlab_utils helper module that the imports
# below rely on.
# NOTE(review): this downloads Python code from a third-party URL and imports
# it without any integrity check — consider pinning a known-good copy.
if not _p.exists("/root/.ipython/rlab_utils.py"):
    from os import remove as _rm
    from shlex import split as _spl
    from subprocess import CalledProcessError as _CalledProcessError, run

    shellCmd = "wget -qq https://biplobsd.github.io/RLabClone/res/rlab_utils.py -O /root/.ipython/rlab_utils.py"
    try:
        run(_spl(shellCmd), check=True)
    except _CalledProcessError:
        # wget -O creates the target file even when the download fails; remove
        # the leftover so the existence check above retries on the next run
        # instead of importing an empty module forever.
        if _p.exists("/root/.ipython/rlab_utils.py"):
            _rm("/root/.ipython/rlab_utils.py")
        raise

from datetime import datetime as _dt
from rlab_utils import displayOutput, checkAvailable, runSh, prepareSession, PATH_RClone_Config, accessSettingFile, memGiB

def populate_action_arg():
    """Return the rclone sub-command for the current Mode.

    Any Mode without an entry below (including "Move") maps to "move".
    """
    subcommands = dict(
        [
            ("Copy", "copy"),
            ("Sync", "sync"),
            ("Verify", "check"),
            ("Dedupe", "dedupe largest"),
            ("Clean Empty Dirs", "rmdirs"),
            ("Empty Trash", "delete"),
        ]
    )
    try:
        return subcommands[Mode]
    except KeyError:
        return "move"

def populate_compare_arg():
    """Return the rclone comparison flags for the current Compare setting.

    Settings that are not listed fall back to "-c".
    """
    flags_by_setting = {
        "Size & Checksum": "-c",
        "Checksum": "-c --ignore-size",
        "Size": "--size-only",
        "Mod-Time": "--ignore-size",
    }
    if Compare in flags_by_setting:
        return flags_by_setting[Compare]
    return "-c"

def populate_optimize_gdrive_arg():
    """Return the buffer/chunk flags, larger when OPTIMIZE_GDRIVE is set."""
    if not OPTIMIZE_GDRIVE:
        return "--buffer-size 128M"
    return "--buffer-size 256M --drive-chunk-size 256M --drive-upload-cutoff 256M --drive-acknowledge-abuse --drive-keep-revision-forever"

def populate_gdrive_copy_arg():
    """Return the copy-strategy flag for the current BRIDGE_TRANSFER setting.

    Side effect: with BRIDGE_TRANSFER set and memGiB() below 13, the module
    globals TRANSFERS and CHECKERS are rebound to 10 and 80.
    """
    global TRANSFERS, CHECKERS

    if not BRIDGE_TRANSFER:
        return "--drive-server-side-across-configs"

    # memGiB() is only consulted on the bridge path.
    if memGiB() < 13:
        TRANSFERS, CHECKERS = 10, 80
    return "--disable copy"

def populate_stats_arg():
    """Return the stats/verbosity flags derived from SIMPLE_LOG and LOG_LEVEL."""
    if SIMPLE_LOG:
        stats = "--stats-one-line --stats=5s"
    else:
        stats = "--stats=5s -P"

    if LOG_LEVEL != "OFF":
        verbosity = " -v" if SIMPLE_LOG else " -vv"
    else:
        # This branch only runs for LOG_LEVEL == "OFF", which is not a key
        # below, so the lookup contributes an empty string.
        level_flags = {
            "INFO": "--log-level INFO",
            "ERROR": "--log-level ERROR",
            "DEBUG": "--log-level DEBUG",
        }
        verbosity = level_flags.get(LOG_LEVEL, "")

    return stats + verbosity

def populate_sync_mode_arg():
    """Return the --delete-* flag for SYNC_MODE, plus --track-renames if set.

    An unrecognised SYNC_MODE contributes no delete flag.
    """
    delete_flags = dict(
        zip(
            (
                "Delete before transfering",
                "Delete after transfering",
                "Delete during transfer",
            ),
            ("--delete-before", "--delete-after", "--delete-during"),
        )
    )
    selected = delete_flags.get(SYNC_MODE, "")
    if not SYNC_TRACK_RENAME:
        return selected
    return selected + " --track-renames"

def populate_dedupe_mode_arg():
    """Return "--dedupe-mode <mode>" for DEDUPE_MODE, defaulting to largest."""
    recognised = {
        "Interactive", "Skip", "First", "Newest", "Oldest", "Rename", "Largest",
    }
    chosen = DEDUPE_MODE if DEDUPE_MODE in recognised else "Largest"
    # The flag value is the lower-case spelling of the recognised label.
    return "--dedupe-mode " + chosen.lower()

def generate_cmd():
    """Assemble the rclone command line for the current settings.

    Returns:
        A list of non-empty command fragments (a fragment may hold several
        space-separated flags); the caller joins them with spaces.

    Notes:
        * ``populate_gdrive_copy_arg()`` may rebind TRANSFERS/CHECKERS, so it
          is evaluated before those values are formatted into the command.
          It is skipped for "Verify" and "Empty Trash", which do not use its
          flag.
        * "Verify" maps to ``rclone check``, which compares two locations, so
          the destination is included for it as well as for Move/Copy/Sync.
    """
    shared_files_args = "--drive-shared-with-me --files-from /content/upload.txt --no-traverse" if COPY_SHARED_FILES else ""
    log_file_arg = "--log-file /content/rclone_log.txt -vv -P" if RECORD_LOGFILE else ""

    uses_transfer_flags = Mode not in ("Verify", "Empty Trash")

    # Evaluate first: this call can change TRANSFERS and CHECKERS.
    gdrive_copy_arg = populate_gdrive_copy_arg() if uses_transfer_flags else ""

    args = [
        "rclone",
        f"--config {PATH_RClone_Config}",
        '--user-agent "Mozilla"',
        populate_action_arg(),
        f'"{Source}"',
        f'"{Destination}"' if Mode in ("Move", "Copy", "Sync", "Verify") else "",
        f"--transfers {TRANSFERS}",
        f"--checkers {CHECKERS}",
    ]

    if Mode == "Verify":
        args.append("--one-way")
    elif Mode == "Empty Trash":
        args.append("--drive-trashed-only --drive-use-trash=false")
    else:
        args.extend([
            gdrive_copy_arg,
            populate_sync_mode_arg(),
            populate_compare_arg(),
            populate_optimize_gdrive_arg(),
            "-u" if SKIP_NEWER_FILE else "",
            "--ignore-existing" if SKIP_EXISTED else "",
            "--no-update-modtime" if SKIP_UPDATE_MODTIME else "",
            "--one-file-system" if ONE_FILE_SYSTEM else "",
            "--tpslimit 95 --tpslimit-burst 40" if THROTTLE_TPS else "",
            "--fast-list" if FAST_LIST else "",
            "--delete-empty-src-dirs" if Mode == "Move" else "",
        ])

    args.extend([
        "-n" if DRY_RUN else "",
        populate_stats_arg(),
        shared_files_args,
        Extra_Arguments,
        log_file_arg,
    ])

    # Disabled options are represented by "", which would only add stray
    # spaces to the joined command.
    return [arg for arg in args if arg]

def execute_rclone():
    """Run the configured rclone job and track it in the job.txt setting file.

    Steps: prepare the session, refuse an empty Source, archive a log file
    left over from a previous run under /content/logfiles, record the new job
    as "ongoing", run the generated command, then mark the job "finished".
    """
    import re  # local import: only used to build a safe archive file name

    prepareSession()
    if not Source.strip():
        displayOutput("❌ The Source field is empty.")
        return

    if checkAvailable("/content/rclone_log.txt"):
        if not checkAvailable("/content/logfiles"):
            # 755, not 666: a directory needs the search (x) bit to be usable.
            runSh("mkdir -p -m 755 /content/logfiles")
        job = accessSettingFile("job.txt")
        # Job titles embed Source/Destination, which may contain "/", ":" or
        # spaces; collapse anything unsafe so the result is one plain file
        # name inside /content/logfiles.
        archive_name = re.sub(
            r"[^A-Za-z0-9._-]+", "_", f'{job["title"]}_{job["status"]}'
        )
        runSh(f'mv /content/rclone_log.txt /content/logfiles/{archive_name}_logfile.txt')

    onGoingJob = {
        "title": f'{Mode}_{Source}_{Destination}_{_dt.now().strftime("%a-%H-%M-%S")}',
        "status": "ongoing",
    }
    accessSettingFile("job.txt", onGoingJob)

    cmd = " ".join(generate_cmd())
    runSh(cmd, output=True)
    # NOTE(review): success is reported without inspecting the result of
    # runSh — confirm whether runSh raises or returns a status on failure.
    displayOutput(Mode, "success")

    onGoingJob["status"] = "finished"
    accessSettingFile("job.txt", onGoingJob)

# Run the job as soon as the cell is executed.
execute_rclone()
