<a href="https://colab.research.google.com/github/laviefatigue/mcphs_test/blob/main/Hubstaff_MCP_Local_Runner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import requests
import json
import getpass
import os
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, time

# --- Configuration ---
# Root URL of the Hubstaff v2 API; request endpoints are appended to it.
API_BASE_URL = "https://api.hubstaff.com/v2"
# URL the refresh token is POSTed to in exchange for an access token.
TOKEN_EXCHANGE_URL = "https://account.hubstaff.com/access_tokens"

# --- 1. Authentication Manager ---
class AuthManager:
    """Manages the authentication lifecycle for the Hubstaff API.

    Holds a refresh token and exchanges it for an access token on demand.
    Token expiry is not tracked: every call to ``get_access_token`` performs
    a new exchange.
    """

    # Seconds to wait for the token endpoint. Without a timeout, `requests`
    # may block indefinitely on a stalled connection.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, refresh_token):
        """Store the refresh token.

        Raises:
            ValueError: if ``refresh_token`` is empty or ``None``.
        """
        if not refresh_token:
            raise ValueError("A Hubstaff refresh token is required.")
        self._refresh_token = refresh_token
        self._access_token = None
        # In a real server, you'd handle token expiration. For this local script,
        # we'll just get a new one each time for simplicity.

    def get_access_token(self):
        """Exchanges the refresh token for a short-lived access token.

        Returns:
            The access token string, or ``None`` when the exchange fails
            (HTTP error, network error, invalid JSON, or a response without
            an ``access_token`` field). Failures are printed, not raised.
        """
        print("🔄 Attempting to exchange refresh token for an access token...")
        payload = {
            "grant_type": "refresh_token",
            "refresh_token": self._refresh_token
        }
        try:
            response = requests.post(
                TOKEN_EXCHANGE_URL,
                data=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"❌ HTTP Error during token exchange: {err}")
            print(f"   Response: {err.response.text}")
            return None
        except requests.exceptions.RequestException as err:
            print(f"❌ Network Error during token exchange: {err}")
            return None

        # Decode separately so a non-JSON body is reported as such rather
        # than surfacing as an unrelated or uncaught exception.
        try:
            response_data = response.json()
        except ValueError as err:
            print(f"❌ ERROR: Token response was not valid JSON: {err}")
            return None

        if isinstance(response_data, dict):
            self._access_token = response_data.get("access_token")
        else:
            self._access_token = None

        if self._access_token:
            print("✅ Access token retrieved successfully.")
            return self._access_token

        print("❌ ERROR: 'access_token' not found in the response.")
        return None

