Instanciación de la AI Mistral

In [8]:
import os
from dotenv import load_dotenv
from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage

load_dotenv()

api_key = os.getenv("MISTRAL_API_KEY")
if not api_key:
    raise ValueError("Error con las variables de entorno.")

model = "mistral-large-latest"

client = MistralClient(api_key=api_key)


Prueba del chat

In [9]:
chat_response = client.chat(
    model=model,
    messages=[ChatMessage(role="user", content="Cuál es el significado de 'ubicuo'?")]
)

print(chat_response.choices[0].message.content)

La palabra "ubicuo" se utiliza para describir algo que está presente en todas partes al mismo tiempo. Proviene del latín "ubique", que significa "en todas partes". En términos más formales, se refiere a una cualidad de omnipresencia, es decir, la capacidad de estar en todos los lugares simultáneamente. Este término se puede aplicar tanto a conceptos abstractos como a entidades físicas o digitales que tienen una amplia distribución o accesibilidad.


##### 1er paso: request al sitio para buscar "robots.txt" si existe

In [6]:
import requests

def get_robots_txt(url):
    if not url.endswith('/'):
        url += '/'
    
    robots_url = url + "robots.txt"
    
    response = requests.get(robots_url)
    
    if response.status_code == 200:
        return response.text
    else:
        return None

website_url = "https://radiocut.fm"
robots_txt = get_robots_txt(website_url)

if robots_txt:
    print("robots.txt content:")
    print(robots_txt)
else:
    print("robots.txt no encontrado")

