In [23]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import csv

# Parameters
num_records = 50_000
chunk_size = 50_000
start_date = datetime(2024, 1, 1)


def generate_synthetic_pipeline_data_chunk(chunk_size, start_date, start_index):
    dates = [
        start_date + timedelta(minutes=15 * (i + start_index))
        for i in range(chunk_size)
    ]

    # Base fields
    avg_inflow = np.random.normal(loc=1500, scale=100, size=chunk_size)
    avg_outflow = avg_inflow - np.random.normal(loc=50, scale=10, size=chunk_size)
    pressure = 800 - (avg_inflow - avg_outflow) * 0.1
    flow_rate = avg_inflow + np.random.normal(loc=0, scale=50, size=chunk_size)
    flow_rate[
        np.random.rand(chunk_size) < 0.1
    ] += 300  # Anomalous spikes increased to 10%

    # Additional fields
    valve_status = np.random.choice([0, 1], size=chunk_size, p=[0.1, 0.9])
    vibration = np.random.normal(loc=5, scale=2, size=chunk_size)
    vibration[np.random.rand(chunk_size) < 0.05] += 10  # Increased anomaly frequency
    latitude = np.random.normal(loc=7.3775, scale=0.0001, size=chunk_size)
    longitude = np.random.normal(loc=3.947, scale=0.0001, size=chunk_size)
    access_log = np.random.choice(
        ["Authorized", "Unauthorized"], size=chunk_size, p=[0.95, 0.05]
    )
    events = ["None", "Maintenance", "Incident"]
    event_history = np.random.choice(events, size=chunk_size, p=[0.85, 0.1, 0.05])
    theft_detected = (flow_rate > avg_inflow + 150) | (access_log == "Unauthorized")
    leak_detected = (avg_inflow - avg_outflow > 150) | (pressure < 700)
    segment_id = np.random.randint(1, 6, size=chunk_size)
    pipeline_age = np.random.randint(1, 50, size=chunk_size)
    temperature = np.random.normal(loc=30, scale=5, size=chunk_size)
    humidity = np.random.uniform(40, 90, size=chunk_size)
    precipitation = np.random.choice([0, 1], size=chunk_size, p=[0.7, 0.3])

    # Construct DataFrame
    data = pd.DataFrame(
        {
            "timestamp": dates,
            "segment_id": segment_id,
            "avg_inflow": avg_inflow,
            "avg_outflow": avg_outflow,
            "pressure": pressure,
            "flow_rate": flow_rate,
            "valve_status": valve_status,
            "vibration": vibration,
            "latitude": latitude,
            "longitude": longitude,
            "access_log": access_log,
            "event_history": event_history,
            "theft_detected": theft_detected,
            "leak_detected": leak_detected,
            "pipeline_age": pipeline_age,
            "temperature": temperature,
            "humidity": humidity,
            "precipitation": precipitation,
        }
    )
    return data


# Incrementally generate and save data
output_file = "synthetic_pipeline_data.csv"

# Write header
columns = [
    "timestamp",
    "segment_id",
    "avg_inflow",
    "avg_outflow",
    "pressure",
    "flow_rate",
    "valve_status",
    "vibration",
    "latitude",
    "longitude",
    "access_log",
    "event_history",
    "theft_detected",
    "leak_detected",
    "pipeline_age",
    "temperature",
    "humidity",
    "precipitation",
]

