<a href="https://colab.research.google.com/github/istiakm30/File-and-Torrent-Downloader/blob/main/Downloader.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Setup Environment
# Install the packages the rest of the notebook imports: tqdm and pySmartDL via
# pip, and the libtorrent Python bindings via apt.
# Both commands discard their output, so an installation failure is silent here.
# NOTE(review): `&>` is a bash-style redirect — confirm the shell used by `!`
# treats it as "redirect stdout and stderr" rather than backgrounding the command.
!pip install tqdm pySmartDL --upgrade &> /dev/null
!sudo apt-get install -y python3-libtorrent &> /dev/null
# Printed unconditionally; it does not prove the installs above succeeded.
print("Environment setup complete.")
# The import below is the only real check that libtorrent is usable.
try:
    import libtorrent as lt
    print("libtorrent successfully imported.")
except ImportError:
    print("Failed to import libtorrent.")

In [None]:
#@title Enable Cloudflare WARP
# Refresh the package index and make sure gnupg is available for the key import.
!sudo apt-get update > /dev/null
!sudo apt-get install -y gnupg > /dev/null
# Download Cloudflare's signing key and store it de-armored as an apt keyring.
!curl -fsSL https://pkg.cloudflareclient.com/cloudflare-main.gpg | sudo gpg --yes --dearmor --output /usr/share/keyrings/cloudflare-main.gpg
# Register Cloudflare's apt repository, pinned to the keyring written above.
# NOTE(review): the suite is hard-coded to `focal`; confirm it matches the
# Ubuntu release of the runtime this notebook is executed on.
!echo 'deb [signed-by=/usr/share/keyrings/cloudflare-main.gpg] https://pkg.cloudflareclient.com/ focal main' | sudo tee /etc/apt/sources.list.d/cloudflare-main.list
# Re-read the index so the new repository is visible, then install the client.
!sudo apt-get update > /dev/null
!sudo apt-get install -y cloudflare-warp > /dev/null
# Register the client, select WARP mode, connect, and print the resulting status.
# NOTE(review): these subcommand names depend on the installed warp-cli version,
# and they presumably need the WARP service to be running — verify on the target runtime.
!warp-cli register
!warp-cli set-mode warp
!warp-cli connect
!warp-cli status

In [None]:
#@title Helper Functions
import requests
from tqdm.notebook import tqdm
import re
import os
import time
import shutil
from pySmartDL import SmartDL, pySmartDL
import libtorrent as lt
from google.colab import drive
from google.colab import files

def get_filename_from_cd(cd):
    """Extract the file name from a ``Content-Disposition`` header value.

    Args:
        cd: Raw header value (e.g. ``'attachment; filename="a.txt"'``), or
            ``None``/empty when the header is absent.

    Returns:
        The bare file name with surrounding quotes and any directory
        components removed, or ``None`` when the header has no usable
        ``filename`` parameter.
    """
    if not cd:
        return None

    # Match either a quoted value (up to its closing quote) or an unquoted
    # token (up to the next ';') so that trailing parameters such as
    # `; size=123` are never captured as part of the name.
    match = re.search(r'filename\s*=\s*(?:"([^"]*)"|([^;]*))', cd, re.IGNORECASE)
    if match is None:
        return None

    quoted, bare = match.groups()
    raw_name = quoted if quoted is not None else bare
    cleaned = raw_name.strip().strip('"\'\\')

    # The header is controlled by the server: keep only the last path
    # component so a value like "../../x" cannot escape the target folder.
    cleaned = os.path.basename(cleaned.replace('\\', '/'))
    return cleaned or None

