In [None]:
"""
ES → Sharetribe uploader (Integration API)

This notebook lets you select products from your serverless Elasticsearch cluster and upload them as listings to your Sharetribe marketplace using the Integration API.

- Loads Sharetribe credentials from your `.env`, and Elasticsearch/OpenAI settings from environment variables (following the scraper notebook's conventions)
- Inspects the structure of current Sharetribe listings for reference
- Lets you search products in ES, preview them, and confirm before transferring
- Minimizes OpenAI calls (only generates a description when it is missing or too short)
- Creates a draft listing for each product, uploads its primary image, then publishes it
- Includes an end-to-end test with one Roborock product
"""


In [None]:
# %pip install python-dotenv elasticsearch openai requests tqdm tenacity
import os
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from openai import OpenAI
import requests
from urllib.parse import urlparse
from tqdm import tqdm

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger("es2sharetribe")

# Load env
load_dotenv(dotenv_path=os.path.join(os.getcwd(), ".env"))

# Sharetribe credentials from .env (generated by scripts/config.js).
# SHARETRIBE_INTEGRATION_CLIENT_ID / SHARETRIBE_INTEGRATION_CLIENT_SECRET take
# precedence when set; otherwise the template's SDK variables are used as before.
# NOTE(review): Sharetribe issues separate credentials for Integration API
# applications — confirm the template's Marketplace SDK client works here.
SHARETRIBE_CLIENT_ID = os.getenv("SHARETRIBE_INTEGRATION_CLIENT_ID") or os.getenv("REACT_APP_SHARETRIBE_SDK_CLIENT_ID")
SHARETRIBE_CLIENT_SECRET = os.getenv("SHARETRIBE_INTEGRATION_CLIENT_SECRET") or os.getenv("SHARETRIBE_SDK_CLIENT_SECRET")
SHARETRIBE_AUTH_BASE_URL = os.getenv("SHARETRIBE_AUTH_BASE_URL", "https://flex-api.sharetribe.com")
SHARETRIBE_INTEG_BASE_URL = os.getenv("SHARETRIBE_INTEG_BASE_URL", "https://flex-integ-api.sharetribe.com")

if not (SHARETRIBE_CLIENT_ID and SHARETRIBE_CLIENT_SECRET):
    logger.warning("Sharetribe credentials not found in .env. Please run yarn run config or set env vars.")

# Reuse OpenAI/ES config from scraper v1.ipynb conventions
# (OpenAI uses env OPENAI_API_KEY; ES host/api_key come from env ES_HOST / ES_API_KEY)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
openai_client = OpenAI() if OPENAI_API_KEY else None

# You may adapt these from your scraper notebook if needed
ES_HOST = os.getenv("ES_HOST", "https://my-elasticsearch-project-caece8.es.us-east-1.aws.elastic.cloud:443")
# Secrets must come from the environment, never from a default baked into the
# notebook. NOTE(review): the key that used to be hardcoded here was exposed in
# the source and presumably should be rotated.
ES_API_KEY = os.getenv("ES_API_KEY")
if not ES_API_KEY:
    logger.warning("ES_API_KEY is not set; connecting to Elasticsearch without an API key.")

es = Elasticsearch(ES_HOST, api_key=ES_API_KEY)


In [7]:
import base64
from typing import TypedDict

# Shape of the cached OAuth token: the bearer string plus its reported type.
STAuth = TypedDict("STAuth", {"access_token": str, "token_type": str})

# Listing payload sent to Sharetribe; every key is optional (total=False).
ListingInput = TypedDict(
    "ListingInput",
    {
        "title": str,
        "description": str,
        "geolocation": dict,
        "price": dict,
        "publicData": dict,
        "privateData": dict,
        "images": List[str],
    },
    total=False,
)

# Module-level cache; get_integration_token stores the token under "st_auth".
SESSION: Dict[str, Any] = dict()


