In [1]:
import json
import os
import time

import keepa
import pandas as pd
import requests
from pymongo import MongoClient, errors
from tqdm import tqdm

Initializations

In [2]:
# Credentials are read from the environment so real secrets never have to be
# saved inside the notebook. The placeholders are only used when the
# variables are unset and must then be replaced by hand.
key = os.environ.get('KEEPA_API_KEY', '<keepa_key>')
api = keepa.Keepa(key)

# Marketplace selection: the string form is passed to the keepa client
# (api.query), the numeric form is used in the raw REST requests.
domain = 'US'
domain_id = 1

client = MongoClient(
    os.environ.get('KEEPA_MONGO_URI', 'mongodb://<username>:<password>@<ip_address>/')
)

In [3]:
# Upper bound on how many best-seller ASINs are fetched for each category.
asin_per_category_to_pull = 1_000

Create and Initialize DB

In [4]:
# One database per marketplace, named after the lower-cased domain code.
db = client[f'keepa_db_{domain.lower()}']

# Collection handles used by the rest of the notebook.
categories_collection, best_sellers_collection, product_object, seller_object = (
    db[collection_name]
    for collection_name in ('categories', 'best_seller_asins', 'product_object', 'seller_object')
)


Functions

In [5]:
def _keepa_get(endpoint, params, extract, retry_delay=5.0, max_retries=None, timeout=60):
    """Call a Keepa REST endpoint and return ``extract(decoded_json)``.

    Any failure (network error, timeout, non-JSON body, or ``extract``
    raising because an expected field is absent) counts as a failed attempt.
    Failed attempts are retried after sleeping ``retry_delay`` seconds so the
    API is not hammered in a tight loop.

    Args:
        endpoint: Path segment appended to ``https://api.keepa.com/``.
        params: Query-string parameters; the global API ``key`` is added.
        extract: Callable that pulls the wanted value out of the decoded JSON.
        retry_delay: Seconds to wait between attempts.
        max_retries: Maximum number of retries after the first attempt;
            ``None`` (the default) retries until a request succeeds.
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        Whatever ``extract`` returns for the first successful response.

    Raises:
        Exception: the last error, re-raised once ``max_retries`` is exhausted.
    """
    url = f"https://api.keepa.com/{endpoint}"
    query = {"key": key, **params}
    failures = 0
    while True:
        try:
            response = requests.get(url, params=query, timeout=timeout)
            return extract(response.json())
        except Exception as exc:
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            if failures == 1:
                # Only the exception type is reported: requests' error
                # messages can embed the full URL, which contains the API key.
                print(f"Keepa '{endpoint}' request failed ({type(exc).__name__}); "
                      f"retrying every {retry_delay}s")
            time.sleep(retry_delay)


def get_category_list(domain_id, max_retries=None):
    """Return the values of the ``categories`` mapping from Keepa's category lookup.

    The lookup is issued with ``category=0`` and ``parents=1`` for the given
    marketplace ``domain_id``.

    Args:
        domain_id: Numeric Keepa marketplace id.
        max_retries: Optional retry bound; ``None`` retries until success.

    Returns:
        list: one entry per value in the response's ``categories`` mapping.
    """
    return _keepa_get(
        "category",
        {"domain": domain_id, "category": 0, "parents": 1},
        lambda payload: list(payload["categories"].values()),
        max_retries=max_retries,
    )


def get_best_sellers(domain_id, category, max_retries=None):
    """Return the ``bestSellersList`` entry for one category (``range=180``).

    Args:
        domain_id: Numeric Keepa marketplace id.
        category: Category id to query.
        max_retries: Optional retry bound; ``None`` retries until success.

    Returns:
        The value stored under ``bestSellersList`` in the response.
    """
    return _keepa_get(
        "bestsellers",
        {"domain": domain_id, "category": category, "range": 180},
        lambda payload: payload["bestSellersList"],
        max_retries=max_retries,
    )


def get_tokens_left(max_retries=None):
    """Return the decoded JSON response of Keepa's ``token`` endpoint.

    Args:
        max_retries: Optional retry bound; ``None`` retries until success.

    Returns:
        The full decoded response, unmodified.
    """
    return _keepa_get("token", {}, lambda payload: payload, max_retries=max_retries)


Get Category List

In [6]:


cat_list = get_category_list(domain_id)
for cat_item in cat_list:
    # Key each document by its category id so re-running the cell cannot
    # create a second copy of the same category.
    cat_item["_id"] = cat_item['catId']

if cat_list:  # insert_many raises when given an empty document list
    try:
        categories_collection.insert_many(cat_list, ordered=False, bypass_document_validation=True)
    except errors.BulkWriteError as bulk_error:
        # Duplicate-key failures (code 11000) are expected on re-runs and are
        # ignored; any other write error is a real problem and is re-raised.
        unexpected_errors = [
            write_error
            for write_error in bulk_error.details.get('writeErrors', [])
            if write_error.get('code') != 11000
        ]
        if unexpected_errors:
            raise

