In [1]:
#Automation framework https://docs.google.com/document/d/1j3sWxlI6XrtDu0Mfe67NY8MgXzydoYHU-fSaFrUfsbA/edit
import pandas as pd
import numpy as np
import snowflake.connector
from collections import defaultdict
import json
import re
import requests

In [2]:
#login to snowflake db
con = snowflake.connector.connect(user='vishal.kumar@scale.com',
                                 account='pxa65918',
                                 authenticator='externalbrowser',
                                 warehouse='COMPUTE_WH',
                                 database='SCALE_CRAWLER',
                                 role='GENERAL_RO')

cs = con.cursor()

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...


In [57]:
cid = input("Enter catalog id:\n")
#2765771213705208 hampdenclothing
domain = input("Enter domain:\n")

Enter catalog id:
309405287009731
Enter domain:
liamandcompany


In [58]:
#Get current match rates by catalog
sql = f'''
 select catalog_id,
         site_url,
         sum(VARIANT_COUNT) as order_v,
         sum(case when state in ('unmatched_no_data_found') then VARIANT_COUNT else null end) ndf_v,
         sum(case when state in ('matched_submitted','matched_unsubmitted') then VARIANT_COUNT else null end ) as matched_v,
         matched_v/(0.0001+order_v) as match_rate,
       matched_v/(0.0001+order_v-ndf_v) as match_rate_exc_ndf
  from scale_prod.view.CATALOG_STATS_LATEST_MATCHING
  where 1=1
  and source_input='sent_by_customer'
  and catalog_id='{cid}'
  group by 1,2
  
'''
cs.execute(sql)
mrdf = cs.fetch_pandas_all()

In [59]:
mr = mrdf['MATCH_RATE']
print(mr)

0    0.037371
Name: MATCH_RATE, dtype: object


In [60]:
#Check if matching strategy exists
sql = f'''
with basic_info as (
select
CATALOG_ID,
BRANDS,
UPDATED_AT last_catalog_update,
tags
from
PUBLIC.CUSTOMERCATALOGS
where
customer = 'flamingo' and CATALOG_ID = '{cid}'
),

strat_info as(
select
DOMAIN,
UPDATED_AT last_strat_date,
f.value: catalogId catalog_id,
CONCAT(f.value: customerFields,'-', f.value:siteFields) strat
from PUBLIC.HUMANJOBS,
  lateral flatten(input => APPLY_CHANGES_INTERMEDIATE) f
where MATCH_FIELD like '%normalized_product_matching%'
and MATCH_VALUE like '%flamingo'
and status = 'manually_labelled' 
or  status = 'manually_labeled'
),

seller_config as (
select domain, 
configs:enableProductMatchingTask PMEnable 
from 
sellers
)

select
distinct (basic_info.CATALOG_ID) CATALOG_ID ,
basic_info.BRANDS,
seller_config.PMEnable,
strat,
last_strat_date,
last_catalog_update,
basic_info.tags
from strat_info
join basic_info on strat_info.catalog_id=basic_info.CATALOG_ID
join seller_config on basic_info.BRANDS like CONCAT('%',seller_config.domain,'%')
'''
cs.execute(sql)
msdf = cs.fetch_pandas_all()
msdf['LAST_STRAT_DATE'] = pd.to_datetime(msdf['LAST_STRAT_DATE']).dt.date

In [61]:
#find matching strategy and last applied date
msdt = msdf['LAST_STRAT_DATE']
ms = msdf['STRAT']
print("Product matching strategy applied on: \n",msdt.to_string())
print("Strategy: \n", ms.to_string())
msstatus = (msdf['PMENABLE'].eq('"true"')).any()
#msdf
print(msstatus)

Product matching strategy applied on: 
 0    2022-08-28
1    2022-08-28
Strategy: 
 0    ["name","retailer_item_id"]-["title","id"]
1    ["name","retailer_item_id"]-["title","id"]
True


