<a href="https://colab.research.google.com/github/koted0/Project-Gozle/blob/main/Project_Gozle.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Install packages
!pip install selenium yt-dlp libtorrent
!apt install chromium-chromedriver -y
!rm -R sample_data/
#Media content delivery solution

In [None]:
# @title Python Imports
import requests
import re
import os
import libtorrent as lt
import sys
import time
import logging
import datetime

from IPython.display import display
from ipywidgets import FloatProgress, Label, HBox
from typing import Optional, Tuple, Dict, Pattern, List, Union
from fnmatch import fnmatch
from requests.structures import CaseInsensitiveDict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from google.colab import userdata
from queue import Queue
from selenium.common import NoSuchElementException, TimeoutException

In [None]:
# @title Downloader {run:"auto"}
FORMAT = "%(asctime)s :: %(levelname)-6s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT, force=True)

class Downloader:
    def __init__(self):
        self.download_list: Queue[Tuple[str, Optional[str]]] = Queue()
        self.video_quality: Dict[str, str] = { "hd": "res:720", "fhd": "res:1080", 'uhd': "res:2160" }
        self.youtube_pattern: Pattern[str] = re.compile(r"(?:https?:\/\/)?(?:www\.)?youtu\.?be(?:\.com)?\/?.*(?:watch|embed|playlist)?(?:.*v=|v\/|\/|list=)([\w\-_]+)\&?")


    def _get_headers(self, url: str) -> Optional[CaseInsensitiveDict]:
        with requests.head(url) as response:
            try:
                return response.headers
            except Exception:
                logging.warning("No headers in response")
                return None

    def _get_filename_from_header(self, header: CaseInsensitiveDict) -> Optional[str]:
        try:
            content_disposition = header.get("content-disposition", "")
            if "filename=" in content_disposition:
                return content_disposition.split("filename=")[-1].strip('"')
            else:
                logging.warning("No filename in headers")
                return None
        except Exception:
            logging.warning("Unexpected error occurred while extracting filename")
            return None

    def _get_filename_from_url(self, url: str) -> str:
        filename = url.split("/")[-1]
        return filename if "?" not in filename else filename.split("?")[0]

    def _get_proper_name(self, url: str) -> str:
        headers = self._get_headers(url)
        if headers:
            filename = self._get_filename_from_header(headers)
            if filename:
                return filename
            return self._get_filename_from_url(url)

    def _is_file_exists(self, filename: str, path: str) -> bool:
        file_path = os.path.join(path, filename)
        return os.path.exists(file_path)

    def _download_other(self, url: str) -> None:
        path = "/content/others/"
        filename = self._get_proper_name(url)
        if self._is_file_exists(filename, path):
            logging.info(f"File: {filename} exists. Skipping")
        else:
            !curl -o "{path}""{filename}" -L --create-dirs "{url}"

    def _download_youtube(self, url: str, quality_key: Optional[str]) -> None:
        path = "/content/videos/"
        quality = self.video_quality[quality_key] if quality_key else "res:1080"
        !yt-dlp -S "{quality}" "{url}" -P "{path}"

    def add_url(self, url: str, quality_key: Optional[str] = None) -> None:
        """Add a URL to the download list."""
        self.download_list.put((url, quality_key))

    def add_downloads_from_file(self, path2file: str) -> None:
        with open(path2file, 'r') as file:
            [self.add_url(url.strip()) for url in file]

    def download(self) -> None:
        """Download all URLs in the download list."""
        while not self.download_list.empty():
            url, quality_key = self.download_list.get()
            if re.match(self.youtube_pattern, url):
                self._download_youtube(url, quality_key)
            else:
                self._download_other(url)
        else:
            logging.info("Downloading Finished")

