# Elastic search

In [1]:
from elasticsearch import Elasticsearch
import pandas as pd
import math
import codecs
import csv


In [2]:
# build index
INDEX_NAME="ecom2019"
QUERY_TO_CATEGORY_INDEX_NAME="ecom2019_domcat"
TYPE_NAME="listing"
ID_FIELD="id"
FILES_ROOT_PATH = 'C:/Users/cpieterse/OneDrive - eBay Inc/High accuracy recall/'
MAX_SIZE_TO_INDEX = 10000000
BULK_SIZE = 1000
DO_REINDEX = True # Should we reinsert all documents
ES_VERSION = 6

In [3]:
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem.porter import *
from nltk.stem.snowball import SnowballStemmer
import unidecode

STOPWORDS = set(stopwords.words("english"))
STEMMER = SnowballStemmer("english")
TOKENIZER = RegexpTokenizer('[a-zA-Z0-9]+[a-zA-Z0-9-/]?[a-zA-Z0-9]+|[\d\.]+|[a-z.]+')


def remove_periods_in_acronyms(x):
    """Remove the . in acronyms like d.c. or u.s.a -> dc / usa """
    if re.match('^([a-z][.])+[a-z]?$', x):    
        return x.replace('.', '')
    return x

def remove_hyphens_in_words(x):
    """Remove the hyphen in the middel of a word, like ka-zar -> kazar, x-men -> xmen"""
    if re.match('^[a-z]+[-][a-z]+$', x):    
        return x.replace('-', '')
    return x

    
def tokenize_and_strip_accents(x, do_stemming=True):
    """
    Tokenize the sentence and remove stopwords. 
    Optionally: apply phrase detection with the supplied model
    """
    try:
        norm_string = ' ' + unidecode.unidecode(x).lower() + ' '
    except AttributeError as e:
        norm_string = ' ' + str(x) + ' '
        
    
    #tokenizer = TreebankWordTokenizer()
    words = TOKENIZER.tokenize(norm_string)
    filtered_words = [word for word in words if word not in STOPWORDS]
    if do_stemming:
        filtered_words = [STEMMER.stem(word) for word in filtered_words]
        
    filtered_words = [remove_periods_in_acronyms(word) for word in filtered_words]
    filtered_words = [remove_hyphens_in_words(word) for word in filtered_words]
    return " ".join(filtered_words)

tokenize_and_strip_accents("a.b. test mijn toke-nizer a-mmo")

'ab test mijn token ammo'

In [4]:
tokenize_and_strip_accents("5.1,52,53 hand cut")

'5.1 52 53 hand cut'

In [5]:
def trim_path_to_L(path, n):
    split_path = path.split(' > ')
    return ' > '.join(split_path[:n])
    
def dynamic_split_path(path):
    split_path = path.split(' > ')
    n = math.floor(len(split_path)/2)+1
    return ' > '.join(split_path[:n])

def L_minus1_split_path(path):
    split_path = path.split(' > ')
    n = len(split_path)-1
    return ' > '.join(split_path[:n])

print(trim_path_to_L("a > b > c > d", 1), dynamic_split_path("a > b > c > d"), L_minus1_split_path("a > b > c"), " > ".join("a > b".split(' > ')[:1]))

a a > b > c a > b a


## Create indexes

In [6]:
if not DO_REINDEX:
    raise ValueError("Please skip the index creation part")

In [7]:
es = Elasticsearch()

In [8]:
import os
os.getcwd()
os.path.join(os.getcwd(), 'a')

'C:\\Users\\cpieterse\\Documents\\GitHub\\ecom2019\\a'

### Index the ads

