In [1]:
from aips import get_engine, set_engine
from aips.spark.dataframe import from_sql
from aips.spark import create_view_from_collection
import tqdm

engine = get_engine("opensearch")

### Current state:
- Plugin successfully install and loads, all indexing are mostly functional, but data isn't landing in the appropriate buckets
- Debug why query ingestion isn't sticking (maybe initialization of UBI collections?)
- UBI Dashboard needs to be pre-installed into the docker image (and configured?)

In [2]:
#%run chapters/ch04/1.setting-up-the-retrotech-dataset.ipynb

### Step 1 - Install and configure the OpenSearch UBI plugin

bin/opensearch-plugin install https://github.com/o19s/opensearch-ubi/releases/download/release-v0.0.12.1-os2.14.0/opensearch-ubi-plugin-v0.0.12.1-os2.14.0.zip --batch

### Step 2 - Example ingestion of query data by adding the `ext` object to search requests

In [3]:
import requests

response = requests.get(f"http://opensearch-node1:9200/products/_search",
                        json={"ext": {"ubi": {"query_id": "1234"}}})
display(response.json())

{'took': 4,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': 1.0,
  'hits': [{'_index': 'products',
    '_id': '28944744927',
    '_score': 1.0,
    '_source': {'upc': '28944744927',
     'name': 'Faust Symphony - CD',
     'manufacturer': 'UNKNOWN',
     'short_description': ' ',
     'long_description': ' '}},
   {'_index': 'products',
    '_id': '786936811605',
    '_score': 1.0,
    '_source': {'upc': '786936811605',
     'name': "Grey's Anatomy: The Complete Seventh Season [6 Discs] - DVD",
     'manufacturer': ' ',
     'short_description': ' ',
     'long_description': ' '}},
   {'_index': 'products',
    '_id': '24543753100',
    '_score': 1.0,
    '_source': {'upc': '24543753100',
     'name': 'Burn Notice: The Fall of Sam Axe - Widescreen Subtitle AC3 - DVD',
     'manufacturer': ' ',
     'short_description': ' ',
     'long_description': ' '}},
   {'_index': 'pro

### Step 3 - Bulk ingest events

In [4]:
def get_events_dataframe():
    signals_collection = engine.get_collection("signals")
    create_view_from_collection(signals_collection, "signals")
    query = """SELECT type AS action_name, query_id, user AS client_id,
                      signal_time AS timestamp, type AS message_type,
                      target AS message, target AS target
               FROM signals WHERE type != 'query'"""
    events = from_sql(query)
    return events

In [5]:
events_collection = engine.create_collection("ubi_events")
ubi_events_dataframe = get_events_dataframe()
events_collection.write(ubi_events_dataframe)

Wiping "ubi_events" collection
Creating "ubi_events" collection
Successfully written 1447146 documents


### Optional Step 3 - Bulk ingest queries

In [6]:
def get_queries_dataframe():
    signals_collection = engine.get_collection("signals")
    create_view_from_collection(signals_collection, "signals")
    queries = from_sql("SELECT * FROM signals WHERE type = 'query'")
    queries_transformed = queries.rdd.map(lambda r: 
        (r["signal_time"], r["query_id"], r["user"], r["target"]))
    ubi_queries_dataframe = queries_transformed.toDF(
        ["timestamp", "query_id", "client_id", "user_query"])
    return ubi_queries_dataframe

### Step 4 - Indexing search signals upon executing a search (intended)

In [7]:
def execute_search(collection, signal, log=False):
    request = {"query": signal["user_query"],
               "query_fields": ["name", "manufacturer",
                                "long_description", "short_description"],
               "return_fields": ["*"],
               "limit": 10,
               "ubi": signal | {"store_name": "aips_store"}}
    try:
        return collection.search(**request)
    except:
        pass

In [8]:
products_collection = engine.get_collection("products")
ubi_queries_dataframe = get_queries_dataframe()
for q in tqdm.tqdm(ubi_queries_dataframe.collect(), total=ubi_queries_dataframe.count()):
    execute_search(products_collection, q.asDict())

100%|██████████| 725459/725459 [01:44<00:00, 6915.09it/s]


In [9]:
def batch_ingest_queries():
    queries_collection = engine.create_collection("ubi_queries")
    ubi_queries_dataframe = get_queries_dataframe()
    queries_collection.write(ubi_queries_dataframe)

#batch_ingest_queries()

In [10]:
### Generating AIPS Signals collection from  - Indexing search signals upon executing a search (intended)

In [15]:
from pyspark.sql.functions import from_unixtime
def create_events_dataframe():
    ubi_events_collection = engine.get_collection("ubi_events")
    create_view_from_collection(ubi_events_collection, "ubi_events")
    events = from_sql("SELECT * FROM ubi_events")
    events_transformed = events.rdd.map(lambda r: 
        (r["timestamp"], r["query_id"], r["client_id"],
         r["message"], r["message_type"]))
    return events_transformed.toDF(["signal_time", "query_id", "user", "target", "type"])

def create_queries_dataframe():
    ubi_queries_collection = engine.get_collection("ubi_queries")
    create_view_from_collection(ubi_queries_collection, "ubi_queries")
    queries = from_sql("SELECT * FROM ubi_queries")
    queries.show(10)
    queries_transformed = queries.rdd.map(lambda r: 
        (r["timestamp"], r["query_id"], r["client_id"],
         r["user_query"], "query"))
    return queries_transformed.toDF(["signal_time", "query_id", "user", "target", "type"])

def _create_queries_dataframe():
    ubi_queries_collection = engine.get_collection("ubi_queries")
    create_view_from_collection(ubi_queries_collection, "ubi_queries")
    queries = from_sql("SELECT * FROM ubi_queries")
    queries.show(10)
    queries_transformed = queries.rdd.map(lambda r: 
        (r["timestamp"], r["query_id"], r["client_id"],
         r["user_query"], "query"))
    return queries_transformed.toDF(["signal_time", "query_id", "user",
                                                    "target", "type"])
    return queries_transformed.withColumn("signal_time", from_unixtime("signal_time"))
    

#signals_collection = engine.create_collection("signals")
events = create_events_dataframe()
events.show(10)
queries = create_queries_dataframe()
queries.show(10)
#signals_collection.write(queries.union(events))

+-------------+-----------+-------+------------+--------+
|  signal_time|   query_id|   user|      target|    type|
+-------------+-----------+-------+------------+--------+
|1585854599197|u530435_0_1|u530435|786936817218|purchase|
|1586692556578|u595885_1_2|u595885|786936817218|purchase|
|1586669080953|u695122_0_1|u695122|786936817218|purchase|
|1581706502786|u734452_0_1|u734452|786936817218|purchase|
|1588861261426|u135438_0_1|u135438|786936817218|purchase|
|1585184225191|u219056_0_1|u219056|786936817218|purchase|
|1588843902245| u72400_0_1| u72400|786936817218|purchase|
|1585057472120|u442883_0_1|u442883|786936817218|purchase|
|1577072668322|u353276_0_1|u353276|786936817218|purchase|
|1587130028623|  u7654_0_1|  u7654|786936817218|purchase|
+-------------+-----------+-------+------------+--------+
only showing top 10 rows

+---------+--------------------+--------+----------------------+--------------------+--------------------+----------+
|client_id|               query|query_id|que

ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling