In [179]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException, ElementClickInterceptedException
import time
import requests
import os
import pandas as pd
from datetime import datetime
import pathlib
import pickle
from IPython.display import display, clear_output
from dotenv import load_dotenv, dotenv_values 
load_dotenv()

True

In [2]:
def find_sector_ids(url, max_retries=3, retry_delay=10, render_wait=5):
    """Scrape sector IDs from the ``onclick`` handlers of a Banxico page.

    The page is opened in Chrome through Selenium. Every element carrying an
    ``onclick`` attribute is inspected, and for handlers that mention
    ``cargaDirectorioSector`` the first call argument is kept as the sector ID.

    Parameters
    ----------
    url : str
        Banxico URL containing the series ID information.
    max_retries : int, optional
        How many additional times the page is reloaded when fewer than three
        ``onclick`` elements are found (the heuristic used to detect a page
        that has not rendered yet).
    retry_delay : float, optional
        Seconds to wait between reload attempts.
    render_wait : float, optional
        Seconds to wait after ``<body>`` is present before reading the DOM.

    Returns
    -------
    sector_ids : list
        List containing sector IDs as strings. When every attempt yields an
        under-populated page, the IDs found on the last attempt are returned
        (possibly an empty list).
    """
    sector_ids = []
    for attempt in range(max_retries + 1):
        # Bound before the try so the finally clause never hits an unbound name
        # when webdriver.Chrome() itself fails.
        driver = None
        try:
            driver = webdriver.Chrome()
            driver.get(url)

            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            time.sleep(render_wait)

            elements = driver.find_elements(By.XPATH, '//*[@onclick]')
            page_rendered = len(elements) >= 3

            # Attributes must be read while the browser session is still alive.
            sector_ids = []
            for element in elements:
                onclick_txt = element.get_attribute('onclick')
                if onclick_txt and "cargaDirectorioSector" in onclick_txt:
                    sector_ids.append(onclick_txt.split(",")[0].split("(")[-1])
        finally:
            if driver is not None:
                driver.quit()

        if page_rendered:
            return sector_ids
        if attempt < max_retries:
            time.sleep(retry_delay)

    return sector_ids


# Documentation page whose rendered HTML is scraped for the sector IDs.
banxico_series_url = "https://www.banxico.org.mx/SieAPIRest/service/v1/doc/catalogoSeries" 

# Launches Chrome through Selenium, so a local browser/driver must be available.
sector_ids = find_sector_ids(banxico_series_url)



In [3]:
# Base endpoint of the sectores catalogue; a sector ID is appended to it.
sectores_api_url = "https://www.banxico.org.mx/SieAPIRest/service/v1/cat/sectores/"
def find_cuadroid_by_sectorid(sectors_ids):
    """Call the Banxico sectores API to retrieve cuadro IDs for a list of sectors.

    For each sector the JSON under ``bmx -> cuadros`` is walked recursively:
    every dict that has an ``idCuadro`` key contributes its value, and nested
    ``cuadros`` entries are followed.

    Parameters
    ----------
    sectors_ids : list
        List containing sector IDs (strings).

    Returns
    -------
    id_cuadros : list
        List containing the cuadro IDs found, in discovery order. Sectors whose
        request does not return HTTP 200 are reported and skipped; no
        placeholder entry is added for them.
    """
    id_cuadros = []

    def _collect(node):
        if isinstance(node, dict):
            if 'idCuadro' in node:
                id_cuadros.append(node['idCuadro'])
            if 'cuadros' in node:
                _collect(node['cuadros'])
        elif isinstance(node, list):
            for child in node:
                _collect(child)

    for sector_id in sectors_ids:
        api_url = f"{sectores_api_url}{sector_id}"
        # A timeout keeps a stalled connection from hanging the whole notebook.
        response = requests.get(api_url, headers={"Accept": "application/json"}, timeout=30)

        if response.status_code != 200:
            # Previously an empty string was appended here, which made the next
            # stage query the bare estructuras endpoint.
            print(f"Sector {sector_id}: HTTP {response.status_code}, skipped")
            continue

        data = response.json()
        _collect(data.get('bmx', {}).get('cuadros', []))

    return id_cuadros
        

