<a href="https://colab.research.google.com/github/jleonoras/TorrentToGDrive/blob/main/TorrentToGDrive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download Torrent to Google Drive

### Install Dependencies

In [None]:
#@markdown <h3>Step #1. Install libtorrent and Initialize Session</h3>

# Install/upgrade the Python packages this notebook needs.
# NOTE(review): the `!...` lines are IPython shell escapes, not Python
# statements. A failing shell command presumably does not raise a Python
# exception, so the `except` handlers below are unlikely to ever run on a
# failed install — confirm, and check the command's exit status if failures
# must be detected.
try:
    # Upgrade essential packaging tools
    print("🔄 Upgrading packaging tools...")
    !python -m pip install --upgrade pip setuptools wheel
except Exception as e:
    print(f"⚠️ Error upgrading packaging tools: {e}")

try:
    # Install the official and more current 'libtorrent' package
    print("\n🔄 Installing official libtorrent...")
    !python -m pip install libtorrent
except Exception as e:
    print(f"\n⚠️ Error installing libtorrent: {e}")

try:
    # Imported inside the try so that a missing package is reported by the
    # handler below instead of aborting the cell with a traceback
    import libtorrent as lt

    # Session configuration, keyed by libtorrent setting name
    settings = dict(
        listen_interfaces='0.0.0.0:6881',  # every interface, port 6881
        user_agent='my_awesome_client/1.0',
        alert_mask=lt.alert.category_t.all_categories,
    )

    # `ses` is the session object the later cells add torrents to
    ses = lt.session(settings)

    print("\n🚀 Libtorrent session started successfully!")
    print(f"Listening on: {ses.listen_port()}")

except Exception as err:
    # If this fires, `ses` is left undefined and the later cells will fail
    print(f"\n⚠️ Error initializing libtorrent session: {err}")

### Mount Google Drive

In [None]:
#@markdown <h3>Step #2. Mount Google Drive</h3>

from google.colab import drive
import os

# Directory inside the Colab VM where Google Drive is attached
mount_point = "/content/drive"

# Attach Drive, then confirm the directory really is a mount point
try:
    print("🔄 Mounting Google Drive...")
    drive.mount(mount_point)

    # Guard clause: treat a missing mount as an error handled just below
    if not os.path.ismount(mount_point):
        raise Exception("\n❌ Google Drive mount failed. The mount point does not exist.")

    print("\n✅ Google Drive successfully mounted!")

except Exception as mount_error:
    print(f"\n⚠️ Error mounting Google Drive: {mount_error}")

## Add Torrent Source



In [None]:
#@markdown <h3>Step #3. Add From Magnet Link or Torrent File</h3>

import os
import libtorrent as lt
from google.colab import files
from urllib.parse import urlparse, parse_qs

# Initialize list of downloads.
# Holds the torrent handles returned by ses.add_torrent(); the Step #4 cell
# reads this list to know what to monitor.
# NOTE: re-running this cell rebinds the name to a fresh empty list, so
# handles queued by an earlier run are no longer tracked here.
downloads = []

# Function to determine file type from torrent
def get_file_icon_from_torrent(download):
    """Return an emoji that summarises what a torrent contains.

    Args:
        download: A torrent handle exposing ``has_metadata()`` and
            ``get_torrent_info()``.

    Returns:
        "⏳" while the metadata is not available yet, "📂" for a multi-file
        torrent, an icon chosen from the single file's extension
        (video, audio, book/document, plain text), or "📦" for anything
        else — including when the lookup itself fails.
    """
    # Extension groups are checked in order; matching is case-insensitive
    # because the file name is lower-cased first.
    icons_by_extension = (
        (('.mp4', '.mkv', '.avi', '.mov'), "🎬"),
        (('.mp3', '.flac', '.wav'), "🎵"),
        (('.pdf', '.docx', '.epub'), "📚"),
        (('.txt', '.csv'), "📄"),
    )

    try:
        if not download.has_metadata():
            return "⏳"  # Metadata not yet available

        ti = download.get_torrent_info()
        file_count = ti.num_files()
        if file_count > 1:
            return "📂"  # Folder (multi-file torrent)
        if file_count < 1:
            return "📦"  # Nothing to inspect

        # Look the path up by index through file_path(): the file storage
        # object is not reliably subscriptable, and indexing it made every
        # single-file torrent fall into the error branch below.
        filename = ti.files().file_path(0).lower()

        for extensions, icon in icons_by_extension:
            if filename.endswith(extensions):
                return icon
        return "📦"
    except Exception as e:
        # Best effort: an icon is cosmetic, so report and fall back.
        print(f"⚠️ Error retrieving torrent info: {e}")
        return "📦"

