# Getting Libraries and Datasets

## Our Libraries

In [12]:
!pip install keepa




In [13]:
import keepa
import pandas as pd
import os
from os import listdir
from os.path import isfile, join
from openpyxl import load_workbook
import datetime
from datetime import date
from google.colab import drive

In [14]:
# First of all, we need to make sure that my directory is in My Drive
drive.mount('/content/drive')
os.getcwd()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


'/content/drive/My Drive/Sales Data/Product Prices'

## Our Functions

In [15]:
## Extracting the product information

def getting_most_important_columns(products, x):
    """
    Extracts and compiles key product information into a pandas DataFrame.

    Parameters:
    -----------
    products : list
        A list of dictionaries, where each dictionary represents product data
        containing price and timestamp information under the 'data' key.

    x : str
        A string indicating the region or market name. Also used to access
        region-specific keys from an external dictionary (e.g., dict_l[x]).

    Returns:
    --------
    df : pandas.DataFrame
        A DataFrame containing the following columns:
            - 'Time': Timestamp of the new price
            - 'Price': New price of the product
            - 'ASIN': Amazon Standard Identification Number
            - 'Date': Date extracted from the timestamp
            - 'Region': The input region string (x)

    Notes:
    ------
    - The function silently skips over entries that raise exceptions
      (e.g., missing keys or malformed data).
    - Assumes the existence of a global dictionary named `dict_l`
      indexed by `x`, used to determine the number of iterations.
    - The resulting DataFrame is reset and cleaned before returning.
    """

    df = pd.DataFrame()
    for i in range(0, len(dict_l[x])):
        try:
            df_t =pd.DataFrame()
            newprice = products[i]['data']['NEW']
            newpricetime = products[i]['data']['NEW_time']
            ASIN = products[i]['asin']
            df_t['Time'] = newpricetime
            df_t['Price'] = newprice
            df_t['ASIN'] = ASIN
            df_t['Date'] = df_t['Time'].dt.date
            df_t['Region'] = x
        except:
            pass
        df = pd.concat([df_t, df])
    df.reset_index(inplace = True)
    df.drop(columns = {'index'}, inplace = True)
    return df

In [16]:
# Sometimes, a product's price changes multiple times in the same day, but our sales data is recorded on a daily basis.
# Therefore, if there are multiple price changes within the same day, we will keep only the last one, as it reflects the final price that lasted through the day.
# This is the purpose of this function

def Enure_no_date_duplicates(df_t):
    """
    Removes consecutive rows with duplicate 'Price' values from a DataFrame.

    Parameters:
    -----------
    df_t : pandas.DataFrame
        A DataFrame that includes at least a 'Price' column and is indexed by row numbers.

    Returns:
    --------
    df_t : pandas.DataFrame
        The cleaned DataFrame with consecutive duplicate price entries removed.

    Notes:
    ------
    - This function compares each row's 'Price' value to the previous row's.
    - If the price is the same as the previous one, the row is dropped.
    - The function modifies the DataFrame in place and drops the 'index' column if it exists.
    """

    list_bad_index = []
    for i in df_t.index:
        if i == 0:
            pass
        else:
            if df_t.loc[old_in, 'Price'] == df_t.loc[i, 'Price']:
                list_bad_index.append(i)
                old_in = I
                df_t.drop(list_bad_index, inplace = True )
                df_t.drop(columns = 'index', inplace = True)

    return df_t

In [17]:
# The list returned by Keepa provides only the start date of each price, but not the end date
# This function helps determine the end date for each price entry.
# It does this by sorting the price records by their start dates in ascending order, then assigning the start date of the next price as the end date of the current one
# If the current price is the last recorded price change, we assign the end date as today's date (i.e., the date when this notebook is run).