with open(output_file, mode="w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(columns)  # Write header

# Generate data in chunks
for i in range(0, num_records, chunk_size):
    print(f"Generating records {i} to {i + chunk_size - 1}...")
    chunk_data = generate_synthetic_pipeline_data_chunk(
        chunk_size, start_date, start_index=i
    )
    chunk_data.to_csv(output_file, mode="a", header=False, index=False)

print("Synthetic pipeline data generated and saved incrementally.")

Generating records 0 to 49999...
Synthetic pipeline data generated and saved incrementally.


In [24]:
import pandas as pd
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
import joblib

# Parameters for chunk processing
chunk_size = 1_000_000  # Process 1 million rows at a time
features = [
    "avg_inflow",
    "avg_outflow",
    "pressure",
    "flow_rate",
    "vibration",
    "pipeline_age",
    "temperature",
    "humidity",
    "precipitation",
]

# 1. Initialize objects for scaling and SMOTE
scaler = StandardScaler()
smote = SMOTE(sampling_strategy="auto", random_state=42)


# 2. Stream the dataset in chunks for scaling and balancing
def process_chunks(file_path, chunk_size, features, scaler, smote):
    first_chunk = True
    resampled_X, resampled_y = None, None

    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        print(f"Processing chunk with {len(chunk)} rows...")

        # Prepare features and labels
        X_chunk = chunk[features]
        y_chunk = chunk["leak_detected"] | chunk["theft_detected"]

        # Normalize the chunk
        X_scaled_chunk = (
            scaler.fit_transform(X_chunk) if first_chunk else scaler.transform(X_chunk)
        )

        # Balance the data using SMOTE
        X_resampled_chunk, y_resampled_chunk = smote.fit_resample(
            X_scaled_chunk, y_chunk
        )

        # Combine resampled chunks
        if first_chunk:
            resampled_X, resampled_y = X_resampled_chunk, y_resampled_chunk
            first_chunk = False
        else:
            resampled_X = np.vstack((resampled_X, X_resampled_chunk))
            resampled_y = np.hstack((resampled_y, y_resampled_chunk))

    return resampled_X, resampled_y


print("Starting data processing...")
X_resampled, y_resampled = process_chunks(
    "synthetic_pipeline_data.csv", chunk_size, features, scaler, smote
)
print("Data processing completed.")

# 3. Train the Local Outlier Factor (LOF) model
print("Training the Local Outlier Factor model...")
lof = LocalOutlierFactor(
    n_neighbors=20,  # Number of neighbors to use for density estimation
    contamination=0.031,  # Corrected contamination rate
    novelty=True,  # Allows prediction on unseen data
    n_jobs=-1,
)
lof.fit(X_resampled)

# 4. Process test data for predictions
print("Processing test data...")
test_data = pd.read_csv("synthetic_pipeline_test_data.csv")

# Dynamically scale test data features
test_data_scaled = scaler.transform(test_data[features])

# Predict anomalies for dynamically sized test data
print(f"Processing test data with {len(test_data)} rows...")
anomalies = lof.predict(test_data_scaled)
anomaly_scores = lof.decision_function(test_data_scaled)

# Dynamically add results to the test data
test_data["anomaly_score"] = anomaly_scores
test_data["is_anomaly"] = anomalies == -1

# 5. Evaluate the model if test labels are available
if "leak_detected" in test_data.columns and "theft_detected" in test_data.columns:
    y_test = test_data["leak_detected"] | test_data["theft_detected"]
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, anomalies == -1))
    print("\nClassification Report:")
    print(classification_report(y_test, anomalies == -1))

# 6. Save the model and scaler
print("Saving model and scaler...")
joblib.dump(lof, "pipeline_anomaly_lof_model.pkl")
joblib.dump(scaler, "pipeline_data_scaler_lof.pkl")

# 7. Save anomaly detection results dynamically
output_file = f"pipeline_test_data_with_anomalies_{len(test_data)}_rows.csv"
test_data.to_csv(output_file, index=False)

print(f"Anomaly detection completed. Results saved to '{output_file}'.")

Starting data processing...
Processing chunk with 50000 rows...
Data processing completed.
Training the Local Outlier Factor model...
Processing test data...
Processing test data with 50000 rows...
Confusion Matrix:
[[41268  1415]
 [ 6069  1248]]

Classification Report:
              precision    recall  f1-score   support

       False       0.87      0.97      0.92     42683
        True       0.47      0.17      0.25      7317

    accuracy                           0.85     50000
   macro avg       0.67      0.57      0.58     50000
weighted avg       0.81      0.85      0.82     50000

Saving model and scaler...
Anomaly detection completed. Results saved to 'pipeline_test_data_with_anomalies_50000_rows.csv'.


In [18]:
import pandas as pd
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
import joblib

# Parameters for chunk processing
chunk_size = 50_000  # Process 50,000 rows at a time
features = [
    "avg_inflow",
    "avg_outflow",
    "pressure",
    "flow_rate",
    "vibration",
    "pipeline_age",
    "temperature",
    "humidity",
    "precipitation",
]