# Work from what is actually stored, so categories saved by earlier runs are included.
category_list = categories_collection.distinct('_id')


Get Bestsellers List

In [7]:
# Categories whose best-seller list is already stored are skipped: the insert
# would be rejected as a duplicate anyway, so fetching them again only spends
# API requests.
stored_categories = set(best_sellers_collection.distinct('_id'))

best_sellers_dict_list = []  # documents fetched during this run
for category in tqdm(category_list):
    if category in stored_categories:
        continue
    best_sellers_dict = {
        'category': category,
        '_id': category,
        'best_sellers': get_best_sellers(domain_id, category),
    }
    best_sellers_dict_list.append(best_sellers_dict)
    # Persist immediately so an interrupted run keeps what was already fetched.
    try:
        best_sellers_collection.insert_one(best_sellers_dict, bypass_document_validation=True)
    except errors.DuplicateKeyError:
        # Stored in the meantime by another run; keep the existing document.
        pass

Getting Product Data

In [8]:
# Position in `category_list` at which the product download loop starts, and
# a running count of products that could not be written to MongoDB.
category_start_num, failed_products = 27, 0

In [None]:


ASIN_CHUNK_SIZE = 100        # ASINs sent per api.query call
QUERY_RETRY_DELAY_S = 5.0    # pause between failed api.query attempts
MONGO_DUPLICATE_KEY = 11000  # MongoDB duplicate-key error code

# Download product data category by category. `category_start_num` is advanced
# after each finished category, so re-running this cell after an interruption
# resumes with the category that was in progress.
for category in category_list[category_start_num:]:
    print(f'Category {category_start_num} - {category}')

    # A category may have no stored best-seller document (or an empty one);
    # treat that as "nothing to pull" instead of failing with a TypeError.
    best_sellers_doc = best_sellers_collection.find_one({"_id": category}) or {}
    asin_list_all = (best_sellers_doc.get('best_sellers') or {}).get('asinList') or []

    # Take the first N ASINs and de-duplicate them while keeping list order.
    asin_list_to_pull = list(dict.fromkeys(asin_list_all[:asin_per_category_to_pull]))

    # Look up only this category's ASINs rather than loading every _id of the
    # whole collection on each iteration.
    asins_in_db = {
        doc['_id']
        for doc in product_object.find({"_id": {"$in": asin_list_to_pull}}, {"_id": 1})
    }
    asin_list = [asin for asin in asin_list_to_pull if asin not in asins_in_db]

    if asin_list:
        asin_list_chunks = [
            asin_list[start:start + ASIN_CHUNK_SIZE]
            for start in range(0, len(asin_list), ASIN_CHUNK_SIZE)
        ]

        for asins in tqdm(asin_list_chunks):
            # Retry until the query succeeds, pausing between attempts so a
            # persistent failure does not spin in a tight loop.
            while True:
                try:
                    asin_query_response = api.query(
                        asins, domain=domain, offers=100, wait=True, progress_bar=False
                    )
                    break
                except Exception as exc:
                    # Only the exception type is shown; error messages may
                    # contain the request URL and with it the API key.
                    tqdm.write(
                        f'api.query failed ({type(exc).__name__}); '
                        f'retrying in {QUERY_RETRY_DELAY_S}s'
                    )
                    time.sleep(QUERY_RETRY_DELAY_S)

            if not asin_query_response:
                continue  # insert_many raises when given an empty document list

            for asin_item in asin_query_response:
                # Remove the "data" entry before storing and key the document by ASIN.
                asin_item.pop("data", None)
                asin_item["_id"] = asin_item['asin']

            try:
                product_object.insert_many(
                    asin_query_response, ordered=False, bypass_document_validation=True
                )
            except errors.BulkWriteError as bulk_error:
                # Duplicate keys mean the product is already stored and are
                # fine; anything else is counted and reported instead of
                # being silently dropped.
                unexpected_errors = [
                    write_error
                    for write_error in bulk_error.details.get('writeErrors', [])
                    if write_error.get('code') != MONGO_DUPLICATE_KEY
                ]
                if unexpected_errors:
                    failed_products += len(unexpected_errors)
                    tqdm.write(
                        f'{len(unexpected_errors)} product(s) of category {category} '
                        f'could not be written'
                    )
            except UnicodeEncodeError:
                # A document could not be encoded: fall back to one-by-one
                # inserts so a single bad product does not cost the whole chunk.
                for item in asin_query_response:
                    try:
                        product_object.insert_one(item, bypass_document_validation=True)
                    except errors.DuplicateKeyError:
                        continue  # already stored, not a failure
                    except Exception:
                        failed_products += 1

    category_start_num += 1