def Ensure_ordering_of_dates(df_t, Date):

    """
    Assigns an end date to each price record for a product (ASIN) based on the sequence of price changes.

    Description:
    ------------
    The data returned by sources like Keepa typically includes the start date of each price, but not the end date.
    This function fills in the missing 'End_Date' for each price entry. It does so by:

    - Sorting each product's (ASIN's) price records by their 'Date' (start date).
    - Setting the 'End_Date' of each price as the start date of the next price change.
    - For the last price entry of each ASIN, the 'End_Date' is set to the provided `Date`, which usually represents the current date or the end of the data range.

    Parameters:
    -----------
    df_t : pandas.DataFrame
        A DataFrame containing at least the following columns:
        - 'ASIN': The unique product identifier.
        - 'Date': The date when a price became active.

    Date : datetime-like
        A reference date used as the 'End_Date' for the last price entry of each ASIN.

    Returns:
    --------
    df : pandas.DataFrame
        A new DataFrame with the same structure as the input, but with an additional column:
        - 'End_Date': The calculated end date for each price validity period.

    Notes:
    ------
    - This function assumes that 'Date' is in datetime format or can be converted to it.
    - It processes each ASIN separately to ensure date ordering and accurate 'End_Date' assignment.
    - The DataFrame is re-indexed and cleaned before being returned.
    """

    df_tt = pd.DataFrame()
    df = pd.DataFrame()
    df_t['Date'] = pd.to_datetime(df_t['Date'])
    for i in df_t["ASIN"].unique():
        df_tt = df_t[df_t["ASIN"] == i]
        df_tt.reset_index(inplace = True)
        df_tt.drop(columns = 'index', inplace = True)
        for dat in range(0,df_tt.shape[0]):
            if dat +1 < df_tt.shape[0]:
                df_tt.loc[dat, 'End_Date'] = df_tt.loc[dat+1, 'Date']
            else:
                df_tt.loc[dat, "End_Date"] = Date
        df = pd.concat([df_tt, df])
    df.reset_index(inplace = True)
    df.drop(columns = 'index', inplace = True)
    return df

In [18]:
# Sometimes a product is still listed but out of stock.
# In such cases, Keepa returns null values in the 'Price' column.
# Therefore, we need to remove these null entries while also keeping track of the out-of-stock periods."


def Overcome_null_values(df_t):

    """
    Handles and removes rows with missing price values from the product price history,
    while preserving the validity of the surrounding date ranges.

    Description:
    ------------
    This function processes product price history grouped by ASIN. It identifies rows where
    the 'Price' column is null (NaN) and applies the following logic:

    - If the null price is in the first or last row of the ASIN's records, the row is removed.
    - If the null price is in the middle:
        - The duration ('No. of days') from the row after the null is added to the previous row.
        - The 'End_Date' of the previous row is updated to the 'End_Date' of the next row.
        - Both the null row and the next row are removed to eliminate any inconsistencies.

    This ensures that periods with null prices do not break the continuity of the price timeline.

    Parameters:
    -----------
    df_t : pandas.DataFrame
        A DataFrame containing at least the following columns:
        - 'ASIN': Unique product identifier
        - 'Price': Product price (may contain NaN)
        - 'No. of days': Duration the price remained active
        - 'End_Date': End date of the price period

    Returns:
    --------
    df : pandas.DataFrame
        A cleaned DataFrame with null price entries removed and adjusted durations where applicable.

    Notes:
    ------
    - The function assumes that 'No. of days' and 'End_Date' are already computed correctly.
    - It resets and cleans the index after processing each ASIN.
    - Designed to be used after ensuring the data is ordered and end dates are assigned.
    """

    df = pd.DataFrame()
    df_tt = pd.DataFrame()
    for i in df_t["ASIN"].unique():
        df_tt = df_t[df_t["ASIN"] == i]
        df_tt.reset_index(inplace = True)
        df_tt.drop(columns = 'index', inplace = True)
        index = df_tt[df_tt['Price'].isna()].index
        list_of_bad_index  = []
        for x in index:
            if x == 0 or x +1 == df_tt.shape[0]:
                list_of_bad_index.append(x)
            else:
                df_tt.loc[x-1,'No. of days'] = int(df_tt.loc[x-1,'No. of days']) + int(df_tt.loc[x+1,'No. of days'])
                df_tt.loc[x-1,'End_Date'] = df_tt.loc[x+1,'End_Date']
                list_of_bad_index.append(x+1)
                list_of_bad_index.append(x)
        df_tt.drop(index = list_of_bad_index, inplace = True)
        df = pd.concat([df,df_tt ])
        df.reset_index(inplace = True)
        df.drop(columns = 'index', inplace = True)
    return df

In [19]:
def save_historical_file(inp1):
    """
    Opens and re-saves the historical product prices Excel file for the specified country.

    Description:
    ------------
    This function navigates to the historical price data folder in Google Drive
    and attempts to load and immediately re-save the Excel file for a given country.
    If the country is 'UK', the filename is modified to 'GB.xlsx' to align with Keepa's naming convention.

    Parameters:
    -----------
    inp1 : str
        The country code or name entered by the user (e.g., 'UK', 'DE', 'FR').

    Notes:
    ------
    - The function assumes the use of Google Colab with mounted Google Drive.
    - For 'UK', the file is saved as 'GB.xlsx' to match Keepa's format.
    - If the file does not exist for the given country, it fails silently (not recommended in production).
  """

    # Paths
    original_root_path = r'/content/drive/My Drive/Sales Data/Product Prices'
    root_path = '/content/drive/My Drive/Sales Data/Products Historical Prices'

    # Create today's folder
    today = date.today()
    full_path = os.path.join(root_path, str(today))
    os.makedirs(full_path, exist_ok=True)

    # Handle UK/GB naming
    file_name = 'GB.xlsx' if inp1 == 'UK' else f'{inp1}.xlsx'
    original_file_path = os.path.join(original_root_path, file_name)
    save_file_path = os.path.join(full_path, file_name)

    # Read and save
    try:
        df = pd.read_excel(original_file_path)
        df.to_excel(save_file_path, index=False)
        print(f"✔️ File for {inp1} successfully saved to: {save_file_path}")
    except FileNotFoundError:
        print(f"❌ File for {inp1} not found at: {original_file_path}")
    except Exception as e:
        print(f"⚠️ Error while processing file for {inp1}: {e}")


