## Installation

In [1]:
!pip install openai
!pip install plotly
!pip install git+https://github.com/openai/swarm.git
!pip install firecrawl-py

Collecting git+https://github.com/openai/swarm.git
  Cloning https://github.com/openai/swarm.git to /tmp/pip-req-build-23_82q5n
  Running command git clone --filter=blob:none --quiet https://github.com/openai/swarm.git /tmp/pip-req-build-23_82q5n
  Resolved https://github.com/openai/swarm.git to commit 9db581cecaacea0d46a933d6453c312b034dbf47
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pre-commit (from swarm==0.1.0)
  Downloading pre_commit-4.0.1-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting instructor (from swarm==0.1.0)
  Downloading instructor-1.7.0-py3-none-any.whl.metadata (17 kB)
Collecting jiter<1,>=0.4.0 (from openai>=1.33.0->swarm==0.1.0)
  Downloading jiter-0.6.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.2 kB)
Collecting cfgv>=2.0.0 (from pre-commit->swarm==0.1.0)
  Downloading cfgv-3.4.0-py2.py3-none-any.wh

## Adding OpenAI API Key

In [2]:
import openai
import os
from google.colab import userdata

openai.api_key = userdata.get('OPENAI_KEY')
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_KEY')

## Loading the Dataset

In [3]:
import pandas as pd
df = pd.read_csv('https://raw.githubusercontent.com/noelabu/InsightCart/refs/heads/main/data/sales_data.csv')

## Data Detective

**Data Detective Agent** is designed to ensure the integrity, accuracy, and security of your data. Comprised of three specialized agents—**Data Cleaning Agent**, **Anomaly Detection Agent**, and **Transaction Risk Agent**—the Data Detective Agent works in tandem to optimize datasets and identify critical issues. Here’s a breakdown of each agent's capabilities:

1. **Data Cleaning Agent**  
   The **Data Cleaning Agent** focuses on the simple yet essential task of cleaning and validating your dataset. It efficiently scans for issues like **missing values**, **duplicate rows**, and **inconsistent data types**. This agent provides tailored **cleaning recommendations** to address these issues, ensuring that your dataset is accurate and ready for analysis. Whether it's suggesting imputation techniques for missing values or identifying columns with invalid data types, the Data Cleaning Agent helps enhance the quality of your data with minimal effort.

2. **Anomaly Detection Agent**  
   The **Anomaly Detection Agent** is designed to identify outliers and abnormal patterns within your dataset. It scans each column for both **low** and **high anomalies**, flagging data points that deviate significantly from expected values. After detecting these outliers, the agent generates a **cleaning recommendation report**, suggesting how to handle these anomalies—whether through imputation, removal, or further investigation. This agent is especially useful for identifying errors or unexpected trends that could skew analysis or indicate deeper issues within the dataset.

3. **Transaction Risk Agent**  
   The **Transaction Risk Agent** analyzes transaction-level data to detect potential fraud, **suspicious activity**, or **financial risks**. It identifies **high-value transactions** by assessing revenue against predefined thresholds (e.g., the 95th percentile), analyzes transaction patterns, and detects anomalies in attributes like **device type** or transaction frequency. The agent generates a **comprehensive risk assessment report** that highlights flagged transactions, provides statistical insights, and assigns a **risk level** to the dataset. By offering **actionable recommendations**—such as manual reviews or enhanced security measures—the Transaction Risk Agent helps organizations proactively address potential fraud, improving the security and integrity of financial transactions.

In [4]:
from swarm import Swarm, Agent
import pandas as pd
import numpy as np
import scipy.stats as stats
import json

