In [None]:
!pip install selenium webdriver-manager
!pip install bs4
!pip install openpyxl
!pip install opeanai



In [3]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
import os
import time

# Configuración
DIRECTORIO = "noticias_desmog"
ARCHIVO_CONSOLIDADO = os.path.join(DIRECTORIO, 'desmog_news.txt')
SEPARADOR = "\n" + "-" * 80 + "\n"
BASE_URL = "https://www.desmog.com/sitesearch/?q={query}"
HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"}

# Crear directorio si no existe
os.makedirs(DIRECTORIO, exist_ok=True)

# Configurar Selenium con Chrome Headless
options = Options()
# options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--window-size=1920,1080")
options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36")

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

In [4]:
def close_popups():
    """Cierra pop-ups de suscripción y cookies si aparecen."""
    try:
        WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'Accept')]"))
        ).click()
        print("Pop-up de cookies cerrado.")
        time.sleep(2)
    except:
        print("No se encontró pop-up de cookies.")
    try:
        close_button = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable((By.XPATH, "//button[contains(@aria-label, 'Close')]")
        ))
        close_button.click()
        print("Pop-up de suscripción cerrado.")
        time.sleep(2)
    except:
        print("No se encontró el botón de cierre del pop-up. Intentando cerrar con ESCAPE...")
        try:
            driver.find_element(By.TAG_NAME, 'body').send_keys(Keys.ESCAPE)
            print("Pop-up cerrado con tecla ESCAPE.")
            time.sleep(2)
        except:
            print("No se pudo cerrar el pop-up de suscripción.")


In [5]:
def scroll_and_load_results():
    """Desplaza la página para cargar más resultados."""
    last_height = driver.execute_script("return document.body.scrollHeight")
    while True:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(3)
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            break
        last_height = new_height

In [6]:
def next_page():
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    cursor_box = soup.find('div', class_='gsc-cursor-box gs-bidi-start-align')
    
    if cursor_box:
        # Encuentra el número de la página actual
        current_page = cursor_box.find('div', class_='gsc-cursor-current-page')
        if current_page:
            current_page_number = int(current_page.get_text(strip=True))
            next_page_number = current_page_number + 1
            
            # Limitar a 10 páginas
            if next_page_number > 10:
                return False
            
            # Encuentra el botón de la siguiente página
            next_page_div = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, f"//div[@class='gsc-cursor-page' and text()='{next_page_number}']"))
            )
            driver.execute_script("arguments[0].click();", next_page_div)
            time.sleep(2)  # Espera a que cargue la siguiente página
            return True
    return False

In [7]:
def scan_page():
    time.sleep(1)
    scroll_and_load_results()
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    articles = soup.find_all('div', class_='gsc-webResult') or soup.find_all('div', class_='gsc-result')
    
    noticias = []
    for article in articles:
        title_tag = article.find('a', class_='gs-title')
        if title_tag and title_tag.get('href'):
            title = title_tag.text.strip()
            link = title_tag['href']
            noticias.append({"title": title, "link": link})
    return noticias

def obtener_resultados_busqueda(query):
    """Obtiene los enlaces y títulos de los resultados de búsqueda, manejando ventanas emergentes si aparecen."""
    url = BASE_URL.format(query=query)
    driver.get(url)
    time.sleep(5)
    close_popups()
    noticias = scan_page()
    i=1
    print('EXPLORACIÓN')
    while(next_page()):
        print(f'Explorando página {i} de resultados')
        noticias = noticias + scan_page()  
        i += 1
        
    return noticias

