In [12]:
import logging

import numpy as np
import pandas as pd

from tahoe import create_external_table, drop_external_table, execute_async
from s3 import result_bucket, get_s3_file_keys, get_df_from_parquet

# Show INFO-level messages with timestamps (e.g. the query echoes and S3 import
# progress visible in the cell outputs below).
logging.basicConfig(
    format='%(asctime)s %(levelname)s: %(message)s',
    level=logging.INFO
)

# download data

In [None]:
# Credit: Jordan Long
# Candidate-pool query that fills test.collection_builder_candidates.
# NOTE(review): `query` is not executed anywhere in this notebook; the table appears to have
# been populated separately. Because it is an INSERT, running it again appends a second copy
# of the rows -- truncate/recreate the table first.
query = """-- select products ranked by gmv over last 365 days, limiting to 1M products
-- remove products that aren't from silver+ merchants
-- remove products that have a high refund rate (>10%) over the last year
-- remove products with low lifetime average ratings (<3)
-- remove products via the microtag inappropriate filter
-- add a check to make sure they're active products (currently using filters from wish.master_products)
-- cluster by image hash and take the product with the highest gmv from each cluster

INSERT INTO test.collection_builder_candidates

WITH inappropriate_pids AS (
  SELECT pid
  FROM supply.microtagging_sensitive_products
  WHERE flag_for_removal = 1
) -- ~4 million pids?

, refund_rates AS (
SELECT
    product_id,
    -- note qty_refunded <= quantity
    COALESCE(cast(sum(qty_refunded) as DOUBLE), 0) / COALESCE(sum(quantity), 1) AS refund_rate
    FROM wish.fact_trans_detail a
    WHERE dt >= cast(date_add('day', -365, cast(current_date as date)) as varchar)
    and dt <= cast(current_date as varchar) -- looking over the past year
    and a.order_approved_time is not NULL
    and (a.is_ltd_product = 'false' or a.is_ltd_product is null)
    and (a.refund_reason_category NOT IN (4, 17, 18, 28, 40) or a.refund_reason_category is null) -- TODO: comment what these are
    and a.state not in (0,4,10,12) -- TODO: comment what these are
    -- only orders delivered at least 30 days before today, so their 30 day return window has closed
    and DATE_FORMAT(FROM_UNIXTIME(confirmed_delivered_time, 'America/Los_Angeles'), '%Y-%m-%d') <= cast(date_add('day', -30, cast(current_date as date)) as varchar)
    group by product_id
)

, top_products AS (
SELECT
    a.product_id,
    b.name,
    b.merchant_id,
    COALESCE(b.image_hash, c.image_hash) AS image_hash,
    wss.wss_tier,
    a.gmv_365d,
    refund_rates.refund_rate,
    b.average_rating,
    b.rating_count,
    a.dt
FROM analytics.agg_product_daily_stats a -- get gmv365d from here
JOIN ( -- use this as an active product filter, putting the filters in a subquery so there's less to join
    SELECT product_id, name, merchant_id, image_hash, average_rating, rating_count
    FROM wish.master_products
    WHERE merchant_id IS NOT NULL
    AND is_deleted != 'true'
    AND product_removed_by_merchant != 'true'
    AND commerce_active = 1
    AND state IN ('0','1') -- NEW or ACTIVE
) b
ON a.product_id = b.product_id
LEFT JOIN (SELECT product_id, MAX_BY(hash,time) AS image_hash FROM supply.microtagging_pid_hash_table GROUP BY 1) c
ON a.product_id = c.product_id
LEFT JOIN supply.merchant_quality_score wss
ON wss.merchant_id = b.merchant_id
LEFT JOIN refund_rates
ON refund_rates.product_id = a.product_id
LEFT JOIN inappropriate_pids
ON inappropriate_pids.pid = a.product_id
WHERE a.gmv_365d > 0 -- filters down to 4M products
AND wss.wss_tier IN ('Platinum', 'Gold', 'Silver')
-- cap at 10% refund rate (somewhat arbitrary); NOTE: this also drops products with no
-- qualifying orders (NULL refund_rate), so the LEFT JOIN above behaves like an inner join
AND refund_rates.refund_rate <= 0.1
AND average_rating >= 3 -- if there's no ratings the rating defaults to 5
AND inappropriate_pids.pid IS NULL -- meaning it's not inappropriate
)

-- r[i] indexes the MAX_BY row built below, so the row's field order must match these
-- positions; image_hash comes from the GROUP BY key instead of the row.
SELECT r[1] AS product_id
, r[2] AS name
, r[3] AS merchant_id
, a.image_hash
, NULL AS img_embedding
, r[4] AS wss_tier
, CAST(r[5] AS DOUBLE) AS gmv_365d,
CAST(r[6] AS DOUBLE) AS refund_rate,
r[7] AS average_rating,
r[8] AS rating_count,
r[9] AS dt
FROM (
    SELECT
    image_hash,
    MAX_BY((product_id, name, merchant_id, wss_tier, gmv_365d, refund_rate, average_rating, rating_count, dt), gmv_365d) r
    FROM top_products
    WHERE image_hash IS NOT NULL -- otherwise every hash-less product collapses into a single NULL group
    GROUP BY image_hash
) a
ORDER BY gmv_365d DESC -- keep the top 1M clusters by gmv, not an arbitrary 1M
LIMIT 1000000"""

In [14]:
# Define the export table: column order mirrors test.collection_builder_candidates,
# because the export below is a positional INSERT ... SELECT *.
export_table = {
    "name": "collection_export_test_123",
    "columns": [
        {"name": column, "type": hive_type}
        for column, hive_type in [
            ("product_id", "STRING"),
            ("name", "STRING"),
            ("merchant_id", "STRING"),
            ("image_hash", "STRING"),
            ("img_embedding", "STRING"),
            ("wss_tier", "STRING"),
            ("gmv_365d", "FLOAT"),
            ("refund_rate", "FLOAT"),
            ("average_rating", "FLOAT"),
            ("rating_count", "FLOAT"),
            ("dt", "STRING"),
        ]
    ],
}
db = "sweeper_dev"

In [3]:
# Create the export table (parquet files under s3://<result_bucket>/<db>/<table>, per the CREATE TABLE log below)
create_external_table(
    table_name=export_table["name"],
    table_definition=export_table,
    db=db,
    bucket=result_bucket
)

2023-04-11 23:27:13,719 INFO: USE `default`
2023-04-11 23:27:14,584 INFO: 
    CREATE TABLE sweeper_dev.collection_export_test_123 (
    product_id STRING,
	name STRING,
	merchant_id STRING,
	image_hash STRING,
	img_embedding STRING,
	wss_tier STRING,
	gmv_365d FLOAT,
	refund_rate FLOAT,
	average_rating FLOAT,
	rating_count FLOAT,
	dt STRING
    )
    
