In [None]:
# use groq client and connect to openai opensource model    

import os
import groq

groq_client = groq.Groq(api_key=os.getenv("GROQ_API_KEY"))
OPENSOURCE_OSS_MODEL = "openai/gpt-oss-120b"

# create system prompt
SYSTEM_PROMPT = "You are an Activity Recommendation Assistant. \
\
Your job is to help users discover suitable activities, classes, or event types \
based on their age, interests, physical comfort level, time availability, and goals. \
You must follow these rules: \
1. Use ONLY the information provided in the context and user messages. \
2. Do NOT invent activities, classes, or benefits that are not explicitly stated. \
3. If information is missing, ask a clarifying question instead of guessing. \
4. Be respectful of physical limitations and accessibility needs. \
5. Do NOT provide medical advice. Phrase benefits in general wellness terms. \
6. When recommending activities, include: \
   - Activity name \
   - Typical intensity level \
   - Typical session length \
   - Recommended weekly frequency \
   - Why it fits the user’s preferences \
7. If multiple activities fit, rank them from best to least suitable. \
8. If nothing fits well, explain why and suggest alternatives. \
\
Your tone should be friendly, practical, and encouraging. \
"

# create user prompt for the system prompt above
USER_PROMPT = "You are continuing a conversation with a user about activities and events. \
\
Conversation history (user inputs only): \
{USER_HISTORY} \
\
Current user request: \
{CURRENT_MESSAGE} \
\
Relevant activity knowledge: \
{RAG_CONTEXT} \
\
Instructions: \
- Recommend the top 2 suitable activity types from the knowledge provided. \
- Explain clearly why each recommendation fits the user. \
- Get the exact event name from the context provided and which location the event is located at. \
- Include intensity, session length, and typical weekly frequency. \
- Cite the activity source using [filename | activity name]. \
- If the knowledge is insufficient, say what is missing. \
"

# create user prompt

In [21]:
# import os
import glob
import numpy as np
from dotenv import load_dotenv
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from sklearn.manifold import TSNE
from chromadb import PersistentClient
import plotly.graph_objects as go

In [None]:
import os
import re
import json
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Iterable

from langchain_core.documents import Document
# -----------------------------
# 1) Parsing & chunking
# -----------------------------