# Function to select download save path
def select_save_path(base_dir='/content/drive/MyDrive/Torrents'):
    """Ask the user which category folder a torrent should be saved in.

    The menu is repeated until a valid choice is made. Option 6 asks for a
    custom folder name, which must be non-empty and must stay inside
    ``base_dir`` (so names such as ``..`` or an absolute path are refused).
    The chosen folder is created if it does not exist yet.

    Args:
        base_dir: Root directory that holds the category folders. Defaults
            to the Torrents folder on the mounted Google Drive.

    Returns:
        The path of the selected folder, as ``base_dir`` joined with the
        folder name.
    """
    save_paths = {
        '1': 'Movies',
        '2': 'TV Shows',
        '3': 'Music',
        '4': 'Books',
        '5': 'Documents'
    }
    base_abs = os.path.abspath(base_dir)

    # Loop (rather than recurse) so repeated bad input cannot exhaust the stack.
    while True:
        print("\n📂 Select a folder for saving torrents:")
        print("1. Movies\n2. TV Shows\n3. Music\n4. Books\n5. Documents\n6. Others (custom)")

        # Tolerate stray whitespace around the typed number
        choice = input("🔢 Enter the number of your choice (1-6): ").strip()

        if choice == '6':
            folder = input("📁 Enter a custom folder name: ").strip()
            target_abs = os.path.abspath(os.path.join(base_abs, folder))
            escapes_base = os.path.commonpath([base_abs, target_abs]) != base_abs
            if not folder or target_abs == base_abs or escapes_base:
                print("❌ Invalid folder name. Please try again.")
                continue
            break

        if choice in save_paths:
            folder = save_paths[choice]
            break

        print("❌ Invalid selection. Please try again.")

    final_save_path = os.path.join(base_dir, folder)

    if not os.path.exists(final_save_path):
        print(f"📁 Creating folder: {final_save_path}")
        os.makedirs(final_save_path, exist_ok=True)

    print(f"✅ Save path selected: {final_save_path}")
    return final_save_path

# Add torrents from magnet links or .torrent files
def _queue_magnet_link(source):
    """Validate one magnet URI, ask for a save folder, and add it to the session."""
    try:
        # Parse first, so a malformed link is rejected before the user is
        # asked to pick a folder for it.
        params = lt.parse_magnet_uri(source)
    except Exception as e:
        print(f"⚠️ Invalid magnet link: {e}")
        return

    try:
        # The display name is the optional "dn" query parameter
        query_params = parse_qs(urlparse(source).query)
        magnet_name = query_params.get("dn", ["Unknown"])[0]
        print(f"\n🔗 Adding magnet link: {magnet_name}")

        params.save_path = select_save_path()
        h = ses.add_torrent(params)
        downloads.append(h)

        icon = get_file_icon_from_torrent(h)
        print(f"\n🆕 Queuing download: {icon} {magnet_name}")
    except Exception as e:
        print(f"⚠️ Error adding magnet link: {e}")


def _queue_uploaded_torrents():
    """Prompt for a .torrent upload and add every uploaded file to the session."""
    print("\n📁 Please upload a .torrent file...")
    uploaded = files.upload()

    if not uploaded:
        print("⚠️ No file uploaded. Please try again.")
        return

    # `uploaded` maps each file name to its content; queue all of them
    # instead of silently dropping everything after the first.
    for filename, file_content in uploaded.items():
        try:
            info = lt.torrent_info(file_content)
            print(f"\n📄 Adding torrent file: {filename}")
            save_path = select_save_path()
            h = ses.add_torrent({'ti': info, 'save_path': save_path})
            downloads.append(h)

            icon = get_file_icon_from_torrent(h)
            print(f"\n🆕 Queuing download: {icon} {h.status().name}")
        except Exception as e:
            # One bad file must not prevent the remaining ones from loading
            print(f"⚠️ Error loading torrent file: {e}")