# --- 2. Hubstaff API Connector ---
class HubstaffConnector:
    """Handles all direct communication with the Hubstaff API.

    An access token is requested from the supplied auth manager when the
    connector is constructed. Request methods report failures by printing a
    message and returning ``None`` rather than raising.
    """

    # Seconds to wait for the API. Without a timeout, `requests` may block
    # indefinitely on a stalled connection.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, auth_manager):
        """Keep the auth manager and fetch an access token immediately."""
        self._auth_manager = auth_manager
        self._access_token = self._auth_manager.get_access_token()
        self._organization_id = None  # Will be set by get_my_organization

    def _fetch_json(self, method, endpoint, params=None):
        """Send one authenticated request and return its decoded JSON body.

        Returns ``None`` (after printing the reason) on an HTTP error, a
        network error, or a body that is not valid JSON.
        """
        url = f"{API_BASE_URL}{endpoint}"
        headers = {"Authorization": f"Bearer {self._access_token}"}
        try:
            response = requests.request(
                method,
                url,
                headers=headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"❌ HTTP Error during API request to {endpoint}: {err}")
            if err.response.status_code == 401:
                print("   Token may be expired. Try re-running the script.")
            print(f"   Response: {err.response.text}")
            return None
        except requests.exceptions.RequestException as err:
            print(f"❌ Network Error during API request to {endpoint}: {err}")
            return None

        try:
            return response.json()
        except ValueError as err:
            print(f"❌ Invalid JSON in response from {endpoint}: {err}")
            return None

    def _make_request(self, method, endpoint, params=None, is_paginated=False):
        """A generic method to make authenticated requests to the Hubstaff API.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            endpoint: Path appended to ``API_BASE_URL``.
            params: Optional query parameters. The dict is never modified.
            is_paginated: When true, follow ``pagination.next_page_start_id``
                until no further page is advertised and merge all pages.

        Returns:
            The decoded JSON body for a single request. For a paginated
            request, one dict in which every list-valued key (for example
            ``activities`` plus any side-loaded lists) holds the
            concatenation of that key across all pages, non-list keys keep
            the value from the first page that had them, and ``pagination``
            is omitted. ``None`` if there is no access token or any request
            fails.
        """
        if not self._access_token:
            print("❌ Cannot make request without an access token.")
            return None

        if not is_paginated:
            return self._fetch_json(method, endpoint, params)

        # Work on a copy so the paging cursor never leaks into the caller's dict.
        page_params = dict(params or {})
        merged = {}
        while True:
            page = self._fetch_json(method, endpoint, page_params)
            if page is None:
                return None
            if not isinstance(page, dict):
                # Unexpected payload shape: keep it rather than lose data.
                merged.setdefault('data', []).append(page)
                break

            for key, value in page.items():
                if key == 'pagination':
                    continue
                existing = merged.get(key)
                if isinstance(value, list) and isinstance(existing, list):
                    existing.extend(value)
                elif key not in merged:
                    merged[key] = list(value) if isinstance(value, list) else value

            pagination_info = page.get('pagination')
            next_page_start_id = (
                pagination_info.get('next_page_start_id')
                if isinstance(pagination_info, dict) else None
            )
            # Stop when no next page is advertised, or when the server hands
            # back the cursor we just used (which would loop forever).
            if not next_page_start_id or next_page_start_id == page_params.get('page_start_id'):
                break
            page_params['page_start_id'] = next_page_start_id

        return merged

    def _ensure_organization(self):
        """Return the active organization ID, discovering it on first use."""
        return self._organization_id or self.get_my_organization()

    def get_my_organization(self):
        """Fetches the user's organizations and sets the first one as active.

        Returns:
            The organization ID, or ``None`` if none could be determined.
            The ID is cached after the first successful lookup.
        """
        if self._organization_id:
            return self._organization_id

        print("Discovering organization ID...")
        orgs_data = self._make_request("GET", "/organizations")
        organizations = orgs_data.get('organizations') if isinstance(orgs_data, dict) else None
        if organizations:
            first_org = organizations[0]
            self._organization_id = first_org.get('id')
            if self._organization_id:
                print(f"✅ Organization found: '{first_org.get('name')}' (ID: {self._organization_id})")
                return self._organization_id

        print("❌ ERROR: Could not find any organizations for this account.")
        return None

    def get_all_members(self):
        """Fetches all members (users) for the configured organization.

        Follows pagination so organizations larger than one page are fully
        returned. Returns the merged response dict, or ``None`` on failure.
        """
        if not self._ensure_organization():
            return None

        print(f"Fetching all members for organization ID: {self._organization_id}...")
        endpoint = f"/organizations/{self._organization_id}/members"
        params = {'include': 'users'}
        return self._make_request("GET", endpoint, params=params, is_paginated=True)

    def get_all_projects(self):
        """Fetches all projects for the configured organization.

        Follows pagination. Returns the merged response dict, or ``None``
        on failure.
        """
        if not self._ensure_organization():
            return None

        print(f"Fetching all projects for organization ID: {self._organization_id}...")
        endpoint = f"/organizations/{self._organization_id}/projects"
        return self._make_request("GET", endpoint, is_paginated=True)

    def get_activities_by_date(self, target_date):
        """Fetches all activities for a specific date.

        Args:
            target_date: A ``date``; the query window runs from the first to
                the last representable instant of that day, sent with a
                ``Z`` suffix.

        Returns:
            The merged, paginated response dict, or ``None`` on failure.
        """
        if not self._ensure_organization():
            return None

        print(f"Fetching all activities for {target_date.strftime('%Y-%m-%d')}...")
        endpoint = f"/organizations/{self._organization_id}/activities"

        start_time = datetime.combine(target_date, time.min).isoformat() + "Z"
        stop_time = datetime.combine(target_date, time.max).isoformat() + "Z"

        params = {
            'time_slot[start]': start_time,
            'time_slot[stop]': stop_time,
            'include': 'users,projects'
        }
        return self._make_request("GET", endpoint, params=params, is_paginated=True)

    def get_time_entries(self, start_date, end_date, user_ids=None, project_ids=None):
        """Fetches aggregated time entries for a given date range and filters.

        Args:
            start_date: Value sent as ``date[start]``.
            end_date: Value sent as ``date[stop]``.
            user_ids: Optional iterable of user IDs, sent comma-joined.
            project_ids: Optional iterable of project IDs, sent comma-joined.

        Returns:
            The merged, paginated response dict, or ``None`` on failure.
        """
        if not self._ensure_organization():
            return None

        print(f"Fetching time entries from {start_date} to {end_date}...")
        endpoint = f"/organizations/{self._organization_id}/time_entries"

        params = {
            'date[start]': start_date,
            'date[stop]': end_date,
        }
        if user_ids:
            params['user_ids'] = ','.join(map(str, user_ids))
        if project_ids:
            params['project_ids'] = ','.join(map(str, project_ids))

        return self._make_request("GET", endpoint, params=params, is_paginated=True)


