In [None]:
%matplotlib ipympl

import fastmap as fm
import h5py
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import obspy
import pathlib
import sklearn.model_selection
import sklearn.preprocessing
import sklearn.svm

# Build a test database

In [None]:
DATABASE_PATH = "../data/database_8.h5"

# Load the training and test waveforms and build the class labels
# (0 = noise, 1 = earthquake).  Indexing an h5py dataset reads the selection
# into a NumPy array, so nothing created here keeps a handle into the file
# once the `with` block has closed it.
with h5py.File(DATABASE_PATH, mode="r") as f5:
    # Index -1 along axis 1 of every dataset is used as the waveform;
    # presumably this picks the last channel/component -- TODO confirm.
    tr_noise = f5["training_noise"][:, -1]
    tr_eq = f5["training_events"][:, -1]
    test_noise = f5["test_noise"][:, -1]
    test_eq = f5["test_events"][:, -1]

# Noise rows come first, earthquake rows second, in both matrices.
X_train = np.vstack([tr_noise, tr_eq])
y_train = np.repeat([0, 1], [tr_noise.shape[0], tr_eq.shape[0]])
X_test = np.vstack([test_noise, test_eq])
y_test = np.repeat([0, 1], [test_noise.shape[0], test_eq.shape[0]])

In [None]:
# Golden ratio.  NOTE(review): not referenced by any cell visible here.
PHI = (1 + np.sqrt(5)) / 2

def plot_data(X_train, y_train):
    """Plot amplitude-normalized waveforms, one panel per class.

    Every trace is divided by its peak absolute amplitude and offset
    vertically by its position within its class, so traces stack without
    overlapping.

    Parameters
    ----------
    X_train : ndarray
        Waveforms, one trace per row; the last axis is the sample axis.
    y_train : array-like
        Class label of each row of ``X_train``.  Rows labelled 0 are drawn
        in the left panel and rows labelled 1 in the right panel.

    Returns
    -------
    (fig, axes)
        The figure and its two axes (left: label 0, right: label 1).
    """
    # 72.27 is the number of TeX points per inch; 342 x 260 is presumably
    # the target figure size in points -- TODO confirm.
    w = 342 / 72.27
    h = 260 / 72.27
    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(w, h))

    class_of_row = np.asarray(y_train)

    for j in range(2):
        for i, data in enumerate(X_train[class_of_row == j]):
            peak = np.max(np.abs(data))
            # An all-zero trace has no amplitude to normalize by; draw it as
            # a flat line rather than dividing by zero and plotting NaNs.
            scale = peak if peak > 0 else 1.0
            axes[j].plot(data / scale + i, color="k", linewidth=0.5)

        axes[j].set_xlim(0, X_train.shape[-1])
        axes[j].set_xlabel("Sample index")
        # The vertical axis only encodes the trace offset, so hide its labels.
        axes[j].set_yticklabels([])

    axes[0].set_ylabel("Trace index")

    return (fig, axes)
        
# Directory that receives the figures written by this cell.
# NOTE(review): absolute, machine-specific path -- consider replacing it with
# a location relative to the repository.
FIGURE_DIR = pathlib.Path(
    "/home/malcolmw/Google Drive/malcolmw@mit.edu/meetings/"
    "2021-08-18_nakata_group_project_meeting/src/figures"
)

plt.close("all")

# One figure per data split: (figure title, waveforms, labels, output file).
for split_title, X_split, y_split, file_name in (
    ("Training Data", X_train, y_train, "training_waveforms.png"),
    ("Test Data", X_test, y_test, "test_waveforms.png"),
):
    fig, axes = plot_data(X_split, y_split)
    fig.suptitle(split_title)
    axes[0].set_title("Noise")
    axes[1].set_title("Earthquakes")
    # Save through the figure object so the file never depends on which
    # figure pyplot currently considers "current".
    fig.savefig(FIGURE_DIR / file_name, dpi=360, bbox_inches="tight")

# Embed the test database

In [None]:
def make_meshgrid(W_max, ndim, iax, n=256):
    """Build an ``ndim``-dimensional grid that varies only along ``iax``.

    Each dimension listed in ``iax`` is sampled at ``n`` evenly spaced
    points spanning ``[-W_max, W_max]``; every other dimension is pinned to
    the single value 0.

    Returns the coordinate arrays produced by ``np.meshgrid`` with
    ``indexing="ij"`` -- one array per dimension, all of the same shape.
    """
    axis_samples = []
    for dim in range(ndim):
        if dim in iax:
            axis_samples.append(np.linspace(-W_max, W_max, n))
        else:
            # Dimensions that are not plotted collapse to one point at 0.
            axis_samples.append([0])

    return np.meshgrid(*axis_samples, indexing="ij")


