In [None]:
#import libraries and load dataset

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats

# Read the raw barcode-scan records and turn the `date` column into real
# datetimes in the same expression, so later cells can sort/filter by time.
df = (
    pd.read_csv('../data/barcode_scans.csv')
      .assign(date=lambda scans: pd.to_datetime(scans['date']))
)

# Preview the first rows to confirm the file was parsed as expected
df.head()




In [None]:
# Detect frequency-based anomalies: flag rows whose scan count is unusually
# far from the mean of all scan counts.

# Standardise scan counts. nan_policy='omit' ignores missing counts when
# computing mean/std; with the default ('propagate') a single NaN would turn
# every z-score into NaN and silently hide all anomalies.
df['z_score'] = stats.zscore(df['scan_count'], nan_policy='omit')

# Anything more than 3 standard deviations from the mean is unusual.
# Vectorised comparison (NaN z-scores compare as False, i.e. "not flagged").
# The column is called `scan_anomaly` because that is the name the plotting
# cell reads, and it distinguishes this flag from `invalid_id_anomaly`.
df['scan_anomaly'] = df['z_score'].abs() > 3

# Display detected anomalies
anomalies = df[df['scan_anomaly']]
print(f"✅ Detected {len(anomalies)} anomalies!")
print(anomalies.head())


In [None]:
# Directory that holds the input/output CSV files, relative to this notebook.
# It must match the "../data/..." paths used by the load and save cells.
data_dir = "../data"

# Keep working on the `df` prepared by the cells above instead of re-reading
# the CSV here. The previous reload raised NameError on a fresh kernel (`os`
# is never imported), pointed at a different folder than the first cell, and
# threw away the datetime conversion and the z-score columns computed earlier.

# Function to validate ID checksum based on state rules
def validate_checksum(id_number, state):
    """Check an ID number against the checksum rule of the given state.

    Rules implemented here:
      * every ID must consist of exactly 8 decimal digits;
      * "PA" (Pittsburgh & Philadelphia): first digit + last digit == 11;
      * "IL" (Chicago): first digit * fourth digit must be even;
      * any other state: no rule is defined, so the ID is accepted.

    Args:
        id_number: The ID, as an int or a string of digits. Whole-number
            floats (e.g. 12345678.0) are treated as their integer value.
        state: State code of the scan; only "PA" and "IL" have rules.

    Returns:
        bool: True if the ID passes the rule for `state`, False if it fails
        the rule, is not 8 digits long, or cannot be read as digits at all
        (None, NaN, text containing non-digit characters, ...).
    """
    # pandas stores an integer column as float64 as soon as one value is
    # missing; str(12345678.0) would then contain ".0" and every ID in the
    # column would be rejected. Normalise whole-number floats first.
    if isinstance(id_number, float) and id_number.is_integer():
        id_number = int(id_number)

    try:
        id_digits = [int(digit) for digit in str(id_number)]
    except (TypeError, ValueError):
        # Non-digit characters (letters, '-', '.', 'nan', 'None') -> invalid.
        # Only conversion errors are caught so that KeyboardInterrupt and
        # genuine programming errors are not silently reported as "invalid".
        return False

    # NOTE(review): IDs parsed as integers lose leading zeros and will fail
    # this length check - confirm whether IDs may start with 0.
    if len(id_digits) != 8:
        return False  # ID should be 8 digits long

    if state == "PA":  # Pittsburgh & Philadelphia
        return id_digits[0] + id_digits[7] == 11
    if state == "IL":  # Chicago
        return (id_digits[0] * id_digits[3]) % 2 == 0
    return True  # Default to valid if unknown state

# Run the checksum rule on every row (needs both the ID and its state)
df['calculated_valid_id'] = df.apply(
    lambda scan: validate_checksum(scan['id_number'], scan['state']),
    axis=1,
)

# An ID anomaly is simply a row whose checksum validation came back False
df['invalid_id_anomaly'] = df['calculated_valid_id'].eq(False)

# Report how many rows failed validation and preview a few of them
invalid_ids = df.loc[df["invalid_id_anomaly"]]
preview_columns = ['date', 'location', 'state', 'store_name', 'id_number']
print(f"✅ Detected {len(invalid_ids)} invalid ID anomalies.")
print(invalid_ids[preview_columns].head())

# Column dtypes, as a sanity check on how the data was parsed
print(df.dtypes)