# --- 3. Unified Tool Definition Framework ---
class HubstaffTool(ABC):
    """Common interface every Hubstaff tool must implement.

    Concrete tools supply a ``name``, a ``description``, a parameter schema
    and an ``execute`` method that runs against a connector.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Identifier of the tool."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable explanation of what the tool does."""

    @abstractmethod
    def get_schema(self) -> dict:
        """Return the schema dict describing the tool's parameters."""

    @abstractmethod
    def execute(self, connector: HubstaffConnector, **kwargs) -> dict:
        """Run the tool against ``connector`` and return its result dict."""

# --- 4. Core Tool Implementations ---
class GetUserListTool(HubstaffTool):
    """Lists every user returned by the connector's member lookup."""
    name = "get_user_list"
    description = "Retrieves a complete list of all users (members) in the Hubstaff organization."

    def get_schema(self) -> dict:
        """Return the parameter schema; this tool takes no parameters."""
        schema = {"type": "object"}
        schema["properties"] = {}
        schema["required"] = []
        return schema

    def execute(self, connector: HubstaffConnector, **kwargs) -> dict:
        """Return ``{"users": [...]}`` with id, name and email per user.

        An ``{"error": ...}`` dict is returned when the member lookup yields
        nothing or lacks a ``users`` entry.
        """
        members = connector.get_all_members()
        if not members or 'users' not in members:
            return {"error": "Could not retrieve users or user data was not in the expected format."}

        trimmed = []
        for record in members["users"]:
            trimmed.append({
                "id": record.get("id"),
                "name": record.get("name"),
                "email": record.get("email"),
            })
        return {"users": trimmed}

class GetProjectDetailsTool(HubstaffTool):
    """Tool to get details for a specific project."""
    name = "get_project_details"
    description = "Retrieves detailed information for a specific project by its exact name."

    def get_schema(self) -> dict:
        """Return the parameter schema: a single required ``project_name``."""
        return {"type": "object", "properties": {"project_name": {"type": "string", "description": "The exact name of the project to retrieve."}}, "required": ["project_name"]}

    def execute(self, connector: HubstaffConnector, **kwargs) -> dict:
        """Look up a project by name (case-insensitive, whitespace-trimmed).

        Returns:
            ``{"project": <record>}`` for the first match, otherwise an
            ``{"error": ...}`` dict.
        """
        project_name = kwargs.get("project_name")
        # Trim because the value typically comes straight from interactive
        # input; a blank result is treated the same as a missing parameter.
        wanted = str(project_name or "").strip().lower()
        if not wanted:
            return {"error": "project_name parameter is required."}

        projects_data = connector.get_all_projects()
        if not projects_data or 'projects' not in projects_data:
            return {"error": "Could not retrieve projects."}

        for project in projects_data['projects'] or []:
            # `or ''` covers records whose name key is present but null,
            # where `.get('name', '')` would return None and crash on lower().
            if (project.get('name') or '').lower() == wanted:
                return {"project": project}
        return {"error": f"Project with name '{project_name}' not found."}

