In [1]:
import pandas as pd 
import numpy as np 


In [2]:
data_dir='/home/rajesh/work/data/data_move_-1.csv'
df= pd.read_csv(data_dir)


  df= pd.read_csv(data_dir)


In [18]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import warnings

# Suppress technical warnings for cleaner logs
warnings.filterwarnings("ignore")

# ==========================================
# 1. PHYSICAL TWIN CLASS
# ==========================================
class PhysicalTwin:
    def __init__(self, cow_id):
        self.cow_id = cow_id
        self.milk_params = None
        self.models = {
            'milk_refiner': LinearRegression(),
            'rt_model': LinearRegression(),
            'wi_model': LinearRegression()
        }
        self.sigmas = {'milk': 0, 'rt': 0, 'wi': 0}
        self.is_trained = False

    def woods_model(self, t, a, b, c):
        """Biological foundation for lactation yield."""
        return a * (t**b) * np.exp(-c * t)

    def _prepare_X(self, primary_val, thi):
        """Creates feature matrix: [Value, THI, THI^2]"""
        return np.array([[primary_val, thi, thi**2]])

    def train(self, history):
        """Learns biological baselines and variance from clean historical data."""
        # Filter for days without health or management events
        clean_data = history[history['events'].isna()].copy()
        if len(clean_data) < 40:
            return

        try:
            # A. Fit Wood's Model (Yield Foundation)
            popt, _ = curve_fit(self.woods_model, clean_data['DIM'], clean_data['Milk'], p0=[15, 0.2, 0.004])
            self.milk_params = popt
            
            # B. Train Multivariate Milk Refiner (Corrects Wood's for THI)
            woods_preds = self.woods_model(clean_data['DIM'], *self.milk_params)
            X_m = pd.DataFrame({'w': woods_preds, 't': clean_data['THI'], 'ts': clean_data['THI']**2})
            self.models['milk_refiner'].fit(X_m, clean_data['Milk'])
            self.sigmas['milk'] = np.sqrt(mean_squared_error(clean_data['Milk'], self.models['milk_refiner'].predict(X_m)))

            # C. Train RT Model (Rumination ~ f(Milk, THI))
            X_env = pd.DataFrame({'m': clean_data['Milk'], 't': clean_data['THI'], 'ts': clean_data['THI']**2})
            self.models['rt_model'].fit(X_env, clean_data['RT'])
            self.sigmas['rt'] = np.sqrt(mean_squared_error(clean_data['RT'], self.models['rt_model'].predict(X_env)))

            # D. Train WI Model (Water ~ f(Milk, THI))
            self.models['wi_model'].fit(X_env, clean_data['Water'])
            self.sigmas['wi'] = np.sqrt(mean_squared_error(clean_data['Water'], self.models['wi_model'].predict(X_env)))

            self.is_trained = True
        except Exception as e:
            pass # Handle specific logging if needed

    def audit_record(self, row, z_threshold=3.0):
        """Checks a single day's data against learned biological truth."""
        if not self.is_trained:
            return {"status": "INSUFFICIENT_DATA", "z_score": 0}

        # 1. Predict what the cow 'Should' be doing
        base_milk = self.woods_model(row['DIM'], *self.milk_params)
        p_milk = self.models['milk_refiner'].predict(self._prepare_X(base_milk, row['THI']))[0]
        p_rt = self.models['rt_model'].predict(self._prepare_X(row['Milk'], row['THI']))[0]
        p_wi = self.models['wi_model'].predict(self._prepare_X(row['Milk'], row['THI']))[0]

        # 2. Calculate Z-Scores (Standard Deviations from Normal)
        z_milk = abs(row['Milk'] - p_milk) / self.sigmas['milk']
        z_rt = abs(row['RT'] - p_rt) / self.sigmas['rt']
        z_wi = abs(row['Water'] - p_wi) / self.sigmas['wi']

        # 3. Decision Logic
        max_z = max(z_milk, z_rt, z_wi)
        is_anomaly = max_z > z_threshold
        has_event = pd.notna(row['events'])

        if is_anomaly and not has_event:
            status = "CYBER_ANOMALY"
        elif is_anomaly and has_event:
            status = "BIOLOGICAL_EVENT"
        else:
            status = "NORMAL"

        return {"status": status, "z_score": max_z, "pred_milk": p_milk}