In [9]:
listing_properties = {
       "properties": {
            "title": {
                "type": "text",
                "analyzer": "my_analyzer",
                "fields": {
                    "title_shingles": {
                        "type": "text",
                        "analyzer": "my_shingle_analyzer",
                        "search_analyzer": "my_stemmer_analyzer"
                    }, 
                    "title_stems": {
                        "type": "text",
                        "analyzer": "my_stemmer_analyzer",
                        "search_analyzer": "my_stemmer_analyzer"
                    },
                    "title_tokens": {
                        "type": "text",
                        "analyzer": "my_analyzer",
                        "search_analyzer": "my_analyzer"
                    }

                }
            }, 
            "title_customized": {
                "type": "text",
                "analyzer": "my_SB_stemmer_analyzer",
                "search_analyzer": "my_SB_stemmer_analyzer",
                "fields": {
                    "stems":{
                        "type": "text",
                        "analyzer": "my_SB_stemmer_analyzer",
                        "search_analyzer": "my_SB_stemmer_analyzer"                        
                    }
                }

            },
            "category": {
                "type": "keyword",
            },
#            "category_L1": {
#                "type": "keyword",
#            },
#            "category_L2": {
#                "type": "keyword",
#            },
#            "category_L3": {
#                "type": "keyword",
#            },
#            "category_L4": {
#                "type": "keyword",
#            },
#            "category_L5": {
#                "type": "keyword",
#            },
#            "category_L-1": {
#                "type": "keyword",
#            },
#            "category_L~": {
#                "type": "keyword",
#            },
            "title_category": {
                "type": "text",
                "fields": {
                    "shingles": {
                        "type": "text",
                        "analyzer": "my_shingle_analyzer",
                        "search_analyzer": "my_stemmer_analyzer"
                    },
                    "stems": {
                        "type": "text",
                        "analyzer": "my_stemmer_analyzer",
                        "search_analyzer": "my_stemmer_analyzer"
                    }

                }
            }
        }
    }

if ES_VERSION < 7:
    listing_properties = {TYPE_NAME: listing_properties}

mapping = {
    "settings": {
        "index": {
          "similarity": {
            "default": {
              "type": "BM25",
            }
          }
        },
        "analysis": {
            "filter": {
                "my_shingle_filter": {
                    "type": "shingle",
                    "output_unigrams": False
                },
                "synonym_filter" : {
                        "type" : "synonym",
                        "lenient": True,
                        #"synonyms" : ["ka-zar,kazar"]
                        "synonyms_path" : "analysis/synonyms.txt"
                },
                "snowball_english" : {
                    "type" : "snowball",
                    "language" : "English"
                }
            },
            "analyzer": {
                "my_shingle_analyzer": {
                    "type":	"custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "synonym_filter",
                        "my_shingle_filter",
                        "porter_stem"
                    ]
                },
                "my_stemmer_analyzer": {
                    "type":	"custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "synonym_filter",
                        "porter_stem"
                    ]
                },
                "my_SB_stemmer_analyzer": {
                    "type":	"custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "synonym_filter",
                        "snowball_english"
                    ]
                },
                "my_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "synonym_filter"
                    ]
                }
            }
        }
    },
    "mappings": listing_properties
}

In [10]:
if es.indices.exists(INDEX_NAME):
    print("deleting '%s' index..." % (INDEX_NAME))
    res = es.indices.delete(index = INDEX_NAME)
    
res = es.indices.create(index=INDEX_NAME, body=mapping)
print(" response: '%s'" % (res))

deleting 'ecom2019' index...
 response: '{'acknowledged': True, 'shards_acknowledged': True, 'index': 'ecom2019'}'


In [11]:
# read csv file as unicode
from elasticsearch import NotFoundError
from time import sleep