class GetUserByEmailTool(HubstaffTool):
    """Tool to find a specific user by their email address."""
    name = "get_user_by_email"
    description = "Finds a single user by their exact email address."

    def get_schema(self) -> dict:
        """Return the parameter schema: a single required ``email``."""
        return {"type": "object", "properties": {"email": {"type": "string", "description": "The exact email address of the user to find."}}, "required": ["email"]}

    def execute(self, connector: HubstaffConnector, **kwargs) -> dict:
        """Find a user by email (case-insensitive, whitespace-trimmed).

        Returns:
            ``{"user": <record>}`` for the first match, otherwise an
            ``{"error": ...}`` dict.
        """
        email_to_find = kwargs.get("email")
        # Trim because the value typically comes straight from interactive
        # input; a blank result is treated the same as a missing parameter.
        wanted = str(email_to_find or "").strip().lower()
        if not wanted:
            return {"error": "email parameter is required."}

        data = connector.get_all_members()
        if not data or 'users' not in data:
            return {"error": "Could not retrieve users or user data was not in the expected format."}

        for user in data['users'] or []:
            # `or ''` covers records whose email key is present but null,
            # where `.get('email', '')` would return None and crash on lower().
            if (user.get('email') or '').lower() == wanted:
                return {"user": user}
        return {"error": f"User with email '{email_to_find}' not found."}