robots.txt content:
User-agent: *
Crawl-delay: 30
Request-rate: 1/3
Sitemap: https://radiocut.fm/sitemap.xml
Disallow: /*/delete$




##### 2do paso: ajustar el craw_delay y el request_rate con la librería "time"

In [20]:
import requests
import time

def fetch_url(url):
    response = requests.get(url)
    return response

# Recortes del día
urls = ["https://ar.radiocut.fm/search/?type=cut&page=4&created=1"]

crawl_delay = 30  # segundos
request_rate = 1 / 3  # 1 request cada 3 segundos

docs = []
for url in urls:
    response = fetch_url(url)
    if response.status_code == 200:
        print("Fetched:", url)
        docs.append(response.text)

    else:
        print("Failed to fetch:", url)
    
    # Esperar de acuerdo al tiempo del crawl (rebaje)
    time.sleep(crawl_delay)
    

Fetched: https://ar.radiocut.fm/search/?type=cut&page=4&created=1


In [18]:
len(docs)

1

In [20]:
docs

['\n<!DOCTYPE html>\n<html lang="es">\n<head>\n<link rel="preconnect dns-prefetch" href="https://www.googletagmanager.com" />\n<meta charset="UTF-8">\n<meta name="viewport" content="width=device-width,initial-scale=1.0" />\n<meta name="alexaVerifyID" content="VnZ_Et-Jpg6HoBQRuWYP1dYWaa0" />\n<meta property="fb:app_id" content="471603786214644" />\n<meta property="og:site_name" content="RadioCut" />\n<meta property="og:image" content="https://static.radiocut.com.ar/images/radio_cut_96.png" />\n<meta name="description" content="\n            \n            Escucha radios online de Argentina\n            \n          y programas de la radio FM, AM, Noticias, Música... Retrocede horas y días atrás y crea tus propios recortes de la radio.\n        ">\n<meta name="keywords" content="radios,radiodifusion,programa de radio,noticias,archivo de radios, podcast, deportes,musica,radiocut">\n<meta name="apple-itunes-app" content="app-id=1120326976">\n<meta name="google-play-app" content="app-id=com.l

##### 3er paso: servirse la cucharada de sopa y separar el osobuco

In [26]:
from bs4 import BeautifulSoup
import re
from datetime import datetime, timedelta
import pytz
import locale

# Establece la localización en español
locale.setlocale(locale.LC_TIME, 'es_ES.UTF-8')

soup = BeautifulSoup(docs[0], 'html.parser')

def clean_duration(duration_text):
    return duration_text.replace("Dur: ", "").strip()

def convert_duration_to_date(duration_text):
    pattern = re.compile(r'(\d+)\s*año|(\d+)\s*mes|(\d+)\s*semana|(\d+)\s*día|(\d+)\s*hora|(\d+)\s*minuto')
    matches = pattern.findall(duration_text)
    
    years = months = weeks = days = hours = minutes = 0
    for match in matches:
        if match[0]: years = int(match[0])
        if match[1]: months = int(match[1])
        if match[2]: weeks = int(match[2])
        if match[3]: days = int(match[3])
        if match[4]: hours = int(match[4])
        if match[5]: minutes = int(match[5])
    
    now = datetime.now(pytz.timezone('America/Argentina/Buenos_Aires'))
    time_delta = timedelta(days=(years * 365 + months * 30 + weeks * 7 + days), hours=hours, minutes=minutes)
    result_date = now - time_delta
    utc_dt = result_date.astimezone(pytz.utc)
    
    # Formatea el día y el mes manualmente para evitar ceros a la izquierda
    day = utc_dt.day
    month = utc_dt.strftime('%B')
    year = utc_dt.year
    time_str = utc_dt.strftime('%H:%M:%S')
    readable_date = f'{day} de {month} de {year} {time_str}'
    
    return readable_date

data_list = []

containers = soup.find_all('div', class_='col-sm-10 col-xs-8')

for container in containers:
    # Extraer el título
    title_tag = container.find('a')
    title = title_tag.text.strip() if title_tag else None

    if title:  # Sólo sigue si hay título
        # Extrae la duración y la emprolija
        duration_tag = container.find('span', class_='text-muted')
        duration = clean_duration(duration_tag.text) if duration_tag else None

        # Extrae la descripción y la emprolija
        description_tag = container.find('p', class_='col-sm-12 description text-left')
        description = description_tag.text.strip() if description_tag else None

        # Extrae la fecha de creación del contenedor actual
        p_element = container.find_all('p')[-1]  # Asumiendo que la fecha está en el último <p> del contenedor
        first_text = p_element.text.strip().split('\n')[0].strip() if p_element else ""
        readable_date = convert_duration_to_date(first_text)
        
        # Crear un diccionario para almacenar la data
        data = {
            "title": title,
            "duration": duration,
            "description": description,
            "fecha": readable_date
        }
        
        if description:
            data_list.append(data)
            print(data)

{'title': 'Vamos Al Frente. Programa 03/08/2024.', 'duration': '60:37', 'description': 'Programa del Frente Telefónico, Agrupación de Trabajadorxs en FOETRA Sindicato de las Telecomunicaciones. Sábados de 9 a 10 hs. Radio Gráfica FM 89.3, www.radiografica.org.ar- Conflicto Salarial 2024/25: la CONSITEL lanza medidas por las paritarias de las telecomunicaciones.- Entrevista a Johana Duarte, Secretaria Gremial de la UTEP. Marcha Paz Pan Tierra Techo y Trabajo de San Cayetano en unidad con la CGT, la CTA-T y la CTA-A.- Entrevista a Rodrigo Cipriano García, de la Comisión Provincial por la Memoria. Acciones de La Libertad Avanza cuestionando el programa Jóvenes y Memoria de la Provincia de Buenos Aires.Conducción: Ariel Ernesto Velazquez y Pablo SuárezCon Maringa ÁlvarezProducción: VAFOperación técnica: Julián Pelliza', 'fecha': '3 de agosto de 2024 12:51:39'}
{'title': 'Al predio El Faro sitio de la memoria, el intendente Montenegro lo ve como un negocio (Ana Pecoraro)', 'duration': '17:2

Obtengo la descripción con más caracteres como para saber

In [44]:
def find_max_description_length(data_list):
    max_length = 0
    dictDescLenMax = {}
    for item in data_list:
        description = item.get("description", "")
        current_length = len(description)
        currentMax = item
        if current_length > max_length:
            max_length = current_length
            dictDescLenMax = currentMax
    return dictDescLenMax

dictDescLenMax = find_max_description_length(data_list).get("description")
max_length = len(dictDescLenMax)
print(f"La descripción con longitud máxima tiene: {max_length} caracteres con espacios")
print(f"y es: {dictDescLenMax}")


La descripción con longitud máxima tiene: 916 caracteres con espacios
y es: Gabriel Katopodis, Ministro de Infraestructura y Servicios Públicos de la Provincia de Buenos Aires, habló con Pablo Iummato sobre la decisión de quitarle a la provincia de Buenos Aires una multimillonaria inversión de YPF. Katopodis sostuvo que “Milei viene robándole a la Provincia de Buenos Aires los fondos de educación, de seguridad, obra pública y ahora no tuvo mejor idea que robarle la inversión de YPF” y agregó que “es un castigo a la Provincia y es el común denominador desde que asumió Milei”. Además, sostuvo que “cuando revisas las declaraciones del Presidente, los decretos que firmo, las distintas decisiones que fue tomando, todas van sistemáticamente contra la Provincia de Buenos Aires”. “El Presidetne está buscando que peleemos con la provincia de Rio Negro o que ataquemos al Gobernador, cosa que no haremos, porque acá no gano o perdió una provincia, acá perdió Argentina”, concluyó Katopodis.


###### 1er paso: Unir todos los diccionarios en un solo archivo de texto. Split y chunk
###### 2do paso: Convertir los diccionarios en vectores con sentenceTransformer (embedding)
###### 3er paso: Cargar a una base de datos de vectores
###### 4to paso: Pasar a la IA

In [68]:
def write_dicts_to_text_file(dict_list, file_path):
    with open(file_path, 'w', encoding='utf-8') as file:
        for dictionary in dict_list:
            for key, value in dictionary.items():
                # Escribe cada llave y valor en el archivo
                file.write(f"{key}: {value}\n")
            # Añade una línea en blanco entre diccionarios para separación
            file.write("\n")

def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()

def split_text_into_chunks(text, chunk_size):
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

file_path = 'output.txt'

# Write the dictionary list to the text file
write_dicts_to_text_file(data_list, file_path)

# Read the content from the file
file_content = read_file_content(file_path)

# Define the chunk size
chunk_size = 2048

# Split the file content into chunks
chunks = split_text_into_chunks(file_content, chunk_size)

# Print the number of chunks and their contents
print(f"Number of chunks: {len(chunks)}")

Number of chunks: 7


##### 2do paso: embeddings

In [70]:
import numpy as np

def get_text_embedding(input):
    embeddings_batch_response = client.embeddings(
          model="mistral-embed",
          input=input
      )
    return embeddings_batch_response.data[0].embedding
text_embeddings = np.array([get_text_embedding(chunk) for chunk in chunks])


##### 3er paso: cargar en la base de datos de vectores

Para probar: https://github.com/pgvector/pgvector

###### Para usar la FAISS Instalación de miniconda3 y: https://docs.conda.io/projects/conda/en/stable/user-guide/getting-started.html

In [4]:
import faiss                   # make faiss available
index = faiss.IndexFlatL2(d)   # build the index
print(index.is_trained)

ModuleNotFoundError: No module named 'faiss'