STORED AS PARQUET
LOCATION 's3://wish-tahoe-query-results/sweeper_dev/collection_export_test_123'
TBLPROPERTIES ('parquet.compression'='SNAPPY')
2023-04-11 23:27:15,563 INFO: The query returned no records.


In [6]:
# Check the number of rows in the external table (expected 0 right after CREATE TABLE)
q = f"SELECT COUNT(*) FROM {db}.{export_table['name']}"
execute_async(q)

2023-04-12 00:25:28,550 INFO: SELECT COUNT(*) FROM sweeper_dev.collection_export_test_123


[(0,)]

In [7]:
# Row count of the source candidates table; the export below should copy all of them.
q = "SELECT COUNT(*) FROM test.collection_builder_candidates"
execute_async(q)

2023-04-12 00:26:39,783 INFO: SELECT COUNT(*) FROM test.collection_builder_candidates


[(1000000,)]

In [8]:
# Export data to the external table. INSERT ... SELECT * is positional, so the column order of
# export_table must match the candidates table. NOTE: re-running this cell appends duplicate rows.
q = f"""
INSERT INTO {db}.{export_table['name']}
SELECT * FROM test.collection_builder_candidates
"""
execute_async(q)

2023-04-12 00:26:52,813 INFO: 
INSERT INTO sweeper_dev.collection_export_test_123
SELECT * FROM test.collection_builder_candidates



[(1000000,)]

In [9]:
# Check again the number of rows in the external table (should equal the candidates count)
q = f"SELECT COUNT(*) FROM {db}.{export_table['name']}"
execute_async(q)

2023-04-12 00:27:02,283 INFO: SELECT COUNT(*) FROM sweeper_dev.collection_export_test_123


[(1000000,)]

In [12]:
# Show the data files for the external table: (key, size) pairs under the <db>/<table> prefix
file_keys = get_s3_file_keys(s3_bucket=result_bucket, s3_prefix=f"{db}/{export_table['name']}")
file_keys

[('sweeper_dev/collection_export_test_123/20230412_002652_01985_afvpw_f99673e8-770d-4db1-aaff-ef28dab30a90',
  68),
 ('sweeper_dev/collection_export_test_123/20230412_002652_01985_afvpw_ff248bbe-19fe-4177-89ee-5a537e8cc2ec',
  68)]

In [13]:
# Read every exported parquet part file and stack them into one frame.
# ignore_index=True gives a clean 0..N-1 index; without it the index restarts at 0
# for each part file, leaving duplicate labels.
dfs = []
for file_key, file_size in file_keys:
    df_chunk = get_df_from_parquet(s3_bucket=result_bucket, s3_key=file_key)
    dfs.append(df_chunk)

df_data = pd.concat(dfs, ignore_index=True)
df_data.head()

2023-04-12 00:27:51,174 INFO: Importing from sweeper_dev/collection_export_test_123/20230412_002652_01985_afvpw_f99673e8-770d-4db1-aaff-ef28dab30a90...
2023-04-12 00:27:56,706 INFO: Imported DF (498296, 11) from sweeper_dev/collection_export_test_123/20230412_002652_01985_afvpw_f99673e8-770d-4db1-aaff-ef28dab30a90.
2023-04-12 00:27:56,716 INFO: Importing from sweeper_dev/collection_export_test_123/20230412_002652_01985_afvpw_ff248bbe-19fe-4177-89ee-5a537e8cc2ec...
2023-04-12 00:28:01,865 INFO: Imported DF (501704, 11) from sweeper_dev/collection_export_test_123/20230412_002652_01985_afvpw_ff248bbe-19fe-4177-89ee-5a537e8cc2ec.


Unnamed: 0,product_id,name,merchant_id,image_hash,img_embedding,wss_tier,gmv_365d,refund_rate,average_rating,rating_count,dt
0,6087aa1a4c0529be4b5eea2b,Butterfly Girl Resin Sculpture Character Model...,5ac9993f2c49563aee00b471,4563e9eda70ae371df01fac754a9ab81,,Silver,823.19043,0.090909,5.0,7.0,2023-04-09
1,5fb80589dcb5ba391b6334f1,54Pcs/Box KPOP BTS BLACKPINK EXO NCT TXT Stray...,5f8d118fab43aa62703be176,29658f7ed2164fc030d6130d00f82031,,Gold,159.111664,0.0,4.777778,9.0,2023-04-09
2,611b4f6ca8caa50a2ac538b5,10Pcs/bag Glass Bottle Filling Fruits Polymer ...,5de5d5edd30dacbae2e29cdf,645dab48d5e5b306599bf06022fbc94f,,Gold,6137.541016,0.005583,4.578397,287.0,2023-04-09
3,5720b4c91e16345d0ad4ff0f,"Toy-Yoda - FUNNY - 8.5"" X 2.3"" - High Quality ...",549c0f29b9b7e70cbc5394e8,03e4b5dcdc1cebe29fae6f3ecb28e2c9,,Silver,15.443636,0.0,5.0,0.0,2023-04-09
4,60920c9d2d595f41eaefe6fb,Promotion of popular fashion Rick and Morty 3D...,5d5537d67ad24225f68499b2,21e3abc761c9a86beeead6ed7ce417bc,,Silver,342.433716,0.035714,5.0,2.0,2023-04-09


In [14]:
# Expect the same row count as test.collection_builder_candidates (1,000,000).
len(df_data)

1000000

In [16]:
# img_embedding is empty in this export: the candidates query selects NULL AS img_embedding.
set(df_data['img_embedding'])

{None}

In [53]:
# Cache the candidate metadata locally; it is reloaded in the "run inference" section.
df_data.to_parquet("collection_products_041123.parquet", index=False)

In [16]:
# Drop the export table and delete its data files.
# The table's data lives under s3://<bucket>/<db>/<table> (see the CREATE TABLE LOCATION and
# the get_s3_file_keys prefix above), so the prefix must be "<db>/<table>". A "<db>.<table>"
# prefix does not match that location, so the parquet files would be left behind.
drop_external_table(
    db=db,
    table_name=export_table["name"],
    delete_files=True,
    s3_bucket=result_bucket,
    s3_prefix=f"{db}/{export_table['name']}",
)

2023-04-12 14:46:17,743 INFO: USE `default`
2023-04-12 14:46:18,654 INFO: 
    DROP TABLE IF EXISTS sweeper_dev.collection_export_test_123
    
2023-04-12 14:46:19,556 INFO: The query returned no records.
2023-04-12 14:46:19,557 INFO: Dropped sweeper_dev.collection_export_test_123
2023-04-12 14:46:20,178 INFO: Files in 's3://wish-tahoe-query-results/sweeper_dev.collection_export_test_123 are deleted.


# download data with image embedding

In [2]:
# Define the export table for candidates joined with their image embeddings; column order
# must match the explicit SELECT list of the positional INSERT below.
export_table = {
    "name": "collection_export_withemb_test_123",
    "columns": [
        {"name": column, "type": hive_type}
        for column, hive_type in [
            ("product_id", "STRING"),
            ("name", "STRING"),
            ("merchant_id", "STRING"),
            ("image_hash", "STRING"),
            ("img_embedding", "STRING"),
            ("wss_tier", "STRING"),
            ("gmv_365d", "FLOAT"),
            ("refund_rate", "FLOAT"),
            ("average_rating", "FLOAT"),
            ("rating_count", "FLOAT"),
            ("dt", "STRING"),
        ]
    ],
}
db = "sweeper_dev"

In [3]:
# Create the export table (parquet files under s3://<result_bucket>/<db>/<table>, per the CREATE TABLE log below)
create_external_table(
    table_name=export_table["name"],
    table_definition=export_table,
    db=db,
    bucket=result_bucket
)

2023-04-12 16:27:29,214 INFO: USE `default`
2023-04-12 16:27:30,085 INFO: 
    CREATE TABLE sweeper_dev.collection_export_withemb_test_123 (
    product_id STRING,
	name STRING,
	merchant_id STRING,
	image_hash STRING,
	img_embedding STRING,
	wss_tier STRING,
	gmv_365d FLOAT,
	refund_rate FLOAT,
	average_rating FLOAT,
	rating_count FLOAT,
	dt STRING
    )
    
STORED AS PARQUET
LOCATION 's3://wish-tahoe-query-results/sweeper_dev/collection_export_withemb_test_123'
TBLPROPERTIES ('parquet.compression'='SNAPPY')
2023-04-12 16:27:30,923 INFO: The query returned no records.


In [4]:
# Credit: Jordan Long
# Export candidates together with their stored image embedding. This is an inner JOIN:
# candidates whose image_hash has no row in supply.microtagging_master_image_embeds_hash are
# dropped (1,000,000 candidates -> 986,750 exported rows in the output below).
# NOTE: re-running this cell appends duplicate rows.
q = f"""
INSERT INTO {db}.{export_table['name']}
SELECT a.product_id, a.name, a.merchant_id, a.image_hash, b.img_embedding, a.wss_tier, a.gmv_365d, a.refund_rate, a.average_rating, a.rating_count, a.dt
FROM test.collection_builder_candidates a
JOIN supply.microtagging_master_image_embeds_hash b
ON a.image_hash = b.image_hash
"""
execute_async(q)

2023-04-12 16:27:36,454 INFO: 
INSERT INTO sweeper_dev.collection_export_withemb_test_123
SELECT a.product_id, a.name, a.merchant_id, a.image_hash, b.img_embedding, a.wss_tier, a.gmv_365d, a.refund_rate, a.average_rating, a.rating_count, a.dt
FROM test.collection_builder_candidates a
JOIN supply.microtagging_master_image_embeds_hash b
ON a.image_hash = b.image_hash



[(986750,)]

In [5]:
# Check again the number of rows in the external table (should match the INSERT's reported count)
q = f"SELECT COUNT(*) FROM {db}.{export_table['name']}"
execute_async(q)

2023-04-12 16:32:13,938 INFO: SELECT COUNT(*) FROM sweeper_dev.collection_export_withemb_test_123


[(986750,)]

In [6]:
# Show the data files for the external table: (key, size) pairs under the <db>/<table> prefix
file_keys = get_s3_file_keys(s3_bucket=result_bucket, s3_prefix=f"{db}/{export_table['name']}")

In [7]:
# Number of parquet part files written by the INSERT.
len(file_keys)

50

In [8]:
from tqdm import tqdm

In [9]:
def _parse_embedding(text):
    """Parse a bracketed, comma-separated vector string such as '[0.1, -0.2]' into a float array.

    Unlike ``np.fromstring(..., sep=',')``, which can silently return a truncated vector for
    malformed text, this raises ValueError on any unparseable element.
    """
    return np.array([float(value) for value in text[1:-1].split(',')])


# Stream the exported part files: turn each chunk's embedding strings into an (n_rows, dim)
# matrix and keep the remaining columns as per-row metadata dicts.
# Row i of img_embs[k] corresponds to metadatas[k][i].
metadatas = []
img_embs = []
for file_key, file_size in tqdm(file_keys):
    df_chunk = get_df_from_parquet(s3_bucket=result_bucket, s3_key=file_key)
    # pop() removes the bulky string column before the metadata dicts are built.
    embedding_text = df_chunk.pop('img_embedding')
    img_embs.append(np.array(embedding_text.apply(_parse_embedding).tolist()))
    metadatas.append(df_chunk.to_dict('records'))

  0%|          | 0/50 [00:00<?, ?it/s]2023-04-12 16:32:31,465 INFO: Importing from sweeper_dev/collection_export_withemb_test_123/20230412_162736_60854_afvpw_00384228-ac5d-456c-bcd6-e0080a3877ce...
2023-04-12 16:32:39,875 INFO: Imported DF (32365, 11) from sweeper_dev/collection_export_withemb_test_123/20230412_162736_60854_afvpw_00384228-ac5d-456c-bcd6-e0080a3877ce.
  2%|▏         | 1/50 [00:16<13:28, 16.49s/it]2023-04-12 16:32:47,959 INFO: Importing from sweeper_dev/collection_export_withemb_test_123/20230412_162736_60854_afvpw_04492e76-a513-4bb2-b5cd-8fd6fbd5f49d...
2023-04-12 16:32:53,499 INFO: Imported DF (21385, 11) from sweeper_dev/collection_export_withemb_test_123/20230412_162736_60854_afvpw_04492e76-a513-4bb2-b5cd-8fd6fbd5f49d.
  4%|▍         | 2/50 [00:27<10:31, 13.16s/it]2023-04-12 16:32:58,783 INFO: Importing from sweeper_dev/collection_export_withemb_test_123/20230412_162736_60854_afvpw_0718f77f-f523-4006-8596-a382c2a33b54...
2023-04-12 16:33:04,491 INFO: Imported DF (243

In [11]:
import psutil

# function to convert bytes to a more human-readable format
def convert_bytes(num):
    """Format a byte count with a binary (1024-based) unit, e.g. 1536 -> '1.50 KB'.

    Values at or beyond 1024 TB are reported in PB rather than falling off the end of
    the unit list (which previously made the function return None).
    """
    units = ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB']
    for unit in units[:-1]:
        if abs(num) < 1024.0:
            return f"{num:.2f} {unit}"
        num /= 1024.0
    # Largest unit: no further scaling, however big the value is.
    return f"{num:.2f} {units[-1]}"

# get the current process's memory usage (RSS, in bytes)
process = psutil.Process()
memory_info = process.memory_info().rss

# print the memory usage in a human-readable format
print(f"Current memory usage: {convert_bytes(memory_info)}")

Current memory usage: 10.07 GB


In [12]:
# Flatten the per-file lists of metadata dicts, preserving file/row order (same order as img_embs).
flat_metadatas = [j for i in metadatas for j in i]

In [13]:
# Peek at the first records and the total number of exported products.
flat_metadatas[:10], len(flat_metadatas)

([{'product_id': '5be10705c7a35019fc85eed0',
   'name': '36pcs/set Foam Letters Numbers Floating Bath Tub Toddler Children Learning Toys',
   'merchant_id': '57285b300cdedf592d80ac69',
   'image_hash': 'b1198308cd95a25bb5dd71e8249e8445',
   'wss_tier': 'Silver',
   'gmv_365d': 61.27290344238281,
   'refund_rate': 0.0,
   'average_rating': 4.75,
   'rating_count': 4.0,
   'dt': '2023-04-09'},
  {'product_id': '5e1ed37f1f937c0eed84f224',
   'name': '13CM*16.9CM Car Sticker Fairy Butterfly Removable Decal Art Decor Vinyl Black/Silver C24-0187',
   'merchant_id': '5773bd2129a2af1ae0d9ef6c',
   'image_hash': '04bfc69888d92e68dcc43ab34deada46',
   'wss_tier': 'Gold',
   'gmv_365d': 305.23193359375,
   'refund_rate': 0.016949152573943138,
   'average_rating': 4.2727274894714355,
   'rating_count': 22.0,
   'dt': '2023-04-09'},
  {'product_id': '625520a445dcebce7dfb81cf',
   'name': 'Compression Tool,Compression Tool Kit F-Type For RG58 RG59 RG6 Connectors Cable Stripper Coaxial Crimping Set',

In [14]:
# Sanity check (expect True): every exported row is a distinct product. A set comprehension
# avoids materialising a ~1M-row DataFrame just to count ids.
len({record['product_id'] for record in flat_metadatas}) == len(flat_metadatas)

True

In [15]:
# Stack the per-file (n_i, dim) matrices into one (N, dim) array; row order matches flat_metadatas.
all_img_embs = np.vstack(img_embs)

In [16]:
import gc

# Release the per-file embedding chunks now that all_img_embs holds the stacked copy, then
# force a collection (the cell displays the count returned by gc.collect()).
img_embs = None
gc.collect()

5049

In [17]:
# (N products, embedding dim)
all_img_embs.shape

(986750, 768)

In [1]:
from docarray import DocumentArray, Document

In [19]:
# One Document per product: id = product_id, tags = the product's metadata record, embedding =
# the matching row of all_img_embs. Both sequences were built in the same file/row order, so
# they must have equal length for the rows to line up.
assert len(flat_metadatas) == len(all_img_embs), (len(flat_metadatas), len(all_img_embs))
docs = DocumentArray(
    [Document(id=meta['product_id'], tags=meta) for meta in flat_metadatas]
)

docs.embeddings = all_img_embs

In [51]:
# Persist the full DocumentArray (metadata tags + embeddings) to a local binary file.
docs.save('collection_products_withclip_peterhull_041223.bin')

In [2]:
# Reload the saved DocumentArray (e.g. in a fresh kernel).
docs = DocumentArray.load('collection_products_withclip_peterhull_041223.bin')

In [5]:
# Save the 10,000 highest-GMV products as a small sample. Row order in `docs` follows the S3
# part-file order (the export INSERT has no ORDER BY), so docs[:10000] would be an arbitrary
# subset rather than the "top" products the file name promises.
top_gmv_idx = np.argsort(-np.array([doc.tags['gmv_365d'] for doc in docs]), kind='stable')[:10000]
docs[top_gmv_idx.tolist()].save('collection_products_withclip_peterhull_top10000_041223.bin')

In [21]:
# Drop the export table and delete its data files.
# The table's data lives under s3://<bucket>/<db>/<table> (see the CREATE TABLE LOCATION and
# the get_s3_file_keys prefix above), so the prefix must be "<db>/<table>". A "<db>.<table>"
# prefix does not match that location, so the parquet files would be left behind.
drop_external_table(
    db=db,
    table_name=export_table["name"],
    delete_files=True,
    s3_bucket=result_bucket,
    s3_prefix=f"{db}/{export_table['name']}",
)

2023-04-12 17:02:09,561 INFO: USE `default`
2023-04-12 17:02:10,416 INFO: 
    DROP TABLE IF EXISTS sweeper_dev.collection_export_withemb_test_123
    
2023-04-12 17:02:12,878 INFO: INFO  : Compiling command(queryId=hive_20230412170210_833c0458-6e2b-4ecb-8cf7-f579552aea15): 
2023-04-12 17:02:12,879 INFO:     DROP TABLE IF EXISTS sweeper_dev.collection_export_withemb_test_123
2023-04-12 17:02:12,880 INFO:     
2023-04-12 17:02:12,881 INFO: INFO  : Concurrency mode is disabled, not creating a lock manager
2023-04-12 17:02:12,882 INFO: INFO  : Semantic Analysis Completed (retrial = false)
2023-04-12 17:02:12,882 INFO: INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
2023-04-12 17:02:12,883 INFO: INFO  : EXPLAIN output for queryid hive_20230412170210_833c0458-6e2b-4ecb-8cf7-f579552aea15 : STAGE DEPENDENCIES:
2023-04-12 17:02:12,884 INFO:   Stage-0 is a root stage [DDL]
2023-04-12 17:02:12,885 INFO: 
2023-04-12 17:02:12,886 INFO: STAGE PLANS:
2023-04-12 17:02:12,886

In [22]:
# Summary display of the DocumentArray.
docs

In [10]:
# One embedding row per document.
docs.embeddings.shape

(986750, 768)

In [25]:
# Last exported record -- the row whose embedding is all_img_embs[-1]. Read from flat_metadatas
# instead of the loop variable `df_chunk` left over from the download loop (hidden state).
flat_metadatas[-1:]

[{'product_id': '5bbc837ea538ef6bb82bc191',
  'name': '18 Note Engraved Wooden Legend of Zelda Theme Music Box,Antique Carved Hand Crank Musical Box Gift,Blue',
  'merchant_id': '546c4f839719cd05f2974735',
  'image_hash': '0689018b37e5288938e5436e71528401',
  'wss_tier': 'Gold',
  'gmv_365d': 1385.9342041015625,
  'refund_rate': 0.07608695328235626,
  'average_rating': 4.655172348022461,
  'rating_count': 29.0,
  'dt': '2023-04-09'}]

In [27]:
# First 10 dims of the last stored embedding, to compare with the CLIP features computed below.
all_img_embs[-1][:10]

array([ 0.9277181 , -0.88829875,  0.08980982,  0.11253998,  0.46524113,
        0.57791126, -0.17822023,  0.1061473 , -0.1309175 , -0.84999853])

In [37]:
# Squared L2 norm of the stored embedding (far from 1, so it is not unit-normalised).
(all_img_embs[-1]**2).sum()

314.6261344394277

In [42]:
import requests
import torch
import clip
from PIL import Image

# Spot-check: embed one product image locally with OpenAI CLIP ViT-B/32, to compare against
# the stored embedding above and the served model below.
phash = '0689018b37e5288938e5436e71528401'
url = f"https://sweeper-production-productimage.s3.amazonaws.com/{phash}.jpg"
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# Bound the download, fail on an HTTP error page instead of handing PIL a non-image body,
# and close the streamed connection once the image has been preprocessed.
with requests.get(url, stream=True, timeout=30) as response:
    response.raise_for_status()
    image = preprocess(Image.open(response.raw)).unsqueeze(0).to(device)

with torch.no_grad():
    image_features = model.encode_image(image)
# Under no_grad there is no autograd graph to detach from.
pooled_output_clip = image_features.cpu().numpy()

100%|███████████████████████████████████████| 338M/338M [00:07<00:00, 46.8MiB/s]


In [50]:
# Image URL used for the local CLIP check.
url

'https://sweeper-production-productimage.s3.amazonaws.com/0689018b37e5288938e5436e71528401.jpg'

In [44]:
# First 10 dims of the locally computed CLIP image features.
pooled_output_clip[:,:10]

array([[-0.16014807, -0.1707036 ,  0.20969087,  0.01197619,  0.40974215,
        -0.2057969 , -0.2599944 ,  0.23702598, -0.3465825 ,  0.15175675]],
      dtype=float32)

In [45]:
import requests
def get_clip_emb(phashs, timeout=60,
                 endpoint='http://coeus-gpu-multitask-ml-dev.service.consul:8081/v2/models/clip_image_ensemble/versions/1/infer'):
    """Embed up to 64 product images with the served CLIP ensemble over the v2 HTTP/JSON API.

    Args:
        phashs: image hashes; each maps to
            https://sweeper-production-productimage.s3.amazonaws.com/<phash>.jpg.
        timeout: seconds to wait for the inference server before raising.
        endpoint: ``infer`` URL of the clip_image_ensemble model (dev cluster by default).

    Returns:
        dict mapping each output name (e.g. 'pooler_output', 'image_download_success') to a
        numpy array reshaped to the shape reported by the server.

    Raises:
        AssertionError: if more than 64 hashes are passed.
        requests.HTTPError: if the server answers with an error status.
    """
    assert len(phashs) <= 64
    urls = [f"https://sweeper-production-productimage.s3.amazonaws.com/{phash}.jpg" for phash in phashs]
    input_json = {
        "inputs": [
            {
                "name": "image_url",
                "shape": [len(urls), 1],
                "datatype": "BYTES",
                "data": urls,
            }
        ]
    }
    response = requests.post(endpoint, json=input_json, timeout=timeout)
    # Surface server errors as HTTPError rather than a KeyError on 'outputs' below.
    response.raise_for_status()
    res = response.json()
    return {
        output['name']: np.array(output['data']).reshape(output['shape'])
        for output in res['outputs']
    }

In [46]:
# Same image (twice) through the served ensemble, to compare with the local CLIP output above.
res = get_clip_emb(['0689018b37e5288938e5436e71528401'] * 2)

In [49]:
# First 10 dims of the served pooler_output; close to the local CLIP values above.
res['pooler_output'][0][:10]

array([-0.16015625, -0.17077637,  0.21044922,  0.01138306,  0.41088867,
       -0.2076416 , -0.25952148,  0.23266602, -0.34619141,  0.15161133])

# explore image embeddings inference

## vanilla http

In [28]:
import requests
def get_clip_emb(phashs, timeout=60,
                 endpoint='http://coeus-gpu-multitask-ml-dev.service.consul:8081/v2/models/clip_image_ensemble/versions/1/infer'):
    """Embed up to 64 product images with the served CLIP ensemble over the v2 HTTP/JSON API.

    NOTE(review): this re-defines the identical get_clip_emb from earlier in the notebook;
    keep a single definition once the notebook is tidied.

    Args:
        phashs: image hashes; each maps to
            https://sweeper-production-productimage.s3.amazonaws.com/<phash>.jpg.
        timeout: seconds to wait for the inference server before raising.
        endpoint: ``infer`` URL of the clip_image_ensemble model (dev cluster by default).

    Returns:
        dict mapping each output name (e.g. 'pooler_output', 'image_download_success') to a
        numpy array reshaped to the shape reported by the server.

    Raises:
        AssertionError: if more than 64 hashes are passed.
        requests.HTTPError: if the server answers with an error status.
    """
    assert len(phashs) <= 64
    urls = [f"https://sweeper-production-productimage.s3.amazonaws.com/{phash}.jpg" for phash in phashs]
    input_json = {
        "inputs": [
            {
                "name": "image_url",
                "shape": [len(urls), 1],
                "datatype": "BYTES",
                "data": urls,
            }
        ]
    }
    response = requests.post(endpoint, json=input_json, timeout=timeout)
    # Surface server errors as HTTPError rather than a KeyError on 'outputs' below.
    response.raise_for_status()
    res = response.json()
    return {
        output['name']: np.array(output['data']).reshape(output['shape'])
        for output in res['outputs']
    }

In [31]:
# Two copies of one image through the served ensemble.
res = get_clip_emb(['0689018b37e5288938e5436e71528401'] * 2)

In [32]:
# Both outputs: per-image download flags and pooler_output embeddings.
res

{'image_download_success': array([[ True],
        [ True]]),
 'pooler_output': array([[-0.16015625, -0.17077637,  0.21044922, ...,  0.43334961,
          0.20874023,  0.20043945],
        [-0.16015625, -0.17077637,  0.21044922, ...,  0.43334961,
          0.20874023,  0.20043945]])}

In [36]:
# Squared L2 norm of the served embedding (compare with the stored embedding's norm above).
(res['pooler_output'][1]**2).sum()

101.6134683902485

In [154]:
%%timeit
# Latency of one 64-image request (same image repeated) via plain requests + JSON.
res = get_clip_emb(['4563e9eda70ae371df01fac754a9ab81'] * 64)

1.55 s ± 43.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [155]:
# Projected wall time for ~1M images sent sequentially, 64 per request at the 1.55 s measured above.
1e6 / 64 * 1.55 / 60 / 60 # hours

6.727430555555555

## triton http client

In [156]:
# !pip install tritonclient[all]

In [260]:
import tritonclient.http as httpclient

In [261]:
# Synchronous Triton HTTP client for the dev multitask-ml cluster (HTTP port 8081).
triton_client = httpclient.InferenceServerClient(
    url="coeus-gpu-multitask-ml-dev.service.consul:8081", 
)

In [2]:
# Model served by Triton (same ensemble as the raw HTTP endpoint above).
model_name = "clip_image_ensemble"

In [263]:
def test_infer(model_name,
               input0_data,
               request_compression_algorithm=None,
               response_compression_algorithm=None):
    """Run one synchronous inference through the Triton HTTP client.

    ``input0_data`` is sent as the ``image_url`` BYTES input in JSON form (callers pass an
    object array of UTF-8 encoded URLs reshaped to (-1, 1)); ``image_download_success`` and
    ``pooler_output`` are requested as binary outputs. Returns the client's InferResult.
    """
    image_url_input = httpclient.InferInput('image_url', input0_data.shape, "BYTES")
    image_url_input.set_data_from_numpy(input0_data, binary_data=False)

    requested = [
        httpclient.InferRequestedOutput(output_name, binary_data=True)
        for output_name in ('image_download_success', 'pooler_output')
    ]

    return triton_client.infer(
        model_name,
        [image_url_input],
        outputs=requested,
        request_compression_algorithm=request_compression_algorithm,
        response_compression_algorithm=response_compression_algorithm,
    )

In [264]:
# Two copies of one encoded image URL as an (N, 1) object array.
res2 = test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 2, 
             dtype=np.object_).reshape(-1,1))

In [265]:
# Embeddings decoded from the binary output.
res2.as_numpy('pooler_output')

array([[-0.26953125,  0.39135742, -0.2565918 , ...,  0.2290039 ,
        -0.14978027,  0.07348633],
       [-0.26953125,  0.39135742, -0.2565918 , ...,  0.2290039 ,
        -0.14978027,  0.07348633]], dtype=float32)

In [266]:
# Per-image download flags.
res2.as_numpy('image_download_success')

array([[ True],
       [ True]])

In [164]:
%%timeit
# Latency of one 64-image request via the Triton HTTP client, including output decoding.
res2 = test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 64, dtype=np.object_).reshape(-1,1))
_,_ = res2.as_numpy('pooler_output'), res2.as_numpy('image_download_success')

1.39 s ± 57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [165]:
# Projected wall time for ~1M images, sequential 64-image requests at 1.39 s each.
1e6 / 64 * 1.39 / 60 / 60 # hours

6.032986111111112

## triton grpc client

In [2]:
import tritonclient.grpc as grpcclient

In [3]:
# Synchronous Triton gRPC client; gRPC is served on port 8082 (HTTP is 8081).
triton_client = grpcclient.InferenceServerClient(
    url="coeus-gpu-multitask-ml-dev.service.consul:8082", 
)

In [7]:
def test_infer(model_name,
               input0_data):
    """Run one synchronous inference through the Triton gRPC client.

    Sends ``input0_data`` as the ``image_url`` BYTES input and requests both
    ``image_download_success`` and ``pooler_output``; returns the client's InferResult.
    """
    image_url_input = grpcclient.InferInput('image_url', input0_data.shape, "BYTES")
    image_url_input.set_data_from_numpy(input0_data)

    requested = [
        grpcclient.InferRequestedOutput(output_name)
        for output_name in ('image_download_success', 'pooler_output')
    ]

    return triton_client.infer(model_name, [image_url_input], outputs=requested)

In [8]:
# Two copies of one encoded image URL as an (N, 1) object array.
res2 = test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 2, 
             dtype=np.object_).reshape(-1,1))

In [9]:
# Embeddings; identical to the HTTP client's output above.
res2.as_numpy('pooler_output')

array([[-0.26953125,  0.39135742, -0.2565918 , ...,  0.2290039 ,
        -0.14978027,  0.07348633],
       [-0.26953125,  0.39135742, -0.2565918 , ...,  0.2290039 ,
        -0.14978027,  0.07348633]], dtype=float32)

In [10]:
# Per-image download flags.
res2.as_numpy('image_download_success')

array([[ True],
       [ True]])

In [None]:
%%timeit
# Latency of one 64-image request via the Triton gRPC client, including output decoding.
res2 = test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 64, dtype=np.object_).reshape(-1,1))
_,_ = res2.as_numpy('pooler_output'), res2.as_numpy('image_download_success')

1.43 s ± 39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
# Projected wall time for ~1M images, sequential 64-image requests at 1.43 s each.
1e6 / 64 * 1.43 / 60 / 60 # hours

6.206597222222222

## vanilla async (aiohttp)

In [174]:
import asyncio
import aiohttp

In [176]:
async def call_oai(session, data,
                   url='http://coeus-gpu-multitask-ml-dev.service.consul:8081/v2/models/clip_image_ensemble/versions/1/infer'):
    """POST one v2-protocol inference payload and return the parsed JSON response.

    Args:
        session: an open aiohttp.ClientSession (anything with a compatible async ``post``).
        data: JSON request body, e.g. ``input_json`` below.
        url: ``infer`` endpoint of the CLIP ensemble; defaults to the dev cluster.

    Raises:
        aiohttp.ClientResponseError: on a non-2xx status, so a server error is reported
            directly rather than surfacing later as a response without 'outputs'.
    """
    async with session.post(url, json=data) as response:
        response.raise_for_status()
        return await response.json()


async def call_oais(datas, max_concurrency=None):
    """Send every payload in ``datas`` over one shared session; results keep input order.

    Args:
        datas: iterable of JSON request bodies.
        max_concurrency: optional cap on requests in flight at once. None (default)
            schedules every request immediately.
    """
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def _bounded(session, data):
        # Acquire a slot only when a concurrency cap was requested.
        if semaphore is None:
            return await call_oai(session, data)
        async with semaphore:
            return await call_oai(session, data)

    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(_bounded(session, data)) for data in datas]
        return await asyncio.gather(*tasks)


# Benchmark payload: one 64-image batch (the same image repeated).
urls = [f"https://sweeper-production-productimage.s3.amazonaws.com/{phash}.jpg" for phash in ['4563e9eda70ae371df01fac754a9ab81'] * 64]
input_json = {
    "inputs": [
        {
            "name": "image_url",
            "shape": [len(urls), 1],
            "datatype": "BYTES",
            "data": urls,
        }
    ]
}

In [186]:
# 10 concurrent 64-image requests (top-level await works in IPython/Jupyter).
res = await call_oais([input_json] * 10)

In [189]:
# Projected hours for ~1M images in bursts of 10x64. NOTE(review): 4.7 s appears to be the observed
# wall time of the 10-request burst above; it is not recorded in this notebook.
1e6 / 64 / 10 * 4.7 / 60 / 60 # 2 hours

2.0399305555555554

In [188]:
# 20 concurrent 64-image requests.
res = await call_oais([input_json] * 20)

In [190]:
# Projected hours for ~1M images in bursts of 20x64. NOTE(review): 7.4 s appears to be the observed
# wall time of the 20-request burst above; it is not recorded in this notebook.
1e6 / 64 / 20 * 7.4 / 60 / 60 # 1.6 hour

1.605902777777778

## triton http client async

In [243]:
import tritonclient.http as httpclient

In [244]:
# HTTP client with concurrency=20, matching the 20 async_infer calls fired by test_infer below.
triton_client = httpclient.InferenceServerClient(
    url="coeus-gpu-multitask-ml-dev.service.consul:8081", 
    concurrency=20
)

In [245]:
# Model served by Triton.
model_name = "clip_image_ensemble"

In [246]:
def test_infer(model_name,
               input0_data,
               n_requests=20):
    """Fire ``n_requests`` identical async HTTP inference calls and wait for all of them.

    Args:
        model_name: Triton model to call.
        input0_data: object array of UTF-8 encoded image URLs (callers pass shape (-1, 1)).
        n_requests: number of requests to submit; defaults to 20, matching the client's
            ``concurrency`` setting above.

    Returns:
        list of InferResult objects, one per request, in submission order. Previously the
        results were fetched and then discarded, so the function returned None.
    """
    inputs = [httpclient.InferInput('image_url', input0_data.shape, "BYTES")]
    inputs[0].set_data_from_numpy(input0_data, binary_data=False)

    outputs = [
        httpclient.InferRequestedOutput('image_download_success', binary_data=True),
        httpclient.InferRequestedOutput('pooler_output', binary_data=True),
    ]

    async_requests = [
        triton_client.async_infer(model_name=model_name, inputs=inputs, outputs=outputs)
        for _ in range(n_requests)
    ]
    # get_result() blocks until the server has answered that request.
    return [async_request.get_result() for async_request in async_requests]


In [None]:
# Async burst with a 2-image batch per request.
res2 = test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 2, 
             dtype=np.object_).reshape(-1,1))

In [214]:
# Async burst with a 64-image batch per request (timing used in the estimate below).
test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 64, 
             dtype=np.object_).reshape(-1,1))

In [215]:
# Projected hours for ~1M images in bursts of 20x64. NOTE(review): 7.2 s appears to be the observed
# wall time of the burst above; it is not recorded in this notebook.
1e6 / 64 / 20 * 7.2 / 60 / 60 # 1.5 hour

1.5625

## triton grpc client async (callback-based API: awkward to use, and results must be collected from callbacks)

In [3]:
import tritonclient.grpc as grpcclient

In [4]:
# gRPC client (port 8082) used for callback-based async_infer below.
triton_client = grpcclient.InferenceServerClient(
    url="coeus-gpu-multitask-ml-dev.service.consul:8082", 
)

In [5]:
from functools import partial

In [7]:
def test_infer(model_name,
               input0_data,
               n_requests=10,
               timeout=None):
    """Send ``n_requests`` identical async gRPC inference calls and collect their callbacks.

    Args:
        model_name: Triton model to call.
        input0_data: object array of UTF-8 encoded image URLs (callers pass shape (-1, 1)).
        n_requests: number of requests to submit (default 10).
        timeout: optional seconds to wait for each callback; None waits indefinitely.

    Returns:
        list of length ``n_requests`` holding, in completion order, either an InferResult
        or the error object the client passed to the callback.

    Raises:
        queue.Empty: if ``timeout`` is set and a callback does not arrive in time.
    """
    import queue

    inputs = [grpcclient.InferInput('image_url', input0_data.shape, "BYTES")]
    inputs[0].set_data_from_numpy(input0_data)

    outputs = [
        grpcclient.InferRequestedOutput('image_download_success'),
        grpcclient.InferRequestedOutput('pooler_output'),
    ]

    # Callbacks may fire on another thread; a Queue hands each outcome back to this thread
    # and lets it block instead of spinning in a `while len(...) < n: pass` loop.
    completed = queue.Queue()

    def callback(result, error):
        completed.put(error if error else result)

    for _ in range(n_requests):
        triton_client.async_infer(model_name, inputs, callback=callback, outputs=outputs)

    return [completed.get(timeout=timeout) for _ in range(n_requests)]

In [8]:
# Async gRPC burst with a 2-image batch per request.
res2 = test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 2, 
             dtype=np.object_).reshape(-1,1))

In [10]:
# Async gRPC burst with a 64-image batch per request.
res2 = test_infer(model_name, 
    np.array(['https://sweeper-production-productimage.s3.amazonaws.com/4563e9eda70ae371df01fac754a9ab81.jpg'.encode('utf-8')] * 64, dtype=np.object_).reshape(-1,1))

In [11]:
# NOTE(review): identical to the HTTP-async estimate (20 requests, 7.2 s); this gRPC run fires 10
# requests and was not timed here, so the figure is not a gRPC measurement.
1e6 / 64 / 20 * 7.2 / 60 / 60 # 1.5 hour

1.5625

# run inference

In [22]:
# Reload the 1M candidate rows cached earlier (product_id, image_hash, metadata).
df = pd.read_parquet('collection_products_041123.parquet')

In [23]:
# Peek at the reloaded candidates.
df.head(2)

Unnamed: 0,product_id,name,merchant_id,image_hash,img_embedding,wss_tier,gmv_365d,refund_rate,average_rating,rating_count,dt
0,6087aa1a4c0529be4b5eea2b,Butterfly Girl Resin Sculpture Character Model...,5ac9993f2c49563aee00b471,4563e9eda70ae371df01fac754a9ab81,,Silver,823.19043,0.090909,5.0,7.0,2023-04-09
1,5fb80589dcb5ba391b6334f1,54Pcs/Box KPOP BTS BLACKPINK EXO NCT TXT Stray...,5f8d118fab43aa62703be176,29658f7ed2164fc030d6130d00f82031,,Gold,159.111664,0.0,4.777778,9.0,2023-04-09


In [24]:
import asyncio
import aiohttp

In [25]:
async def call_oai(session, data):
    async with session.post(
        'http://coeus-gpu-multitask-ml-stage.service.consul:8081/v2/models/clip_image_ensemble/versions/1/infer',
        json=data
    ) as response:
        res = await response.json()
    return res

async def call_oais(datas):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for data in datas:
            task = asyncio.ensure_future(call_oai(session, data))
            tasks.append(task)
        reses = await asyncio.gather(*tasks)
        return reses

urls = [f"https://sweeper-production-productimage.s3.amazonaws.com/{phash}.jpg" for phash in ['4563e9eda70ae371df01fac754a9ab81'] * 64]
input_json = {
    "inputs":[
        {	
            "name": "image_url",
            "shape": [len(urls),1],
            "datatype": "BYTES",
            "data": urls
        }
    ]
}
res = await call_oais([input_json] * 10)

In [42]:
# Probe: 640 single-image requests (batch size 1) fired concurrently, the request shape used
# for the full run below.
phash = '4563e9eda70ae371df01fac754a9ab81'
res = await call_oais([
    {
        "inputs":[
        {	
            "name": "image_url",
            "shape": [1,1],
            "datatype": "BYTES",
            "data": [f"https://sweeper-production-productimage.s3.amazonaws.com/{phash}.jpg"]
        }
    ]
    }
] * 640)

In [57]:
# Split the candidates into int(1e6/639/40) = 39 chunks of ~25.6k rows; each chunk is sent as one
# burst of single-image requests. NOTE(review): 639 and 40 look hand-tuned -- name/document them.
n_chunks = int(1e6 / 639 / 40)
# Split positional indices and slice with iloc: same partition as np.array_split(df, n_chunks),
# without np.array_split on a DataFrame (emits a FutureWarning in pandas >= 2.1).
list_df = [df.iloc[idx] for idx in np.array_split(np.arange(len(df)), n_chunks)]

In [58]:
# Number of chunks and the sizes of the first/last chunk.
len(list_df), len(list_df[0]), len(list_df[-1])

(39, 25642, 25641)

In [59]:
from tqdm import tqdm

In [60]:
all_success = []
all_pids = []
all_embs = []
for df_i in tqdm(list_df):
    reses = await call_oais([
        {
            "inputs":[
                {	
                    "name": "image_url",
                    "shape": [1,1],
                    "datatype": "BYTES",
                    "data": [f"https://sweeper-production-productimage.s3.amazonaws.com/{phash}.jpg"]
                }
            ]
        } for phash in df_i.image_hash.tolist()
    ])
    res_arr = [
        {res['outputs'][ind]['name']: \
            np.array(res['outputs'][ind]['data']).reshape(res['outputs'][ind]['shape']) for ind in range(len(res['outputs']))} for res in reses
    ]
    concat_arr_success = np.vstack([i['image_download_success'] for i in res_arr])
    concat_arr_embs = np.vstack([i['pooler_output'] for i in res_arr])
    all_success.append(concat_arr_success)
    all_embs.append(concat_arr_embs)
    all_pids.append(df_i.product_id.tolist())

100%|██████████| 39/39 [2:19:31<00:00, 214.65s/it]  


In [61]:
# Stack per-chunk results: one success flag and one embedding row per candidate, in df order.
all_success_arr = np.vstack(all_success)
all_embs_arr = np.vstack(all_embs)

In [74]:
# Flatten per-chunk product ids into one array aligned with the stacked results.
all_pids_arr = np.array([j for i in all_pids for j in i])

In [72]:
# All three arrays must have one row per candidate (expect True).
len(all_pids_arr) == len(all_embs_arr) == len(all_success_arr)

True

In [67]:
# (N, 1) success flags -> (N,) boolean mask.
all_success_arr_flat = all_success_arr.reshape(-1)

In [68]:
# (N candidates, embedding dim)
all_embs_arr.shape

(1000000, 512)

In [69]:
# Number of images that downloaded successfully.
all_success_arr.sum()

999983

In [75]:
# Confirm the inference results are in the same row order as df (expect True).
(all_pids_arr == np.array(df['product_id'].tolist())).all()

True

In [78]:
# Boolean mask of successful downloads.
all_success_arr_flat

array([ True,  True,  True, ...,  True,  True,  True])

In [79]:
# Keep only rows whose image downloaded (mask is positional; surviving rows keep their original index labels).
df_success = df[all_success_arr_flat]

In [81]:
# Same mask on the embeddings keeps them row-aligned with df_success.
all_embs_arr_success = all_embs_arr[all_success_arr_flat]

In [83]:
# Save embeddings; row i corresponds to row i of the parquet written in the next cell.
np.save('collection_products_withclip_multitask_041223.npy', 
        all_embs_arr_success)

In [84]:
# Save the matching metadata (index dropped, so row position is the join key with the .npy).
df_success.to_parquet('collection_products_withclip_multitask_041223.parquet', index=False)

# upload collection

In [86]:
import s3fs
# S3 filesystem handle (credentials were picked up from environment variables, per the log below).
s3_file = s3fs.S3FileSystem()

In [56]:
# Destination prefix for the product collection (empty before the first upload).
s3_file.ls('structured-data-dev/vector-db-multitask-ml/product-collection')

[]

In [62]:
# Upload the full DocumentArray (stored embeddings) to S3.
s3_file.put('collection_products_withclip_peterhull_041223.bin', 
            'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_peterhull_041223.bin')

[None]

In [63]:
# Verify the upload.
s3_file.ls('structured-data-dev/vector-db-multitask-ml/product-collection')

['structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_peterhull_041223.bin']

In [8]:
# Upload the 10k-document sample.
s3_file.put('collection_products_withclip_peterhull_top10000_041223.bin', 
    'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_peterhull_top10000_041223.bin')

[None]

In [9]:
# Verify the upload.
s3_file.ls('structured-data-dev/vector-db-multitask-ml/product-collection')

['structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_peterhull_041223.bin',
 'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_peterhull_top10000_041223.bin']

In [87]:
# Upload the served-model (multitask) embeddings and their row-aligned metadata.
s3_file.put('collection_products_withclip_multitask_041223.parquet', 
    'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_multitask_041223.parquet')
s3_file.put('collection_products_withclip_multitask_041223.npy', 
    'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_multitask_041223.npy')

2023-04-13 01:59:16,242 INFO: Found credentials in environment variables.


[None]

In [88]:
# Final listing of the product-collection prefix.
s3_file.ls('structured-data-dev/vector-db-multitask-ml/product-collection')

['structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_multitask_041223.npy',
 'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_multitask_041223.parquet',
 'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_peterhull_041223.bin',
 'structured-data-dev/vector-db-multitask-ml/product-collection/collection_products_withclip_peterhull_top10000_041223.bin']