# 📚 Agent Conversation Data & Training Dataset Management with DagsHub

This notebook implements **conversation data storage and training dataset management** using DagsHub. It is the third part of the agent development series, after prompt/configuration versioning and performance tracking.

## 🎯 What You'll Learn:
- Store and version conversation histories
- Create structured training datasets from conversations
- Implement data versioning with DagsHub
- Build dataset pipelines for agent improvement
- Track dataset metrics and quality

## 🔗 Integration with Previous Work:
- Uses the same DagsHub repository setup
- Extends agent performance tracking with data storage
- Provides datasets for agent fine-tuning

---

## 🚀 Setup & Installation

In [1]:
# Install required packages
!pip install -q "dagshub[jupyter]" mlflow openai python-dotenv
!pip install -q pandas numpy jsonlines pyarrow
!pip install -q datasets huggingface_hub
!pip install -q "dvc[s3]" dvc-data

import os
import json
import jsonlines
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import hashlib
import uuid
from typing import Dict, List, Any, Optional, Tuple
import pickle
import yaml
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# DagsHub and MLflow
import dagshub
import mlflow
from mlflow.tracking import MlflowClient

# OpenAI
import openai
from openai import OpenAI

# Datasets
from datasets import Dataset, DatasetDict, load_dataset

print("✅ All packages installed successfully!")
print("📚 Data management libraries loaded")

✅ All packages installed successfully!
📚 Data management libraries loaded


## 🔐 Configure Credentials

In [2]:
# Enter your credentials (same as in the previous notebooks):

# Repository name:
REPO_NAME = "conversation_and_datasets"  

# DagsHub username:
USER_NAME = "mpaul"  

# Email:
EMAIL = "mpaul@redhat.com"  

# OpenAI API key:
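# Prefer the OPENAI_API_KEY environment variable; the placeholder is only a fallback
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_API_KEY_HERE")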

# Initialize DagsHub
dagshub.auth.add_app_token(token=dagshub.auth.get_token())
dagshub.init(repo_name=REPO_NAME, repo_owner=USER_NAME)

# Set up OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

print(f"✅ Connected to DagsHub repository: {USER_NAME}/{REPO_NAME}")
print(f"📊 MLflow tracking URI: {mlflow.get_tracking_uri()}")



✅ Connected to DagsHub repository: mpaul/conversation_and_datasets
📊 MLflow tracking URI: https://dagshub.com/mpaul/conversation_and_datasets.mlflow


## 💬 Conversation Data Storage Framework

In [3]:
class ConversationStorage:
    """Manages storage and retrieval of conversation data"""

    def __init__(self, storage_path: str = "conversation_data"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)
        self.conversations = []
        self.metadata = {}

    def create_conversation_id(self) -> str:
        """Generate unique conversation ID"""
        return f"conv_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"

    def store_conversation(self,
                         messages: List[Dict[str, str]],
                         agent_config: Dict[str, Any],
                         performance_metrics: Optional[Dict[str, Any]] = None,
                         tags: Optional[List[str]] = None) -> str:
        """Store a conversation with metadata"""

        conversation_id = self.create_conversation_id()

        conversation_data = {
            "conversation_id": conversation_id,
            "timestamp": datetime.now().isoformat(),
            "messages": messages,
            "agent_config": agent_config,
            "performance_metrics": performance_metrics or {},
            "tags": tags or [],
            "message_count": len(messages),
            "total_tokens": sum(m.get("tokens", 0) for m in messages),
            "duration_seconds": performance_metrics.get("duration", 0) if performance_metrics else 0
        }

        # Calculate additional metrics
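        # Guard against an empty message list, for which np.mean returns NaN
        conversation_data["avg_message_length"] = (
            float(np.mean([len(m.get("content", "")) for m in messages])) if messages else 0.0
        )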
        conversation_data["user_messages"] = sum(1 for m in messages if m.get("role") == "user")
        conversation_data["assistant_messages"] = sum(1 for m in messages if m.get("role") == "assistant")

        # Store to file
        file_path = self.storage_path / f"{conversation_id}.json"
        with open(file_path, 'w') as f:
            json.dump(conversation_data, f, indent=2)

        # Add to memory
        self.conversations.append(conversation_data)

        return conversation_id

    def batch_store_conversations(self, conversations: List[Dict]) -> List[str]:
        """Store multiple conversations at once"""
        conversation_ids = []

        for conv in conversations:
            conv_id = self.store_conversation(
                messages=conv.get("messages", []),
                agent_config=conv.get("agent_config", {}),
                performance_metrics=conv.get("performance_metrics"),
                tags=conv.get("tags")
            )
            conversation_ids.append(conv_id)

        return conversation_ids

    def load_conversation(self, conversation_id: str) -> Optional[Dict]:
        """Load a specific conversation"""
        file_path = self.storage_path / f"{conversation_id}.json"

        if file_path.exists():
            with open(file_path, 'r') as f:
                return json.load(f)
        return None

    def export_to_jsonl(self, output_file: str = "conversations.jsonl"):
        """Export all conversations to JSONL format"""
        output_path = self.storage_path / output_file

        with jsonlines.open(output_path, mode='w') as writer:
            for conv in self.conversations:
                writer.write(conv)

        return output_path

    def get_statistics(self) -> Dict[str, Any]:
        """Get statistics about stored conversations"""
        if not self.conversations:
            return {}

        df = pd.DataFrame(self.conversations)

        stats = {
            "total_conversations": len(self.conversations),
            "total_messages": df["message_count"].sum(),
            "avg_messages_per_conversation": df["message_count"].mean(),
            "total_tokens": df["total_tokens"].sum(),
            "avg_tokens_per_conversation": df["total_tokens"].mean(),
            "unique_agent_configs": len(df["agent_config"].apply(json.dumps).unique()),
            "date_range": {
                "start": df["timestamp"].min(),
                "end": df["timestamp"].max()
            }
        }

        return stats

print("✅ Conversation Storage Framework initialized")

✅ Conversation Storage Framework initialized
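
`ConversationStorage` gives every stored conversation a fresh timestamp-plus-UUID ID, so storing the same exchange twice produces two separate files. One possible way to detect such repeats is a content fingerprint. The sketch below is an illustration only: `conversation_fingerprint` is a hypothetical helper, not part of `ConversationStorage` or DagsHub, and it assumes each message is a dict with `role` and `content` keys.

```python
import hashlib
import json
from typing import Dict, List


def conversation_fingerprint(messages: List[Dict[str, str]]) -> str:
    """Return a stable SHA-256 hex digest of the role/content pairs."""
    # Keep only the fields that define the exchange; token counts and
    # other per-run metadata should not change the fingerprint.
    canonical = [
        {"role": m.get("role", ""), "content": m.get("content", "")}
        for m in messages
    ]
    # sort_keys plus fixed separators give a deterministic serialization.
    payload = json.dumps(canonical, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = [
    {"role": "user", "content": "Hello", "tokens": 1},
    {"role": "assistant", "content": "Hi there", "tokens": 2},
]
# Same exchange, different token metadata.
second = [
    {"role": "user", "content": "Hello", "tokens": 5},
    {"role": "assistant", "content": "Hi there", "tokens": 7},
]

print(conversation_fingerprint(first) == conversation_fingerprint(second))
```

A fingerprint like this could be stored next to `conversation_id` and checked before writing a new file; whether duplicates should be dropped or kept depends on how the training data is used.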


## 🏗️ Training Dataset Builder

In [4]:
class TrainingDatasetBuilder:
    """Build training datasets from conversation data"""

    def __init__(self, conversation_storage: ConversationStorage):
        self.storage = conversation_storage
        self.datasets = {}

    def create_instruction_dataset(self,
                                 min_quality_score: float = 0.7,
                                 format_type: str = "alpaca") -> Dataset:
        """Create instruction-following dataset from high-quality conversations"""

        instruction_data = []

        for conv in self.storage.conversations:
            # Filter by quality score if available
            quality_score = conv.get("performance_metrics", {}).get("quality_score", 1.0)
            if quality_score < min_quality_score:
                continue

            messages = conv["messages"]

            # Extract instruction-response pairs
            for i in range(0, len(messages) - 1, 2):
                if i + 1 < len(messages):
                    user_msg = messages[i]
                    assistant_msg = messages[i + 1]

                    if user_msg.get("role") == "user" and assistant_msg.get("role") == "assistant":

                        if format_type == "alpaca":
                            # Alpaca format
                            data_point = {
                                "instruction": user_msg.get("content", ""),
                                "input": "",  # Can be enriched with context
                                "output": assistant_msg.get("content", ""),
                                "conversation_id": conv["conversation_id"],
                                "agent_config": json.dumps(conv["agent_config"]),
                                "quality_score": quality_score
                            }
                        elif format_type == "chatgpt":
                            # ChatGPT format
                            data_point = {
                                "messages": [
                                    {"role": "system", "content": conv["agent_config"].get("system_prompt", "")},
                                    {"role": "user", "content": user_msg.get("content", "")},
                                    {"role": "assistant", "content": assistant_msg.get("content", "")}
                                ],
                                "conversation_id": conv["conversation_id"],
                                "quality_score": quality_score
                            }
                        else:
                            # Simple format
                            data_point = {
                                "prompt": user_msg.get("content", ""),
                                "response": assistant_msg.get("content", ""),
                                "conversation_id": conv["conversation_id"],
                                "quality_score": quality_score
                            }

                        instruction_data.append(data_point)

        # Create HuggingFace Dataset
        dataset = Dataset.from_list(instruction_data)

        # Add metadata
        dataset.info.description = f"Instruction dataset created from {len(self.storage.conversations)} conversations"
        dataset.info.version = "1.0.0"
        dataset.info.license = "apache-2.0"

        return dataset

    def create_preference_dataset(self, comparison_pairs: List[Tuple[str, str]]) -> Dataset:
        """Create preference dataset for RLHF training"""

        preference_data = []

        for chosen_id, rejected_id in comparison_pairs:
            chosen_conv = self.storage.load_conversation(chosen_id)
            rejected_conv = self.storage.load_conversation(rejected_id)

            if chosen_conv and rejected_conv:
                # Extract comparable message pairs
                chosen_msgs = chosen_conv["messages"]
                rejected_msgs = rejected_conv["messages"]

                # Find matching user prompts
                for idx, (c_msg, r_msg) in enumerate(zip(chosen_msgs, rejected_msgs)):
                    if c_msg.get("role") == "user" and r_msg.get("role") == "user":
                        # Get the corresponding responses. Use the loop position:
                        # list.index() returns the first equal message, which
                        # mis-pairs responses when a prompt is repeated.
                        c_idx = r_idx = idx

                        if c_idx + 1 < len(chosen_msgs) and r_idx + 1 < len(rejected_msgs):
                            data_point = {
                                "prompt": c_msg.get("content", ""),
                                "chosen": chosen_msgs[c_idx + 1].get("content", ""),
                                "rejected": rejected_msgs[r_idx + 1].get("content", ""),
                                "chosen_score": chosen_conv.get("performance_metrics", {}).get("quality_score", 0),
                                "rejected_score": rejected_conv.get("performance_metrics", {}).get("quality_score", 0)
                            }
                            preference_data.append(data_point)

        return Dataset.from_list(preference_data)

    def create_evaluation_dataset(self, test_size: float = 0.2) -> DatasetDict:
        """Create train/test split for evaluation"""

        # Create full instruction dataset
        full_dataset = self.create_instruction_dataset()

        # Split into train/test
        split_dataset = full_dataset.train_test_split(test_size=test_size, seed=42)

        return DatasetDict({
            "train": split_dataset["train"],
            "test": split_dataset["test"]
        })

    def export_to_formats(self, dataset: Dataset, base_name: str = "dataset"):
        """Export dataset to multiple formats"""

        export_paths = {}

        # JSON
        json_path = f"{base_name}.json"
        dataset.to_json(json_path)
        export_paths["json"] = json_path

        # CSV
        csv_path = f"{base_name}.csv"
        dataset.to_csv(csv_path)
        export_paths["csv"] = csv_path

        # Parquet (efficient storage)
        parquet_path = f"{base_name}.parquet"
        dataset.to_parquet(parquet_path)
        export_paths["parquet"] = parquet_path

        return export_paths

print("✅ Training Dataset Builder initialized")

✅ Training Dataset Builder initialized
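
`create_instruction_dataset` pairs messages at even and odd positions, so a conversation that begins with a `system` message yields no pairs at all. A minimal sketch of an alternative pairing rule follows, written in plain Python with no `datasets` dependency. `extract_pairs` is a hypothetical helper, and the record fields simply mirror the Alpaca branch of the builder.

```python
from typing import Dict, List


def extract_pairs(messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Pair each user message with the assistant message right after it."""
    pairs = []
    # Slide over adjacent messages so a leading system message is skipped
    # instead of shifting every pair out of alignment.
    for current, following in zip(messages, messages[1:]):
        if current.get("role") == "user" and following.get("role") == "assistant":
            pairs.append({
                "instruction": current.get("content", ""),
                "input": "",
                "output": following.get("content", ""),
            })
    return pairs


example = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is DVC?"},
    {"role": "assistant", "content": "A tool for versioning data."},
    {"role": "user", "content": "And MLflow?"},
    {"role": "assistant", "content": "A tool for tracking experiments."},
]

for record in extract_pairs(example):
    print(record["instruction"], "->", record["output"])
```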


## 📦 Data Versioning with DagsHub

In [5]:
class DataVersionManager:
    """Manage data versioning with DagsHub and DVC"""

    def __init__(self, repo_path: str = "."):
        self.repo_path = Path(repo_path)
        self.data_path = self.repo_path / "data"
        self.data_path.mkdir(exist_ok=True)

        # Initialize DVC if not already done
        if not (self.repo_path / ".dvc").exists():
            os.system("dvc init")

    def version_dataset(self,
                       dataset: Dataset,
                       dataset_name: str,
                       version: str,
                       metadata: Optional[Dict[str, Any]] = None) -> str:
        """Version a dataset with DVC and track in MLflow"""

        # Create version directory
        version_path = self.data_path / dataset_name / version
        version_path.mkdir(parents=True, exist_ok=True)

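        # Save dataset. Note: save_to_disk() writes an Arrow dataset *directory*
        # at this path, despite the ".parquet" name.
        dataset_file = version_path / "dataset.parquet"
        dataset.save_to_disk(str(dataset_file))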

        # Save metadata
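        metadata = dict(metadata or {})  # copy so the caller's dict is not mutated
        # dataset_file is a directory, so sum the files inside it rather than
        # calling stat() on the directory entry itself
        size_bytes = sum(p.stat().st_size for p in dataset_file.rglob("*") if p.is_file())
        metadata.update({
            "dataset_name": dataset_name,
            "version": version,
            "created_at": datetime.now().isoformat(),
            "num_samples": len(dataset),
            "features": list(dataset.features.keys()) if hasattr(dataset, 'features') else [],
            "size_mb": size_bytes / (1024 * 1024)
        })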

        metadata_file = version_path / "metadata.json"
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2)

        # Track with DVC
        os.system(f"dvc add {dataset_file}")
        os.system("dvc push")

        # Log to MLflow
        with mlflow.start_run(run_name=f"dataset_{dataset_name}_v{version}"):
            mlflow.log_params(metadata)
            mlflow.log_artifact(str(metadata_file))
            mlflow.log_metric("dataset_size", len(dataset))
            mlflow.log_metric("file_size_mb", metadata["size_mb"])

            # Log sample data
            if len(dataset) > 0:
                sample_file = version_path / "sample.json"
                sample_data = dataset.select(range(min(5, len(dataset))))
                sample_data.to_json(str(sample_file))
                mlflow.log_artifact(str(sample_file))

        return str(version_path)

    def list_dataset_versions(self, dataset_name: str) -> List[Dict[str, Any]]:
        """List all versions of a dataset"""

        dataset_path = self.data_path / dataset_name
        versions = []

        if dataset_path.exists():
            for version_dir in dataset_path.iterdir():
                if version_dir.is_dir():
                    metadata_file = version_dir / "metadata.json"
                    if metadata_file.exists():
                        with open(metadata_file, 'r') as f:
                            metadata = json.load(f)
                            versions.append(metadata)

        return sorted(versions, key=lambda x: x.get("created_at", ""), reverse=True)

    def load_dataset_version(self, dataset_name: str, version: str) -> Optional[Dataset]:
        """Load a specific version of a dataset"""

        dataset_file = self.data_path / dataset_name / version / "dataset.parquet"

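        if dataset_file.exists():
            # version_dataset() writes this path with save_to_disk(), which
            # produces an Arrow directory, so read it back with load_from_disk()
            return Dataset.load_from_disk(str(dataset_file))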

        return None

    def create_dataset_changelog(self, dataset_name: str) -> pd.DataFrame:
        """Create a changelog for dataset versions"""

        versions = self.list_dataset_versions(dataset_name)

        if versions:
            df = pd.DataFrame(versions)
            df["created_at"] = pd.to_datetime(df["created_at"])
            df = df.sort_values("created_at", ascending=False)
            return df

        return pd.DataFrame()

print("✅ Data Version Manager initialized")

✅ Data Version Manager initialized
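
`version_dataset` accepts the version as a free-form string chosen by the caller. The sketch below shows one way the next version could be derived from the existing ones, assuming versions follow a `MAJOR.MINOR.PATCH` pattern. `parse_version` and `bump_version` are hypothetical helpers, not DagsHub or DVC APIs.

```python
from typing import List, Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split 'MAJOR.MINOR.PATCH' into a tuple of ints for numeric comparison."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def bump_version(existing: List[str], part: str = "minor") -> str:
    """Return the next version after the highest one in `existing`."""
    if not existing:
        return "1.0.0"
    # Compare numerically so that 1.10.0 sorts after 1.2.0.
    major, minor, patch = max(parse_version(v) for v in existing)
    if part == "major":
        return f"{major + 1}.0.0"
    if part == "minor":
        return f"{major}.{minor + 1}.0"
    if part == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown part: {part!r}")


print(bump_version(["1.0.0", "1.2.0", "1.10.0"]))
print(bump_version([], part="patch"))
```

The existing version strings could come from the `version` field of the metadata returned by `list_dataset_versions`. Comparing parsed tuples avoids the string-ordering trap where `"1.10.0"` sorts before `"1.2.0"`.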


## 🎯 Example: Complete Data Pipeline

In [6]:
# Generate sample conversations using different agent configurations
def generate_sample_conversations(num_conversations: int = 10) -> List[Dict]:
    """Generate sample conversations for demonstration"""

    agent_configs = [
        {
            "name": "helpful_assistant",
            "system_prompt": "You are a helpful assistant.",
            "temperature": 0.7,
            "max_tokens": 150
        },
        {
            "name": "technical_expert",
            "system_prompt": "You are a technical expert providing detailed explanations.",
            "temperature": 0.3,
            "max_tokens": 200
        },
        {
            "name": "creative_writer",
            "system_prompt": "You are a creative writer with a vivid imagination.",
            "temperature": 0.9,
            "max_tokens": 250
        }
    ]

    test_prompts = [
        "How do I make a good first impression?",
        "Explain quantum computing in simple terms.",
        "What's the best way to learn programming?",
        "How can I improve my productivity?",
        "Tell me about climate change solutions."
    ]

    conversations = []

    for i in range(num_conversations):
        # Select random configuration and prompt
        config = np.random.choice(agent_configs)
        prompt = np.random.choice(test_prompts)

        # Generate conversation
        try:
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": config["system_prompt"]},
                    {"role": "user", "content": prompt}
                ],
                temperature=config["temperature"],
                max_tokens=config["max_tokens"]
            )

            messages = [
                {"role": "user", "content": prompt, "tokens": len(prompt.split())},
                {"role": "assistant", "content": response.choices[0].message.content,
                 "tokens": response.usage.completion_tokens}
            ]

            # Add performance metrics
            performance_metrics = {
                "response_time": np.random.uniform(0.5, 2.0),
                "quality_score": np.random.uniform(0.6, 1.0),
                "total_tokens": response.usage.total_tokens,
                "completion_tokens": response.usage.completion_tokens
            }

            conversations.append({
                "messages": messages,
                "agent_config": config,
                "performance_metrics": performance_metrics,
                "tags": ["sample", config["name"], f"prompt_{test_prompts.index(prompt)}"]
            })

        except Exception as e:
            print(f"Error generating conversation {i}: {e}")

    return conversations

print("✅ Sample conversation generator ready")

✅ Sample conversation generator ready
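
The generator above draws configurations and prompts with unseeded `np.random.choice` calls, so each run selects a different mix. If repeatable sample sets are wanted, a locally seeded generator is one option. This is a sketch using the standard library's `random.Random` instead of NumPy; `pick_pairs` and the placeholder prompt strings are hypothetical.

```python
import random
from typing import List, Tuple


def pick_pairs(configs: List[str], prompts: List[str],
               count: int, seed: int = 42) -> List[Tuple[str, str]]:
    """Draw `count` (config, prompt) pairs from a locally seeded generator."""
    rng = random.Random(seed)  # local RNG; the global random state is untouched
    return [(rng.choice(configs), rng.choice(prompts)) for _ in range(count)]


config_names = ["helpful_assistant", "technical_expert", "creative_writer"]
prompt_texts = ["prompt A", "prompt B", "prompt C"]

run_one = pick_pairs(config_names, prompt_texts, count=5)
run_two = pick_pairs(config_names, prompt_texts, count=5)
print(run_one == run_two)
```

Seeding fixes only which configuration and prompt are chosen; the model responses themselves can still vary between runs.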


In [7]:
# Initialize components
conv_storage = ConversationStorage()
dataset_builder = TrainingDatasetBuilder(conv_storage)
version_manager = DataVersionManager()

print("🚀 Running Complete Data Pipeline...\n")

# Step 1: Generate and store conversations
print("1️⃣ Generating sample conversations...")
sample_conversations = generate_sample_conversations(num_conversations=20)
conversation_ids = conv_storage.batch_store_conversations(sample_conversations)
print(f"   ✅ Stored {len(conversation_ids)} conversations")

# Step 2: Display statistics
print("\n2️⃣ Conversation Statistics:")
stats = conv_storage.get_statistics()
for key, value in stats.items():
    print(f"   • {key}: {value}")

# Step 3: Create training datasets
print("\n3️⃣ Creating training datasets...")

# Instruction dataset
instruction_dataset = dataset_builder.create_instruction_dataset(format_type="alpaca")
print(f"   ✅ Created instruction dataset with {len(instruction_dataset)} samples")

# Evaluation split
eval_datasets = dataset_builder.create_evaluation_dataset(test_size=0.2)
print(f"   ✅ Created train/test split: {len(eval_datasets['train'])}/{len(eval_datasets['test'])} samples")

# Step 4: Version datasets
print("\n4️⃣ Versioning datasets with DagsHub...")
version_path = version_manager.version_dataset(
    dataset=instruction_dataset,
    dataset_name="agent_instructions",
    version="1.0.0",
    metadata={
        "description": "Agent instruction-following dataset",
        "source": "Generated from agent conversations",
        "quality_threshold": 0.7
    }
)
print(f"   ✅ Dataset versioned at: {version_path}")

# Step 5: Export to multiple formats
print("\n5️⃣ Exporting datasets...")
export_paths = dataset_builder.export_to_formats(instruction_dataset, "agent_dataset")
for format_type, path in export_paths.items():
    print(f"   ✅ Exported to {format_type}: {path}")

# Step 6: Log to MLflow
print("\n6️⃣ Logging to MLflow...")
mlflow.set_experiment("conversation_data_management")

with mlflow.start_run(run_name="data_pipeline_run") as run:
    # Log metrics
    mlflow.log_metrics({
        "total_conversations": stats["total_conversations"],
        "total_messages": stats["total_messages"],
        "avg_messages_per_conversation": stats["avg_messages_per_conversation"],
        "dataset_size": len(instruction_dataset),
        "train_size": len(eval_datasets["train"]),
        "test_size": len(eval_datasets["test"])
    })

    # Log parameters
    mlflow.log_params({
        "dataset_format": "alpaca",
        "quality_threshold": 0.7,
        "test_split_ratio": 0.2,
        "num_agent_configs": 3
    })

    # Log artifacts
    for path in export_paths.values():
        if os.path.exists(path):
            mlflow.log_artifact(path)

    print(f"   ✅ Logged to MLflow run: {run.info.run_id}")
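    # Use the run's own experiment ID; the experiment set above is not the default (0)
    print(f"   🔗 View at: https://dagshub.com/{USER_NAME}/{REPO_NAME}.mlflow/#/experiments/{run.info.experiment_id}/runs/{run.info.run_id}")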

print("\n✨ Data Pipeline Complete!")

🚀 Running Complete Data Pipeline...

1️⃣ Generating sample conversations...
   ✅ Stored 20 conversations

2️⃣ Conversation Statistics:
   • total_conversations: 20
   • total_messages: 40
   • avg_messages_per_conversation: 2.0
   • total_tokens: 4394
   • avg_tokens_per_conversation: 219.7
   • unique_agent_configs: 3
   • date_range: {'start': '2025-08-13T18:52:27.588840', 'end': '2025-08-13T18:52:27.593961'}

3️⃣ Creating training datasets...
   ✅ Created instruction dataset with 15 samples
   ✅ Created train/test split: 12/3 samples

4️⃣ Versioning datasets with DagsHub...


Saving the dataset (0/1 shards):   0%|          | 0/15 [00:00<?, ? examples/s]

Creating json from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

🏃 View run dataset_agent_instructions_v1.0.0 at: https://dagshub.com/mpaul/conversation_and_datasets.mlflow/#/experiments/0/runs/051eab615cfc4d36a20aa14ed09bb35d
🧪 View experiment at: https://dagshub.com/mpaul/conversation_and_datasets.mlflow/#/experiments/0
   ✅ Dataset versioned at: data/agent_instructions/1.0.0

5️⃣ Exporting datasets...


Creating json from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Creating CSV from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

   ✅ Exported to json: agent_dataset.json
   ✅ Exported to csv: agent_dataset.csv
   ✅ Exported to parquet: agent_dataset.parquet

6️⃣ Logging to MLflow...


2025/08/13 18:52:32 INFO mlflow.tracking.fluent: Experiment with name 'conversation_data_management' does not exist. Creating a new experiment.


   ✅ Logged to MLflow run: c875fe02418e407c8b2d3a5432edeec3
   🔗 View at: https://dagshub.com/mpaul/conversation_and_datasets.mlflow/#/experiments/0/runs/c875fe02418e407c8b2d3a5432edeec3
🏃 View run data_pipeline_run at: https://dagshub.com/mpaul/conversation_and_datasets.mlflow/#/experiments/1/runs/c875fe02418e407c8b2d3a5432edeec3
🧪 View experiment at: https://dagshub.com/mpaul/conversation_and_datasets.mlflow/#/experiments/1

✨ Data Pipeline Complete!


## 📊 Dataset Quality Analysis

In [8]:
class DatasetQualityAnalyzer:
    """Analyze quality of training datasets"""

    def __init__(self, dataset: Dataset):
        self.dataset = dataset

    def analyze_text_quality(self, text_column: str = "output") -> Dict[str, Any]:
        """Analyze text quality metrics"""

        texts = self.dataset[text_column]

        metrics = {
            "avg_length": np.mean([len(t) for t in texts]),
            "std_length": np.std([len(t) for t in texts]),
            "avg_words": np.mean([len(t.split()) for t in texts]),
            "unique_words": len(set(" ".join(texts).split())),
            "empty_responses": sum(1 for t in texts if not t.strip()),
            "duplicate_responses": len(texts) - len(set(texts))
        }

        return metrics

    def analyze_diversity(self) -> Dict[str, Any]:
        """Analyze dataset diversity"""

        # Calculate diversity metrics
        if "instruction" in self.dataset.features:
            instructions = self.dataset["instruction"]
            unique_instructions = len(set(instructions))
            instruction_diversity = unique_instructions / len(instructions)
        else:
            instruction_diversity = 0

        if "agent_config" in self.dataset.features:
            configs = self.dataset["agent_config"]
            unique_configs = len(set(configs))
        else:
            unique_configs = 0

        return {
            "instruction_diversity": instruction_diversity,
            "unique_agent_configs": unique_configs,
            "total_samples": len(self.dataset)
        }

    def generate_quality_report(self) -> pd.DataFrame:
        """Generate comprehensive quality report"""

        report_data = []

        # Text quality
        if "output" in self.dataset.features:
            text_metrics = self.analyze_text_quality("output")
            for key, value in text_metrics.items():
                report_data.append({
                    "category": "Text Quality",
                    "metric": key,
                    "value": value
                })

        # Diversity
        diversity_metrics = self.analyze_diversity()
        for key, value in diversity_metrics.items():
            report_data.append({
                "category": "Diversity",
                "metric": key,
                "value": value
            })

        return pd.DataFrame(report_data)

# Run quality analysis
if 'instruction_dataset' in locals():
    print("📊 Running Dataset Quality Analysis...\n")

    analyzer = DatasetQualityAnalyzer(instruction_dataset)
    quality_report = analyzer.generate_quality_report()

    print("Quality Report:")
    print(quality_report.to_string(index=False))

    # Save report
    quality_report.to_csv("dataset_quality_report.csv", index=False)

    # Log to MLflow
    with mlflow.start_run(run_name="dataset_quality_analysis"):
        mlflow.log_artifact("dataset_quality_report.csv")

        for _, row in quality_report.iterrows():
            mlflow.log_metric(f"{row['category']}_{row['metric']}", row['value'])

    print("\n✅ Quality analysis complete and logged to MLflow")

📊 Running Dataset Quality Analysis...

Quality Report:
    category                metric       value
Text Quality            avg_length 1084.733333
Text Quality            std_length  260.935360
Text Quality             avg_words  165.000000
Text Quality          unique_words  927.000000
Text Quality       empty_responses    0.000000
Text Quality   duplicate_responses    0.000000
   Diversity instruction_diversity    0.333333
   Diversity  unique_agent_configs    3.000000
   Diversity         total_samples   15.000000
🏃 View run dataset_quality_analysis at: https://dagshub.com/mpaul/conversation_and_datasets.mlflow/#/experiments/1/runs/79df02be53cc4814b6888e983750f42e
🧪 View experiment at: https://dagshub.com/mpaul/conversation_and_datasets.mlflow/#/experiments/1

✅ Quality analysis complete and logged to MLflow
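
The `instruction_diversity` value in the report is the ratio of distinct instructions to total samples. As a rough illustration with made-up data (not the notebook's actual dataset), 15 samples drawn from 5 distinct prompts give 5 / 15 ≈ 0.33, which matches the reported value and is consistent with the generator reusing five fixed prompts. The function below is a hypothetical standalone version of that calculation.

```python
from typing import List


def instruction_diversity(instructions: List[str]) -> float:
    """Fraction of instructions that are distinct (1.0 means no repeats)."""
    if not instructions:
        return 0.0
    return len(set(instructions)) / len(instructions)


# Made-up sample: 5 distinct prompts, each used 3 times.
prompts = [f"prompt {i}" for i in range(5)] * 3
print(round(instruction_diversity(prompts), 3))
```

A low ratio like this suggests the dataset mostly varies in responses rather than in instructions, which is worth knowing before using it for fine-tuning.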


## 🔄 Integration with Agent Performance Tracking

In [9]:
class PerformanceDataIntegration:
    """Integrate conversation data with performance tracking"""

    def __init__(self, conversation_storage: ConversationStorage):
        self.storage = conversation_storage
        self.mlflow_client = MlflowClient()

    def link_conversations_to_experiments(self, experiment_name: str = "Agent_Performance_Tracking_Comprehensive"):
        """Link stored conversations to MLflow experiments"""

        # Get experiment
        experiment = mlflow.get_experiment_by_name(experiment_name)

        if experiment:
            # Get all runs from the experiment
            runs = self.mlflow_client.search_runs(
                experiment_ids=[experiment.experiment_id],
                order_by=["start_time DESC"]
            )

            linked_data = []

            for run in runs:
                # Extract agent config from run
                agent_config = {
                    "name": run.data.params.get("agent_name", "unknown"),
                    "temperature": float(run.data.params.get("temperature", 0.7)),
                    "max_tokens": int(run.data.params.get("max_tokens", 150))
                }

                # Extract performance metrics
                performance_metrics = {
                    "overall_score": run.data.metrics.get("overall_performance_score", 0),
                    "response_time": run.data.metrics.get("avg_response_time", 0),
                    "tokens_used": run.data.metrics.get("avg_tokens_per_response", 0)
                }

                linked_data.append({
                    "run_id": run.info.run_id,
                    "experiment_id": experiment.experiment_id,
                    "agent_config": agent_config,
                    "performance_metrics": performance_metrics,
                    "timestamp": run.info.start_time
                })

            return linked_data

        return []

    def create_performance_based_dataset(self, min_performance_score: float = 0.7) -> Dataset:
        """Create dataset from high-performing agent interactions"""

        # Filter conversations by performance
        high_quality_conversations = [
            conv for conv in self.storage.conversations
            if conv.get("performance_metrics", {}).get("quality_score", 0) >= min_performance_score
        ]

        # Create dataset
        dataset_items = []
        for conv in high_quality_conversations:
            for msg_pair in zip(conv["messages"][::2], conv["messages"][1::2]):
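                # zip() always yields 2-tuples, so check the roles instead of the length
                if msg_pair[0].get("role") == "user" and msg_pair[1].get("role") == "assistant":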
                    dataset_items.append({
                        "instruction": msg_pair[0].get("content", ""),
                        "response": msg_pair[1].get("content", ""),
                        "performance_score": conv["performance_metrics"].get("quality_score", 0),
                        "agent_name": conv["agent_config"].get("name", "unknown")
                    })

        return Dataset.from_list(dataset_items)

# Example integration
print("🔄 Integrating with Performance Tracking...\n")

integrator = PerformanceDataIntegration(conv_storage)

# Link to existing experiments
linked_data = integrator.link_conversations_to_experiments()
print(f"✅ Linked {len(linked_data)} experiment runs")

# Create performance-based dataset
performance_dataset = integrator.create_performance_based_dataset(min_performance_score=0.7)
print(f"✅ Created performance dataset with {len(performance_dataset)} high-quality samples")

print("\n🎉 Integration complete!")

🔄 Integrating with Performance Tracking...

✅ Linked 0 experiment runs
✅ Created performance dataset with 15 high-quality samples

🎉 Integration complete!


## 📋 Summary & Best Practices

In [10]:
print("🎯 CONVERSATION DATA & TRAINING DATASET MANAGEMENT SUMMARY")
print("=" * 60)

print("\n✅ What we implemented:")
print("  📚 Comprehensive conversation storage system")
print("  🏗️ Training dataset builder with multiple formats")
print("  📦 Data versioning with DagsHub and DVC")
print("  📊 Dataset quality analysis tools")
print("  🔄 Integration with performance tracking")

print("\n🔑 Key Features:")
print("  • Store and version conversation histories")
print("  • Create instruction-following datasets (Alpaca, ChatGPT formats)")
print("  • Build preference datasets for RLHF")
print("  • Track dataset versions with MLflow")
print("  • Analyze dataset quality and diversity")
print("  • Export to multiple formats (JSON, CSV, Parquet)")

print("\n📈 Best Practices:")
print("  1. Version all datasets with meaningful metadata")
print("  2. Filter conversations by quality scores")
print("  3. Maintain train/test splits for evaluation")
print("  4. Track dataset lineage and transformations")
print("  5. Regular quality analysis and monitoring")

print("\n🔗 DagsHub Features Used:")
print("  • Data versioning with DVC integration")
print("  • MLflow experiment and dataset tracking")
print("  • Artifact storage and management")
print("  • Collaborative data pipeline development")

print(f"\n🌐 View your data at: https://dagshub.com/{USER_NAME}/{REPO_NAME}")
print(f"📊 MLflow tracking: https://dagshub.com/{USER_NAME}/{REPO_NAME}/experiments")

print("\n🎉 Data Management Pipeline Complete! 🎉")
print("\nYou now have a complete system for:")
print("  1. Versioning agent prompts and configurations ✅")
print("  2. Tracking agent performance experiments ✅")
print("  3. Storing conversation data and training datasets ✅")

🎯 CONVERSATION DATA & TRAINING DATASET MANAGEMENT SUMMARY

✅ What we implemented:
  📚 Comprehensive conversation storage system
  🏗️ Training dataset builder with multiple formats
  📦 Data versioning with DagsHub and DVC
  📊 Dataset quality analysis tools
  🔄 Integration with performance tracking

🔑 Key Features:
  • Store and version conversation histories
  • Create instruction-following datasets (Alpaca, ChatGPT formats)
  • Build preference datasets for RLHF
  • Track dataset versions with MLflow
  • Analyze dataset quality and diversity
  • Export to multiple formats (JSON, CSV, Parquet)

📈 Best Practices:
  1. Version all datasets with meaningful metadata
  2. Filter conversations by quality scores
  3. Maintain train/test splits for evaluation
  4. Track dataset lineage and transformations
  5. Regular quality analysis and monitoring

🔗 DagsHub Features Used:
  • Data versioning with DVC integration
  • MLflow experiment and dataset tracking
  • Artifact storage and management
 