In [None]:
#!pip install modin[ray]
#!pip install -U ipywidgets

In [2]:
import requests
from xml.etree import ElementTree as ET
import modin.pandas as pd
import os
import concurrent.futures

In [None]:
#os.makedirs("datos", exist_ok=True)

In [35]:
endpoint = 'https://storage.googleapis.com/validaciones_tmsa/'
def descargar_archivo(carpeta_destino, destino, url = endpoint):
    file = os.path.join(carpeta_destino, destino.split('/')[1])
    response = requests.get(url+destino)
    if response.status_code == 200:
        with open(file, 'wb') as f:
            f.write(response.content)
        print(f'Archivo descargado a {file}.')
    else:
        print(f'Error al descargar el archivo: {response.status_code}')

def descargar_archivos_paralelo(carpeta_destino, lista_destinos, url = endpoint):
    print(f'Descargando {len(lista_destinos)} archivos en paralelo...')
    with concurrent.futures.ThreadPoolExecutor() as executor:
        print(f'Usando {executor._max_workers} hilos...')
        executor.map(lambda x: descargar_archivo(carpeta_destino, x, url), lista_destinos)

In [36]:
def procesar_df(df):
    columns = ['Acceso_Estacion', 'Dispositivo', 'Estacion_Parada', 'Fecha_Transaccion']
    columns_upper = map(str.upper, columns)

    df = df[columns]
    df.columns = columns_upper

    df = df[df['ESTACION_PARADA'].str.contains('U. NACIONAL', na=False)].drop('ESTACION_PARADA', axis=1).convert_dtypes()
    df['DISPOSITIVO'] = df['DISPOSITIVO'].astype('int32')
    df['FECHA_TRANSACCION'] = pd.to_datetime(df['FECHA_TRANSACCION'], format='%Y-%m-%d %H:%M:%S')

    df.sort_values(by=['FECHA_TRANSACCION'], ascending=[True], inplace=True)

    return df

In [37]:
def procesar_df_lista(carpeta, file):
    f_name = os.path.join(carpeta, file.split('/')[1])

    try:
        df_tmp = pd.read_csv(f_name)
        df_tmp = procesar_df(df_tmp)
        print(f'Data frame {f_name} procesado')
        try:
            os.remove(f_name)
            print(f'Archivo {f_name} eliminado')
        except Exception as e:
            print(f'Error eliminando el archivo {f_name}: {e}')
            return None
    except Exception as e:
        print(f'Error procesando el df {f_name}: {e}')
        return None
    return df_tmp

In [38]:
response = requests.get(endpoint)
xml_data = response.text

root = ET.fromstring(xml_data)

In [39]:
files = []

namespace = {'ns': 'http://doc.s3.amazonaws.com/2006-03-01'}
total_size = 0
for element, size in zip(root.iterfind('.//ns:Key', namespace), root.iterfind('.//ns:Size', namespace)):
    if 'validacionTroncal202401' in element.text or 'validacionTroncal202402' in element.text or 'validacionTroncal202403' in element.text:
        files.append(element.text)
        total_size += int(size.text)

In [None]:
total_size_gb = total_size / (1024**3)
print(f'Tamaño total: {total_size_gb:.2f} GB')

In [None]:
len(files)

In [None]:
import shutil

total, used, free = shutil.disk_usage("/")
free_gb = free / (1024**3)

print(f"Espacio libre en el disco duro: {free_gb:.2f} GB")


In [41]:
ingresos_primer_trimestre_2024 = pd.DataFrame()

length = len(files)
batches = 4
for i in range(batches):
    files_batch = files[i*length//batches:(i+1)*length//batches]
    descargar_archivos_paralelo("datos", files_batch)
    dataframes = [procesar_df_lista("datos", f) for f in files_batch]
    ingresos_primer_trimestre_2024 = pd.concat([ingresos_primer_trimestre_2024] + dataframes, join='outer', ignore_index=True)

print('**********TODOS LOS ARCHIVOS HAN SIDO PROCESADOS**********')

Descargando 57 archivos en paralelo...
Usando 8 hilos...
Archivo descargado a datos/validacionTroncal20240204.csv.
Archivo descargado a datos/validacionTroncal20240211.csv.
Archivo descargado a datos/validacionTroncal20240210.csv.
Archivo descargado a datos/validacionTroncal20240130.csv.
Archivo descargado a datos/validacionTroncal20240131.csv.
Archivo descargado a datos/validacionTroncal20240205.csv.
Archivo descargado a datos/validacionTroncal20240202.csv.
Archivo descargado a datos/validacionTroncal20240207.csv.
Archivo descargado a datos/validacionTroncal20240201.csv.
Archivo descargado a datos/validacionTroncal20240218.csv.
Archivo descargado a datos/validacionTroncal20240212.csv.
Archivo descargado a datos/validacionTroncal20240213.csv.
Archivo descargado a datos/validacionTroncal20240217.csv.
Archivo descargado a datos/validacionTroncal20240215.csv.
Archivo descargado a datos/validacionTroncal20240214.csv.
Archivo descargado a datos/validacionTroncal20240216.csv.
Archivo descarg

: 

: 

In [None]:
jupyter.viewOutput

In [None]:
dataframes = [procesar_df_lista("datos", f) for f in files]

print('**********TODOS LOS ARCHIVOS HAN SIDO PROCESADOS**********')

ingresos_primer_trimestre_2024 = pd.concat(dataframes, join='outer', ignore_index=True)

In [None]:
"""try:
    client = Client(processes=True)
    dataframes = compute(*[dask.delayed(descargar_y_procesar)(f) for f in files])
    print('**********TODOS LOS ARCHIVOS HAN SIDO PROCESADOS**********')

    ingresos_primer_trimestre_2024 = pd.concat(dataframes, join='outer', ignore_index=True)
except Exception as e:
    print(f'Error procesando los archivos con error: {e}')
finally:
    client.close()"""

In [None]:
ingresos_primer_trimestre_2024.to_csv('ingresos_primer_trimestre_2024.csv')

Please refer to https://modin.readthedocs.io/en/stable/supported_apis/defaulting_to_pandas.html for explanation.


In [None]:
from google.colab import files

files.download('ingresos_primer_trimestre_2024.csv')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
ingresos_primer_trimestre_2024

In [None]:
os.list()