In [1]:
import pandas as pd
import numpy as np
import sqlite3

import concurrent.futures as futures
import logging
import sys
import time
import re
import random
from pathlib import Path
from typing import Dict, List, Tuple
from urllib.parse import urlparse, unquote, urlunparse, parse_qs, urlencode

import requests
import bs4
import undetected_chromedriver as uc
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, WebDriverException
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from tqdm import tqdm

In [3]:
art_db_path = "/Users/plandi/Downloads/DB/Art_Links.db"  #misma carpeta
fashion_db_path = "/Users/plandi/Downloads/DB/Fashion_Links.db"
art_conn = sqlite3.connect(art_db_path) # Conectar a la base de datos SQLite
fashion_conn = sqlite3.connect(fashion_db_path)

art_df = pd.read_sql_query("Select * from LINKS", art_conn)
fashion_df = pd.read_sql_query( "Select * from LINKS", fashion_conn)


# ----------------- Configuración de usuario / rutas -----------------
# Reemplaza estos con tus dataframes reales si no se llaman así
# art_df, fashion_df = <ya cargados>

BASE_DIR = Path.cwd()
LOG_FILE = BASE_DIR / "download.log"
ART_DIR = BASE_DIR / "Art_avatars"
FASHION_DIR = BASE_DIR / "Fashion_avatars"

In [4]:
# ----------------- Logging (consola + archivo) -----------------
logger = logging.getLogger("avatar_downloader")
logger.setLevel(logging.INFO)
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

# Limpiar manejadores previos si se vuelve a ejecutar en un notebook
if logger.hasHandlers():
    logger.handlers.clear()

ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(fmt)
fh = logging.FileHandler(LOG_FILE, mode="w")
fh.setFormatter(fmt)
logger.addHandler(ch)
logger.addHandler(fh)

In [5]:

# ----------------- Funciones de Ayuda (Helpers) -----------------
def make_session() -> requests.Session:
    session = requests.Session()
    # Aumentamos la paciencia entre reintentos
    retries = Retry(total=5, backoff_factor=1.5,  # <-- CAMBIO: Aumentado de 0.4 a 1.5
                    status_forcelist=[429, 500, 502, 503, 504], # Quitamos 403 de aquí, ya que si nos banean, reintentar no ayuda
                    allowed_methods=["HEAD", "GET", "OPTIONS"])
    adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50, max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
        "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        "Referer": "https://www.kickstarter.com/"  # <-- CAMBIO: Añadida cabecera Referer
    })
    return session

def get_high_res_url(url: str) -> str:
    """Elimina los parámetros de redimensionamiento de imagen de la cadena de consulta de una URL."""
    try:
        parts = urlparse(url)
        query_params = parse_qs(parts.query)
        
        # Parámetros a eliminar para obtener una resolución más alta
        params_to_remove = ['height', 'width', 'fit', 'q']
        for param in params_to_remove:
            if param in query_params:
                del query_params[param]
                
        # Reconstruir la cadena de consulta y la URL completa
        new_query = urlencode(query_params, doseq=True)
        new_parts = parts._replace(query=new_query)
        return urlunparse(new_parts)
    except Exception:
        # Si el análisis falla por cualquier motivo, devuelve la URL original
        return url

_IMAGE_EXT_RE = re.compile(r"\.(jpe?g|png|gif|webp|bmp|svg)(?:[?#].*)?$", re.I)

def looks_like_image_url(url: str) -> bool:
    try:
        path = urlparse(url).path or ""
        return bool(_IMAGE_EXT_RE.search(path))
    except Exception:
        return False

def init_driver(timeout=10):
    """Devuelve una instancia de Chrome de undetected_chromedriver sin cabeza o None en caso de fallo."""
    try:
        options = Options()
        options.add_argument("--headless=new")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--window-size=1200,800")
        driver = uc.Chrome(options=options)
        driver.set_page_load_timeout(timeout)
        return driver
    except Exception as e:
        logger.warning(f"La inicialización del driver de Selenium falló: {e}. Se omitirá el análisis de páginas con Selenium.")
        return None