def get_integration_token() -> STAuth:
    """Obtain an Integration API access token via the OAuth2 client-credentials grant.

    Posts the configured client id/secret to ``{SHARETRIBE_AUTH_BASE_URL}/v1/auth/token``
    and caches the result in ``SESSION["st_auth"]``. When the response carries
    ``expires_in``, an absolute refresh deadline is stored in
    ``SESSION["st_auth_expires_at"]`` so callers can re-authenticate before
    the token lapses.

    Returns:
        The token as an ``STAuth`` dict.

    Raises:
        RuntimeError: if credentials are not configured or no access token is returned.
        requests.HTTPError: if the auth endpoint returns an error status (body is logged).
    """
    import time

    if not (SHARETRIBE_CLIENT_ID and SHARETRIBE_CLIENT_SECRET):
        raise RuntimeError("Sharetribe client id/secret are not configured; cannot request a token.")

    url = f"{SHARETRIBE_AUTH_BASE_URL}/v1/auth/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": SHARETRIBE_CLIENT_ID,
        "client_secret": SHARETRIBE_CLIENT_SECRET,
        # Sharetribe's scope name for Integration API access is "integ".
        "scope": "integ",
    }
    resp = requests.post(url, data=payload, timeout=20)
    try:
        resp.raise_for_status()
    except requests.HTTPError:
        logger.error("Auth failed %s: %s", resp.status_code, resp.text)
        raise

    data = resp.json()
    access_token = data.get("access_token")
    if not access_token:
        # Caching None would only surface later as confusing 401s on every call.
        raise RuntimeError("Sharetribe auth response did not contain an access_token")

    auth = STAuth(access_token=access_token, token_type=data.get("token_type", "Bearer"))
    SESSION["st_auth"] = auth

    expires_in = data.get("expires_in")
    if isinstance(expires_in, (int, float)) and not isinstance(expires_in, bool):
        # Refresh a minute early so an in-flight request never carries a just-expired token.
        SESSION["st_auth_expires_at"] = time.time() + max(float(expires_in) - 60.0, 0.0)
    else:
        SESSION.pop("st_auth_expires_at", None)
    return auth


def st_headers() -> Dict[str, str]:
    """Build JSON request headers carrying a bearer token for the Integration API.

    Reuses the token cached in ``SESSION["st_auth"]``. A new token is requested
    when none is cached, or when ``SESSION["st_auth_expires_at"]`` is set and
    has passed. This avoids 401s in long-lived kernels.
    """
    import time

    auth = SESSION.get("st_auth")
    expires_at = SESSION.get("st_auth_expires_at")
    if not auth or (expires_at is not None and time.time() >= expires_at):
        auth = get_integration_token()
    return {
        "Authorization": f"Bearer {auth['access_token']}",
        "Content-Type": "application/json",
        # Ask explicitly for JSON so callers can safely use response.json().
        "Accept": "application/json",
    }


def get_existing_listings(limit: int = 10) -> List[Dict[str, Any]]:
    """Fetch up to ``limit`` existing listings to inspect their structure (for mapping).

    Uses the Integration API listing query endpoint, which paginates with
    ``perPage``. Returns the ``data`` array of the response (``[]`` if absent).

    Raises:
        requests.HTTPError: on a non-2xx response (the response body is logged first).
    """
    url = f"{SHARETRIBE_INTEG_BASE_URL}/v1/integration_api/listings/query"
    params = {"perPage": limit}
    r = requests.get(url, headers=st_headers(), params=params, timeout=20)
    if not r.ok:
        logger.error("Listing query failed %s: %s", r.status_code, r.text)
    r.raise_for_status()
    return r.json().get("data", [])


def create_draft_listing(payload: ListingInput) -> Dict[str, Any]:
    """POST ``payload`` as JSON to the Integration API listings URL and return the parsed response.

    NOTE(review): Sharetribe's Integration API reference documents listing creation
    as ``POST /v1/integration_api/listings/create`` with required ``authorId`` and
    ``state`` fields, which this payload does not include. Confirm the URL and
    payload against a live marketplace.

    Raises:
        requests.HTTPError: on a non-2xx response (the response body is logged first,
            since it carries Sharetribe's validation details).
    """
    url = f"{SHARETRIBE_INTEG_BASE_URL}/v1/integration_api/listings"
    r = requests.post(url, headers=st_headers(), json=payload, timeout=30)
    if not r.ok:
        logger.error("Create listing failed %s: %s", r.status_code, r.text)
    r.raise_for_status()
    return r.json()


