## Introduction section

Create dataset and define helper functions:

In [104]:
import numpy as np
import pandas as pd
from sklearn.datasets import make_blobs
from sklearn.cluster import DBSCAN
from sklearn.utils.random import sample_without_replacement
from imblearn.under_sampling import RandomUnderSampler

# Synthetic dataset: N two-dimensional points forming a single blob around the origin.
N = 100_000
X, y = make_blobs(
    n_samples=N,
    n_features=2,
    centers=[[0, 0]],
    cluster_std=3,
    random_state=2,
)

# NOTE: the per-axis extents are kept as global variables so that every plot below can be
# rendered on the same axes
mins, maxs = X.min(axis=0), X.max(axis=0)


def add_noise(
    X: np.ndarray,
    y: np.ndarray,
    ratio: float = 0.25,
    random_state=0,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Appends uniformly distributed "background noise" points to a dataset.

    The noise points are drawn uniformly from the axis-aligned bounding box of ``X`` and are
    labelled ``-1``, so that they can be told apart from the original points later on.

    Parameters
    ----------
    X : array of shape (n_points, n_features) with the original points.
    y : array of shape (n_points,) with the labels of the original points.
    ratio : number of noise points, relative to the number of points in ``X``.
    random_state : seed for the noise generator. The fixed default makes the noise
        reproducible across kernel restarts; pass ``None`` for a different draw per call.

    Returns
    -------
    A tuple ``(X_, y_)`` with the original points followed by the noise points, and the
    original labels followed by one ``-1`` per noise point.
    """
    # size and bounds are derived from the input instead of from notebook globals, so the
    # function does not depend on which cells have been executed before
    n_noise = int(len(X) * ratio)
    lower = X.min(axis=0)
    upper = X.max(axis=0)

    rng = np.random.default_rng(random_state)
    # scale the unit-square samples to the bounding box of the data
    X2 = rng.random((n_noise, X.shape[1])) * (upper - lower) + lower
    y2 = np.full(n_noise, -1)

    X_ = np.concatenate([X, X2], axis=0)
    y_ = np.concatenate([y, y2], axis=0)
    return X_, y_


# add "background noise" (labelled -1 by add_noise) to the blob; draw_noise_sample() later
# selects exactly these points. NOTE: this rebinds the global X and y.
X, y = add_noise(X, y)


def cluster(X: np.ndarray, eps: float = 0.35, min_samples: int = 100) -> np.ndarray:
    """
    Performs a simple DBSCAN clustering on the input dataset, returning the labels.

    Parameters
    ----------
    X : array of shape (n_points, n_features) with the points to cluster.
    eps : neighbourhood radius passed to DBSCAN.
    min_samples : minimum neighbourhood size passed to DBSCAN.

    Returns
    -------
    The cluster label per point, in the order of ``X``.
    """
    # NOTE: in dbscan: -1 is outlier, so inlier is > -1
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(X)
    return clustering.labels_


def to_df(X: np.ndarray, y: np.ndarray) -> pd.DataFrame:
    """
    Builds a DataFrame with the two feature columns "A" and "B" taken from ``X`` and a
    third column "y" holding the labels.
    """
    return pd.DataFrame(X, columns=["A", "B"]).assign(y=y)


def draw_random_sample(X: np.ndarray, y: np.ndarray, n_sample: int) -> pd.DataFrame:
    """
    Return a random (uniform) sample of the input data.

    Parameters
    ----------
    X : array of shape (n_points, 2) with the points to sample from.
    y : array of shape (n_points,) with one label per point.
    n_sample : requested number of points. If fewer points are available, all of them are
        returned (in random order) instead of raising an error.

    Returns
    -------
    A DataFrame with the columns "A", "B" (the sampled points) and "y" (their labels).
    """
    n_population = len(X)
    # sampling without replacement cannot return more items than the population holds, so
    # the request is capped at the population size
    sample = sample_without_replacement(
        n_population=n_population,
        n_samples=min(n_sample, n_population),
        random_state=0,
    )

    return to_df(X[sample], y[sample])


def draw_uniform_sample(X: np.ndarray, y: np.ndarray, n_sample: int) -> pd.DataFrame:
    """
    Draw a random (uniform) sample from all points, labelled by the clustering result.

    The input labels ``y`` are not used: the "y" column of the result is 1 for points that
    the clustering assigned to a cluster and 0 for points it marked as outliers.
    """
    # dbscan: -1 is outlier, so inlier is > -1
    is_inlier = (cluster(X) > -1).astype(int)

    return draw_random_sample(X, is_inlier, n_sample)


def draw_inlier_sample(X: np.ndarray, y: np.ndarray, n_sample: int) -> pd.DataFrame:
    """
    Draw a random sample from the clustering inliers only.

    The input labels ``y`` are not used: every sampled point carries the label 1 in the "y"
    column of the result.
    """
    # dbscan: -1 is outlier, so inlier is > -1
    is_inlier = (cluster(X) > -1).astype(int)
    inliers = is_inlier == 1

    # the inliers are sampled down uniformly to n_sample points
    return draw_random_sample(X[inliers], is_inlier[inliers], n_sample=n_sample)


def draw_outlier_sample(X: np.ndarray, y: np.ndarray, n_sample: int) -> pd.DataFrame:
    """
    Draw a random sample from the clustering outliers only.

    The input labels ``y`` are not used: every sampled point carries the raw outlier label
    -1 in the "y" column of the result.
    """
    # dbscan: -1 is outlier
    labels = cluster(X).astype(int)
    outliers = labels == -1

    return draw_random_sample(X[outliers], labels[outliers], n_sample)


def draw_stratified_sample(X: np.ndarray, y: np.ndarray, n_sample: int) -> pd.DataFrame:
    """
    Cluster the data, balance the inlier and outlier classes by random under-sampling, and
    then draw a random sample from the balanced data.

    The input labels ``y`` are not used: the "y" column of the result is 1 for inliers and
    0 for outliers.

    Parameters
    ----------
    X : array of shape (n_points, 2) with the points to sample from.
    y : unused, kept for a signature that is uniform across the ``draw_*_sample`` helpers.
    n_sample : number of points to draw from the balanced data.
    """
    # clustering might find more than 2 clusters, but all we need is inlier/outliers
    c = cluster(X)
    c = (c > -1).astype(int)

    # seeded like the other samplers, so that the sample is the same on every run
    rus = RandomUnderSampler(random_state=0)
    X_, y_ = rus.fit_resample(X, c)
    return draw_random_sample(X_, y_, n_sample=n_sample)


def draw_noise_sample(X: np.ndarray, y: np.ndarray, n_sample: int) -> pd.DataFrame:
    """
    Draw a random sample from the points whose input label ``y`` is -1.

    In this notebook these are the uniformly distributed points appended by add_noise, so
    the sample has approximately the same density everywhere. The "y" column of the result
    is 1 for points the clustering considers inliers and 0 for outliers.
    """
    # dbscan: -1 is outlier, so inlier is > -1
    is_inlier = (cluster(X) > -1).astype(int)
    noise = y == -1

    return draw_random_sample(X[noise], is_inlier[noise], n_sample)


Plot the full dataset:

In [None]:
df = pd.DataFrame(X, columns=["A", "B"])

# NOTE: the returned Axes is deliberately not called `plt`: that name is the alias of
# matplotlib.pyplot in the rendering functions further down, and rebinding it to an Axes
# breaks them when this cell is re-run later
ax = df.plot.hexbin(
    x="A",
    y="B",
    reduce_C_function=np.sum,
    gridsize=100,
    cmap="viridis",
    figsize=[14, 12],
)

# same axes as all other plots of this dataset
ax.axis("off")
ax.set_xlim([mins[0], maxs[0]])
ax.set_ylim([mins[1], maxs[1]]);

Plot different samplings from the dataset (*Note: to choose which sample is plotted, leave exactly one of the `df = draw_..._sample(...)` lines at the top of the cell uncommented*).

In [None]:
# plot a sampling (leave exactly one of the following lines uncommented)
df = draw_uniform_sample(X, y, 10000)
# df = draw_noise_sample(X, y, 10000)
# df = draw_stratified_sample(X, y, 10000)
# df = draw_inlier_sample(X, y, 10000)
# df = draw_outlier_sample(X, y, 10000)

# NOTE: the returned Axes is deliberately not called `plt`: that name is the alias of
# matplotlib.pyplot in the rendering functions further down, and rebinding it to an Axes
# breaks them when this cell is re-run later
ax = df.plot.scatter(
    x="A",
    y="B",
    # c="#2f124b",  # use this one for inlier sampling
    c="y",
    colormap="PuOr",
    alpha=0.3,
    figsize=[12, 12],
    colorbar=False,
)

# same axes as all other plots of this dataset
ax.axis("off")
ax.set_xlim([mins[0], maxs[0]])
ax.set_ylim([mins[1], maxs[1]]);


## Pipeline section

Rendering functions used in the following examples:

In [5]:
import sys, os

sys.path.append(f"{os.getcwd()}/../")  # necessary to import Pipeline classes in this notebook

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from pipeline.Pipeline import Pipeline

# Column names of the taxi records, in positional order. The pipeline configurations in
# the cells below address columns by their index in this list, hence the index comments.
columns = [
    "tripID",  # 0
    "VendorID",  # 1
    "tpep_pickup_datetime",  # 2
    "tpep_dropoff_datetime",  # 3
    "passenger_count",  # 4
    "trip_distance",  # 5
    "RatecodeID",  # 6
    "PULocationID",  # 7
    "DOLocationID",  # 8
    "payment_type",  # 9
    "fare_amount",  # 10
    "extra",  # 11
    "mta_tax",  # 12
    "tip_amount",  # 13
    "tolls_amount",  # 14
    "improvement_surcharge",  # 15
    "total_amount",  # 16
    "PURepresentativeX",  # 17
    "PURepresentativeY",  # 18
    "DORepresentativeX",  # 19
    "DORepresentativeY",  # 20
    "normalized_value",  # 21
    "normalized_spatial_lag",  # 22
    "value_is_H",  # 23
    "spatial_lag_is_H",  # 24
]

# "ground truth" reference data: plot_delta_hist compares the value distribution of every
# sample against this frame, and plot_hist takes its bin range from it
gt_df = pd.read_csv(
    "./linearization_files/taxisLinearizationRandom.csv", sep=";", names=columns
)
# parsed to datetimes so that min/max and the histogram bins can be computed on the column
gt_df["tpep_pickup_datetime"] = pd.to_datetime(gt_df["tpep_pickup_datetime"])

# number of histogram bins shared by plot_hist and plot_delta_hist
n_bins = 100


def plot_delta_hist(df: pd.DataFrame, facet: str, attr: str = "trip_distance"):
    """
    Plots, per value of ``facet``, how the distribution of ``attr`` deviates from the
    "ground truth" in the global ``gt_df``.

    Both the ground truth and every facet subset are binned into ``n_bins`` equal-width bins
    spanning the value range of the ground truth, and normalized to relative frequencies.
    Each bar shows the difference (subset minus ground truth) for one bin, coloured by its
    sign.

    Parameters
    ----------
    df : the sampled data, containing the columns ``facet`` and ``attr``.
    facet : name of the column whose distinct values each get their own subplot.
    attr : name of the column whose distribution is compared. Datetime columns are supported
        as long as the column has a datetime dtype in both ``df`` and ``gt_df``.
    """
    cases = df[facet].unique()
    if cases.size == 0:
        # nothing to compare (and a subplot grid with zero columns cannot be created)
        return

    # squeeze=False always returns a 2D array of Axes, so indexing works for a single facet
    fig, axes = plt.subplots(
        nrows=1, ncols=cases.size, figsize=(len(cases) * 10, 3.5), squeeze=False
    )

    gt_values = gt_df[attr]

    # decide by dtype rather than by the type of the first element, which breaks for an
    # empty column or a leading missing value
    if pd.api.types.is_datetime64_any_dtype(gt_values):
        bins = pd.date_range(gt_values.min(), gt_values.max(), periods=n_bins + 1)
    else:
        # linspace because arange does not include max
        bins = np.linspace(gt_values.min(), gt_values.max(), n_bins + 1)

    gt_bins = np.histogram(gt_values, bins=bins)[0]
    gt_bins = gt_bins / gt_bins.sum()

    for i, case in enumerate(cases):
        # compute the relative difference in value distributions from the "ground truth"
        case_values = df.loc[df[facet] == case, attr]
        df_bins = np.histogram(case_values, bins=bins)[0]
        df_bins = df_bins / df_bins.sum()

        delta = df_bins - gt_bins
        color = ["#1b9e77" if value > 0 else "#7570b3" for value in delta]

        ax_ = axes[0, i]
        ax_.bar(
            np.arange(0, len(delta)),
            height=delta,
            color=color,
            width=1,
            edgecolor="black",
            linewidth=1,
        )
        ax_.set_ylim(-0.5, 0.5)
        ax_.get_xaxis().set_visible(False)
        # only the leftmost subplot keeps its y axis
        if i > 0:
            ax_.get_yaxis().set_visible(False)

    plt.tight_layout()
    # pyplot.show() rather than Figure.show(): the latter needs a GUI backend, which a
    # notebook kernel normally does not have
    plt.show()


def plot_hist(df: pd.DataFrame, facet: str, attr: str = "trip_distance"):
    """
    Draws one histogram of ``attr`` per value of ``facet``, side by side.

    All histograms use ``n_bins`` bins over the value range that ``attr`` has in the global
    "ground truth" frame ``gt_df``, so they are directly comparable.
    """
    grid = sns.FacetGrid(
        data=df, col=facet, palette="colorblind", height=3.5, aspect=2.8
    )

    # the bin range comes from the ground truth, converted to numbers
    gt_numeric_min = pd.to_numeric(gt_df[attr]).min()
    gt_numeric_max = pd.to_numeric(gt_df[attr]).max()
    hist_options = dict(
        bins=n_bins,
        binrange=[gt_numeric_min, gt_numeric_max],
        legend=True,
    )
    # NOTE: add log_scale=True to hist_options for a logarithmic value axis
    grid.map(sns.histplot, attr, **hist_options)

    grid.set_titles(
        col_template="{col_name}", row_template=f"{facet}" + ": {row_name}"
    )
    grid.fig.suptitle(facet, y=1)
    grid.add_legend()
    grid.fig.show()


def plot_multi_hist(df: pd.DataFrame, facet: str):
    """
    Draws the faceted histograms (see plot_hist) for the trip distance and for the pickup
    time, in this order.
    """
    for attr in ("trip_distance", "tpep_pickup_datetime"):
        plot_hist(df, facet, attr)


def plot_multi_delta_hist(df: pd.DataFrame, facet: str):
    """
    Draws the faceted delta histograms (see plot_delta_hist) for the trip distance and for
    the pickup time, in this order.
    """
    for attr in ("trip_distance", "tpep_pickup_datetime"):
        plot_delta_hist(df, facet, attr)


def plot_geo(df: pd.DataFrame, facet: str, attr: str = "trip_distance"):
    """
    Draws scatter plots of the pickup locations, with one column per value of ``facet`` and
    one row per quartile of ``attr``; the points are coloured by ``attr``.

    Parameters
    ----------
    df : the sampled data; it is not modified.
    facet : name of the column whose distinct values form the columns of the plot grid.
    attr : name of the numeric column used for the quartile rows and the point colour.
    """
    # the quartile column is added to a derived frame, so that plotting does not leave an
    # extra "group" column behind in the caller's DataFrame
    plot_df = df.assign(group=pd.qcut(df[attr].astype(np.float64), q=4))

    g = sns.relplot(
        data=plot_df,
        x="PURepresentativeX",
        y="PURepresentativeY",
        col=facet,
        row="group",
        kind="scatter",
        hue=attr,
        # size=0.1,
        palette="viridis",
        height=4,
        alpha=0.3,
    )
    g.fig.suptitle(facet, y=1)
    g.add_legend()


def plot_geo_dist(df: pd.DataFrame, facet: str):
    """
    Draws one two-dimensional histogram of the pickup locations (PURepresentativeX against
    PURepresentativeY) per value of ``facet``.
    """
    # NOTE: a custom palette can be set before plotting, e.g.
    # sns.set_palette("ch:start=.2,rot=-.3")
    histogram_options = dict(
        x="PURepresentativeX",
        y="PURepresentativeY",
        bins=100,
        legend=True,
    )
    grid = sns.FacetGrid(data=df, col=facet, height=5)
    grid.map_dataframe(sns.histplot, **histogram_options)


def plot_moran_dist(df: pd.DataFrame, facet: str):
    """
    Draws, per value of ``facet``, a histogram of the combined labels built from the
    "value_is_H" and "spatial_lag_is_H" columns.

    Parameters
    ----------
    df : the sampled data; it is not modified.
    facet : name of the column whose distinct values each get their own subplot.
    """
    # the label is the string concatenation of the two indicator columns; it is added to a
    # derived frame so that the caller's DataFrame does not gain a "moran_label" column
    labelled = df.assign(
        moran_label=df["value_is_H"].astype(str) + df["spatial_lag_is_H"].astype(str)
    )

    g = sns.FacetGrid(labelled, col=facet, height=1.5, aspect=2.5)
    g.map_dataframe(sns.histplot, y="moran_label")


def plot_moran(df: pd.DataFrame, facet: str):
    """
    Draws, per value of ``facet``, a scatter plot of "normalized_value" against
    "normalized_spatial_lag", coloured by the combined label built from the "value_is_H"
    and "spatial_lag_is_H" columns, with reference lines through the origin.

    Parameters
    ----------
    df : the sampled data; it is not modified.
    facet : name of the column whose distinct values each get their own subplot.
    """
    # the label is the string concatenation of the two indicator columns; it is added to a
    # derived frame so that the caller's DataFrame does not gain a "moran_label" column
    labelled = df.assign(
        moran_label=df["value_is_H"].astype(str) + df["spatial_lag_is_H"].astype(str)
    )

    g = sns.relplot(
        data=labelled,
        x="normalized_value",
        y="normalized_spatial_lag",
        hue="moran_label",
        col=facet,
        kind="scatter",
        # size=0.1,
        height=4,
        alpha=0.3,
        legend=False,
    )

    # the two reference lines split every plot into its four quadrants
    g.refline(x=0, y=0, color="black")


def render_chunk(df: pd.DataFrame, facet: "str | list[str]"):
    """
    Renders all overview plots for a chunk of sampled data, faceted by ``facet``.

    In order: the histograms of trip distance and pickup time, the two-dimensional histogram
    of the pickup locations, the delta histograms against the ground truth, and the
    distribution of the combined value/spatial-lag labels.

    Parameters
    ----------
    df : the sampled data; it is not modified.
    facet : name of the column to facet by, or a list of column names. For a list, the
        plots are faceted by the combination of the columns' values.
    """
    # work on a copy: the combined facet column and the datetime conversion below would
    # otherwise leak into the caller's DataFrame
    df = df.copy()

    if isinstance(facet, (list, tuple)):
        facets = list(facet)
        facet = "--".join(facets)
        # build the combined label first and assign it afterwards: for a single-element
        # list the combined column has the same name as its source column, which must not
        # be blanked before it is read
        combined = pd.Series("", index=df.index)
        for f in facets:
            combined += df[f].astype(str) + " "
        df[facet] = combined

    plot_multi_hist(df, facet)
    # plot_geo(df, facet)
    plot_geo_dist(df, facet)
    # FIXME: the next line makes sure that delta histograms work
    df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
    plot_multi_delta_hist(df, facet)
    plot_moran_dist(df, facet)


Base configuration:

In [2]:
# Default pipeline configuration. All numbers are positions in the `columns` list.
base_config = dict(
    data="taxis",
    linearization="random",
    subdivision="cardinality",
    selection="maximum",
    dimension=5,  # trip distance
    params=dict(
        subspace=[17, 18],  # pickup location
        # alternative: coverage=16 (total amount)
        coverage=5,  # trip distance
        value_h_index=23,
        lag_h_index=24,
    ),
)


In [None]:
# run the base configuration and render a first chunk of 1000 items
pl = Pipeline(base_config)

# every chunk is tagged with the configuration that produced it, so that the rendering
# functions can facet by it
chunk = pd.DataFrame(pl.get_next_chunk(1000), columns=columns).assign(
    linearization=base_config["linearization"],
    subdivision=base_config["subdivision"],
    selection=base_config["selection"],
)

render_chunk(chunk, "linearization")


Changing the Linearization

In [None]:
import pandas as pd

linearizations = ["random", "z-order", "numeric"]

# one chunk per linearization, each produced by the base configuration with only the
# linearization swapped out
chunks = []

for linearization in linearizations:
    config = base_config.copy()
    config["linearization"] = linearization

    pl = Pipeline(config)

    chunk = pd.DataFrame(pl.get_next_chunk(10000), columns=columns)
    # tag the chunk with the configuration that produced it, for faceting
    chunk["linearization"] = config["linearization"]
    chunk["subdivision"] = config["subdivision"]
    chunk["selection"] = config["selection"]

    chunks.append(chunk)

# concatenate once at the end instead of growing a DataFrame inside the loop, which
# re-copies all previously collected rows on every iteration
df = pd.concat(chunks, ignore_index=True)

render_chunk(df, "linearization")


Changing the Subdivision

In [None]:
import pandas as pd

subdivisions = ["cardinality", "cohesion", "coverage"]

base_config_ = base_config.copy()

# one chunk per subdivision, each produced by the base configuration with only the
# subdivision swapped out
chunks = []

for subdivision in subdivisions:
    config = base_config_.copy()
    config["subdivision"] = subdivision

    pl = Pipeline(config)

    chunk = pd.DataFrame(pl.get_next_chunk(10000), columns=columns)
    # tag the chunk with the configuration that produced it, for faceting
    chunk["linearization"] = config["linearization"]
    chunk["subdivision"] = config["subdivision"]
    chunk["selection"] = config["selection"]

    chunks.append(chunk)

# concatenate once at the end instead of growing a DataFrame inside the loop, which
# re-copies all previously collected rows on every iteration
df = pd.concat(chunks, ignore_index=True)

render_chunk(df, "subdivision")

Changing the Selection 

In [None]:
import pandas as pd

selections = ["maximum", "median", "random"]

base_config_ = base_config.copy()

# one chunk per selection, each produced by the base configuration with only the selection
# swapped out
chunks = []

for selection in selections:
    # copy of the (unmodified) base configuration, as in the subdivision cell
    config = base_config_.copy()
    config["selection"] = selection

    pl = Pipeline(config)

    chunk = pd.DataFrame(pl.get_next_chunk(3333), columns=columns)
    # tag the chunk with the configuration that produced it, for faceting
    chunk["linearization"] = config["linearization"]
    chunk["subdivision"] = config["subdivision"]
    chunk["selection"] = config["selection"]

    chunks.append(chunk)

# concatenate once at the end instead of growing a DataFrame inside the loop, which
# re-copies all previously collected rows on every iteration
df = pd.concat(chunks, ignore_index=True)

render_chunk(df, "selection")

## Discussion section

Effect of changing chunk size on selection runtime

In [None]:
import pandas as pd
import time

# NOTE: intervals are measured with time.perf_counter(), the monotonic high-resolution
# clock intended for timing, rather than with the wall clock time.time()
start = time.perf_counter()
config = base_config.copy()
config["subdivision"] = "cohesion"  # creates "uneven" number of bins
config["selection"] = "median"  # complex selection strategy

pl = Pipeline(config)
print(f"preprocessing took {time.perf_counter() - start}s")

chunk_sizes = [100, 313, 1000, 1763, 2000, 5000, 10000, 56721, 100000]

# (chunk_size, runtime in seconds) per retrieved chunk
runtimes = []

for chunk_size in chunk_sizes:
    # all chunks are retrieved one after the other from the same pipeline instance
    now = time.perf_counter()
    chunk = pd.DataFrame(pl.get_next_chunk(chunk_size), columns=columns)
    runtime = time.perf_counter() - now
    runtimes.append((chunk_size, runtime))

print("\ntotal", time.perf_counter() - start)
pd.DataFrame(runtimes, columns=["chunk_size", "runtime"])

Recreating existing sampling strategies

In [None]:
import pandas as pd

# Three pipeline configurations that recreate existing sampling strategies. All numbers
# are positions in the `columns` list.
random_config = {
    "data": "taxis",
    "linearization": "random",
    "subdivision": "cardinality",
    "selection": "random",
    "dimension": -1,  # not used
    "params": {},
}

stratified_config = {
    "data": "taxis",
    "linearization": "numeric",
    "subdivision": "interval",
    "selection": "random",
    "dimension": 5,  # trip distance
    "params": {
        "subspace": [5],  # trip distance
    },
}

autocorrelation_config = {
    "data": "taxis",
    "linearization": "z-order",
    "subdivision": "cardinality",
    "selection": "autocorrelation",
    "dimension": 5,  # trip distance
    "params": {
        "subspace": [17, 18],  # pickup location
        "value_h_index": 23,  # mean h category
        "lag_h_index": 24,  # lagged h category
    },
}

combinations = [
    ("random", random_config),
    ("stratified", stratified_config),
    ("autocorrelation", autocorrelation_config),
]

# one chunk per sampling strategy
chunks = []

for name, config in combinations:
    pl = Pipeline(config)

    chunk = pd.DataFrame(pl.get_next_chunk(10000), columns=columns)

    # tag the chunk with the configuration that produced it, for faceting
    chunk["linearization"] = config["linearization"]
    chunk["subdivision"] = config["subdivision"]
    chunk["selection"] = config["selection"]

    chunks.append(chunk)

# concatenate once at the end instead of growing a DataFrame inside the loop, which
# re-copies all previously collected rows on every iteration
df = pd.concat(chunks, ignore_index=True)

render_chunk(df, ["linearization", "subdivision", "selection"])

Tailoring the sampling to multiple attributes:

In [None]:
import pandas as pd

# baseline for the comparison: the base configuration with a random selection
base_config_ = base_config.copy()
base_config_["selection"] = "random"

# All numbers are positions in the `columns` list.
multiple_attr_config = {
    "data": "taxis",
    "linearization": "z-order",
    "subdivision": "coverage",
    "selection": "maximum",
    # "dimension": 5,  # trip distance
    "dimension": 2,  # pickup time
    "params": {
        "subspace": [17, 18],  # pickup location,
        # "coverage": 16,  # total amount
        "coverage": 5,  # trip distance
        # "coverage": 2,  # pickup time
        "value_h_index": 23,
        "lag_h_index": 24,
    },
}

configs = [base_config_, multiple_attr_config]

# one chunk per configuration
chunks = []

for config in configs:
    pl = Pipeline(config)

    chunk = pd.DataFrame(pl.get_next_chunk(10000), columns=columns)
    # tag the chunk with the configuration that produced it, for faceting
    chunk["linearization"] = config["linearization"]
    chunk["subdivision"] = config["subdivision"]
    chunk["selection"] = config["selection"]

    chunks.append(chunk)

# concatenate once at the end instead of growing a DataFrame inside the loop, which
# re-copies all previously collected rows on every iteration
df = pd.concat(chunks, ignore_index=True)

render_chunk(df, ["linearization", "subdivision", "selection"])

Reusing operators for tailoring the pipeline

In [None]:
# baseline for the comparison: the base configuration with a random selection. It is
# defined in this cell, so that the cell does not depend on a variable left behind by
# whichever cell happened to run before it.
base_config_ = base_config.copy()
base_config_["selection"] = "random"

# All numbers are positions in the `columns` list.
recomposing_pipelines_config = {
    "data": "taxis",
    "linearization": "z-order",
    "subdivision": "cohesion",
    "selection": "random",
    # NOTE(review): this was annotated as "spatial autocorrelation", but index 5 is the
    # trip distance column, as in all other configurations -- confirm the intended column
    "dimension": 5,  # trip distance
    "params": {
        "subspace": [17, 18],  # pickup location,
        # "coverage": 16,  # total amount
        "coverage": 5,  # trip distance
        "value_h_index": 23,
        "lag_h_index": 24,
    },
}

configs = [base_config_, recomposing_pipelines_config]

# one chunk per configuration
chunks = []

for config in configs:
    pl = Pipeline(config)

    chunk = pd.DataFrame(pl.get_next_chunk(10000), columns=columns)
    # tag the chunk with the configuration that produced it, for faceting
    chunk["linearization"] = config["linearization"]
    chunk["subdivision"] = config["subdivision"]
    chunk["selection"] = config["selection"]

    chunks.append(chunk)

# concatenate once at the end instead of growing a DataFrame inside the loop, which
# re-copies all previously collected rows on every iteration
df = pd.concat(chunks, ignore_index=True)

render_chunk(df, ["linearization", "subdivision", "selection"])