EVENT_HEADING_RE = re.compile(r"^###\s+(.*)\s*$", re.MULTILINE)
FIELD_RE = {
    "event_type": re.compile(r"^\s*-\s*Event Type:\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
    "category": re.compile(r"^\s*-\s*Category:\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
    "age_tags": re.compile(r"^\s*-\s*Age Tags:\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
    "instructor": re.compile(r"^\s*-\s*(Instructor|Facilitator):\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
    "date_range": re.compile(r"^\s*-\s*Date Range:\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
    "time_slots": re.compile(r"^\s*-\s*Time Slots:\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
    "duration": re.compile(r"^\s*-\s*Duration:\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
    "spots": re.compile(r"^\s*-\s*Spots:\s*(.+?)\s*$", re.MULTILINE | re.IGNORECASE),
}

CENTER_RE = re.compile(r"^##\s+(.+?)\s*$", re.MULTILINE)
LOCATION_RE = re.compile(r"^\*\*Location:\*\*\s*(.+?)\s*$", re.MULTILINE)
TYPE_RE = re.compile(r"^\*\*Type:\*\*\s*(.+?)\s*$", re.MULTILINE)

PAGE_RE = re.compile(r"^#\s+PAGE\s+\d+\s+—\s+(.+?)\s*$", re.MULTILINE)


def _safe_find(regex: re.Pattern, text: str) -> Optional[str]:
    m = regex.search(text)
    return m.group(1).strip() if m else None


def _safe_find2(regex: re.Pattern, text: str) -> Optional[str]:
    m = regex.search(text)
    return m.group(2).strip() if m else None


def parse_center_metadata(md_text: str, source: str) -> Dict[str, Optional[str]]:
    center_name = _safe_find(CENTER_RE, md_text)
    location = _safe_find(LOCATION_RE, md_text)
    center_type = _safe_find(TYPE_RE, md_text)

    city, state = None, None
    if location:
        # "Salem, Massachusetts" or "Plymouth, Massachusetts"
        parts = [p.strip() for p in location.split(",")]
        if len(parts) >= 2:
            city, state = parts[0], parts[1]

    return {
        "source": source,
        "center_name": center_name,
        "center_type": center_type,
        "city": city,
        "state": state,
    }


def split_event_blocks(md_text: str) -> List[Tuple[str, str]]:
    """
    Returns list of (event_title, event_block_text).
    Event blocks start with '### ' and continue until next '### ' or end.
    """
    matches = list(EVENT_HEADING_RE.finditer(md_text))
    blocks: List[Tuple[str, str]] = []
    for i, m in enumerate(matches):
        title = m.group(1).strip()
        start = m.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(md_text)
        block = md_text[start:end].strip()
        blocks.append((title, block))
    return blocks


def parse_event_metadata(event_title: str, block: str) -> Dict[str, Optional[str]]:
    # Event Type may appear as "Event Type:" or "Category:" depending on file style
    event_type = _safe_find(FIELD_RE["event_type"], block) or _safe_find(FIELD_RE["category"], block)

    age_tags = _safe_find(FIELD_RE["age_tags"], block)
    instructor = _safe_find2(FIELD_RE["instructor"], block)
    date_range = _safe_find(FIELD_RE["date_range"], block)
    time_slots = _safe_find(FIELD_RE["time_slots"], block)
    duration = _safe_find(FIELD_RE["duration"], block)
    spots = _safe_find(FIELD_RE["spots"], block)

    return {
        "event_title": event_title,
        "event_type": event_type,
        "age_tags": age_tags,
        "instructor": instructor,
        "date_range": date_range,
        "time_slots": time_slots,
        "duration": duration,
        "spots": spots,
    }


def build_event_documents(md_text: str, source: str) -> List[Document]:
    center_md = parse_center_metadata(md_text, source)
    docs: List[Document] = []

    for title, block in split_event_blocks(md_text):
        meta = parse_event_metadata(title, block)
        combined_meta = {**center_md, **meta, "doc_type": "event"}

        # Use compact but rich page_content for embeddings
        content = (
            f"Center: {center_md.get('center_name')} ({center_md.get('center_type')})\n"
            f"Location: {center_md.get('city')}, {center_md.get('state')}\n"
            f"Event: {title}\n"
            f"Event Type: {meta.get('event_type')}\n"
            f"Age Tags: {meta.get('age_tags')}\n"
            f"Instructor: {meta.get('instructor')}\n"
            f"Date Range: {meta.get('date_range')}\n"
            f"Time Slots: {meta.get('time_slots')}\n"
            f"Duration: {meta.get('duration')}\n"
            f"Spots: {meta.get('spots')}\n\n"
            f"Raw Block:\n{block}\n"
        ).strip()

        docs.append(Document(page_content=content, metadata=combined_meta))

    return docs


def build_activitytype_documents(md_text: str, source: str) -> List[Document]:
    """
    Simple section chunking:
    - Split by '## ' headings (category sections) if present
    - Else split by '### ' headings
    Keeps chunks reasonably sized; you can add recursive splitting if needed.
    """
    # Prefer ## sections
    if "## " in md_text:
        splitter = re.compile(r"(?m)^##\s+")
        parts = splitter.split(md_text)
        # parts[0] is preamble; subsequent parts start after heading marker
        docs: List[Document] = []
        preamble = parts[0].strip()
        if preamble:
            docs.append(Document(page_content=preamble, metadata={"source": source, "doc_type": "activity_type"}))

        # Re-add the heading marker for clarity in content
        for sec in parts[1:]:
            sec = sec.strip()
            if not sec:
                continue
            # Extract heading line
            lines = sec.splitlines()
            heading = lines[0].strip()
            body = "\n".join(lines[1:]).strip()
            chunk = f"## {heading}\n{body}".strip()
            docs.append(
                Document(
                    page_content=chunk,
                    metadata={"source": source, "doc_type": "activity_type", "activity_heading": heading},
                )
            )
        return docs

    # Fallback: split by ### blocks
    docs = []
    for title, block in split_event_blocks(md_text):  # reuse ### splitter
        docs.append(
            Document(
                page_content=f"### {title}\n{block}",
                metadata={"source": source, "doc_type": "activity_type", "activity_heading": title},
            )
        )
    return docs


In [17]:
# Load in everything in the knowledgebase using LangChain's loaders

# Get the project root directory
# In notebooks, we need to find the project root relative to current working directory
current_dir = os.getcwd()
# If we're in helper/, go up one level; otherwise assume we're at project root
if os.path.basename(current_dir) == "helper":
    project_root = os.path.dirname(current_dir)
else:
    project_root = current_dir

documents_path = os.path.join(project_root, "documents")

# point this to the documents folder
folders = glob.glob(os.path.join(documents_path, "*"))

documents = []
print(f"Looking in: {documents_path}")
print(f"Found folders: {folders}")
for folder in folders:
    if os.path.isdir(folder):
        doc_type = os.path.basename(folder)
        loader = DirectoryLoader(folder, glob="**/*.md", loader_cls=TextLoader, loader_kwargs={'encoding': 'utf-8'})
        folder_docs = loader.load()
        print(f"Loaded {len(folder_docs)} documents from {doc_type}")
        for doc in folder_docs:
            doc.metadata["doc_type"] = doc_type
            documents.append(doc)

print(f"Loaded {len(documents)} documents total")

Looking in: /Users/adipole/github/ai_portfolio/event_recommendation/documents
Found folders: ['/Users/adipole/github/ai_portfolio/event_recommendation/documents/activityType', '/Users/adipole/github/ai_portfolio/event_recommendation/documents/Events', '/Users/adipole/github/ai_portfolio/event_recommendation/documents/Reviews']
Loaded 6 documents from activityType
Loaded 0 documents from Events
Loaded 0 documents from Reviews
Loaded 6 documents total


In [12]:
# create pydantic model class for the response from the model with the format:
# activty_category: string
# activity_name: string
# activity_description: string

from pydantic import BaseModel

class Activity(BaseModel):
    activity_category: str
    activity_name: str
    activity_description: str   



Split into 12 chunks
page_content='# Dancing Activities

Dance activities combine rhythmic movement, coordination, balance, and cardiovascular exercise. They also offer strong cognitive and social benefits through pattern learning, memory, and group interaction.' metadata={'source': '/Users/adipole/github/ai_portfolio/event_recommendation/documents/activityType/dancing.md', 'doc_type': 'activityType'}
Vectorstore created with 12 documents


In [26]:
from dataclasses import dataclass
from typing import List, Optional
import re
import numpy as np
from sentence_transformers import SentenceTransformer


@dataclass
class Chunk:
    text: str
    start_idx: int
    end_idx: int
    avg_similarity: Optional[float] = None


class SemanticChunker:
    """
    Semantic chunking using sentence-transformers cosine similarity between consecutive sentences.

    Heuristics:
      - Start a new chunk when similarity < threshold
      - Enforce min/max chunk sizes
    """

    def __init__(
        self,
        model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
        similarity_threshold: float = 0.72,
        min_sentences: int = 3,
        max_sentences: int = 10,
    ):
        self.model = SentenceTransformer(model_name)
        self.similarity_threshold = similarity_threshold
        self.min_sentences = min_sentences
        self.max_sentences = max_sentences

    @staticmethod
    def _split_sentences(text: str) -> List[str]:
        # Simple sentence splitter; good enough for markdown prose.
        text = re.sub(r"\s+", " ", text).strip()
        if not text:
            return []
        # split on sentence end punctuation + space
        sents = re.split(r"(?<=[.!?])\s+", text)
        return [s.strip() for s in sents if s.strip()]

    @staticmethod
    def _cos_sim(a: np.ndarray, b: np.ndarray) -> float:
        # a,b assumed normalized
        return float(np.dot(a, b))

    def chunk(self, text: str) -> List[Chunk]:
        sentences = self._split_sentences(text)
        if not sentences:
            return []

        # Embed all sentences, normalize for cosine similarity via dot product
        emb = self.model.encode(sentences, normalize_embeddings=True)
        emb = np.asarray(emb, dtype=np.float32)

        chunks: List[Chunk] = []
        cur_start = 0
        cur_end = 0
        sims: List[float] = []

        def flush(end_idx: int):
            nonlocal cur_start, cur_end, sims
            chunk_text = " ".join(sentences[cur_start:end_idx])
            avg = float(np.mean(sims)) if sims else None
            chunks.append(Chunk(text=chunk_text, start_idx=cur_start, end_idx=end_idx, avg_similarity=avg))
            cur_start = end_idx
            cur_end = end_idx
            sims = []

        cur_end = 1
        for i in range(1, len(sentences)):
            # similarity between consecutive sentences
            sim = self._cos_sim(emb[i - 1], emb[i])
            sims.append(sim)

            cur_len = (cur_end - cur_start)

            should_split = (
                (sim < self.similarity_threshold and cur_len >= self.min_sentences)
                or (cur_len >= self.max_sentences)
            )

            if should_split:
                flush(cur_end)
            cur_end += 1

        # flush last chunk
        if cur_start < len(sentences):
            flush(len(sentences))

        return chunks

In [27]:
# Divide into chunks using SemanticChunker
# Note: SemanticChunker requires embeddings - using HuggingFace embeddings
# load HG_KEY from .env
load_dotenv()
HG_KEY = os.getenv("HF_TOKEN")

db_name = "vector_db"
collection_name = "docs"

# Initialize embeddings
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small"
)

from langchain_core.documents import Document

chunker = SemanticChunker(similarity_threshold=0.70, min_sentences=2, max_sentences=4)

# Chunk each document and convert to Document objects
chunked_documents = []
for doc in documents:
    # Chunk the document's text content
    chunks = chunker.chunk(doc.page_content)
    # Convert each chunk to a Document object with original metadata
    for chunk in chunks:
        chunked_documents.append(
            Document(
                page_content=chunk.text,
                metadata=doc.metadata.copy()
            )
        )

print(f"Split into {len(chunked_documents)} chunks")

vectorstore = Chroma.from_documents(
    documents=chunked_documents,
    embedding=embeddings,
    persist_directory=db_name
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
print("Vectorstore created with", len(chunked_documents), "documents")


Split into 24 chunks
Vectorstore created with 24 documents


In [None]:
def retrieve_context(query_string): 
    from langchain_core.documents import Document
    
    RETRIEVAL_K = 2
    query_vec = OpenAIEmbeddings(model="text-embedding-3-small").embed_query(query_string)
    results = retriever.query(query_embeddings=[query_vec], n_results=RETRIEVAL_K)
    print("Length of results: ", len(results['documents']))
    # Convert ChromaDB results to Document objects
    retrieved_docs = []
    if results['documents'] and len(results['documents']) > 0:
        for i, doc_text in enumerate(results['documents'][0]):
            metadata = results['metadatas'][0][i] if results['metadatas'] and len(results['metadatas']) > 0 else {}
            retrieved_docs.append(Document(page_content=doc_text, metadata=metadata))
    
    print("Retrieved documents:")
    print(retrieved_docs)
    return retrieved_docs


In [7]:
retrieve_context("I am a 34 year old woman who loves a water activity for knee pain, low cost")

Length of results:  1
Retrieved documents:
[Document(metadata={'source': '/Users/adipole/github/ai_portfolio/event_recommendation/documents/activityType/aquatics.md', 'doc_type': 'activityType'}, page_content='## AQUA CARDIO\nA water-based cardiovascular workout using continuous movement to elevate heart rate with minimal joint impact. **Intensity:** Moderate | **Session:** 45–60 min | **Frequency:** 2–4/week  \n**Benefits:** Cardio endurance, circulation, calorie burn  \n**Ailments:** Joint pain, low stamina, back discomfort  \n\n## AQUA CARDIO DANCE\nDance-style choreography combined with water resistance for rhythm-driven cardio. **Intensity:** Low–Moderate | **Session:** 45–60 min | **Frequency:** 1–3/week  \n**Benefits:** Coordination, balance, enjoyment  \n**Ailments:** Balance issues, mood decline  \n\n## AQUA FIT\nFull-body aquatic workout combining aerobics, toning, and resistance. **Intensity:** Moderate | **Session:** 45–60 min | **Frequency:** 2–4/week  \n**Benefits:** Musc

[Document(metadata={'source': '/Users/adipole/github/ai_portfolio/event_recommendation/documents/activityType/aquatics.md', 'doc_type': 'activityType'}, page_content='## AQUA CARDIO\nA water-based cardiovascular workout using continuous movement to elevate heart rate with minimal joint impact. **Intensity:** Moderate | **Session:** 45–60 min | **Frequency:** 2–4/week  \n**Benefits:** Cardio endurance, circulation, calorie burn  \n**Ailments:** Joint pain, low stamina, back discomfort  \n\n## AQUA CARDIO DANCE\nDance-style choreography combined with water resistance for rhythm-driven cardio. **Intensity:** Low–Moderate | **Session:** 45–60 min | **Frequency:** 1–3/week  \n**Benefits:** Coordination, balance, enjoyment  \n**Ailments:** Balance issues, mood decline  \n\n## AQUA FIT\nFull-body aquatic workout combining aerobics, toning, and resistance. **Intensity:** Moderate | **Session:** 45–60 min | **Frequency:** 2–4/week  \n**Benefits:** Muscle tone, endurance, joint mobility  \n**Ail

In [30]:
# import gradio chat interface
import gradio as gr

# groq client to openai opensource openai oss model
# use the system prompt and user prompt to generate the response
def chat(message, history):

    user_prompt = USER_PROMPT


    
    # add previous user messages only
    # Handle Gradio 6.x history format - each item is a tuple (user_msg, assistant_msg)
       # add previous user messages only
    for h in history:
        if h.get("role") == "user":
            user_prompt += f"\n\nUser: {h['content']}"

    user_prompt += f"\n\nUser: {message}"

    # RAG retrieval uses the current message as query (not the full prompt)
    retrieved_docs = retriever.invoke(message)

    rag_context = "\n\n---\n\n".join(
        [f"[{d.metadata.get('source','')}] {d.page_content}" for d in retrieved_docs]
    )

    final_user_prompt = f"""
{user_prompt}

Relevant knowledge base context:
{rag_context}

Answer using ONLY the context above when answering factual questions about activities.
If the context is insufficient, say what is missing.
"""
    

    response = groq_client.chat.completions.create(
        model=OPENSOURCE_OSS_MODEL,
        messages=[{"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": final_user_prompt}],
    )
    return response.choices[0].message.content

# gradio chat interface
demo = gr.ChatInterface(chat)
demo.launch()

* Running on local URL:  http://127.0.0.1:7866
* To create a public link, set `share=True` in `launch()`.