class GetProductivitySummaryTool(HubstaffTool):
    """Tool to generate a daily productivity summary for all contractors."""
    name = "get_productivity_summary"
    description = "Generates a detailed productivity summary for a given date. It lists each contractor, their total hours, overall activity, and a breakdown of hours and activity for each project they worked on."

    def get_schema(self) -> dict:
        """Return the parameter schema: a single required ``date``."""
        return {
            "type": "object",
            "properties": {
                "date": {
                    "type": "string",
                    "description": "The date for the summary in YYYY-MM-DD format. Use 'yesterday' for the previous day."
                }
            },
            "required": ["date"]
        }

    @staticmethod
    def _resolve_date(date_text):
        """Turn 'yesterday' or a YYYY-MM-DD string into a date, else None."""
        cleaned = date_text.strip()
        if cleaned.lower() == 'yesterday':
            # NOTE(review): uses the local clock, while the connector sends
            # the day window with a "Z" suffix — confirm the intended zone.
            return datetime.now().date() - timedelta(days=1)
        try:
            return datetime.strptime(cleaned, "%Y-%m-%d").date()
        except ValueError:
            return None

    @staticmethod
    def _activity_percent(activity_seconds, tracked_seconds):
        """Return activity as a whole-number percentage of tracked time."""
        if tracked_seconds > 0:
            return round(activity_seconds / tracked_seconds * 100)
        return 0

    def execute(self, connector: HubstaffConnector, **kwargs) -> dict:
        """Build per-contractor totals and per-project breakdowns for a day.

        Returns:
            ``{"productivity_summary": [...]}`` with one entry per user ID
            seen in the day's activities, or an ``{"error": ...}`` dict when
            the date is missing/invalid or no activity data is available.
        """
        date_str = kwargs.get("date")
        if not date_str:
            return {"error": "date parameter is required."}

        target_date = self._resolve_date(str(date_str))
        if target_date is None:
            return {"error": "Invalid date format. Please use YYYY-MM-DD or 'yesterday'."}

        activities_data = connector.get_activities_by_date(target_date)
        if not activities_data or 'activities' not in activities_data:
            return {"error": f"No activities found for {target_date.strftime('%Y-%m-%d')}."}

        # Either lookup may fail and return None; fall back to empty maps so
        # the report is still produced with "Unknown ..." placeholders.
        users_data = connector.get_all_members() or {}
        projects_data = connector.get_all_projects() or {}
        user_map = {u.get('id'): u.get('name') for u in users_data.get('users') or []}
        project_map = {p.get('id'): p.get('name') for p in projects_data.get('projects') or []}

        # Keyed by user ID (not display name) so two people who share a name
        # are reported separately instead of being merged.
        summary = {}
        for activity in activities_data['activities'] or []:
            user_id = activity.get('user_id')
            project_id = activity.get('project_id')
            project_name = project_map.get(project_id) or f"Unknown Project (ID: {project_id})"

            tracked_seconds = activity.get('tracked') or 0
            # NOTE(review): 'overall' is treated as a percentage of 'tracked'
            # here; confirm against the API schema that it is not seconds.
            activity_percent = activity.get('overall') or 0
            active_seconds = tracked_seconds * (activity_percent / 100.0)

            user_entry = summary.setdefault(user_id, {
                "name": user_map.get(user_id) or f"Unknown User (ID: {user_id})",
                "total_tracked_seconds": 0,
                "total_activity_seconds": 0,
                "projects": {},
            })
            user_entry["total_tracked_seconds"] += tracked_seconds
            user_entry["total_activity_seconds"] += active_seconds

            project_entry = user_entry["projects"].setdefault(
                project_name, {"tracked_seconds": 0, "activity_seconds": 0}
            )
            project_entry["tracked_seconds"] += tracked_seconds
            project_entry["activity_seconds"] += active_seconds

        final_report = []
        for data in summary.values():
            breakdown = [
                {
                    "project_name": proj_name,
                    "hours": round(proj_data['tracked_seconds'] / 3600.0, 2),
                    "activity_percent": self._activity_percent(
                        proj_data['activity_seconds'], proj_data['tracked_seconds']
                    ),
                }
                for proj_name, proj_data in data['projects'].items()
            ]
            final_report.append({
                "contractor_name": data["name"],
                "total_hours": round(data['total_tracked_seconds'] / 3600.0, 2),
                "average_activity_percent": self._activity_percent(
                    data['total_activity_seconds'], data['total_tracked_seconds']
                ),
                "project_breakdown": breakdown,
            })

        return {"productivity_summary": final_report}