def download_file(url, destination_folder="."):
    """Download ``url`` into ``destination_folder`` while showing a progress bar.

    The file name is taken from the ``Content-Disposition`` header when it
    provides one, otherwise from the last path segment of the final
    (post-redirect) URL, otherwise ``"downloaded_file"``.

    Args:
        url: Address of the file to fetch. An empty value aborts immediately.
        destination_folder: Directory to save into; created if it is missing.

    Returns:
        The path of the saved file, or ``None`` when the URL is empty, the
        request fails, the file cannot be written, or the number of bytes
        received differs from the advertised ``Content-Length``.
    """
    if not url:
        print("Please enter a URL.")
        return None

    # Bound up front so the IOError handler below can always reference it.
    filepath = None
    try:
        # The context manager releases the streamed connection on every exit
        # path; the (connect, read) timeout stops a stalled server from
        # hanging the cell forever.
        with requests.get(
            url, stream=True, allow_redirects=True, timeout=(10, 60)
        ) as response:
            response.raise_for_status()

            total_size = int(response.headers.get('content-length', 0))
            # Content-Length counts bytes on the wire. When the body is
            # compressed, iter_content yields decoded bytes, so the two
            # numbers are not comparable and the check must be skipped.
            size_is_checkable = (
                total_size != 0 and 'content-encoding' not in response.headers
            )

            # basename() drops any directory part a server-supplied name may carry.
            filename = os.path.basename(
                get_filename_from_cd(response.headers.get('content-disposition')) or ""
            )
            if not filename:
                filename = os.path.basename(response.url.split('?')[0])
            if not filename:
                filename = "downloaded_file"

            filepath = os.path.join(destination_folder, filename)
            if destination_folder:
                os.makedirs(destination_folder, exist_ok=True)

            bytes_written = 0
            with open(filepath, 'wb') as file, tqdm(
                total=total_size if size_is_checkable else None,
                unit='iB', unit_scale=True, desc=filename,
            ) as progress_bar:
                for data in response.iter_content(chunk_size=64 * 1024):
                    file.write(data)
                    bytes_written += len(data)
                    progress_bar.update(len(data))

        if size_is_checkable and bytes_written != total_size:
            print(f"ERROR: Download of '{filename}' failed. Size mismatch.")
            return None

        print(f"'{filename}' downloaded successfully to '{destination_folder}'")
        return filepath

    except requests.exceptions.RequestException as e:
        # Must stay ahead of the IOError clause: requests' exceptions derive from it.
        print(f"Error during download: {e}")
        return None
    except IOError as e:
        print(f"Error writing to file '{filepath}': {e}")
        return None

def download_file_multithreaded(url, destination_folder="."):
    """Download ``url`` with pySmartDL's multi-connection downloader.

    The download is started in non-blocking mode and polled so that progress
    can be mirrored onto a tqdm bar.

    Args:
        url: Address of the file to fetch. An empty value aborts immediately.
        destination_folder: Destination passed to ``SmartDL`` as ``dest``.

    Returns:
        The destination path reported by pySmartDL on success, otherwise
        ``None`` (empty URL, failed download, or any raised exception).
    """
    if not url:
        print("Please enter a URL.")
        return None

    try:
        obj = SmartDL(url, dest=destination_folder, progress_bar=False)
        obj.start(blocking=False)
        filename = os.path.basename(obj.get_dest())

        # A falsy size means the total is not known; None tells tqdm so,
        # instead of presenting a bar with a total of 0.
        with tqdm(
            total=obj.get_final_filesize() or None,
            unit='B', unit_scale=True, unit_divisor=1024, desc=filename,
        ) as progress_bar:
            while not obj.isFinished():
                progress_bar.update(obj.get_dl_size() - progress_bar.n)
                time.sleep(0.2)
            # One last sync so the bar reflects the bytes fetched after the final poll.
            progress_bar.update(obj.get_dl_size() - progress_bar.n)

        if obj.isSuccessful():
            print(f"'{filename}' downloaded successfully to '{destination_folder}'")
            return obj.get_dest()

        print(f"ERROR: something went wrong with the download of '{filename}'.")
        for e in obj.get_errors():
            print(f" - {e}")
        return None
    except Exception as e:
        # An `except` clause naming a missing attribute raises AttributeError
        # while the original error is being handled, hiding it. The library's
        # error class is therefore looked up defensively.
        # NOTE(review): confirm `pySmartDL.errors.PySmartDL_Error` exists in
        # the installed pySmartDL; if it does not, only the generic branch runs.
        smartdl_error = getattr(
            getattr(pySmartDL, "errors", None), "PySmartDL_Error", None
        )
        if isinstance(smartdl_error, type) and isinstance(e, smartdl_error):
            print(f"pySmartDL Error: {e}")
        else:
            print(f"An unexpected error occurred: {e}")
        return None