def get_extension_from_response(response: requests.Response, url: str = "") -> str:
    """Devuelve la extensión normalizada (incluyendo el punto) usando Content-Type o como alternativa la ruta de la URL."""
    ct = response.headers.get("Content-Type", "").lower()
    if "jpeg" in ct or "jpg" in ct:
        return ".jpg"
    if "png" in ct:
        return ".png"
    if "gif" in ct:
        return ".gif"
    if "webp" in ct:
        return ".webp"
    if "svg" in ct:
        return ".svg"
    if "bmp" in ct:
        return ".bmp"

    # Alternativa: intentar analizar la extensión desde la URL
    try:
        path = unquote(urlparse(url).path or "")
        ext = Path(path).suffix.lower()
        if ext in [".jpg", ".jpeg"]:
            return ".jpg"
        if ext in [".png", ".gif", ".webp", ".svg", ".bmp"]:
            return ext
    except Exception:
        pass

    # Última alternativa
    return ".jpg"

In [6]:
# ----------------- Extracción (Selenium + BeautifulSoup) -----------------
def extract_image_urls(rows: List[Dict]) -> List[Tuple[int, str]]:
    """
    Dadas filas = [{'project_id':..., 'creator_avatar':...}, ...],
    devuelve una lista de (project_id, final_image_url).
    Usa Selenium+BS solo para las filas donde el enlace no es obviamente una imagen.
    """
    results: List[Tuple[int, str]] = []
    # Intentar inicializar Selenium una vez (usado solo para enlaces que no son de imágenes)
    driver = init_driver()
    session = make_session()  # usado para verificar algunas URLs con HEAD de forma económica si es necesario

    try:
        iterator = rows
        for r in tqdm(iterator, desc="Extrayendo URLs de imágenes", unit="row"):
            pid = r["project_id"]
            original_url = r["creator_avatar"]

            # --- MODIFICACIÓN ---
            # Obtener la versión de alta resolución de la URL eliminando los parámetros de ancho/alto
            url = get_high_res_url(original_url)
            
            if not isinstance(url, str) or not url.lower().startswith("http"):
                logger.warning(f"Omitiendo URL no válida para project_id {pid}: {url}")
                continue

            # Si la URL ya parece una imagen, usarla directamente.
            if looks_like_image_url(url):
                results.append((pid, url))
                continue

            # Intentar un HEAD rápido con la sesión — a veces el enlace original redirige a una imagen
            try:
                head = session.head(url, timeout=6, allow_redirects=True)
                ct = head.headers.get("Content-Type", "").lower()
                if ct.startswith("image/"):
                    results.append((pid, url))
                    continue
            except Exception:
                # recurrir al análisis con Selenium
                pass

            # Si tenemos un driver, cargar la página e intentar encontrar una etiqueta <img>
            img_url = None
            if driver:
                try:
                    driver.get(url)
                    time.sleep(0.25)
                    soup = bs4.BeautifulSoup(driver.page_source, "html.parser")
                    # heurística: buscar <img> en la página, preferir imágenes con srcset o clases profile/avatar
                    img_candidates = soup.find_all("img")
                    if img_candidates:
                        # elegir la primera de tamaño razonable o con 'avatar'/'profile' en class/alt
                        chosen = None
                        for img in img_candidates:
                            src = img.get("src") or img.get("data-src")
                            if not src:
                                continue
                            cls = " ".join(img.get("class", [])) if img.get("class") else ""
                            alt = (img.get("alt") or "").lower()
                            if "avatar" in cls.lower() or "avatar" in alt or "profile" in cls.lower() or "profile" in alt:
                                chosen = src
                                break
                        if not chosen:
                            chosen = img_candidates[0].get("src") or img_candidates[0].get("data-src")
                        img_url = chosen
                except TimeoutException:
                    logger.warning(f"Timeout de Selenium al cargar {url} (proyecto {pid})")
                except WebDriverException as e:
                    logger.warning(f"Error de Selenium/WebDriver para {url} (proyecto {pid}): {e}")
                except Exception as e:
                    logger.debug(f"Excepción de análisis de Selenium para {url}: {e}")

            # última alternativa: simplemente usar la URL original si no se encontró nada más
            if not img_url:
                logger.info(f"No se pudo analizar un <img> para el proyecto {pid}, volviendo a la URL original.")
                img_url = url

            results.append((pid, img_url))
    finally:
        try:
            if driver:
                driver.quit()
        except Exception:
            pass
        session.close()

    return results