class GetAgentTimeSummaryTool(HubstaffTool):
    """Tool to get a detailed time summary for specific agents and projects over a date range."""
    name = "get_agent_time_summary"
    description = "Provides a holistic breakdown of hours logged by one or more agents. Can be filtered by projects and a custom date range."

    def get_schema(self) -> dict:
        """Return the parameter schema (emails and dates required, projects optional)."""
        return {
            "type": "object",
            "properties": {
                "agent_emails": {"type": "string", "description": "A comma-separated list of agent email addresses to include in the summary."},
                "start_date": {"type": "string", "description": "The start date for the summary in YYYY-MM-DD format."},
                "end_date": {"type": "string", "description": "The end date for the summary in YYYY-MM-DD format."},
                "project_names": {"type": "string", "description": "Optional: A comma-separated list of exact project names to filter by. If omitted, includes all projects."},
            },
            "required": ["agent_emails", "start_date", "end_date"]
        }

    @staticmethod
    def _split_csv(text):
        """Split a comma-separated string into trimmed, non-empty items."""
        return [item.strip() for item in str(text).split(',') if item.strip()]

    def execute(self, connector: HubstaffConnector, **kwargs) -> dict:
        """Summarize tracked hours per agent and project for a date range.

        Returns:
            ``{"agent_time_summary": [...]}`` on success,
            ``{"summary": ...}`` when no time entries match, or an
            ``{"error": ...}`` dict for invalid input or failed lookups.
        """
        agent_emails_str = kwargs.get("agent_emails")
        start_date_str = kwargs.get("start_date")
        end_date_str = kwargs.get("end_date")
        project_names_str = kwargs.get("project_names")

        # --- Input Parsing and Validation ---
        # Checked up front: strptime(None) raises TypeError, which the
        # ValueError handler below would not catch.
        missing = [
            param for param in ("agent_emails", "start_date", "end_date")
            if not kwargs.get(param)
        ]
        if missing:
            return {"error": f"Missing required parameter(s): {', '.join(missing)}"}

        try:
            start = datetime.strptime(str(start_date_str).strip(), "%Y-%m-%d")
            end = datetime.strptime(str(end_date_str).strip(), "%Y-%m-%d")
        except ValueError:
            return {"error": "Invalid date format. Please use YYYY-MM-DD."}
        if start > end:
            return {"error": "start_date must not be after end_date."}
        start_date = start.strftime("%Y-%m-%d")
        end_date = end.strftime("%Y-%m-%d")

        # Emails are compared case-insensitively, as in get_user_by_email.
        agent_emails = [email.lower() for email in self._split_csv(agent_emails_str)]

        # --- Resolve Names/Emails to IDs ---
        members_data = connector.get_all_members()
        if not members_data:
            return {"error": "Could not retrieve users or user data was not in the expected format."}
        all_users = members_data.get('users') or []
        user_map = {u['email'].lower(): u.get('id') for u in all_users if u.get('email')}
        user_id_map = {u.get('id'): u.get('name') for u in all_users if u.get('id')}

        target_user_ids = [user_map[email] for email in agent_emails if email in user_map]
        if not target_user_ids:
            return {"error": f"Could not find any of the specified users: {agent_emails_str}"}

        # Projects are fetched once and reused for both the optional filter
        # and the ID-to-name lookup used in the report.
        all_projects = (connector.get_all_projects() or {}).get('projects') or []
        all_projects_map = {p.get('id'): p.get('name') for p in all_projects}

        target_project_ids = None
        if project_names_str:
            project_names = self._split_csv(project_names_str)
            project_map = {p.get('name'): p.get('id') for p in all_projects}
            target_project_ids = [project_map[name] for name in project_names if name in project_map]
            if not target_project_ids:
                return {"error": f"Could not find any of the specified projects: {project_names_str}"}

        # --- Fetch Data ---
        time_entries_data = connector.get_time_entries(start_date, end_date, target_user_ids, target_project_ids)
        if not time_entries_data or 'time_entries' not in time_entries_data:
            return {"summary": "No time entries found for the specified criteria."}

        # --- Process Data ---
        # Keyed by user ID (not display name) so agents who share a name are
        # reported separately instead of being merged.
        summary = {}
        for entry in time_entries_data['time_entries'] or []:
            user_id = entry.get('user_id')
            project_id = entry.get('project_id')
            project_name = all_projects_map.get(project_id) or f"Unknown Project ID: {project_id}"
            tracked_seconds = entry.get('tracked') or 0

            agent_entry = summary.setdefault(user_id, {
                "name": user_id_map.get(user_id) or f"Unknown User ID: {user_id}",
                "total_tracked_seconds": 0,
                "project_breakdown": {},
            })
            agent_entry["total_tracked_seconds"] += tracked_seconds
            breakdown = agent_entry["project_breakdown"]
            breakdown[project_name] = breakdown.get(project_name, 0) + tracked_seconds

        # --- Final Formatting ---
        final_report = [
            {
                "agent_name": data["name"],
                "total_hours": round(data['total_tracked_seconds'] / 3600.0, 2),
                "project_breakdown": [
                    {"project_name": proj_name, "hours": round(proj_seconds / 3600.0, 2)}
                    for proj_name, proj_seconds in data['project_breakdown'].items()
                ],
            }
            for data in summary.values()
        ]

        return {"agent_time_summary": final_report}


