## Imports

In [33]:
from bs4 import BeautifulSoup
import unicodedata
import requests
import json
import os
from pymongo import MongoClient


## Variables Genereles

In [47]:
API_KEY = "52fd0ba8731fab6d5558a2b8860deb3d"

## Funciones Generales

In [35]:
def obtener_soup(api_key, url):
    params = {
        'api_key': api_key,
        'url': url,
        'country_code': 'ES'
    }

    response = requests.get('https://api.scraperapi.com/', params=params)

    if response.status_code == 200:
        return BeautifulSoup(response.text, 'html.parser')
    else:
        print(f'Error al obtener la página: {response.status_code}')
        return None

## Obtener IDS

In [12]:
def obtener_ids(soup):
    ids_productos = set()
    productos = soup.find_all('div', {'role': 'listitem', 'data-asin': True})
    if productos:
        for producto in productos:
            id = producto.get('data-asin')
            ids_productos.add(id)
        return ids_productos
    else: return None
    

In [13]:
BASE_URL = "https://www.amazon.es/s?k="
QUERY = ['moviles', 'smartphones', 'celulares', 'telefonos']
ids_productos = set()

for query in QUERY:
    url_base = BASE_URL + query
    page_number = 1

    while True:
        print(f"Obteniendo IDs de la página {page_number} para la consulta '{query}'...")
        url_pagina = f"{url_base}&page={page_number}"
        soup = obtener_soup(API_KEY, url_pagina)

        if soup is None:
            break

        ids_pagina = obtener_ids(soup)
        if not ids_pagina:
            print(f"No se encontraron IDs en la página {page_number} para la consulta '{query}'. Terminando la recolección.")
            break

        ids_productos.update(ids_pagina)
        page_number += 1

fichero = "../res/ids.json"
os.makedirs(os.path.dirname(fichero), exist_ok=True)
with open(fichero, "w") as f:
    json.dump(list(ids_productos), f, indent=4)

print(f"Se guardaron {len(ids_productos)} IDs de productos en 'ids.json'.")

Obteniendo IDs de la página 1 para la consulta 'moviles'...


KeyboardInterrupt: 

## Obtener productos

In [36]:
def obtener_soup(api_key, url):
    params = {
        'api_key': api_key,
        'url': url,
        'country_code': 'ES'
    }
    response = requests.get('https://api.scraperapi.com/', params=params)
    if response.status_code == 200:
        return BeautifulSoup(response.text, 'html.parser')
    else:
        print(f'Error: {response.status_code}')
        return None

In [37]:
def limpiar_unicode(texto):
    if texto:
        return unicodedata.normalize('NFKD', texto).encode('ascii', 'ignore').decode('ascii')
    return texto

def limpiar_signos(str):
    caracteres_a_quitar = ".,:;\"'"
    for caracter in caracteres_a_quitar:
        str = str.replace(caracter, "")
    return str

def obtener_precio_actual(soup):
    try:
        precio_actual = soup.find('span', {'class': 'a-price-whole'}).get_text(strip=True)
        precio_decimal = soup.find('span', {'class': 'a-price-fraction'}).get_text(strip=True)
        precio_actual = precio_actual + precio_decimal
        return limpiar_unicode(precio_actual)
    except:
        return None

def obtener_precio_anterior(soup):
    try:
        precio_anterior = soup.find('span', {'class': 'a-price a-text-price'}).find('span', {'class': 'a-offscreen'}).get_text(strip=True)
        return limpiar_unicode(precio_anterior)
    except:
        return None

def obtener_titulo(soup):
    try:
        titulo = soup.find('span', {'id': 'productTitle'})
        if titulo:
            return limpiar_unicode(titulo.get_text(strip=True))
        else:
            print("No se pudo obtener el título del producto.")
            return None
    except Exception as e:
        print("Error al obtener el título del producto.")
        print(e)
        return None

In [38]:
def obtener_tabla_superior(soup):
    detalles = {}
    tabla_overview = soup.select_one('#productOverview_feature_div table')
    if tabla_overview:
        filas_overview = tabla_overview.find_all('tr')
        for fila in filas_overview:
            celdas = fila.find_all(['th', 'td'])
            if len(celdas) == 2:
                clave = limpiar_signos(celdas[0].get_text(strip=True))
                valor = celdas[1].get_text(strip=True)
                detalles[limpiar_unicode(clave)] = limpiar_unicode(valor)
    return detalles


