# 3.2 Applying LabelAId to Project Sidewalk

This notebook demonstrates how to apply programmatic weak supervision (PWS) for Project Sidewalk data labeling.

In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, ParameterGrid
from sklearn import metrics
from imblearn.under_sampling import RandomUnderSampler
import seaborn as sns

# Snorkel-specific imports for programmatic labeling
import snorkel
from snorkel.labeling import labeling_function, LFAnalysis, PandasLFApplier
from snorkel.labeling.model import MajorityLabelVoter, LabelModel
from snorkel.labeling import filter_unlabeled_dataframe
from snorkel.utils import probs_to_preds

## Data Loading and Preprocessing
Load the dataset and perform any necessary preprocessing.

In [None]:
# Read the one-hot encoded Seattle training and test sets from the data directory.
df_train = pd.read_csv("../data/training_set_seattle_encoded.csv")
df_test = pd.read_csv("../data/test_set_seattle_encoded.csv")

# Show the first few training rows as a quick sanity check.
df_train.head()

## 3.2.2 Define Labeling Functions
Labeling functions are heuristics used to generate noisy labels.

In [None]:
# Integer votes that the labeling functions below can return.
CORRECT = 1    # the label is judged correct
WRONG = 0      # the label is judged wrong
NOT_SURE = -1  # the labeling function has no opinion on this row

@labeling_function()
def distance_to_intersection(x):
    """
    Vote WRONG when a label on a residential or living street sits suspiciously
    close to, or far from, an intersection; otherwise vote NOT_SURE.

    A row is flagged when its street-type flag is set and its
    "distance_to_intersection" is either in the range (0, 5] or at least the
    street type's far cutoff (80 for "way_residential", 50 for "way_living_street").
    """
    # (street-type flag column, distance at or beyond which the label is flagged)
    far_cutoffs = (("way_residential", 80), ("way_living_street", 50))

    for street_flag, far_cutoff in far_cutoffs:
        if not x[street_flag]:
            continue
        dist = x["distance_to_intersection"]
        if dist >= far_cutoff or 0 < dist <= 5:
            return WRONG
    return NOT_SURE

@labeling_function()
def clustered(x):
    """
    Vote CORRECT when a NoSidewalk, NoCurbRamp, or CurbRamp label belongs to a
    cluster containing at least two labels; otherwise vote NOT_SURE.

    Reads the "label_type" and "cluster_label_count" fields of the row.
    """
    # All three label types share the same cluster-size threshold, so a single
    # membership test replaces the three duplicated branches.
    if x["label_type"] in ['NoSidewalk', 'NoCurbRamp', 'CurbRamp'] and x["cluster_label_count"] >= 2:
        return CORRECT
    return NOT_SURE

@labeling_function()
def severity(x):
    """
    Vote based on whether the severity rating is plausible for the label type.

    WRONG when:
        - NoSidewalk has severity below 3
        - NoCurbRamp has severity of 2 or less
        - CurbRamp has severity of 4 or more
    CORRECT when:
        - Obstacle or SurfaceProblem has severity of 4 or more
    NOT_SURE in every other case.
    """
    label = x["label_type"]

    if label == 'NoSidewalk':
        return WRONG if x["severity"] < 3 else NOT_SURE
    if label == 'NoCurbRamp':
        return WRONG if x["severity"] <= 2 else NOT_SURE
    if label == 'CurbRamp':
        return WRONG if x["severity"] >= 4 else NOT_SURE
    if label in ['Obstacle', 'SurfaceProblem']:
        return CORRECT if x["severity"] >= 4 else NOT_SURE
    return NOT_SURE

@labeling_function()
def zoom(x):
    """
    Vote CORRECT when the zoom level is greater than 1, WRONG when an Obstacle or
    SurfaceProblem label was placed at zoom level exactly 1, and NOT_SURE otherwise.
    """
    level = x["zoom"]
    if level > 1:
        return CORRECT

    unzoomed_detail_label = x["label_type"] in ['Obstacle', 'SurfaceProblem'] and level == 1
    return WRONG if unzoomed_detail_label else NOT_SURE

@labeling_function()
def tags(x):
    """
    Vote CORRECT when a NoSidewalk, NoCurbRamp, or CurbRamp label has its "tag"
    field equal to 1; otherwise vote NOT_SURE.
    """
    tag_relevant_types = ['NoSidewalk', 'NoCurbRamp', 'CurbRamp']
    is_tagged = x["label_type"] in tag_relevant_types and x["tag"] == 1
    return CORRECT if is_tagged else NOT_SURE

@labeling_function()
def description(x):
    """
    Vote CORRECT when the row's "description" field equals 1; otherwise vote NOT_SURE.
    """
    if x['description'] == 1:
        return CORRECT
    return NOT_SURE