In [62]:
#Get human matching jobs status
sql = f'''
with brands as (
  select
    catalog_id :: string catalog_id,
    f.value brand,
    INTERNAL_PRIORITY
  from
    scale_crawler.public.CUSTOMERCATALOGS,
    lateral flatten(input => brands) f -- where (tags like '%batch-2022%'
    --or tags like '%top20%' or catalog_id in ('550463902135666', '565074204068341', '191727785496694', '2195881547356927', '810816272744510'))
    --   where brand in ('www.arula.com','www.aveda.com','www.yandy.com','www.francosarto.com','www.roxyaustralia.com.au','www.itcosmetics.com','degsandsal.com','www.jomalone.com','shop.oliversapparel.com','beautybio.com','www.deckofscarlet.com','naturelab.com','www.rarebeauty.com','www.petitenpretty.com','www.aphrodites.com','www.toofaced.com','mignonnegavigan.com','khaite.com','www.philosophy.com','6dollarshirts.com','oliversapparel.com','us.burberry.com','www.aliceandolivia.com','www.bershka.com','www.bl101.com','www.cutmaps.com','www.driduck.com','www.hollisterco.com')
  where
    customer = 'flamingo'
    AND catalog_id='{cid}'

),
seller_config as (
  select
    domain,
    configs :enableProductMatchingTask flag
  from
    sellers
),
hj_progress as (
  select
    hjs.task_payload :catalogId catalog_id,
    domain,
    count(*) n_hj,
    count_if(hjs.status = 'labeled') n_hj_completed,
    count_if(hjs.status = 'labeled') / count(*) frac
  from
    PRODUCTVARIANTS,
    lateral flatten(input => MATCH_IDENTIFIERS) f
    join HUMANJOBS hjs on f.value :matchField = hjs.match_field
    and f.value :matchValue = hjs.match_value
  where
    f.value :matchField = 'normalized_product_matching'
    and hjs.task_payload:customer = 'flamingo'
  group by
    1,
    2
),
strat_progress as (
  select
    CASE
      when (
        status = 'manually_labelled'
        or status = 'manually_labeled'
      ) then 1
      else 0
    end frac,
    domain,
    TASK_PAYLOAD :catalogId catalog_id
  from
    PUBLIC.HUMANJOBS
  where
    MATCH_FIELD = 'normalized_product_matching_strategy'
    and task_payload:customer = 'flamingo'
),
pvs_with_predictions as (
  select
    brand,
    pvid,
    f.value :catalogId pvCatalogId,
    GET(f.value :productIds, 0) pvProductId,
    CASE
      when MATCH_IDENTIFIERS not like '%clusterS3Artifact%'
      and f.value :considerForMatching = true
      and pvProductId is not null then 1
      else 0
    end rule,
    CASE
      when MATCH_IDENTIFIERS like '%clusterS3Artifact%'
      and f.value :considerForMatching = true then 1
      else 0
    end hj,
    CASE
      when MATCH_IDENTIFIERS like '%product_matching_strategy%' then 1
      else 0
    end strategy_job,
    CASE
      when f.value :considerForMatching = true then 1
      else 0
    end consider_match,
    CASE
      when f.value :considerForMatching = false then 1
      else 0
    end ignore_match
  from
    PUBLIC.PRODUCTVARIANTS pvs,
    lateral flatten(input => FINAL_PRODUCT_MATCHES, OUTER => TRUE) f
  where
    status != 'cancelled'
    and f.value:customer = 'flamingo'
),
cis_per_catalog as (
  select
    count(*) ncis,
    count_if(batch like '%priority-highimp-2022%') ncis_pri,
    catalog_id
  from
    PUBLIC.CUSTOMERINPUTS

  where customer = 'flamingo'
  group by
    3
)
select
  REPLACE(brands.brand, '"', '') _brand,
  brands.catalog_id,
  brands.INTERNAL_PRIORITY,
  seller_config.flag,
  SUM(strategy_job) n_strategy_job,
  max(strat_progress.frac) strat_completion_frac,
  SUM(rule) n_rule,
  COALESCE(max(hj_progress.n_hj), 0) n_hj,
  COALESCE(max(hj_progress.n_hj_completed), 0) n_hj_completed,
  COALESCE(max(hj_progress.n_hj), 0) - COALESCE(max(hj_progress.n_hj_completed), 0) n_hj_pending,
  COALESCE(max(hj_progress.frac), 1) hj_completion_frac,
  SUM(consider_match) n_consider_match,
  SUM(ignore_match) n_ignore_match,
  count_if(pvProductId is not null) npvs_matched,
  count(*) npvs,
  npvs_matched / npvs frac,
  npvs_matched / max(ncis) cis_frac,
  (COALESCE(max(hj_progress.n_hj), 0) + n_rule) / (n_consider_match + 1) tot_comp_frac,
  max(ncis) ncis,
  max(ncis_pri) ncis_pri,
  CASE
    when n_strategy_job != npvs then 'bfill_error'
    when (n_strategy_job = npvs)
    and (strat_completion_frac != 1) then 'pending_strategy'
    when (n_strategy_job = npvs)
    and (strat_completion_frac = 1)
    and (hj_completion_frac < 1) then 'pending_matching_job'
    when (n_strategy_job = npvs)
    and (strat_completion_frac = 1)
    and (hj_completion_frac >= 1) then 'completed'
  end matching_status
from
  pvs_with_predictions
  join brands on pvs_with_predictions.brand = brands.brand
  left join cis_per_catalog on brands.catalog_id = cis_per_catalog.catalog_id
  left join seller_config on seller_config.domain = brands.brand
  left join hj_progress on brands.brand = hj_progress.domain
  and brands.catalog_id = hj_progress.catalog_id
  left join strat_progress on brands.brand = strat_progress.domain
  and brands.catalog_id = strat_progress.catalog_id
where
  (
    brands.CATALOG_ID = pvCatalogId
    or pvCatalogId is null
  )
group by
  1,
  2,
  3,
  4
'''
cs.execute(sql)
hjdf = cs.fetch_pandas_all()

