In [None]:
import ipywidgets as widgets
from IPython.display import display, Javascript
import base64
import os
import mimetypes
import json
import shutil
import subprocess
import zipfile
from getpass import getpass

# Authentication

In [None]:
kaggle_username = input("Kaggle Username: ")
kaggle_username = kaggle_username.strip().replace('"', "").replace("'", "")

In [None]:
raw_token = getpass("Token: ")
raw_token: str = raw_token.strip().replace('"', "").replace("'", "")

In [None]:
kaggle_path = "/root/.config/kaggle/kaggle.json"

# Make sure the directory exists
os.makedirs(os.path.dirname(kaggle_path), exist_ok=True)

# Write or overwrite kaggle.json
with open(kaggle_path, "w") as f:
    json.dump({"username": kaggle_username, "key": raw_token}, f)

# Secure permissions
os.chmod(kaggle_path, 0o600)

os.environ["KAGGLE_USERNAME"] = kaggle_username
os.environ["KAGGLE_KEY"] = raw_token


In [None]:
def find_kaggle_json():
    # Common paths
    paths = [
        os.path.expanduser("~/.kaggle/kaggle.json"),
        "/root/.config/kaggle/kaggle.json",
        os.path.join(os.getcwd(), "kaggle.json"),  # maybe in current dir
    ]

    for path in paths:
        if os.path.exists(path):
            return path
    return None


kaggle_json_path = find_kaggle_json()

if kaggle_json_path:
    print("Found kaggle.json at:", kaggle_json_path)
else:
    print("kaggle.json not found in common locations.")


In [None]:
print("Kaggle Username:", os.environ["KAGGLE_USERNAME"])
print("Kaggle Key:", os.environ["KAGGLE_KEY"])

# Functions

In [None]:
import ipywidgets as widgets
from IPython.display import display, Javascript
import base64
import os
import mimetypes

In [None]:
class FileDownloader(widgets.VBox):
    """
    Download files from the kernel to the local machine via an ipywidgets button.

    **Parameters**

    file_path : str
        Path to the file to download from the kernel.

    **Usage**

    FileDownloader("data/my_plot.png")

    **Notes**

    - Intended for small files only.
    - Large files are not recommended due to browser and kernel limitations.
    """

    def __init__(self, file_path, **kwargs):
        self.file_path = file_path

        # Validate file exists
        if not self.file_path or not os.path.exists(self.file_path):
            # File not found - disable button
            button_desc = "File Not Found"
            self.button = widgets.Button(description=button_desc, disabled=True)
            self.progress = widgets.IntProgress(
                value=0, min=0, max=100, description="Progress:", bar_style="info"
            )
            self.progress.layout.visibility = "hidden"
            super().__init__([self.button, self.progress])
            print(f"File not found: {self.file_path}")
            return

        # File exists - show info and enable button
        file_size = os.path.getsize(self.file_path)
        file_name = os.path.basename(self.file_path)
        print(f"File: {file_name} (Size: {file_size / (1024 * 1024):,.2f} Mb)")

        # Create button
        button_desc = kwargs.get("description", "Download File")
        self.button = widgets.Button(description=button_desc)
        self.button.on_click(self._handle_download)

        # Create progress bar (hidden initially)
        self.progress = widgets.IntProgress(
            value=0, min=0, max=100, description="Progress:", bar_style="info"
        )
        self.progress.layout.visibility = "hidden"

        # Create VBox with button and progress
        super().__init__([self.button, self.progress])

    def _handle_download(self, b):
        """
        Reads the file, encodes it, and sends it to the browser.
        """
        try:
            # 2. Update button text to show activity
            original_desc = self.button.description
            self.button.description = "Downloading..."
            self.button.disabled = True
            self.progress.layout.visibility = "visible"
            self.progress.value = 0

            # 3. Guess MIME type
            mime_type, _ = mimetypes.guess_type(self.file_path)
            if mime_type is None:
                mime_type = "application/octet-stream"
            self.progress.value = 10

            # 4. Read file and encode to Base64 with progress
            file_size = os.path.getsize(self.file_path)
            chunk_size = 1024 * 1024  # 1MB chunks
            file_data = bytearray()

            with open(self.file_path, "rb") as f:
                bytes_read = 0
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    file_data.extend(chunk)
                    bytes_read += len(chunk)
                    # Update progress 10-60%
                    self.progress.value = 10 + int((bytes_read / file_size) * 50)

            # 5. Encode to base64
            self.progress.value = 70
            b64_data = base64.b64encode(bytes(file_data)).decode()
            self.progress.value = 90

            filename = os.path.basename(self.file_path)

            # 6. Trigger Download using JS
            js_payload = f"""
            var a = document.createElement('a');
            a.href = 'data:{mime_type};base64,{b64_data}';
            a.download = '{filename}';
            document.body.appendChild(a);
            a.click();
            document.body.removeChild(a);
            """

            display(Javascript(js_payload))

            # 7. Restore button state
            self.progress.value = 100
            self.progress.bar_style = "success"
            self.button.description = original_desc
            self.button.disabled = False

        except Exception as e:
            self.button.description = "Failed"
            self.progress.bar_style = "danger"
            print(f"Download failed: {e}")
            self.button.disabled = False


