# Library Import
This section imports the libraries needed for data processing, feature extraction, and cloud interaction (the Google Cloud Storage import is commented out; it is only needed for the optional upload step at the end).

In [3]:
import pandas as pd
import numpy as np
from obspy import read
from utils.feature_extraction import *
from timeit import default_timer as timer
from multiprocessing import Pool, cpu_count
import os
# from google.cloud import storage # Uncomment this line if you want to use Cloud Storage to upload the dataframe

# Loading the Data Catalog

In [None]:
# Load the Apollo 12 grade-A event catalog; each row describes one event file.
df_geral = pd.read_csv(filepath_or_buffer='./data/lunar/training/catalogs/apollo12_catalog_GradeA_final.csv')
df_geral.head(n=5)

# Function to Parallelize Event Processing (CSV and mseed)
- This notebook was executed on Google Cloud, in a Vertex AI instance with 64 vCPUs and 64 GB of RAM, to speed up data processing.
- All data from the CSV files and the metadata (`stats`) from the mseed files are extracted.
- **Additionally, a utility module (`utils/feature_extraction.py`) derives new event features using mathematical transformations built on the SciPy library; all of these features are used to train the neural network.**
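## Description

Each row of the catalog represents an event from the lunar dataset. For each event:
- The event's CSV data is loaded and processed. **The filename and a binary label are added to the DataFrame** (label = 1 for samples whose `time_rel(sec)` is at or after the catalogued event time, 0 otherwise).
- Metadata from the event's mseed file (network, station, location, channel, sampling rate, delta, npts, calib) is extracted.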
- The CSV and mseed data are combined and returned as a single DataFrame.

In [None]:
# Function to process each row (parallelizable function)
def process_row(row):
    start = timer()
    filename = row['filename']
    time_rel_label = row['time_rel(sec)']
    data_directory = f'./data/lunar/training/data/{filename}'

    try:
        # Read CSV data
        csv_path = f"{data_directory}.csv"
        if not os.path.exists(csv_path):
            print(f"File {csv_path} not found, skipping row.",end='\n')
            return None

        df_data_csv = pd.read_csv(csv_path, parse_dates=['time_abs(%Y-%m-%dT%H:%M:%S.%f)'])
        df_data_csv['filename'] = filename
        df_data_csv['label'] = (df_data_csv['time_rel(sec)'] >= time_rel_label).astype(int)  # Create label

        # Concatenate features from mseed
        mseed_file = f'{data_directory}.mseed'
        st = read(mseed_file)
        df_data_csv['network'] = st[0].stats['network']
        df_data_csv['station'] = st[0].stats['station']
        df_data_csv['location'] = st[0].stats['location']
        df_data_csv['channel'] = st[0].stats['channel']
        df_data_csv['sampling_rate'] = st[0].stats['sampling_rate']
        df_data_csv['delta'] = st[0].stats['delta']
        df_data_csv['npts'] = st[0].stats['npts']
        df_data_csv['calib'] = st[0].stats['calib']

        # Concatenating additional features
        sampling_rate = st[0].stats['sampling_rate']
        features = process_seismic_data(df_data_csv, sampling_rate)
        df_data_csv['mean_velocity'] = features['mean_velocity']
        df_data_csv['std_velocity'] = features['std_velocity']
        df_data_csv['max_velocity'] = features['max_velocity']
        df_data_csv['min_velocity'] = features['min_velocity']
        df_data_csv['total_energy'] = features['total_energy']
        df_data_csv['rms_value'] = features['rms_value']
        df_data_csv['peak_count'] = features['peak_count']
        df_data_csv['valley_count'] = features['valley_count']
        df_data_csv['fft_values'] = features['fft_values']
        df_data_csv['fft_freqs'] = features['fft_freqs']
        df_data_csv['autocorrelation'] = features['autocorrelation']
        df_data_csv['acceleration'] = features['acceleration']
        df_data_csv['jerk'] = features['jerk']
        df_data_csv['cumulative_energy'] = features['cumulative_energy']

        end = timer()  # Stop the timer
        elapsed_time = end - start  # Calculate elapsed time
        print(f"Processing row took {elapsed_time:.4f} seconds", end='\n')

        return df_data_csv

    except Exception as e:
        print(f"Error processing row with filename {filename}: {e}")
        return None  # Return None if an error occurs

# Function to handle parallel processing
def parallel_process(df_geral, processes=None):
    """Run ``process_row`` over every catalog row in a process pool and merge.

    Args:
        df_geral: Catalog DataFrame; each row is handed to ``process_row``.
        processes: Number of worker processes. ``None`` (the default) uses
            ``cpu_count()``, matching the previous behavior.

    Returns:
        A single DataFrame concatenating every successfully processed event
        (with a fresh 0..n-1 index), or an empty DataFrame if no event
        produced data.
    """
    start2 = timer()
    if processes is None:
        processes = cpu_count()

    rows = [row for _, row in df_geral.iterrows()]

    # The with-block guarantees the workers are torn down even if map() raises
    # (previously the pool leaked in that case). On success we still close and
    # join first so workers exit normally and flush their buffered prints;
    # the terminate() done by __exit__ is then a no-op.
    with Pool(processes) as pool:
        result_dfs = pool.map(process_row, rows)
        pool.close()
        pool.join()

    # process_row returns None for skipped or failed events.
    result_dfs = [df for df in result_dfs if df is not None]
    df_combined = pd.concat(result_dfs, ignore_index=True) if result_dfs else pd.DataFrame()

    elapsed_time = timer() - start2
    print(f"Final process time took {elapsed_time:.4f} seconds")

    return df_combined

In [None]:
# Build the combined per-sample training DataFrame from every catalogued event.
df2 = parallel_process(df_geral)
df2.shape

In [None]:
# NOTE(review): index=True (pandas' default) also writes the RangeIndex as an unnamed first column; confirm downstream readers expect it.
df2.to_csv(path_or_buf="./training_lunar.csv")

# Saving the File to the Cloud for Faster Download When Needed

In [None]:
# Function to upload files to Google Cloud Storage
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Google Cloud Storage bucket.

    Args:
        bucket_name: Name of the target GCS bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object (blob) name to write in the bucket.
    """
    # Imported lazily: the module-level import is commented out so the rest of
    # the notebook runs without google-cloud-storage; without this import the
    # function raised NameError on `storage`.
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)  # Upload the file

In [None]:
# Push the exported training CSV to the bucket for faster later downloads.
upload_to_gcs(
    bucket_name="my-gcp-bucket",
    source_file_name="./training_lunar.csv",
    destination_blob_name="training_lunar.csv",
)