In [8]:
def obtener_contenido_noticia(url):
    """Extrae el contenido, autor, fecha y título de una noticia."""
    driver.get(url) 
    time.sleep(2.5)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.TAG_NAME, "body"))
    )
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    
    # Extraer título
    title_tag = soup.find('h1')
    title = title_tag.get_text(strip=True) if title_tag else "Título no disponible"
    
    # Extraer autor
    # author_tag = soup.find('span', class_='author-name')
    # author_tag = soup.find(class_='jet-listing-dynamic-meta__item-val molongui-disabled-link')
    author_tag = soup.select("div.jet-listing-dynamic-meta__author a")
    # print(f'el autor está en {author_tag}')
    author = author_tag[0].get_text(strip=True) if author_tag else "Autor no disponible"
    
    # Extraer fecha
    date_tag = soup.find('time')
    date = date_tag.get_text(strip=True) if date_tag else "Fecha no disponible"
    
    # Extraer contenido
    paragraphs = soup.find_all(['p', 'h2', 'h3'])
    content = '\n'.join([p.get_text(strip=True) for p in paragraphs if p.get_text(strip=True)])
    content = content if content else "Contenido no disponible."
    
    # return f"Título: {title}\nAutor: {author}\nFecha: {date}\n\n{content}"
    return {
        'title': title,
        'author': author,
        'date': date,
        'content': content
    }

In [None]:
import csv
import pandas as pd
# def guardar_noticias(noticias):
#     with open('resultados.csv', 'w', encoding="utf-8") as file:
#         write = csv.writer(file)
#         # write.writerows(csv_list)
#         write.writerow(['Título', 'Autor', 'Fecha', 'Contenido'])
        
        
#         # resultado = []
#         print('PROCESAMIENTO')
#         for idx, noticia in enumerate(noticias):
#                 print(f'Procesando noticia {idx}/{len(noticias)}')
#                 write.writerow(list(obtener_contenido_noticia(noticia['link']).values()))

#         # csv_list_header = list(noticias[0].values())
#         # csv_list_body = [list(noticia.values()) in noticias]
#         # csv_list = csv_list_header + csv_list_body
#     print(f"Noticias guardadas en resultados.csv")
          
# #     with open('noticias.json', 'w', encoding='utf-8') as json_file:
# #             # f.write(f"{SEPARADOR}{contenido}{SEPARADOR}")
# #             json.dump(resultado,json_file)

def guardar_noticias(noticias, query):
    datos = []
    print("PROCESAMIENTO")
    
    for idx, noticia in enumerate(noticias):
        print(f'Procesando noticia {idx + 1}/{len(noticias)}')
        contenido = obtener_contenido_noticia(noticia['link'])
        datos.append(contenido)  # Agregamos el contenido al DataFrame

    # Convertimos la lista de datos en un DataFrame
    df = pd.DataFrame(datos)
    
    # Guardamos el DataFrame en un archivo Excel
    file_name = f'resultados_{query}.xlsx'
    df.to_excel(file_name, index=False)
    print(f"Noticias guardadas en {file_name}")

In [None]:
def main():
    # driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    # query = input("Ingrese la palabra clave para buscar en Desmog: ")
    query = 'pipeline'
    print(f"Buscando noticias para: {query}")
    noticias = obtener_resultados_busqueda(query)
    if noticias:
        guardar_noticias(noticias, query)
        # guardar_noticias([noticias[0]])
    else:
        print("No se encontraron noticias.")
    driver.quit()

if __name__ == "__main__":
    main()

Buscando noticias para: pipeline
Pop-up de cookies cerrado.
No se encontró el botón de cierre del pop-up. Intentando cerrar con ESCAPE...
Pop-up cerrado con tecla ESCAPE.
EXPLORACIÓN
Explorando página 1 de resultados
Explorando página 2 de resultados
Explorando página 3 de resultados
Explorando página 4 de resultados
Explorando página 5 de resultados
Explorando página 6 de resultados
Explorando página 7 de resultados
Explorando página 8 de resultados
Explorando página 9 de resultados
PROCESAMIENTO
Procesando noticia 1/110
Procesando noticia 2/110
Procesando noticia 3/110
Procesando noticia 4/110
Procesando noticia 5/110
Procesando noticia 6/110
Procesando noticia 7/110
Procesando noticia 8/110
Procesando noticia 9/110
Procesando noticia 10/110
Procesando noticia 11/110
Procesando noticia 12/110
Procesando noticia 13/110
Procesando noticia 14/110
Procesando noticia 15/110
Procesando noticia 16/110
Procesando noticia 17/110
Procesando noticia 18/110
Procesando noticia 19/110
Procesando n

----