## Getting List Of Our Products

In [20]:
# Keepa requires a list of ASINs in order to return product price changes.
# We already have a list of our products' ASINs, so the first step is to retrieve this list
# We also need to updated each time, we need to run this notebook

root_path = r'/content/drive/My Drive/Sales Data/Final Shape of Files'
os.chdir(root_path)
df_tl = pd.read_excel("List_Of_All_Products.xlsx")

# Choosing the countries for which you want to retrieve product prices

In [21]:
# Keepa requires both the ASIN and the corresponding marketplace to retrieve product data.
# Due to limitations in our API permissions, we can only extract data for one marketplace at a time.
# Additionally, if you have more than 100 products in a single marketplace, you may need to wait an extra 30 to 60 minutes to retrieve all the price data.

root_path= r'/content/drive/My Drive/Sales Data/Product Prices'
os.chdir(root_path)

# Display the list of available countries (marketplaces) in the dataset
print('Countries are', df_tl['Region'].unique())

# Prompt the user to input the country they want to process
inp1 = input('Please enter the country for which you want to retrieve product prices\n')

# Filter the products list to include only the product in the selected country
df_tl = df_tl[(df_tl['Region'] == inp1)]

# In Keepa, 'UK' should be written as 'GB' to match the marketplace code format
df_tl['Region'].replace({'UK':'GB'}, inplace = True)

# Initialize an empty dictionary to store ASINs grouped by region
dict_l  = {}

# Loop through each unique region in the dataset
for i in df_tl['Region'].drop_duplicates().to_list():

  # Filter the dataset to include only rows from the current region
  df_l = df_tl[df_tl['Region'] == i]

  # Extract a list of unique ASINs for that region and store it in the dictionary
  dict_l[i] = df_l['ASIN'].drop_duplicates().to_list()

Countries are ['CA' 'GB' 'US' 'MX' 'DE' 'ES' 'FR' 'IT' 'JP']
Please enter the country for which you want to retrieve product prices
ES


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_tl['Region'].replace({'UK':'GB'}, inplace = True)


# Getting and save the current and the pervious prices

In [None]:
# Initialize Keepa API with our personal access key
accesskey = 'cra6goonfe26d65juk9fi7rbf0f8tim6ljpiiitcbetl3g92l4duiuurd83q490f' # enter real access key here
api = keepa.Keepa(accesskey)

# Initialize empty DataFrames to collect processed data and null entries
df_cp = pd.DataFrame()   # For all combined product price data
df_na = pd.DataFrame()   # For all null price records (out-of-stock)

# Iterate over each region in the ASIN dictionary (dict_l)
for x in dict_l:

    # Query Keepa API for product data using ASIN list and specified region
    products = api.query(dict_l[x], domain=x, out_of_stock_as_nan=True)


    # Extract and transform the relevant columns from the Keepa response
    df_t = getting_most_important_columns(products, x)
    df_t['Date'] = pd.to_datetime(df_t['Date'])

    # Define today's date as the cutoff for the final price period
    Date = pd.to_datetime(datetime.date.today())

    # Assign 'End_Date' for each price record based on the next price change or today
    df_t =Ensure_ordering_of_dates(df_t, Date)

    # Capture entries with null prices (typically out-of-stock)
    df_na = df_t[df_t['Price'].isna()]

    # Remove exact duplicate rows based on ASIN, Date, and Price
    df_t.drop_duplicates(subset =['ASIN', 'Date', 'Price'], keep = 'last', inplace = True)

    # Reset and clean index
    df_t.reset_index(inplace = True, drop = True)

    # Ensure 'End_Date' is in datetime format
    df_t['End_Date'] = pd.to_datetime( df_t['End_Date'])

    # Calculate the duration each price remained active
    df_t['No. of days'] = (df_t['End_Date'] - df_t['Date'] ).dt.days + 1

    # Handle and clean out rows with null prices
    df_t = Overcome_null_values(df_t)

    # Save historical price file (function handles file naming and format)
    try:
         save_historical_file(inp1)
    except:
      pass

    # Set working directory to where historical prices will be saved
    root_path= r'/content/drive/My Drive/Sales Data/Product Prices'
    os.chdir(root_path)

    # Save cleaned price data to Excel
    df_t.to_excel(x + '.xlsx')

    # Save null price entries to a separate Excel file in a different location
    root_path= r'/content/drive/My Drive/Sales Data/Gaps'
    os.chdir(root_path)
    df_na.to_excel(x + 'Null.xlsx')

    # Append cleaned region data to the master DataFrame
    df_cp = pd.concat([df_cp, df_t])

