In [None]:
### Init ###

# Packages
from typing import List, Dict, Callable, Any

import pandas as pd

# Constants: accepted value ranges used when filtering, plus the sampling seed
min_magnitude, max_magnitude = 0, 10

min_longitude, max_longitude = -180, 180

min_latitude, max_latitude = -90, 90

min_depth = 0

shuffle_seed = 42

# Datasets paths (raw CSV files; every table below is keyed by these paths)
datasets_paths = [
    "Datasets/Earthquakes-180d.csv", # Earthquakes-180d Dataset
    "Datasets/Earthquakes-1990-2023.csv", # Earthquakes-1990-2023 Dataset
]

# Columns dropped from each dataset
remove_features = {
    datasets_paths[0]: ["id", "url"],
    datasets_paths[1]: ["time", "state", "status", "tsunami", "significance", "data_type"],
}

# Column renames applied to each dataset (old name -> new name)
rename_features = {
    datasets_paths[0]: {"mag": "magnitude", "depth_km": "depth", "time_utc": "date"},
    datasets_paths[1]: {"magnitudo": "magnitude"},
}

# Output path of each filtered dataset: "<name>.csv" -> "<name>-filtered.csv"
datasets_filtered_paths = {path: path.replace(".csv", "-filtered.csv") for path in datasets_paths}

# Subsets to draw from each filtered dataset (subset name -> number of rows)
datasets_filtered_subsets_sizes = {
    datasets_paths[0]: {"18K": 18000},
    datasets_paths[1]: {"1M": 1_000_000, "2M": 2_000_000, "3M": 3_000_000},
}

# Output path of each subset: "<name>-filtered.csv" -> "<name>-filtered-<subset name>.csv"
datasets_filtered_subsets_paths = {
    path: {
        subset_name: datasets_filtered_paths[path].replace(".csv", f"-{subset_name}.csv")
        for subset_name in datasets_filtered_subsets_sizes[path]
    }
    for path in datasets_paths
}

In [None]:
### Methods ###

def filter_dataset_feature(dataset, feature_name: str, min_value: float = float("-inf"), max_value: float = float("inf"), include_min_max: bool = True):
    """Keep only the rows whose `feature_name` value lies between two bounds.

    Args:
        dataset: DataFrame to filter; it is not modified.
        feature_name: Name of the column to compare against the bounds.
        min_value: Lower bound (defaults to minus infinity, i.e. no lower bound).
        max_value: Upper bound (defaults to plus infinity, i.e. no upper bound).
        include_min_max: When True the bounds themselves are accepted
            (closed interval); when False they are rejected (open interval).

    Returns:
        The rows of `dataset` whose value is within the interval. Rows whose
        value is NaN never satisfy the comparisons and are therefore dropped.
    """
    values = dataset[feature_name]

    if include_min_max:
        in_range = values.ge(min_value) & values.le(max_value)
    else:
        in_range = values.gt(min_value) & values.lt(max_value)

    return dataset[in_range]
    
def filter_dataset(dataset):
    """Drop rows with out-of-range values and expand the "CA" abbreviation.

    The bounds come from the module-level constants `min_magnitude`,
    `max_magnitude`, `min_longitude`, `max_longitude`, `min_latitude`,
    `max_latitude` and `min_depth`.

    Args:
        dataset: DataFrame with the columns "magnitude", "longitude",
            "latitude", "depth" and "place". It is not modified.

    Returns:
        A new DataFrame holding only the rows that pass every range check,
        with "CA" replaced by "California" in the "place" column.
    """
    # Filter magnitude (bounds excluded)
    dataset = filter_dataset_feature(dataset, "magnitude", min_magnitude, max_magnitude, False)

    # Filter longitude (bounds included)
    dataset = filter_dataset_feature(dataset, "longitude", min_longitude, max_longitude, True)

    # Filter latitude (bounds included)
    dataset = filter_dataset_feature(dataset, "latitude", min_latitude, max_latitude, True)

    # Filter depth (lower bound only, included)
    dataset = filter_dataset_feature(dataset, "depth", min_depth, include_min_max = True)

    # Substitutions
    # - The .str accessor leaves missing places as NaN instead of raising on them.
    # - The word boundaries restrict the substitution to the standalone token "CA",
    #   so names that merely contain those letters (e.g. "CANADA") are left intact.
    place = dataset["place"].str.replace(r"\bCA\b", "California", regex = True)

    # assign() returns a new frame, so no column is written onto a filtered slice
    return dataset.assign(place = place)

def print_dataset(dataset_name: str, dataset):
    """Print a titled summary of a dataset.

    Args:
        dataset_name: Label shown in the "### ... ###" title line.
        dataset: DataFrame to summarise.

    The summary is the title line, the report of `DataFrame.info()` and the
    table returned by `DataFrame.describe()`.
    """
    print(f"### {dataset_name} ###")

    # info() writes its report to stdout itself and returns None, so it must not
    # be wrapped in print(): that would add a stray "None" line to the output.
    dataset.info()

    print(dataset.describe())