# Expand every scraped sector ID into the cuadro IDs reported by the sectores API.
cuadros_ids = find_cuadroid_by_sectorid(sector_ids)



In [None]:
# Base endpoint of the estructuras catalogue; a cuadro ID is appended to it.
cuadros_api_url = "https://www.banxico.org.mx/SieAPIRest/service/v1/cat/estructuras/"
def find_serieid_by_cuadrorid(cuadros_list):
    """Call the Banxico estructuras API to collect the series IDs of each cuadro.

    The JSON under ``bmx`` is walked recursively through every dict value and
    list item; each non-blank ``idSerie`` value is recorded once.

    Parameters
    ----------
    cuadros_list : list
        List containing cuadro IDs (strings). Empty entries are ignored.

    Returns
    -------
    found_series : list
        Unique series IDs in first-seen order. Cuadros whose request does not
        return HTTP 200 contribute nothing.
    """
    found_series = []
    seen = set()  # O(1) membership test; the list alone made dedup quadratic

    def _collect(node):
        if isinstance(node, dict):
            serie_id = node.get('idSerie')
            if isinstance(serie_id, str) and serie_id.strip() != "" and serie_id not in seen:
                seen.add(serie_id)
                found_series.append(serie_id)
            for value in node.values():
                _collect(value)
        elif isinstance(node, list):
            for child in node:
                _collect(child)

    for c_id in cuadros_list:
        if not c_id:
            # An empty ID would query the bare endpoint instead of a cuadro.
            continue
        api_url = f"{cuadros_api_url}{c_id}"
        response = requests.get(api_url, headers={"Accept": "application/json"}, timeout=30)

        if response.status_code == 200:
            _collect(response.json().get('bmx', {}))
        else:
            print(f"Cuadro {c_id}: HTTP {response.status_code}, skipped")

    return found_series
                
# Collect the unique series IDs referenced by every cuadro found above.
series_ids = find_serieid_by_cuadrorid(cuadros_ids)

# Bare expression: the notebook renders the full list as this cell's output.
series_ids

In [None]:
def call_series_api(series_list, fechaIni, fechaFin, missing_file, not_processed_path, token):
    """Download a batch of Banxico series and persist each one as a CSV file.

    Relies on the module-level names ``n_call`` (call counter dict),
    ``database_path`` (output directory) and ``not_existent_path`` (pickle that
    records batches answered with HTTP 404).

    Parameters
    ----------
    series_list : list
        Series IDs requested in a single call (joined with commas in the URL).
    fechaIni, fechaFin : str
        Start and end dates placed in the URL.
    missing_file : bool
        True to write a fresh ``banxico_<id>.csv``; False to append only the
        observations newer than the last date already stored in that file.
    not_processed_path : str
        Not used by this function; kept so existing callers keep working.
    token : str
        API token sent as the ``token`` query parameter.

    Returns
    -------
    tuple
        ``(None, None, response_error)``. The first two slots are always None.
        ``response_error`` is False after HTTP 200 or 404 and True when the
        input is not a list, on any other status code, or on a request error.
    """
    if not isinstance(series_list, list):
        print("Data provided is not a list")
        # Always a 3-tuple: the caller unpacks three values.
        return None, None, True

    url = f"https://www.banxico.org.mx/SieAPIRest/service/v1/series/{','.join(series_list)}/datos/{fechaIni}/{fechaFin}"
    print(url)

    params = {'token': token}
    try:
        response = requests.get(url, params=params, timeout=60)
        if response.status_code == 200:
            n_call['count'] += 1
            data = response.json()
            for s_i in data['bmx']['series']:
                if 'datos' not in s_i:
                    continue
                pd_series = pd.Series(
                    data=[d['dato'] for d in s_i['datos']],
                    index=pd.to_datetime([d['fecha'] for d in s_i['datos']], format='%d/%m/%Y'),
                    name=s_i['idSerie']
                )
                pd_series.index.name = 'fecha'
                print(pd_series.tail(3))
                file_name = os.path.join(database_path, 'banxico_' + s_i['idSerie'] + '.csv')

                # Fall back to a fresh write when there is nothing to update.
                if missing_file or not os.path.isfile(file_name):
                    pd_series.to_csv(file_name)
                    continue

                # Parse the stored dates so old and new rows share one
                # DatetimeIndex; mixing str and Timestamp labels would write
                # inconsistent date formats to the CSV.
                old_series = pd.read_csv(
                    file_name,
                    usecols=['fecha', s_i['idSerie']],
                    index_col='fecha',
                    parse_dates=['fecha'],
                ).squeeze(axis=1)
                if old_series.empty:
                    new_series = pd_series
                else:
                    new_series = pd_series[pd_series.index > old_series.index.max()]
                updated_series = pd.concat([old_series, new_series])
                updated_series.to_csv(file_name)
                print("Series Updated: ", file_name)

            return None, None, False
        elif response.status_code == 404:
            n_call['count'] += 1
            print("Non existent series")
            append_to_pickle(not_existent_path, series_list)
            return None, None, False
        else:
            print(response)
            print(response.status_code)
            return None, None, True

    except requests.RequestException as e:
        print(f"Error fetching data: {e}")
        return None, None, True