def _torrent_error_message(status):
    """Return the error text carried by a torrent status, or '' when there is none."""
    error_code = getattr(status, "errc", None)
    if error_code is not None:
        return error_code.message() if error_code.value() != 0 else ""
    # Fallback for bindings that only expose the older string field.
    return str(getattr(status, "error", "") or "")


def _download_torrent(session, params, destination_folder="."):
    """
    Internal function to handle the torrent download process.

    Adds the torrent to ``session``, waits until its metadata is available,
    then polls the torrent once per second until it is seeding, mirroring the
    progress onto a tqdm bar.

    Args:
        session: The libtorrent session.
        params: The torrent parameters passed to ``session.add_torrent``.
        destination_folder: The folder to save the downloaded files; used here
            to build the returned path and the success message.

    Returns:
        A tuple containing the path to the downloaded file/directory and the torrent handle.
        Returns (None, None) on failure.
    """
    try:
        h = session.add_torrent(params)
        print("Downloading metadata...")
        # NOTE(review): this wait has no timeout; a torrent whose metadata
        # never arrives keeps the cell running until it is interrupted.
        while not h.status().has_metadata:
            time.sleep(1)
        print("Metadata downloaded.")

        info = h.get_torrent_info()
        name = info.name()
        total_size = info.total_size()

        with tqdm(total=total_size, unit='B', unit_scale=True, desc=name) as progress_bar:
            while True:
                # One snapshot per iteration keeps every value below consistent.
                s = h.status()

                # The libtorrent reference lists no "error" value among the
                # torrent states; failures are reported through the status'
                # error code, so that is what gets inspected.
                failure = _torrent_error_message(s)
                if failure:
                    print(f"Error: {failure}")
                    return None, None

                progress_bar.set_postfix(Peers=s.num_peers, Seeds=s.num_seeds, Speed=f"{s.download_rate/1024:.2f} kB/s")
                progress_bar.update(s.total_done - progress_bar.n)

                # Checked after the update so the bar shows the final byte count.
                if s.is_seeding:
                    break
                time.sleep(1)

        print(f"\n'{name}' downloaded successfully to '{destination_folder}'")
        return os.path.join(destination_folder, name), h
    except lt.error as e:
        print(f"Libtorrent error: {e}")
        return None, None
    except Exception as e:
        print(f"An unexpected error occurred during torrent download: {e}")
        return None, None

def download_torrent_from_file(torrent_path, destination_folder="."):
    """Download the torrent described by a local ``.torrent`` file.

    Args:
        torrent_path: Path to the ``.torrent`` file; must exist.
        destination_folder: Folder the torrent's content is saved into.

    Returns:
        The ``(path, handle)`` tuple produced by ``_download_torrent``, or
        ``(None, None)`` when the path is invalid or an error is raised.

    NOTE(review): the session created here is local to this call — confirm
    the returned handle is still usable by callers once it has returned.
    """
    path_is_usable = bool(torrent_path) and os.path.exists(torrent_path)
    if not path_is_usable:
        print("Invalid torrent file path.")
        return None, None

    outcome = (None, None)
    try:
        session = lt.session({'listen_interfaces': '0.0.0.0:6881'})
        add_params = {
            'ti': lt.torrent_info(torrent_path),
            'save_path': destination_folder,
        }
        outcome = _download_torrent(session, add_params, destination_folder)
    except lt.error as exc:
        print(f"Libtorrent error: {exc}")
    except Exception as exc:
        print(f"An unexpected error occurred: {exc}")
    return outcome

