<a href="https://colab.research.google.com/github/rajdeep-3305/gdrivemirrorbot/blob/main/bot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**A mirror bot that runs on Google Colab and transfers files to Google Drive**

Features:
- Linux User-Agent emulation to avoid 403 errors
- SourceForge URL parsing and direct download
- Progress tracking with speed and ETA
- Automatic Google Drive upload
- Filename preservation

How to use?

0. Scroll down until you find the `# Template` cell
1. Paste your file's download link between the quotes in `download_file('')`
2. Go to "Runtime", then select "Run all" (on mobile, open the three-line menu at the top left)
3. Colab will prompt "Run anyway"; accept it and grant access to Google Drive (make sure you tick every permission so you don't hit a "Google Drive mount error"). Then wait until it finishes
4. Check your GDrive (it normally takes a few minutes for the file to appear)
5. Download your file from GDrive

Tested on:
- SourceForge
- GitHub
- Drive 🗿
- MediaFire (Need the direct file link)

Note: Other sites are untested; please paste the **Direct File Link** (if you don't know it, start the download normally in your browser, then copy the file's link from the browser's downloads page and paste it here)


In [None]:
# Install required packages
# NOTE(review): of these, only `requests` is imported by the cells visible
# here; beautifulsoup4, pyrogram and tgcrypto look unused — confirm before
# trimming the list.
!pip install requests beautifulsoup4 pyrogram tgcrypto

In [None]:
import os
import requests
import asyncio
import time
from urllib.parse import urlparse, unquote
from pathlib import Path
import zipfile
import tarfile
from google.colab import drive, files
from IPython.display import display, HTML
import shutil
import re
import tempfile
import six
import tqdm

In [None]:
class DownloaderBot:
    def __init__(self):
        """Build the HTTP session and prepare the working folders.

        Every request made through ``self.session`` carries browser-like
        headers (a Linux Firefox User-Agent); the intent is to avoid HTTP 403
        responses.  Downloads are staged under ``/content/downloads/`` and
        ``setup_environment`` is called to create that folder and mount
        Google Drive.
        """
        # Emulate a Linux browser environment (avoids 403)
        browser_headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none'
        }

        http_session = requests.Session()
        http_session.headers.update(browser_headers)
        self.session = http_session

        # Local staging folder; files land here before being copied to Drive
        self.download_path = "/content/downloads/"
        self.setup_environment()

    def setup_environment(self):
        """Create the local download folder and mount Google Drive.

        On success ``self.drive_path`` points at the ``KanagawaMirrorBot``
        folder inside the mounted drive (created if missing).  If mounting
        or creating that folder raises, a warning is printed and
        ``self.drive_path`` is set to ``None``.
        """
        os.makedirs(self.download_path, exist_ok=True)

        mirror_folder = "/content/drive/MyDrive/KanagawaMirrorBot/"
        try:
            # Mount the user's Google Drive
            drive.mount('/content/drive')
            print("✅ Google Drive mounted successfully")

            # Record the target folder, then make sure it exists
            self.drive_path = mirror_folder
            os.makedirs(mirror_folder, exist_ok=True)
        except Exception as mount_error:
            print(f"⚠️ Failed to mount Google Drive: {mount_error}")
            self.drive_path = None

    # SourceForge file pages need special handling (based on a modified SF downloader)
    def parse_sourceforge_url(self, url):
        """Rewrite a SourceForge file-page URL into a direct mirror link.

        URLs shaped like ``.../projects/<project>/files/<path>[/download]``
        become
        ``https://master.dl.sourceforge.net/project/<project>/<path>?viasf=1``.
        Anything else (a non-SourceForge link, or a SourceForge link that
        does not have that shape) is returned unchanged.

        Args:
            url: The link supplied by the user.

        Returns:
            The direct download URL, or ``url`` itself when no rewrite
            applies.
        """
        if not url or 'sourceforge.net' not in url:
            return url

        try:
            # Work on the path only, so a query string or fragment on the
            # page URL never leaks into the generated mirror link.
            page_path = urlparse(url).path
        except ValueError as e:
            print(f"⚠️ Error parsing SourceForge URL: {e}")
            return url

        # A single anchored pattern: the project is the segment between
        # "/projects/" and "/files/"; everything after that is the file
        # path.  Two independent searches would mis-split a project whose
        # own name ends in "files" (e.g. "/projects/myfiles/files/...").
        match = re.search(r'/projects/([^/]+)/files/(.+)', page_path)
        filepath = ''
        if match:
            project = match.group(1)
            filepath = match.group(2).rstrip('/')
            # Only a trailing "/download" is the page suffix; a folder whose
            # name merely starts with "download" must be kept intact.
            if filepath.endswith('/download'):
                filepath = filepath[:-len('/download')].rstrip('/')

        if not filepath:
            print("⚠️ Could not parse SourceForge URL, using original")
            return url

        # Construct direct download URL (master mirror)
        direct_url = f"https://master.dl.sourceforge.net/project/{project}/{filepath}?viasf=1"
        print(f"📡 Converted SourceForge URL: {direct_url}")
        return direct_url

    def get_filename_from_url(self, url, response=None):
        """Pick a safe local filename for a download.

        The name is taken from the response's ``Content-Disposition``
        header when one is present (the RFC 5987 ``filename*=`` form is
        preferred over plain ``filename=``), otherwise from the last
        segment of the URL path.  The result is percent-decoded and
        reduced to a bare file name, so a name supplied by the server can
        never point outside the download folder.

        Args:
            url: The URL being downloaded.
            response: Optional response object exposing a ``headers``
                mapping (e.g. the result of a HEAD request).

        Returns:
            A non-empty filename; ``'downloaded_file'`` when nothing usable
            could be derived.
        """
        filename = None

        # "is not None" rather than truthiness: a requests.Response is falsy
        # for 4xx/5xx statuses, which is unrelated to whether headers exist.
        if response is not None:
            cd = response.headers.get('content-disposition') or ''

            # Extended form: filename*=<charset>'<lang>'<percent-encoded name>
            extended = re.search(
                r"filename\*\s*=\s*\"?(?:([^'\";]*)'[^'\";]*')?([^\";]+)",
                cd, re.IGNORECASE)
            # Plain form: filename="quoted name" or filename=token
            plain = re.search(
                r"filename\s*=\s*(?:\"([^\"]*)\"|([^;]+))",
                cd, re.IGNORECASE)

            if extended:
                charset = extended.group(1) or 'utf-8'
                encoded = extended.group(2).strip()
                try:
                    filename = unquote(encoded, encoding=charset)
                except LookupError:
                    # Unknown charset label: fall back to UTF-8 decoding
                    filename = unquote(encoded)
            elif plain:
                raw = plain.group(1)
                if raw is None:
                    raw = plain.group(2).strip().strip('"\'')
                filename = unquote(raw)  # URL decode

        # Fallback to URL parsing (urlparse already drops ?query and #fragment)
        if not filename:
            filename = unquote(urlparse(url).path)

        # Keep only the final path component; treat "\" as a separator too so
        # Windows-style paths from a server are reduced the same way.
        filename = os.path.basename(filename.replace('\\', '/')).strip()

        # Ensure we have a usable filename
        if filename in ('', '.', '..'):
            filename = 'downloaded_file'

        return filename

    def format_size(self, bytes_size):
        """Render a byte count as a short human-readable string.

        The value is divided by 1024 until it drops below 1024 and is shown
        with one decimal and the matching unit (B, KB, MB, GB); anything
        that is still 1024 GB or more is reported in TB.
        """
        scaled = bytes_size
        for suffix in ('B', 'KB', 'MB', 'GB'):
            fits_in_unit = scaled < 1024.0
            if fits_in_unit:
                return f"{scaled:.1f} {suffix}"
            scaled = scaled / 1024.0
        # Ran out of smaller units: whatever is left is expressed in TB
        return f"{scaled:.1f} TB"

    def format_time(self, seconds):
        """Render a duration in seconds as ``Ns``, ``Nm Ns`` or ``Nh Nm``.

        Fractions are truncated; durations of an hour or more drop the
        seconds component.
        """
        # Under a minute: seconds only
        if seconds < 60:
            return f"{int(seconds)}s"

        # Under an hour: minutes and seconds
        if seconds < 3600:
            whole_minutes, leftover_seconds = divmod(seconds, 60)
            return f"{int(whole_minutes)}m {int(leftover_seconds)}s"

        # An hour or more: hours and minutes
        whole_hours, leftover_seconds = divmod(seconds, 3600)
        return f"{int(whole_hours)}h {int(leftover_seconds // 60)}m"

    def download_with_progress(self, url, filename):
        """Download file with progress tracking"""
        print(f"🚀 Starting download: {filename}")

        try:
            # Get file info
            response = self.session.head(url, allow_redirects=True)
            total_size = int(response.headers.get('content-length', 0))

            if total_size == 0:
                print("⚠️ Could not determine file size")
            else:
                print(f"📦 File size: {self.format_size(total_size)}")

            # Start download
            response = self.session.get(url, stream=True, allow_redirects=True)
            response.raise_for_status()

            filepath = os.path.join(self.download_path, filename)

            downloaded = 0
            start_time = time.time()
            last_update = start_time

            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)

                        current_time = time.time()
                        if current_time - last_update >= 1.0:  # Update every second
                            elapsed = current_time - start_time
                            speed = downloaded / elapsed if elapsed > 0 else 0

                            if total_size > 0:
                                progress = (downloaded / total_size) * 100
                                eta = (total_size - downloaded) / speed if speed > 0 else 0

                                print(f"\r📊 Progress: {progress:.1f}% | "
                                      f"Speed: {self.format_size(speed)}/s | "
                                      f"ETA: {self.format_time(eta)} | "
                                      f"Downloaded: {self.format_size(downloaded)}", end='')
                            else:
                                print(f"\r📊 Downloaded: {self.format_size(downloaded)} | "
                                      f"Speed: {self.format_size(speed)}/s", end='')

                            last_update = current_time

            print(f"\n✅ Download completed: {filename}")
            return filepath

        except requests.exceptions.RequestException as e:
            print(f"❌ Download failed: {e}")
            return None
        except Exception as e:
            print(f"❌ Unexpected error: {e}")
            return None

    def upload_to_drive(self, filepath):
        """Upload file to Google Drive"""
        if not self.drive_path:
            print("⚠️ Google Drive not available, skipping upload")
            return False

        try:
            filename = os.path.basename(filepath)
            drive_filepath = os.path.join(self.drive_path, filename)

            print(f"☁️ Uploading to Google Drive: {filename}")

            # Copy file to Google Drive
            with open(filepath, 'rb') as src, open(drive_filepath, 'wb') as dst:
                total_size = os.path.getsize(filepath)
                uploaded = 0
                start_time = time.time()

                while True:
                    chunk = src.read(8192)
                    if not chunk:
                        break
                    dst.write(chunk)
                    uploaded += len(chunk)

                    # Show upload progress
                    if total_size > 0:
                        progress = (uploaded / total_size) * 100
                        elapsed = time.time() - start_time
                        speed = uploaded / elapsed if elapsed > 0 else 0

                        print(f"\r☁️ Upload: {progress:.1f}% | "
                              f"Speed: {self.format_size(speed)}/s", end='')

            print(f"\n✅ Upload completed: {filename}")
            return True

        except Exception as e:
            print(f"❌ Upload failed: {e}")
            return False

    def cleanup_local_file(self, filepath):
        """Delete the staged local copy of a file.

        Any error is reported as a warning and otherwise ignored, so a
        failed cleanup never interrupts the caller.
        """
        try:
            os.remove(filepath)
            removed_name = os.path.basename(filepath)
            print(f"🗑️ Local file cleaned up: {removed_name}")
        except Exception as cleanup_error:
            print(f"⚠️ Could not clean up local file: {cleanup_error}")

    def download_and_upload(self, url, keep_local=False):
        """Main function to download and upload to Google Drive"""
        print(f"🎯 Processing URL: {url}")

        # Parse SourceForge URLs
        direct_url = self.parse_sourceforge_url(url)

        # Get filename
        try:
            head_response = self.session.head(direct_url, allow_redirects=True)
            filename = self.get_filename_from_url(direct_url, head_response)
        except:
            filename = self.get_filename_from_url(direct_url)

        print(f"📄 Filename: {filename}")

        # Download file
        filepath = self.download_with_progress(direct_url, filename)

        if filepath and os.path.exists(filepath):
            # Upload to Google Drive
            upload_success = self.upload_to_drive(filepath)

            # Cleanup local file unless requested to keep
            if upload_success and not keep_local:
                self.cleanup_local_file(filepath)

            return filepath if keep_local else upload_success
        else:
            print("❌ Download failed")
            return False

# Usage function
def download_file(url, keep_local=False):
    """Convenience wrapper: create a fresh bot and mirror ``url`` with it.

    Returns whatever ``DownloaderBot.download_and_upload`` returns.
    """
    return DownloaderBot().download_and_upload(url, keep_local)

# Example usage
if __name__ == "__main__":
    # Print a short banner with a usage hint, one entry per output line
    for banner_line in (
        "🤖 Downloader Bot Ready!",
        "",
        "Usage:",
        "download_file('your_url_here')",
    ):
        print(banner_line)

In [None]:
# Template
# Paste your download link between the quotes below, then run all cells.
download_file('')