# Tweet classification example using Ollama (Phi-4 and Mistral)

In [9]:
from pydantic import BaseModel, Field
from enum import Enum
from typing import List

In [56]:
# Sample tweets used as classification inputs throughout this notebook.
# They are grouped by topic (two tweets per topic); the comment above each
# pair names the topic the pair was written for.

# Finance tweets
tweet1 = "Saving vs. investing: Both are crucial, but one builds your future while the other protects your present. Balance is key! 💼💰 #FinanceTips #InvestSmart"
tweet2 = "Markets might fluctuate, but discipline and patience are your best assets. 📈 Stay invested for the long game. #StockMarket #WealthCreation"

# Sports tweets
tweet3 = "What a game! Last-minute goals and edge-of-the-seat moments—this is why we love football. ⚽️🔥 #Passion #BeautifulGame"
tweet4 = "RCB fans, it’s time to #PlayBold again! Can Kohli & Co. finally lift the trophy this IPL? 🏏🔥 #EeSalaCupNamde"

# Technology tweets
tweet5 = "AI isn’t the future—it’s the present! From chatbots to self-driving cars, it’s transforming the world. 🌍🤖 #ArtificialIntelligence #TechTrends"
tweet6 = "Data is the new oil, but unlike oil, it’s infinite. Are you tapping into its full potential? 💾💡 #BigData #Innovation"

# Health & Wellness tweets
tweet7 = "Hydration is underrated. Drink more water, feel the difference! 💧✨ #Wellness #HealthyLiving"
tweet8 = "Fitness is not about being better than someone else; it’s about being better than you used to be. 🏋️‍♀️💪 #Motivation #HealthFirst"

# Entertainment tweets
tweet9 = "That plot twist though! 😱 Netflix never fails to keep us hooked. What are you binge-watching this weekend? 🎬🍿 #Entertainment #WeekendPlans"
tweet10 = "From the red carpet to the big screen, it’s all about glamour, grit, and great performances. 🌟🎭 #Movies #PopCulture"

# Education tweets
tweet11 = "Education is not the learning of facts, but the training of the mind to think. - Albert Einstein 💡📚 #LifelongLearning #Motivation"
tweet12 = "Coding is the new literacy! Start small, but start today. 💻✨ #LearnToCode #FutureSkills"

# Travel tweets
tweet13 = "Adventure is out there! Where’s your next destination? 🌍✈️ #TravelGoals #Wanderlust"
tweet14 = "The world is a book, and those who do not travel read only one page. Where are you reading next? 📖🌏 #ExploreMore"

# Food tweets
tweet15 = "Comfort food on a rainy day? Yes, please! 🍜☔️ What’s your go-to dish? #FoodiesUnite #SoulFood"
tweet16 = "Good food, good mood! Exploring new cuisines is the best way to travel without leaving your city. 🍽️🌟 #FoodieAdventures"

# Environment tweets
tweet17 = "One small step for you, one giant leap for the planet. Reduce, reuse, recycle. 🌍♻️ #Sustainability #EcoFriendly"
tweet18 = "Plant more trees, clean the seas, and let’s leave a better planet for future generations. 🌱💧 #ClimateAction #GoGreen"

# Fashion tweets
tweet19 = "Style is a way to say who you are without having to speak. What’s your statement piece today? 👗✨ #OOTD #Fashionista"
tweet20 = "Trends come and go, but confidence is always in style. Rock it! 💃👠 #StyleInspiration #BeYou"


In [25]:
from langchain_community.llms import Ollama
from langchain import PromptTemplate # Added

# NOTE(review): the "<|eot_id|>" stop token and the header tokens used in the
# template below look like the Llama 3 chat format, while the model loaded
# here is "phi4" -- confirm they suit the model actually being served.
llm = Ollama(model="phi4", stop=["<|eot_id|>"]) # Added stop token