dumpfile=codecs.open(FILES_ROOT_PATH+"documents.tsv","r",encoding='utf=8')
reader=csv.reader(dumpfile,delimiter='\t')
next(reader, None)  # skip the headers
BULK_SIZE = 10000
# index in batches
bulk_data = [] 
i=1
listings_failed_to_index=open(FILES_ROOT_PATH+"listings_failed_to_index.txt","w")
for line in reader:
    try:
        es.get(index=INDEX_NAME
               , doc_type=TYPE_NAME if ES_VERSION<7 else None
               , id=line[0])
        if i % 1000 == 0: 
            print("Skipped until", i)
        i=i+1
        continue
    except NotFoundError as e:
        pass
    
    data_dict = {}
    data_dict['id']=line[0]
    data_dict['title']=line[2]
    data_dict['title_customized']=tokenize_and_strip_accents(line[2], do_stemming=False)
    data_dict['category']=line[3].lower()
#    data_dict['category_L1']=trim_path_to_L(line[3].lower(), 1)
#    data_dict['category_L2']=trim_path_to_L(line[3].lower(), 2)
#    data_dict['category_L3']=trim_path_to_L(line[3].lower(), 3)
#    data_dict['category_L4']=trim_path_to_L(line[3].lower(), 4)
#    data_dict['category_L5']=trim_path_to_L(line[3].lower(), 5)
#    data_dict['category_L-1']=L_minus1_split_path(line[3].lower())
#    data_dict['category_L~']=dynamic_split_path(line[3].lower())
    data_dict['title_category'] = data_dict['title'] + ' ' + data_dict['category']

    op_dict = {
        "index": {
            "_index": INDEX_NAME, 
            "_id": line[0]
            # , "_source": data_dict
        }
    }
    if ES_VERSION < 7:
        op_dict["index"].update({"_type": TYPE_NAME})
        
    #es.index(index=INDEX_NAME, id=line[0], body=data_dict)

    # op_dict.update(data_dict)
    bulk_data.append(op_dict)
    bulk_data.append(data_dict)
        
    if i%BULK_SIZE==0:
        print("bulk index "+str(i))
        try:
            res = es.bulk(index = INDEX_NAME, body = bulk_data, refresh = True)
            # res = 1
        except Exception as e:
            print(e)
            listings_failed_to_index.write(line[0]+"\n")
            i -= BULK_SIZE
            sleep(2)
                
        bulk_data = []
    
    if i >= MAX_SIZE_TO_INDEX:
        break        
            
    i=i+1


if len(bulk_data)>0:
    print("bulk index "+str(i))
    res = es.bulk(index = INDEX_NAME, body = bulk_data, refresh = True)

dumpfile.close()
listings_failed_to_index.close()


bulk index 10000
bulk index 20000
bulk index 30000
bulk index 40000
ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host='localhost', port=9200): Read timed out. (read timeout=10))
bulk index 40000
bulk index 50000
bulk index 60000
bulk index 70000
bulk index 80000
bulk index 90000
bulk index 100000
bulk index 110000
bulk index 120000
bulk index 130000
bulk index 140000
bulk index 150000
bulk index 160000
bulk index 170000
bulk index 180000
bulk index 190000
bulk index 200000
bulk index 210000
bulk index 220000
bulk index 230000
bulk index 240000
bulk index 250000
bulk index 260000
bulk index 270000
bulk index 280000
bulk index 290000
bulk index 300000
bulk index 310000
bulk index 320000
bulk index 330000
bulk index 340000
bulk index 350000
bulk index 360000
bulk index 370000
bulk index 380000
bulk index 390000
bulk index 400000
bulk index 410000
bulk index 420000
bulk index 430000
bulk index 440000
bulk index 450000
bulk index 460000
bulk index 470000
bulk index 4800

In [12]:
id = 1733264
try:
    if ES_VERSION < 7:
        print(es.get(index=INDEX_NAME, doc_type=TYPE_NAME, id=id))
    else:
        print(es.get(index=INDEX_NAME, id=id))
except NotFoundError as e:
    print(e)

{'_index': 'ecom2019', '_type': 'listing', '_id': '1733264', '_version': 1, '_seq_no': 146369, '_primary_term': 1, 'found': True, '_source': {'id': '1733264', 'title': 'Comic  KAZAR #1', 'title_customized': 'comic kazar 1', 'category': ' collectibles > comics > collections', 'title_category': 'Comic  KAZAR #1  collectibles > comics > collections'}}


