In [1]:
# sponsor_processing_example.ipynb
import pandas as pd
from multi_processing.processor import LLMProcessor
from multi_processing.processor_config import ProcessorConfig
from multi_processing.llm_client import DeepSeekClient


In [2]:

# 1. Setup Configuration
config = ProcessorConfig(
    batch_size=1,                    # Process 10 videos at a time
    max_workers=100,                  # High concurrency for DeepSeek
    cache_dir="sponsor_cache",        # Cache directory
    save_interval=5,                  # Save every 5 batches
    show_progress=True,
    metrics_output_path="metrics.json"
)

# 2. Initialize DeepSeek Client
client = DeepSeekClient(
    api_key='sk-cd405682db094b6781f9f815840163d8',
    model="deepseek-chat",
    temperature=0.1
)

# 3. Initialize Processor
processor = LLMProcessor(client, config)


In [6]:
# sponsor_processing.py

import json
import pandas as pd
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class VideoData:
    """Structure for video data"""
    video_id: str
    title: str
    description: str
    channel_id: Optional[str] = None
    channel_title: Optional[str] = None

def create_prompt(videos: List[Dict[str, Any]], desc_length: int = 200) -> str:
    """
    Create prompt for sponsor detection
    
    Args:
        videos: List of video data dictionaries
        desc_length: Max length for description truncation
    """
    videos_text = ""
    for i, video in enumerate(videos, 1):
        description = video['description']
        if len(description) > desc_length:
            description = description[:desc_length] + "..."
            
        videos_text += f"""VIDEO {i}:
ID: {video['videoId']}
Title: {video['title']}
Description: {description}

"""
    
    prompt = f"""Analyze these {len(videos)} videos for brand sponsorships.

{videos_text}
Return a JSON object with video IDs mapping to their sponsors:
{{
    "video_sponsors": [
        {{
            "video_id": "the_video_id",
            "sponsors": [
                {{
                    "name": "Brand name (e.g., 'Surfshark' not 'surfshark vpn')",
                    "domain": "Main company domain (e.g., 'surfshark.com' not promo URLs)",
                    "evidence": "Exact text snippet showing sponsorship"
                }}
            ]
        }}
    ]
}}

Guidelines for identifying sponsorships:
- Look for direct mentions of brands with promotional intent
- Include sponsored integrations, brand deals, partnerships
- Use main company domains (e.g., 'nordvpn.com' not 'nordvpn.com/creator')
- For each brand found, use their official domain regardless of promo links
- Include multiple sponsors if present
- Ignore: merch, generic affiliate links, social media, donations, self promo

Examples of correct domain mapping:
- Surfshark promo link -> surfshark.com
- Nord VPN creator link -> nordvpn.com
- Skillshare special offer -> skillshare.com"""

    return prompt

def process_batch_response(content: str) -> Dict[str, List[Dict[str, str]]]:
    """
    Process LLM response into structured sponsor data
    
    Args:
        content: Raw LLM response text
    
    Returns:
        Dictionary mapping video IDs to sponsor lists
    """
    try:
        # Clean up response if it contains markdown code blocks
        if content.startswith("```"):
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            if json_start != -1 and json_end != -1:
                content = content[json_start:json_end]
        
        # Parse JSON response
        result = json.loads(content)
        batch_results = {}
        
        if 'video_sponsors' in result:
            for video_data in result['video_sponsors']:
                video_id = video_data['video_id']
                sponsors = video_data.get('sponsors', [])
                batch_results[video_id] = sponsors
                
        return batch_results
        
    except Exception as e:
        print(f"Error processing response: {e}")
        print(f"Raw content: {content[:200]}...")  # Print start of content for debugging
        return {}

def process_video_batch(batch: Dict[str, Any], client) -> Dict[str, Any]:
    """
    Process a single video batch for sponsor detection
    
    Args:
        batch: Dictionary containing video data
        client: LLM client instance
    """
    # Create single-item batch for prompt
    batch_list = [batch]
    
    # Generate and call prompt
    prompt = create_prompt(batch_list)
    response = client.call_api(prompt)
    
    # Return structured result
    result = {
        'video_id': batch['videoId'],
        'processed_data': response
    }
    return result