class DataDetectiveAgent:
    def __init__(self):
        """
        Initialize the Data Detective Agent
        """
        # Initialize Swarm client
        self.client = Swarm()

        self.cleaning_agent = Agent(
            name="Data Cleaning Agent",
            model="gpt-4o-mini",
            instructions=(
                "You are an expert in data cleaning and preprocessing. "
                "Identify and resolve data quality issues with precision. "
                "Provide detailed, actionable recommendations for data cleaning."
                "Respond ONLY with a JSON format output directly addressing the cleaning recommendations. No json in the start of the string."
            )
        )

        self.anomaly_agent = Agent(
            name="Anomaly Detection Agent",
            model="gpt-4o-mini",
            instructions=(
                "You are a specialist in statistical anomaly detection. "
                "Analyze data for unusual patterns, outliers, and potential anomalies. "
                "Provide comprehensive statistical insights."
                "Respond ONLY with a JSON format output directly addressing the cleaning recommendations. No json in the start of the string."

            )
        )

        self.transaction_agent = Agent(
            name="Transaction Risk Agent",
            model="gpt-4o-mini",
            instructions=(
                "You are an expert in transaction risk assessment. "
                "Analyze transactions for potential fraud or suspicious activity. "
                "Provide detailed risk scoring and recommendations."
                "Respond ONLY with a JSON format output directly addressing the risk assessment. No json in the start of the string."
            )
        )

    def clean_and_validate_data(self, df: pd.DataFrame) -> dict:
        """
        Perform data cleaning using the Swarm cleaning agent

        :param df: Input pandas DataFrame
        :return: Cleaning report and cleaned DataFrame
        """
        # Prepare data issues for analysis
        data_issues = {
            'missing_values': df.isnull().sum().to_dict(),
            'duplicate_rows': df.duplicated().sum(),
            'data_type_issues': df.dtypes.astype(str).to_dict()
        }

        # Run cleaning agent analysis
        cleaning_response = self.client.run(
            agent=self.cleaning_agent,
            messages=[{
                "role": "user",
                "content": f"""Analyze these data issues:
                Missing Values: {str(data_issues['missing_values'])}
                Duplicate Rows: {data_issues['duplicate_rows']}
                Data Type Issues: {str(data_issues['data_type_issues'])}

                Provide a comprehensive cleaning strategy."""
            }]
        )

        # Perform basic cleaning
        cleaned_df = df.copy()
        cleaned_df.dropna(inplace=True)
        cleaned_df.drop_duplicates(inplace=True)

        return {
            'cleaning_report': cleaning_response.messages[-1]['content'],
            'cleaned_dataframe': cleaned_df,
            'data_issues': data_issues
        }

    def detect_anomalies(self, df: pd.DataFrame) -> dict:
        """
        Perform anomaly detection using the Swarm anomaly agent

        :param df: Input pandas DataFrame
        :return: Anomaly detection report
        """
        # Numerical columns for anomaly detection
        numerical_columns = df.select_dtypes(include=[np.number]).columns

        # Statistical summary
        statistical_summary = df[numerical_columns].describe()

        # Anomaly detection using Z-score
        potential_anomalies = {}
        for column in numerical_columns:
            z_scores = np.abs(stats.zscore(df[column]))
            anomalies = df[z_scores > 3]
            potential_anomalies[column] = {
                'anomaly_count': len(anomalies),
                'anomaly_percentage': len(anomalies) / len(df) * 100
            }

        # Run anomaly detection agent
        anomaly_response = self.client.run(
            agent=self.anomaly_agent,
            messages=[{
                "role": "user",
                "content": f"""Analyze these potential anomalies:
                Statistical Summary: {str(statistical_summary)}
                Potential Anomalies: {str(potential_anomalies)}

                Provide a detailed anomaly detection report."""
            }]
        )

        return {
            'anomaly_report': anomaly_response.messages[-1]['content'],
            'potential_anomalies': potential_anomalies,
            'statistical_summary': statistical_summary
        }

    def flag_suspicious_transactions(self, df: pd.DataFrame) -> dict:
        """
        Flag suspicious transactions using the transaction risk agent

        :param df: Input pandas DataFrame
        :return: Transaction flagging report
        """
        sus_df = df.copy(deep=True)

        # Convert 'revenue' column to numeric
        sus_df["revenue"] = sus_df["revenue"].apply(lambda x: int(str(x).replace("₱", "").replace(",", "")))

        # Define suspicious transaction indicators
        suspicious_indicators = {
            'high_value_transactions': sus_df[sus_df['revenue'] > sus_df['revenue'].quantile(0.95)]
        }

        # Transaction summary
        transaction_summary = sus_df[['revenue', 'transactions', 'device_type']].describe()

        # Run transaction risk agent
        transaction_response = self.client.run(
            agent=self.transaction_agent,
            messages=[{
                "role": "user",
                "content": f"""Analyze these transaction details:
                Transaction Summary: {transaction_summary}
                Suspicious Indicators: {suspicious_indicators}

                Provide a comprehensive transaction risk assessment."""
            }]
        )

        return {
            'transaction_report': transaction_response.messages[-1]['content'],
            'suspicious_indicators': suspicious_indicators
        }

    def comprehensive_analysis(self, df: pd.DataFrame) -> dict:
        """
        Perform a comprehensive multi-agent analysis

        :param df: Input pandas DataFrame
        :return: Comprehensive analysis report
        """

        # Perform individual analysis steps
        cleaning_results = self.clean_and_validate_data(df)
        anomaly_results = self.detect_anomalies(cleaning_results['cleaned_dataframe'])
        transaction_results = self.flag_suspicious_transactions(cleaning_results['cleaned_dataframe'])

        # Combine results
        comprehensive_report = {
            'data_cleaning': cleaning_results,
            'anomaly_detection': anomaly_results,
            'transaction_analysis': transaction_results
        }

        return comprehensive_report

In [5]:
# Initialize Data Detective Agent
detective = DataDetectiveAgent()


In [6]:
# Run comprehensive analysis
analysis_results = detective.comprehensive_analysis(df)

In [7]:
cleaning_report = analysis_results["data_cleaning"]["cleaning_report"]
print(cleaning_report)

