In [3]:
from superlinked import framework as sl
import json
import pandas as pd
import numpy as np
import urllib.request
import warnings
warnings.filterwarnings("ignore")

## Overview

In this notebook we will walk you through how to set up a Superlinked index and queries based on the sample data we saved in the previous step.

Here we will experiment with Superlinked settings and space configurations on our sample data before spinning up a local server and vector database.


## Superlinked setup

### Setup schema and index

In [5]:

def get_categories(path: str) -> dict[str, list[str]]:
    """Open ``path`` with ``urllib.request.urlopen`` and return its decoded JSON body.

    NOTE(review): the result is passed straight to ``categories=`` of the
    categorical spaces in this notebook -- confirm that the payload shape
    matches the return annotation.
    """
    response = urllib.request.urlopen(path)
    with response:
        categories = json.load(response)
    return categories

## Schema

class ProductSchema(sl.Schema):
    """Product entity: the object type that the queries in this notebook search over."""
    id: sl.IdField
    product_image: sl.Blob  # image input of `image_space` (combined with `description`)
    description: sl.String  # text combined with `product_image` in `image_space`
    topic: sl.StringList  # product-side input of `topic_space`
    brand: sl.StringList  # product-side input of `brand_space`
    product_type: sl.StringList  # product-side input of `product_type_space`
    popularity: sl.Float  # input of `popularity_space` (configured with min 0, max 1)
    item_w2v: sl.FloatList  # product-side vector of `item2vec_space` (declared length 100)
    is_active: sl.Integer  # hard filter: every query keeps only products with is_active > 0
    price: sl.Integer  # not used by any space or query visible in this notebook

class UserSchema(sl.Schema):
    """User-side entity; each field shares a space with the matching product field."""
    user_topic: sl.StringList  # user-side input of `topic_space`
    user_brand: sl.StringList  # user-side input of `brand_space`
    user_product_type: sl.StringList  # user-side input of `product_type_space`
    user_item_w2v: sl.FloatList  # user-side vector of `item2vec_space`
    user_image: sl.Blob  # user-side input of `image_space`
    id: sl.IdField  # the ingestion loop in this notebook uses ids of the form "<user>_<topic>"


class EventSchema(sl.EventSchema):
    """Interaction event that links a user to a product; consumed by the index effects."""
    product: sl.SchemaReference[ProductSchema]  # product involved in the event; multiplied by the event weight in each effect
    user: sl.SchemaReference[UserSchema]  # user involved in the event; passed as the second argument of each `sl.Effect`
    event_type: sl.String  # compared against the keys of `event_weights` to select the effect
    id: sl.IdField
    created_at: sl.CreatedAtField  # event timestamp; the data-loading cell converts it to integer seconds

# One instance per schema; their fields are referenced when declaring spaces,
# effects and queries.
product_schema, user_schema, event_schema = ProductSchema(), UserSchema(), EventSchema()

# Forwarded as `uncategorized_as_category` to every categorical space.
uncategorized_as_category = False

def _categorical_space(inputs, categories_url):
    """Build a categorical space over ``inputs`` with the categories fetched from ``categories_url``."""
    return sl.CategoricalSimilaritySpace(
        category_input=inputs,
        categories=get_categories(categories_url),
        uncategorized_as_category=uncategorized_as_category,
    )


# Each space pairs the user-side field with the matching product-side field.
topic_space = _categorical_space(
    [user_schema.user_topic, product_schema.topic],
    "https://storage.googleapis.com/superlinked-recipes/ecommerce-recsys/data/topics.json",
)

brand_space = _categorical_space(
    [user_schema.user_brand, product_schema.brand],
    "https://storage.googleapis.com/superlinked-recipes/ecommerce-recsys/data/brands.json",
)

product_type_space = _categorical_space(
    [user_schema.user_product_type, product_schema.product_type],
    "https://storage.googleapis.com/superlinked-recipes/ecommerce-recsys/data/product_types.json",
)

# Popularity is scored in MAXIMUM mode over the range 0..1.
popularity_bounds = {"min_value": 0, "max_value": 1}
popularity_space = sl.NumberSpace(
    number=product_schema.popularity,
    mode=sl.Mode.MAXIMUM,
    **popularity_bounds,
)