def plot_clf(ax, clf, W_max, ndim, iax, **params):
    """Draw a classifier's decision function as filled contours on a 2-D slice.

    The decision function is evaluated on a grid that spans
    ``[-W_max, W_max]`` along the two dimensions listed in ``iax`` and is
    held at 0 along every other dimension.

    Parameters
    ----------
    ax : matplotlib axes object
        Axes to draw the contours on.
    clf : classifier
        Object exposing ``decision_function``; it is called with an array
        of ``ndim`` columns.
    W_max : float
        Half-width of the evaluated square.
    ndim : int
        Number of input dimensions passed to the classifier.
    iax : sequence of two ints
        Indices of the dimensions shown on the x and y axes, in that order.
    params : optional
        Extra keyword arguments forwarded to ``ax.contourf``.  They override
        the defaults ``cmap="coolwarm_r"``, ``zorder=100``, ``alpha=0.8``.

    Returns
    -------
    The object returned by ``ax.contourf``.
    """

    coords = make_meshgrid(W_max, ndim, iax)

    Z = clf.decision_function(np.column_stack([xx.ravel() for xx in coords]))
    Z = Z.reshape(coords[iax[0]].shape)

    # Index away the pinned (length-1) dimensions so only the 2-D slice is left.
    slices = tuple(slice(None) if i in iax else 0 for i in range(ndim))
    Z = Z[slices]
    xx = coords[iax[0]][slices]
    yy = coords[iax[1]][slices]

    # Start from the defaults and let caller-supplied options win, so passing
    # e.g. ``cmap=...`` overrides the default instead of raising a duplicate
    # keyword error.
    contour_kwargs = dict(cmap=plt.get_cmap("coolwarm_r"), zorder=100, alpha=0.8)
    contour_kwargs.update(params)

    return ax.contourf(xx, yy, Z, **contour_kwargs)

def plot_images(W, y, iax=slice(None), marker="o", clf=None, W_max=None, labels=("Noise", "Earthquake")):
    """Scatter-plot every pair of embedding dimensions in a lower-triangular grid.

    Relies on the module-level colormap ``CMAP`` being defined before the
    function is called.

    Parameters
    ----------
    W : ndarray
        Embedded points, one row per sample and one column per dimension.
    y : array-like
        Integer class label of each row of ``W``; used to color the points.
    iax : slice or sequence of ints, optional
        Columns of ``W`` to plot.  Defaults to all columns.
    marker : str, optional
        Marker used for the points and for the legend handles.
    clf : classifier, optional
        If given, its decision function is drawn behind the points in every
        panel via ``plot_clf``.
    W_max : float, optional
        Half-width of every panel.  Defaults to 1.08 times the largest
        absolute value in ``W`` (computed before ``iax`` is applied).
    labels : sequence of str, optional
        Legend text for class 0, class 1, ... in that order.

    Returns
    -------
    (fig, axes)
        The figure and a 2-D array of axes with ``ndim - 1`` rows and columns.

    Raises
    ------
    ValueError
        If fewer than two dimensions are selected.
    """

    w = 0.9 * 342 / 72.27
    h = 0.9 * 260 / 72.27

    W_max = 1.08*np.max(np.abs(W)) if W_max is None else W_max

    # Index, in the un-sliced array, of every plotted dimension.  Used for the
    # axis labels and to tell the classifier which dimensions a panel shows.
    ndim_full = W.shape[-1]
    dims = np.arange(ndim_full)[iax if isinstance(iax, slice) else np.asarray(iax)]

    W = W[:, iax]
    ndim = W.shape[-1]
    if ndim < 2:
        raise ValueError("plot_images needs at least two dimensions to plot")

    # squeeze=False keeps ``axes`` two-dimensional even for a single panel.
    fig, axes = plt.subplots(nrows=ndim-1, ncols=ndim-1, figsize=(w, h), squeeze=False)

    for icol in range(ndim-1):
        for irow in range(icol, ndim-1):
            ax = axes[irow, icol]
            ax.scatter(
                W[:, icol],
                W[:, irow+1],
                marker=marker,
                c=y,
                cmap=CMAP,
                # Fix the color scale so a class keeps its color even when
                # ``y`` contains a single class only.
                vmin=0,
                vmax=CMAP.N - 1,
                edgecolor="k",
                s=16,
                alpha=0.8,
                zorder=200
            )

            if clf is not None:
                # The classifier works in the full space, so pass the full
                # dimensionality and the original indices of the two
                # dimensions shown in this panel.
                plot_clf(ax, clf, W_max, ndim_full, [int(dims[icol]), int(dims[irow+1])])

            ax.set_xlim(-W_max, W_max)
            ax.set_ylim(-W_max, W_max)
            ax.xaxis.set_major_locator(mpl.ticker.MaxNLocator(3))
            ax.yaxis.set_major_locator(mpl.ticker.MaxNLocator(3))

    # Label the y-axis on the left column.
    for irow in range(ndim-1):
        axes[irow, 0].set_ylabel(f"$x_{{{dims[irow+1]}}}$")

    # Label the x-axis on the bottom row.
    for icol in range(ndim-1):
        axes[-1, icol].set_xlabel(f"$x_{{{dims[icol]}}}$")

    # Turn off x-axis tick labels on all but last row.
    for irow in range(ndim-2):
        for icol in range(irow+1):
            axes[irow, icol].set_xticklabels([])

    # Turn off y-axis tick labels on all but left column.
    for irow in range(ndim-1):
        for icol in range(1, ndim-1):
            axes[irow, icol].set_yticklabels([])

    # Turn off upper triangle.
    for irow in range(ndim-1):
        for icol in range(irow+1, ndim-1):
            axes[irow, icol].set_frame_on(False)
            axes[irow, icol].set_xticks([])
            axes[irow, icol].set_yticks([])

    # Add a legend in the top-right panel.  That panel is empty whenever the
    # grid has more than one panel; with a single panel let matplotlib pick a
    # location that avoids the data.
    handle_style = dict(marker=marker, linewidth=0, markeredgecolor="k")
    legend_elements = [
        mpl.lines.Line2D([0], [0], color=CMAP(i), label=text, **handle_style)
        for i, text in enumerate(labels)
    ]
    axes[0, -1].legend(handles=legend_elements, loc="center" if ndim > 2 else "best")

    return (fig, axes)