In [63]:
nhj = hjdf['N_HJ']
nhjc = hjdf['N_HJ_COMPLETED']
nhjp = hjdf['N_HJ_PENDING']
mst = hjdf['MATCHING_STATUS']
hjstatus = nhjc/nhj
print("Number of human jobs created:",nhj.to_string())
print("Number of human jobs completed:",nhjc.to_string())
print("Number of human jobs pending:",nhjp.to_string())
print("HUMAN JOBS COMPLETION %:",((hjstatus)*100).to_string(),"%")
print(mst.to_string())

Number of human jobs created: 0    1052
Number of human jobs completed: 0    670
Number of human jobs pending: 0    382
HUMAN JOBS COMPLETION %: 0    63.688213 %
0    pending_matching_job


In [64]:
#Get list of crawled product variants
sql = f'''
select 
distinct (PVID) as PVID,
brand,
_ID,
unique_id,
scraped_attributes :sku sku,
scraped_attributes :item_group_id item_group_id,
scraped_attributes :link link,
scraped_attributes :title title,
scraped_attributes :display_color display_color,
scraped_attributes :size size,
scraped_attributes :options options,
trim (g.value: productIds:: string ,'[]""') AS MATCHED_CI_ID,
--trim (FINAL_PRODUCT_MATCHES: productIds:: string ,'[]""') AS MATCHED_CI_ID,
LAST_CRAWLED_DATE,
status,
FINAL_PRODUCT_MATCHES
from
  PUBLIC.PRODUCTVARIANTS,
  lateral flatten(input => FINAL_PRODUCT_MATCHES) g
where
   brand like '%{domain}%'
--and status != 'cancelled'
'''
cs.execute(sql)
pvdf = cs.fetch_pandas_all()

In [65]:
#Get list of Customer Inputs
sql = f'''
with state as (
  select
    _id,
    CATALOG_ID,
    state
  from
    scale_prod.view.variant_stats_latest_matching
  where
    source_input = 'sent_by_customer'
    and state in (
      'unmatched_no_data_found',
      'unmatched_matching_issue',
      'unmatched_other_reasons'
    )
    and catalog_id in ('{cid}')
)    
select
  ci.raw_site_url,
  state.catalog_id,
  ci.customer,
  state.state,
  ci._id,
  product_id,
  attribute_data:gtin gtin,
  attribute_data:mfr_part_no mfr_part_no,
  attribute_data:retailer_item_id retailer_item_id, 
  raw_url link,
  attribute_data:name name,
  attribute_data:color :: string color,
  attribute_data:size :: string size,
  attribute_data:gender gender,
  attribute_data:subvertical_attributes sab,
  attribute_data:custom_data :: string cd,
  ci.updated_at

  
from
  state
  join PUBLIC.CUSTOMERINPUTS ci on state.catalog_id = ci.catalog_id
  --join PUBLIC.SITECRAWLS sc on ci.raw_site_url = sc.SITE_URL
  and ci._id = state._id
  and ci.customer = 'flamingo'
--and ci.tags like '%priority-highimp-2022%'
order by catalog_id,name,color,size
'''
cs.execute(sql)
cidf = cs.fetch_pandas_all()

In [66]:
npv = len(pvdf)
nci = len(cidf)
mmr = npv/nci

if npv >= nci:
    mmr = 1
print("Total CIs =",nci,"\n","Total PVs =",npv,"\n","Best case scenario Match Rate =",(mmr*100),"%")

Total CIs = 27536 
 Total PVs = 25430 
 Best case scenario Match Rate = 92.35183033120279 %