def get_model_response(user_prompt, system_prompt):
    """Send a system/user prompt pair to the Ollama model and return its reply.

    Args:
        user_prompt: Text placed in the user section of the prompt.
        system_prompt: Instructions placed in the system section of the prompt.

    Returns:
        The completion returned by ``llm`` for the formatted prompt.
    """
    # NOTE: No f string and no whitespace in curly braces
    template = """
        <|begin_of_text|>
        <|start_header_id|>system<|end_header_id|>
        {system_prompt}
        <|eot_id|>
        <|start_header_id|>user<|end_header_id|>
        {user_prompt}
        <|eot_id|>
        <|start_header_id|>assistant<|end_header_id|>
        """

    # Added prompt template
    prompt = PromptTemplate(
        input_variables=["system_prompt", "user_prompt"],
        template=template
    )

    # Use the Runnable ``invoke`` API; calling the LLM object directly
    # (``llm(...)``) is the deprecated call style in LangChain.
    return llm.invoke(
        prompt.format(system_prompt=system_prompt, user_prompt=user_prompt)
    )

In [57]:
def classify_tweet_simple(tweet: str) -> str:
    """Classify a tweet with a bare instruction and return the model's raw reply.

    Args:
        tweet: Tweet text to classify.

    Returns:
        Whatever ``get_model_response`` returns for this tweet (unstructured
        text; no schema or validation is applied).
    """
    system_prompt = "Classify the following tweets content into a category."
    # Pass by keyword: get_model_response is declared as
    # (user_prompt, system_prompt), so the previous positional call
    # (system_prompt, tweet) sent the tweet as the system prompt and the
    # instruction as the user message.
    return get_model_response(user_prompt=tweet, system_prompt=system_prompt)

# Try the unstructured classifier on the first finance tweet and show the raw reply.
result = classify_tweet_simple(tweet1)
print(result)

# The bare string below is a note rather than executed logic; as the last
# expression of the notebook cell its value is also echoed in the cell output.
"""
Drawbacks of this approach:
1. No structured output, making it difficult to integrate into automated systems
2. No validation of the output, potentially leading to inconsistent categorizations
3. Limited information extracted, missing important details for prioritization
4. No confidence score, making it hard to flag uncertain classifications for human review
"""

```json
{
  "category": "FinanceTips"
}
```

In this task, the tweet content is about balancing saving and investing for financial stability. It also includes a hashtag that suggests it's related to smart money management or personal finance advice. Hence, I categorized it under 'FinanceTips'. 

Here’s how we can break down this classification:
- "Saving vs. investing": The tweet discusses two crucial financial activities - saving and investing. It hints that both are essential for building a secure future but also suggests the need to maintain balance between them, which indicates it's giving advice on personal finance management strategy rather than just present or futuristic focus in terms of money-handling strategies like compound interest implications etc.
- "#InvestSmart": The hashtag gives a clear hint about how the content is intended to be classified as something related with smart investing advice, hence 'FinanceTips' seems appropriate and direct classification for this tweet

'\nDrawbacks of this approach:\n1. No structured output, making it difficult to integrate into automated systems\n2. No validation of the output, potentially leading to inconsistent categorizations\n3. Limited information extracted, missing important details for prioritization\n4. No confidence score, making it hard to flag uncertain classifications for human review\n'

In [107]:
from pydantic import BaseModel

from pydantic_ai import Agent
from pydantic_ai.models.ollama import OllamaModel

ollama_model = OllamaModel(model_name='mistral')

In [61]:
# Minimal structured result: both fields are unconstrained strings, so the
# model is free to pick any wording for topic and tone.
class ClassificationResponse(BaseModel):
    tweet_topic: str  # topic/category label for the tweet
    tweet_tone: str  # tone of the tweet's author


# Agent configured to return a ClassificationResponse for each run.
agent = Agent(model=ollama_model, result_type=ClassificationResponse, system_prompt="Classify the following tweets content into a category. And classify the tone of the author")
# Top-level ``await``: this relies on an already-running event loop, as a
# notebook kernel provides; it is not valid in a plain Python script.
result = await agent.run(tweet1)

In [62]:
result.data

ClassificationResponse(tweet_topic='Finance', tweet_tone='Balanced perspective')

In [109]:
class Category(str, Enum):
    """Closed set of topics a tweet may be classified into.

    Members are ``str`` subclasses, so they compare equal to and serialize
    as their lowercase values.
    """

    FINANCE = "finance"
    SPORTS = "sports"
    TECHNOLOGY = "technology"
    HEALTH = "health"
    ENTERTAINMENT = "entertainment"
    TRAVEL = "travel"
    FOOD = "food"
    ENVIRONMENT = "environment"
    FASHION = "fashion"
    # Appended last so the order of the existing members is unchanged. The
    # sample data contains education tweets, which previously had no valid
    # category to be assigned to.
    EDUCATION = "education"

# Closed set of author sentiments a tweet may be tagged with. Members are
# ``str`` subclasses, so they compare equal to their lowercase values.
class Sentiment(str, Enum):
    ANGRY = "angry"
    FRUSTRATED = "frustrated"
    CHEERFUL = "cheerful"
    # NOTE(review): "exciting" describes content rather than how an author
    # feels; "excited" may have been intended -- confirm before changing,
    # since the value is part of the output schema.
    EXCITING = "exciting"
    NEUTRAL = "neutral"


class TweetClassification(BaseModel):
    """Structured classification of a single tweet."""

    # Topic and sentiment are restricted to the enum values, so any other
    # label fails validation.
    category: Category
    sentiment: Sentiment
    # Bounded to the closed interval [0, 1].
    confidence: float = Field(ge=0, le=1, description="Confidence score for the classification")
    # The description previously said "ticket"; the inputs here are tweets.
    key_information: List[str] = Field(description="List of key points extracted from the tweet")

In [114]:
# Build the allowed-value lists from the enums so the prompt cannot drift from
# the values the TweetClassification schema accepts.
_CATEGORY_CHOICES = ", ".join(member.value for member in Category)
_SENTIMENT_CHOICES = ", ".join(member.value for member in Sentiment)

# System prompt for the structured classifier. It is an f-string: keep literal
# curly braces out of the text (or double them) when editing.
system_prompt = f"""
You are an AI assistant for a large media platform.
Your role is to analyze incoming user tweets and provide structured information to help our team respond quickly and effectively.

Business Context:
- We handle thousands of tweets hourly across various categories ({_CATEGORY_CHOICES}).
- Quick and accurate classification is crucial for media platform operational efficiency.
- We prioritize based on category and tweet sentiment.

Your tasks:
1. Categorize the tweet into the most appropriate category ({_CATEGORY_CHOICES}).
2. Determine the author's sentiment in the tweet ({_SENTIMENT_CHOICES}).
3. Extract key information that would be helpful for our support team.
4. Provide a confidence score for your classification.

Remember:
- Be objective and base your analysis solely on the information provided in the tweet.
- The category and sentiment must each be exactly one of the values listed above, written in lowercase.
- If you're unsure about any aspect, reflect that in your confidence score.
- For 'key_information', extract specific details like the adjectives and nouns in the content.

Analyze the following tweet content and provide the requested information in the specified format.

"""

In [118]:
agent = Agent(model=ollama_model, result_type=TweetClassification, system_prompt=system_prompt)
result = await agent.run(tweet15)

In [119]:
result.data

TweetClassification(category=<Category.FOOD: 'food'>, sentiment=<Sentiment.CHEERFUL: 'cheerful'>, confidence=1.0, key_information=['comfort food', 'rainy day', '#FoodiesUnite', '#SoulFood'])

In [131]:
print(result.data.model_dump_json(indent=2))

{
  "category": "food",
  "sentiment": "cheerful",
  "confidence": 1.0,
  "key_information": [
    "comfort food",
    "rainy day",
    "#FoodiesUnite",
    "#SoulFood"
  ]
}


In [135]:
import asyncio
from itertools import islice