def create_filtered_dataset(dataset, remove_features: List[str], rename_features: Dict[str, str],
                            filter_dataset: Callable = filter_dataset):
    """Build a cleaned dataset: rename, filter, drop columns, deduplicate.

    Args:
        dataset: Raw DataFrame. It is left untouched; every step works on a
            new frame, so the call can be repeated on the same input.
        remove_features: Names of the columns to drop.
        rename_features: Column renames, as an old name -> new name mapping.
        filter_dataset: Callable that takes the renamed DataFrame and returns
            the rows to keep.

    Returns:
        A new DataFrame with a fresh 0..n-1 index.
    """
    # Rename features (first, so the filter sees the final column names)
    renamed = dataset.rename(columns = rename_features)

    # Filter dataset
    filtered = filter_dataset(renamed)

    # Features selection (after filtering, so the filter can still read these columns)
    selected = filtered.drop(columns = remove_features)

    # Drop duplicates, then reset index
    return selected.drop_duplicates().reset_index(drop = True)

def create_dataset(dataset, dataset_path: str, create_dataset: Callable, create_dataset_params: Dict[str, Any],
                   load_dataset: bool = True, save_dataset: bool = True):
    """Run a dataset-building function, optionally loading its input and saving its output.

    Args:
        dataset: Path of the CSV file to read when `load_dataset` is True,
            otherwise the DataFrame to use directly.
        dataset_path: Path the result is written to (when `save_dataset` is
            True); also used in the progress messages.
        create_dataset: Callable invoked as
            `create_dataset(dataset, **create_dataset_params)`; its return
            value is the created dataset.
        create_dataset_params: Keyword arguments forwarded to `create_dataset`.
        load_dataset: Whether `dataset` is a CSV path that must be read first.
        save_dataset: Whether to write the result to `dataset_path`.

    Returns:
        The dataset returned by the `create_dataset` callable.
    """
    print(f"Start of creation of dataset ({dataset_path})")

    # Load dataset
    if load_dataset:
        dataset = pd.read_csv(dataset)

    # Create dataset
    dataset = create_dataset(dataset, **create_dataset_params)

    # Save dataset
    if save_dataset:
        dataset.to_csv(dataset_path, index = False)

    print(f"End of creation of dataset ({dataset_path})")

    # Print dataset
    # The name is computed outside the f-string: reusing double quotes inside a
    # double-quoted f-string is a SyntaxError on Python versions before 3.12.
    dataset_name = dataset_path.replace(".csv", "")
    print_dataset(f"Dataset ({dataset_name})", dataset)

    return dataset

def create_subset(dataset, subset_size: int):
    """Draw a reproducible random sample of rows from a dataset.

    Args:
        dataset: DataFrame to sample from.
        subset_size: Requested number of rows; capped at the number of rows
            available in `dataset`.

    Returns:
        The sampled rows with a fresh 0..n-1 index. Sampling is seeded with the
        module-level `shuffle_seed`.
    """
    # Never ask for more rows than the dataset holds
    rows_to_draw = min(subset_size, len(dataset))

    # Sample dataset, then reset index
    return dataset.sample(n = rows_to_draw, random_state = shuffle_seed).reset_index(drop = True)

In [None]:
### Filters ###

def filter_dataset2(dataset):
    """Keep only the rows typed as earthquakes, then apply `filter_dataset`.

    Args:
        dataset: DataFrame with a "data_type" column, plus the columns
            `filter_dataset` reads.

    Returns:
        The result of `filter_dataset` on the rows whose "data_type" is
        "earthquake".
    """
    # Filter earthquakes
    is_earthquake = dataset["data_type"].eq("earthquake")

    # Filter dataset
    return filter_dataset(dataset[is_earthquake])

# Filter function to apply to each dataset, keyed by the dataset's raw path
filter_datasets = {
    datasets_paths[0]: filter_dataset,
    datasets_paths[1]: filter_dataset2,
}

In [None]:
### Create filtered datasets ###

# For every raw dataset: read its CSV, build the filtered version with the
# dataset-specific column settings and filter, and write it to its filtered path
datasets_filtered = {
    raw_path: create_dataset(
        raw_path,
        datasets_filtered_paths[raw_path],
        create_filtered_dataset,
        dict(
            remove_features = remove_features[raw_path],
            rename_features = rename_features[raw_path],
            filter_dataset = filter_datasets[raw_path],
        ),
    )
    for raw_path in datasets_paths
}

In [None]:
### Load filtered datasets ###

# Read every filtered dataset back from its CSV file, keyed by the raw dataset path
datasets_filtered = {
    raw_path: pd.read_csv(datasets_filtered_paths[raw_path])
    for raw_path in datasets_paths
}

In [None]:
### Create filtered datasets subsets ###

# For every filtered dataset, draw each configured subset from the in-memory
# frame (load_dataset = False: nothing is read from disk) and write it to its path
datasets_filtered_subsets = {
    raw_path: {
        subset_name: create_dataset(
            datasets_filtered[raw_path],
            datasets_filtered_subsets_paths[raw_path][subset_name],
            create_subset,
            {"subset_size": subset_size},
            load_dataset = False,
        )
        for (subset_name, subset_size) in datasets_filtered_subsets_sizes[raw_path].items()
    }
    for raw_path in datasets_paths
}