# 1. Initialize objects for scaling and SMOTE
scaler = StandardScaler()
smote = SMOTE(sampling_strategy="auto", random_state=42)


# 2. Stream the dataset in chunks for scaling and balancing
def process_chunks(file_path, chunk_size, features, scaler, smote):
    first_chunk = True
    resampled_X, resampled_y = None, None

    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        print(f"Processing chunk with {len(chunk)} rows...")

        # Prepare features and labels
        X_chunk = chunk[features]
        y_chunk = chunk["leak_detected"] | chunk["theft_detected"]

        # Normalize the chunk
        X_scaled_chunk = (
            scaler.fit_transform(X_chunk) if first_chunk else scaler.transform(X_chunk)
        )

        # Balance the data using SMOTE
        X_resampled_chunk, y_resampled_chunk = smote.fit_resample(
            X_scaled_chunk, y_chunk
        )

        # Combine resampled chunks
        if first_chunk:
            resampled_X, resampled_y = X_resampled_chunk, y_resampled_chunk
            first_chunk = False
        else:
            resampled_X = np.vstack((resampled_X, X_resampled_chunk))
            resampled_y = np.hstack((resampled_y, y_resampled_chunk))

    return resampled_X, resampled_y


print("Starting data processing...")
X_resampled, y_resampled = process_chunks(
    "synthetic_pipeline_data.csv", chunk_size, features, scaler, smote
)
print("Data processing completed.")

# 3. Define contamination rates to test
contamination_rates = np.linspace(
    0.01, 0.10, 10
)  # Test contamination rates from 1% to 10%

best_f1_score = 0
best_contamination = 0

# 4. Test different contamination rates
for contamination in contamination_rates:
    print(f"\nTesting contamination rate: {contamination}")

    # Train the Local Outlier Factor (LOF) model with the current contamination
    lof = LocalOutlierFactor(
        n_neighbors=20,  # Number of neighbors to use for density estimation
        contamination=contamination,  # Current contamination rate
        novelty=True,  # Allows prediction on unseen data
        n_jobs=-1,
    )
    lof.fit(X_resampled)

    # 5. Process test data for predictions
    print("Processing test data...")
    test_data = pd.read_csv("synthetic_pipeline_test_data.csv")
    test_data_scaled = scaler.transform(test_data[features])
    anomalies = lof.predict(test_data_scaled)

    # 6. Evaluate the model if test labels are available
    if "leak_detected" in test_data.columns and "theft_detected" in test_data.columns:
        y_test = test_data["leak_detected"] | test_data["theft_detected"]
        print("Confusion Matrix:")
        print(confusion_matrix(y_test, anomalies == -1))
        print("\nClassification Report:")
        report = classification_report(y_test, anomalies == -1)
        print(report)

        # Extract F1-score for anomalies (True class) from the report
        f1_score = float(report.split("\n")[3].split()[2])
        if f1_score > best_f1_score:
            best_f1_score = f1_score
            best_contamination = contamination

# Print the best contamination rate and corresponding F1-score
print(f"\nBest contamination rate: {best_contamination} with F1-score: {best_f1_score}")

# Optionally, save the best model
lof = LocalOutlierFactor(
    n_neighbors=20,
    contamination=best_contamination,
    novelty=True,
    n_jobs=-1,
)
lof.fit(X_resampled)
joblib.dump(
    lof, f"best_pipeline_anomaly_lof_model_contamination_{best_contamination}.pkl"
)

Starting data processing...
Processing chunk with 50000 rows...
Data processing completed.

Testing contamination rate: 0.01
Processing test data...
Confusion Matrix:
[[37539   385]
 [11587   489]]

Classification Report:
              precision    recall  f1-score   support

       False       0.76      0.99      0.86     37924
        True       0.56      0.04      0.08     12076

    accuracy                           0.76     50000
   macro avg       0.66      0.52      0.47     50000
weighted avg       0.71      0.76      0.67     50000


Testing contamination rate: 0.020000000000000004
Processing test data...
Confusion Matrix:
[[37139   785]
 [11214   862]]

Classification Report:
              precision    recall  f1-score   support

       False       0.77      0.98      0.86     37924
        True       0.52      0.07      0.13     12076

    accuracy                           0.76     50000
   macro avg       0.65      0.53      0.49     50000
weighted avg       0.71      0.7