In [None]:
# @title Uploader (Gozle Disk [Selenium]) {run:"auto"}
class Uploader:
    def __init__(self):
        self._GOZLE_URL: str = "https://disk.gozle.org/login"
        self._GOZLE_DISK: str = "https://disk.gozle.org/drive"
        self._gozle_username: Optional[str] = userdata.get("username")
        self._gozle_password: Optional[str] = userdata.get("password")
        self._archive_paths: Dict[str, str] = {}
        self._directories: List[str] = ["others", "videos", "torrents"]

    def _set_webdriver_options(self) -> None:
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        self.driver: WebDriver = webdriver.Chrome(options=chrome_options)
        self.driver.maximize_window()
        self.wait: WebDriverWait = WebDriverWait(self.driver, 10)

    def login(self) -> None:
        self._set_webdriver_options()
        self.driver.get(self._GOZLE_URL)
        self.wait.until(EC.presence_of_element_located((By.NAME, 'email'))).send_keys(self._gozle_username)
        self.driver.find_element(By.NAME,value='password').send_keys(self._gozle_password)
        self.driver.find_element(By.CSS_SELECTOR, 'button[type = \'submit\']').click()
        logging.info("Login Succesfull")

    def _get_uploading_menu(self):
        try:
            return self.wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="root"]/div[2]/div[5]/div[2]/div')))
        except (NoSuchElementException, TimeoutException):
            return None

    def _check_uploads(self) -> Optional[List[Tuple[str, str, str]]]:
        uploading_menu = self._get_uploading_menu()
        if not uploading_menu:
            return None
        uploading_items = uploading_menu.find_elements(By.XPATH, "./div")
        items = []
        for div_N in uploading_items:
            item_name = div_N.find_element(By.XPATH, './div[2]/div[1]/div').text
            item_status = div_N.find_element(By.XPATH, './div[2]/div[2]').text
            if item_status == "Upload complete" or item_status == "Upload failed":
                continue
            else:
                item_percentage = float(div_N.find_element(By.XPATH,
                                                        './div[3]/div/div').get_attribute("aria-valuenow"))
                items.append((item_name, item_status, item_percentage))
        return items

    def _check_uploads_all_tabs(self) -> List[Tuple[str, str, float]]:
        all_items = []
        default_window_handle = self.driver.current_window_handle
        for window_handle in self.driver.window_handles:
            self.driver.switch_to.window(window_handle)
            items = self._check_uploads()
            if items is not None:
                all_items.extend(items)
        self.driver.switch_to.window(default_window_handle)
        return all_items

    def _is_uploading(self) -> bool:
        try:
            title = self.driver.find_element(By.XPATH, '//*[@id="root"]/div[2]/div[5]/div[1]').text.split()[0]
            return True if title == "Uploading" else False
        except Exception as e:
            logging.error("Nothing Uploading")
            return False

    def _display_uploading_status(self) -> None:
        progress_bars = {}
        while self._is_uploading():
            data = self._check_uploads_all_tabs()
            for title, status, percentage in data:
                if title not in progress_bars:
                    progress, label = self._display_progress_bar(title, status, percentage)
                    progress_bars[title] = (progress, label)
                else:
                    progress, label = progress_bars[title]
                    progress.value = percentage
                    label.value = f"Status: {status}"
        logging.info("Uploading complete")

    def _display_progress_bar(self, title:str, status:str, percentage:float) -> Tuple[FloatProgress, Label]:
        progress = FloatProgress(value=percentage, description=title,
                                max=100, style={"description_width": "initial"},
                                layout={"width": "550px"})
        label = Label(value=f"Status: {status}")
        display(HBox([progress, label]))
        return progress, label

    def _print_uploading_status(self) -> None:
        try:
            items = self._check_uploads_all_tabs()
            [logging.info(f"Item: {name}, Status: {status}, {percent}") for name, status, percent in items]
        except TypeError:
            logging.warning("Nothing Uploading")

    def _has_visible_files(self, directory: str) -> bool:
        return any(True for _ in os.scandir(directory))

    def _archive_data(self, directory: str) -> None:
        dir_name = directory.split('/')[0]
        timestamp = datetime.datetime.now().strftime("%d-%m_%H:%M")
        archive_name = f"{dir_name}_{timestamp}.zip"
        logging.info("Starting archiving")
        !zip -fz -r -m -s 5000m {archive_name} {directory}

    def _update_uploads_list(self) -> None:
        uploading_items = self._check_uploads_all_tabs()
        cwd = os.getcwd()
        files = [file for file in os.listdir(cwd) if not file.startswith('.') \
            and (fnmatch(file, '*.z*'))]
        for file in files:
            if not uploading_items or file not in [item[0] for item in uploading_items]:
                self._archive_paths[file] = os.path.join(cwd, file)
            else:
                logging.info(f"{file} is already uploading, skipping.")


    def _open_new_tab(self) -> None:
        self.driver.switch_to.new_window("tab")
        self.driver.get(self._GOZLE_DISK)

    def upload(self) -> None:
        [self._archive_data(directory) for directory in self._directories
         if os.path.exists(directory) and self._has_visible_files(directory)]
        self._update_uploads_list()
        if not self._archive_paths:
            return

        files_per_tab = 3
        for i, filename in enumerate(list(self._archive_paths.keys())):
            if i > 0 and i % files_per_tab == 0:
                self._open_new_tab()
            path = self._archive_paths.pop(filename)
            self.wait.until(EC.presence_of_element_located((By.XPATH, "//div//div[2]//div//div//div//button[starts-with(@id, ':r')]"))).click()
            self.driver.find_element(By.XPATH, "//div[@data-value='uploadFiles']").click()
            self.driver.find_element(By.CSS_SELECTOR, value='input[type = \'file\']').send_keys(path)
            logging.info(f"Uploading {filename}")
        self._display_uploading_status()

    def refresh_page(self) -> None:
        self.driver.refresh()

    def screenshot(self, filename="uploading") -> None:
        self.driver.save_screenshot(f"{filename}.png")