def add_torrent_to_download():
    """Interactively queue torrents until the user types 'exit'.

    Each prompt accepts a magnet link, the word 'file' (to upload one or
    more .torrent files), or 'exit'. Successfully added torrent handles are
    appended to the module-level ``downloads`` list. Errors are reported and
    the prompt is shown again.
    """
    while True:
        source = input(
            "\n🔗 Enter a magnet link, type 'file' to upload a .torrent file, or 'exit' to quit: "
        ).strip()
        command = source.lower()

        if command == 'exit':
            print("\n👋 Exiting input mode.")
            break

        # Handling magnet links. Only the scheme is checked here: query
        # parameters may come in any order, so "xt" need not be first.
        if command.startswith("magnet:?"):
            _queue_magnet_link(source)

        # Handling .torrent files
        elif command == 'file':
            _queue_uploaded_torrents()

        # Invalid input
        else:
            print("❌ Invalid input. Please enter a magnet link, type 'file', or 'exit'.")

# Run the function: starts the interactive prompt as soon as this cell
# executes and returns once the user types 'exit'.
add_torrent_to_download()

## Download File

In [None]:
#@markdown <h3>Step #4. Download File</h3>

import time
from IPython.display import clear_output

# Lists to track ongoing and completed downloads
ongoing_downloads = downloads
completed_downloads = set()

# Torrent state strings, indexed by the numeric value of status().state
state_str = [
    'queued',
    'checking',
    'downloading metadata',
    'downloading',
    'finished',
    'seeding',
    'allocating',
    'checking fastresume'
]


def _format_eta(status):
    """Return an 'ETA: ...' label estimated from the current download rate."""
    if not (status.download_rate > 0 and status.total_wanted > 0):
        return "ETA: ∞"  # No throughput (or nothing wanted): cannot estimate

    eta_seconds = (status.total_wanted - status.total_done) / status.download_rate
    hours, remainder = divmod(int(eta_seconds), 3600)
    minutes, seconds = divmod(remainder, 60)

    if hours > 0:
        return f"ETA: {hours}h {minutes}m {seconds}s"
    if minutes > 0:
        return f"ETA: {minutes}m {seconds}s"
    return f"ETA: {seconds}s"


# Without this check an empty queue would leave the loop spinning forever
if not ongoing_downloads:
    print("⚠️ No torrents queued. Add one in Step #3 first.")

# Monitor downloads: redraw once per second until every torrent is seeding
while ongoing_downloads:
    clear_output(wait=True)

    # Poll each handle once and split finished from unfinished torrents
    still_downloading = []
    for download in ongoing_downloads:
        s = download.status()
        if s.is_seeding:
            completed_downloads.add(download)
        else:
            still_downloading.append((download, s))

    # Sort by progress BEFORE building the display lines, and never while
    # iterating the list being sorted.
    still_downloading.sort(key=lambda pair: pair[1].progress, reverse=True)

    # Display completed first, then ongoing
    display_lines = []

    for download in completed_downloads:
        s = download.status()
        icon = get_file_icon_from_torrent(download)
        display_lines.append(
            f'{icon} {s.name} ✅ {s.progress * 100:.2f}% complete! '
            f'🌀 seeding ⬇️ 0.0 kB/s ⬆️ {s.upload_rate / 1000:.1f} kB/s 👥 peers: {s.num_peers}'
        )

    for download, s in still_downloading:
        icon = get_file_icon_from_torrent(download)
        state = state_str[s.state] if s.state < len(state_str) else "unknown"
        eta_str = _format_eta(s)
        display_lines.append(
            f'{icon} {s.name} ⬇️ {s.progress * 100:.2f}% 🌀 {state} ⏳ {eta_str} '
            f'⬇️ {s.download_rate / 1000:.1f} kB/s ⬆️ {s.upload_rate / 1000:.1f} kB/s 👥 peers: {s.num_peers}'
        )

    for line in display_lines:
        print(line)

    ongoing_downloads = [download for download, _ in still_downloading]

    if not ongoing_downloads:
        print("\n🎉 All downloads complete!")
        break

    time.sleep(1)