def append_to_pickle(path, data):
    """Add items to the flat list stored in a pickle file, creating it if needed.

    Parameters
    ----------
    path : str
        Location of the pickle file.
    data : list or object
        Items to record. A list, tuple or set contributes each of its elements
        individually; any other object is stored as a single item.

    Notes
    -----
    The stored list is kept flat so that ``item in loaded_list`` works for
    individual IDs. Files written by the earlier behavior, where every batch
    after the first was nested as one list element, are flattened on load.
    """
    stored = []
    if os.path.isfile(path):
        # NOTE: pickle.load executes arbitrary code from the file; only use
        # this on files written by this notebook.
        with open(path, "rb") as fp:
            loaded = pickle.load(fp)
        if not isinstance(loaded, list):
            loaded = [loaded]
        for entry in loaded:
            if isinstance(entry, list):
                stored.extend(entry)
            else:
                stored.append(entry)

    if isinstance(data, (list, tuple, set)):
        stored.extend(data)
    else:
        stored.append(data)

    # The context manager closes the file; no explicit close() is needed.
    with open(path, "wb") as fp:
        pickle.dump(stored, fp)


def search_series_data(series_list, fechaIni, fechaFin, missing_file, not_processed_path):
    """Download ``series_list`` in batches, retrying and rotating API tokens.

    Relies on the module-level names ``tokens`` (list of API tokens) and
    ``n_call`` (dict whose ``'count'`` entry selects the active token as
    ``tokens[count // 10_000]``).

    Each batch of up to 20 IDs is sent once through ``call_series_api``. On a
    response error the call is retried up to five times with exponentially
    growing pauses (1, 2, 4, ... seconds). When all retries fail, the counter
    is advanced to the next token and the batch is tried again. A batch that
    cannot be fetched with any remaining token is recorded through
    ``append_to_pickle``.

    Parameters
    ----------
    series_list : list
        Series IDs to download.
    fechaIni, fechaFin : str
        Date range forwarded to ``call_series_api``.
    missing_file : bool
        Forwarded to ``call_series_api``.
    not_processed_path : str
        Pickle file where batches that could not be downloaded are recorded.

    Notes
    -----
    Advancing to the next token is permanent for the session, so once every
    token has been used up all remaining batches are recorded as not processed.
    """
    clear_output(wait=True)
    max_ids = 20
    max_retries = 5
    initial_delay = 1
    calls_per_token = 10_000

    for s_idx in range(0, len(series_list), max_ids):
        clear_output(wait=True)
        series_batch = series_list[s_idx:s_idx + max_ids]
        print("Processing: ", series_batch)

        processed = False
        while not processed:
            token_idx = n_call['count'] // calls_per_token
            if token_idx >= len(tokens):
                break  # no tokens left to try

            token = tokens[token_idx]
            if not token:
                # Environment variable not set: move on to the next token.
                n_call['count'] = (token_idx + 1) * calls_per_token
                continue

            for attempt in range(1, max_retries + 1):
                _, _, response_error = call_series_api(
                    series_batch, fechaIni, fechaFin, missing_file,
                    not_processed_path, token=token,
                )
                if not response_error:
                    processed = True
                    break
                delay = initial_delay * (2 ** (attempt - 1))
                print("Response error, waiting for ", delay)
                time.sleep(delay)
            else:
                # Every retry failed: jump the counter to the next token's
                # range (the old `=+` assigned a constant instead).
                print("Changing token")
                n_call['count'] = (token_idx + 1) * calls_per_token

        if processed:
            print("Succesfully processed: ", series_batch)
        else:
            print("Error proccessing: ", series_batch)
            append_to_pickle(not_processed_path, series_batch)