def upload_image_to_listing(listing_id: str, image_url: str) -> Dict[str, Any]:
    """Download ``image_url`` and upload it as a multipart ``image`` field to the listing.

    The upload is labeled with the MIME type reported by the image host and the
    file name from the URL path. It falls back to ``image/jpeg`` / ``image.jpg``
    when those are missing, so PNG/WebP images are no longer mislabeled as JPEG.

    NOTE(review): Sharetribe's Integration API reference documents image upload as
    ``POST /v1/integration_api/images/upload`` (returning an image id to attach to
    the listing). Confirm this per-listing URL.

    Raises:
        requests.HTTPError: if the download or the upload fails (upload body is logged).
    """
    img_resp = requests.get(image_url, timeout=30)
    img_resp.raise_for_status()

    content_type = (img_resp.headers.get("Content-Type") or "").split(";", 1)[0].strip().lower()
    if not content_type.startswith("image/"):
        content_type = "image/jpeg"
    filename = os.path.basename(urlparse(image_url).path) or "image.jpg"

    files = {"image": (filename, img_resp.content, content_type)}
    url = f"{SHARETRIBE_INTEG_BASE_URL}/v1/integration_api/listings/{listing_id}/images"
    # Only the Authorization header is forwarded: requests must set the multipart
    # Content-Type (with boundary) itself.
    headers = {"Authorization": st_headers()["Authorization"], "Accept": "application/json"}
    r = requests.post(url, headers=headers, files=files, timeout=60)
    if not r.ok:
        logger.error("Image upload failed %s: %s", r.status_code, r.text)
    r.raise_for_status()
    return r.json()


def publish_listing(listing_id: str) -> Dict[str, Any]:
    """POST to the listing's ``/publish`` URL and return the parsed JSON response.

    NOTE(review): Sharetribe's Integration API reference lists ``listings/open``,
    ``listings/close`` and ``listings/approve`` but no ``publish`` route (draft
    publishing appears to be a Marketplace API ``own_listings`` operation).
    Confirm this URL.

    Raises:
        requests.HTTPError: on a non-2xx response (the response body is logged first).
    """
    url = f"{SHARETRIBE_INTEG_BASE_URL}/v1/integration_api/listings/{listing_id}/publish"
    r = requests.post(url, headers=st_headers(), timeout=20)
    if not r.ok:
        logger.error("Publish request failed %s: %s", r.status_code, r.text)
    r.raise_for_status()
    return r.json()


In [None]:
def es_search(query: Dict[str, Any], index: str = "inventory_vector") -> List[Dict[str, Any]]:
    """Run ``query`` against ``index`` and return the raw hit list (``hits.hits``), or ``[]``."""
    response = es.search(index=index, body=query)
    outer = response.get("hits", {})
    return outer.get("hits", [])


def sample_es_schema(query_brand: str = "roborock") -> List[Dict[str, Any]]:
    """Fetch up to five ES docs whose ``brand`` matches ``query_brand`` and log how many came back."""
    brand_match = {"match": {"brand": query_brand}}
    sample_hits = es_search({"query": brand_match, "size": 5})
    logger.info("Sample ES docs for brand=%s: %s", query_brand, len(sample_hits))
    return sample_hits


# Show a quick peek at existing Sharetribe data structure (one API call;
# the cell's last expression displays the first listing).
existing = get_existing_listings(limit=3)
logger.info("Fetched %d existing listings from Sharetribe for reference", len(existing))
existing[:1]


In [None]:
def _normalize_price(raw_price: Any) -> Dict[str, Any]:
    """Convert an ES ``price`` value into ``{"amount": <int subunits>, "currency": <str>}``.

    Accepts a ``{"amount", "currency"}`` dict, or a bare number / numeric string
    (treated as USD). Amounts below 1000 are assumed to be major units (e.g.
    dollars) and are multiplied by 100. Any amount with a fractional part is
    also treated as major units, because subunit amounts are whole numbers.
    Missing, empty, non-finite or unparseable amounts become 0.
    """
    import math

    if isinstance(raw_price, dict):
        if 'amount' not in raw_price:
            return {"amount": 0, "currency": "USD"}
        amount = raw_price['amount']
        currency = raw_price.get('currency') or 'USD'
    elif isinstance(raw_price, (int, float, str)) and not isinstance(raw_price, bool):
        amount = raw_price
        currency = 'USD'
    else:
        return {"amount": 0, "currency": "USD"}

    try:
        value = float(amount) if amount not in (None, "") else 0.0
    except (TypeError, ValueError):
        value = 0.0
    if not math.isfinite(value):
        value = 0.0
    # Sharetribe expects price in subunits (e.g. cents).
    if value and (value < 1000 or not value.is_integer()):
        value *= 100
    return {"amount": int(round(value)), "currency": currency}


