# Telemetry Pipeline - Full Workflow

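This notebook covers the full pipeline for optical telemetry anomaly detection: generating synthetic transceiver telemetry, labelling each record with rule-based anomaly checks, and fine-tuning a causal language model on the resulting prompt/response pairs.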

                    GNU AFFERO GENERAL PUBLIC LICENSE
                       Version 3, 19 November 2007

Copyright (C) 2025 Shaji R. Nathan  
IP Infusion Inc.  
Email: shaji.nathan@ipinfusion.com  

This program is free software: you can redistribute it and/or modify  
it under the terms of the GNU Affero General Public License as  
published by the Free Software Foundation, either version 3 of the  
License, or (at your option) any later version.  

This program is distributed in the hope that it will be useful,  
but WITHOUT ANY WARRANTY; without even the implied warranty of  
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the  
GNU Affero General Public License for more details.  

You should have received a copy of the GNU Affero General Public License  
along with this program. If not, see <https://www.gnu.org/licenses/>.  

As per AGPLv3, if you modify this software and make it available over a  
network, you must provide the source code of your modifications under the  
same license.  

For inquiries, please contact:  
Shaji R. Nathan  
IP Infusion Inc.  
Email: shaji.nathan@ipinfusion.com  


In [None]:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def generate_synthetic_switch_telemetry(num_samples=200, *, seed=42, start_time=None,
                                        output_path='synthetic_switch_telemetry.csv'):
    """Generate synthetic transceiver telemetry and optionally save it as CSV.

    Each row describes one module reading: a timestamp, a module name cycling
    through ``QSFP-1`` .. ``QSFP-32``, a temperature, a transceiver voltage and,
    for channels 1-4, input power, output power and laser bias current. All
    numeric values are drawn independently from uniform distributions.

    Args:
        num_samples: Number of rows to generate; must be non-negative.
        seed: Seed for NumPy's global random generator. ``None`` leaves the
            generator untouched, so the output is not reproducible.
        start_time: ``datetime`` of the first row; every following row is five
            minutes earlier. Defaults to the current local time.
        output_path: Destination CSV file. ``None`` skips writing to disk.

    Returns:
        A ``pandas.DataFrame`` with ``num_samples`` rows whose ``timestamp``
        column has been converted to strings.

    Raises:
        ValueError: If ``num_samples`` is negative.
    """
    if num_samples < 0:
        raise ValueError(f"num_samples must be non-negative, got {num_samples}")

    if seed is not None:
        # The global generator is seeded on purpose: it keeps the default
        # output identical to earlier runs of this notebook.
        np.random.seed(seed)
    if start_time is None:
        start_time = datetime.now()

    # Row i lies 5 * i minutes before start_time, i.e. newest reading first.
    timestamps = [start_time - timedelta(minutes=5 * i) for i in range(num_samples)]
    names = [f"QSFP-{i % 32 + 1}" for i in range(num_samples)]

    data = {
        'timestamp': timestamps,
        'name': names,
        'temp': np.random.uniform(20, 80, num_samples),
        'trans-volt': np.random.uniform(3.2, 3.5, num_samples),
    }

    # The draw order (in, out, bias per channel) determines which random
    # numbers land in which column, so it must stay fixed for reproducibility.
    for ch in range(1, 5):
        data[f'channel_{ch}_in_pwr'] = np.random.uniform(-3, 3, num_samples)
        data[f'channel_{ch}_out_pwr'] = np.random.uniform(-3, 3, num_samples)
        data[f'channel_{ch}_laser_bias_cur'] = np.random.uniform(5, 10, num_samples)

    df = pd.DataFrame(data)
    df['timestamp'] = df['timestamp'].astype(str)
    if output_path is not None:
        df.to_csv(output_path, index=False)
    return df

# Build the dataset with the function's default arguments (this also writes
# synthetic_switch_telemetry.csv) and preview the first rows as the cell output.
df = generate_synthetic_switch_telemetry()
df.head()
    

In [None]:

import json

def classify_anomaly(row):
    """Label one telemetry record using fixed threshold rules.

    Args:
        row: Mapping-like record (for example a DataFrame row) that provides
            ``temp``, ``trans-volt`` and, for channels 1-4, the
            ``channel_<n>_laser_bias_cur`` and ``channel_<n>_out_pwr`` fields.

    Returns:
        ``"Normal"`` when no rule fires, otherwise ``"Anomalous - "`` followed
        by the comma-separated names of every rule that fired, in check order.
    """
    findings = []

    if row['temp'] > 75:
        findings.append("Overheating")

    voltage = row['trans-volt']
    if voltage < 3.1 or voltage > 3.5:
        findings.append("Voltage Drift")

    # Per channel, the bias-current check is reported before the power check.
    for channel in (1, 2, 3, 4):
        bias_current = row[f'channel_{channel}_laser_bias_cur']
        if bias_current > 9:
            findings.append(f"Channel {channel} Bias Current Spike")
        output_power = row[f'channel_{channel}_out_pwr']
        if output_power < -3:
            findings.append(f"Channel {channel} Power Loss")

    if not findings:
        return "Normal"
    return "Anomalous - " + ', '.join(findings)

def row_to_prompt(row):
    """Render one telemetry record as the text prompt shown to the model.

    Args:
        row: Mapping-like record providing ``timestamp``, ``name``, ``temp``,
            ``trans-volt`` and, for channels 1-4, ``channel_<n>_in_pwr``,
            ``channel_<n>_out_pwr`` and ``channel_<n>_laser_bias_cur``.

    Returns:
        A multi-line report (numeric values formatted to two decimals) that
        ends with the question "Is this normal or anomalous?".
    """
    temperature = row['temp']
    voltage = row['trans-volt']
    lines = [
        "Telemetry Report:",
        f"- Timestamp: {row['timestamp']}",
        f"- Module: {row['name']}",
        f"- Temperature: {temperature:.2f}°C",
        f"- Transceiver Voltage: {voltage:.2f}V",
    ]

    for channel in range(1, 5):
        in_power = row[f'channel_{channel}_in_pwr']
        out_power = row[f'channel_{channel}_out_pwr']
        bias_current = row[f'channel_{channel}_laser_bias_cur']
        lines.append(f"- Channel {channel} Input Power: {in_power:.2f} dBm")
        lines.append(f"- Channel {channel} Output Power: {out_power:.2f} dBm")
        lines.append(f"- Channel {channel} Laser Bias Current: {bias_current:.2f} mA")

    # Every report line is newline-terminated; a blank line precedes the question.
    return "\n".join(lines) + "\n" + "\nIs this normal or anomalous?"

# Turn every telemetry row into a {"prompt", "response"} pair and store the
# pairs in JSON Lines format: one JSON object per line.
records = (
    {"prompt": row_to_prompt(row), "response": classify_anomaly(row)}
    for _, row in df.iterrows()
)
with open('train.jsonl', 'w') as out_file:
    out_file.writelines(json.dumps(record) + '\n' for record in records)

print("✅ train.jsonl created.")
    

In [None]:

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorForSeq2Seq
from datasets import load_dataset

# Load the prompt/response pairs and hold out 20% of them for evaluation.
dataset = load_dataset('json', data_files={'train': 'train.jsonl'})
# A fixed seed keeps the train/eval split identical from run to run, so
# evaluation numbers are comparable between experiments.
train_test_split = dataset['train'].train_test_split(test_size=0.2, seed=42)

train_dataset = train_test_split['train']
eval_dataset = train_test_split['test']

model_name = 'mistralai/Mistral-7B-v0.1'
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")

# Batches are padded to a common length during training, which fails when the
# tokenizer defines no padding token (the Mistral base tokenizer does not).
# Reusing EOS as padding is the usual fallback for decoder-only models.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = tokenizer.pad_token_id

def tokenize_function(examples, max_length=512):
    """Tokenize a batch of prompt/response pairs for causal-LM fine-tuning.

    A decoder-only model computes its loss position by position, so ``labels``
    must have exactly the same length as ``input_ids``. Each example is
    therefore encoded as ``prompt tokens + response tokens + EOS``; the label
    sequence mirrors it with every prompt position set to ``-100`` so that
    only the response (and the closing EOS) contributes to the loss.

    Args:
        examples: Batch mapping with parallel ``"prompt"`` and ``"response"``
            lists of strings, as passed by ``Dataset.map(..., batched=True)``.
        max_length: Maximum number of tokens kept per example.

    Returns:
        Dict with ``input_ids``, ``attention_mask`` and ``labels``, each a list
        holding one equally long token list per example.
    """
    batch = {"input_ids": [], "attention_mask": [], "labels": []}
    for prompt, response in zip(examples["prompt"], examples["response"]):
        prompt_ids = tokenizer(prompt)["input_ids"]
        # No special tokens here: the response continues the prompt, and the
        # terminating EOS is appended explicitly so the model learns to stop.
        response_ids = tokenizer(response, add_special_tokens=False)["input_ids"]
        response_ids = response_ids + [tokenizer.eos_token_id]

        # When the pair is too long, shorten the prompt first so the supervised
        # response tokens survive; an example with no labels teaches nothing.
        prompt_budget = max(max_length - len(response_ids), 0)
        prompt_ids = prompt_ids[:prompt_budget]

        input_ids = (prompt_ids + response_ids)[:max_length]
        labels = ([-100] * len(prompt_ids) + response_ids)[:max_length]

        batch["input_ids"].append(input_ids)
        batch["attention_mask"].append([1] * len(input_ids))
        batch["labels"].append(labels)
    return batch

# Tokenize both splits; batched=True hands tokenize_function a dict of lists.
tokenized_train = train_dataset.map(tokenize_function, batched=True)
tokenized_eval = eval_dataset.map(tokenize_function, batched=True)

# Collator that assembles individual examples into batches.
# NOTE(review): presumably pads inputs and labels per batch, which requires the
# tokenizer to have a padding token - confirm for this tokenizer.
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

# Evaluate and checkpoint once per epoch, keeping at most two checkpoints.
# NOTE(review): `evaluation_strategy` has been renamed in newer transformers
# releases - confirm against the installed version.
# NOTE(review): fp16=True presumably needs a CUDA-capable GPU - confirm for the
# target environment.
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    num_train_epochs=3,
    weight_decay=0.01,
    save_total_limit=2,
    fp16=True,
    logging_dir="./logs",
    logging_steps=10,
    report_to="tensorboard"
)

# NOTE(review): the `tokenizer=` argument of Trainer is deprecated in newer
# transformers releases - confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_eval,
    data_collator=data_collator,
    tokenizer=tokenizer
)

trainer.train()

# Save the fine-tuned weights and the tokenizer side by side so the directory
# can be loaded later with from_pretrained().
# NOTE(review): the directory name says "llama" although model_name is a
# Mistral checkpoint - rename only if nothing else depends on this path.
model.save_pretrained("fine_tuned_llama_telemetry")
tokenizer.save_pretrained("fine_tuned_llama_telemetry")
print("✅ Fine-tuned model saved.")
    