In [7]:
# ----------------- Descargador -----------------
def download_images(pairs: List[Tuple[int, str]], save_dir: Path, max_workers: int = 2):
    save_dir.mkdir(parents=True, exist_ok=True)
    session = make_session()

    def _already_downloaded(pid: int) -> bool:
        return any(save_dir.glob(f"{pid}.*"))

    def _download_one(item: Tuple[int, str]) -> bool:
        pid, img_url = item
        try:
            # --- Todo esto está DENTRO del try ---
            if _already_downloaded(pid):
                return True
            
            resp = session.get(img_url, timeout=20, stream=True)
            resp.raise_for_status()

            ext = get_extension_from_response(resp, img_url)
            filename = f"{pid}{ext}"
            filepath = save_dir / filename

            if filepath.exists():
                return True

            with open(filepath, "wb") as fh:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        fh.write(chunk)
            
            time.sleep(random.uniform(0.5, 2.0))
            return True
        # --- Los except están alineados con el try ---
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 403:
                logger.error(f"Falló por 403 Forbidden para project_id {pid}. Pausando para reintentar más tarde.")
                time.sleep(3)
            else:
                logger.error(f"Falló por error HTTP para project_id {pid} ({img_url}): {e}")
            return False
        except Exception as e:
            logger.error(f"Falló por una excepción general para project_id {pid} ({img_url}): {e}")
            time.sleep(1)
            return False

    results = []
    total = len(pairs)
    with futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = [ex.submit(_download_one, p) for p in pairs]
        for f in tqdm(futures.as_completed(futs), total=total, desc=f"Descargando en {save_dir.name}"):
            results.append(f.result())

    session.close()
    success = sum(1 for r in results if r)
    logger.info(f"Se descargaron {success} / {total} archivos en {save_dir}")
    return success, total

View Links

In [8]:
# ----------------- Ejecutor principal (para un dataframe) -----------------
def process_df(df: pd.DataFrame, folder: Path):
    rows = df[["project_id", "creator_avatar"]].drop_duplicates().to_dict("records")
    pairs = extract_image_urls(rows)
    #               AQUÍ ESTÁ EL CAMBIO MÁS IMPORTANTE
    #                    ↓
    download_images(pairs, folder, max_workers=2)

In [9]:
# ----------------- Ejemplo de ejecución -----------------
# Asegúrate de que art_df y fashion_df existan en el kernel.
# Si tienen nombres diferentes, reemplaza las referencias a continuación.

if "art_df" in globals():
    logger.info("Iniciando descarga de avatares de Arte")
    process_df(art_df, ART_DIR)
else:
    logger.warning("art_df no encontrado en globals — omitiendo descarga de Arte")

if "fashion_df" in globals():
    logger.info("Iniciando descarga de avatares de Moda")
    process_df(fashion_df, FASHION_DIR)
else:
    logger.warning("fashion_df no encontrado en globals — omitiendo descarga de Moda")

logger.info("Todo listo. Revisa download.log para más detalles.")


2025-09-02 16:57:41,859 [INFO] Iniciando descarga de avatares de Arte


KeyboardInterrupt: 

In [8]:
#from pathlib import Path

# Define la ruta al archivo de log
#log_file_path = Path.cwd() / "download.log"

# Verifica si el archivo existe y lo elimina
#if log_file_path.exists():
#    log_file_path.unlink()
#    print("El archivo 'download.log' ha sido eliminado exitosamente.")
#else:
#    print("El archivo 'download.log' no se encontró (ya estaba borrado).")