<a href="https://colab.research.google.com/github/sunilrajugaddala96/SUNILRAJU/blob/main/AI_CHATBOT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import json
import logging
import os
import re
import time
from typing import Dict, List, Optional

import uvicorn
from fastapi import BackgroundTasks, FastAPI, Request, WebSocket, WebSocketDisconnect
from pydantic import BaseModel

In [2]:
# Optional imports
# Every optional dependency is imported defensively: if the import fails for
# any reason, the name is bound to None (or a flag is set to False) so the
# components below can test for availability instead of failing at import time.
try:
    import spacy
except Exception:
    spacy = None  # NER checks `spacy is not None` before loading a model

try:
    from textblob import TextBlob
except Exception:
    TextBlob = None  # SentimentAnalyzer falls back to keyword matching

# Simple sklearn-based intent classifier (offline demo)
try:
    from sklearn.pipeline import Pipeline
    from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
    from sklearn.linear_model import LogisticRegression
    SKLEARN_AVAILABLE = True
except Exception:
    # Pipeline/CountVectorizer/... stay undefined here; code must check the
    # flag before touching them.
    SKLEARN_AVAILABLE = False

# Redis optional
try:
    import redis
except Exception:
    redis = None  # ContextManager then keeps history in an in-process dict

# OpenAI optional
try:
    import openai
except Exception:
    openai = None  # ResponseGenerator then skips the GPT step

In [3]:
# Logging / Analytics
# --------------------------------------------------
# Root logging is configured to append INFO-and-above records to chatbot.log
# using the layout "<asctime> - <levelname> - <message>". The
# /analytics/summary endpoint reads this same file back and relies on the
# " - " separators to locate the message part of each line.
logging.basicConfig(level=logging.INFO, filename='chatbot.log', filemode='a',
                    format='%(asctime)s - %(levelname)s - %(message)s')
# Logger shared by every component defined in this file.
logger = logging.getLogger('dynamic_ai_chatbot')


In [4]:
# Pydantic models for API
# --------------------------------------------------
class ChatRequest(BaseModel):
    """Request body accepted by the POST /chat endpoint."""
    # Caller-supplied identifier; it is the key under which conversation
    # context is stored.
    user_id: str
    # The user's message.
    text: str
    # Label of the originating channel; defaults to "web".
    channel: Optional[str] = "web"

class ChatResponse(BaseModel):
    """Reply produced by ChatService.handle and returned by the chat endpoints."""
    # Echo of the user_id the request was handled for.
    user_id: str
    # The reply text shown to the user.
    text: str
    # Intent label returned by IntentClassifier.predict.
    intent: Optional[str] = None
    # Mapping of entity label -> entity text, as returned by NER.extract.
    entities: Optional[Dict[str, str]] = None
    # 'positive', 'negative' or 'neutral', as returned by SentimentAnalyzer.analyze.
    sentiment: Optional[str] = None
    # Which stage produced the reply; ResponseGenerator emits 'rule-based',
    # 'gpt' or 'fallback'.
    source: Optional[str] = "rule-based"