In [67]:
#get customer inputs ingestion status
sql = f'''
with cf as (
select 
    CAST(CREATED_AT as date) cdate,
    catalog_id,
    product_id,
    ingestion_status,
    URL_REPAIR_STATUS,
    PRODUCT_MATCH_STRATEGY_KEY
from public.customerinputs
WHERE catalog_id='{cid}'

)
select
cf.cdate,
count(product_id) from cf
group by 1
order by 2 DESC
'''
cs.execute(sql)
cidates = cs.fetch_pandas_all()


In [68]:
cidates

Unnamed: 0,CDATE,COUNT(PRODUCT_ID)
0,2021-09-25,27977
1,2022-06-24,4538
2,2022-07-28,3268
3,2021-09-28,1085
4,2021-10-20,956
...,...,...
125,2022-06-10,2
126,2022-06-19,2
127,2022-05-07,1
128,2022-08-01,1


In [69]:
#get count of unrepaired URLs
sql = f'''
select COUNT(_ID) from PUBLIC.CUSTOMERINPUTS
WHERE catalog_id='{cid}' AND URL_REPAIR_STATUS='Unrepaired'
'''
cs.execute(sql)
urlstatus = cs.fetch_pandas_all()

In [70]:
#get CI ingestions status
sql = f'''
select INGESTION_STATUS, COUNT(_ID) from PUBLIC.CUSTOMERINPUTS WHERE customer='flamingo' and catalog_id='2765771213705208' and INGESTION_STATUS !='Processed'
group by 1
'''
cs.execute(sql)
ciingest = cs.fetch_pandas_all()

In [71]:
#get extractor status
sql = f'''
Select approval_status, CAST(CREATED_AT as date) cdate, CAST(UPDATED_AT as date) udate, COUNT(name)  from PUBLIC.EXTRACTORS 
WHERE NAME LIKE '%{domain}%'
group by APPROVAL_STATUS, cdate, udate
'''
cs.execute(sql)
extdf = cs.fetch_pandas_all()
exstatus = (extdf['APPROVAL_STATUS'].eq('manual_approved')).any()
#print(exstatus)