### Index the DomCat mappings

In [13]:
mapping = {
    "settings": {
        "index": {
          "similarity": {
            "default": {
              "type": "BM25",
            }
          }
        },
        "analysis": {
            "filter": {
                "my_shingle_filter": {
                    "type": "shingle",
                    "output_unigrams": False
                },
            },
            "analyzer": {
                "my_shingle_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "my_shingle_filter",
                    ]
                }
            }
        }
    },
    "mappings": {
       "properties": {
            "query": {
                "type": "text",
                "fields": {
                    "shingles": {
                        "type": "text",
                        "analyzer": "my_shingle_analyzer"
                    }
                }
            },
            "category_level": {
                "type": "keyword",
            },
            "category": {
                "type": "keyword",
            },
           "score": {
               "type": "float"
           }
        }
    }
}


In [14]:
if es.indices.exists(QUERY_TO_CATEGORY_INDEX_NAME):
    print("deleting '%s' index..." % (QUERY_TO_CATEGORY_INDEX_NAME))
    res = es.indices.delete(index = QUERY_TO_CATEGORY_INDEX_NAME)
    
res = es.indices.create(index=QUERY_TO_CATEGORY_INDEX_NAME, body=mapping)
print(" response: '%s'" % (res))

RequestError: RequestError(400, 'mapper_parsing_exception', 'Root mapping definition has unsupported parameters:  [category_level : {type=keyword}] [score : {type=float}] [query : {type=text, fields={shingles={analyzer=my_shingle_analyzer, type=text}}}] [category : {type=keyword}]')

In [None]:
def insert_in_es(index_name, doc_id, **kwargs):   
    es.index(index=index_name, id=doc_id, body=kwargs)
    for k,v in kwargs.items(): 
        print(k,v)

doc_id = 0
#for level in ['breadcrumb', 'L1', 'L2', 'L3', 'L4', 'L5', 'L-1', 'L~']:
for level in ['L1']:
    df = pd.read_csv(FILES_ROOT_PATH + 'queries_with_DomCat_10_' + level.replace('~', '_') + '.tsv', sep='\t')
    for i, r in df.iterrows():
        print(i,doc_id = doc_id, query=r.query, level=level, category=)
#insert_in_es(QUERY_TO_CATEGORY_INDEX_NAME, queryid=5)

In [None]:
r[2]

## Test some queries

In [None]:
es = Elasticsearch()

In [None]:
for (field_to_use, analyzer) in {"title.title_shingles":"my_shingle_analyzer"
                                 , "title.title_stems":"my_stemmer_analyzer"
                                 , "title.title_tokens":"my_analyzer"
                                 , "title_category.stems":"my_stemmer_analyzer"
                                 , "title_customized.stems":"my_SB_stemmer_analyzer"}.items():
    query = 'hand cut'
    
    if field_to_use=="title_customized.stems":
        query = tokenize_and_strip_accents(query, False)
        
    adidbody={
            "from" : 0
            , "size" : 10000
            , "query": {
                "match" : {
                    field_to_use:{"query":query,
                                  "operator" : "and", 
                                  "analyzer": analyzer}
                }
            }
        }
    adidhit = es.search(index=INDEX_NAME
                        #, doc_type=TYPE_NAME if ES_VERSION<7 else None
                        , body=adidbody)
    print(field_to_use, adidhit['hits']['total'])

In [None]:
tokenize_and_strip_accents(query, False)

## Execute queries

In [None]:
QUERY_FIELDS =  {"title.title_shingles":"my_shingle_analyzer"
                 , "title.title_stems":"my_stemmer_analyzer"
                 , "title.title_tokens":"my_analyzer"
                 , "title_category.stems":"my_stemmer_analyzer"
                 , "title_customized.stems":"my_SB_stemmer_analyzer"}