# ==========================================
# 2. SYSTEM COORDINATOR
# ==========================================
class DairyDigitalTwinSystem:
    def __init__(self, data):
        """
        Parameters
        ----------
        data : str or pandas.DataFrame
            Path to CSV file or a preloaded DataFrame
        """
        if isinstance(data, pd.DataFrame):
            self.df = data.copy()
        elif isinstance(data, str):
            self.df = pd.read_csv(data)
        else:
            raise ValueError("data must be a pandas DataFrame or a file path (str)")

        self.twins = {}
        self.results = []

    def run_pipeline(self):
        print(f"--- Starting Analysis on {self.df['cow_id'].nunique()} cows ---")
        
        # Process each cow
        for cow_id, group in self.df.groupby('cow_id'):
            # Split: 80% train (history), 20% test (current)
            split = int(len(group) * 0.8)
            train_df = group.iloc[:split]
            test_df = group.iloc[split:]
            
            twin = PhysicalTwin(cow_id)
            twin.train(train_df)
            self.twins[cow_id] = twin
            
            # Audit the test period
            for _, row in test_df.iterrows():
                audit = twin.audit_record(row)
                self.results.append({
                    'date': row['date'],
                    'cow_id': cow_id,
                    'sensor_group': row['sensor_group'],
                    **audit
                })

    def generate_report(self):
        report_df = pd.DataFrame(self.results)
        
        # 1. Detect System-Level Cyber Attack (Temporal)
        daily_cyber = report_df[report_df['status'] == "CYBER_ANOMALY"].groupby('date').size()
        attack_dates = daily_cyber[daily_cyber > (self.df['cow_id'].nunique() * 0.10)]
        
        # 2. Detect Infrastructure Breach (Spatial)
        group_stats = report_df.groupby('sensor_group').agg(
            total=('cow_id', 'nunique'),
            anomalies=('status', lambda x: (x == 'CYBER_ANOMALY').sum())
        )
        group_stats['breach_prob'] = group_stats['anomalies'] / group_stats['total']
        targeted_groups = group_stats[group_stats['breach_prob'] > 0.30]

        # Output Results
        print("\n=== SYSTEM AUDIT REPORT ===")
        print(f"Unexplained Anomalies Found: {len(report_df[report_df['status'] == 'CYBER_ANOMALY'])}")
        
        if not attack_dates.empty:
            print(f"[!] SYSTEM-WIDE ATTACK DETECTED ON DATES: {attack_dates.index.tolist()}")
            
        if not targeted_groups.empty:
            print(f"[!] INFRASTRUCTURE BREACH SUSPECTED IN GROUPS: {targeted_groups.index.tolist()}")
            
        return report_df

# ==========================================
# 3. EXECUTION
# ==========================================

    # [date, cow_id, DIM, Milk, RT, Water, THI, events, sensor_group]
    
system = DairyDigitalTwinSystem(selected_df)
system.run_pipeline()
results = system.generate_report()
print("Physical Twin Logic Loaded. Ready for data ingestion.")

--- Starting Analysis on 982 cows ---

=== SYSTEM AUDIT REPORT ===
Unexplained Anomalies Found: 6
Physical Twin Logic Loaded. Ready for data ingestion.


In [34]:
def deep_dive_refined(original_df, report_df, event_buffer_days=3):
    """
    Investigates anomalies with a temporal buffer to capture 
    biological events that were logged slightly late.
    """
    # Deduplicate: Only look at unique cow/date anomaly pairs
    anomalies = report_df[report_df['status'] == 'CYBER_ANOMALY'].drop_duplicates(subset=['cow_id', 'date'])
    
    print(f"\n--- Investigating {len(anomalies)} Unique Unexplained Anomalies ---")

    for _, anomaly in anomalies.iterrows():
        cow_id = anomaly['cow_id']
        a_date = pd.to_datetime(anomaly['date'])
        
        # Get window of data
        cow_data = original_df[original_df['cow_id'] == cow_id].copy()
        cow_data['date'] = pd.to_datetime(cow_data['date'])
        
        # Look for events in a +/- buffer window around the anomaly
        start_buf = a_date - pd.Timedelta(days=event_buffer_days)
        end_buf = a_date + pd.Timedelta(days=event_buffer_days)
        
        nearby_events = cow_data[(cow_data['date'] >= start_buf) & 
                                 (cow_data['date'] <= end_buf) & 
                                 (cow_data['events'].notna())]

        if not nearby_events.empty:
            print(f"\n[RECLASSIFIED] Cow {cow_id} on {a_date.date()}:")
            print(f"  Reason: Found '{nearby_events['events'].iloc[0]}' event on {nearby_events['date'].iloc[0].date()}")
            print(f"  Conclusion: Likely biological lead/lag, not cyber.")
        else:
            print(f"\n[STILL ANOMALOUS] Cow {cow_id} on {a_date.date()}:")
            print(f"  No events found within +/- {event_buffer_days} days. High probability of Cyber-Physical interference.")

# --- How to run it ---
# Assuming 'system' is your DairyDigitalTwinSystem instance
# and 'results_df' is the output of generate_report()
deep_dive_anomalies(selected_df, results)


DEEP DIVE: INVESTIGATING 6 UNEXPLAINED ANOMALIES

--- ANOMALY REPORT: Cow 6241.0 on 2023-07-27 ---
Z-Score at Trigger: 3.02
      date   DIM   Milk      RT  Water      THI events  sensor_group
2023-02-13  34.0 39.628 28087.0   95.0 54.97500    NaN           3.0
2023-02-14  35.0 46.908 28260.0  103.0 59.37640    LAB           5.0
2023-02-14  35.0 46.908 28260.0  103.0 59.37640   MOVE           5.0
2023-02-15  36.0 43.404 23020.0  106.0 62.74725    NaN           5.0
2023-02-16  37.0 41.474 24236.0  105.0 51.79590    NaN           5.0
2023-02-17  38.0 36.386 29113.0  108.0 45.61795    NaN           5.0
2023-02-18  39.0 36.746 28332.0   49.0 46.15680    NaN           5.0
2023-07-27 198.0 19.320 31780.0   71.0 78.10155    NaN           5.0
2023-07-28 199.0 24.769 27415.0  103.0 77.45400   MOVE           5.0
2023-07-28 199.0 24.769 27415.0  103.0 77.45400   MOVE           5.0
2023-07-29 200.0 30.851 25122.0   97.0 77.48865    NaN           5.0
2023-07-30 201.0 31.718 22904.0   62.0 78.08450

In [9]:
selected_df=df[['date_x', 'Animal_ID', 'Event', 'THI', 'yield', 'water_intake in l', 'Days_in_Milk', 'rum_index', 'Group_ID']]

In [16]:
selected_df=selected_df.rename(columns={
    'date_x': 'date',
    'Animal_ID': 'cow_id',
    'Days_in_Milk': 'DIM',
    'yield': 'Milk',
    'rum_index': 'RT',
    'water_intake in l': 'Water',
    'THI': 'THI',
    'Event': 'events',
    'Group_ID': 'sensor_group'
})

In [26]:
df.columns

Index(['Unnamed: 0.3', 'Unnamed: 0.2', 'Unnamed: 0.1', 'Unnamed: 0', 'date_x',
       'timestamp_x', 'pH', 'temperature °C', 'activity',
       'Temp Without Drinkcycles', 'Normal temperature', 'Heat index',
       'rum_index', 'animal_id', 'time', 'day', 'date_y', 'timestamp_y',
       'water_intake', 'date', 'Animal_ID', 'Group_ID', 'Date', 'Days_in_Milk',
       'Age_Days', 'Lactation_Num', 'Yield(gr)', 'Conductivity',
       'Milking_Time(seconds)', 'Event', 'THI', 'THI_class', 'seasons',
       'yield_cat', 'Event_new', 'day_event', 'date_relative',
       'water_intake in l', 'moved_days', 'yield', 'lac_stage'],
      dtype='object')

In [32]:
results

Unnamed: 0,date,cow_id,sensor_group,status,z_score,pred_milk
0,2022-11-24,441.0,3.0,INSUFFICIENT_DATA,0.0,
1,2022-11-25,441.0,1.0,INSUFFICIENT_DATA,0.0,
2,2022-11-26,441.0,1.0,INSUFFICIENT_DATA,0.0,
3,2022-11-27,441.0,1.0,INSUFFICIENT_DATA,0.0,
4,2022-11-28,441.0,1.0,INSUFFICIENT_DATA,0.0,
...,...,...,...,...,...,...
60058,2023-11-02,50178.0,4.0,INSUFFICIENT_DATA,0.0,
60059,2023-11-03,50178.0,4.0,INSUFFICIENT_DATA,0.0,
60060,2023-11-04,50178.0,4.0,INSUFFICIENT_DATA,0.0,
60061,2023-11-05,50178.0,4.0,INSUFFICIENT_DATA,0.0,