# Sample list of tweets
# All twenty sample tweets, in definition order (five per row).
tweets = [
    tweet1, tweet2, tweet3, tweet4, tweet5,
    tweet6, tweet7, tweet8, tweet9, tweet10,
    tweet11, tweet12, tweet13, tweet14, tweet15,
    tweet16, tweet17, tweet18, tweet19, tweet20,
]

# Function to process tweets in chunks
async def process_tweets_in_batches(tweets, batch_size=5):
    """
    Processes tweets in batches, running the tweets of each batch concurrently.

    Batches are handled one after another; the next batch starts only after
    every tweet of the current batch has finished.

    Args:
        tweets (iterable): Tweets to process.
        batch_size (int): Number of tweets to process per batch; must be >= 1.

    Returns:
        None

    Raises:
        ValueError: If batch_size is less than 1.
    """
    if batch_size < 1:
        # With a size of 0 the first chunk is empty, which equals the sentinel
        # below and would end the loop immediately, silently skipping every
        # tweet.
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    # Helper to chunk the tweets
    def chunked_iterable(iterable, size):
        it = iter(iterable)
        # iter(callable, sentinel): keep pulling chunks until an empty one.
        return iter(lambda: list(islice(it, size)), [])

    # Process tweets batch by batch
    for batch_num, batch in enumerate(chunked_iterable(tweets, batch_size), start=1):
        print(f"Processing batch {batch_num} with {len(batch)} tweets...")

        # Run all tweets of this batch concurrently
        await asyncio.gather(
            *(
                process_tweet(tweet, batch_num, i)
                for i, tweet in enumerate(batch, start=1)
            )
        )

async def process_tweet(tweet, batch_num, tweet_num):
    """
    Classify one tweet with the module-level ``agent`` and print the outcome.

    Any exception raised while running or printing is reported on stdout
    instead of being propagated.

    Args:
        tweet (str): The tweet to process.
        batch_num (int): Batch number, used only in the log lines.
        tweet_num (int): Position of the tweet within its batch, used only in
            the log lines.

    Returns:
        None
    """
    # Shared prefix for every status line of this tweet.
    label = f"Batch {batch_num}, Tweet {tweet_num}"
    try:
        print(f"{label}: Processing started.")
        outcome = await agent.run(tweet)
        print(f"{label}: Success.")
        print(outcome.data.model_dump_json(indent=2))
    except Exception as err:
        print(f"{label}: Failed with error: {err}")

# Function to handle running in different environments
def run_async(coro):
    """Run ``coro`` whether or not an event loop is already running.

    Args:
        coro: Coroutine object to execute.

    Returns:
        The coroutine's result when no loop is running in this thread (the
        coroutine is run to completion with ``asyncio.run``); otherwise the
        ``asyncio.Task`` scheduled on the running loop, which the caller may
        await.
    """
    try:
        # Probe for a running loop up front. Wrapping asyncio.run() in
        # ``except RuntimeError`` also caught RuntimeErrors raised by the
        # coroutine itself and then tried to re-run the already-finished
        # coroutine, hiding the original error.
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread (e.g. a plain script): run to completion.
        return asyncio.run(coro)
    # Already inside a loop (e.g. a notebook kernel): schedule on that loop.
    return asyncio.create_task(coro)

# Run the processing function
run_async(process_tweets_in_batches(tweets, batch_size=5))


<Task pending name='Task-126' coro=<process_tweets_in_batches() running at /var/folders/m5/ykmk3pxj3sxdg46z7lzksq080000gn/T/ipykernel_12941/3448722275.py:13>>

