<a href="https://colab.research.google.com/github/vmartinezarias/Curso_Ecologia_Paisaje_y-Ecoacustica/blob/main/9%20-%20Geofonias.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Verificar Procesadores

In [None]:
from psutil import cpu_count
print(f"Número de CPUs disponibles: {cpu_count(logical=True)}")

In [None]:
!pip install openpyxl

# ANÁLISIS GEOFONÍAS (LLUVIA)
Este algoritmo se basa Tras efectuar el cambio de nombre, se procede a correr el algoritmo de ruidos fuertes. Este es un  Script de Python, para identificar grabaciones acústicas con lluvia. Se basa en el trabajo realizado por Bedoya et al (2017).

## Carga de paquetes requeridos

In [None]:
import soundfile as sf
import numpy as np
import pandas as pd
from scipy import signal, stats
import os
import tqdm
from multiprocessing import Pool
import time
from datetime import timedelta
from google.colab import drive

# Montar Google Drive
drive.mount('/content/drive')

## Definir los parámetros de entrada del algoritmo

In [None]:
# **Parámetros de Entrada** (Definibles al inicio)
ruta_datos = "/content/drive/MyDrive/CursoPaisajeAudios/"  # Carpeta raíz con audios
folder_rain = "/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/resultados"  # Carpeta para guardar resultados
name_file = "Resultado_lluvia.xlsx"  # Nombre del archivo Excel de salida
formatos = ['wav', 'WAV']  # Formatos aceptados
n_cores = 50  # Número de núcleos para procesamiento paralelo
exclude_these_sites = []  # Sitios a excluir

# Crear la carpeta de resultados si no existe
os.makedirs(folder_rain, exist_ok=True)

## Correr algoritmo de lluvia

In [None]:
def meanspec(audio, Fs=1, wn="hann", ovlp=0, wl=512, nfft=None, norm=True):
    f, t, Zxx = signal.stft(audio, fs=Fs, window=wn, noverlap=ovlp, nperseg=wl, nfft=nfft)
    mspec = np.mean(np.abs(Zxx), axis=1)
    if norm:
        mspec = mspec / max(mspec)
    return f, mspec

def calculo_PSD_promedio(df_ll, pbar=None):
    canal = 0
    fmin = 200
    fmax = 11250
    tipo_ventana = "hann"
    sobreposicion = 0
    tamano_ventana = 1024
    nfft = tamano_ventana
    banda_lluvia = (600, 1200)

    ruta_archivo = df_ll.path_FI
    grupo = df_ll.field_number_PR

    try:
        x, Fs = sf.read(ruta_archivo)
        audio = x if len(x.shape) == 1 else x[:, canal]
        puntos_minuto = Fs * 60
        npuntos = len(audio)
        banda = []

        for seg in range(0, npuntos, puntos_minuto):
            f, p = signal.welch(audio[seg:puntos_minuto+seg], Fs, nperseg=512, window=tipo_ventana, nfft=nfft, noverlap=sobreposicion)
            banda.append(p[np.logical_and(f >= banda_lluvia[0], f <= banda_lluvia[1])])

        banda = np.concatenate(banda)
        PSD_medio = np.mean(banda)

        f, mspec = meanspec(audio, Fs, tipo_ventana, sobreposicion, tamano_ventana, nfft)
        cond = np.logical_and(f > fmin, f < fmax)
        feats = list(mspec[cond])
        freqs = list(f[cond])
        titulos = [f"mPSD_{int(freqs[i])}" for i in range(len(freqs))]

        zip_iterator = zip(titulos, feats)
        if pbar is not None:
            pbar.update(1)

        return ruta_archivo, dict(zip_iterator), PSD_medio, 'NO', grupo
    except:
        if pbar is not None:
            pbar.update(1)
        print(f"El archivo {ruta_archivo} está corrupto.")
        return ruta_archivo, {}, 0, 'YES', grupo

def _apply_df(args):
    df, func = args
    res = df.apply(func, axis=1)
    return res

def regla_decision(x, umbral):
    if x != 0:
        return "NO" if x < umbral else "YES"
    return "PSD medio 0"