**File exist**

In [None]:
def create_dummy_text_file(path="dummy.txt", content="This is a dummy text file."):
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)
    return path


In [None]:
create_dummy_text_file()

In [None]:
FileDownloader("dummy.txt")

**File does no exist**

In [None]:
FileDownloader("no.txt")

In [None]:
def upload_to_kaggle(
    file_path: str,
    dataset_slug: str,
    dataset_title: str,
    upload_folder: str = "/content/data/upload_folder",
    subtitle: str = None,
    license_name: str = "CC0-1.0",
    public: bool = True,
    quiet: bool = False,
):
    """
    Upload a file to Kaggle as a dataset.

    Args:
        file_path: Path to the file to upload
        dataset_slug: Kaggle dataset slug (no spaces or underscores)
        dataset_title: Display title for the dataset
        upload_folder: Temporary folder for upload preparation
        subtitle: Optional subtitle for the dataset
        license_name: License type (default: CC0-1.0)
        public: Make dataset public (default: True)
        quiet: Suppress output (default: False)

    Returns:
        bool: True if upload successful, False otherwise
    """

    # Check for Kaggle authentication
    kaggle_json_path = "/root/.kaggle/kaggle.json"
    has_kaggle_json = os.path.exists(kaggle_json_path)
    has_env_vars = "KAGGLE_USERNAME" in os.environ and "KAGGLE_KEY" in os.environ

    if not has_kaggle_json and not has_env_vars:
        print("ERROR: Kaggle authentication not found.")
        print("Please set up authentication using one of these methods:")
        print("1. Create kaggle.json at /root/.kaggle/kaggle.json")
        print("2. Set KAGGLE_USERNAME and KAGGLE_KEY environment variables")
        return False

    if not quiet:
        if has_kaggle_json:
            print("Using kaggle.json for authentication")
        else:
            print("Using environment variables for authentication")

    # Validate dataset slug
    if " " in dataset_slug or "_" in dataset_slug:
        raise ValueError("Dataset slug cannot contain spaces or underscores")

    # Check if file exists
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Display file info
    file_size = os.path.getsize(file_path) / (1024 * 1024)
    file_name = os.path.basename(file_path)
    if not quiet:
        print(f"Found file: {file_path}")
        print(f"Size: {file_size:.2f} MB")

    # Create upload folder
    os.makedirs(upload_folder, exist_ok=True)
    if not quiet:
        print(f"Created upload folder: {upload_folder}")

    # Copy file to upload folder
    dest_path = os.path.join(upload_folder, file_name)
    shutil.copy(file_path, dest_path)
    if not quiet:
        print("File copied to upload folder")

    # Read Kaggle username from kaggle.json or environment variables
    if has_kaggle_json:
        with open(kaggle_json_path, "r") as f:
            kaggle_data = json.load(f)
            kaggle_username = kaggle_data.get("username")
            if not kaggle_username:
                print("ERROR: kaggle.json must contain 'username' field")
                return False
    else:
        kaggle_username = os.environ.get("KAGGLE_USERNAME")
        if not kaggle_username:
            print("ERROR: KAGGLE_USERNAME environment variable not set")
            return False

    # Create metadata
    metadata = {
        "title": dataset_title,
        "id": f"{kaggle_username}/{dataset_slug}",
        "licenses": [{"name": license_name}],
    }

    if subtitle:
        metadata["subtitle"] = subtitle

    metadata_path = os.path.join(upload_folder, "dataset-metadata.json")
    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)

    if not quiet:
        print("Metadata created")
        print(f"Target ID: {metadata['id']}")

    # Build command
    cmd = ["kaggle", "datasets", "create", "-p", upload_folder]
    if public:
        cmd.append("-u")
    if quiet:
        cmd.append("-q")

    # Upload to Kaggle
    if not quiet:
        print("\nUploading to Kaggle...")

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            env=os.environ.copy(),
        )

        if result.returncode == 0:
            if not quiet:
                print("Upload successful!")
            print(result.stdout)
            return True
        else:
            print("Upload failed!")
            print(result.stderr)
            return False

    except Exception as e:
        print(f"Error during upload: {e}")
        return False