def obtener_tabla_detalles(soup):
    detalles = {}
    tabla_detalles = soup.find('table', {'id': 'productDetails_techSpec_section_1'})
    if tabla_detalles:
        filas = tabla_detalles.find_all('tr')
        for fila in filas:
            clave = limpiar_signos(fila.find('th').get_text(strip=True))
            valor = fila.find('td').get_text(strip=True)
            detalles[limpiar_unicode(clave)] = limpiar_unicode(valor)
    return detalles


def obtener_tabla_adicional(soup):
    detalles = {}
    tabla_adicional = soup.find('table', {'id': 'productDetails_detailBullets_sections1'})
    if tabla_adicional:
        filas_adicionales = tabla_adicional.find_all('tr')
        for fila in filas_adicionales:
            clave = limpiar_signos(fila.find('th').get_text(strip=True))
            valor = fila.find('td').get_text(strip=True)
            detalles[limpiar_unicode(clave)] = limpiar_unicode(valor)
    return detalles


def obtener_detalles(soup):
    detalles_tecnicos = {}
    global ids_noobtenidos
    try:
        tabla_superior = obtener_tabla_superior(soup)
        tabla_detalles = obtener_tabla_detalles(soup)
        tabla_adicional = obtener_tabla_adicional(soup)

        detalles_tecnicos.update(tabla_superior)
        detalles_tecnicos.update(tabla_detalles)
        detalles_tecnicos.update(tabla_adicional)

        if not detalles_tecnicos:
            ids_noobtenidos += 1
            print("No se ha podido obtener los detalles de este id")
            return None

    except Exception as e:
        ids_noobtenidos += 1
        print("No se ha podido obtener los detalles de este id")
        print(e)
        return None

    return detalles_tecnicos

In [39]:
def obtener_datos_producto(api_key, url, product_id):
    soup = obtener_soup(api_key, url)
    if soup:
        precio_actual = obtener_precio_actual(soup)
        precio_anterior = obtener_precio_anterior(soup)
        titulo = obtener_titulo(soup)
        detalles_tecnicos = obtener_detalles(soup)
        if detalles_tecnicos:
            return {
            '_id': product_id,
            'precio_actual': precio_actual,
            'precio_anterior': precio_anterior,
            'titulo':titulo,
            **detalles_tecnicos
            }
    return None

In [52]:
fichero = "../res/ids.json"
with open(fichero, 'r') as file:
    product_ids = json.load(file)

ids_noobtenidos = 0
ids_buscados = 1
ids_totales = len(product_ids)
fichero_detalles = "../res/mobiles.json"

with open(fichero_detalles, "w") as outfile:
    outfile.write("[")
    first = True
    for product_id in product_ids:
        print(f"Obteniendo datos de {product_id} ({ids_buscados}/{ids_totales})")
        url = 'https://www.amazon.es/dp/' + product_id
        detalles = obtener_datos_producto(API_KEY, url, product_id)
        if detalles:
            if not first:
                outfile.write(",\n")
            json.dump(detalles, outfile, ensure_ascii=False, indent=4)
            first = False
        ids_buscados += 1
    outfile.write("\n]")
print(f"No se han podido obtener {ids_noobtenidos}/{ids_totales} IDs.")
print("Datos guardados")

Obteniendo datos de B0D4R89TZV (1/2)
No se ha podido obtener los detalles de este id
Obteniendo datos de B0D83KY2ZB (2/2)
No se ha podido obtener los detalles de este id
No se han podido obtener 2/2 IDs.
Datos guardados


## MongoDB

In [42]:
cliente = MongoClient("mongodb://localhost:27018/")
db = cliente["PIA"]
coleccion = db["products"]

### Insertar productos

In [51]:
with open("../res/mobiles.json", "r") as archivo:
    datos = json.load(archivo)

if isinstance(datos, list):
    for doc in datos:
        try:
            coleccion.update_one({"_id": doc["_id"]}, {"$set": doc}, upsert=True)
        except:
            print(doc)

else:
    datos["_id"] = datos["id"]
    del datos["id"]
    coleccion.insert_one(datos)

print("Datos insertados")


Datos insertados


### Exportar productos

In [53]:
from datetime import datetime

fecha_hora_actual = datetime.now().strftime("%d_%m_%Y_%H_%M")
carpeta = "../backup/"
nombre_archivo = f"backup_mobiles_{fecha_hora_actual}.json"
destino = carpeta + nombre_archivo
documentos = list(coleccion.find())

for doc in documentos:
    doc['_id'] = str(doc['_id'])

with open(destino, "w", encoding="utf-8") as archivo:
    json.dump(documentos, archivo, ensure_ascii=False, indent=4)

print("Base de datos descargada con éxito")


Base de datos descargada con éxito
