# Weni Project Cloner

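This notebook copies information from one Weni project to another using the Weni API. Note that the copy is destructive for the destination project: its existing agent instructions and file bases are deleted before the source data is written, so double-check `DESTINATION_PROJECT_UUID` before running.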

## Features:
1. Copy agent customization (name, role, personality, goal, instructions)
2. Copy knowledge bases:
   - Text bases
   - Site/link bases
   - File bases

## 1. Setup and Configuration

In [19]:
import os
import json
import requests
from dotenv import load_dotenv
import time
from typing import Dict, List, Any

# Load environment variables
load_dotenv()

# Get configuration from .env
BEARER_TOKEN = os.getenv('BEARER_TOKEN')
SOURCE_PROJECT_UUID = os.getenv('SOURCE_PROJECT_UUID')
DESTINATION_PROJECT_UUID = os.getenv('DESTINATION_PROJECT_UUID')

# Validate configuration: every setting must be present and non-empty
if not (BEARER_TOKEN and SOURCE_PROJECT_UUID and DESTINATION_PROJECT_UUID):
    raise ValueError("Please ensure all required environment variables are set in .env file")

# The copy steps in this notebook delete instructions and file bases in the
# destination before writing to it, so pointing both settings at the same
# project would wipe the source itself. Compare case- and whitespace-insensitively.
if SOURCE_PROJECT_UUID.strip().lower() == DESTINATION_PROJECT_UUID.strip().lower():
    raise ValueError("SOURCE_PROJECT_UUID and DESTINATION_PROJECT_UUID must refer to different projects")

print(f"Source Project: {SOURCE_PROJECT_UUID}")
print(f"Destination Project: {DESTINATION_PROJECT_UUID}")
print("Configuration loaded successfully!")

Source Project: dc594b55-9284-45dd-8d8a-96e3c5ce04a6
Destination Project: 2e0255e3-95a0-4ad0-8e32-bb3e37e35c38
Configuration loaded successfully!


## 2. API Helper Functions

In [20]:
# Common headers for all requests
def get_headers():
    """Return the base header dict sent with every API request.

    The bearer token is read from the module-level ``BEARER_TOKEN`` at call
    time, and a fresh dict is built on each call so callers may add keys to
    it without affecting later calls.
    """
    # Assembled in three groups; the merge below keeps the original key order.
    general = {
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'en-US,en;q=0.9,pt-BR;q=0.8,pt;q=0.7,es;q=0.6,nl;q=0.5,fr;q=0.4',
        'authorization': f'Bearer {BEARER_TOKEN}',
        'origin': 'https://intelligence-next.weni.ai',
        'priority': 'u=1, i',
        'referer': 'https://intelligence-next.weni.ai/',
    }
    browser_metadata = {
        'sec-ch-ua': '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-site',
    }
    agent = {
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
    }
    return {**general, **browser_metadata, **agent}

def get_headers_with_content_type():
    """Return the base headers plus a JSON ``content-type`` entry."""
    return {**get_headers(), 'content-type': 'application/json'}

def get_content_base_uuid(project_uuid: str, timeout: float = 30.0) -> str:
    """Get content base UUID from project UUID using router API.

    Args:
        project_uuid: UUID of the project whose router endpoint is queried.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The value of the ``uuid`` field of the router response.

    Raises:
        Exception: If the status code is not 200, or the response carries no
            (or an empty) ``uuid`` field.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{project_uuid}/router/'

    response = requests.get(url, headers=get_headers(), timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to get router data: {response.status_code} - {response.text}")

    content_base_uuid = response.json().get('uuid')
    if not content_base_uuid:
        raise Exception(f"No UUID found in router response for project {project_uuid}")

    return content_base_uuid

## 3. Agent Customization Functions

In [21]:
def get_agent_customization(project_uuid: str, timeout: float = 30.0) -> Dict[str, Any]:
    """Get the agent customization of a project.

    Args:
        project_uuid: UUID of the project to read.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body of the customization endpoint. Callers in this
        notebook read its ``agent`` and ``instructions`` keys.

    Raises:
        Exception: If the status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{project_uuid}/customization/'

    response = requests.get(url, headers=get_headers(), timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to get agent customization: {response.status_code} - {response.text}")

    return response.json()

def update_agent_customization(
    project_uuid: str,
    customization_data: Dict[str, Any],
    timeout: float = 30.0,
) -> Dict[str, Any]:
    """Update the agent customization of a project with a PUT request.

    Args:
        project_uuid: UUID of the project to update.
        customization_data: JSON-serialisable payload sent as the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the status code is neither 200 nor 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{project_uuid}/customization/'

    response = requests.put(
        url,
        headers=get_headers_with_content_type(),
        json=customization_data,
        timeout=timeout,
    )

    if response.status_code not in (200, 201):
        raise Exception(f"Failed to update agent customization: {response.status_code} - {response.text}")

    return response.json()

In [22]:
def delete_customization(project_uuid: str, customization_id: str, timeout: float = 30.0) -> bool:
    """Delete a specific customization from a project.

    Args:
        project_uuid: UUID of the project that owns the customization.
        customization_id: Identifier sent as the ``id`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        ``True`` when the server answers 200 or 204; otherwise a warning is
        printed and ``False`` is returned.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{project_uuid}/customization/'

    # Pass the id through `params` so requests URL-encodes it rather than
    # splicing the raw value into the query string.
    response = requests.delete(
        url,
        headers=get_headers(),
        params={'id': customization_id},
        timeout=timeout,
    )

    if response.status_code in (200, 204):
        return True

    print(f"⚠️  Warning: Failed to delete customization {customization_id}: {response.status_code} - {response.text}")
    return False

def delete_all_customizations(project_uuid: str) -> int:
    """Remove every instruction-level customization from a project.

    The project's current customization is fetched, and each entry of its
    ``instructions`` list that has an ``id`` is deleted one by one with a
    short pause between calls. Entries without an ``id`` are skipped.

    Returns:
        The number of entries deleted successfully, or 0 if anything raised
        (the error is printed, not propagated).
    """
    print("Deleting all existing customizations from destination project...")

    try:
        # The IDs to delete come from the project's current instruction list
        current = get_agent_customization(project_uuid)
        removed = 0

        for entry in current.get('instructions', []):
            entry_id = entry.get('id')
            if not entry_id:
                continue

            if delete_customization(project_uuid, entry_id):
                removed += 1
                print(f"  ✅ Deleted customization ID: {entry_id}")
            else:
                print(f"  ❌ Failed to delete customization ID: {entry_id}")

            # Small delay to avoid rate limiting
            time.sleep(0.3)

        print(f"✅ Successfully deleted {removed} customizations from destination project")
        return removed

    except Exception as e:
        print(f"❌ Error deleting customizations: {str(e)}")
        return 0


## 4. Knowledge Base Functions

In [23]:
# Text Base Functions
def get_text_bases(content_base_uuid: str, timeout: float = 30.0) -> Dict[str, Any]:
    """Get the text-base listing of a content base.

    Args:
        content_base_uuid: UUID of the content base to read.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body. Callers in this notebook treat it as a mapping
        and read the text bases from its ``results`` key.

    Raises:
        Exception: If the status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    # NOTE(review): only this single response is read; if the endpoint
    # paginates, further pages are not fetched — confirm against the API.
    url = f'https://nexus.weni.ai/api/{content_base_uuid}/content-bases-text/'

    response = requests.get(url, headers=get_headers(), timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to get text bases: {response.status_code} - {response.text}")

    return response.json()

def create_text_base(content_base_uuid: str, text_data: Dict[str, str], timeout: float = 30.0) -> Dict[str, Any]:
    """Create a text base in a content base with a POST request.

    Args:
        content_base_uuid: UUID of the content base that receives the text.
        text_data: JSON payload; callers in this notebook send ``{"text": ...}``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the status code is neither 200 nor 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{content_base_uuid}/content-bases-text/'

    response = requests.post(
        url,
        headers=get_headers_with_content_type(),
        json=text_data,
        timeout=timeout,
    )

    if response.status_code not in (200, 201):
        raise Exception(f"Failed to create text base: {response.status_code} - {response.text}")

    return response.json()

def update_text_base(
    content_base_uuid: str,
    text_base_uuid: str,
    text_data: Dict[str, str],
    timeout: float = 30.0,
) -> Dict[str, Any]:
    """Update an existing text base in a content base with a PUT request.

    Args:
        content_base_uuid: UUID of the content base that owns the text base.
        text_base_uuid: UUID of the text base to overwrite.
        text_data: JSON payload; callers in this notebook send ``{"text": ...}``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the status code is neither 200 nor 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{content_base_uuid}/content-bases-text/{text_base_uuid}/'

    response = requests.put(
        url,
        headers=get_headers_with_content_type(),
        json=text_data,
        timeout=timeout,
    )

    if response.status_code not in (200, 201):
        raise Exception(f"Failed to update text base: {response.status_code} - {response.text}")

    return response.json()

In [24]:
# Test getting content base UUID and text bases
# Read-only smoke test: every call in this cell is a GET helper, so nothing
# in either project is modified. Each project UUID is resolved to its content
# base UUID through the router endpoint first.
source_content_base_uuid = get_content_base_uuid(SOURCE_PROJECT_UUID)
destination_content_base_uuid = get_content_base_uuid(DESTINATION_PROJECT_UUID)

print(f"Source content base UUID: {source_content_base_uuid}")
print(f"Destination content base UUID: {destination_content_base_uuid}")

# Test source text bases
# The listing is a mapping; the text bases themselves sit under "results".
source_text_response = get_text_bases(source_content_base_uuid)
source_text_bases = source_text_response.get("results", [])
print(f"Found {len(source_text_bases)} text bases in source")

# Test destination text bases
destination_text_response = get_text_bases(destination_content_base_uuid)
destination_text_bases = destination_text_response.get("results", [])
print(f"Found {len(destination_text_bases)} text bases in destination")

# Show one sample entry per side; prints the string 'None' when a list is empty
print(f"First source text base: {source_text_bases[0] if source_text_bases else 'None'}")
print(f"First destination text base: {destination_text_bases[0] if destination_text_bases else 'None'}")

Source content base UUID: 40d08fbb-c36e-477c-800b-2677f0cf4aad
Destination content base UUID: 7a437d70-2ab5-43e1-a189-56c44d70de5a
Found 0 text bases in source
Found 0 text bases in destination
First source text base: None
First destination text base: None


In [25]:
# Link/Site Base Functions
def get_link_bases(content_base_uuid: str, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Get all link bases of a content base.

    Args:
        content_base_uuid: UUID of the content base to read.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body. Callers in this notebook iterate it directly
        as a list of link-base entries (no ``results`` wrapper).

    Raises:
        Exception: If the status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{content_base_uuid}/content-bases-link/'

    response = requests.get(url, headers=get_headers(), timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to get link bases: {response.status_code} - {response.text}")

    return response.json()

def create_link_base(content_base_uuid: str, link_data: Dict[str, str], timeout: float = 30.0) -> Dict[str, Any]:
    """Create a link base in a content base with a POST request.

    Args:
        content_base_uuid: UUID of the content base that receives the link.
        link_data: JSON payload; callers in this notebook send ``{"link": ...}``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the status code is neither 200 nor 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{content_base_uuid}/content-bases-link/'

    response = requests.post(
        url,
        headers=get_headers_with_content_type(),
        json=link_data,
        timeout=timeout,
    )

    if response.status_code not in (200, 201):
        raise Exception(f"Failed to create link base: {response.status_code} - {response.text}")

    return response.json()

In [26]:
# File Base Functions
def get_file_bases(project_uuid: str, timeout: float = 30.0) -> Dict[str, Any]:
    """Get the file-base listing of a project (metadata only, no file content).

    Uses the ``inline-content-base-file`` endpoint, which is addressed by
    project UUID rather than content base UUID.

    Args:
        project_uuid: UUID of the project to read.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The decoded JSON body. Callers in this notebook treat it as a mapping
        and read the file entries from its ``results`` key.

    Raises:
        Exception: If the status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    # NOTE(review): only this single response is read; if the endpoint
    # paginates, further pages are not fetched — confirm against the API.
    url = f'https://nexus.weni.ai/api/{project_uuid}/inline-content-base-file/'

    response = requests.get(url, headers=get_headers(), timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to get file bases: {response.status_code} - {response.text}")

    return response.json()

def download_file(file_url: str, timeout: float = 120.0) -> bytes:
    """Download a file from a URL and return its raw bytes.

    IMPORTANT: This is the ONLY function that actually downloads file content.
    All other functions just get metadata or URLs.

    The request is sent without the API headers (no bearer token).

    Args:
        file_url: URL to fetch; in this notebook, the signed URL returned by
            ``get_file_download_url``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled transfer would block the call indefinitely.

    Returns:
        The response body as bytes.

    Raises:
        Exception: If the status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    # This is the ONLY actual download - GET request to download file content
    response = requests.get(file_url, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f"Failed to download file: {response.status_code}")

    return response.content



def get_file_download_url(file_name: str, content_base_file_uuid: str, timeout: float = 30.0) -> str:
    """Get the signed download URL for a file using the download-file API.

    IMPORTANT: This function ONLY gets a signed URL. It does NOT download the file.
    It just makes a POST request to get a temporary download URL.

    Args:
        file_name: Sent as ``file_name`` in the request payload.
        content_base_file_uuid: Sent as ``content_base_file`` in the payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        The value of the ``file`` field of the response.

    Raises:
        Exception: If the status code is not 200, or the response carries no
            (or an empty) ``file`` field.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = 'https://nexus.weni.ai/api/v1/download-file'

    payload = {
        "file_name": file_name,
        "content_base_file": content_base_file_uuid
    }

    # This POST request only gets a URL, it does NOT download the file
    response = requests.post(
        url,
        headers=get_headers_with_content_type(),
        json=payload,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise Exception(f"Failed to get file download URL: {response.status_code} - {response.text}")

    file_url = response.json().get('file')
    if not file_url:
        raise Exception(f"No file URL in download response for {file_name}")

    return file_url

def download_file_from_base(file_name: str, content_base_file_uuid: str) -> bytes:
    """Fetch a file's bytes in two steps: resolve its signed URL, then download.

    ``get_file_download_url`` only returns a URL; the single transfer of file
    content happens in ``download_file``. Errors from either step propagate.
    """
    return download_file(get_file_download_url(file_name, content_base_file_uuid))

def delete_file_base(project_uuid: str, file_uuid: str, timeout: float = 30.0) -> bool:
    """Delete a specific file base from a project.

    Args:
        project_uuid: UUID of the project that owns the file base.
        file_uuid: UUID of the file base to delete.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the call indefinitely.

    Returns:
        ``True`` when the server answers 200 or 204; otherwise a warning is
        printed and ``False`` is returned.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{project_uuid}/inline-content-base-file/{file_uuid}/'

    response = requests.delete(url, headers=get_headers(), timeout=timeout)

    if response.status_code in (200, 204):
        return True

    print(f"⚠️  Warning: Failed to delete file {file_uuid}: {response.status_code} - {response.text}")
    return False

def delete_all_file_bases(project_uuid: str) -> int:
    """Remove every file base listed for a project.

    Only metadata is read here: the file list is fetched and each entry that
    has a ``uuid`` is deleted by that UUID, with a short pause between calls.
    No file content is downloaded.

    Returns:
        The number of files deleted successfully; 0 when the list is empty or
        when anything raised (the error is printed, not propagated).
    """
    print("Deleting all existing file bases from destination project...")
    print("  ⚠️  NOTE: This step only lists metadata and deletes - NO file downloads occur here")

    try:
        # Listing call returns metadata only
        entries = get_file_bases(project_uuid).get("results", [])

        if not entries:
            print("  No existing file bases to delete")
            return 0

        total = len(entries)
        print(f"  Found {total} file(s) to delete (metadata only, no downloads)")

        removed = 0
        for entry in entries:
            entry_uuid = entry.get('uuid')
            label = entry.get('created_file_name', 'Unknown')

            if not entry_uuid:
                continue

            # Deletion is addressed by UUID alone
            if delete_file_base(project_uuid, entry_uuid):
                removed += 1
                print(f"  ✅ Deleted file: {label} (by UUID, no download)")
            else:
                print(f"  ❌ Failed to delete file: {label}")

            time.sleep(0.3)

        print(f"✅ Successfully deleted {removed}/{total} file bases (no downloads performed)")
        return removed

    except Exception as e:
        print(f"❌ Error deleting file bases: {str(e)}")
        return 0

def create_file_base(
    project_uuid: str,
    file_data: Dict[str, Any],
    file_content: bytes,
    timeout: float = 120.0,
) -> Dict[str, Any]:
    """Create a file base in a project using the inline-content-base-file endpoint.

    The file is uploaded as multipart form data.

    Args:
        project_uuid: UUID of the project that receives the file.
        file_data: Upload metadata. Keys read, with their fallbacks:
            ``filename`` ('file.pdf'), ``content_type`` ('application/pdf'),
            ``extension_file`` ('pdf') and ``load_type`` ('pdfminer').
        file_content: Raw bytes of the file to upload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled upload would block the call indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the status code is neither 200 nor 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f'https://nexus.weni.ai/api/{project_uuid}/inline-content-base-file/'

    # Multipart file part: (filename, raw bytes, MIME type)
    files = {
        'file': (
            file_data.get('filename', 'file.pdf'),
            file_content,
            file_data.get('content_type', 'application/pdf'),
        )
    }

    # Plain form fields sent alongside the file part
    data = {
        'extension_file': file_data.get('extension_file', 'pdf'),
        'load_type': file_data.get('load_type', 'pdfminer')
    }

    # Use the base headers, which carry no content-type: requests must
    # generate the multipart content-type (with its boundary) itself.
    headers = get_headers()

    response = requests.post(url, headers=headers, files=files, data=data, timeout=timeout)

    if response.status_code not in (200, 201):
        raise Exception(f"Failed to create file base: {response.status_code} - {response.text}")

    return response.json()

## 5. Copy Agent Customization

In [27]:
def copy_agent_customization():
    """Copy agent customization from source to destination project.

    The source customization is fetched and the destination payload is built
    BEFORE the destination's existing instructions are deleted. If the source
    cannot be read, or an instruction entry lacks its ``instruction`` key, the
    function stops with the destination still untouched.

    Returns:
        The response of the update call, or ``None`` if any step raised (the
        error is printed, not propagated).
    """
    print("Copying agent customization...")

    try:
        # Get source customization first - nothing has been deleted yet
        source_customization = get_agent_customization(SOURCE_PROJECT_UUID)
        print("Retrieved agent customization from source project")
        print(f"Agent name: {source_customization.get('agent', {}).get('name', 'N/A')}")
        print(f"Number of instructions: {len(source_customization.get('instructions', []))}")

        print(source_customization)

        # Prepare destination data - strip IDs from instructions, since the
        # source IDs identify entries of the source project
        destination_data = {
            "agent": source_customization.get("agent", {}),
            "instructions": [
                {"instruction": instr["instruction"]}
                for instr in source_customization.get("instructions", [])
            ]
        }

        print("\nPrepared destination data (without IDs):")
        print(destination_data)

        # Only now, with a complete payload in hand, clear the destination
        delete_all_customizations(DESTINATION_PROJECT_UUID)

        # Update destination with processed data
        result = update_agent_customization(DESTINATION_PROJECT_UUID, destination_data)
        print("✅ Successfully copied agent customization to destination project")

        return result

    except Exception as e:
        print(f"❌ Error copying agent customization: {str(e)}")
        return None

## 6. Copy Text Bases

In [28]:
def copy_text_bases():
    """Mirror the source project's text bases into the destination project.

    Source entries are matched to existing destination entries by position:
    the n-th source text overwrites the n-th destination text base, and source
    entries beyond the destination's count are created as new text bases.
    A failure on one entry is printed and the loop moves on to the next; a
    failure while resolving or listing the bases aborts the whole step.
    """
    print("\nCopying text bases...")

    try:
        # Text bases are addressed by content base UUID, not project UUID
        print("Getting content base UUIDs...")
        src_base = get_content_base_uuid(SOURCE_PROJECT_UUID)
        dst_base = get_content_base_uuid(DESTINATION_PROJECT_UUID)
        print(f"Source content base UUID: {src_base}")
        print(f"Destination content base UUID: {dst_base}")

        src_items = get_text_bases(src_base).get("results", [])
        print(f"Found {len(src_items)} text bases in source project")

        dst_items = get_text_bases(dst_base).get("results", [])
        print(f"Found {len(dst_items)} existing text bases in destination project")

        total = len(src_items)
        existing = len(dst_items)
        done = 0

        for position, src_item in enumerate(src_items, start=1):
            try:
                payload = {"text": src_item.get("text", "")}

                if position > existing:
                    # No destination entry left at this position: create one
                    create_text_base(dst_base, payload)
                    print(f"  ✅ Created text base {position}/{total}: {payload['text'][:50]}...")
                else:
                    # Overwrite the destination entry at the same position
                    target_uuid = dst_items[position - 1]["uuid"]
                    update_text_base(dst_base, target_uuid, payload)
                    print(f"  ✅ Updated text base {position}/{total}: {payload['text'][:50]}...")

                done += 1

                # Small delay to avoid rate limiting
                time.sleep(0.5)

            except Exception as e:
                print(f"  ❌ Error copying text base {position}: {str(e)}")

        print(f"\n✅ Successfully copied {done}/{total} text bases")

    except Exception as e:
        print(f"❌ Error accessing text bases: {str(e)}")

## 7. Copy Link/Site Bases

In [29]:
def copy_link_bases():
    """Recreate each source link base in the destination project.

    Every source entry's ``link`` value is posted to the destination content
    base as a new link base; existing destination links are not inspected or
    removed. A failure on one entry is printed and the loop continues; a
    failure while resolving or listing the bases aborts the whole step.
    """
    print("\nCopying link/site bases...")

    try:
        # Link bases are addressed by content base UUID, not project UUID
        print("Getting content base UUIDs...")
        src_base = get_content_base_uuid(SOURCE_PROJECT_UUID)
        dst_base = get_content_base_uuid(DESTINATION_PROJECT_UUID)
        print(f"Source content base UUID: {src_base}")
        print(f"Destination content base UUID: {dst_base}")

        # The listing is used as-is: it is a plain list, with no "results" wrapper
        src_links = get_link_bases(src_base)
        print(f"Found {len(src_links)} link bases in source project")

        total = len(src_links)
        done = 0

        for position, entry in enumerate(src_links, start=1):
            try:
                payload = {"link": entry.get("link", "")}

                create_link_base(dst_base, payload)
                done += 1
                print(f"  ✅ Copied link base {position}/{total}: {payload['link']}")

                # Small delay to avoid rate limiting
                time.sleep(0.5)

            except Exception as e:
                print(f"  ❌ Error copying link base {position}: {str(e)}")

        print(f"\n✅ Successfully copied {done}/{total} link bases")

    except Exception as e:
        print(f"❌ Error accessing link bases: {str(e)}")

## 8. Copy File Bases

In [30]:
def copy_file_bases():
    """Copy all file bases from source to destination project.

    Order of operations:
      1. List the source file bases (metadata only).
      2. Delete every existing file base in the destination.
      3. Download each source file and upload it to the destination.

    The source is listed BEFORE anything is deleted, so a failure to read the
    source aborts the run with the destination still untouched. The
    destination is cleared even when the source has no files. A failure on
    one file is printed and the loop moves on to the next file.
    """
    print("\nCopying file bases...")

    # Upload MIME type by lower-cased extension; built once, outside the loop
    content_type_map = {
        'pdf': 'application/pdf',
        'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'doc': 'application/msword',
        'txt': 'text/plain',
        'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    }

    try:
        # Using project UUID directly for inline-content-base-file endpoint
        print(f"Destination project UUID: {DESTINATION_PROJECT_UUID}")

        # STEP 1: Get source file bases list (NO DOWNLOADS HERE - just metadata)
        print(f"\n[STEP 1] Getting file bases list from source project: {SOURCE_PROJECT_UUID}")
        print("  (This step only gets metadata - NO file downloads)")
        source_response = get_file_bases(SOURCE_PROJECT_UUID)
        source_file_bases = source_response.get("results", [])
        total = len(source_file_bases)
        print(f"Found {total} file bases in source project")

        # STEP 2: Delete all existing file bases from destination (NO DOWNLOADS HERE).
        # Runs only after the source listing succeeded.
        print("\n[STEP 2] Deleting all existing file bases from destination project...")
        print("  (This step only lists and deletes - NO file downloads)")
        delete_all_file_bases(DESTINATION_PROJECT_UUID)

        if not source_file_bases:
            print("No file bases to copy from source project")
            return

        # STEP 3: Download from SOURCE and upload to DESTINATION
        print("\n[STEP 3] Downloading files from SOURCE project and uploading to DESTINATION...")
        copied_count = 0

        for index, file_base in enumerate(source_file_bases, start=1):
            try:
                file_name = file_base.get('file_name', f'file_{index}.pdf')
                file_uuid = file_base.get('uuid')
                # Fall back to the part of file_name before its first dot
                created_file_name = file_base.get('created_file_name', file_name.split('.')[0])
                extension = file_base.get('extension_file', 'pdf')

                if not file_uuid:
                    print(f"  ⚠️  No UUID for file base {index} ({file_name}), skipping")
                    continue

                print(f"  [{index}/{total}] Downloading from SOURCE: {created_file_name}.{extension}...")
                print(f"      File UUID: {file_uuid}")
                print(f"      File name: {file_name}")

                # Download file from SOURCE project ONLY (using source file UUID)
                # This should be the ONLY download for this file
                print("      [DOWNLOAD START] Calling download_file_from_base...")
                file_content = download_file_from_base(file_name, file_uuid)
                print(f"      [DOWNLOAD END] File downloaded, size: {len(file_content)} bytes")

                # Unknown extensions are uploaded as generic binary data
                content_type = content_type_map.get(extension.lower(), 'application/octet-stream')

                # NOTE(review): load_type is sent as 'pdfminer' for every
                # extension - confirm the API accepts it for non-PDF files.
                file_data = {
                    'filename': f"{created_file_name}.{extension}",
                    'extension_file': extension,
                    'load_type': 'pdfminer',
                    'content_type': content_type
                }

                # Upload to DESTINATION project
                create_file_base(DESTINATION_PROJECT_UUID, file_data, file_content)
                copied_count += 1
                print(f"  ✅ Uploaded to DESTINATION: {created_file_name}.{extension}")

                time.sleep(1)

            except Exception as e:
                print(f"  ❌ Error copying file base {index}: {str(e)}")

        print(f"\n✅ Successfully copied {copied_count}/{total} file bases")
        print("  (Files were downloaded ONLY from SOURCE project, never from DESTINATION)")

    except Exception as e:
        print(f"❌ Error accessing file bases: {str(e)}")

## 9. Complete Copy Process

In [31]:
def copy_all_project_data():
    """Run the four copy steps in order between the configured projects.

    Steps: agent customization, text bases, link bases, file bases. The
    customization and file steps delete the destination's existing entries
    before copying. A banner is printed before and after the run.
    """
    banner = "=" * 50

    print(banner)
    print("Starting complete project copy...")
    print(f"Source: {SOURCE_PROJECT_UUID}")
    print(f"Destination: {DESTINATION_PROJECT_UUID}")
    print(banner)

    # Each step prints its own errors, so the remaining steps still run
    copy_agent_customization()
    copy_text_bases()
    copy_link_bases()
    copy_file_bases()

    print("\n" + banner)
    print("✅ Project copy process completed!")
    print(banner)

In [None]:
# Execute complete copy process
# WARNING: destructive for the destination project - the customization and
# file-base steps delete its existing instructions and files before copying.
copy_all_project_data()

## 10. Verification Functions (Optional)

In [33]:
def verify_copy():
    """Print a summary of what the destination project now contains.

    Reads the destination's agent name, instruction count and the number of
    text, link and file bases. Nothing is compared against the source; the
    counts are only reported. Any error is printed, not propagated.
    """
    banner = "=" * 50

    print("\nVerifying copy results...")
    print(banner)

    try:
        # Text and link bases are looked up by content base UUID
        dest_base = get_content_base_uuid(DESTINATION_PROJECT_UUID)

        # Agent customization
        customization = get_agent_customization(DESTINATION_PROJECT_UUID)
        agent_name = customization.get('agent', {}).get('name', 'N/A')
        print(f"✅ Agent customization: {agent_name}")
        print(f"   Instructions count: {len(customization.get('instructions', []))}")

        # Text bases (listing wrapped in "results")
        text_items = get_text_bases(dest_base).get("results", [])
        print(f"\n✅ Text bases: {len(text_items)} items")

        # Link bases (listing is a plain list)
        link_items = get_link_bases(dest_base)
        print(f"✅ Link bases: {len(link_items)} items")

        # File bases (listed by project UUID, wrapped in "results")
        file_items = get_file_bases(DESTINATION_PROJECT_UUID).get("results", [])
        print(f"✅ File bases: {len(file_items)} items")

        print("\n" + banner)
        print("Verification complete!")

    except Exception as e:
        print(f"❌ Error during verification: {str(e)}")

In [None]:
# Run verification
# Read-only: prints the destination's agent name and the item counts per
# knowledge-base type; it does not compare them with the source project.
verify_copy()