# Pre-computed item2vec vectors (length 100) shared by users and products.
item2vec_inputs = [user_schema.user_item_w2v, product_schema.item_w2v]
item2vec_space = sl.CustomSpace(vector=item2vec_inputs, length=100)

# Products contribute image + description; users contribute an image only.
product_image_with_description = product_schema.product_image + product_schema.description
image_space = sl.ImageSpace(
    image=[product_image_with_description, user_schema.user_image]
)
# create the index using the defined spaces

# Weight applied to the event's product for each event type
# (the removal event carries a negative weight).
event_weights = {
    "product_viewed": 0.5,
    "product_added": 0.7,
    "product_purchased": 1,
    "product_removed": -0.5,
}

# Spaces that receive event effects, in the order the effects are registered.
# `popularity_space` is intentionally absent: it has no user-side field.
event_affected_spaces = [
    item2vec_space,
    image_space,
    topic_space,
    brand_space,
    product_type_space,
]

product_index = sl.Index(
    spaces=[
        topic_space,
        brand_space,
        product_type_space,
        popularity_space,
        item2vec_space,
        image_space,
    ],
    # One effect per (space, event type) pair. The outer loop runs over spaces
    # and the inner loop over event types, which yields the same ordering as
    # writing out one comprehension per space.
    effects=[
        sl.Effect(
            space,
            event_schema.user,
            event_weight * event_schema.product,
            event_schema.event_type == event_type,
        )
        for space in event_affected_spaces
        for event_type, event_weight in event_weights.items()
    ],
    # Fields stored with the index; `is_active` is used by the query filters.
    fields=[
        product_schema.is_active,
        product_schema.topic,
        product_schema.product_type,
        product_schema.brand,
    ],
    temperature=0.9,
)

### Execute superlinked in memory

In [6]:
# parse our data into the schemas - not matching column names can be conformed to schemas using the mapping parameter
product_df_parser = sl.DataFrameParser(schema=product_schema)
user_df_parser = sl.DataFrameParser(schema=user_schema)
# The event schema instance is named `event_schema`; the bare name `event` is
# not defined at this point and raised NameError on a fresh kernel.
event_df_parser = sl.DataFrameParser(schema=event_schema)
# setup our application
# Only the product source gets a DataFrame parser: products are ingested as a
# DataFrame, while users and events are put as plain dicts later in the notebook.
source_product: sl.InMemorySource = sl.InMemorySource(schema=product_schema, parser=product_df_parser)
source_user: sl.InMemorySource = sl.InMemorySource(user_schema)
source_event: sl.InMemorySource = sl.InMemorySource(schema=event_schema)

executor_with_events: sl.InMemoryExecutor = sl.InMemoryExecutor(
    sources=[source_product, source_user, source_event],
    indices=[product_index],
)
app_with_events: sl.InMemoryApp = executor_with_events.run()

### Ingest data

For our session based recommendation notebook example setup we will need to load 2 sources.

1. Products - the dataframe with all custom embeddings on it. This source is ingested first, indexing all existing products.
2. Events - this source will mimic the events coming in. In "real life" we will have the events ingested in realtime.


We will use a sample of products and a sample of user events that interact with the sampled products, just for experimentation.

In [8]:
## read data
PRODUCTS_URL = "https://storage.googleapis.com/superlinked-recipes/ecommerce-recsys/data/products.json"
EVENTS_URL = "https://storage.googleapis.com/superlinked-recipes/ecommerce-recsys/data/events.json"

# Only the first 1000 lines of each JSON-lines file are loaded.
products_df = pd.read_json(PRODUCTS_URL, lines=True, nrows=1000)
events_df = pd.read_json(EVENTS_URL, lines=True, nrows=1000)

# Timestamps become integers, then are floor-divided by 10**9
# (nanoseconds -> seconds, assuming nanosecond resolution -- TODO confirm).
created_at_as_int = pd.to_datetime(events_df["created_at"]).astype(int)
events_df["created_at"] = created_at_as_int // 10**9

# Keep only the events whose product is part of the loaded product sample.
sampled_product_ids = products_df[['id']].rename(columns={'id':'product'})
events_df_sample = events_df.merge(sampled_product_ids, on='product')

Let's take a look at the data before ingesting it.

In [9]:
# Preview the first rows of the loaded events (before the merge with the product sample).
events_df.head()

