## 🔗 Open in Google Colab

Click the badge below to open the tool in Google Colab:

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/mistermunib/mistermunib-Colab-Downloader/blob/main/mistermunib-Colab-Downloader.ipynb)

## ✅ Features

* 🔁 Clone Google Drive folders or files
* 🌐 Download files from direct links (.zip, .rar, .exe, etc.)
* 🧠 Auto-skips files/folders already in your Drive
* 📊 Built-in progress bars for real-time feedback
* 💾 Resume interrupted downloads (if supported)
* 📂 Clean output folders and safe filenames

## 🛠️ How to Use (in Colab)

1. **Click** the "Open in Colab" button above
2. **Press `Ctrl + F9`** to run all cells
3. The notebook will ask you to **mount your Google Drive**
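4. Choose an operation mode:
   * Type `1` → Clone from a Google Drive link
   * Type `2` → Download from a direct file link
   * Type `3` → Exit
5. Enter a save location (or press Enter to accept the default), then paste your links one per line and type `done` when finished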
6. Sit back and let it do the work ✅

## 📁 Example Links

### Google Drive folder/file:

```
https://drive.google.com/drive/folders/1A2b3CdEfGhIjKlmNOPqRsTuVwXyZ0123
```

### Direct file link:

```
https://example.com/somefile.zip
```

In [None]:
#@title <b>🚀 Run GDrive Cloner & Downloader</b>
#@markdown Press Ctrl+F9 or click the play button to run all cells

import os
import re
import sys
import time
import json
import random
import requests
import traceback
import subprocess
from tqdm import tqdm
from urllib.parse import urlparse, unquote

# Install dependencies first
def install_dependencies():
    """Install the third-party packages this notebook needs.

    pip is invoked as ``<current interpreter> -m pip`` so the packages land in
    the environment that is actually running this code. pip's normal output is
    discarded, but its stderr is captured so a failure can be reported.

    Raises:
        Exception: if pip cannot be started or exits with a non-zero status.
            The message carries pip's stderr when it is available.
    """
    packages = ["PyDrive2", "google-api-python-client", "requests", "tqdm"]
    command = [
        sys.executable or "python3", "-m", "pip",
        "install", "-q", "--no-warn-conflicts",
        *packages,
    ]
    try:
        print("🔧 Installing dependencies...")
        subprocess.run(
            command,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True
        )
        print("✅ Dependencies installed")
    except subprocess.CalledProcessError as e:
        # Surface pip's own explanation instead of only the exit status.
        detail = (e.stderr or "").strip() or str(e)
        raise Exception(f"❌ Dependency installation failed: {detail}") from e
    except Exception as e:
        raise Exception(f"❌ Dependency installation failed: {str(e)}") from e

# Run the installer at load time, ahead of the third-party imports that follow
# (googleapiclient is provided by the google-api-python-client package).
install_dependencies()

# Now import Google Colab specific modules
from google.colab import drive, auth
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from googleapiclient.errors import HttpError
from requests.exceptions import RequestException

# ========================
# GLOBAL CONFIGURATION
# ========================
MAX_RETRIES = 5  # maximum attempts for retried operations
RETRY_DELAY_BASE = 2  # seconds; retry waits are computed as RETRY_DELAY_BASE ** attempt
API_TIMEOUT = 30  # seconds
DOWNLOAD_CHUNK_SIZE = 8192  # bytes requested per streamed chunk
# Negated character class: matches ONE character that is neither a reserved
# filename character (\ / * ? : " < > |) nor an ASCII control character.
VALID_FILENAME_PATTERN = r'[^\\/*?:"<>|\x00-\x1f]'  # Safe characters
TRACKER_FILE = ".download_tracker.json"  # JSON state file stored inside the download folder

# ========================
# COMMON UTILITIES
# ========================

class ColabDriveManagerError(Exception):
    """Root exception type for failures reported by this manager.

    It adds nothing to ``Exception``; it exists so callers can tell the
    manager's own errors apart from unexpected ones.
    """

def initialize_drive():
    """Mount Google Drive at /content/drive and authenticate the user.

    Makes up to MAX_RETRIES attempts, sleeping RETRY_DELAY_BASE ** attempt
    seconds between failed attempts.

    Raises:
        ColabDriveManagerError: when the final attempt also fails.
    """
    attempt = 0
    while attempt < MAX_RETRIES:
        attempt += 1
        print(f"🔐 Mounting Drive (attempt {attempt}/{MAX_RETRIES})...")
        try:
            drive.mount('/content/drive', force_remount=True)
            auth.authenticate_user()
        except Exception as exc:
            print(f"⚠️ Mount failed: {str(exc)}")
            if attempt >= MAX_RETRIES:
                raise ColabDriveManagerError("❌ Drive initialization failed after multiple attempts")
            wait_seconds = RETRY_DELAY_BASE ** attempt
            print(f"⏳ Retrying in {wait_seconds} seconds...")
            time.sleep(wait_seconds)
        else:
            print("✅ Drive mounted & authenticated")
            return

def create_service():
    """Build a Drive v3 API client from the application-default credentials.

    Returns:
        The service object produced by ``googleapiclient.discovery.build``.

    Raises:
        ColabDriveManagerError: if the credentials lookup or the client
            construction fails for any reason.
    """
    try:
        credentials = GoogleCredentials.get_application_default()
        service = build(
            'drive',
            'v3',
            credentials=credentials,
            cache_discovery=False,
        )
    except Exception as exc:
        raise ColabDriveManagerError(f"❌ Service creation failed: {str(exc)}")
    return service

# ========================
# ENHANCED DRIVE FUNCTIONS
# ========================

# Strict patterns, tried in order. The last one accepts any bare 33+ character
# token, so every longer, more specific URL shape is already covered by it.
_STRICT_DRIVE_ID_PATTERNS = tuple(re.compile(p) for p in (
    r'/file/d/([a-zA-Z0-9_-]{33,})',
    r'/folders/([a-zA-Z0-9_-]{33,})',
    r'id=([a-zA-Z0-9_-]{33,})',
    r'([a-zA-Z0-9_-]{33,})',
))

# Fallback patterns for shorter IDs. They are only accepted in an unambiguous
# URL position. NOTE(review): Drive does not document a fixed ID length; the
# 19-character floor is a heuristic chosen to admit shorter (e.g. legacy
# 28-character) IDs -- confirm against the links you need to support.
_RELAXED_DRIVE_ID_PATTERNS = tuple(re.compile(p) for p in (
    r'/file/d/([a-zA-Z0-9_-]{19,})',
    r'/folders/([a-zA-Z0-9_-]{19,})',
    # Require a real query-parameter boundary so "gid=" / "resid=" never match.
    r'[?&]id=([a-zA-Z0-9_-]{19,})',
))


def extract_drive_id(url):
    """Extract a Google Drive file/folder ID from a URL or a bare ID string.

    The strict 33+ character patterns are tried first, in their original
    order, so any input that was recognised before yields the same ID. Only
    when none of them match are shorter IDs accepted, and then only directly
    after ``/file/d/``, ``/folders/`` or an ``id=`` query parameter.

    Args:
        url: The text pasted by the user.

    Returns:
        The extracted ID string, or None when nothing ID-like is found or
        *url* is empty or not a string.
    """
    if not url or not isinstance(url, str):
        return None

    for pattern in _STRICT_DRIVE_ID_PATTERNS + _RELAXED_DRIVE_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            return match.group(1)
    return None

def get_folder_id_by_path(drive_path):
    """Resolve a slash-separated folder path to a Drive folder ID.

    The path is walked one component at a time starting at the Drive root.
    A leading ``/content/drive/MyDrive`` prefix is ignored. Any folder that
    does not exist yet is created on the way, so this function has the side
    effect of creating folders in the user's Drive.

    Args:
        drive_path: Path such as ``/content/drive/MyDrive/Downloads``.

    Returns:
        The ID of the last folder in the path, or ``'root'`` when the path
        has no components after the prefix.

    Raises:
        ColabDriveManagerError: on any Drive API or unexpected failure.
    """
    # Drop empty components so "", "a//b" or a trailing slash never produce a
    # lookup (or a newly created folder) with an empty name.
    parts = [part for part in drive_path.strip('/').split('/') if part]
    # Skip /content/drive/MyDrive prefix if present
    if parts[:3] == ['content', 'drive', 'MyDrive']:
        parts = parts[3:]

    parent_id = 'root'
    for name in parts:
        try:
            # Drive query strings are single-quoted: backslashes and
            # apostrophes inside the value must be backslash-escaped.
            escaped_name = name.replace('\\', '\\\\').replace("'", "\\'")
            query = (
                f"name = '{escaped_name}' and mimeType = 'application/vnd.google-apps.folder' "
                f"and '{parent_id}' in parents and trashed=false"
            )
            response = drive_service.files().list(
                q=query,
                spaces='drive',
                fields='files(id,name)',
                pageSize=1
            ).execute()
            items = response.get('files', [])

            if not items:
                # Create the folder if it doesn't exist
                folder_metadata = {
                    'name': name,
                    'mimeType': 'application/vnd.google-apps.folder',
                    'parents': [parent_id]
                }
                folder = drive_service.files().create(
                    body=folder_metadata,
                    fields='id'
                ).execute()
                parent_id = folder['id']
                print(f"📁 Created folder: {name}")
            else:
                parent_id = items[0]['id']
        except HttpError as e:
            raise ColabDriveManagerError(f"Drive API error: {e}") from e
        except Exception as e:
            raise ColabDriveManagerError(f"Path resolution failed: {str(e)}") from e

    return parent_id

def file_exists(name, parent_id):
    """Report whether a non-trashed item called *name* exists in *parent_id*.

    Both files and folders count. The check is best-effort: if the Drive
    query itself fails, a warning is printed and False is returned.

    Args:
        name: Exact item name to look for.
        parent_id: ID of the folder to search in.

    Returns:
        True if at least one matching item was found, otherwise False.
    """
    try:
        # Backslashes and apostrophes must be escaped inside the quoted query
        # value, otherwise a name like "John's file" yields an invalid query.
        escaped_name = name.replace('\\', '\\\\').replace("'", "\\'")
        query = f"name = '{escaped_name}' and '{parent_id}' in parents and trashed = false"
        response = drive_service.files().list(
            q=query,
            spaces='drive',
            fields='files(id)',
            pageSize=1
        ).execute()
        return bool(response.get('files'))
    except Exception as e:
        print(f"⚠️ Existence check failed: {str(e)}")
        return False

def safe_copy_file(src_id, dst_folder_id, name, retries=3):
    """Copy one Drive file into a folder, retrying transient failures.

    The copy is skipped (and counted as success) when an item with the same
    name already exists in the destination.

    Args:
        src_id: ID of the file to copy.
        dst_folder_id: ID of the destination folder.
        name: Name to give the copy.
        retries: Maximum number of copy attempts.

    Returns:
        True when the file was copied or already existed.

    Raises:
        HttpError: for Drive API errors that are not rate-limit responses.
        ColabDriveManagerError: when every attempt failed, including the
            case where the last attempt was rate limited.
    """
    if file_exists(name, dst_folder_id):
        print(f"⏩ Already exists: {name}")
        return True

    for attempt in range(1, retries + 1):
        try:
            drive_service.files().copy(
                fileId=src_id,
                body={'name': name, 'parents': [dst_folder_id]},
                supportsAllDrives=True
            ).execute()
            print(f"✅ Copied: {name}")
            return True
        except HttpError as e:
            # Handle rate limiting (per-project and per-user reasons, or 429)
            message = str(e)
            rate_limited = e.resp.status == 429 or (
                e.resp.status == 403
                and ('rateLimitExceeded' in message or 'userRateLimitExceeded' in message)
            )
            if not rate_limited:
                raise
            if attempt < retries:
                wait_time = 2 ** attempt + random.randint(1, 5)
                print(f"⚠️ Rate limited. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"⚠️ Rate limited on final attempt: {name}")
        except Exception as e:
            print(f"⚠️ Copy attempt {attempt} failed: {str(e)}")
            if attempt < retries:
                time.sleep(2 * attempt)
            else:
                raise ColabDriveManagerError(f"Failed to copy file after {retries} attempts")

    # Reached only when the attempts ran out on rate limiting; previously this
    # fell off the end and returned None without reporting the failure.
    raise ColabDriveManagerError(f"Failed to copy file after {retries} attempts")

def copy_id(src_id, dst_folder_id):
    """Copy a Drive item (file or folder) into a destination folder.

    Files are delegated to ``safe_copy_file``. For folders, a folder with the
    same name inside the destination is reused if one exists; otherwise a new
    one is created. The source folder's children are then copied recursively
    via ``copy_folder``.

    Args:
        src_id: ID of the source file or folder.
        dst_folder_id: ID of the folder to copy into.

    Returns:
        True for a processed folder; for a file, whatever ``safe_copy_file``
        returns.

    Raises:
        ColabDriveManagerError: on any failure. Errors already raised as
            ColabDriveManagerError by nested calls propagate unchanged.
    """
    folder_mime = 'application/vnd.google-apps.folder'
    try:
        item = drive_service.files().get(
            fileId=src_id,
            fields='id,name,mimeType',
            supportsAllDrives=True
        ).execute()

        name = item['name']
        mime_type = item['mimeType']

        if mime_type != folder_mime:
            # Handle file copying
            return safe_copy_file(src_id, dst_folder_id, name)

        # Look for an existing destination FOLDER with this name. Restricting
        # the query to folders prevents a same-named file from being used as
        # the parent of the copied children.
        escaped_name = name.replace('\\', '\\\\').replace("'", "\\'")
        query = (
            f"name = '{escaped_name}' and mimeType = '{folder_mime}' "
            f"and '{dst_folder_id}' in parents and trashed = false"
        )
        response = drive_service.files().list(
            q=query,
            spaces='drive',
            fields='files(id)',
            pageSize=1
        ).execute()
        existing = response.get('files', [])

        if existing:
            print(f"⏩ Folder already exists: {name}")
            target_folder_id = existing[0]['id']
        else:
            # Create new folder
            new_folder = drive_service.files().create(
                body={
                    'name': name,
                    'mimeType': mime_type,
                    'parents': [dst_folder_id]
                },
                fields='id',
                supportsAllDrives=True
            ).execute()
            print(f"📁 Created folder: {name}")
            target_folder_id = new_folder['id']

        # Recursively copy contents
        copy_folder(src_id, target_folder_id)
        return True

    except ColabDriveManagerError:
        # Already a manager error from a nested copy: do not wrap it again,
        # otherwise each recursion level prepends another prefix.
        raise
    except HttpError as e:
        error_msg = f"❌ Drive API error: {e}"
        if 'notFound' in str(e):
            error_msg += " - Item may not exist or you don't have permission"
        raise ColabDriveManagerError(error_msg) from e
    except Exception as e:
        raise ColabDriveManagerError(f"Copy operation failed: {str(e)}") from e

def copy_folder(src_folder_id, dst_folder_id):
    """Copy every non-trashed child of a folder into another folder.

    Children are listed page by page (100 per page) and each one is passed to
    ``copy_id``, which recurses into sub-folders.

    Args:
        src_folder_id: ID of the folder whose children are copied.
        dst_folder_id: ID of the folder that receives the copies.

    Raises:
        HttpError: if listing fails for a reason other than rate limiting, or
            if the same page is still rate limited after MAX_RETRIES waits.
        ColabDriveManagerError: propagated from ``copy_id``.
    """
    page_token = None
    rate_limit_waits = 0
    while True:
        try:
            response = drive_service.files().list(
                q=f"'{src_folder_id}' in parents and trashed=false",
                spaces='drive',
                fields='nextPageToken, files(id,name,mimeType)',
                pageToken=page_token,
                pageSize=100,
                supportsAllDrives=True,
                includeItemsFromAllDrives=True
            ).execute()
        except HttpError as e:
            rate_limited = e.resp.status == 403 and 'rateLimitExceeded' in str(e)
            # Bound the waiting so a persistent quota problem cannot spin forever.
            if not rate_limited or rate_limit_waits >= MAX_RETRIES:
                raise
            rate_limit_waits += 1
            print("⚠️ Rate limited. Waiting 30 seconds...")
            time.sleep(30)
            continue

        rate_limit_waits = 0
        for item in response.get('files', []):
            copy_id(item['id'], dst_folder_id)

        # The listing is finished only when no continuation token is returned;
        # an empty page on its own is not treated as the end.
        page_token = response.get('nextPageToken')
        if not page_token:
            break

# ========================
# DOWNLOAD TRACKING SYSTEM
# ========================

class DownloadTracker:
    """Persist per-URL download state so interrupted downloads can resume.

    State lives in a JSON file (named by TRACKER_FILE) inside the download
    folder. It is a dict keyed by URL; each value is a dict with the keys
    ``status``, ``downloaded``, ``total_size``, ``filename`` and
    ``last_modified``.
    """

    def __init__(self, dest_path):
        """Bind the tracker to *dest_path* and load any saved state."""
        self.tracker_path = os.path.join(dest_path, TRACKER_FILE)
        self.state = {}
        self.load()

    def load(self):
        """Load tracker state from file.

        A missing, unreadable or malformed file results in an empty state.
        So does a file whose top-level JSON value is not an object, because
        every other method relies on ``self.state`` being a dict.
        """
        self.state = {}
        try:
            if os.path.exists(self.tracker_path):
                with open(self.tracker_path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    self.state = loaded
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and decoding problems.
            self.state = {}

    def save(self):
        """Save tracker state to file (best effort; failures are printed).

        The JSON is written to a sibling temporary file and then moved over
        the real one, so an interruption mid-write cannot leave a truncated
        tracker behind.
        """
        tmp_path = self.tracker_path + ".tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.state, f, indent=2)
            os.replace(tmp_path, self.tracker_path)
        except (OSError, TypeError, ValueError) as e:
            print(f"⚠️ Failed to save tracker: {str(e)}")

    def get_download_state(self, url):
        """Return the stored state for *url*, or a fresh 'new' state dict.

        The stored dict is returned by reference; the default dict is not
        added to the tracker until ``update_state`` is called.
        """
        return self.state.get(url, {
            'status': 'new',
            'downloaded': 0,
            'total_size': 0,
            'filename': '',
            'last_modified': 0
        })

    def update_state(self, url, state_data):
        """Replace the state for *url* and persist it."""
        self.state[url] = state_data
        self.save()

    def mark_complete(self, url):
        """Mark a tracked download as complete; unknown URLs are ignored."""
        if url in self.state:
            self.state[url]['status'] = 'completed'
            self.state[url]['downloaded'] = self.state[url]['total_size']
            self.save()

    def mark_interrupted(self, url, downloaded, total_size, filename):
        """Record an interrupted download together with its progress."""
        self.state[url] = {
            'status': 'interrupted',
            'downloaded': downloaded,
            'total_size': total_size,
            'filename': filename,
            'last_modified': time.time()
        }
        self.save()

    def clear_entry(self, url):
        """Remove the entry for *url*, if present, and persist the change."""
        if url in self.state:
            del self.state[url]
            self.save()

# ========================
# DIRECT DOWNLOAD MODULE
# ========================

def validate_download_url(url):
    """Check that *url* is an http(s) URL with a host and return it unchanged.

    Raises:
        ColabDriveManagerError: if the scheme is not http/https, the host
            part is missing, or the URL cannot be parsed at all.
    """
    allowed_schemes = ('http', 'https')
    try:
        pieces = urlparse(url)
        checks = (
            (pieces.scheme not in allowed_schemes, "Invalid protocol"),
            (not pieces.netloc, "Missing domain"),
        )
        for failed, reason in checks:
            if failed:
                raise ValueError(reason)
        return url
    except Exception:
        raise ColabDriveManagerError("❌ Invalid download URL format")

def sanitize_filename(name):
    """Turn an arbitrary string into a safe single-component filename.

    Reserved characters (``\\ / * ? : " < > |``) and ASCII control characters
    are replaced with ``_`` and surrounding whitespace is removed. Names that
    end up empty or are the special directory entries ``.`` / ``..`` fall
    back to ``"download"``. The result is limited to 200 characters; when it
    has to be shortened, a short extension is kept so the file type survives.

    Args:
        name: Candidate filename; may be empty or None.

    Returns:
        A non-empty filename of at most 200 characters.
    """
    max_length = 200
    if not name:
        return "download"

    cleaned = re.sub(r'[\\/*?:"<>|\x00-\x1f]', "_", name).strip()
    if cleaned in ("", ".", ".."):
        return "download"

    if len(cleaned) > max_length:
        stem, ext = os.path.splitext(cleaned)
        if ext and len(ext) <= 20:
            # Shorten the stem rather than cutting off the extension.
            cleaned = stem[:max_length - len(ext)] + ext
        else:
            cleaned = cleaned[:max_length]
    return cleaned

def get_download_filename(url, response):
    """Choose a local filename for a download.

    Sources are tried in this order:
      1. The ``Content-Disposition`` header. The extended
         ``filename*=charset'lang'value`` form is preferred over the plain
         ``filename=`` form.
      2. The last path component of *url*.
      3. ``download`` plus an extension derived from ``Content-Type``.
      4. The literal name ``download``.

    Args:
        url: The URL being downloaded.
        response: Object exposing a ``headers`` mapping.
            NOTE(review): header names are looked up in lower case, which
            assumes a case-insensitive mapping such as the one on a
            ``requests`` response -- confirm for other response types.

    Returns:
        A filename; names taken from the header or the URL are passed
        through ``sanitize_filename``.
    """
    # From Content-Disposition header
    disposition = response.headers.get('content-disposition')
    if disposition:
        # Extended form: filename*=UTF-8''na%C3%AFve.txt. The value begins
        # after the SECOND apostrophe, so that apostrophe must not be captured.
        extended = re.search(
            r"filename\*\s*=\s*([^'\s;]*)'[^']*'([^;\s]+)",
            disposition,
            re.IGNORECASE
        )
        if extended:
            charset = extended.group(1).strip('"') or 'utf-8'
            raw_value = extended.group(2).strip('"')
            try:
                decoded = unquote(raw_value, encoding=charset, errors='replace')
            except LookupError:
                # Unknown charset label: fall back to UTF-8 decoding.
                decoded = unquote(raw_value)
            if decoded:
                return sanitize_filename(decoded)

        plain = re.search(
            r'filename\s*=\s*(?:"([^"]*)"|([^;\n]+))',
            disposition,
            re.IGNORECASE
        )
        if plain:
            raw_value = plain.group(1) if plain.group(1) is not None else plain.group(2)
            raw_value = raw_value.strip().strip('\'"')
            if raw_value:
                return sanitize_filename(unquote(raw_value))

    # From URL path
    path = urlparse(url).path
    if path:
        filename = os.path.basename(unquote(path))
        if filename:
            return sanitize_filename(filename)

    # Generate from content type
    content_type = response.headers.get('content-type')
    if content_type is not None:
        # Media types are case-insensitive and may be followed by parameters.
        ct = content_type.split(';')[0].strip().lower()
        ext = {
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'application/pdf': '.pdf',
            'application/zip': '.zip',
            'text/plain': '.txt'
        }.get(ct, '.bin')
        return f"download{ext}"

    return sanitize_filename("download")

def _content_range_total(header_value):
    """Return the total size from a Content-Range header, or None if absent or '*'."""
    if not header_value:
        return None
    total = header_value.rsplit('/', 1)[-1].strip()
    return int(total) if total.isdigit() else None


def _local_size(path):
    """Return the size of *path* in bytes, or 0 when it is empty or does not exist."""
    if path and os.path.exists(path):
        return os.path.getsize(path)
    return 0


def download_file(url, dest_path, tracker):
    """Download *url* into *dest_path* with retries, resume and state tracking.

    Behaviour:
      * The URL is validated first; an invalid URL prints an error and
        returns None.
      * If the tracker marks the URL as interrupted and the partial file on
        disk has exactly the recorded size, the download resumes with an
        HTTP Range request; otherwise it starts over.
      * Up to MAX_RETRIES attempts are made, waiting RETRY_DELAY_BASE **
        attempt seconds between them. Each retry continues from the bytes
        already on disk when the server honours Range requests.
      * Progress is checkpointed to the tracker roughly every 10 seconds or
        5 MB.

    Args:
        url: Direct HTTP(S) link.
        dest_path: Directory to save into (created if missing).
        tracker: DownloadTracker bound to the destination directory.

    Returns:
        The path of the saved (or already complete) file, or None on failure.

    Raises:
        KeyboardInterrupt: re-raised after the progress has been recorded.
    """
    # Validate URL first
    try:
        validated_url = validate_download_url(url)
    except ColabDriveManagerError as e:
        print(f"❌ {str(e)}")
        return None

    state = tracker.get_download_state(validated_url)
    filename = state.get('filename', '')

    # Create destination folder if needed
    os.makedirs(dest_path, exist_ok=True)

    # Derive the target path whenever the tracker already knows the filename.
    # Doing this only for 'interrupted' entries left the path empty for
    # 'completed'/'downloading' entries, which made every attempt fail.
    final_path = os.path.join(dest_path, filename) if filename else ''

    # True when the bytes currently on disk are a trusted prefix of the file.
    can_resume = False

    # Check for existing file to resume
    if state['status'] == 'interrupted' and filename and os.path.exists(final_path):
        if os.path.getsize(final_path) == state['downloaded']:
            can_resume = True
            print(f"↩️ Resuming interrupted download: {filename}")
        else:
            print("⚠️ File size mismatch. Restarting download.")
            try:
                os.remove(final_path)
            except OSError:
                pass
            tracker.clear_entry(validated_url)
            state = tracker.get_download_state(validated_url)
            state['filename'] = filename  # keep the fresh state consistent

    for attempt in range(1, MAX_RETRIES + 1):
        progress_bar = None
        try:
            # Recompute the offset on every attempt: an earlier attempt may
            # have written more data, and appending from a stale offset would
            # duplicate bytes in the file.
            resume_position = _local_size(final_path) if can_resume else 0

            headers = {
                'User-Agent': 'Mozilla/5.0',
                # Ask for the raw bytes so Content-Length and Range offsets
                # describe exactly what gets written to disk.
                'Accept-Encoding': 'identity',
            }
            if resume_position > 0:
                headers['Range'] = f'bytes={resume_position}-'

            with requests.get(
                validated_url,
                stream=True,
                allow_redirects=True,
                headers=headers,
                timeout=30
            ) as r:
                # 416: the requested offset lies at or beyond the end of the
                # remote file.
                if r.status_code == 416 and resume_position > 0:
                    remote_total = _content_range_total(r.headers.get('content-range'))
                    if remote_total is not None and remote_total == resume_position:
                        print(f"⏩ Already exists: {filename}")
                        tracker.mark_complete(validated_url)
                        return final_path
                    can_resume = False  # local data is unusable; start over
                    raise ColabDriveManagerError("Server rejected resume request; restarting from scratch")

                r.raise_for_status()

                # Get actual filename
                if not filename:
                    filename = get_download_filename(validated_url, r)
                    final_path = os.path.join(dest_path, filename)

                    # Update state with filename
                    state['filename'] = filename
                    tracker.update_state(validated_url, state)

                # Handle HTTP 206 (Partial Content) vs 200 (Full Content)
                if r.status_code == 206:
                    total_size = _content_range_total(r.headers.get('content-range'))
                    if total_size is None:
                        # Total missing or '*': derive it from the remaining
                        # length when known, otherwise treat it as unknown (0).
                        remaining = int(r.headers.get('content-length', 0))
                        total_size = resume_position + remaining if remaining else 0
                else:
                    total_size = int(r.headers.get('content-length', 0))
                    resume_position = 0  # Server doesn't support resume

                # Update total size in tracker
                if total_size > 0 and total_size != state['total_size']:
                    state['total_size'] = total_size
                    tracker.update_state(validated_url, state)

                # Handle existing complete file (only when the size is known)
                if total_size > 0 and _local_size(final_path) == total_size:
                    print(f"⏩ Already exists: {filename}")
                    tracker.mark_complete(validated_url)
                    return final_path

                # Set up progress bar
                progress_bar = tqdm(
                    total=total_size or None,
                    initial=resume_position,
                    unit='B',
                    unit_scale=True,
                    unit_divisor=1024,
                    desc=filename[:50] + (filename[50:] and '...'),
                    ascii=True
                )

                # Open file in append mode for resume, write mode for new
                mode = 'ab' if resume_position > 0 else 'wb'
                with open(final_path, mode) as f:
                    # From here on the file holds a valid prefix written by
                    # this function, so a retry may continue from it.
                    can_resume = True
                    downloaded = resume_position
                    last_checkpoint_bytes = downloaded
                    last_update = time.time()

                    for chunk in r.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        progress_bar.update(len(chunk))

                        # Update tracker every 10 seconds or 5MB
                        current_time = time.time()
                        if (current_time - last_update > 10
                                or downloaded - last_checkpoint_bytes > 5 * 1024 * 1024):
                            f.flush()  # make the checkpoint match the bytes on disk
                            state['downloaded'] = downloaded
                            state['status'] = 'downloading'
                            tracker.update_state(validated_url, state)
                            last_update = current_time
                            last_checkpoint_bytes = downloaded

                # Validate download size
                actual_size = os.path.getsize(final_path)
                if total_size > 0 and actual_size != total_size:
                    can_resume = False  # do not build on a file of the wrong size
                    raise ColabDriveManagerError(f"Size mismatch: {actual_size} vs {total_size} bytes")

                print(f"✅ Saved: {final_path}")
                tracker.mark_complete(validated_url)
                return final_path

        except KeyboardInterrupt:
            print("\n🛑 Download interrupted by user")
            tracker.mark_interrupted(validated_url, _local_size(final_path), state['total_size'], filename)
            print("⏸️ Progress saved. Resume later with the same URL.")
            raise  # Re-raise to exit

        except RequestException as e:
            print(f"⚠️ Network error (attempt {attempt}/{MAX_RETRIES}): {str(e)}")
            tracker.mark_interrupted(validated_url, _local_size(final_path), state['total_size'], filename)

        except Exception as e:
            print(f"⚠️ Download error: {str(e)}")
            tracker.mark_interrupted(validated_url, _local_size(final_path), state['total_size'], filename)

        finally:
            # Always release the progress bar, including on errors.
            if progress_bar is not None:
                progress_bar.close()

        # Clean up failed download if no progress
        if final_path and os.path.exists(final_path) and os.path.getsize(final_path) == 0:
            try:
                os.remove(final_path)
            except OSError:
                pass

        if attempt < MAX_RETRIES:
            delay = RETRY_DELAY_BASE ** attempt
            print(f"⏳ Retrying in {delay} seconds...")
            time.sleep(delay)
        else:
            print(f"❌ Failed to download after {MAX_RETRIES} attempts: {url}")
            return None

def direct_download_mode():
    """Interactive mode: download files from direct HTTP(S) links.

    Prompts for a save location (created if missing), then for URLs one per
    line until an empty line or ``done`` is entered. Each URL is downloaded
    in turn and a summary is printed at the end. A Ctrl+C during a download
    stops the remaining downloads. End of input (EOF) ends whichever prompt
    is active instead of looping on it.
    """
    print("\n" + "="*70)
    print("🌐 DIRECT DOWNLOAD MODE - Download files from the web")
    print("="*70)

    # Get destination path
    default_path = "/content/drive/MyDrive/Downloads"
    while True:
        try:
            path = input(f"\n📂 Enter save location [{default_path}]: ").strip() or default_path
            os.makedirs(path, exist_ok=True)
            print(f"💾 Save path: {path}")

            # Initialize tracker
            tracker = DownloadTracker(path)
            print("📊 Download tracker initialized")

            break
        except EOFError:
            # No more input is available, so the path cannot be asked again.
            print("⛔ No input available")
            return
        except Exception as e:
            print(f"❌ Path error: {str(e)}")
            try:
                retry_answer = input("🔄 Try again? (y/n): ")
            except EOFError:
                return
            if retry_answer.lower() != 'y':
                return

    # Get URLs
    print("\n🔗 Paste download URLs (one per line). Type 'done' when finished")
    urls = []
    while True:
        try:
            url = input("\nURL: ").strip()
            if not url or url.lower() == 'done':
                break
            urls.append(url)
        except EOFError:
            # EOF would otherwise be swallowed by the handler below and make
            # this loop spin forever.
            break
        except Exception as e:
            print(f"⚠️ Input error: {str(e)}")

    if not urls:
        print("⛔ No URLs provided")
        return

    print(f"\n🚀 Starting download of {len(urls)} files...")
    success_count = 0

    for url in urls:
        try:
            print(f"\n🔗 Processing: {url[:80]}{'...' if len(url)>80 else ''}")
            result = download_file(url, path, tracker)
            if result:
                success_count += 1
        except KeyboardInterrupt:
            print("\n🛑 Download interrupted by user")
            break
        except Exception as e:
            print(f"⚠️ Download failed: {str(e)}")

    print(f"\n🎉 Completed: {success_count}/{len(urls)} files downloaded successfully")

# ========================
# DRIVE COPY MODULE
# ========================

def drive_copy_mode(service):
    """Interactive mode: copy Drive files/folders into the user's Drive.

    Prompts for a destination path (resolved to a folder ID, creating missing
    folders), then for Google Drive URLs one per line until an empty line or
    ``done`` is entered. Each item is copied and a summary is printed.

    Args:
        service: Drive API service to use. The copy helpers read the
            module-level ``drive_service``, so a non-None *service* is bound
            to that name here to make the argument effective.
    """
    global drive_service
    if service is not None:
        drive_service = service

    print("\n" + "="*70)
    print("📂 ENHANCED DRIVE COPY MODE - Copy files/folders from Google Drive")
    print("="*70)

    # Get destination path
    default_path = "/content/drive/MyDrive/Downloads"
    while True:
        try:
            path = input(f"\n📂 Enter save location [{default_path}]: ").strip() or default_path
            print(f"💾 Resolving path: {path}")
            destination_parent_id = get_folder_id_by_path(path)
            print(f"✅ Destination ID: {destination_parent_id}")
            break
        except ColabDriveManagerError as e:
            print(f"❌ {str(e)}")
            try:
                retry_answer = input("🔄 Try again? (y/n): ")
            except EOFError:
                return
            if retry_answer.lower() != 'y':
                return

    # Get Drive URLs
    print("\n🔗 Paste Google Drive URLs (one per line). Type 'done' when finished")
    urls = []
    while True:
        try:
            url = input("\nURL: ").strip()
            if not url or url.lower() == 'done':
                break
            urls.append(url)
        except EOFError:
            # EOF would otherwise be swallowed by the handler below and make
            # this loop spin forever.
            break
        except Exception as e:
            print(f"⚠️ Input error: {str(e)}")

    if not urls:
        print("⛔ No URLs provided")
        return

    print(f"\n🚀 Starting copy of {len(urls)} items to: {path}")
    success_count = 0

    for url in urls:
        print(f"\n🔗 Processing: {url[:80]}{'...' if len(url)>80 else ''}")
        try:
            # Extract Drive ID from URL
            drive_id = extract_drive_id(url)
            if not drive_id:
                print(f"❌ Couldn't extract ID from URL: {url}")
                continue

            print(f"🔍 Extracted ID: {drive_id}")

            # Copy item to destination
            if copy_id(drive_id, destination_parent_id):
                success_count += 1
                print(f"✅ Successfully copied: {url}")
        except ColabDriveManagerError as e:
            print(f"❌ {str(e)}")
        except Exception as e:
            print(f"⚠️ Unexpected error: {str(e)}")

    print(f"\n🎉 Completed: {success_count}/{len(urls)} items copied successfully")

# ========================
# MAIN EXECUTION
# ========================

def main():
    """Run the interactive manager: set up Drive, then loop over the menu.

    Setup mounts Drive and builds the API service, which is stored in the
    module-level ``drive_service`` used by the Drive helper functions. If
    setup fails, an explanation is printed and the function returns. The
    menu loop runs until the user chooses Exit or the input stream ends.
    """
    global drive_service

    print("\n" + "="*70)
    print("🚀 ENTERPRISE GOOGLE DRIVE MANAGER FOR COLAB")
    print("="*70 + "\n")

    # Initial setup
    try:
        initialize_drive()
        drive_service = create_service()
    except ColabDriveManagerError as e:
        print(f"\n❌ CRITICAL ERROR: {str(e)}")
        print("💡 Possible solutions:")
        print("1. Check your Google Drive storage space")
        print("2. Ensure you have proper sharing permissions")
        print("3. Restart your Colab runtime and try again")
        return
    except Exception as e:
        print(f"\n❌ UNEXPECTED ERROR: {str(e)}")
        traceback.print_exc()
        return

    # Main loop
    while True:
        try:
            print("\nSelect operation mode:")
            print("1. 📂 Drive-to-Drive Copy (Google Drive URLs)")
            print("2. 🌐 Direct Web Download (HTTP/HTTPS URLs)")
            print("3. 🚪 Exit")

            choice = input("\n🔘 Enter choice (1-3): ").strip()

            if choice == '1':
                drive_copy_mode(drive_service)
            elif choice == '2':
                direct_download_mode()
            elif choice == '3':
                print("\n👋 Exiting...")
                break
            else:
                print("⚠️ Invalid choice. Please select 1-3")
        except KeyboardInterrupt:
            print("\n🛑 Operation cancelled by user")
        except EOFError:
            # No more input can arrive; looping would only repeat the error.
            print("\n👋 Exiting...")
            break
        except ColabDriveManagerError as e:
            print(f"\n❌ MANAGER ERROR: {str(e)}")
        except Exception as e:
            print(f"\n❌ UNEXPECTED ERROR: {str(e)}")
            traceback.print_exc()

        # Pause before next operation. Choosing Exit breaks out above, so
        # this point is reached only when the loop is going to continue. The
        # old check read `choice` here, which was unbound if the very first
        # prompt was interrupted.
        try:
            input("\nPress Enter to continue...")
        except (KeyboardInterrupt, EOFError):
            print("\n👋 Exiting...")
            break

# Entry point: start the interactive manager when this code is executed
# directly rather than imported as a module.
if __name__ == "__main__":
    main()