def algoritmo_lluvia_imp(df_ind):
    df_lluvia = df_ind.loc[df_ind.damaged_FI == 'NO', :].copy()
    df_no_lluvia = df_ind.loc[df_ind.damaged_FI == 'YES', :].copy()

    PSD_medio = np.array(df_lluvia.PSD_medio.values).astype(np.float64)
    PSD_medio_sin_ceros = PSD_medio[PSD_medio > 0]
    umbral = (np.mean(PSD_medio_sin_ceros) + stats.mstats.gmean(PSD_medio_sin_ceros)) / 2

    df_lluvia['rain_FI'] = df_lluvia.PSD_medio.apply(regla_decision, umbral=umbral)
    df_lluvia = df_lluvia.drop(['PSD_medio'], axis=1)

    df_no_lluvia['rain_FI'] = 'Archivo corrupto'
    df_no_lluvia = df_no_lluvia.drop(['PSD_medio'], axis=1)

    df_indices_lluvia = pd.concat([df_lluvia, df_no_lluvia])
    assert df_indices_lluvia.shape[0] == df_ind.shape[0]
    return df_indices_lluvia

# **Inicio del análisis**
if __name__ == '__main__':
    dict_df = {"field_number_PR": [], "name_FI": [], "path_FI": []}

    print("Inventariando archivos...")
    start_time = time.time()
    for (root, dirs, files) in os.walk(ruta_datos):
        for f in files:
            if any([f".{formato}" in f for formato in formatos]) and not f.startswith("."):
                dict_df["field_number_PR"].append(os.path.basename(root))
                dict_df["name_FI"].append(f)
                dict_df["path_FI"].append(os.path.join(root, f))
    df = pd.DataFrame(dict_df)
    print(f"{len(df)} archivos encontrados")

    df = df.loc[df.field_number_PR.apply(lambda x: x not in exclude_these_sites), :]
    print(f"{len(df)} archivos después de excluir sitios")

    print("Calculando índices...")
    workers = min(len(df), n_cores)
    df_split = np.array_split(df, workers)

    with Pool(processes=workers) as pool:
        result = list(tqdm.tqdm(pool.imap(_apply_df, [(d, calculo_PSD_promedio) for d in df_split]), total=len(df_split)))

    x = pd.concat(result)
    x = np.array(list(zip(*x))).T

    df_ind = pd.DataFrame(list(x[:, 1]))
    df_ind['path_FI'] = x[:, 0]
    df_ind['PSD_medio'] = x[:, 2]
    df_ind['damaged_FI'] = x[:, 3]
    df_ind['grupo'] = x[:, 4]

    df_lluvias = []
    for i in tqdm.tqdm(df_ind['grupo'].unique()):
        df_tmp = df_ind[df_ind.grupo == i]
        df_lluvias.append(algoritmo_lluvia_imp(df_tmp))

    df_indices_lluvia = pd.concat(df_lluvias)
    assert len(df) == len(df_indices_lluvia)

    df_y = df.merge(df_indices_lluvia, how='left', on='path_FI')
    df_y = df_y.drop(['path_FI', 'damaged_FI', 'grupo'], axis=1)

    path_file = os.path.join(folder_rain, name_file)
    print(f"Guardando resultados en {path_file}...")
    df_y.to_excel(path_file, index=False)
    print(f"Resultados guardados en {path_file}")
    print(f"Tiempo de ejecución: {str(timedelta(seconds=time.time() - start_time))}")


## Graficar eventos de lluvia y archivos defectuosos por sitio y fecha

In [None]:
# Importar las bibliotecas necesarias
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import drive

# Montar Google Drive
drive.mount('/content/drive')

# Ruta al archivo Excel en Google Drive
file_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Geofonias/Resultado_lluvia.xlsx'

# Cargamos el archivo de Excel en un DataFrame
ResultadosLluvia = pd.read_excel(file_path)