# --- 5. Context Engine (The Orchestrator) ---
class ContextEngine:
    """Loads tools and orchestrates their execution."""

    def __init__(self, auth_manager):
        """Create the connector, register the built-in tools and list them."""
        self.connector = HubstaffConnector(auth_manager)
        # Keys come from each tool's own `name`, so the registry cannot
        # drift from what the tools call themselves.
        registered_tools = (
            GetUserListTool(),
            GetProjectDetailsTool(),
            GetUserByEmailTool(),
            GetProductivitySummaryTool(),
            GetAgentTimeSummaryTool(),
        )
        self._tools = {tool.name: tool for tool in registered_tools}
        print("\n✅ Context Engine Initialized. Available tools:")
        for name, tool in self._tools.items():
            print(f"  - {name}: {tool.description}")

    def run_tool(self, tool_name, params=None):
        """Execute a registered tool, print its result as JSON and return it.

        Args:
            tool_name: Key of the tool in the registry.
            params: Optional dict of keyword arguments for the tool.

        Returns:
            The tool's result dict, or ``None`` when ``tool_name`` is not
            registered (an error line is printed in that case).
        """
        if params is None:
            params = {}

        tool = self._tools.get(tool_name)
        if tool is None:
            print(f"❌ ERROR: Tool '{tool_name}' not found.")
            return None

        print(f"\n▶️  Executing tool: '{tool_name}' with params: {params}")
        result = tool.execute(self.connector, **params)
        print("\n--- Tool Result ---")
        print(json.dumps(result, indent=2))
        print("-------------------")
        return result


# --- Main Execution Block ---
def _prompt_for_tool_params(schema):
    """Interactively collect a tool's parameters as described by its schema.

    Required parameters are always asked for. Remaining schema properties
    are offered as optional and left out when the answer is blank.
    """
    required_params = list(schema.get("required", []))
    optional_params = [
        param for param in schema.get("properties", {})
        if param not in required_params
    ]

    params = {}
    for param in required_params + optional_params:
        is_optional = param not in required_params
        # Handle comma-separated lists for specific parameters
        if param in ["agent_emails", "project_names"]:
            prompt_text = f"  Enter comma-separated list for '{param}'"
        else:
            prompt_text = f"  Enter value for '{param}'"
        if is_optional:
            prompt_text += " (optional, press Enter to skip)"

        param_value = input(prompt_text + ": ")
        if is_optional and not param_value.strip():
            continue
        params[param] = param_value
    return params


def main():
    """Main function to run the local MCP script.

    Reads the refresh token from ``HUBSTAFF_REFRESH_TOKEN`` or prompts for
    it, initializes the engine, then loops asking which tool to run until
    the user enters 'exit'.
    """
    print("--- Hubstaff Model Context Protocol (Local Runner) ---")

    refresh_token = os.environ.get("HUBSTAFF_REFRESH_TOKEN") or getpass.getpass("Enter your Hubstaff Personal Access (Refresh) Token: ")
    if not refresh_token:
        print("Refresh token is required. Exiting.")
        return

    try:
        auth_manager = AuthManager(refresh_token)
        engine = ContextEngine(auth_manager)

        if not engine.connector.get_my_organization():
            print("Could not determine organization ID. Exiting.")
            return

        while True:
            print("\nWhich tool would you like to run?")
            tool_name = input(f"({', '.join(engine._tools.keys())}, or 'exit'): ").strip().lower()
            if tool_name == 'exit':
                break

            if tool_name not in engine._tools:
                print("Invalid tool name. Please try again.")
                continue

            params = _prompt_for_tool_params(engine._tools[tool_name].get_schema())
            # A failure inside one tool should not end the interactive
            # session, so report it and return to the prompt.
            try:
                engine.run_tool(tool_name, params)
            except Exception as e:
                print(f"\nAn unexpected error occurred: {e}")

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")

# Start the interactive runner only when this file is executed as the main
# program, not when it is imported.
if __name__ == "__main__":
    main()