def _generate_description(src: Dict[str, Any]) -> str:
    """Ask OpenAI for a short technical description from ``src['raw_html']``.

    Returns the stripped model output ('' if the model returned no text).
    Exceptions from the OpenAI client propagate to the caller.
    """
    prompt = f"""
You will write a concise, technical product description (max 500 chars) for a marketplace listing based on this content:
{str(src.get('raw_html'))[:7000]}
Ensure no brand repetition if the title already contains it; avoid marketing fluff.
"""
    resp = openai_client.chat.completions.create(
        model="gpt-4.1-mini",
        messages=[{"role":"system","content":"You write compact, technical marketplace descriptions."},{"role":"user","content":prompt}],
        temperature=0.2
    )
    return (resp.choices[0].message.content or "").strip()


def build_sharetribe_payload_from_es(es_doc: Dict[str, Any]) -> ListingInput:
    """Map an ES product hit onto a Sharetribe listing payload.

    Uses ``product_name``/``name`` for the title (truncated to 70 chars) and
    ``product_description``/``extended_description`` for the description
    (truncated to 5000 chars). OpenAI is called only when the description is
    missing or shorter than 50 chars and ``raw_html`` is available. Price is
    normalized by ``_normalize_price``, and ``publicData`` keeps only the
    non-None product fields.
    """
    src = es_doc.get('_source', {})
    title = src.get('product_name') or src.get('name') or "Untitled"
    description = src.get('product_description') or src.get('extended_description')

    # Minimize OpenAI calls: only if description missing/too short
    if (not description or len(description) < 50) and openai_client and src.get('raw_html'):
        try:
            generated = _generate_description(src)
        except Exception as e:
            logger.warning("OpenAI description fallback failed: %s", e)
        else:
            if generated:
                description = generated

    public_data = {
        "brand": src.get("brand"),
        "product_types": src.get("product_types"),
        "connectivity": src.get("connectivity"),
        "amazon_url": src.get("amazon_url"),
        "vendor_product_id": src.get("vendor_product_id"),
        "tech_specs": src.get("tech_specs"),
    }

    payload: ListingInput = {
        "title": title[:70],
        "description": description[:5000] if description else "",
        "price": _normalize_price(src.get('price')),
        "publicData": {k: v for k, v in public_data.items() if v is not None},
        # Optional: place in a generic location or omit
    }
    return payload


def extract_primary_image(es_doc: Dict[str, Any]) -> Optional[str]:
    """Choose the image URL to upload for an ES product hit.

    Prefers a truthy ``_source.image_url``. Otherwise returns the ``source_link``
    of the first dict entry in ``_source.related_images`` that has a truthy one.
    Returns None if neither exists.
    """
    source = es_doc.get('_source', {})
    primary = source.get('image_url')
    if primary:
        return primary
    links = (
        entry['source_link']
        for entry in (source.get('related_images') or [])
        if isinstance(entry, dict) and entry.get('source_link')
    )
    return next(links, None)


In [None]:
from IPython.display import display
import pandas as pd


def _parse_selection(sel: str, n_rows: int) -> List[int]:
    """Parse comma-separated row numbers into unique in-range indices, in entry order."""
    indices: List[int] = []
    ignored: List[str] = []
    for part in sel.split(','):
        token = part.strip()
        # isdecimal (not isdigit) so characters like '²' cannot reach int() and crash.
        if token.isdecimal() and int(token) < n_rows:
            idx = int(token)
            if idx not in indices:  # a repeated row must not create a duplicate listing
                indices.append(idx)
        elif token:
            ignored.append(token)
    if ignored:
        logger.warning("Ignoring invalid selection entries: %s", ", ".join(ignored))
    return indices


def interactive_select_from_es(default_brand: str = "roborock") -> List[Dict[str, Any]]:
    """Interactively search ES by brand, show the hits, and return the confirmed selection.

    Prompts for a brand (blank -> ``default_brand``) and displays up to 50
    matching products as a table. It then asks for comma-separated row numbers
    and a final ``y``/``yes`` confirmation. Returns the selected ES hits, each
    at most once and in the order entered. Returns ``[]`` when there are no
    results, nothing valid is selected, or the transfer is not confirmed. If
    ``input`` raises (e.g. non-interactive stdin), the default brand is used
    and the selection is treated as cancelled.
    """
    print("Enter a brand to search in Elasticsearch. Leave blank to use default.")
    try:
        user_brand = input(f"Brand (default {default_brand}): ").strip() or default_brand
    except Exception:
        user_brand = default_brand

    query = {
        "query": {
            "match": {"brand": user_brand}
        },
        "size": 50
    }
    hits = es_search(query)
    if not hits:
        print("No results.")
        return []

    rows = []
    for h in hits:
        s = h.get('_source', {})
        rows.append({
            "_id": h.get('_id'),
            "brand": s.get('brand'),
            "product_name": s.get('product_name'),
            "product_types": s.get('product_types'),
            "has_image": 1 if (s.get('image_url') or s.get('related_images')) else 0,
            "amazon_url": s.get('amazon_url'),
        })
    # Default RangeIndex, so displayed row numbers equal positions in `hits`.
    display(pd.DataFrame(rows))

    print("Enter comma-separated row numbers to transfer (e.g. 0,2,3). Leave blank to cancel.")
    try:
        sel = input("Selection: ").strip()
    except Exception:
        sel = ""
    if not sel:
        return []

    selected = [hits[i] for i in _parse_selection(sel, len(hits))]
    if not selected:
        print("No valid rows selected.")
        return []

    print(f"You selected {len(selected)} items. Confirm transfer? [y/N]")
    try:
        confirm = input("").strip().lower()
    except Exception:
        confirm = "n"
    if confirm not in ('y', 'yes'):
        return []
    return selected