In [72]:
#check CI and PV quality
#clean datasets of unnecessary characters, get variant number from url into a new column, extract duplicate customer inputs
cidf['RETAILER_ITEM_ID'] = cidf['RETAILER_ITEM_ID'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
cidf['LINK'] = cidf['LINK'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
cidf['NAME'] = cidf['NAME'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
cidf['GENDER'] = cidf['GENDER'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
cidf['PRODUCT_ID'] = cidf['PRODUCT_ID'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
cidf['VARIANTNUM'] = cidf['LINK'].str.slice(-14,)

pvdf['SKU'] = pvdf['SKU'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
pvdf['ITEM_GROUP_ID'] = pvdf['ITEM_GROUP_ID'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
pvdf['LINK'] = pvdf['LINK'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
pvdf['TITLE'] = pvdf['TITLE'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
pvdf['DISPLAY_COLOR'] = pvdf['DISPLAY_COLOR'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
pvdf['SIZE'] = pvdf['SIZE'].map(lambda x: x.lstrip('"').rstrip('"'), na_action='ignore')
pvdf['VARIANTNUM'] = pvdf['LINK'].str.slice(-14,)

#extract duplicate CIs in a new dataframe
duplicatecilink = cidf[cidf.duplicated(['LINK'])]
duplicatecivariantnum = cidf[cidf.duplicated(['VARIANTNUM'])]
cidf['NAMESIZE'] =  cidf['NAME'] + cidf['SIZE']
duplicateciattributes = cidf[cidf.duplicated(['NAMESIZE'])]
#duplicateciattributes.to_csv('duplicateci.csv')

duplinks = len(duplicatecilink.index)
dupvars = len(duplicatecivariantnum.index)
dupatt = len(duplicateciattributes.index)
#print("Duplicate links =", duplinks,"| Duplicate Variant Numbers =", dupvars,"| Duplicate Attributes =", dupatt)

cidf.drop_duplicates(subset=['VARIANTNUM'], keep="first", inplace=True)
dedupcicount = len(cidf.index)

print("Total CIs =", nci)
print("Dedup CIs to match =", dedupcicount)

Total CIs = 27536
Dedup CIs to match = 27517


In [73]:
#All_information
print("Current match rate=",mr.to_string())
print("Total CIs =",nci,"\n","Total PVs =",npv,"\n","Best case scenario Match Rate =",(mmr*100),"%")
print("Product matching strategy applied on: \n",msdt.to_string())
print("Strategy: \n", ms.to_string())
print("Number of human jobs created:",nhj.to_string())
print("Number of human jobs completed:",nhjc.to_string())
print("Number of human jobs pending:",nhjp.to_string())
print("HUMAN JOBS COMPLETION %:",((hjstatus)*100).to_string(),"%")
print("Matching status",mst.to_string())
print("CI's received on",len(cidates),"days")

#main logical actions block
print("\n The following actions are recommended for",domain,":\n")

#maximum_match_rate
if mmr < 1:
    print("- ACTION: Total product variants found are less than customer inputs available. Potential NDF and Out of Stocks")
else:
    print("- Sufficient product variants",npv,"extracted to match with",nci,"customer inputs.")

#customer_inputs_std_dev_across_dates
cistd = cidates['COUNT(PRODUCT_ID)'].std()
if cistd/nci > 0.2:
    print("- ACTION: Staggered customer inputs received. May need merging")
    
#extractor_status
if exstatus == True:
    print("- Extractor looks good")
else:
    print("- ACTION: Extractor does not exist. Please create extractor")
    
#matching_strategy_status
if msstatus == True:
    print("- Matching strategy exists")
else:
    print("- ACTION: Matching strategy needs to be created")

#check_human_jobs_completion_status
if hjstatus.iloc[0] < 0.95:
    print("- ACTION: ",nhjp[0],"human jobs are pending. Complete human jobs.")
else:
    print("- Low pending human jobs for matching")

#duplicate_customer_inputs
#print("- Duplicate links =", duplinks,"| Duplicate Variant Numbers =", dupvars,"| Duplicate Attributes =", dupatt)
print("-",nci-dedupcicount,"duplicate customer inputs cannot be matched")

Current match rate= 0    0.037371
Total CIs = 27536 
 Total PVs = 25430 
 Best case scenario Match Rate = 92.35183033120279 %
Product matching strategy applied on: 
 0    2022-08-28
1    2022-08-28
Strategy: 
 0    ["name","retailer_item_id"]-["title","id"]
1    ["name","retailer_item_id"]-["title","id"]
Number of human jobs created: 0    1052
Number of human jobs completed: 0    670
Number of human jobs pending: 0    382
HUMAN JOBS COMPLETION %: 0    63.688213 %
Matching status 0    pending_matching_job
CI's received on 130 days

 The following actions are recommended for liamandcompany :

- ACTION: Total product variants found are less than customer inputs available. Potential NDF and Out of Stocks
- Extractor looks good
- Matching strategy exists
- ACTION:  382 human jobs are pending. Complete human jobs.
- 19 duplicate customer inputs cannot be matched


In [74]:
#cidf.to_csv('tripletheta.csv')

In [75]:
#apply matching strategy 'urlvariantnumbermatching', match by variant id present in LINK in CI and PVs

#def matchbyurlvariantnumber ():
mergedf = cidf.merge(pvdf,left_on=['VARIANTNUM'],right_on=['VARIANTNUM'],how='left')
mergedf = mergedf.reindex(sorted(mergedf.columns), axis=1)
mergedf = mergedf [['NAME','TITLE','VARIANTNUM', 'COLOR', 'DISPLAY_COLOR', 'SIZE_x', 'SIZE_y', 'LINK_x', 'LINK_y', 'MFR_PART_NO', 'SKU', 'PRODUCT_ID', 'GTIN', 'ITEM_GROUP_ID', 'MATCHED_CI_ID', 'PVID','BRAND', 'CATALOG_ID', 'CD', 'CUSTOMER', 'FINAL_PRODUCT_MATCHES', 'GENDER', 'LAST_CRAWLED_DATE', 'OPTIONS', 'RAW_SITE_URL', 'RETAILER_ITEM_ID', 'SAB', 'STATE', 'STATUS', 'UNIQUE_ID', 'UPDATED_AT', '_ID_x', '_ID_y']]
unmatchcount = mergedf['TITLE'].isna().sum()
unmatcheddf = mergedf[mergedf['TITLE'].isna()]
unmatcheddfc = len(unmatcheddf.index)
matchrate_urlvariantnum = (1-((unmatchcount)/(nci)))

print("Duplicate CIs=", dupvars)
print("Total CIs=", nci)
print("Dedup CIs to match=", dedupcicount)
print("CIs left unmatched=", unmatchcount)
print("Number of unmatched CIs =",unmatchcount,"| Strategy Match =",matchrate_urlvariantnum)

#df[df[2].isna()]

#unmatcheddf
#unmatcheddfc
#list(mergedf.columns)
mergedf.to_csv('teeforthesoulmatched.csv')

Duplicate CIs= 19
Total CIs= 27536
Dedup CIs to match= 27517
CIs left unmatched= 22945
Number of unmatched CIs = 22945 | Strategy Match = 0.1667271934921557