def transform_results(result: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Transform LLM results into structured sponsor records
    
    Args:
        result: Dictionary containing video ID and processed data
        
    Returns:
        List of sponsor records with normalized structure
    """
    video_id = result['video_id']
    processed_data = result['processed_data']
    
    if not processed_data.get('success'):
        print(f"Processing failed for video {video_id}: {processed_data.get('error')}")
        return []
    
    # Parse sponsors from LLM response
    sponsor_data = process_batch_response(processed_data['content'])
    sponsors = sponsor_data.get(video_id, [])
    
    # Create individual records for each sponsor
    records = []
    for i, sponsor in enumerate(sponsors, 1):
        record = {
            'video_id': video_id,
            f'sponsor_{i}_name': sponsor.get('name'),
            f'sponsor_{i}_domain': sponsor.get('domain'),
            f'sponsor_{i}_evidence': sponsor.get('evidence')
        }
        records.append(record)
    
    return records

def validate_sponsor_record(record: Dict[str, Any]) -> bool:
    """
    Validate a sponsor record
    
    Args:
        record: Dictionary containing sponsor data
        
    Returns:
        True if record is valid, False otherwise
    """
    required_fields = ['video_id']
    sponsor_fields = ['name', 'domain', 'evidence']
    
    # Check required fields
    if not all(field in record for field in required_fields):
        return False
        
    # Check that at least one sponsor exists
    has_sponsor = False
    i = 1
    while f'sponsor_{i}_name' in record:
        sponsor_valid = all(
            record.get(f'sponsor_{i}_{field}') 
            for field in sponsor_fields
        )
        if sponsor_valid:
            has_sponsor = True
        i += 1
        
    return has_sponsor

def process_sponsor_batch(
    videos: List[Dict[str, Any]],
    processor,
    cache_prefix: str = "sponsor_detection"
) -> pd.DataFrame:
    """
    Process a batch of videos for sponsor detection
    
    Args:
        videos: List of video data dictionaries
        processor: LLMProcessor instance
        cache_prefix: Prefix for cache keys
    
    Returns:
        DataFrame containing processed sponsor data
    """
    # Process videos through LLM processor
    results = processor.process_batch(
        items=videos,
        process_fn=process_video_batch,
        transform_fn=transform_results,
        cache_prefix=cache_prefix
    )
    
    # Convert to DataFrame
    if not results:
        return pd.DataFrame()
        
    df = pd.DataFrame(results)
    
    # Add metadata
    df['processed_timestamp'] = pd.Timestamp.now()
    df['cache_prefix'] = cache_prefix
    
    return df

In [None]:
import pandas as pd
from tqdm import tqdm
import json
import re
import requests
from urllib.parse import urlparse
import concurrent.futures
from time import perf_counter
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Global caches
url_cache = {}
domain_cache = {}
first_pass_results = {}  # Cache for videos that found sponsors in first pass

def measure_time(func):
    """Decorator to measure function execution time"""
    def wrapper(*args, **kwargs):
        start = perf_counter()
        result = func(*args, **kwargs)
        end = perf_counter()
        wrapper.total_time += end - start
        wrapper.calls += 1
        return result
    wrapper.total_time = 0
    wrapper.calls = 0
    return wrapper

def create_session():
    """Create a requests session with retries and timeouts"""
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504]
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=100, pool_maxsize=100)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

@measure_time
def expand_url(url):
    """Expand shortened URLs with robust caching and handling"""
    if not isinstance(url, str):
        return None
        
    if url in url_cache:
        return url_cache[url]
        
    try:
        if not url.startswith(('http://', 'https://')):
            url = 'http://' + url
            
        session = create_session()
        
        # Special handling for known URL shorteners
        domain = urlparse(url).netloc.lower()
        if any(service in domain for service in ['bit.ly', 'goo.gl', 'tinyurl']):
            response = session.get(
                url,
                allow_redirects=True,
                timeout=10,
                headers={'User-Agent': 'Mozilla/5.0'},
                stream=True
            )
        else:
            response = session.head(
                url,
                allow_redirects=True,
                timeout=5
            )
        
        final_url = response.url
        if isinstance(response, requests.models.Response):
            response.close()
        
        url_cache[url] = final_url
        return final_url
        
    except Exception as e:
        print(f"URL expansion error for {url}: {e}")
        return url

def quick_domain_extract(url):
    """Extract domain with improved caching"""
    if not isinstance(url, str):
        return None
    
    url_lower = url.lower()
    if url_lower in domain_cache:
        return domain_cache[url_lower]

    try:
        parsed = urlparse(url_lower)
        domain = parsed.netloc or parsed.path.split('/')[0]
        
        for prefix in ['www.']:
            if domain.startswith(prefix):
                domain = domain[len(prefix):]
                
        domain_cache[url_lower] = domain
        return domain
    except Exception:
        return None
    
def create_prompt(video_batch, desc_length=200):
    """Create unified prompt for both passes"""
    videos_text = ""
    for i, row in enumerate(video_batch.iterrows(), 1):
        _, video = row
        description = video['description'][:desc_length] + "..." if len(video['description']) > desc_length else video['description']
        videos_text += f"""VIDEO {i}:
ID: {video['videoId']}
Title: {video['title']}
Description: {description}

"""
    
    prompt = f"""Analyze these {len(video_batch)} videos for brand sponsorships.

{videos_text}
Return a JSON object with video IDs mapping to their sponsors:
{{
    "video_sponsors": [
        {{
            "video_id": "the_video_id",
            "sponsors": [
                {{
                    "name": "Brand name (e.g., 'Surfshark' not 'surfshark vpn')",
                    "domain": "Main company domain (e.g., 'surfshark.com' not promo URLs)",
                    "evidence": "Exact text snippet showing sponsorship"
                }}
            ]
        }}
    ]
}}

Guidelines for identifying sponsorships:
- Look for direct mentions of brands with promotional intent
- Include sponsored integrations, brand deals, partnerships
- Use main company domains (e.g., 'nordvpn.com' not 'nordvpn.com/creator')
- For each brand found, use their official domain regardless of promo links
- Include multiple sponsors if present
- Ignore: merch, generic affiliate links, social media, donations, self promo

Examples of correct domain mapping:
- Surfshark promo link -> surfshark.com
- Nord VPN creator link -> nordvpn.com
- Skillshare special offer -> skillshare.com"""

    return prompt

def process_batch_response(content):
    """Process LLM response without URL expansion"""
    try:
        if content.startswith("```"):
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            if json_start != -1 and json_end != -1:
                content = content[json_start:json_end]
        
        result = json.loads(content)
        batch_results = {}
        
        if 'video_sponsors' in result:
            for video_data in result['video_sponsors']:
                video_id = video_data['video_id']
                sponsors = video_data.get('sponsors', [])
                batch_results[video_id] = sponsors
                
        return batch_results
        
    except Exception as e:
        print(f"Error processing response: {e}")
        return {}

def process_batch(batch, is_second_pass=False):
    """Process one batch with two-pass system"""
    try:
        # Skip videos that already have sponsors from first pass
        if is_second_pass:
            batch = batch[~batch['videoId'].isin(first_pass_results.keys())]
            if batch.empty:
                return {}
        
        # Use appropriate description length
        desc_length = 1500 if is_second_pass else 200
        
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[{
                "role": "user", 
                "content": create_prompt(batch, desc_length)
            }],
            temperature=0.1
        )
        
        content = response.choices[0].message.content.strip()
        batch_results = process_batch_response(content)
        
        # Cache first pass results
        if not is_second_pass:
            first_pass_results.update(batch_results)
        
        return batch_results
        
    except Exception as e:
        print(f"Error in {'second' if is_second_pass else 'first'} pass: {e}")
        return {}

def process_videos_parallel(df, batch_size=5, max_workers=3):
    """Process videos with two-pass system"""
    sponsor_map = {}
    batches = [df.iloc[i:i + batch_size] for i in range(0, len(df), batch_size)]
    
    with tqdm(total=len(df), desc="Processing videos (first pass)") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # First pass - 200 tokens
            futures = [executor.submit(process_batch, batch, False) for batch in batches]
            
            for future in concurrent.futures.as_completed(futures):
                try:
                    results = future.result()
                    sponsor_map.update(results)
                    pbar.update(batch_size)
                except Exception as e:
                    print(f"Batch processing failed: {e}")
    
    # Second pass for videos without sponsors
    remaining_videos = len(df) - len(first_pass_results)
    if remaining_videos > 0:
        with tqdm(total=remaining_videos, desc="Processing videos (second pass)") as pbar:
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(process_batch, batch, True) for batch in batches]
                
                for future in concurrent.futures.as_completed(futures):
                    try:
                        results = future.result()
                        sponsor_map.update(results)
                        pbar.update(len(results))
                    except Exception as e:
                        print(f"Batch processing failed: {e}")
    
    return sponsor_map

def expand_sponsor_data(df, sponsor_map):
    """Expand sponsor data into columns efficiently"""
    df['sponsor_data'] = df['videoId'].map(lambda x: sponsor_map.get(x, []))
    
    # Find max sponsors accounting for all videos
    max_sponsors = max((len(sponsors) for sponsors in sponsor_map.values()), default=0)
    
    all_sponsor_rows = []
    for video_id in df['videoId']:
        sponsors = sponsor_map.get(video_id, [])
        row_sponsors = []
        for i in range(max_sponsors):
            if i < len(sponsors):
                sponsor = sponsors[i]
                row_sponsors.extend([
                    sponsor.get('name', None),
                    sponsor.get('domain', None),
                    sponsor.get('evidence', None)
                ])
            else:
                row_sponsors.extend([None, None, None])
        all_sponsor_rows.append(row_sponsors)
    
    # Create column names for all possible sponsors
    column_names = []
    for i in range(max_sponsors):
        column_names.extend([
            f"sponsor_{i+1}_name",
            f"sponsor_{i+1}_domain",
            f"sponsor_{i+1}_evidence"
        ])

    sponsor_expanded_df = pd.DataFrame(all_sponsor_rows, columns=column_names)
    
    # Combine with original data
    df.reset_index(drop=True, inplace=True)
    sponsor_expanded_df.reset_index(drop=True, inplace=True)
    final_df = pd.concat([df, sponsor_expanded_df], axis=1)
    
    return final_df

def save_checkpoint(df, sponsor_map, current_idx, pass_number):
    """Save processing checkpoint with pass information"""
    temp_df = df.copy()
    temp_df['sponsor_data'] = temp_df['videoId'].map(lambda x: json.dumps(sponsor_map.get(x, [])))
    temp_df.to_csv(f'sponsors_checkpoint_pass{pass_number}_{current_idx}.csv', index=False)

def print_stats(df, sponsor_map):
    """Print detailed processing statistics"""
    first_pass_count = len(first_pass_results)
    second_pass_count = len(sponsor_map) - first_pass_count
    
    print("\nProcessing Statistics:")
    print(f"Total videos processed: {len(df)}")
    print(f"Videos with sponsors found in first pass (200 tokens): {first_pass_count}")
    print(f"Additional sponsors found in second pass (1500 tokens): {second_pass_count}")
    print(f"Total videos with sponsors: {len(sponsor_map)}")
    
    # URL expansion stats
    if expand_url.calls > 0:
        avg_time = expand_url.total_time / expand_url.calls
        print(f"\nURL Processing:")
        print(f"Total URLs processed: {expand_url.calls}")
        print(f"Average processing time: {avg_time:.2f}s per URL")
        print(f"Cache hits: {len(url_cache)}")
    
    # Sponsor distribution
    sponsor_counts = [len(sponsors) for sponsors in sponsor_map.values()]
    if sponsor_counts:
        print("\nSponsor Distribution:")
        print(f"Average sponsors per video: {sum(sponsor_counts)/len(sponsor_counts):.2f}")
        print(f"Max sponsors in a video: {max(sponsor_counts)}")
        
        count_distribution = pd.Series(sponsor_counts).value_counts().sort_index()
        print("\nVideos by sponsor count:")
        for count, videos in count_distribution.items():
            print(f"{count} sponsor(s): {videos} videos")

if __name__ == "__main__":
    try:
        # Read sample data
        df = pd.read_csv('/Users/parthkocheta/Documents/sponsorFind/sponsorFind/chunk_8_of_245.csv')
        
        # Process videos with two-pass system
        sponsor_map = process_videos_parallel(
            df, 
            batch_size=10,  # Adjust based on your testing
            max_workers=100   # Adjust based on your CPU
        )
        
        # Expand and save sponsor data
        final_df = expand_sponsor_data(df, sponsor_map)
        final_df.to_csv('sponsors_chunk_8.csv', index=False)
        
        # Print detailed stats
        print_stats(df, sponsor_map)
        
        # Print sample of found sponsors
        print("\nSample Sponsors Found:")
        sample_videos = list(sponsor_map.items())[:3]
        for video_id, sponsors in sample_videos:
            print(f"\nVideo {video_id}:")
            for i, sponsor in enumerate(sponsors, 1):
                print(f"  Sponsor {i}:")
                print(f"    Name: {sponsor.get('name')}")
                print(f"    Domain: {sponsor.get('domain')}")
                print(f"    Evidence: {sponsor.get('evidence')[:100]}...")

    except Exception as e:
        print(f"Error in main execution: {e}")
        raise

In [29]:
# sponsor_processor.py

from multi_processing.processor import LLMProcessor, ProcessorConfig
from multi_processing.llm_client import DeepSeekClient
import pandas as pd
import json
from typing import Dict, Any, List
import concurrent.futures
from tqdm import tqdm

def create_prompt(video_batch, desc_length=200):
    """Create unified prompt for both passes"""
    videos_text = ""
    for i, video in enumerate(video_batch, 1):
        description = video['description'][:desc_length] + "..." if len(video['description']) > desc_length else video['description']
        videos_text += f"""VIDEO {i}:
ID: {video['videoId']}
Title: {video['title']}
Description: {description}

"""
    
    prompt = f"""Analyze these {len(video_batch)} videos for brand sponsorships.

{videos_text}
Return a JSON object with video IDs mapping to their sponsors:
{{
    "video_sponsors": [
        {{
            "video_id": "the_video_id",
            "sponsors": [
                {{
                    "name": "Brand name (e.g., 'Surfshark' not 'surfshark vpn')",
                    "domain": "Main company domain (e.g., 'surfshark.com' not promo URLs)",
                    "evidence": "Exact text snippet showing sponsorship"
                }}
            ]
        }}
    ]
}}

Guidelines for identifying sponsorships:
- Look for direct mentions of brands with promotional intent
- Include sponsored integrations, brand deals, partnerships
- Use main company domains (e.g., 'nordvpn.com' not 'nordvpn.com/creator')
- For each brand found, use their official domain regardless of promo links
- Include multiple sponsors if present
- Ignore: merch, generic affiliate links, social media, donations, self promo

Examples of correct domain mapping:
- Surfshark promo link -> surfshark.com
- Nord VPN creator link -> nordvpn.com
- Skillshare special offer -> skillshare.com"""

    return prompt

def process_batch_response(content):
    """Process LLM response without URL expansion"""
    try:
        if content.startswith("```"):
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            if json_start != -1 and json_end != -1:
                content = content[json_start:json_end]
        
        result = json.loads(content)
        batch_results = {}
        
        if 'video_sponsors' in result:
            for video_data in result['video_sponsors']:
                video_id = video_data['video_id']
                sponsors = video_data.get('sponsors', [])
                batch_results[video_id] = sponsors
                
        return batch_results
        
    except Exception as e:
        print(f"Error processing response: {e}")
        return {}

def expand_sponsor_data(df, sponsor_map):
    """Expand sponsor data into columns efficiently"""
    df['sponsor_data'] = df['videoId'].map(lambda x: sponsor_map.get(x, []))
    
    # Find max sponsors accounting for all videos
    max_sponsors = max((len(sponsors) for sponsors in sponsor_map.values()), default=0)
    
    all_sponsor_rows = []
    for video_id in df['videoId']:
        sponsors = sponsor_map.get(video_id, [])
        row_sponsors = []
        for i in range(max_sponsors):
            if i < len(sponsors):
                sponsor = sponsors[i]
                row_sponsors.extend([
                    sponsor.get('name', None),
                    sponsor.get('domain', None),
                    sponsor.get('evidence', None)
                ])
            else:
                row_sponsors.extend([None, None, None])
        all_sponsor_rows.append(row_sponsors)
    
    # Create column names for all possible sponsors
    column_names = []
    for i in range(max_sponsors):
        column_names.extend([
            f"sponsor_{i+1}_name",
            f"sponsor_{i+1}_domain",
            f"sponsor_{i+1}_evidence"
        ])

    sponsor_expanded_df = pd.DataFrame(all_sponsor_rows, columns=column_names)
    
    # Combine with original data
    df.reset_index(drop=True, inplace=True)
    sponsor_expanded_df.reset_index(drop=True, inplace=True)
    final_df = pd.concat([df, sponsor_expanded_df], axis=1)
    
    return final_df

def print_stats(df, sponsor_map):
    """Print detailed processing statistics"""
    first_pass_count = len(first_pass_results)
    second_pass_count = len(sponsor_map) - first_pass_count
    
    print("\nProcessing Statistics:")
    print(f"Total videos processed: {len(df)}")
    print(f"Videos with sponsors found in first pass (200 tokens): {first_pass_count}")
    print(f"Additional sponsors found in second pass (1500 tokens): {second_pass_count}")
    print(f"Total videos with sponsors: {len(sponsor_map)}")
    
    # Sponsor distribution
    sponsor_counts = [len(sponsors) for sponsors in sponsor_map.values()]
    if sponsor_counts:
        print("\nSponsor Distribution:")
        print(f"Average sponsors per video: {sum(sponsor_counts)/len(sponsor_counts):.2f}")
        print(f"Max sponsors in a video: {max(sponsor_counts)}")
        
        count_distribution = pd.Series(sponsor_counts).value_counts().sort_index()
        print("\nVideos by sponsor count:")
        for count, videos in count_distribution.items():
            print(f"{count} sponsor(s): {videos} videos")

# New wrapper for library integration
def process_videos_with_library(df: pd.DataFrame, api_key: str):
    """Process videos using library's built-in parallel processing"""
    
    # Initialize basic config
    config = ProcessorConfig(
    cache_enabled=False,     # No disk cache
    rate_limit=0.0,          # No forced sleep between calls
    max_retries=1,           # Or 2, if you rarely fail
    batch_size=1,            # Not crucial if you're doing item-level concurrency anyway
    max_workers=100,         # If your system can handle it
    fail_fast=True,
    # ... etc.
)
    
    client = DeepSeekClient(api_key=api_key)
    processor = LLMProcessor(client, config)
    
    # Track first pass results
    first_pass_results = {}
    
    # Define processing functions
    def process_first_pass(video):
        """First pass with short descriptions"""
        response = client.call_api(
            create_prompt([video], desc_length=200)
        )
        if response['success']:
            results = process_batch_response(response['content'])
            first_pass_results.update(results)
        return results

    # Process first pass
    first_results = processor.process_batch(
        items=df.to_dict('records'),
        process_fn=process_first_pass
    )
    
    # Process second pass only for remaining videos
    remaining = [
        v for v in df.to_dict('records')
        if v['videoId'] not in first_pass_results
    ]
    
    def process_second_pass(video):
        """Second pass with longer descriptions"""
        response = client.call_api(
            create_prompt([video], desc_length=1500)
        )
        if response['success']:
            return process_batch_response(response['content'])
        return {}

    second_results = processor.process_batch(
        items=remaining,
        process_fn=process_second_pass
    )
    
    # Combine results and expand
    all_results = {**first_results, **second_results}
    return expand_sponsor_data(df, all_results)

    
from time import perf_counter
from collections import defaultdict

class Telemetry:
    def __init__(self):
        self.timings = defaultdict(list)
        self.counts = defaultdict(int)
        
    def measure(self, operation):
        def decorator(func):
            def wrapper(*args, **kwargs):
                start = perf_counter()
                result = func(*args, **kwargs)
                duration = perf_counter() - start
                self.timings[operation].append(duration)
                self.counts[operation] += 1
                return result
            return wrapper
        return decorator
    
    def print_stats(self):
        print("\nPerformance Metrics:")
        for op, times in self.timings.items():
            avg_time = sum(times) / len(times)
            total_time = sum(times)
            print(f"\n{op}:")
            print(f"  Count: {self.counts[op]}")
            print(f"  Average time: {avg_time:.2f}s")
            print(f"  Total time: {total_time:.2f}s")
            print(f"  % of total time: {(total_time/sum(sum(t) for t in self.timings.values()))*100:.1f}%")

# Add to your processor
telemetry = Telemetry()

@telemetry.measure("API Call")
def process_batch(batch, is_second_pass=False):
    """Process batch with timing"""
    try:
        if is_second_pass:
            batch = [v for v in batch if v['videoId'] not in first_pass_results]
            if not batch:
                return {}
        
        desc_length = 1500 if is_second_pass else 200
        
        # Measure prompt creation
        start = perf_counter()
        prompt = create_prompt(batch, desc_length)
        telemetry.timings["prompt_creation"].append(perf_counter() - start)
        
        # Measure API call
        start = perf_counter()
        response = client.call_api(prompt)
        telemetry.timings["pure_api_call"].append(perf_counter() - start)
        
        if not response['success']:
            return {}
            
        # Measure response processing    
        start = perf_counter()
        batch_results = process_batch_response(response['content'])
        telemetry.timings["response_processing"].append(perf_counter() - start)
        
        if not is_second_pass:
            first_pass_results.update(batch_results)
            
        return batch_results
        
    except Exception as e:
        print(f"Error in {'second' if is_second_pass else 'first'} pass: {e}")
        return {}



if __name__ == "__main__":
    try:
        # Read data
        df = pd.read_csv('/Users/parthkocheta/Documents/sponsorFind/sponsorFind/chunk_8_of_245.csv')
        
        # Process with library
        final_df = process_videos_with_library(
            df,
            api_key='sk-cd405682db094b6781f9f815840163d8'
        )
        
        # Save results
        final_df.to_csv('sponsor_results.csv', index=False)
        
    except Exception as e:
        print(f"Error: {e}")

  0%|          | 3/16678 [00:16<25:36:23,  5.53s/it, processed=3, success_rate=0.0%, errors=0, avg_time=4.29s]


KeyboardInterrupt: 

In [29]:
# sponsor_processor.py

import pandas as pd
import json
from typing import Dict, Any, List

from multi_processing.processor import LLMProcessor
from multi_processing.processor_config import ProcessorConfig
from multi_processing.llm_client import DeepSeekClient

from tqdm import tqdm

#########################################
# Helper Functions
#########################################

def create_prompt(video_batch, desc_length=200):
    """
    Create a unified prompt for sponsor extraction.
    Each item in video_batch is a dict with keys: videoId, title, description, etc.
    """
    videos_text = ""
    for i, video in enumerate(video_batch, 1):
        description = video['description'][:desc_length] + "..." if len(video['description']) > desc_length else video['description']
        videos_text += f"""VIDEO {i}:
ID: {video['videoId']}
Title: {video['title']}
Description: {description}

"""
    
    prompt = f"""Analyze these {len(video_batch)} videos for brand sponsorships.

{videos_text}
Return a JSON object with video IDs mapping to their sponsors:
{{
    "video_sponsors": [
        {{
            "video_id": "the_video_id",
            "sponsors": [
                {{
                    "name": "Brand name (e.g., 'Surfshark' not 'surfshark vpn')",
                    "domain": "Main company domain (e.g., 'surfshark.com' not promo URLs)",
                    "evidence": "Exact text snippet showing sponsorship"
                }}
            ]
        }}
    ]
}}

Guidelines for identifying sponsorships:
- Look for direct mentions of brands with promotional intent
- Include sponsored integrations, brand deals, partnerships
- Use main company domains (e.g., 'nordvpn.com' not 'nordvpn.com/creator')
- For each brand found, use their official domain regardless of promo links
- Include multiple sponsors if present
- Ignore: merch, generic affiliate links, social media, donations, self promo

Examples of correct domain mapping:
- Surfshark promo link -> surfshark.com
- Nord VPN creator link -> nordvpn.com
- Skillshare special offer -> skillshare.com"""

    return prompt

def process_batch_response(content: str) -> Dict[str, List[Dict]]:
    """
    Parse the JSON returned by the LLM and extract a dict:
    { video_id: [ {name, domain, evidence}, ... ] }
    """

    try:
        # If the LLM wraps JSON in code fences, strip them out
        if content.startswith("```"):
            json_start = content.find("{")
            json_end = content.rfind("}") + 1
            if json_start != -1 and json_end != -1:
                content = content[json_start:json_end]
        
        result = json.loads(content)
        batch_results = {}
        #print(f"Parsed JSON: {result}")  # Debug print

        
        if 'video_sponsors' in result:
            for video_data in result['video_sponsors']:
                video_id = video_data['video_id']
                sponsors = video_data.get('sponsors', [])
                batch_results[video_id] = sponsors
                
        return batch_results
        
    except Exception as e:
        print(f"Error processing response JSON: {e}, Content: {content}")
        return {}

def expand_sponsor_data(df: pd.DataFrame, sponsor_map: Dict[str, List[Dict]]) -> pd.DataFrame:
    """
    Add columns for each sponsor (name, domain, evidence) to the original DataFrame.
    """
    df['sponsor_data'] = df['videoId'].map(lambda x: sponsor_map.get(x, []))
    
    # Find the maximum number of sponsors that any video has
    max_sponsors = max((len(sponsors) for sponsors in sponsor_map.values()), default=0)
    
    all_sponsor_rows = []
    for video_id in df['videoId']:
        sponsors = sponsor_map.get(video_id, [])
        row_sponsors = []
        # For each sponsor slot, create columns
        for i in range(max_sponsors):
            if i < len(sponsors):
                sponsor = sponsors[i]
                row_sponsors.extend([
                    sponsor.get('name', None),
                    sponsor.get('domain', None),
                    sponsor.get('evidence', None)
                ])
            else:
                row_sponsors.extend([None, None, None])
        all_sponsor_rows.append(row_sponsors)
    
    # Create column names for the sponsor slots
    column_names = []
    for i in range(max_sponsors):
        column_names.extend([
            f"sponsor_{i+1}_name",
            f"sponsor_{i+1}_domain",
            f"sponsor_{i+1}_evidence"
        ])

    sponsor_expanded_df = pd.DataFrame(all_sponsor_rows, columns=column_names)
    
    # Combine with the original data
    df.reset_index(drop=True, inplace=True)
    sponsor_expanded_df.reset_index(drop=True, inplace=True)
    final_df = pd.concat([df, sponsor_expanded_df], axis=1)
    return final_df

#########################################
# Two-Pass Sponsor Processing
#########################################

def process_videos_with_library(df: pd.DataFrame, api_key: str) -> pd.DataFrame:
    """Two-pass sponsor extraction using the LLMProcessor"""

    config = ProcessorConfig(
        cache_enabled=False,
        enable_batch_prompts=True,  # Enable batching
        batch_size=1,             # Process 10 videos at a time
        max_workers=1000,
        fail_fast=False
    )
    
    client = DeepSeekClient(api_key=api_key, model="deepseek-chat")
    processor = LLMProcessor(llm_client=client, config=config)

    # Track all results across both passes
    all_sponsors = {}
    first_pass_sponsors = {}

    def process_first_pass(batch: List[Dict]) -> Dict:
        """Process a batch of videos with short descriptions"""
        prompt = create_prompt(batch, desc_length=200)
        response = client.call_api(prompt)
        
        if not response.get('success'):
            return {}
            
        sponsors = process_batch_response(response['content'])
        # Update our tracking dict
        first_pass_sponsors.update(sponsors)
        return sponsors

    # First pass
    print("\nRunning first pass...")
    first_pass_results = processor.process_batch(
        items=df.to_dict('records'),
        process_fn=process_first_pass,
        desc="First pass (200 tokens)"
    )
    
    # Combine first pass results
    for result in first_pass_results:
        all_sponsors.update(result)

    # Second pass only for videos without sponsors
    remaining = [
        video for video in df.to_dict('records')
        if video['videoId'] not in first_pass_sponsors
    ]

    if remaining:
        print(f"\nRunning second pass for {len(remaining)} videos...")
        
        def process_second_pass(batch: List[Dict]) -> Dict:
            """Process a batch with longer descriptions"""
            prompt = create_prompt(batch, desc_length=1500)
            response = client.call_api(prompt)
            
            if not response.get('success'):
                return {}
                
            return process_batch_response(response['content'])

        second_pass_results = processor.process_batch(
            items=remaining,
            process_fn=process_second_pass,
            desc="Second pass (1500 tokens)"
        )
        
        # Combine second pass results
        for result in second_pass_results:
            all_sponsors.update(result)

    # Print stats
    print(f"\nProcessing complete!")
    print(f"Total videos: {len(df)}")
    print(f"First pass sponsors: {len(first_pass_sponsors)}")
    print(f"Total sponsors found: {len(all_sponsors)}")

    # Expand and return
    return expand_sponsor_data(df, all_sponsors)


#########################################
# Example Main
#########################################

if __name__ == "__main__":
    try:
        # Load your CSV
        df = pd.read_csv("/Users/parthkocheta/Documents/sponsorFind/sponsorFind/chunk_8_of_245.csv", nrows=1000)
        
        # Put your actual DeepSeek/LLM API key here
        API_KEY = "sk-cd405682db094b6781f9f815840163d8"

        # Run the two-pass sponsor extraction
        final_df = process_videos_with_library(df, API_KEY)

        # Save results
        final_df.to_csv("sponsor_results.csv", index=False)
        
        print("Processing complete! Results saved to sponsor_results.csv")

    except Exception as e:
        print(f"Error running sponsor processing: {e}")



Running first pass...


First pass (200 tokens):  43%|████▎     | 434/1000 [00:04<00:01, 380.76it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title and description, there is no explicit mention of brand sponsorships or promotional content. Without access to the actual video content or additional details, I cannot identify any sponsorships. Therefore, the JSON object will reflect that no sponsors were found for this video.

```json
{
    "video_sponsors": [
        {
            "video_id": "qbh6C4tlEfg",
            "sponsors": []
        }
    ]
}
``` 

If you have access to the video content or additional details (e.g., captions, comments, or timestamps), I can reanalyze it for potential sponsorships. Let me know!
Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video details, there is no explicit mention of brand sponsorships, integrations, or partnerships in the title, description, or context of the video. The content appears to focus on a dramatic incide

First pass (200 tokens):  63%|██████▎   | 627/1000 [00:05<00:00, 428.83it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title and description, there is no explicit mention of brand sponsorships, integrations, or partnerships. The video appears to focus on analyzing crypto influencers and does not directly reference any brands with promotional intent. Therefore, no sponsorships can be identified from the given information.

Here is the JSON object reflecting this analysis:

```json
{
    "video_sponsors": [
        {
            "video_id": "rHILmX0qBKM",
            "sponsors": []
        }
    ]
}
```

If additional details or context from the video content itself are available, further analysis could be conducted to identify potential sponsorships.
Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: To analyze the video for brand sponsorships, I would need to watch the video and look for direct mentions of brands with promotional intent, sponsored integrations

First pass (200 tokens):  74%|███████▍  | 738/1000 [00:05<00:00, 467.79it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video details (ID, title, and description), there is no explicit evidence of brand sponsorships or promotional content. The description does not mention any brands, partnerships, or sponsored integrations. Therefore, no sponsorships can be identified for this video.

Here is the JSON object reflecting this analysis:

```json
{
    "video_sponsors": [
        {
            "video_id": "yuCEfIuE0os",
            "sponsors": []
        }
    ]
}
``` 

If additional details or context from the video itself (e.g., in-video mentions or visuals) become available, the analysis can be updated accordingly.


First pass (200 tokens):  87%|████████▋ | 872/1000 [00:05<00:00, 381.38it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title and description, there is no explicit mention of brand sponsorships, integrations, or partnerships. The video appears to focus on discussing Aspen Pittman's legacy and the repair of a Fender Pro Reverb amplifier, with no promotional intent for any brands. Therefore, no sponsorships are identified for this video.

Here is the JSON object reflecting this analysis:

```json
{
    "video_sponsors": [
        {
            "video_id": "CvTRANw9l6w",
            "sponsors": []
        }
    ]
}
```
Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title, description, and guidelines, there is no explicit evidence of brand sponsorships or promotional content in the metadata provided for **VIDEO 1**. The description focuses on the storyline of the drama and does not mention any brands or sponsorships.

Here is the JSO

First pass (200 tokens):  91%|█████████ | 911/1000 [00:06<00:00, 295.17it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title, description, and guidelines, there is no explicit evidence of brand sponsorships in the video metadata. The content appears to focus on commentary about the "Ninja Sentai Kakuranger 30 Years After Special" and does not mention any brands or promotional intent.

Here is the JSON object reflecting the analysis:

```json
{
    "video_sponsors": [
        {
            "video_id": "qrnQLez1a6s",
            "sponsors": []
        }
    ]
}
```

If there are additional details or transcripts from the video that might reveal sponsorships, please provide them for further analysis.


First pass (200 tokens):  94%|█████████▍| 944/1000 [00:06<00:00, 244.28it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided information, the video description is incomplete and does not contain sufficient details to identify any sponsorships. Without explicit mentions of brands or promotional intent in the description or title, it is not possible to determine any sponsorships for this video.

Here is the JSON object reflecting the analysis:

```json
{
    "video_sponsors": [
        {
            "video_id": "oc-OL7Z7cTo",
            "sponsors": []
        }
    ]
}
```

If additional details or the full description of the video are provided, I can reanalyze and update the JSON object accordingly.
Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title and description, there is no explicit evidence of brand sponsorships or promotional intent for any specific brand. The video appears to focus on painting techniques for models, specifically me

First pass (200 tokens):  97%|█████████▋| 972/1000 [00:06<00:00, 154.96it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title and description, there is no explicit mention of brand sponsorships, integrations, or partnerships. The content appears to focus on gameplay mechanics and updates for *Genshin Impact*, specifically discussing elemental reactions in patch 5.2. There is no evidence of promotional intent or brand mentions in the provided text.

Here is the JSON object reflecting the analysis:

```json
{
    "video_sponsors": [
        {
            "video_id": "_fbe7Stbba4",
            "sponsors": []
        }
    ]
}
```

If additional details or context from the video itself (e.g., in-video mentions or visuals) are available, further analysis could be conducted. However, based on the provided metadata, no sponsorships are identified.
Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video title and description, there is no explicit

First pass (200 tokens):  99%|█████████▉| 994/1000 [00:07<00:00, 68.64it/s] 

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: To analyze the video for brand sponsorships, I would need to watch the video and examine its description for any direct mentions of brands with promotional intent. Since I cannot directly access or watch the video, I will rely on the provided title and description to infer potential sponsorships. However, the description provided does not explicitly mention any brands or sponsorships.

If you have access to the video or its transcript, you can look for direct mentions of brands with promotional intent, such as:
- Verbal mentions of sponsors during the video.
- Brand logos or product placements.
- Links or discount codes provided in the description.

Based on the information provided, here is the JSON object:

```json
{
    "video_sponsors": [
        {
            "video_id": "4k841lQ4ZQs",
            "sponsors": []
        }
    ]
}
```

If you can provide more details or a transcript of the video, I 

First pass (200 tokens): 100%|██████████| 1000/1000 [00:09<00:00, 101.37it/s]


Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: To analyze the video for brand sponsorships, I would need to watch the video and examine its description, title, and any visible or audible mentions of brands. However, since I cannot directly access or watch videos, I can only analyze the provided text (title and description) for potential sponsorships.

Based on the provided information:

1. **Video ID**: `aMEoQSPRRh4`
2. **Title**: "Max Verstappen's Pole Lap | 2024 Qatar Grand Prix | Pirelli"
3. **Description**: "Ride onboard with Max Verstappen as he secures pole position in Qatar, his first pole in 12 races! Ride down The Strip on board with George Russell as the Brit storms to pole position at the end of ..."

From the title, the brand **Pirelli** is explicitly mentioned, which is a well-known tire manufacturer and a sponsor in Formula 1. The description does not provide additional sponsorship details.

Here is the JSON object based on the analysi

Second pass (1500 tokens):  67%|██████▋   | 16/24 [00:03<00:00,  8.76it/s]

Error processing response JSON: Expecting value: line 1 column 1 (char 0), Content: Based on the provided video description and content, there is no direct evidence of brand sponsorships, integrations, or partnerships in the video. The description primarily includes sources, music credits, and chapter timestamps, but no promotional content or brand mentions. Therefore, the JSON object will reflect that no sponsorships were identified.

```json
{
    "video_sponsors": [
        {
            "video_id": "wkPU4xCV3mU",
            "sponsors": []
        }
    ]
}
```


Second pass (1500 tokens): 100%|██████████| 24/24 [00:08<00:00,  2.88it/s]


Processing complete!
Total videos: 1000
First pass sponsors: 976
Total sponsors found: 999
Processing complete! Results saved to sponsor_results.csv





In [5]:

# 5. Load and Process Data
# Load your video data
df = pd.read_csv('/Users/parthkocheta/Documents/sponsorFind/sponsorFind/chunk_8_of_245.csv')

# Convert DataFrame rows to list of dicts
video_data = df.to_dict('records')

# Process videos with our framework
results = processor.process_batch(
    items=video_data,
    process_fn=process_video_batch,
    transform_fn=transform_results,
    cache_prefix='sponsor_extraction',
    output_path='sponsor_results.csv'
)

# 6. Analyze Results
results_df = pd.DataFrame(results)

# Print statistics
print("\nProcessing Statistics:")
print(f"Total videos processed: {len(df)}")
print(f"Videos with sponsors found: {len(results_df)}")

# Show sample results
print("\nSample Sponsor Results:")
print(results_df.head())

# 7. Check Processing Metrics
import json
with open("metrics.json", 'r') as f:
    metrics = json.load(f)

print("\nProcessing Metrics:")
print(f"Total processing time: {metrics['total_time']:.2f} seconds")
print(f"Average time per item: {metrics['avg_process_time']:.2f} seconds")
print(f"Cache hits: {metrics['cache_hits']}")
print(f"Total errors: {metrics['errors']}")

# 8. Additional Analysis
if results_df.empty:
    print("No results found")
else:
    # Get sponsor frequency
    sponsor_cols = [col for col in results_df.columns if 'sponsor_' in col and 'name' in col]
    all_sponsors = results_df[sponsor_cols].values.flatten()
    sponsor_counts = pd.Series(all_sponsors).value_counts().dropna()

    print("\nTop Sponsors:")
    print(sponsor_counts.head())

# 9. Save Final Results
results_df.to_csv('final_sponsor_analysis.csv', index=False)
print("\nResults saved to 'final_sponsor_analysis.csv'")

NameError: name 'process_video_batch' is not defined

In [3]:
# test_processor_speed.py

import time
import random
import string
from typing import Dict, Any, List
import concurrent.futures
from tqdm import tqdm

########################################
# 1) Fake LLMClient to Simulate API Calls
########################################

from multi_processing.llm_client import BaseLLMClient

class FakeLLMClient(BaseLLMClient):
    """
    A fake client that simulates a 100-300ms "API call" time.
    No real network usage, just time.sleep().
    """

    def call_api(self, prompt: str, system_prompt: str = None, **kwargs) -> Dict[str, Any]:
        # Sleep a random time to simulate latency
        time.sleep(random.uniform(0.1, 0.3))
        # Return a pretend success payload
        return {
            "content": f"Fake response for prompt: {prompt[:30]}...",
            "success": True
        }

    def validate_response(self, response: Dict[str, Any]) -> bool:
        return response.get("success", False)

########################################
# 2) Generate a Synthetic Dataset
########################################

def generate_fake_dataset(num_items: int = 2000) -> List[Dict[str, Any]]:
    """
    Generate a list of dicts with random text data.
    """
    dataset = []
    for i in range(num_items):
        random_text = ''.join(random.choices(string.ascii_lowercase + ' ', k=100))
        item = {
            "id": i,
            "text": random_text
        }
        dataset.append(item)
    return dataset

########################################
# 3) The "process function" we apply
########################################

def process_item_with_fake_llm(item: Dict[str, Any], client: BaseLLMClient) -> Dict[str, Any]:
    """
    Given a single item, call the fake LLM API and return a small result dict.
    """
    prompt = f"Process text: {item['text']}"
    response = client.call_api(prompt)
    return {
        "id": item["id"],
        "text": item["text"],
        "content": response["content"],  # from the fake LLM
        "success": response["success"]
    }

def run_sequential_no_concurrency(dataset, client):
    """
    Control group: process each item in a simple for-loop (sequential).
    """
    results = []
    start_time = time.perf_counter()
    
    for item in tqdm(dataset, desc="Sequential (control)"):
        prompt = f"Process text: {item['text']}"
        response = client.call_api(prompt)
        
        results.append({
            "id": item["id"],
            "text": item["text"],
            "content": response["content"],
            "success": response["success"]
        })
    
    elapsed = time.perf_counter() - start_time
    print(f"[Control] Processed {len(results)} items sequentially in {elapsed:.2f} seconds.")
    return results


########################################
# 4) Testing with the LLMProcessor Library
########################################

from multi_processing.processor import LLMProcessor
from multi_processing.processor_config import ProcessorConfig

def run_with_library(dataset: List[Dict[str, Any]], max_workers: int = 10) -> List[Dict[str, Any]]:
    """
    Use your LLMProcessor with concurrency, no caching, etc.
    """

    config = ProcessorConfig(
        cache_enabled=False,  # no disk caching
        max_workers=max_workers,
        rate_limit=0.0,
        max_retries=1,
        batch_size=1,         # item-level concurrency
        fail_fast=False
    )
    
    client = FakeLLMClient()  # Our fake client
    processor = LLMProcessor(llm_client=client, config=config)

    def process_fn(item):
        return process_item_with_fake_llm(item, client)

    start_time = time.perf_counter()

    results = processor.process_batch(
        items=dataset,
        process_fn=process_fn,
        cache_prefix="",   # not used if cache is disabled
        use_cache=False
    )

    elapsed = time.perf_counter() - start_time
    print(f"[Library] Processed {len(results)} items in {elapsed:.2f} seconds.")
    return results

########################################
# 5) Testing with Raw Concurrency (ThreadPool)
########################################

def run_with_raw_concurrency(dataset: List[Dict[str, Any]], max_workers: int = 10) -> List[Dict[str, Any]]:
    """
    Use a plain ThreadPoolExecutor approach, no library overhead.
    """
    client = FakeLLMClient()
    
    results = []
    start_time = time.perf_counter()

    with tqdm(total=len(dataset), desc="Raw concurrency") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for item in dataset:
                futures.append(executor.submit(process_item_with_fake_llm, item, client))
            
            for f in concurrent.futures.as_completed(futures):
                res = f.result()
                results.append(res)
                pbar.update(1)

    elapsed = time.perf_counter() - start_time
    print(f"[Raw Concurrency] Processed {len(results)} items in {elapsed:.2f} seconds.")
    return results

########################################
# 6) Main Comparison
########################################

if __name__ == "__main__":
    # 1) Generate a sample dataset
    data_size = 200  # adjust to see bigger difference
    dataset = generate_fake_dataset(num_items=data_size)

    # 2) Run with library
    library_results = run_with_library(dataset, max_workers=10)

    # 3) Run with raw concurrency
    raw_results = run_with_raw_concurrency(dataset, max_workers=10)

    control_results = run_sequential_no_concurrency(dataset, client)

    # 4) Quick check
    # Validate that we got the same number of results
    print(f"Library results: {len(library_results)} items.")
    print(f"Raw concurrency results: {len(raw_results)} items.")
    
    # If you want to confirm the outputs are consistent, you can compare them
    # but here we only compare time.


Processing items: 100%|██████████| 200/200 [00:04<00:00, 47.39it/s]


[Library] Processed 200 items in 4.24 seconds.


Raw concurrency: 100%|██████████| 200/200 [00:04<00:00, 47.52it/s]

[Raw Concurrency] Processed 200 items in 4.21 seconds.
Library results: 200 items.
Raw concurrency results: 200 items.





In [18]:
# test_processor_speed.py

import time
import random
import string
from typing import Dict, Any, List
import concurrent.futures
from tqdm import tqdm

########################################
# 1) Fake LLMClient to Simulate API Calls
########################################

from multi_processing.llm_client import BaseLLMClient

class FakeLLMClient(BaseLLMClient):
    """
    A fake client that simulates a 100-300ms "API call" time.
    No real network usage, just time.sleep().
    """

    def call_api(self, prompt: str, system_prompt: str = None, **kwargs) -> Dict[str, Any]:
        # Sleep a random time to simulate latency
        time.sleep(random.uniform(0.1, 0.3))
        # Return a pretend success payload
        return {
            "content": f"Fake response for prompt: {prompt[:30]}...",
            "success": True
        }

    def validate_response(self, response: Dict[str, Any]) -> bool:
        return response.get("success", False)

########################################
# 2) Generate a Synthetic Dataset
########################################

def generate_fake_dataset(num_items: int = 2000) -> List[Dict[str, Any]]:
    """
    Generate a list of dicts with random text data.
    """
    dataset = []
    for i in range(num_items):
        random_text = ''.join(random.choices(string.ascii_lowercase + ' ', k=100))
        item = {
            "id": i,
            "text": random_text
        }
        dataset.append(item)
    return dataset

########################################
# 3) The "process function" for item-level
########################################

def process_item_with_fake_llm(item: Dict[str, Any], client: BaseLLMClient) -> Dict[str, Any]:
    """
    Given a single item, call the fake LLM API and return a small result dict.
    """
    prompt = f"Process text: {item['text']}"
    response = client.call_api(prompt)
    return {
        "id": item["id"],
        "text": item["text"],
        "content": response["content"],  # from the fake LLM
        "success": response["success"]
    }

########################################
# 4) No concurrency: control group
########################################

def run_sequential_no_concurrency(dataset, client):
    """
    Control group: process each item in a simple for-loop (sequential).
    """
    results = []
    start_time = time.perf_counter()
    
    for item in tqdm(dataset, desc="Sequential (control)"):
        prompt = f"Process text: {item['text']}"
        response = client.call_api(prompt)
        
        results.append({
            "id": item["id"],
            "text": item["text"],
            "content": response["content"],
            "success": response["success"]
        })
    
    elapsed = time.perf_counter() - start_time
    print(f"[Control] Processed {len(results)} items sequentially in {elapsed:.2f} seconds.")
    return results

########################################
# 5) The Library with item-level concurrency
########################################

from multi_processing.processor import LLMProcessor
from multi_processing.processor_config import ProcessorConfig

def run_with_library(dataset: List[Dict[str, Any]], max_workers: int = 10) -> List[Dict[str, Any]]:
    """
    Use your LLMProcessor with concurrency, no caching, etc., item-level.
    """
    config = ProcessorConfig(
        cache_enabled=False,
        max_workers=max_workers,
        rate_limit=0.0,
        max_retries=1,
        batch_size=1,         # item-level concurrency
        fail_fast=False,
        enable_batch_prompts=False  # <== ensure item-level
    )
    
    client = FakeLLMClient()
    processor = LLMProcessor(llm_client=client, config=config)

    def process_fn(item):
        return process_item_with_fake_llm(item, client)

    start_time = time.perf_counter()
    results = processor.process_batch(
        items=dataset,
        process_fn=process_fn,
        cache_prefix="", 
        use_cache=False
    )
    elapsed = time.perf_counter() - start_time
    print(f"[Library Item-Level] Processed {len(results)} items in {elapsed:.2f} seconds.")
    return results

########################################
# 6) The Library with "Batch" prompts
########################################

def create_subbatch_prompt(items: List[Dict[str, Any]]) -> str:
    """
    Example: combine multiple items into a single prompt.
    items might be up to config.batch_size in length.
    """
    # Just build a single text listing them
    combined_text = ""
    for itm in items:
        combined_text += f"(ID={itm['id']}) {itm['text']}\n"
    prompt = f"Process these {len(items)} texts at once:\n{combined_text}"
    return prompt

def process_subbatch(subbatch: List[Dict[str, Any]], client: BaseLLMClient) -> Dict[str, Any]:
    """
    Called once per sub-batch. We call the LLM once for all items in subbatch.
    Return a dict with the results. For example, { item_id -> info }.
    """
    prompt = create_subbatch_prompt(subbatch)
    response = client.call_api(prompt)
    
    # We'll pretend we parse out something. For now, just store the prompt.
    # In a real scenario, you'd parse JSON containing all items' results.
    result_map = {}
    for itm in subbatch:
        result_map[itm['id']] = {
            "id": itm["id"],
            "text": itm["text"],
            "combined_response": response["content"],
            "success": response["success"]
        }
    return result_map

def run_with_library_batch_mode(dataset: List[Dict[str, Any]], max_workers: int = 10, batch_size: int = 10):
    """
    Concurrency across sub-batches (enable_batch_prompts=True).
    Each sub-batch calls the LLM once for multiple items.
    """
    config = ProcessorConfig(
        cache_enabled=False,
        max_workers=max_workers,
        rate_limit=0.0,
        max_retries=1,
        batch_size=batch_size,  # sub-batch size
        fail_fast=False,
        enable_batch_prompts=True  # <== batch mode
    )
    
    client = FakeLLMClient()
    processor = LLMProcessor(llm_client=client, config=config)

    def process_fn(subbatch: List[Dict[str, Any]]) -> Dict[int, Any]:
        return process_subbatch(subbatch, client)

    start_time = time.perf_counter()
    # This returns a list of dicts. Each dict is { item_id -> info } for one sub-batch
    dict_list = processor.process_batch(
        items=dataset,
        process_fn=process_fn,
        cache_prefix="", 
        use_cache=False,
        desc="Library Batch Mode"
    )
    elapsed = time.perf_counter() - start_time

    # Combine all subdicts
    combined = {}
    for subdict in dict_list:
        combined.update(subdict)  # merges item_id -> info
    
    print(f"[Library Batch Mode] Processed {len(combined)} items in {elapsed:.2f} seconds (sub-batch size={batch_size}).")
    return combined

########################################
# 7) Raw Concurrency (ThreadPool)
########################################

def run_with_raw_concurrency(dataset: List[Dict[str, Any]], max_workers: int = 10) -> List[Dict[str, Any]]:
    """
    Use a plain ThreadPoolExecutor approach, no library overhead.
    """
    client = FakeLLMClient()
    
    results = []
    start_time = time.perf_counter()

    with tqdm(total=len(dataset), desc="Raw concurrency") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for item in dataset:
                futures.append(executor.submit(process_item_with_fake_llm, item, client))
            
            for f in concurrent.futures.as_completed(futures):
                res = f.result()
                results.append(res)
                pbar.update(1)

    elapsed = time.perf_counter() - start_time
    print(f"[Raw Concurrency] Processed {len(results)} items in {elapsed:.2f} seconds.")
    return results

########################################
# 8) Main Comparison
########################################

if __name__ == "__main__":
    # 1) Generate a sample dataset
    data_size = 500
    dataset = generate_fake_dataset(num_items=data_size)

    # We'll re-use one FakeLLMClient for the control
    control_client = FakeLLMClient()
    
    # 2) Control: No concurrency
    control_results = run_sequential_no_concurrency(dataset, control_client)

    # 3) Library, item-level concurrency
    library_item_results = run_with_library(dataset, max_workers=100)

    # 4) Library, batch mode concurrency
    #    Each sub-batch processes multiple items in a single fake API call
    library_batch_results = run_with_library_batch_mode(dataset, max_workers=100, batch_size=10)

    # 5) Raw concurrency
    raw_results = run_with_raw_concurrency(dataset, max_workers=100)

    # 6) Print final comparisons
    print(f"\nControl results: {len(control_results)} items.")
    print(f"Library item-level results: {len(library_item_results)} items.")
    print(f"Library batch-mode results: {len(library_batch_results)} items.")
    print(f"Raw concurrency results: {len(raw_results)} items.")


Sequential (control): 100%|██████████| 500/500 [01:41<00:00,  4.93it/s]


[Control] Processed 500 items sequentially in 101.50 seconds.


Processing items: 100%|██████████| 500/500 [00:01<00:00, 416.18it/s]


[Library Item-Level] Processed 500 items in 1.21 seconds.


Library Batch Mode: 100%|██████████| 50/50 [00:00<00:00, 163.28it/s]


[Library Batch Mode] Processed 500 items in 0.31 seconds (sub-batch size=10).


Raw concurrency: 100%|██████████| 500/500 [00:01<00:00, 412.03it/s]

[Raw Concurrency] Processed 500 items in 1.22 seconds.

Control results: 500 items.
Library item-level results: 500 items.
Library batch-mode results: 500 items.
Raw concurrency results: 500 items.





In [19]:
data_size = 500
dataset = generate_fake_dataset(num_items=data_size)

def count_tokens(items: list) -> int:
    sum = 0
    for item in items:
        # Since dataset is a list of dictionaries, we can directly get the length of each item
        sum += len(str(item))  # Convert item to string to get its length
    return sum

print(f"Total tokens: {count_tokens(dataset)}")


Total tokens: 61390


In [16]:
dataset.type

AttributeError: 'list' object has no attribute 'type'