Unnamed: 0,user,session_id,product,event_type,created_at
0,9999211,1724940658471,18938567,product_viewed,1724941630
1,2375656,1721633128877,18904223,product_viewed,1721633494
2,2391885,1718520365363,18888360,product_viewed,1718520414
3,2097335,1724517034717,18881606,product_viewed,1724517252
4,2363497,1724590923651,18890987,product_viewed,1724590989


In [10]:
# Preview the first rows of the loaded products.
products_df.head()

Unnamed: 0,id,is_active,product_image,description,topic,brand,product_type,popularity,item_w2v,price
0,1383239,1,https://storage.googleapis.com/superlinked-rec...,"Made from soft, durable and highly insulating ...",male_clothing,regatta,sweatshirts_&_fleeces,0.048134,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",26
1,14807127,1,https://storage.googleapis.com/superlinked-rec...,Brand: Parks London Collection: Vintage Aromat...,unisex_home,parks_london,candles_&_home_fragrance,0.490409,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",38
2,14807133,1,https://storage.googleapis.com/superlinked-rec...,Brand: Parks London Collection: Vintage Aromat...,unisex_home,parks_london,candles_&_home_fragrance,0.490409,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",38
3,14808099,1,https://storage.googleapis.com/superlinked-rec...,Easy to clean Fingerprint proof Ten year guara...,unisex_home,brabantia,kitchen_storage,0.272473,"[0.217785418, 0.0114847049, -0.0036433504, -0....",64
4,14809572,1,https://storage.googleapis.com/superlinked-rec...,Brand: Parks London Collection: Parks Exclusiv...,unisex_home,parks_london,candles_&_home_fragrance,0.490409,"[0.2159118056, 0.0520496592, 0.0123813841, -0....",30


Now we can add the products DataFrame to the index. Because the images are represented as URLs, this might take a little longer to execute.

In [11]:
# Ingest the products DataFrame through the product source (which was created with a DataFrameParser).
source_product.put([products_df])

### Setup queries

In this stage we will define the queries we want to use for our recommendations. For our use case we will define 4 types of queries.

1. Item2Item - the basic product similarity query.
2. User2Item - in our case "user" is actually "user-topic". This query extracts the similarity between the "user-topic" vector and the products index.
3. Item complementary - same as item2item but negating topic/product type.
4. popularity - specific/generic popularity query

In [14]:
from superlinked.framework.dsl.query.param import Param
from superlinked.framework.dsl.query.query import Query

In [15]:
# Item-to-item similarity: every space weight is a query-time parameter, and
# the search vector is the stored vector of the product given by `product_id`.
item2item_weights = {
    topic_space: sl.Param("topic_weight"),
    brand_space: sl.Param("brand_weight"),
    product_type_space: sl.Param("product_type_weight"),
    popularity_space: sl.Param("popularity_weight"),
    item2vec_space: sl.Param("item2vec_weight"),
    image_space: sl.Param("image_weight"),
}

item2item_query = (
    sl.Query(product_index, weights=item2item_weights)
    .find(product_schema)
    .with_vector(product_schema, sl.Param("product_id"))
    .similar(item2vec_space, sl.Param("collaborative_vector"))
    # hard filter: active products only
    .filter(product_schema.is_active > 0)
    .limit(sl.Param('limit'))
)

# Generic popularity: only the popularity space carries weight.
popularity_only_weights = {popularity_space: 1}

item_popularity_query = (
    sl.Query(product_index, weights=popularity_only_weights)
    .find(product_schema)
    # hard filter: active products only
    .filter(product_schema.is_active > 0)
    .limit(sl.Param('limit'))
)

# Popularity within a given topic / product type: the three spaces are weighted
# equally and the topic and product type are supplied at query time.
topic_popularity_weights = {
    popularity_space: 1,
    topic_space: 1,
    product_type_space: 1,
}

topic_popularity_query = (
    sl.Query(product_index, weights=topic_popularity_weights)
    .find(product_schema)
    .similar(topic_space, sl.Param("query_topic"))
    .similar(product_type_space, sl.Param("query_product_type"))
    # hard filter: active products only
    .filter(product_schema.is_active > 0)
    .limit(sl.Param('limit'))
)


# User-to-item: searches products with the stored vector of the user entity
# given by `user_id`. Note that the popularity space is not weighted here.
user2item_weights = {
    topic_space: sl.Param("topic_weight"),
    brand_space: sl.Param("brand_weight"),
    product_type_space: sl.Param("product_type_weight"),
    item2vec_space: sl.Param("item2vec_weight"),
    image_space: sl.Param("image_weight"),
}

user2item_query = (
    sl.Query(product_index, weights=user2item_weights)
    .find(product_schema)
    .with_vector(user_schema, sl.Param("user_id"))
    .similar(item2vec_space, sl.Param("collaborative_vector"))
    # hard filter: active products only
    .filter(product_schema.is_active > 0)
    .limit(sl.Param('limit'))
)

### User events ingestion

This will mimic a session-based interaction, event by event.
For each event we first need to create a dummy "user" vector and then ingest the effect.
Additionally, in our case each user is represented multiple times, because we separate effects by topic, so we need a variable that tracks the topic interactions of each user. This will allow us to fetch the right vectors when querying recommendations.

In [16]:
from io import BytesIO
import base64
from PIL import Image


# Placeholder picture used as the `user_image` of dummy user records:
# a 255x255 RGB image, JPEG-encoded in memory and kept as a base64 string.
empty_image = Image.new('RGB', (255, 255))
buffered = BytesIO()
empty_image.save(buffered, format="JPEG")
jpeg_bytes = buffered.getvalue()
base64_empty_image = str(base64.b64encode(jpeg_bytes), "utf-8")

def get_user_dummy_vector(user_id):
    """Return a placeholder user record for ``user_id``.

    The categorical fields are empty strings, the item2vec vector is 100
    zeros and the image is the module-level ``base64_empty_image``.
    """
    blank_categoricals = dict.fromkeys(
        ("user_topic", "user_brand", "user_product_type"), ""
    )
    zero_vector = [0.0 for _ in range(100)]
    return {
        "id": user_id,
        **blank_categoricals,
        "user_item_w2v": zero_vector,
        "user_image": base64_empty_image,
    }

In [17]:

# Number of events to replay, and the seed that makes the draw reproducible
# across "Restart & Run All".
EVENT_SAMPLE_SIZE = 10
EVENT_SAMPLE_SEED = 42

product_id_to_topic = products_df.set_index("id")["topic"].to_dict()
# user id -> list of (topic, created_at) pairs, one per ingested event
USER_TOPICS_TIME_MAP = {}

# Cap the sample size so the cell still runs when fewer rows survived the merge.
sampled_events = events_df_sample.sample(
    n=min(EVENT_SAMPLE_SIZE, len(events_df_sample)),
    random_state=EVENT_SAMPLE_SEED,
)

for event in sampled_events.to_dict(orient="records"):
    user_id = event['user']
    product_id = event['product']
    topic = product_id_to_topic.get(product_id)
    if topic is None:
        continue
    # Each (user, topic) pair is tracked as its own user entity.
    user_topic_id = f"{user_id}_{topic}"
    # NOTE(review): the dummy record is put again for every event of the same
    # user/topic pair -- confirm this does not reset previously applied effects.
    user_dummy_object = get_user_dummy_vector(user_topic_id)
    source_user.put([user_dummy_object])
    # Point the event at the per-topic user entity before ingesting it.
    event["user"] = user_topic_id
    source_event.put([event])
    USER_TOPICS_TIME_MAP.setdefault(user_id, []).append((topic, event['created_at']))

Let's see how we can fetch the latest topics for a user.

In [20]:
def get_latest_topics(user_id, k):
    """Return up to ``k`` distinct topics for ``user_id``, most recent first.

    Topics are read from ``USER_TOPICS_TIME_MAP`` and ordered by the timestamp
    of the event that recorded them. A topic that appears in several events is
    returned once, at the position of its most recent event, so callers do not
    issue the same per-topic query more than once.

    Returns an empty list when the user has no recorded interactions.
    """
    interactions = USER_TOPICS_TIME_MAP.get(user_id, [])
    newest_first = sorted(interactions, key=lambda pair: pair[1], reverse=True)
    # dict.fromkeys de-duplicates while keeping first-seen (i.e. most recent) order.
    distinct_topics = list(dict.fromkeys(topic for topic, _ in newest_first))
    return distinct_topics[:k]

# Example lookup for one user id; the result is [] if that user is not in USER_TOPICS_TIME_MAP.
get_latest_topics(2316466, 3)

['female_shoes']

Now we can create a recommendation wrapper and use our queries

In [24]:
def recommend_user(user_id, limit=10):
    """Recommend products for a user, one query per recently seen topic.

    The user's latest topics (at most 3) are looked up, and for each of them
    ``user2item_query`` is run against the "<user_id>_<topic>" user entity
    with all space weights set to 1.

    Returns a list with one entry per topic, in topic order; each entry is a
    list of result records holding ``id``, ``topic``, ``product_type`` and
    ``brand``. The result is empty when the user has no recorded topics.
    """
    result_columns = ['id', 'topic', 'product_type', 'brand']
    records_per_topic = []
    for topic in get_latest_topics(int(user_id), 3):
        res = app_with_events.query(
            user2item_query,
            user_id=f"{user_id}_{topic}",
            item2vec_weight=1,
            image_weight=1,
            product_type_weight=1,
            topic_weight=1,
            brand_weight=1,
            limit=limit,
        )
        records = res.to_pandas()[result_columns].to_dict(orient='records')
        records_per_topic.append(records)
    return records_per_topic

    
# Columns kept from every item-to-item result.
_ITEM_RESULT_COLUMNS = ['id', 'topic', 'product_type', 'brand']


def _run_item2item(item_id, limit, **weights):
    """Run ``item2item_query`` for ``item_id`` with the given space weights.

    Returns a one-element list wrapping the list of result records (``id``,
    ``topic``, ``product_type``, ``brand``).
    """
    res = app_with_events.query(
        item2item_query,
        product_id=item_id,
        limit=limit,
        **weights,
    )
    return [res.to_pandas()[_ITEM_RESULT_COLUMNS].to_dict(orient='records')]


def recommend_item_similar(item_id, limit=10):
    """Products similar to ``item_id`` by item2vec and image, with a popularity boost of 0.5."""
    return _run_item2item(
        item_id,
        limit,
        topic_weight=0,
        brand_weight=0,
        product_type_weight=0,
        popularity_weight=0.5,
        item2vec_weight=1,
        image_weight=1,
    )


def recommend_item_complementary_topic(item_id, limit=10):
    """Products close to ``item_id`` by item2vec while negating the topic space (weight -1)."""
    return _run_item2item(
        item_id,
        limit,
        topic_weight=-1,
        brand_weight=0,
        product_type_weight=0,
        popularity_weight=0,
        item2vec_weight=1,
        image_weight=0,
    )


def recommend_item_complementary_type(item_id, limit=10):
    """Products close to ``item_id`` by item2vec while negating the product-type space (weight -1)."""
    return _run_item2item(
        item_id,
        limit,
        topic_weight=0,
        brand_weight=0,
        product_type_weight=-1,
        popularity_weight=0,
        item2vec_weight=1,
        image_weight=0,
    )

In [25]:
# Example: recommendations for one user id, with a limit of 10 results per topic query.
recommend_user(2316466, 10)

[[{'id': '18451620',
   'topic': ['female_shoes'],
   'product_type': ['slippers'],
   'brand': ['australia_luxe_collective']},
  {'id': '18984999',
   'topic': ['female_shoes'],
   'product_type': ['ankle_boots'],
   'brand': ['keen']},
  {'id': '19005626',
   'topic': ['unisex_shoes'],
   'product_type': ['boys_shoes'],
   'brand': ['crocs']},
  {'id': '18119880',
   'topic': ['unisex_home'],
   'product_type': ['side_tables'],
   'brand': ['gallery_living']},
  {'id': '19129960',
   'topic': ['male_accessories'],
   'product_type': ['jewellery'],
   'brand': ['stephen_oliver']},
  {'id': '17558571',
   'topic': ['unisex_home'],
   'product_type': ['sofas_&_armchairs'],
   'brand': ['the_great_sofa_company']},
  {'id': '19105971',
   'topic': ['unisex_home'],
   'product_type': ['sofas_&_armchairs'],
   'brand': ['the_great_sofa_company']},
  {'id': '18933840',
   'topic': ['female_accessories'],
   'product_type': ['gloves'],
   'brand': ['n°·_eleven']},
  {'id': '18032710',
   'top

You can keep playing with the sample here to get a feeling for how to define the index.
Remember that we are only working on a small subset of products, so non-optimal results are expected.
To better test and debug your application, please proceed to creating the `superlinked_app` files and deploying a local server.