Processing batch 1 with 5 tweets...
Batch 1, Tweet 1: Processing started.
Batch 1, Tweet 2: Processing started.
Batch 1, Tweet 3: Processing started.
Batch 1, Tweet 4: Processing started.
Batch 1, Tweet 5: Processing started.
Batch 1, Tweet 3: Success.
{
  "category": "entertainment",
  "sentiment": "cheerful",
  "confidence": 0.9,
  "key_information": [
    "game",
    "soccer",
    "football",
    "#Passion",
    "#BeautifulGame"
  ]
}
Batch 1, Tweet 2: Success.
{
  "category": "finance",
  "sentiment": "cheerful",
  "confidence": 0.8,
  "key_information": [
    "Markets",
    "fluctuate",
    "discipline",
    "patience",
    "#StockMarket",
    "#WealthCreation"
  ]
}
Batch 1, Tweet 1: Success.
{
  "category": "finance",
  "sentiment": "neutral",
  "confidence": 1.0,
  "key_information": [
    "Saving",
    "investing",
    "future",
    "present"
  ]
}
Batch 1, Tweet 5: Failed with error: Exceeded maximum retries (1) for result validation
Batch 1, Tweet 4: Failed with error: Excee

In [134]:
import asyncio

# Sample list of tweets
# All twenty sample tweets, in definition order (five per row).
tweets = [
    tweet1, tweet2, tweet3, tweet4, tweet5,
    tweet6, tweet7, tweet8, tweet9, tweet10,
    tweet11, tweet12, tweet13, tweet14, tweet15,
    tweet16, tweet17, tweet18, tweet19, tweet20,
]

# Function to process tweets one by one
async def process_tweets_sequentially(tweets):
    """
    Classify tweets one after another with the module-level ``agent``.

    Each tweet is awaited before the next one starts. A failure on one tweet
    is printed and does not stop the remaining tweets.

    Args:
        tweets (list): List of tweets to process.

    Returns:
        None
    """
    for position, text in enumerate(tweets, start=1):
        # Shared prefix for every status line of this tweet.
        label = f"Tweet {position}"
        try:
            print(f"{label}: Processing started.")
            outcome = await agent.run(text)
            print(f"{label}: Success.")
            print(outcome.data.model_dump_json(indent=2))
        except Exception as err:
            print(f"{label}: Failed with error: {err}")

# Function to handle running in different environments
def run_async(coro):
    """Run ``coro`` whether or not an event loop is already running.

    Args:
        coro: Coroutine object to execute.

    Returns:
        The coroutine's result when no loop is running in this thread (the
        coroutine is run to completion with ``asyncio.run``); otherwise the
        ``asyncio.Task`` scheduled on the running loop, which the caller may
        await.
    """
    try:
        # Probe for a running loop up front. Wrapping asyncio.run() in
        # ``except RuntimeError`` also caught RuntimeErrors raised by the
        # coroutine itself and then tried to re-run the already-finished
        # coroutine, hiding the original error.
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread (e.g. a plain script): run to completion.
        return asyncio.run(coro)
    # Already inside a loop (e.g. a notebook kernel): schedule on that loop.
    return asyncio.create_task(coro)

# Run the processing function
run_async(process_tweets_sequentially(tweets))


<Task pending name='Task-123' coro=<process_tweets_sequentially() running at /var/folders/m5/ykmk3pxj3sxdg46z7lzksq080000gn/T/ipykernel_12941/4170028263.py:12>>

Tweet 1: Processing started.
Tweet 1: Success.
{
  "category": "finance",
  "sentiment": "neutral",
  "confidence": 1.0,
  "key_information": [
    "saving",
    "investing",
    "build future",
    "protect present"
  ]
}
Tweet 2: Processing started.
Tweet 2: Success.
{
  "category": "finance",
  "sentiment": "cheerful",
  "confidence": 0.95,
  "key_information": [
    "markets might fluctuate",
    "discipline and patience are your best assets",
    "stay invested for the long game"
  ]
}
Tweet 3: Processing started.
Tweet 3: Success.
{
  "category": "entertainment",
  "sentiment": "cheerful",
  "confidence": 0.95,
  "key_information": [
    "game",
    "last-minute goals",
    "edge-of-the-seat moments",
    "#Passion",
    "#BeautifulGame"
  ]
}
Tweet 4: Processing started.
Tweet 4: Failed with error: Exceeded maximum retries (1) for result validation
Tweet 5: Processing started.
Tweet 5: Success.
{
  "category": "technology",
  "sentiment": "cheerful",
  "confidence": 1.0,
  "key_