In [5]:
# Intent Classifier (simple) - trains on small sample
# --------------------------------------------------
class IntentClassifier:
    """Map a user utterance to an intent label.

    When scikit-learn is available, a small bag-of-words pipeline
    (CountVectorizer -> TfidfTransformer -> LogisticRegression) is used.
    If scikit-learn is missing, or the model fails to predict (for example
    because it was never trained), keyword rules decide the intent instead.

    Args:
        min_confidence: Optional probability threshold. With the default
            ``0.0`` the model's top label is always returned, which means a
            trained model can never answer ``'unknown'``. With a positive
            value, predictions whose top probability is below the threshold
            are reported as ``'unknown'``.
    """

    # Keyword rules for the fallback path, evaluated in this order.
    # Greeting/goodbye keywords must match as whole words, otherwise "this",
    # "which" or "they" would be mistaken for "hi"/"hey". Pricing keywords
    # only need to start a word so inflections such as "prices" or "costs"
    # keep matching.
    _FALLBACK_RULES = (
        ('pricing', re.compile(r'\b(?:price|pricing|cost|charge)')),
        ('greeting', re.compile(r'\b(?:hello|hi|hey)\b')),
        ('goodbye', re.compile(r'\b(?:bye|goodbye|see you)\b')),
    )

    def __init__(self, min_confidence: float = 0.0):
        self.model = None
        self.labels = []
        self.min_confidence = min_confidence
        if SKLEARN_AVAILABLE:
            self.model = Pipeline([
                ('vect', CountVectorizer()),
                ('tfidf', TfidfTransformer()),
                ('clf', LogisticRegression(max_iter=1000))
            ])
        else:
            logger.warning('sklearn not available: using rule-based fallback for intents')

    def train(self, texts: List[str], labels: List[str]):
        """Fit the pipeline on parallel lists of utterances and intent labels.

        Does nothing (apart from logging a warning) when no model exists
        because scikit-learn could not be imported.
        """
        if self.model is None:
            logger.warning('sklearn not available: skipping training')
            return
        self.labels = sorted(set(labels))
        self.model.fit(texts, labels)
        logger.info('Intent classifier trained on %d samples', len(texts))

    def predict(self, text: str) -> str:
        """Return the intent label for ``text``.

        The model is tried first when present; any failure is logged and the
        keyword rules are used instead. The rules return ``'unknown'`` when
        no keyword matches.
        """
        if self.model is not None:
            try:
                return self._predict_with_model(text)
            except Exception as e:
                # Best effort: an unfitted or broken model must not take the
                # chat flow down, so fall through to the keyword rules.
                logger.exception('Intent model prediction failed: %s', e)
        return self._predict_with_rules(text)

    def _predict_with_model(self, text: str) -> str:
        """Predict with the sklearn pipeline, honouring ``min_confidence``."""
        if self.min_confidence > 0:
            probabilities = self.model.predict_proba([text])[0]
            best = probabilities.argmax()
            if probabilities[best] < self.min_confidence:
                return 'unknown'
            return str(self.model.classes_[best])
        # str() turns the numpy string scalar into a plain Python str.
        return str(self.model.predict([text])[0])

    def _predict_with_rules(self, text: str) -> str:
        """Return the first intent whose keyword pattern matches ``text``."""
        lowered = text.lower()
        for intent, pattern in self._FALLBACK_RULES:
            if pattern.search(lowered):
                return intent
        return 'unknown'

In [6]:
# NER
# --------------------------------------------------
class NER:
    """Extract named entities from text.

    Uses the spaCy ``en_core_web_sm`` model when spaCy and the model are both
    available; otherwise falls back to two regular expressions that find the
    first date-like token and the first email address.
    """

    # Numeric dates such as 12/05/2024 or 1-2-23 (very naive).
    _DATE_RE = re.compile(r'\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b')
    # Email address. Domain labels are matched one at a time so a sentence's
    # trailing "." is not captured, and "+" is accepted in the local part so
    # plus-addressed mailboxes are not truncated.
    _EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+(?:\.[\w-]+)*')

    def __init__(self):
        self.nlp = None
        if spacy is not None:
            try:
                # try to load small english model
                self.nlp = spacy.load('en_core_web_sm')
            except Exception:
                # model not installed - user should `python -m spacy download en_core_web_sm`
                logger.warning('spaCy model not available. NER will be limited.')
                self.nlp = None

    def extract(self, text: str) -> Dict[str, str]:
        """Return a mapping of entity label to entity text found in ``text``.

        With spaCy, keys are spaCy entity labels; when several entities share
        a label, the last one in the document wins because the result is a
        flat dict. Without spaCy, the possible keys are ``'DATE'`` and
        ``'EMAIL'`` (first match of each). An empty dict means nothing was
        found.
        """
        if self.nlp:
            doc = self.nlp(text)
            return {ent.label_: ent.text for ent in doc.ents}

        # fallback regex-based heuristics (very simple)
        entities = {}
        date_match = self._DATE_RE.search(text)
        if date_match:
            entities['DATE'] = date_match.group(1)
        email_match = self._EMAIL_RE.search(text)
        if email_match:
            entities['EMAIL'] = email_match.group(0)
        return entities


