In [None]:
from bs4 import BeautifulSoup
from datetime import datetime
from Logger import setup_logging
from typing import Optional
from glob import glob
import os
import logging
import settings
import pandas as pd
import json
import argparse
import sys
import re

In [None]:
# Compiled patterns used to locate elements inside a product card.
# The first three are passed as ``class_`` filters to ``card.find``; the last
# one is passed as a ``string`` filter.
SPAN_PRICE_PATTERN = re.compile(".*amount__large__price.*")  # price element, used by extract_price
FOOTER_PATTERN = re.compile(".*product_cardProduct__footerInfo.*")  # footer element, used by extract_city
SUBTITLE_PATTERN = re.compile(".*Product__subtitle.*")  # subtitle element, used by extract_subtitle
BANNER_PATTERN = re.compile("Precio imbatible")  # banner text, used by extract_banner

In [None]:

# Configure logging through the project-local ``Logger.setup_logging`` helper
# (its behaviour is defined outside this notebook), then create the
# module-level logger that main() uses to report already-scanned card IDs.
setup_logging()
logger = logging.getLogger(__name__)

In [None]:
# ../data/raw/json/pagina_1_2025_12_28-12h_33m.html
def get_date_filename(html_filename: str) -> str:
    """Return the ``YYYY_MM_DD-HHh_MMm`` timestamp embedded in an HTML file name.

    The first substring of ``html_filename`` that matches that layout is
    returned unchanged.  When there is none, a warning is printed and the
    current local time, formatted the same way, is returned instead.
    """
    found = re.search(r'\d{4}_\d{2}_\d{2}-\d{2}h_\d{2}m', str(html_filename))
    if found:
        return found.group()

    # No timestamp in the name: warn and fall back to "now".
    print(f"Advertencia: No se encontró fecha en '{html_filename}', usando fecha actual.")
    return datetime.now().strftime('%Y_%m_%d-%Hh_%Mm')

In [None]:
def read_html(file: str) -> BeautifulSoup:
    """Parse the UTF-8 HTML file at ``file`` and return the BeautifulSoup tree."""
    with open(file, mode='r', encoding='utf-8') as handle:
        # Parsing happens while the file is still open.
        return BeautifulSoup(handle, 'html.parser')

In [None]:
def get_cards(html: BeautifulSoup) -> list:
    """Collect the product-card anchors found in a parsed page.

    Args:
        html: Parsed document; only its ``find_all`` method is used.

    Returns:
        A list, in the order ``find_all('a')`` yields them, of the ``<a>`` tags
        whose ``data-testid`` attribute contains the text ``card-product``.
        The list is empty when no anchor qualifies.
    """
    return [
        anchor
        for anchor in html.find_all('a')
        # Anchors without the attribute fall back to '' and are filtered out.
        if 'card-product' in anchor.attrs.get('data-testid', '')
    ]

In [None]:
def json_cleaner(json_raw_path, json_cleaned_path):
    """Normalise the raw JSON Lines scrape and write a de-duplicated copy.

    Reads ``json_raw_path`` as JSON Lines, casts the known columns to their
    target dtypes, orders the rows by ascending ``km`` (missing values last)
    and keeps the first row for each ``id``.  The result is written to
    ``json_cleaned_path`` as JSON Lines, one record per line.
    """
    column_dtypes = {
        'id': str,
        'slug': str,
        'city': "category",
        'gear': "category",
        'details': "string",
        'year': "Int64",
        'km': "Int64",
    }
    cards = pd.read_json(json_raw_path, lines=True).astype(column_dtypes)

    # Sorting first means the row kept per id is the one with the lowest km.
    deduplicated = (
        cards
        .sort_values('km', na_position='last')
        .drop_duplicates(subset=['id'], keep='first')
    )
    deduplicated.to_json(json_cleaned_path, orient='records', lines=True)

In [None]:
def extract_price(card: BeautifulSoup) -> Optional[int]:
    """Return the vehicle price shown on a product card as an integer.

    Args:
        card: Tag of a single product card.

    Returns:
        The price with its thousands separators (``,``) removed, or ``None``
        when the card has no price element, the element has no single text
        value, or that text is not a valid integer.
    """
    span_price = card.find(class_=SPAN_PRICE_PATTERN)
    if span_price is None or span_price.string is None:
        # Either there is no price element or it has no plain text to parse.
        return None

    try:
        # Return the parsed number itself: the caller serialises it with
        # json.dumps, which cannot handle the tag object.
        return int(span_price.string.replace(',', '').strip())
    except ValueError:
        return None

In [None]:
def extract_city(card: BeautifulSoup) -> Optional[str]:
    """Return the city named in a product card's footer.

    The footer text is split on ``•`` and its first segment, stripped of
    surrounding whitespace, is taken as the city.

    Args:
        card: Tag of a single product card.

    Returns:
        The city text, or ``None`` when the card has no footer element or the
        footer has no single text value.
    """
    footer = card.find(class_=FOOTER_PATTERN)
    if footer is None or footer.string is None:
        # Always return a value: a missing footer must not leave the result
        # undefined.
        return None
    return footer.string.split('•')[0].strip()

In [None]:

def extract_subtitle(card: BeautifulSoup) -> dict:
    """Parse year, mileage, trim details and gearbox from a card's subtitle.

    The subtitle text is expected to hold four ``•``-separated segments, for
    example ``"2019 • 71,021 km • 2.0 EX AUTO • Automático"``.

    Args:
        card: Tag of a single product card.

    Returns:
        A dict with the keys ``subtitle`` (the list of raw segments, or
        ``None`` when there is no subtitle text), ``year`` (int), ``km`` (int),
        ``details`` (str) and ``shift`` (str).  Parsing is all-or-nothing: if
        any segment is missing or malformed, ``year``, ``km``, ``details`` and
        ``shift`` are all ``None``.
    """
    segments = None
    year = km = details = shift = None

    element = card.find(class_=SUBTITLE_PATTERN)
    if element is not None and element.string is not None:
        segments = element.string.split('•')
        try:
            # The tuple is built in full before anything is assigned, so a
            # failure on any segment leaves all four fields as None.
            year, km, details, shift = (
                int(segments[0].strip()),
                int(segments[1].lower().replace('km', '').replace(',', '').strip()),
                segments[2].strip(),
                segments[3].strip(),
            )
        except (ValueError, IndexError):
            # Unexpected subtitle layout: keep the None defaults.
            pass

    return dict(
        subtitle=segments,
        year=year,
        km=km,
        details=details,
        shift=shift,
    )

In [None]:
def extract_banner(card: BeautifulSoup) -> int:
    """Flag whether a product card carries the "Precio imbatible" banner.

    Args:
        card: Tag of a single product card.

    Returns:
        ``1`` when ``card.find`` locates a string matching ``BANNER_PATTERN``,
        otherwise ``0``.
    """
    return 1 if card.find(string=BANNER_PATTERN) else 0

In [None]:
def main(htmls_path):
    """Scrape every product card from the given HTML files into one JSONL file.

    The output file is named ``<timestamp>.json``, where the timestamp is taken
    from the first path in ``htmls_path``, and is created inside
    ``settings.RAW_JSON_DIR``.  One JSON object is written per line.  A card
    whose ID was already written during this run is logged and skipped.

    Args:
        htmls_path: Sequence of HTML file paths to process.  When it is empty
            a warning is logged and no file is created.
    """
    if not htmls_path:
        # Indexing htmls_path[0] below would raise IndexError.
        logger.warning("No se recibieron archivos HTML para procesar.")
        return

    date_filename = get_date_filename(htmls_path[0])
    filename = date_filename + ".json"
    json_raw_path = settings.RAW_JSON_DIR / filename

    seen_ids = set()
    with open(json_raw_path, 'w', encoding='utf-8') as f:
        for file in htmls_path:
            cards = get_cards(read_html(file))
            print(len(cards), f"tarjetas encontradas en el html: {file}")

            for c in cards:
                # The ID is the last dash-separated token of data-testid.
                id_auto = c['data-testid'].replace('-', ' ').split()[-1]

                if id_auto in seen_ids:
                    logger.info("ID %s ya escaneado", id_auto)
                    continue
                seen_ids.add(id_auto)

                # Tolerate a helper that yields nothing for this card.
                subtitle = extract_subtitle(c) or {}

                record = {
                    "id": id_auto,
                    # .get() keeps one anchor without href from aborting the run.
                    "slug": c.get('href'),
                    "city": extract_city(c),
                    "price": extract_price(c),
                    "year": subtitle.get('year'),
                    "km": subtitle.get('km'),
                    "gear": subtitle.get('shift'),
                    "discount_offer": extract_banner(c),
                    "details": subtitle.get('details'),
                }
                f.write(json.dumps(record) + '\n')

    #json_filename = f"dataset_cards_{date_filename}.jsonl"
    #json_cleaned_path =  settings.PROCESSED_JSON_DIR / json_filename
    #json_cleaner(json_raw_path, json_cleaned_path)

In [None]:
if __name__ == '__main__':
    # Entry point: process the HTML files under --path when it is given,
    # otherwise those in the last (lexicographically greatest) sub-folder of
    # ../data/raw/raw_html/.
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--path',
        type=str,
        required=False,
        help='Ruta de los archivos HTML'
        )

    # parse_known_args() reads the real command line, so --path is honoured,
    # and it returns unrecognised arguments instead of exiting on them.
    # NOTE(review): the previous parse_args([]) presumably existed to ignore
    # the arguments a notebook kernel is started with; confirm this covers it.
    args, _unknown_args = parser.parse_known_args()

    if not args.path:
        directories = sorted(glob('../data/raw/raw_html/*/', recursive=False))

        if not directories:
            print("No se encontraron carpetas en la ruta.")
            sys.exit()

        last_dir = directories[-1]
        print(f"Utilizando ultima carpeta: '{last_dir}'")
        # Sorted so the file that names the output is chosen deterministically.
        html_filenames_path = sorted(glob(f"{last_dir}*.html"))
    else:
        html_filenames_path = sorted(glob(f"{args.path}/*.html"))

    if not html_filenames_path:
        print("No se encontraron archivos HTML para procesar.")
        sys.exit()

    main(html_filenames_path)