# Función para convertir el nombre del archivo en fecha y hora
def convertir_nombre_fecha(nombre_archivo):
    fecha_hora_str = nombre_archivo.split('_')[1] + '_' + nombre_archivo.split('_')[2].replace('.WAV', '')
    return pd.to_datetime(fecha_hora_str, format='%Y%m%d_%H%M%S')

# Convertir 'name_FI' a datetime
ResultadosLluvia['name_FI'] = ResultadosLluvia['name_FI'].apply(convertir_nombre_fecha)

# Convertir 'field_number_PR' a tipo categórico
ResultadosLluvia['field_number_PR'] = ResultadosLluvia['field_number_PR'].astype('category')

# Definir el orden de las categorías para 'rain_FI'
categorias = ['YES', 'NO', 'Tiempo diferente', 'Archivo corrupto']
ResultadosLluvia['rain_FI'] = pd.Categorical(ResultadosLluvia['rain_FI'], categories=categorias, ordered=True)

# Crear la gráfica
plt.figure(figsize=(12, 6))

# Dibujar todos los puntos excepto donde rain_FI es 'YES'
sns.scatterplot(data=ResultadosLluvia[ResultadosLluvia['rain_FI'] != 'YES'], x='name_FI', y='field_number_PR', hue='rain_FI',
                palette={'YES': 'blue', 'NO': 'grey', 'Tiempo diferente': 'orange', 'Archivo corrupto': 'red'})

# Dibujar solo los puntos donde rain_FI es 'YES' para que se vean encima
sns.scatterplot(data=ResultadosLluvia[ResultadosLluvia['rain_FI'] == 'YES'], x='name_FI', y='field_number_PR',
                color='blue')

# Personalizar la gráfica
plt.title('Distribución de eventos de lluvia')
plt.xlabel('Fecha')
plt.ylabel('Sitio')
plt.xticks(rotation=90)
plt.legend(title='Lluvia', bbox_to_anchor=(1.05, 1), loc='upper left')

sns.set_theme(style="whitegrid")

# Guardar la gráfica como archivo PNG en Google Drive
output_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Geofonias/DistribucionEventosLluvia.png'
plt.savefig(output_path, bbox_inches='tight')

# Mostrar la gráfica
plt.show()


## Graficar lluvia por horas

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import drive

# Montar Google Drive
drive.mount('/content/drive')

# Ruta del archivo Excel en Google Drive
file_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Geofonias/Resultado_lluvia.xlsx'

# Cargamos el archivo de Excel en un DataFrame
ResultadosLluvia = pd.read_excel(file_path)

# Función para convertir el nombre del archivo en hora
def convertir_nombre_hora(nombre_archivo):
    hora_str = nombre_archivo.split('_')[2].replace('.WAV', '')
    return int(hora_str[:2])

# Convertir 'name_FI' a hora
ResultadosLluvia['hora_FI'] = ResultadosLluvia['name_FI'].apply(convertir_nombre_hora)

# Convertir 'field_number_PR' a tipo categórico
ResultadosLluvia['field_number_PR'] = ResultadosLluvia['field_number_PR'].astype('category')

# Definir el orden de las categorías para 'rain_FI'
categorias = ['YES', 'NO', 'Tiempo diferente', 'Archivo corrupto']
ResultadosLluvia['rain_FI'] = pd.Categorical(ResultadosLluvia['rain_FI'], categories=categorias, ordered=True)

# Agrupar por hora y contar los eventos de lluvia
hora_lluvia = ResultadosLluvia[ResultadosLluvia['rain_FI'] == 'YES'].groupby('hora_FI').size().reindex(range(24), fill_value=0).reset_index(name='count')

# Crear la gráfica
plt.figure(figsize=(10, 6))
barplot = sns.barplot(data=hora_lluvia, x='hora_FI', y='count', palette='Blues_d')

# Personalizar la gráfica
plt.title('Distribución de eventos de lluvia por hora')
plt.xlabel('Hora')
plt.ylabel('Número de eventos de lluvia')
plt.xticks(rotation=90)
sns.set_theme(style="whitegrid")