def download_torrent_from_magnet(magnet_link, destination_folder="."):
    """Download the torrent identified by a magnet link.

    Args:
        magnet_link: The magnet URI. An empty value aborts immediately.
        destination_folder: Folder the torrent's content is saved into.

    Returns:
        The ``(path, handle)`` tuple produced by ``_download_torrent``, or
        ``(None, None)`` when no link is given or an error is raised.

    NOTE(review): the session created here is local to this call — confirm
    the returned handle is still usable by callers once it has returned.
    """
    if not magnet_link:
        print("Please enter a magnet link.")
        return None, None

    outcome = (None, None)
    try:
        session = lt.session({'listen_interfaces': '0.0.0.0:6881'})
        add_params = lt.parse_magnet_uri(magnet_link)
        add_params.save_path = destination_folder
        outcome = _download_torrent(session, add_params, destination_folder)
    except lt.error as exc:
        print(f"Libtorrent error: {exc}")
    except Exception as exc:
        print(f"An unexpected error occurred: {exc}")
    return outcome

def copy_to_gdrive(source_path, gdrive_folder="/content/gdrive/MyDrive/Downloaded/"):
    """Copy a file or directory into a Google Drive folder.

    Drive is (re)mounted at ``/content/gdrive`` on every call and
    ``gdrive_folder`` is created when missing. An entry of the same name that
    already exists in the folder is left untouched.

    Args:
        source_path: File or directory to copy; must exist.
        gdrive_folder: Destination folder inside the mounted Drive.

    Returns:
        None. The outcome is reported through printed messages.
    """
    if not source_path or not os.path.exists(source_path):
        print(f"Source '{source_path}' does not exist.")
        return

    # normpath drops a trailing separator; without it "some/dir/" has an empty
    # basename, the destination collapses to the folder itself, and the copy
    # is wrongly skipped as "already exists".
    item_name = os.path.basename(os.path.normpath(source_path))

    try:
        print("Mounting Google Drive...")
        drive.mount('/content/gdrive', force_remount=True)
        os.makedirs(gdrive_folder, exist_ok=True)

        destination_path = os.path.join(gdrive_folder, item_name)

        if os.path.exists(destination_path):
            print(f"'{item_name}' already exists in Google Drive. Skipping.")
            return

        if os.path.isfile(source_path):
            shutil.copy2(source_path, destination_path)
            print(f"File '{item_name}' copied to '{gdrive_folder}'")
        elif os.path.isdir(source_path):
            shutil.copytree(source_path, destination_path)
            print(f"Directory '{item_name}' copied to '{gdrive_folder}'")
        else:
            print(f"'{source_path}' is not a file or directory.")

    except shutil.Error as e:
        print(f"Shutil error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while copying to Google Drive: {e}")

def remove_from_drive(file_name, gdrive_folder="/content/gdrive/MyDrive/Downloaded/"):
    """Delete an entry from a Google Drive folder after the user confirms.

    Args:
        file_name: Name of the file or directory inside ``gdrive_folder``.
        gdrive_folder: Folder that contains the entry.

    Returns:
        None. Nothing is deleted unless the user answers ``y`` to the prompt;
        every outcome is reported through printed messages.
    """
    if not file_name:
        print("No file name provided.")
        return

    target = os.path.join(gdrive_folder, file_name)
    if not os.path.exists(target):
        print(f"'{file_name}' does not exist in '{gdrive_folder}'.")
        return

    # Ask first; a closed input stream counts as "no".
    try:
        answer = input(f"Are you sure you want to remove '{file_name}' from Google Drive? [y/N]: ")
    except EOFError:
        print("No input received. Removal cancelled.")
        return
    if answer.lower() != 'y':
        print("Removal cancelled.")
        return

    try:
        if os.path.isfile(target):
            os.remove(target)
            print(f"File '{file_name}' removed successfully.")
        elif os.path.isdir(target):
            shutil.rmtree(target)
            print(f"Directory '{file_name}' removed successfully.")
    except shutil.Error as err:
        print(f"Shutil error: {err}")
    except OSError as err:
        print(f"OS error: {err}")
    except Exception as err:
        print(f"An unexpected error occurred during removal: {err}")