In [None]:
es = Elasticsearch()

In [None]:
queries = pd.read_csv(FILES_ROOT_PATH+"queries.tsv",encoding='utf=8', sep='\t')
#reader=csv.reader(testfile,delimiter='\t')
#next(reader, None)  # skip the headers

In [None]:
def open_results_file(filename):
    """
    Open the file for writing and append the header
    Returns file writer object
    """
    
    # similar_listings = codecs.open("results/similar_listings.tsv","w",'utf-8')
    similar_listings = codecs.open(filename, "w",'utf-8')
    similar_listings.write('\t'.join(['queryid', 'adid', 'score', 'query', 'query_tokens', 'title', 'category'
                                      # , 'category_L1', 'category_L2', 'category_L3', 'category_L4'
                                      # , 'category_L5', 'category_L-1', 'category_L~'
                                     ]))
    similar_listings.write('\n')
    return similar_listings


In [None]:
def to_string_output(str_value, sep='\t'):
    """
    Enclose a string with quotes and replace special characters that will break the file
    Retuns: enclosed and escaped string
    """
    return '"' + (str_value
                  .replace(sep, '_')
                  .replace('\n', '_')
                  .replace('"', '_')) + '"'
    
to_string_output('a'), to_string_output('a\tb')


In [None]:
def construct_es_query(query, field_to_use, analyzer):
        return {
            "from" : 0, 
            "size" : 10000, 
            "query": {
                "match" : {
                    field_to_use:{
                        "query":query,
                        "operator" : "and",
                        "analyzer": analyzer
                    }
                }
            }
        }


In [None]:
for (field_to_use, analyzer) in QUERY_FIELDS.items():
    print("Starting", field_to_use)
    file_writer = open_results_file("results/found_ads_" + field_to_use + ".tsv")

    for i, row  in queries.iterrows():
        queryid = row.query_id
        query = row.query.lower()
        if field_to_use=="title_customized.stems":
            query = tokenize_and_strip_accents(query, False)

        #adcat=line[2]
        adidbody = construct_es_query(query, field_to_use, analyzer)
        if queryid % 10 == 0:
            print(queryid)

        page = es.search(index=INDEX_NAME, body=adidbody, scroll='1m')
        sid = page['_scroll_id']
        page_size=len(page['hits']['hits'])
        while page_size>0:        
            for hit in page['hits']['hits']:
                simadid=hit["_id"]
                simtitle=hit["_source"]['title']
                score=hit["_score"]
                try:
                    # similar_listings.write(str(queryid)+"\t"+str(simadid)+"\t"+str(score)+"\t"+str(hit["_source"])+"\n")
                    file_writer.write('\t'.join([str(queryid)                                             
                                                      , str(simadid)
                                                      , str(score)
                                                      , to_string_output(query)
                                                      , str(len(query.split(' ')))
                                                      , to_string_output(hit['_source']['category'])
                                                      , to_string_output(hit['_source']['title'])
                                                      #, to_string_output(hit['_source']['category_L1'])
                                                      #, to_string_output(hit['_source']['category_L2'])
                                                      #, to_string_output(hit['_source']['category_L3'])
                                                      #, to_string_output(hit['_source']['category_L4'])
                                                      #, to_string_output(hit['_source']['category_L5'])
                                                      #, to_string_output(hit['_source']['category_L-1'])
                                                      #, to_string_output(hit['_source']['category_L~'])
                                                ]))
                    file_writer.write('\n')

                    # print(str(queryid)+"\t"+str(simadid)+"\t"+str(score)+"\t"+str(hit["_source"])+"\n")
                except Exception as e:
                    #TODO check for file writer related exception only
                    print(e)
            
            # Next page
            page = es.scroll(scroll_id=sid, scroll = '1m')
            sid = page['_scroll_id']
            page_size=len(page['hits']['hits'])


    file_writer.close()
    