# Ajustar la paleta de colores para que sea más oscura con más eventos
norm = plt.Normalize(min(hora_lluvia['count']), max(hora_lluvia['count']))
for i, bar in enumerate(barplot.patches):
    bar.set_facecolor(plt.cm.Blues(norm(bar.get_height())))

# Guardar la gráfica como archivo PNG en Google Drive
output_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Geofonias/DistribucionEventosLluviaPorHora.png'
plt.savefig(output_path, bbox_inches='tight')

# Mostrar la gráfica
plt.show()


## Conteo final de archivos

In [None]:
import pandas as pd
from google.colab import drive

# Montar Google Drive
drive.mount('/content/drive')

# Ruta del archivo Excel en Google Drive
file_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Geofonias/Resultado_lluvia.xlsx'

# Cargamos el archivo de Excel en un DataFrame
ResultadosLluvia = pd.read_excel(file_path)

# Agrupamos los datos por 'field_number_PR' y 'rain_FI', y contamos las ocurrencias
conteo_por_carpeta = ResultadosLluvia.groupby(['field_number_PR', 'rain_FI']).size().unstack(fill_value=0)

# Calculamos el total general por etiqueta 'rain_FI'
total_general = ResultadosLluvia['rain_FI'].value_counts()

# Imprimimos los resultados
print("Conteo por carpeta (field_number_PR):")
print(conteo_por_carpeta)

print("\nTotal general por etiqueta de lluvia (rain_FI):")
print(total_general)

# Guardar los resultados en un archivo Excel
output_counts_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Geofonias/Conteo_Lluvia.xlsx'
with pd.ExcelWriter(output_counts_path) as writer:
    conteo_por_carpeta.to_excel(writer, sheet_name='Conteo por Carpeta')
    total_general.to_frame(name='Total General').to_excel(writer, sheet_name='Total General')

print(f"Resultados guardados en: {output_counts_path}")


# EXTRACCIÓN DE ARCHIVOS

Ahora, se generará una estraccción de los archivo que NO tengan lluvia y que no estén dañados

In [None]:
import os
import shutil
import pandas as pd
from google.colab import drive

# Montar Google Drive
drive.mount('/content/drive')

# Ruta del archivo Excel en Google Drive
file_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Geofonias/Resultado_lluvia.xlsx'

# Cargamos el archivo de Excel en un DataFrame
ResultadosLluvia = pd.read_excel(file_path)

# Filtramos para obtener solo los archivos donde rain_FI es 'NO'
archivos_sin_lluvia = ResultadosLluvia[ResultadosLluvia['rain_FI'] == 'NO']

# Directorio raíz donde están los subdirectorios con los archivos de audio originales en Google Drive
root_directory_path = '/content/drive/MyDrive/CursoPaisajeAudios/'  #### Introudcir la ruta correspondiente

# Directorio donde se copiarán los archivos en Google Drive
new_directory_path = '/content/drive/MyDrive/Curso_Ecologia_Paisaje_Ecoacustica/Audios_SinLluvia'

# Copiar los archivos manteniendo la estructura de subcarpetas
for file_name in archivos_sin_lluvia['name_FI']:
    # Asegúrate de que file_name es una cadena
    file_name_str = str(file_name).strip()
    archivo_encontrado = False

    # Buscar el archivo en el directorio raíz y sus subdirectorios
    for subdir, _, files in os.walk(root_directory_path):
        if file_name_str in files:
            full_file_path = os.path.join(subdir, file_name_str)
            # Conservar la estructura de subcarpetas para la nueva ubicación
            subfolder_structure = os.path.relpath(subdir, root_directory_path)
            new_subfolder_path = os.path.join(new_directory_path, subfolder_structure)
            if not os.path.exists(new_subfolder_path):
                os.makedirs(new_subfolder_path)
            destination_file_path = os.path.join(new_subfolder_path, file_name_str)
            shutil.copy(full_file_path, destination_file_path)
            archivo_encontrado = True
            print(f"Copiado: {destination_file_path}")
            break

    if not archivo_encontrado:
        print(f"No se encontró el archivo: {file_name_str}")

print("Proceso de copiado finalizado.")