@labeling_function()
def distance_to_road(x):
    """
    Vote WRONG when an Obstacle or SurfaceProblem label is implausibly close to,
    or far from, the road; otherwise vote NOT_SURE.

    A row is flagged when "distance_to_road" is below 10, or when it exceeds the
    maximum allowed for any way type whose flag is set on the row. Other label
    types always get NOT_SURE.
    """
    if x["label_type"] not in ['Obstacle', 'SurfaceProblem']:
        return NOT_SURE

    if x["distance_to_road"] < 10:
        return WRONG

    # Largest acceptable distance to the road for each way-type flag column.
    max_distance_by_way = {"way_residential": 40, "way_primary": 60, "way_secondary": 50, "way_tertiary": 45}
    too_far = any(
        x[way_flag] and x["distance_to_road"] > limit
        for way_flag, limit in max_distance_by_way.items()
    )
    return WRONG if too_far else NOT_SURE

@labeling_function()
def way_type(x):
    """
    Vote on labels that lie on an unclassified way: CORRECT for CurbRamp, WRONG for
    Obstacle, SurfaceProblem, or NoSidewalk. Rows on any other way type, or with
    any other label type, get NOT_SURE.
    """
    if not x["way_unclassified"]:
        return NOT_SURE

    label = x["label_type"]
    if label == "CurbRamp":
        return CORRECT
    if label in ["Obstacle", "SurfaceProblem", "NoSidewalk"]:
        return WRONG
    return NOT_SURE

# Every labeling function to apply; the order here fixes the column order of the
# label matrix produced when they are applied.
lfs = [
    distance_to_intersection,
    clustered,
    severity,
    zoom,
    tags,
    description,
    distance_to_road,
    way_type,
]

## Apply Labeling Functions to Data
Apply the defined labeling functions to create a label matrix.

In [None]:
# Build an applier that runs every labeling function in `lfs` over the rows of a
# pandas DataFrame.
lf_applier = PandasLFApplier(lfs=lfs)

# Apply the labeling functions to the training data. The result is a label matrix
# with one row per data point and one column per labeling function; each entry is
# that function's vote (CORRECT, WRONG, or NOT_SURE).
L_train = lf_applier.apply(df=df_train)

# Summarize per-function statistics such as coverage, overlaps, and conflicts,
# to help judge how the labeling functions behave on this dataset.
# NOTE(review): the summary is only stored in `lf_summary`, not displayed by this
# cell — presumably it is inspected in a later cell; confirm.
lf_summary = LFAnalysis(L=L_train, lfs=lfs).lf_summary()

## Train the Label Model
Train a Snorkel Label Model to aggregate the labeling functions' outputs.

In [None]:
# Create a two-class label model (cardinality=2 matches the WRONG/CORRECT votes).
# Hyperparameter reference:
# https://snorkel.readthedocs.io/en/v0.9.3/packages/_autosummary/labeling/snorkel.labeling.LabelModel.html
label_model = LabelModel(cardinality=2, verbose=True)

# Train on the label matrix; the fixed seed keeps the run repeatable.
label_model.fit(
    L_train=L_train,
    n_epochs=500,
    lr=0.001,
    log_freq=100,
    seed=14,
    lr_scheduler='linear',
)

In [None]:
def plot_probabilities_histogram(Y, bins=10):
    """
    Plot a histogram of predicted probabilities and display it.

    Parameters:
        Y (array-like): One probability per data point, i.e. the probability that
            the data point has been classified as CORRECT.
        bins (int or sequence): Bin specification forwarded to ``plt.hist``.
            Defaults to 10, the value this function previously hard-coded.

    The x-axis shows the probability of being correct and the y-axis shows the
    number of data points falling into each bin. The figure is shown immediately
    via ``plt.show()``; nothing is returned.
    """
    plt.hist(Y, bins=bins)
    plt.xlabel('Probability of being CORRECT')
    plt.ylabel('Number of data points')
    plt.show()

# Predict per-class probabilities for every training row with the trained label model.
probs_train = label_model.predict_proba(L=L_train)

# Column 1 is plotted as the probability of CORRECT (CORRECT is encoded as 1).
plot_probabilities_histogram(probs_train[:, 1])

## Save PWS Outputs
Save the predictions and probabilities from the PWS pipeline for further analysis.

In [None]:
# Keep only the rows that received a vote from at least one labeling function,
# together with their matching label-model probabilities.
df_train_filtered, probs_train_filtered = filter_unlabeled_dataframe(
    X=df_train, y=probs_train, L=L_train
)

# Turn the probabilities into hard class predictions (the target variable).
preds_train_filtered = probs_to_preds(probs=probs_train_filtered)

# Append the hard label and the probability of CORRECT (column 1) to the filtered
# data, reusing its index so the new columns line up row for row.
merged = pd.concat(
    [
        df_train_filtered,
        pd.DataFrame(
            {
                'Hard_Label': preds_train_filtered,
                'Soft_Probability_of_Correct': probs_train_filtered[:, 1],
            },
            index=df_train_filtered.index,
        ),
    ],
    axis=1,
)

# Write the result to disk without the index column.
merged.to_csv('PWS_outputs.csv', index=False)