In [None]:
# Order scans chronologically so that "previous row" means "previous scan in time"
df = df.sort_values(by=['date', 'hour'])

# A scan is a consecutive anomaly when both it and the scan right before it
# failed ID validation. fill_value=False keeps the shifted series boolean: the
# first row has no predecessor, and a plain shift(1) would insert NaN there,
# upcast the column to object and rely on implicit NaN -> False coercion in `&`.
# NOTE(review): adjacency is evaluated over the whole dataset, not per store or
# location - confirm that this is the intended definition of "consecutive".
previous_invalid = df["invalid_id_anomaly"].shift(1, fill_value=False)
df["consecutive_invalid_id"] = df["invalid_id_anomaly"] & previous_invalid

# Show detected consecutive anomalies
consecutive_anomalies = df[df["consecutive_invalid_id"]]
print(f"✅ Detected {len(consecutive_anomalies)} consecutive invalid ID anomalies.")
consecutive_anomalies[['date', 'location', 'store_name', 'id_number']].head()


In [None]:
# Dataset-wide share of scans whose ID failed validation
total_scans = len(df)
total_anomalies = df['invalid_id_anomaly'].sum()
average_anomaly_rate = total_anomalies / total_scans

# Per-store share of invalid IDs (mean of a boolean column == fraction of True)
store_anomaly_rates = (
    df.groupby("store_name")['invalid_id_anomaly']
      .mean()
      .rename("anomaly_rate")
      .reset_index()
)

# A store is flagged when its rate exceeds 1.5x the dataset-wide average
store_anomaly_rates["store_anomaly"] = store_anomaly_rates["anomaly_rate"].gt(
    1.5 * average_anomaly_rate
)

# List the flagged stores
high_anomaly_stores = store_anomaly_rates.loc[store_anomaly_rates["store_anomaly"]]
print(f"✅ Stores with high anomaly rates ({len(high_anomaly_stores)} detected):")
high_anomaly_stores


In [None]:
# Bar chart of the invalid-ID rate per store; flagged stores are drawn in red,
# the rest in blue, with the dataset-wide average as a dashed reference line.
fig, ax = plt.subplots(figsize=(12, 6))

bar_colors = np.where(store_anomaly_rates["store_anomaly"], "red", "blue").tolist()
ax.bar(
    store_anomaly_rates["store_name"],
    store_anomaly_rates["anomaly_rate"],
    color=bar_colors,
)
ax.axhline(y=average_anomaly_rate, color='black', linestyle='dashed', label="Average Anomaly Rate")

ax.set_xlabel("Store Name")
ax.set_ylabel("Anomaly Rate")
ax.set_title("Store Anomaly Rates")
ax.tick_params(axis="x", labelrotation=45)  # store names are long; tilt them
ax.legend()
plt.show()

# Histogram of scan counts, with statistically unusual counts overlaid in red.
plt.figure(figsize=(12, 6))

scan_counts = df["scan_count"]

# Plot the histogram of scan counts and keep its bin edges (second return
# value of plt.hist) so the anomaly overlay below uses exactly the same bins.
bin_edges = plt.hist(scan_counts, bins=50, alpha=0.7, label="Scan Counts")[1]

# Mark the mean scan count with a dashed line
plt.axvline(scan_counts.mean(), color="black", linestyle="dashed", linewidth=2, label="Mean Scan Count")

# Highlight potential scan anomalies (|Z-score| > 3). The mask is computed here
# from `scan_count` itself, so this cell does not depend on a flag column that
# an earlier cell may or may not have left on `df`.
scan_z_scores = np.asarray(stats.zscore(scan_counts))
anomalous_scans = scan_counts[np.abs(scan_z_scores) > 3]
if not anomalous_scans.empty:
    plt.hist(anomalous_scans, bins=bin_edges, color="red", alpha=0.6, label="Anomalous Scans")

plt.xlabel("Number of Scans")
plt.ylabel("Frequency")
plt.title("Distribution of Scan Counts (with Anomaly Detection)")
plt.legend()
plt.show()



In [None]:
# Persist the annotated dataset (all flag columns included, row index omitted)
# so the follow-up storytelling work can start from the detected anomalies.
output_path = "../data/detected_anomalies.csv"
df.to_csv(output_path, index=False)
print("✅ Anomaly detection complete! Data saved for storytelling.")