In [None]:
upload_to_kaggle(
    file_path="dummy.txt",
    dataset_slug="my-dummy-dataset",
    dataset_title="My Dummy Dataset",
)

In [11]:
import os
import ipywidgets as widgets
from IPython.display import display, clear_output


def upload_to_colab(destination_folder: str = "/content/uploads"):
    """
    Upload files from local machine to Google Colab using an interactive widget.

    Args:
        destination_folder: Directory path in Colab where files will be saved (default: '/content/uploads')

    Returns:
        FileUploader: Widget instance that handles the upload

    Usage:
        upload_to_colab()
        upload_to_colab("/content/data")
    """

    # Create destination folder if it doesn't exist
    os.makedirs(destination_folder, exist_ok=True)

    # Create file uploader widget
    file_uploader = widgets.FileUpload(accept="*", multiple=True)

    upload_progress = widgets.IntProgress(
        value=0,
        min=0,
        max=100,
        description="Uploading:",
        bar_style="info",
        orientation="horizontal",
    )
    upload_progress.layout.visibility = "hidden"

    output_area = widgets.Output()

    # Container for all widgets
    container = widgets.VBox([file_uploader, upload_progress, output_area])

    def on_upload_change(change):
        """Handle file upload"""
        if not change["new"]:
            return

        try:
            uploaded_files = change["new"]
            file_count = len(uploaded_files)

            if file_count == 0:
                return

            # Show progress bar
            upload_progress.layout.visibility = "visible"
            upload_progress.value = 0
            upload_progress.bar_style = "info"

            with output_area:
                clear_output(wait=True)

                saved_paths = []

                for idx, (filename, file_info) in enumerate(uploaded_files.items()):
                    content = file_info["content"]

                    # Update progress description
                    upload_progress.description = f"File {idx + 1}/{file_count}:"

                    # Save file to destination folder
                    file_path = os.path.join(destination_folder, filename)

                    with open(file_path, "wb") as f:
                        f.write(content)

                    saved_paths.append(file_path)

                    # Update progress
                    upload_progress.value = int(((idx + 1) / file_count) * 100)

                # Complete
                upload_progress.bar_style = "success"
                upload_progress.description = "Complete:"

                # Display results
                print(
                    f"Successfully uploaded {file_count} file(s) to: {destination_folder}\n"
                )
                print("Uploaded files:")
                for file_path in saved_paths:
                    file_size = os.path.getsize(file_path)
                    print(f"  - {file_path} ({file_size / 1024:.2f} KB)")

            # Clear the file uploader to reset it
            file_uploader.value.clear()
            file_uploader._counter = 0

        except Exception as e:
            upload_progress.bar_style = "danger"
            upload_progress.description = "Failed:"
            with output_area:
                clear_output(wait=True)
                print(f"Error during upload: {e}")

    # Attach event handler
    file_uploader.observe(on_upload_change, names="value")

    return container


In [12]:
upload_to_colab()

VBox(children=(FileUpload(value={}, accept='*', description='Upload', multiple=True), IntProgress(value=0, barâ€¦