In [None]:
def transfer_one_es_doc(es_doc: Dict[str, Any]) -> Optional[str]:
    """Create a listing from one ES hit, upload its primary image, then publish it.

    Returns the new listing id whenever the create call returned one, even if
    the image upload or the publish step failed (those failures are logged, and
    the listing is left unpublished). Returns None, with a warning, when the
    create response has no id. Exceptions from payload building or the create
    call propagate.
    """
    payload = build_sharetribe_payload_from_es(es_doc)
    logger.info("Creating draft listing: %s", payload.get('title'))
    draft = create_draft_listing(payload)
    # `or {}` also covers an explicit `"data": null` in the response.
    listing_id = (draft.get('data') or {}).get('id')
    if not listing_id:
        logger.warning("Create response for ES doc %s had no listing id; skipping image upload and publish", es_doc.get('_id'))
        return None

    # Upload image(s) — best effort; a listing without an image is still useful.
    img_url = extract_primary_image(es_doc)
    if img_url:
        try:
            logger.info("Uploading primary image for %s", listing_id)
            upload_image_to_listing(listing_id, img_url)
        except Exception as e:
            logger.warning("Image upload failed for %s: %s", listing_id, e)

    # Publish — on failure the listing still exists, so its id is still returned.
    try:
        logger.info("Publishing %s", listing_id)
        publish_listing(listing_id)
    except Exception as e:
        logger.error("Publish failed for %s: %s", listing_id, e)
    return listing_id


def transfer_interactive() -> List[str]:
    """Run the interactive ES selection and transfer each confirmed doc to Sharetribe.

    Returns the listing ids reported by ``transfer_one_es_doc``. A failure on one
    document is logged with that document's ES ``_id`` and does not stop the
    remaining transfers.
    """
    selected = interactive_select_from_es(default_brand="roborock")
    if not selected:
        print("No transfer executed.")
        return []
    created_ids: List[str] = []
    for doc in tqdm(selected, desc="Transferring"):
        try:
            lid = transfer_one_es_doc(doc)
        except Exception as e:
            # Name the failing doc so partial batches can be retried selectively.
            logger.exception("Transfer failed for ES doc %s: %s", doc.get('_id'), e)
            continue
        if lid:
            created_ids.append(lid)
    print(f"Done. Created/updated {len(created_ids)} listings.")
    return created_ids


In [None]:
# Quick Roborock end-to-end smoke test (single item).
# This creates and publishes a REAL listing on the marketplace, so it is opt-in:
# set RUN_SMOKE_TEST = True here (or env RUN_SHARETRIBE_SMOKE_TEST=1) to run it.
# Otherwise every "Run All" would create another live listing.
RUN_SMOKE_TEST = os.getenv("RUN_SHARETRIBE_SMOKE_TEST", "0") == "1"

smoke_query = {
    "query": {"match": {"brand": "Roborock"}},
    "size": 1
}
if not RUN_SMOKE_TEST:
    print("Smoke test skipped (set RUN_SHARETRIBE_SMOKE_TEST=1 to run it).")
else:
    smoke_hits = es_search(smoke_query)
    if smoke_hits:
        print("Testing with:", smoke_hits[0].get('_source', {}).get('product_name'))
        smoke_listing_id = transfer_one_es_doc(smoke_hits[0])
        print("Smoke test listing id:", smoke_listing_id)
    else:
        print("No Roborock product found in ES.")


In [None]:
# Run the interactive transfer
# created_ids = transfer_interactive()
# created_ids