In [None]:
# @title Torrent Client {run:"auto"}
def download_torrent(magnet_or_torrent: Union[str, lt.torrent_info]) -> None:
    ses = lt.session({"listen_interfaces": "0.0.0.0:6881, [::]:6881, 0.0.0.0:6969, [::]:6969"})

    state_str = [
        "queued", "checking", "downloading metadata", "downloading",
        "finished", "seeding", "allocating", "checking fastresume"]

    if isinstance(magnet_or_torrent, str) and magnet_or_torrent.startswith('magnet:'):
        params = lt.parse_magnet_uri(magnet_or_torrent)
        params.save_path = "/content/torrents"
        h = ses.add_torrent(params)
        while not h.torrent_file():
            time.sleep(1)
        info = h.torrent_file()
    else:
        info = magnet_or_torrent if isinstance(magnet_or_torrent, lt.torrent_info) else lt.torrent_info(magnet_or_torrent)
        params = lt.add_torrent_params()
        params.ti = info
        params.save_path = "/content/torrents"
        h = ses.add_torrent(params)

    s = h.status()
    total_size = info.piece_length() * info.num_pieces()
    logging.info(f"starting {s.name} with total size: {total_size / 1073741824:.1f} GB")

    while s.state != lt.torrent_status.seeding:
        s = h.status()
        eta = "N/A"
        if s.download_rate > 0:
            eta_seconds = int(total_size - s.total_done) / s.download_rate
            eta = str(datetime.timedelta(seconds=round(eta_seconds)))

        print('\r%.2f%% complete (down: %.2f Mbit/s up: %.2f Mbit/s peers: %d ETA: %s) %s' % (
            s.progress * 100, s.download_rate / 1024 * 0.008, s.upload_rate / 1024 * 0.008,
            s.num_peers, eta, state_str[s.state]), end=' ')

        sys.stdout.flush()
        time.sleep(1)
    logging.info(f"\n{h.status().name} completed")

# Instances

In [None]:
d = Downloader()
u = Uploader()

Use ▶️ button  in cell above ⬆️ to run cell and add downloads in cell below ⬇️

```
d.add_url("URL")
```
```
d.add_downloads_from_file("filename")
```
```
d.download()
```
```
download_torrent("torrent filename or magnet link")
```

In [None]:
d.add_url("")

In [None]:
# d.add_downloads_from_file("downloads.txt")

In [None]:
d.download()

In [None]:
u.login()

In [None]:
u.upload()