# Final cleanup of the master DataFrame
df_cp.reset_index(inplace = True, drop = True)

INFO:keepa.interface:Using key ending in 3q490f
DEBUG:keepa.interface:Executing 108 item product query

  0%|          | 0/108 [00:00<?, ?it/s][AINFO:keepa.interface:-37 tokens remain
INFO:keepa.interface:0 tokens remain
INFO:keepa.interface:-99 tokens remain

 93%|█████████▎| 100/108 [38:14<03:03, 22.95s/it][AINFO:keepa.interface:-99 tokens remain
INFO:keepa.interface:0 tokens remain
INFO:keepa.interface:-7 tokens remain

100%|██████████| 108/108 [2:18:32<00:00, 76.97s/it]
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_tt.drop(columns = 'index', inplace = True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  

✔️ File for ES successfully saved to: /content/drive/My Drive/Sales Data/Products Historical Prices/2025-08-22/ES.xlsx


# Getting and save Sales Rank and Product Description

In [None]:
ranking_df = pd.DataFrame()
for i in range(0, len(products)):
    # Initialize a DataFrame for categories
    categories = pd.DataFrame(products[i]['categoryTree'])
# Initialize a list to store the rankings
    ranking_list = []

# Iterate through the salesRanks dictionary
    ASIN = products[i]['asin']
    try:
        for category_id, sales_rank_history in products[i]['salesRanks'].items():
# Filter the category tree for the matching category ID
            category_data = categories[categories['catId'] == int(category_id)].reset_index()

    # Get the category name
            category_name = category_data.loc[0, 'name'] if not category_data.empty else 'Unknown'

    # Extract the current rank (last value) and best rank (minimum value)
            current_rank = sales_rank_history[-1]
            best_rank = min(sales_rank_history)

    # Append the data to the ranking list
            ranking_list.append({
                'ASIN': ASIN,
                'Category Name': category_name,
                'Current Rank': current_rank,
                'Best Rank': best_rank
            })

            ranking_df = pd.concat([ranking_df,pd.DataFrame(ranking_list)])

    except:
          ranking_list.append({
                'ASIN': ASIN,
                'Category Name': 'unavailable',
                'Current Rank':  'unavailable',
                'Best Rank':  'unavailable'
            })
          ranking_df = pd.concat([ranking_df,pd.DataFrame(ranking_list)])

# Cleaning the dataFrame
ranking_df.reset_index(inplace = True, drop = True)
ind = ranking_df[ranking_df['Category Name'] == 'Unknown'].index
ranking_df.drop(ind, inplace = True)
ranking_df.drop_duplicates(inplace = True)
ranking_df.replace({-1: 'Unknown'},inplace = True)

In [None]:
data = []

for product in products:
    try:
        # Extract relevant data with .get() to avoid KeyErrors
        product_data = {
            'ASIN': product.get('asin', 'Unavailable'),
            'Title': product.get('title', 'Unavailable'),
            'Sales Rank': product.get('salesRanks', {}).get(product.get('salesRankReference'), 'Unavailable'),
            'Product Weight': product.get('itemWeight', 'Unavailable'),
            'Item Weight': product.get('itemWeight', 'Unavailable'),
            'Color': product.get('color', 'Unavailable'),
            'Manufacturer': product.get('manufacturer', 'Unavailable'),
            'Product Group': product.get('productGroup', 'Unavailable'),
            'Frequently Bought Together': product.get('frequentlyBoughtTogether', 'Unavailable'),
            'Target Audience Keyword': product.get('targetAudienceKeyword', 'Unavailable'),
            'Item Type Keyword': product.get('itemTypeKeyword', 'Unavailable'),
        }

        data.append(product_data)

    except Exception as e:
        # Fallback entry if an error occurs
        data.append({key: 'Unavailable' for key in product_data.keys()})
# Create a DataFrame
df_b = pd.DataFrame(data)
df_all = ranking_df.merge(df_b, on ='ASIN', how = 'left')

In [None]:
# Save the products' description
for x in dict_l:
    root_path= r'/content/drive/My Drive/Sales Data/Product Description'
    os.chdir(root_path)
    df_all.to_excel(x + '.xlsx')