In [7]:
# Context Manager (conversation memory)
# --------------------------------------------------
class ContextManager:
    """Per-user conversation memory backed by Redis or an in-process dict.

    Every stored entry is a dict with the keys ``ts`` (``time.time()`` at
    insertion), ``role`` and ``text``. With Redis, entries are JSON-encoded
    and pushed onto the list ``ctx:<user_id>``. Neither backend trims old
    entries, so history grows until ``clear_context`` is called.
    """

    def __init__(self, use_redis: bool = False):
        # Redis is used only when requested AND the redis package imported.
        self.use_redis = bool(use_redis) and (redis is not None)
        if self.use_redis:
            self.client = redis.Redis()
        else:
            self.store = {}  # {user_id: [{'ts': ..., 'role': ..., 'text': ...}, ...]}

    def append_message(self, user_id: str, message: str, role: str = 'user'):
        """Append ``message`` with the given ``role`` to the user's history."""
        entry = {'ts': time.time(), 'role': role, 'text': message}
        if self.use_redis:
            self.client.rpush(f'ctx:{user_id}', json.dumps(entry))
        else:
            self.store.setdefault(user_id, []).append(entry)

    def get_context(self, user_id: str, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` most recent entries, oldest first.

        A ``limit`` of zero or less yields an empty list. Without this guard
        ``[-0:]`` / ``lrange(0, -1)`` would return the *entire* history.
        """
        if limit <= 0:
            return []
        if self.use_redis:
            raw = self.client.lrange(f'ctx:{user_id}', -limit, -1)
            return [json.loads(x) for x in raw]
        return self.store.get(user_id, [])[-limit:]

    def clear_context(self, user_id: str):
        """Forget everything stored for ``user_id`` (no error if unknown)."""
        if self.use_redis:
            self.client.delete(f'ctx:{user_id}')
        else:
            self.store.pop(user_id, None)

# --------------------------------------------------
# Sentiment analysis
# --------------------------------------------------
class SentimentAnalyzer:
    """Classify text as 'positive', 'negative' or 'neutral'.

    TextBlob polarity is used when TextBlob is importable; if it is missing
    or raises, a small keyword list decides instead.
    """

    # Fallback keywords. They must start a word (leading \b), so "whatever"
    # no longer counts as "hate" and "glove" no longer counts as "love",
    # while inflections such as "loved", "greatest" or "hates" still match.
    _POSITIVE_RE = re.compile(r'\b(?:love|great|thanks)')
    _NEGATIVE_RE = re.compile(r'\b(?:hate|bad|angry)')

    def analyze(self, text: str) -> str:
        """Return the sentiment label for ``text``.

        With TextBlob, polarity above 0.2 is 'positive', below -0.2 is
        'negative', anything in between is 'neutral'. In the keyword
        fallback, positive keywords are checked before negative ones.
        """
        if TextBlob is not None:
            try:
                polarity = TextBlob(text).sentiment.polarity
            except Exception:
                # Best effort: log and continue with the keyword fallback.
                logger.exception('TextBlob sentiment failed')
            else:
                if polarity > 0.2:
                    return 'positive'
                if polarity < -0.2:
                    return 'negative'
                return 'neutral'

        # naive fallback
        low = text.lower()
        if self._POSITIVE_RE.search(low):
            return 'positive'
        if self._NEGATIVE_RE.search(low):
            return 'negative'
        return 'neutral'


In [8]:
# Response generator (rule-based + GPT fallback)
# --------------------------------------------------
class ResponseGenerator:
    """Produce a reply: canned rule-based text first, then OpenAI, then a fallback.

    Args:
        openai_api_key: API key for OpenAI. When omitted, the
            ``OPENAI_API_KEY`` environment variable is used. Without a key
            (or without the ``openai`` package) the GPT step is skipped.
        model: Chat model name sent to OpenAI.
    """

    _SYSTEM_PROMPT = 'You are a helpful assistant.'
    _MAX_TOKENS = 256

    # Very small set for demo
    _CANNED = {
        'greeting': 'Hello! How can I help you today?',
        'goodbye': 'Goodbye! Have a great day.',
        'pricing': 'Our pricing depends on usage tiers. Would you like to know monthly or annual plans?',
    }

    def __init__(self, openai_api_key: Optional[str] = None, model: str = 'gpt-4o-mini'):
        self.openai_api_key = openai_api_key or os.environ.get('OPENAI_API_KEY')
        self.model = model
        self._client = None  # created lazily for openai>=1.0
        if openai and self.openai_api_key:
            # Module-level key is what the legacy (<1.0) API reads.
            openai.api_key = self.openai_api_key

    def generate(self, text: str, intent: str, context: List[Dict]) -> Dict:
        """Return ``{'text': ..., 'source': ...}`` for the given utterance.

        ``source`` is ``'rule-based'`` when a canned answer exists for
        ``intent``, ``'gpt'`` when OpenAI produced the text, and
        ``'fallback'`` otherwise (including any OpenAI failure, which is
        logged rather than raised).
        """
        # First check rule-based responses
        canned = self.rule_based(text, intent)
        if canned:
            return {'text': canned, 'source': 'rule-based'}
        # Next, if OpenAI available, call it
        if openai and self.openai_api_key:
            try:
                prompt = self._build_prompt(text, intent, context)
                content = self._call_openai(prompt)
                return {'text': content.strip(), 'source': 'gpt'}
            except Exception:
                logger.exception('OpenAI call failed - falling back to canned response')
        # Default fallback
        return {'text': "Sorry, I didn't understand that. Can you rephrase?", 'source': 'fallback'}

    def rule_based(self, text: str, intent: str) -> Optional[str]:
        """Return the canned reply for ``intent``, or None when there is none."""
        return self._CANNED.get(intent)

    def _call_openai(self, prompt: str) -> str:
        """Send ``prompt`` as a chat completion and return the reply text.

        Supports both client styles: the ``openai.OpenAI`` client introduced
        in openai 1.0 and the legacy module-level ``openai.ChatCompletion``.
        Raises ValueError when the completion carries no text, so the caller
        falls back to the canned reply instead of returning an empty message.
        """
        messages = [{'role': 'system', 'content': self._SYSTEM_PROMPT},
                    {'role': 'user', 'content': prompt}]
        if hasattr(openai, 'OpenAI'):
            # openai>=1.0: ChatCompletion was removed in favour of a client.
            if self._client is None:
                self._client = openai.OpenAI(api_key=self.openai_api_key)
            resp = self._client.chat.completions.create(
                model=self.model, messages=messages, max_tokens=self._MAX_TOKENS)
        else:
            resp = openai.ChatCompletion.create(
                model=self.model, messages=messages, max_tokens=self._MAX_TOKENS)
        # Legacy responses are dict-like; 1.x responses are typed objects.
        if isinstance(resp, dict) and 'choices' in resp:
            content = resp['choices'][0]['message']['content']
        else:
            content = resp.choices[0].message.content
        if not content:
            raise ValueError('OpenAI returned an empty completion')
        return content

    def _build_prompt(self, text: str, intent: str, context: List[Dict]) -> str:
        """Build a short prompt from the last six context entries plus ``text``."""
        history = '\n'.join(f"{h['role']}: {h['text']}" for h in context[-6:])
        return f"Context:\n{history}\n\nUser: {text}\nIntent: {intent}\n\nAssistant:"


In [9]:
# Reinforcement learning stub (records feedback)
# --------------------------------------------------
class ReinforcementLearner:
    """Placeholder for a learning loop: it only stores user feedback."""

    def __init__(self):
        # Kept in memory only; a production system would use a persistent DB.
        self.records = []

    def record_feedback(self, user_id: str, query: str, response: str, reward: float):
        """Store one feedback item (stamped with the current time) and log it."""
        entry = dict(
            user_id=user_id,
            query=query,
            response=response,
            reward=reward,
            ts=time.time(),
        )
        self.records.append(entry)
        logger.info('Feedback recorded: %s', entry)

    def summarize(self):
        """Return summary figures; at present just the number of records."""
        record_count = len(self.records)
        return {'total': record_count}


In [10]:
# Putting it together: ChatService
# --------------------------------------------------
class ChatService:
    """Wire intent detection, NER, sentiment, context and response generation together.

    Args:
        use_redis: Passed to ContextManager; when true (and the redis package
            is importable) conversation history is kept in Redis.
    """

    def __init__(self, use_redis: bool = False):
        self.intent = IntentClassifier()
        self.ner = NER()
        self.ctx = ContextManager(use_redis=use_redis)
        self.sent = SentimentAnalyzer()
        self.gen = ResponseGenerator()
        self.rl = ReinforcementLearner()
        # Train a tiny example model if sklearn present
        if SKLEARN_AVAILABLE:
            sample_texts = ['hello', 'hi there', 'what is the price', 'how much does it cost', 'bye']
            sample_labels = ['greeting', 'greeting', 'pricing', 'pricing', 'goodbye']
            self.intent.train(sample_texts, sample_labels)

    def handle(self, user_id: str, text: str, channel: str = 'web') -> ChatResponse:
        """Process one user message and return the assistant's reply.

        The message and the reply are both appended to the user's context,
        and one JSON analytics record is written to the log per call.

        Args:
            user_id: Key identifying the conversation.
            text: The user's message.
            channel: Label of the originating channel; recorded in the
                analytics log line.
        """
        # Read the history BEFORE storing the new message: the generator's
        # prompt adds the current text itself, so including it in the
        # history as well would present it to the model twice.
        history = self.ctx.get_context(user_id)
        self.ctx.append_message(user_id, text, role='user')
        # Intent
        intent = self.intent.predict(text)
        # Entities
        entities = self.ner.extract(text)
        # Sentiment
        sentiment = self.sent.analyze(text)
        # Generate response
        resp = self.gen.generate(text, intent, history)
        # Append assistant response to context
        self.ctx.append_message(user_id, resp['text'], role='assistant')
        # Log analytics
        logger.info(json.dumps({
            'user_id': user_id, 'text': text, 'intent': intent, 'entities': entities,
            'sentiment': sentiment, 'response_source': resp.get('source'),
            'channel': channel,
        }))
        return ChatResponse(user_id=user_id, text=resp['text'], intent=intent, entities=entities,
                            sentiment=sentiment, source=resp.get('source'))


In [11]:
# FastAPI app: endpoints for REST + WebSocket
# --------------------------------------------------
# Both objects are created at import time and shared by every endpoint below.
app = FastAPI(title='Dynamic AI Chatbot - Demo')
# One service instance for the whole process; use_redis=False keeps
# conversation context in this process's memory.
service = ChatService(use_redis=False)

@app.post('/chat', response_model=ChatResponse)
async def chat_endpoint(req: ChatRequest):
    """REST entry point: run one chat turn and return the ChatResponse."""
    return service.handle(user_id=req.user_id, text=req.text, channel=req.channel)

# Simple webhook-style endpoints for Slack / WhatsApp (demo placeholders)
@app.post('/webhook/slack')
async def slack_webhook(request: Request):
    """Demo Slack webhook: answer the posted JSON's 'text' on behalf of its 'user'.

    Placeholder only - real Slack request verification and event-type
    parsing are not implemented here.
    """
    body = await request.json()
    # Missing fields fall back to a generic user id and an empty message.
    sender = body.get('user', 'slack_user')
    message = body.get('text', '')
    answer = service.handle(sender, message, channel='slack')
    # A real integration would reply per the Slack API or call chat.postMessage.
    return {'ok': True, 'reply': answer.text}

@app.post('/webhook/whatsapp')
async def whatsapp_webhook(request: Request):
    """Demo WhatsApp webhook: answer the posted JSON's 'message' for its 'from' id."""
    body = await request.json()
    # Missing fields fall back to a generic user id and an empty message.
    sender = body.get('from', 'wa_user')
    message = body.get('message', '')
    answer = service.handle(sender, message, channel='whatsapp')
    return {'status': 'ok', 'reply': answer.text}

# WebSocket echo-style chat (for real-time UI)
@app.websocket('/ws/{user_id}')
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """Real-time chat: each received text frame is answered with a JSON reply.

    The loop runs until the client disconnects. A normal disconnect ends the
    handler quietly; any other error is logged and the socket is closed with
    code 1011 (internal error).
    """
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            res = service.handle(user_id, data, channel='websocket')
            # pydantic v2 renamed .dict() to .model_dump(); support both.
            payload = res.model_dump() if hasattr(res, 'model_dump') else res.dict()
            await websocket.send_text(json.dumps(payload))
    except WebSocketDisconnect:
        # The client closed the connection; the socket is already closed, so
        # calling close() again here would itself raise.
        logger.debug('WebSocket client %s disconnected', user_id)
    except Exception:
        logger.exception('WebSocket session for %s failed', user_id)
        try:
            await websocket.close(code=1011)
        except RuntimeError:
            # The connection was already torn down; nothing left to close.
            pass

# Feedback endpoint for RL
class FeedbackRequest(BaseModel):
    """Request body accepted by the POST /feedback endpoint."""
    # Identifier of the user giving the feedback.
    user_id: str
    # The user message the feedback refers to.
    query: str
    # The reply being rated.
    response: str
    # Numeric rating; stored as-is by ReinforcementLearner.record_feedback.
    reward: float

@app.post('/feedback')
async def feedback(req: FeedbackRequest):
    """Record one piece of user feedback with the service's learner stub."""
    service.rl.record_feedback(
        user_id=req.user_id,
        query=req.query,
        response=req.response,
        reward=req.reward,
    )
    return {'ok': True}

# Simple analytics endpoint summarizing logs (demo)
@app.get('/analytics/summary')
async def analytics_summary():
    """Summarise chatbot.log: total line count, per-intent counts, feedback total.

    Every line of the log is counted. Lines containing '{' are additionally
    parsed as "<asctime> - <level> - <json>"; lines whose payload cannot be
    parsed are skipped silently. A missing log file yields zero counts.
    """
    # In production parse a DB or analytics store
    line_count = 0
    intent_counts = {}
    try:
        with open('chatbot.log', 'r') as log_file:
            for line_count, record in enumerate(log_file, start=1):
                if '{' not in record:
                    continue
                try:
                    # The JSON payload is whatever follows the second ' - '.
                    payload = json.loads(record.split(' - ', 2)[-1])
                    label = payload.get('intent', 'unknown')
                    intent_counts[label] = intent_counts.get(label, 0) + 1
                except Exception:
                    # Not an analytics record (or malformed): ignore it.
                    continue
    except FileNotFoundError:
        pass
    return {
        'total_log_lines': line_count,
        'intent_counts': intent_counts,
        'rl_records': service.rl.summarize(),
    }


In [None]:
# Main (for local testing)
# --------------------------------------------------
import uvicorn

if __name__ == "__main__":
    # The app is given as the import string "dynamic_ai_chatbot:app" because
    # reload=True is requested; presumably this code must be saved as
    # dynamic_ai_chatbot.py for that import to resolve - TODO confirm.
    # host="0.0.0.0" listens on all network interfaces, port 8000.
    uvicorn.run("dynamic_ai_chatbot:app", host="0.0.0.0", port=8000, reload=True)


INFO:     Will watch for changes in these directories: ['/content']
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [5147] using StatReload