# --- Configuration -----------------------------------------------------------
# API tokens are read from the environment variables BANXICO_KEY_1 .. BANXICO_KEY_10.
tokens = [os.getenv(f"BANXICO_KEY_{i}") for i in range(1, 11)]
oldest_date = '1965-01-01'
today_date = datetime.today().strftime('%Y-%m-%d')
database_path = "database"
not_processed_path = "not_processed"
not_existent_path = "not_existent"
batch_size = len(series_ids) // 10
series_ids_trim = series_ids
# search_series_data selects tokens[count // 10_000], so this starting value
# begins with the second token. NOTE(review): confirm that skipping
# BANXICO_KEY_1 is intentional.
n_call = {'count': 10_000}


def _series_csv_path(series_id):
    """Return the CSV path used to store one series inside ``database_path``."""
    return os.path.join(database_path, 'banxico_' + series_id + '.csv')


os.makedirs(database_path, exist_ok=True)

# The record of non-existent series is a pickle *file*, so test with isfile
# (isdir was never true and the record was silently ignored).
non_existent_series = []
if os.path.isfile(not_existent_path):
    # NOTE: pickle.load executes arbitrary code from the file; only load files
    # written by this notebook.
    with open(not_existent_path, "rb") as fp:
        loaded_non_existent = pickle.load(fp)
    # Batches may have been stored as nested lists; flatten one level so the
    # membership test below works on individual IDs.
    for entry in loaded_non_existent:
        if isinstance(entry, list):
            non_existent_series.extend(entry)
        else:
            non_existent_series.append(entry)

print("non_existent_series: ", non_existent_series)

# --- First pass: download every series that has no CSV yet --------------------
non_existent_lookup = set(non_existent_series)
missing_csv = [
    i for i in series_ids_trim
    if not os.path.isfile(_series_csv_path(i)) and i not in non_existent_lookup
]

search_series_data(missing_csv, oldest_date, today_date, True, not_processed_path)

# --- Second pass: refresh the series that already had a CSV -------------------
# Only IDs whose CSV actually exists can be updated; IDs known not to exist
# have no file and would make read_csv fail.
missing_lookup = set(missing_csv)
series_to_update = [
    i for i in series_ids_trim
    if i not in missing_lookup and os.path.isfile(_series_csv_path(i))
]
print(f"series_to_update:{len(series_to_update)}: {series_to_update}")

dates = [pd.read_csv(_series_csv_path(s), usecols=['fecha'])['fecha'] for s in series_to_update]
last_dates = [pd.to_datetime(date, format='%Y-%m-%d').max() for date in dates]
last_dates = [d for d in last_dates if pd.notna(d)]  # header-only files yield NaT

if last_dates:
    # Restart from the oldest "last stored date" so no series misses data.
    last_date_in_data = min(last_dates).strftime('%Y-%m-%d')
    print("last_date_in_data: ", last_date_in_data)
    search_series_data(series_to_update, last_date_in_data, today_date, False, not_processed_path)
else:
    print("No stored series to update")