{
  "cleaningRecommendations": {
    "missingValues": {
      "status": "No missing values detected",
      "recommendation": "Continue to monitor data for any future missing values."
    },
    "duplicateRows": {
      "status": "No duplicate rows detected",
      "recommendation": "Implement a validation step upon data entry to maintain uniqueness in records."
    },
    "dataTypeIssues": {
      "date": {
        "currentType": "object",
        "recommendedType": "datetime64",
        "action": "Convert 'date' column to datetime format using pd.to_datetime()."
      },
      "source": {
        "currentType": "object",
        "recommendedType": "categorical",
        "action": "Convert 'source' column to categorical type to optimize memory usage and performance."
      },
      "medium": {
        "currentType": "object",
        "recommendedType": "categorical",
        "action": "Convert 'medium' column to categorical type to optimize memory usage and performance."
      },
    

In [8]:
anomaly_report = analysis_results["anomaly_detection"]["anomaly_report"]
print(anomaly_report)

{
  "cleaning_recommendations": {
    "pageviews": {
      "anomalies": {
        "min_threshold": 0,
        "max_threshold": 34832,
        "recommended_action": "Investigate records with values significantly above the mean (583.76 + 3*1452.00). Consider capping values beyond a specific percentile to prevent skewing analysis."
      }
    },
    "visits": {
      "anomalies": {
        "min_threshold": 1,
        "max_threshold": 6975,
        "recommended_action": "Examine outliers exceeding 95th percentile. Look for possible data entry errors or legitimate spikes in traffic."
      }
    },
    "productClick": {
      "anomalies": {
        "min_threshold": 0,
        "max_threshold": 32460,
        "recommended_action": "Identify cases with very high product clicks compared to the normal distribution. Consider removing extreme outliers."
      }
    },
    "addToCart": {
      "anomalies": {
        "min_threshold": 0,
        "max_threshold": 6486,
        "recommended_action": "

In [9]:
transaction_report = analysis_results["transaction_analysis"]["transaction_report"]
print(transaction_report)

{
  "risk_assessment": {
    "transaction_summary": {
      "total_transactions": 52721,
      "mean_revenue": 127554.8,
      "std_revenue": 336482.4,
      "min_revenue": 0,
      "max_revenue": 7368215,
      "mean_transactions": 20.1,
      "std_transactions": 52.4,
      "min_transactions": 0,
      "max_transactions": 1113
    },
    "suspicious_indicators": {
      "high_value_transactions_count": 5,
      "high_value_transactions_details": [
        {
          "date": "2020-05-11",
          "source": "facebook",
          "medium": "cpc",
          "revenue": 1289066,
          "transactions": 217
        },
        {
          "date": "2020-05-11",
          "source": "facebook",
          "medium": "cpc",
          "revenue": 887911,
          "transactions": 150
        },
        {
          "date": "2020-05-11",
          "source": "facebook",
          "medium": "cpc",
          "revenue": 988383,
          "transactions": 170
        },
        {
          "date": "202

## Data Processing

### **Data Processing Agent:**

The **Data Processing Agent** orchestrates a series of operations, including **data type conversions**, **missing value handling**, **outlier detection**, and **anomaly filtering**, using recommendations to guide each step. By integrating these diverse agents, the Data Processing Agent ensures that data quality is maintained, outliers are addressed, and any anomalies are identified and removed, resulting in a clean, reliable dataset ready for use.

1. **Data Type Conversion Agent**: This agent ensures the correct data types for each column, transforming data based on specific recommendations, such as converting date fields to datetime, categorical data to categories, and numerical data to the appropriate type (e.g., float64). It ensures data integrity during type conversion.

2. **Missing Values Agent**: This agent handles missing or null values by filling them with the appropriate statistical measure, such as the **median** for normally distributed columns or the **mode** for skewed distributions. It ensures that missing data does not negatively impact the dataset’s usability.

3. **Outlier Agent**: This agent identifies outliers in the data using statistical methods like the **Z-score method** and either removes or normalizes extreme values that fall outside acceptable thresholds. It prevents outliers from skewing the analysis or models that will be built on the data.

4. **Anomaly Agent**: The anomaly agent detects and filters out anomalies based on **quantile-based filtering** techniques. It uses the 1st and 99th percentiles to remove extreme values that may distort the data, ensuring that the dataset remains representative of typical patterns.


In [10]:
import pandas as pd
import numpy as np
from scipy import stats
from swarm import Swarm, Agent
import logging

class DataPreprocessing:
    def __init__(self, cleaning_report, anomaly_report):
        """
        Initialize the data preprocessing swarm with cleaning and anomaly reports

        Args:
            cleaning_report (dict): Detailed cleaning recommendations
            anomaly_report (dict): Anomaly detection recommendations
        """
        self.cleaning_report = cleaning_report
        self.anomaly_report = anomaly_report
        self.client = Swarm()
        self._setup_agents()

    def _setup_agents(self):
        """
        Set up specialized agents for different preprocessing tasks
        """
        # Data Type Conversion Agent
        def convert_data_types(dataframe):
            return self._convert_data_types(dataframe)

        self.type_conversion_agent = Agent(
            name="DataTypeAgent",
            model="gpt-4o-mini",
            instructions="Convert data types based on provided recommendations. Handle type conversions carefully, ensuring data integrity.",
            functions=[convert_data_types]
        )

        # Missing Value Handling Agent
        def handle_missing_values(dataframe):
            return self._handle_missing_values(dataframe)

        self.missing_values_agent = Agent(
            name="MissingValuesAgent",
            instructions="Handle missing values using appropriate statistical techniques. Prefer median for normal distributions, mode for skewed distributions.",
            functions=[handle_missing_values]
        )

        # Outlier Handling Agent
        def process_outliers(dataframe):
            return self._process_outliers(dataframe)

        self.outlier_agent = Agent(
            name="OutlierAgent",
            instructions="Detect and handle outliers using Z-score method. Remove or normalize outliers based on pre-defined thresholds.",
            functions=[process_outliers]
        )

        # Anomaly Filtering Agent
        def filter_anomalies(dataframe):
            return self._filter_anomalies(dataframe)

        self.anomaly_agent = Agent(
            name="AnomalyAgent",
            instructions="Identify and filter out data anomalies using quantile-based filtering. Ensure data quality while minimizing information loss.",
            functions=[filter_anomalies]
        )

    def preprocess_data(self, dataframe):
        """
        Orchestrate preprocessing steps using agent-based workflow

        Args:
            dataframe (pd.DataFrame): Input dataframe to be preprocessed

        Returns:
            pd.DataFrame: Fully preprocessed dataframe
        """
        # Validate input
        if dataframe is None or dataframe.empty:
            logging.warning("Input dataframe is None or empty")
            return pd.DataFrame()

        # Preprocessing pipeline with agent handoffs
        preprocessing_steps = [
            (self.type_conversion_agent, "Convert data types"),
            (self.missing_values_agent, "Handle missing values"),
            (self.outlier_agent, "Process outliers"),
            (self.anomaly_agent, "Filter anomalies")
        ]

        dataframe['revenue'] = df['revenue'].apply(lambda x: int(x.replace("₱", "").replace(",", "")))
        dataframe['ad spend'] = df['ad spend'].apply(lambda x: int(x.replace("₱", "").replace(",", "")))
        processed_df = dataframe.copy().to_dict()
        for agent, step_name in preprocessing_steps:
            try:
                response = self.client.run(
                    agent=agent,
                    messages=[{"role": "user", "content": step_name, "dataframe": processed_df}]
                )
                processed_df = response.messages[-1].get("dataframe", processed_df)
            except Exception as e:
                logging.error(f"Error in {step_name}: {e}")
                continue

        return processed_df

    def _convert_data_types(self, dataframe_dict):
        """
        Convert data types based on cleaning recommendations

        Returns:
            pd.DataFrame: Dataframe with corrected data types
        """
        df = pd.DataFrame.from_dict(dataframe_dict)
        recommendations = self.cleaning_report.get('recommendations', {}).get('data_type_issues', {})

        for column, type_info in recommendations.items():
            if column in df.columns:
                current_type = type_info.get('current_type')
                recommended_type = type_info.get('recommended_type')
                try:
                    if recommended_type == 'datetime':
                        df[column] = pd.to_datetime(df[column], errors='coerce')
                    elif recommended_type == 'categorical':
                        df[column] = pd.Categorical(df[column])
                    elif recommended_type == 'boolean':
                        df[column] = df[column].map({'yes': True, 'no': False})
                    elif recommended_type == 'float64':
                        df[column] = pd.to_numeric(df[column].replace('[\$,]', '', regex=True), errors='coerce')
                except Exception as e:
                    logging.error(f"Error converting {column} to {recommended_type}: {e}")

        return df

    def _handle_missing_values(self, dataframe_dict):
        """
        Handle missing values using appropriate statistical techniques


        Returns:
            pd.DataFrame: Dataframe with handled missing values
        """
        df = pd.DataFrame.from_dict(dataframe_dict)
        missing_config = self.cleaning_report.get('recommendations', {}).get('missing_values', {})

        if missing_config.get('action') == 'None required':
            return df

        for column in df.columns:
            if df[column].isnull().any():
                try:
                    if self._is_normally_distributed(df[column]):
                        df[column].fillna(df[column].median(), inplace=True)
                    else:
                        df[column].fillna(df[column].mode()[0], inplace=True)
                except Exception as e:
                    logging.error(f"Error handling missing values in {column}: {e}")

        return df

    def _process_outliers(self, dataframe_dict):
        """
        Process outliers using Z-score method

        Returns:
            pd.DataFrame: Dataframe with processed outliers
        """
        df = pd.DataFrame.from_dict(dataframe_dict)
        outlier_config = self.anomaly_report.get('cleaning_recommendations', {})

        for column, config in outlier_config.items():
            if column in df.columns and 'outliers' in config:
                try:
                    outliers_info = config['outliers']
                    if outliers_info.get('remove'):
                        threshold = outliers_info['remove'].get('values_above')
                        df = df[df[column] <= threshold]
                except Exception as e:
                    logging.error(f"Error processing outliers in {column}: {e}")

        return df

    def _filter_anomalies(self, dataframe):
        """
        Filter anomalies using quantile-based approach

        Args:
            dataframe (pd.DataFrame): Input dataframe

        Returns:
            pd.DataFrame: Filtered dataframe
        """
        df = dataframe.copy()
        anomaly_config = self.anomaly_report.get('cleaning_recommendations', {})

        for column, config in anomaly_config.items():
            if column in df.columns:
                try:
                    # Filter using 1st and 99th percentiles if normalize is true
                    if config.get('outliers', {}).get('normalize', False):
                        upper_bound = df[column].quantile(0.99)
                        lower_bound = df[column].quantile(0.01)
                        df = df[(df[column] <= upper_bound) & (df[column] >= lower_bound)]
                except Exception as e:
                    logging.error(f"Error filtering anomalies in {column}: {e}")

        return df

    def _is_normally_distributed(self, series, alpha=0.05):
        """
        Check if a series is normally distributed

        Args:
            series (pd.Series): Input series to test
            alpha (float): Significance level for the test

        Returns:
            bool: True if normally distributed, False otherwise
        """
        try:
            cleaned_series = series.dropna()
            if len(cleaned_series) < 3:
                return False

            _, p_value = stats.shapiro(cleaned_series)
            return p_value > alpha
        except Exception as e:
            logging.error(f"Distribution test error: {e}")
            return False


In [11]:
preprocessing_agent = DataPreprocessing(cleaning_report, anomaly_report)

In [12]:
processed_df = preprocessing_agent.preprocess_data(df)

ERROR:root:Error in Convert data types: DataFrame constructor not properly called!


In [13]:
processed_df = pd.DataFrame(processed_df)
processed_df.to_csv('sales_data_cleaned.csv')
processed_df.head(5)

Unnamed: 0,date,source,medium,delivery_available,device_type,promo_activated,filter_used,pageviews,visits,productClick,addToCart,checkout,transactions,revenue,ad spend
0,2020-05-11,google,organic,no data,PC,no,no,4087,1233,5240,1048,672.0,90,456877,384039
1,2020-05-11,facebook,cpc,no data,mobile,yes,no,4326,544,9930,1984,1812.48,217,1289066,817514
2,2020-05-11,google,cpc,no data,mobile,no,no,3891,1450,5460,1090,766.72,100,554427,435105
3,2020-05-11,google,cpc,no data,PC,no,no,2456,854,4250,848,520.96,71,416561,635599
4,2020-05-11,facebook,organic,no data,PC,no,no,2828,1000,4110,824,449.28,62,326176,428962


## Marketing Intelligent

The **Market Trend Analysis Agent** is a specialized agent designed to analyze market data and identify key trends, shifts in consumer behavior, and emerging opportunities. By processing data on traffic sources, device performance, and revenue patterns, this agent delivers valuable insights that help businesses understand current market dynamics. It performs detailed analysis such as traffic breakdown by source, conversion rates, and device-specific revenue performance. Based on this data, the agent identifies trends like the growing dominance of mobile traffic and the importance of paid media. After completing the analysis, the **MarketTrendAnalysisAgent** transfers its findings to the **MarketRecommendationAgent** to generate actionable recommendations. The output is delivered in a concise JSON format, summarizing key trends, consumer behavior shifts, and opportunities.

The **Market Recommendation Agent** takes the results from the **Market Trend Analysis Agent** and generates strategic, actionable recommendations based on market trends. It synthesizes data-driven insights to identify business priorities and specific action items that businesses can implement to drive growth. The recommendations include key strategic initiatives, such as enhancing mobile marketing capabilities and optimizing paid media channels. It also provides specific action items, such as developing mobile-responsive landing pages or improving audience targeting for paid media. Additionally, the **Market Recommendation Agent** quantifies the potential impact on revenue growth and conversion rates while highlighting risk mitigation strategies, ensuring that businesses can prioritize efforts and manage potential challenges effectively.


In [14]:
import os
import pandas as pd
from swarm import Swarm, Agent
from typing import Dict, Any, List

class MarketTrendAnalysisAgent(Agent):
    def __init__(self):

        def analyze_market_trends(context_variables):
          """
          Perform market trend analysis

          :param df: Input pandas DataFrame
          :return: Detailed market trend analysis
          """
          # Preprocess data
          traffic_breakdown = context_variables["df"].groupby('source')['transactions'].agg([
              'count',
              'mean',
              ('conversion_rate', lambda x: x.sum() / len(df) * 100)
          ]).reset_index()

          device_performance = context_variables["df"].groupby('device_type')['revenue'].agg([
              'mean',
              'sum',
              'count'
          ]).reset_index()

          return {
                  "traffic_breakdown": traffic_breakdown.to_dict(orient='records'),
                  "device_performance": device_performance.to_dict(orient='records')
              }

        super().__init__(
            name="MarketTrendAnalysisAgent",
            instructions="""
            You are a market trend analysis specialist.
            Your task is to analyze market data and identify key trends,
            consumer behavior shifts, and emerging opportunities.
            Provide a comprehensive but concise analysis.

            Respond ONLY with a JSON format output directly addressing the analysis in market trends. No json in the start of the string.
            """,
            functions=[analyze_market_trends]
        )

class MarketRecommendationAgent(Agent):
    def __init__(self):

        def recommend_based_on_analysis(context_variables):
          """
          Based on the market trend analysis, provide strategic recommendations.
          """

          return {
                  "market_trend_analysis": context_variables["trend_analysis"],
              }

        super().__init__(
            name="MarketRecommendationAgent",
            model="gpt-4o-mini",
            instructions="""
            You are a strategic recommendations specialist.
            Based on market trend analysis, generate actionable
            strategic recommendations for business growth.

            Provide clear, data-driven insights and prioritized action items.
            """,
            functions=[recommend_based_on_analysis]
        )

In [15]:
# Initialize Swarm
client = Swarm()

# Create market recommendation agent with ability to transfer
market_recommendation_agent = MarketRecommendationAgent()

# Create trend analysis agent with ability to transfer
trend_analysis_agent = MarketTrendAnalysisAgent()

### Market Trend Analysis

In [16]:
# Run initial workflow with trend analysis agent
response = client.run(
    agent=trend_analysis_agent,
    messages=[
        {"role": "user", "content": "Analyze market trends for our digital marketing data."},
    ],
    context_variables={"df": processed_df}
)

# Print trend analysis
trend_analysis = response.messages[-1]["content"]
print(trend_analysis)

{
  "key_trends": {
    "primary_traffic_sources": [
      "google",
      "facebook",
      "direct"
    ],
    "highest_conversion_rates": [
      {
        "source": "google",
        "conversion_rate": 736.27
      },
      {
        "source": "facebook",
        "conversion_rate": 716.81
      },
      {
        "source": "direct",
        "conversion_rate": 374.55
      }
    ],
    "significant_growth": "tiktok",
    "low_performance_sources": [
      {
        "source": "baidu",
        "conversion_rate": 0.0
      },
      {
        "source": "youtube",
        "conversion_rate": 0.0
      }
    ]
  },
  "consumer_behavior_shifts": {
    "increased_mobile_usage": {
      "mobile": {
        "percentage": 51.49,
        "mean_spent": 126358.12
      },
      "PC": {
        "percentage": 48.51,
        "mean_spent": 123892.12
      }
    }
  },
  "emerging_opportunities": {
    "potential_growth": [
      "tiktok",
      "instagram"
    ],
    "underutilized_sources": [
      "

### Strategic Marketing Recommendations

In [17]:
# Transfer to recommendation agent
recommendation_response = client.run(
    agent=market_recommendation_agent,
    messages=[
        {"role": "user", "content": "Generate strategic recommendations based on the trend analysis"}
    ],
    context_variables={"trend_analysis": trend_analysis}
)

# Print recommendations
strategic_recommendations = recommendation_response.messages[-1]["content"]
print(strategic_recommendations)

Based on the market trend analysis, here are actionable strategic recommendations for business growth:

### 1. **Leverage High-Performance Traffic Sources**
   - **Google and Facebook Advertising**: With conversion rates soaring at 736.27% for Google and 716.81% for Facebook, prioritize increasing your advertising budget on these platforms. Create targeted campaigns to reach specific demographics.
   - **Direct Traffic Optimization**: Enhance your website’s SEO and user experience to capture more direct traffic, which converts at a rate of 374.55%.

### 2. **Exploit Emerging Platforms**
   - **Invest in TikTok**: Noted for significant growth, develop a strategy to engage with TikTok's diverse audience through creative content that showcases your brand authentically.
   - **Instagram Presence**: Strengthen your marketing efforts on Instagram by utilizing influencers and user-generated content to capture the attention of potential customers.

### 3. **Capitalize on Consumer Behavior Shif

## Customer Experience


The **User Segmentation Agent** is an agent designed to segment users based on marketplace data, enabling targeted marketing and personalized customer strategies. This agent performs advanced user clustering using machine learning techniques, specifically K-Means clustering, to categorize users into distinct segments. It preprocesses user data by calculating key metrics such as conversion rates, revenue per visit, and engagement scores, then applies normalization to ensure accuracy in the clustering process. The agent analyzes user behaviors, such as visits, pageviews, transactions, and device usage, to provide detailed insights into each segment. Once the segmentation process is complete, the **User Segmentation Agent** prepares the data for the **Personalization Agent**, enabling businesses to tailor their strategies to each user group.


The **Personalization Agent** is responsible for translating user segmentation data into actionable, personalized strategies to enhance the customer experience. Based on insights from the **User Segmentation Agent**, it creates detailed personalization plans, offering tailored marketing and communication strategies for each user segment. This agent crafts user personas, defines communication strategies (including the preferred channels and messaging tone), and recommends targeted marketing tactics like personalized mobile campaigns and product recommendations. The **Personalization Agent** helps organizations improve conversion rates by implementing strategies such as segment-specific landing pages and promotions. Once the personalization strategies are ready, the agent prepares the data for the **Reporting Agent** to generate a comprehensive report.

The **Reporting Agent** consolidates and compiles detailed customer experience reports by combining insights from the **User Segmentation Agent** and the **Personalization Agent**. It generates comprehensive, markdown-formatted reports that summarize user segment statistics and corresponding personalization strategies. This agent’s role is to synthesize the segmented data and the personalized strategies into clear, actionable, and strategic reports that are easy to interpret. These reports can be used by decision-makers to understand user behaviors and the impact of personalized strategies, enabling businesses to refine their approach to customer experience and marketing.


In [18]:
import os
import pandas as pd
import numpy as np
from typing import Dict, List, Any
from swarm import Swarm, Agent

# Data Processing
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

class UserSegmentationAgent(Agent):
    def __init__(self):
        def segment_users(context_variables) -> Dict:
          """
          Perform advanced user segmentation

          Returns:
              Detailed user segment information
          """
          if processed_df is None:
              raise ValueError("No data has been set. Use set_data() method first.")

          n_clusters = 5
          df = context_variables["df"]

          # Feature Engineering
          df['conversion_rate'] = df['transactions'] / df['visits']
          df['revenue_per_visit'] = df['revenue'] / df['visits']
          df['engagement_score'] = (df['pageviews'] / df['visits']) * df['conversion_rate']

          # Select clustering features
          clustering_features = [
              'visits', 'pageviews', 'transactions',
              'conversion_rate', 'revenue_per_visit'
          ]

          # Normalize features
          scaler = StandardScaler()
          X_scaled = scaler.fit_transform(df[clustering_features])

          # Perform K-Means Clustering
          kmeans = KMeans(n_clusters=n_clusters, random_state=42)
          df['user_segment'] = kmeans.fit_predict(X_scaled)

          # Analyze Segments
          segments = {}
          for segment in range(n_clusters):
              segment_data = df[df['user_segment'] == segment]

              segments[segment] = {
                  'size': len(segment_data),
                  'avg_visits': segment_data['visits'].mean(),
                  'avg_conversion_rate': segment_data['conversion_rate'].mean(),
                  'avg_revenue_per_visit': segment_data['revenue_per_visit'].mean(),
                  'source': segment_data['source'].value_counts().to_dict(),
                  'device_types': segment_data['device_type'].value_counts().to_dict()
              }
          return segments


        super().__init__(
            name="UserSegmentationAgent",
            instructions="""
            You are a user segmentation specialist responsible for:
            1. Preprocessing marketplace data
            2. Performing advanced user clustering
            3. Generating detailed user segment insights
            4. Preparing data for personalization strategy

            Respond ONLY with a JSON format output directly addressing the segmented data. No json in the start of the string.
            """,
            functions=[segment_users]
        )


class PersonalizationAgent(Agent):
    def __init__(self):
      def generate_personalization_strategy(context_variables) -> Dict:
        """
        Generate personalized strategy for a user segment
        """
        return {
                "user_segment_data": context_variables["segment_data"],
            }

      super().__init__(
            name="PersonalizationAgent",
            instructions="""
            You are a customer experience personalization strategist.
            Your role is to:
            1. Develop personalized strategies for each user segment
            2. Create targeted communication and marketing approaches
            3. Generate actionable insights for improving customer experience

            Respond ONLY with a JSON format output directly addressing the personalization strategy. No json in the start of the string.
            """,
            functions=[generate_personalization_strategy]
        )

class ReportingAgent(Agent):
  def __init__(self):
    def generate_comprehensive_report(context_variables) -> str:
      """
      Generate a comprehensive customer experience report
      """
      return {
          "segments": context_variables["segments"],
          "personalization_strategies": context_variables["personalization_strategies"]
      }

    super().__init__(
      name="ReportingAgent",
      instructions="""
      You are a reporting and insights specialist.
      Compile comprehensive customer experience reports
      that summarize user segment statistics and corresponding personalization strategies.

      Generate clear, actionable, and strategic reports based on the segments and personalization strategies of each segment.
      """,
      function=[generate_comprehensive_report]
    )


In [19]:
# Initialize Swarm
client = Swarm()

# Create agents (reporting agent first to allow transfer)
reporting_agent = ReportingAgent()
personalization_agent = PersonalizationAgent()
segmentation_agent = UserSegmentationAgent()

In [20]:
# Run Segmentation Workflow
segmentation_response = client.run(
    agent=segmentation_agent,
    messages=[
        {"role": "user", "content": "Perform advanced user segmentation"},
    ],
    context_variables={"df": processed_df}
)

# Extract segments from response
segments = segmentation_response.messages[-1]["content"]

print(segments)

{
  "segments": [
    {
      "segment_id": 0,
      "size": 22795,
      "average_visits": 42.35,
      "average_conversion_rate": 0.05,
      "average_revenue_per_visit": 279.19,
      "source_distribution": {
        "facebook": 4540,
        "google": 3541,
        "instagram": 1975,
        "other": 1774,
        "tiktok": 1650,
        "cityads": 1537,
        "direct": 1444,
        "vk": 1204,
        "newsletter": 1022,
        "DuckDuckGo": 670,
        "promo": 601,
        "actionpay": 552,
        "bing": 552,
        "co-promo": 510,
        "mytarget": 399,
        "opmcpa": 337,
        "advertise": 222,
        "facebook_direct": 103,
        "youtube": 88,
        "sailplay": 40,
        "baidu": 19,
        "exponea": 15
      },
      "device_type_distribution": {
        "mobile": 9861,
        "PC": 8427,
        "no_data": 4507
      }
    },
    {
      "segment_id": 1,
      "size": 10617,
      "average_visits": 23.75,
      "average_conversion_rate": 0.47,
  

In [21]:
# Run Personalization Workflow
personalization_response = client.run(
    agent=personalization_agent,
    messages=[
        {"role": "user", "content": "Generate personalization strategies for user segments"}
    ],
    context_variables={"segment_data": segments}
)

# Extract personalization strategies
personalization_strategies = personalization_response.messages[-1]["content"]
print(personalization_strategies)


{
  "personalization_strategies": [
    {
      "segment_id": 0,
      "strategy": {
        "approach": "Engagement-Driven",
        "objectives": [
          "Increase conversion rate",
          "Enhance average revenue per visit"
        ],
        "target_communication_channels": [
          "facebook",
          "google",
          "instagram"
        ],
        "marketing_tactics": [
          {
            "medium": "social_media",
            "content": "Engaging content to boost interaction and engagement across top platforms like Facebook and Instagram."
          },
          {
            "medium": "direct_mail",
            "content": "Personalized offers to convert visits into purchases, leveraging user data and behavior insights."
          }
        ]
      }
    },
    {
      "segment_id": 1,
      "strategy": {
        "approach": "Conversion-Optimization",
        "objectives": [
          "Maintain high conversion rate",
          "Increase touchpoints on high-val

In [22]:
# Generate Comprehensive Report
report_response = client.run(
    agent=reporting_agent,
    messages=[
        {"role": "user", "content": "Generate comprehensive customer experience report based on the given user segments and their personalization strategies."},
    ],
    context_variables={"segments": segments, "personalization_strategies":personalization_strategies}
)

# Print Final Report
customer_experience_report = report_response.messages[-1]["content"]
print(customer_experience_report)

Certainly! To generate a comprehensive customer experience report, we will need to consider user segments along with the specific personalization strategies used for each. Here's an example framework for such a report, using hypothetical data:

---

**Customer Experience Report**

**1. Overview**

This report provides insights into the customer experience based on various user segments. It highlights segmented statistics and personalization strategies to enhance user engagement and satisfaction. 

**2. User Segments and Personalization Strategies**

**Segment 1: New Users**
   - **Percentage of Total Users**: 25%
   - **Demographics**: Mostly younger audience aged 18-24.
   - **Behavioral Traits**: Higher engagement in trial features, curious about product offerings.
   - **Personalization Strategy**:
     - **Welcome Campaign**: Automated personalized emails introducing features and benefits.
     - **Guided Onboarding**: Interactive in-app tours tailored to user preference profiles.


## Visual Recommender

The **Data Visualizer Agent** recommends the most appropriate graphs based on a dataset's columns and the context of an accompanying report. By analyzing the data types, relationships between variables, and the insights described in the report, it suggests the most effective visualizations, such as bar charts, scatter plots, or line graphs, tailored to highlight key trends and comparisons. The agent offers clear explanations on why a specific visualization is chosen, helping users easily interpret and communicate their data insights.

In [23]:
import os
import pandas as pd
import numpy as np
from typing import Dict, List, Any
from swarm import Swarm, Agent

class DataVisualizerAgent(Agent):
    def __init__(self):
        def recommend_graphs(context_variables) -> Dict:
          """
          Recommend the most suitable visualizations
          """
          return {
              "report_data": context_variables["report_data"],
              "dataset_columns": context_variables["dataset_columns"]
          }

        super().__init__(
            name="DataVisualizerAgent",
            instructions="""
              You are a data visualization expert. Analyze the following report data
              and recommend the most suitable visualization graphs. For each recommendation,
              provide:
              - Graph type (bar chart, scatter plot, etc.)
              - X-axis column
              - Y-axis column
              - Columns needed (only consider the columns that is in the dataset columns)
              - Brief description of why this visualization is appropriate

              Respond strictly in the following JSON format. No json in the start of the string.
              {{
                  "recommended_graphs": [
                      {{
                          "type": "string",
                          "x_axis": "string",
                          "y_axis": "string",
                          "columns_needed": ["string"],
                          "title": "string",
                          "description": "string"
                      }}
                  ]
              }}
              """,
            functions=[recommend_graphs]
        )


In [24]:
# Create visualizer agent with transfer capability
visualizer_agent = DataVisualizerAgent()
column_str = ', '.join(processed_df.columns)

# Run Visualization Workflow
visualization_response = client.run(
    agent=visualizer_agent,
    messages=[
        {"role": "user", "content": f"Recommend a visualizations for the report {anomaly_report} and given dataset columns:  {column_str}"}
    ],
    context_variables={"report_data": anomaly_report, "dataset_columns": column_str}
)

In [25]:
# Extract recommendations from response
visual_recommendations = visualization_response.messages[-1]["content"]
print(visual_recommendations)

{
    "recommended_graphs": [
        {
            "type": "scatter plot",
            "x_axis": "date",
            "y_axis": "pageviews",
            "columns_needed": ["date", "pageviews"],
            "title": "Pageviews Over Time",
            "description": "A scatter plot will effectively show the distribution of pageviews over time, allowing us to identify any anomalies or trends in the data which could support the recommended action."
        },
        {
            "type": "bar chart",
            "x_axis": "source",
            "y_axis": "visits",
            "columns_needed": ["source", "visits"],
            "title": "Visits by Source",
            "description": "A bar chart will visualize the number of visits from different sources, making it easy to spot outliers or trends per source, which could affect the visit numbers."
        },
        {
            "type": "distribution plot",
            "x_axis": "productClick",
            "y_axis": "count",
            "col

In [26]:
visual_recommendations = json.loads(visual_recommendations)

In [27]:
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go

class PlotlyVisualization:
    def __init__(self, dataset, recommended_graphs):
        """
        Initialize the visualization agent with dataset and graph recommendations.

        :param dataset: pandas DataFrame containing the data
        :param recommended_graphs: List of graph specifications
        """
        self.dataset = dataset
        self.recommended_graphs = recommended_graphs

    def validate_columns(self, graph_spec):
        """
        Validate that required columns exist in the dataset.

        :param graph_spec: Dictionary containing graph specification
        :return: Boolean indicating column availability
        """
        columns_needed = graph_spec.get('columns_needed', [])
        return all(col in self.dataset.columns for col in columns_needed)

    def generate_visualizations(self):
        """
        Generate all recommended visualizations.

        :return: Dictionary of graph specifications and their corresponding Plotly figures
        """
        visualizations = {}

        for graph_spec in self.recommended_graphs:
            # Validate columns before attempting to create visualization
            if not self.validate_columns(graph_spec):
                print(f"Skipping {graph_spec['title']} - Required columns not found")
                continue

            # Create visualization based on graph type
            try:
                if graph_spec['type'] == 'bar chart':
                    fig = self._create_bar_chart(graph_spec)
                elif graph_spec['type'] == 'scatter plot':
                    fig = self._create_scatter_plot(graph_spec)
                elif graph_spec['type'] == 'line chart':
                    fig = self._create_line_chart(graph_spec)
                elif graph_spec['type'] == 'box plot':
                    fig = self._create_boxplot(graph_spec)
                elif graph_spec['type'] == 'histogram':
                    fig = self._create_histogram(graph_spec)
                else:
                    print(f"Unsupported graph type: {graph_spec['type']}")
                    continue

                visualizations[graph_spec['title']] = {
                    'figure': fig,
                    'description': graph_spec.get('description', '')
                }
            except Exception as e:
                print(f"Error creating {graph_spec['title']}: {str(e)}")


        return visualizations

    def _create_bar_chart(self, graph_spec):
        """
        Create a bar chart based on graph specification.

        :param graph_spec: Dictionary containing bar chart specification
        :return: Plotly Figure object
        """
        # Support for color on x-axis
        color = graph_spec['x_axis']

        if color:
            fig = px.bar(
                self.dataset,
                x=graph_spec['x_axis'],
                y=graph_spec['y_axis'],
                color=color,
                title=graph_spec['title']
            )
        else:
            fig = px.bar(
                self.dataset,
                x=graph_spec['x_axis'],
                y=graph_spec['y_axis'],
                title=graph_spec['title']
            )

        fig.update_layout(
            xaxis_title=graph_spec.get('x_axis_title', graph_spec['x_axis']),
            yaxis_title=graph_spec.get('y_axis_title', graph_spec['y_axis'])
        )
        return fig

    def _create_scatter_plot(self, graph_spec):
        """
        Create a scatter plot based on graph specification.

        :param graph_spec: Dictionary containing scatter plot specification
        :return: Plotly Figure object
        """
        # Support for color on x-axis and additional size parameter
        color = graph_spec['x_axis']

        if color:
            fig = px.scatter(
                self.dataset,
                x=graph_spec['x_axis'],
                y=graph_spec['y_axis'],
                color=color,
                title=graph_spec['title']
            )
        else:
            fig = px.scatter(
                self.dataset,
                x=graph_spec['x_axis'],
                y=graph_spec['y_axis'],
                title=graph_spec['title']
            )

        fig.update_layout(
            xaxis_title=graph_spec.get('x_axis_title', graph_spec['x_axis']),
            yaxis_title=graph_spec.get('y_axis_title', graph_spec['y_axis'])
        )
        return fig

    def _create_line_chart(self, graph_spec):
        """
        Create a line chart based on graph specification.

        :param graph_spec: Dictionary containing line chart specification
        :return: Plotly Figure object
        """
        # Support for color on x-axis
        color = graph_spec['x_axis']

        if color:
            fig = px.line(
                self.dataset,
                x=graph_spec['x_axis'],
                y=graph_spec['y_axis'],
                color=color,
                title=graph_spec['title']
            )
        else:
            fig = px.line(
                self.dataset,
                x=graph_spec['x_axis'],
                y=graph_spec['y_axis'],
                title=graph_spec['title']
            )

        fig.update_layout(
            xaxis_title=graph_spec.get('x_axis_title', graph_spec['x_axis']),
            yaxis_title=graph_spec.get('y_axis_title', graph_spec['y_axis'])
        )
        return fig

    def _create_boxplot(self, graph_spec):
        """
        Create a boxplot based on graph specification.

        :param graph_spec: Dictionary containing boxplot specification
        :return: Plotly Figure object
        """
        # Support for color grouping
        color = graph_spec.get('color')

        if color:
            fig = px.box(
                self.dataset,
                x=graph_spec.get('x_axis'),
                y=graph_spec['y_axis'],
                color=color,
                title=graph_spec['title']
            )
        else:
            fig = px.box(
                self.dataset,
                x=graph_spec.get('x_axis'),
                y=graph_spec['y_axis'],
                title=graph_spec['title']
            )

        fig.update_layout(
            xaxis_title=graph_spec.get('x_axis_title', graph_spec.get('x_axis', '')),
            yaxis_title=graph_spec.get('y_axis_title', graph_spec['y_axis'])
        )
        return fig

    def _create_histogram(self, graph_spec):
        """
        Create a histogram based on graph specification.

        :param graph_spec: Dictionary containing histogram specification
        :return: Plotly Figure object
        """
        # Support for color grouping and additional parameters
        color = graph_spec.get('color')

        if color:
            fig = px.histogram(
                self.dataset,
                x=graph_spec['x_axis'],
                color=color,
                title=graph_spec['title'],
                marginal=graph_spec.get('marginal'),  # Optional: 'rug', 'box', 'violin'
                nbins=graph_spec.get('nbins')  # Optional: number of bins
            )
        else:
            fig = px.histogram(
                self.dataset,
                x=graph_spec['x_axis'],
                title=graph_spec['title'],
                marginal=graph_spec.get('marginal'),
                nbins=graph_spec.get('nbins')
            )

        fig.update_layout(
            xaxis_title=graph_spec.get('x_axis_title', graph_spec['x_axis']),
            yaxis_title=graph_spec.get('y_axis_title', 'Count')
        )
        return fig


def create_visualizations(dataset, recommended_graphs):
    """
    Convenience function to create visualizations from a dataset and graph specifications.

    :param dataset: pandas DataFrame
    :param recommended_graphs: List of graph specifications
    :return: Dictionary of generated visualizations
    """
    agent = PlotlyVisualization(dataset, recommended_graphs)
    visualizations = agent.generate_visualizations()
    return visualizations

In [28]:
# Generate visualizations
visualizations = create_visualizations(processed_df, visual_recommendations['recommended_graphs'])

Unsupported graph type: distribution plot


In [29]:
# Show each visualization
for title, visualization_data in visualizations.items():
    visualization_data['figure'].show()  # Display the figure using show()