In [None]:
# Delete any file left at this location by a previous run, then hand the very
# same location to FastMap so the two can never drift apart.
path = pathlib.Path("test.hdf5")
path.unlink(missing_ok=True)
# NOTE(review): the 4 is presumably the number of embedding dimensions
# (later cells title their figures with W_train.shape[-1]) -- confirm against
# the fastmap API.
fastmap = fm.FastMap(X_train, fm.distance, 4, str(path))

fastmap.embed_database();

In [None]:
# Two-color map used by plot_images: index 0 is red, index 1 is blue.
CMAP = mpl.colors.ListedColormap(['#FF0000', '#0000FF'])

# Directory that receives the figures written by this cell.
# NOTE(review): absolute, machine-specific path -- consider replacing it with
# a location relative to the repository.
FIGURE_DIR = pathlib.Path(
    "/home/malcolmw/Google Drive/malcolmw@mit.edu/meetings/"
    "2021-08-18_nakata_group_project_meeting/src/figures"
)

# Images of the training data come from the embedded database; every test
# waveform is embedded individually.
W_train = fastmap.image[:]
W_test = np.vstack([fastmap.embed(X) for X in X_test])

# Standardize with statistics from the training images only, then apply the
# same transform to the test images.
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(W_train)
W_train = scaler.transform(W_train)
W_test = scaler.transform(W_test)

classifier = sklearn.svm.SVC(gamma=1/4, C=8)
classifier.fit(W_train, y_train)


plt.close("all")

# Figure 1: training images only.
fig, axes = plot_images(W_train, y_train)
fig.suptitle(f"{W_train.shape[-1]}-D FastMap images of Training Data")
fig.savefig(FIGURE_DIR / "fastmap01.png", dpi=360, bbox_inches="tight")

# Figure 2: training images over the SVM decision function.
fig, axes = plot_images(W_train, y_train, clf=classifier)
fig.suptitle(f"{W_train.shape[-1]}-D FastMap images of Training Data \n with SVM Decision Function")
fig.savefig(FIGURE_DIR / "fastmap02.png", dpi=360, bbox_inches="tight")

# Figure 3: test images over the same decision function.  The axis limits
# are taken from the training images so figures 2 and 3 share a scale.
fig, axes = plot_images(W_test, y_test, marker="s", clf=classifier, W_max=1.08*np.max(np.abs(W_train)))
fig.suptitle(f"{W_train.shape[-1]}-D FastMap images of Test Data with \n SVM Decision Function")
score = classifier.score(W_test, y_test)*100
# Print the test accuracy in an otherwise empty upper-triangle panel.
axes[1, 2].text(
    0.5,
    0.5,
    f"Score: {score:.2f}%",
    ha="center",
    va="center",
    transform=axes[1, 2].transAxes
)

fig.savefig(FIGURE_DIR / "fastmap03.png", dpi=360, bbox_inches="tight")

In [None]:
# Scratch inspection of the labelled artists on the panel that carries the
# legend.  `axes` comes from the last figure created in the previous cell.
# `get_legend_handles_labels` only takes an optional legend-handler map, so
# it is called without arguments.
axes[0, 2].get_legend_handles_labels()

In [None]:
# Scratch check: show the RGBA tuple the colormap returns for index 1, the
# color plot_images gives the "Earthquake" legend handle.
CMAP(1)