# Download Files

In [None]:
#@title Basic Downloader
# Colab form field: the URL of the file to fetch.
download_link = '' #@param {type:"string"}
# Folder the file is written to.
destination_folder = "/content/"

# `filename` receives the saved file's path, or None when the download failed.
# The "copy to Drive" and "remove from Drive" cells read this same variable.
filename = download_file(download_link, destination_folder)
if filename:
  # Pass the saved file to google.colab's files.download.
  files.download(filename)

In [None]:
#@title Multi-Threaded Downloader
# Colab form field: the URL of the file to fetch.
download_link = '' #@param {type:"string"}
# Folder the file is written to.
destination_folder = "/content/"

# Same contract as the basic downloader cell: `filename` is the saved path or
# None, and it overwrites the value left by any earlier download cell.
filename = download_file_multithreaded(download_link, destination_folder)
if filename:
  # Pass the saved file to google.colab's files.download.
  files.download(filename)

In [None]:
#@title Copy Downloaded File to Google Drive
# `filename` is set by the downloader cells; it is absent or None when no
# download has succeeded yet.
if 'filename' not in locals() or not filename:
    print("No file to copy. Please download a file first.")
else:
    copy_to_gdrive(filename)

# Download Torrents

In [None]:
#@title Torrent File Downloader with `.torrent` File
# Ask for a .torrent file; the result is keyed by uploaded file name.
uploaded = files.upload()
if uploaded:
    # Only the first uploaded file is used.
    torrent_file_path = next(iter(uploaded))
    destination_folder = "/content/"

    # `downloaded_path` and `h` are read by the Drive copy/remove cells below;
    # both are None when the download failed.
    downloaded_path, h = download_torrent_from_file(torrent_file_path, destination_folder)

    if downloaded_path:
        # The newline belongs before the opening quote, not inside it.
        print(f"\n'{os.path.basename(downloaded_path)}' is ready.")
else:
    print("No file uploaded.")

In [None]:
#@title Torrent File Downloader with `Magnet Link`
# Colab form field: the magnet URI of the torrent to fetch.
magnet_link = "" #@param {type:"string"}
destination_folder = "/content/"

# `downloaded_path` and `h` are read by the Drive copy/remove cells below;
# both are None when the download failed.
downloaded_path, h = download_torrent_from_magnet(magnet_link, destination_folder)
if downloaded_path:
    # The newline belongs before the opening quote, not inside it.
    print(f"\n'{os.path.basename(downloaded_path)}' is ready.")

In [None]:
#@title Copy the Torrent Download into Google Drive's "Downloaded" Folder
# The torrent cells store the finished download's location in `downloaded_path`
# (None after a failure). It is used instead of the handle `h` because `h` is
# None after a failed download, so calling `h.is_valid()` would raise.
# NOTE(review): a handle presumably also stops being valid once the session
# that created it is gone, which is a second reason not to depend on it here.
if globals().get('downloaded_path'):
    copy_to_gdrive(downloaded_path)
else:
    print("No torrent download to copy. Please download a torrent first.")

# File Removal: Delete Downloaded Files from Google Drive
---
(Run only the one cell below that matches what you downloaded — do not run both.)


In [None]:
#@title Remove the Downloaded File from Drive (Just in Case)
# Only the base name is passed on; the Drive folder comes from
# remove_from_drive's default argument.
if 'filename' not in locals() or not filename:
    print("The variable 'filename' is not defined. Did you run the File Downloader?")
else:
    remove_from_drive(os.path.basename(filename))

In [None]:
#@title Remove the Torrent Files from Drive (Just in Case)
# The name is taken from `downloaded_path`, which the torrent cells set (None
# after a failure). The handle `h` is avoided because it is None after a failed
# download, so calling `h.is_valid()` would raise.
if globals().get('downloaded_path'):
    remove_from_drive(os.path.basename(downloaded_path))
else:
    print("Did you Download a Torrent File First?")