In [1]:
# Imports
import json
import time
from datetime import datetime

import requests
from bs4 import BeautifulSoup
from pytz import utc
#from consumer.api import upsert

In [2]:
# Constants
# Free-text place name interpolated into the nearby-stores lookup URL.
LOCATION = "Salt Lake City"
# Top-level categories to scrape. Each entry is used as a key into the dict
# returned by get_categories(), whose keys are lower-cased category names.
CATEGORIES = ["grocery"]

In [4]:
# Variables
session = requests.session()

# Send request to generate cookies. A timeout keeps a stalled connection from
# hanging the notebook forever.
session.get('https://www.target.com', timeout=30)

# Initialization
# The visitorId cookie is reused below as the API key and as the visitor id of
# the search requests, so fail with a clear message if the site did not set it.
visitor_id = session.cookies.get("visitorId")
if visitor_id is None:
    raise RuntimeError("target.com did not set a visitorId cookie; cannot continue")

# Get nearby stores (closest single store within 100 miles of LOCATION).
# The query string is passed via `params` so requests handles the encoding.
nearby_stores_response = requests.get(
    f'https://redsky.target.com/v3/stores/nearby/{LOCATION}',
    params={"key": visitor_id, "limit": 1, "within": 100, "unit": "mile"},
    timeout=30,
)
# Surface HTTP errors here instead of as a confusing JSON/KeyError below.
nearby_stores_response.raise_for_status()
nearby_stores = nearby_stores_response.json()

# Get store id
store_id = nearby_stores[0]['locations'][0]['location_id']

In [5]:
# Get all categories
def get_categories(timeout: float = 30.0):
    """Fetch Target's category taxonomy and map each category to its sub-category URLs.

    Args:
        timeout: Seconds to wait for the taxonomy request before giving up.

    Returns:
        dict: ``{lower-cased category name: {"url": canonical url,
        "categories": [sub-category canonical urls]}}``.

    Raises:
        requests.HTTPError: If the taxonomy endpoint answers with an error status.
        KeyError: If the response does not have the expected structure.
    """
    response = requests.get(
        "https://redoak.target.com/content-publish/pages/v1?url=/c/-/-/N-4nav",
        timeout=timeout,
    )
    response.raise_for_status()

    taxonomy_nodes = response.json()['slots']['100']['content']['taxonomy_nodes']

    categories_object = {}
    # The first three and last three nodes are dropped; the original author
    # marked them as "wrong categories" (presumably non-category entries).
    for category in taxonomy_nodes[3:-3]:
        categories_object[category['name'].lower()] = {
            "url": category['seo_data']['canonical_url'],
            "categories": _subcategory_urls(category),
        }

    return categories_object


def _canonical_url(node):
    """Return the node's canonical URL, or None when seo_data or the URL is missing."""
    seo_data = node.get('seo_data') or {}
    return seo_data.get("canonical_url")


def _subcategory_urls(category):
    """Collect canonical URLs of the scrapeable sub-categories of one top-level category."""
    urls = []
    for child in category.get('children') or []:
        if child['type'] == "Tag":
            continue
        grandchildren = child.get("children")
        # A child that has children of its own contributes those children
        # instead of itself; a childless child contributes its own URL.
        candidates = grandchildren if grandchildren else [child]
        for node in candidates:
            if node['type'] == "Tag":
                continue
            url = _canonical_url(node)
            # Skip nodes without a URL: callers split the URL to get an id.
            if url:
                urls.append(url)
    return urls

In [6]:
# Get product attributes
def get_product_attributes(soup: BeautifulSoup):
    """Extract the specification key/value pairs from a parsed product page.

    The function looks for ``div#specAndDescript``, takes the first child div
    of its first row as the specification column, and reads one attribute from
    each of that column's direct child divs (a ``<b>`` title plus the text of
    the entry's direct child ``<div>``).

    Args:
        soup: Parsed product page.

    Returns:
        dict: Lower-cased attribute title -> attribute text. Empty when the
        specification section is missing from the page.
    """
    # Init product attributes dict
    product_attributes = {}

    # Find parent div of specification and description
    spec_and_desc_parent = soup.find("div", attrs={"id": "specAndDescript"})
    if spec_and_desc_parent is None:
        return product_attributes

    # Find specification and description row
    rows = spec_and_desc_parent.find_all("div", recursive=False)
    if not rows:
        return product_attributes

    # Find specification and description divs; the specification one comes first
    columns = rows[0].find_all("div", recursive=False)
    if not columns:
        return product_attributes

    # Find all product parameters, minus the last two entries, which the
    # original author identified as notices rather than attributes.
    product_parameters = columns[0].find_all("div", recursive=False)[:-2]

    for parameter in product_parameters:
        title = parameter.find("b")
        # Look the value div up exactly the way it is read below (direct
        # child only), so a nested div can no longer pass the check and then
        # fail the read.
        value_div = parameter.find("div", recursive=False)
        if title is None or value_div is None:
            continue

        # First text node directly inside the value div.
        value_text = value_div.find(string=True, recursive=False)
        if value_text is None:
            continue

        # Drop the title's last character (presumably a trailing ":") and the
        # value's first character (presumably a leading space) - TODO confirm
        # against a live page.
        key = title.text[:-1].lower()
        product_attributes[key] = value_text[1:]

    return product_attributes

In [7]:
# Get product price
def get_product_price(product_url: str, timeout: float = 30.0):
    """Look up the current retail price of a product at the selected store.

    The product id is the text after the last ``-`` in the last path segment
    of ``product_url``. The lookup uses the module-level ``store_id`` and
    ``visitor_id``.

    Args:
        product_url: Product page URL.
        timeout: Seconds to wait for the pricing request before giving up.

    Returns:
        float: The ``price.current_retail`` value of the pricing response.

    Raises:
        requests.HTTPError: If the pricing endpoint answers with an error status.
        KeyError: If the response carries no ``price.current_retail`` field.
    """
    # Strip fragment, query string and trailing slash first so they cannot end
    # up inside the product id.
    product_path = product_url.split("#", 1)[0].split("?", 1)[0].rstrip("/")
    product_id = product_path.split("/")[-1].split("-")[-1]
    request_url = f"https://redsky.target.com/web/pdp_location/v1/tcin/{product_id}"

    payload = {
        "pricing_store_id": store_id,
        "key": visitor_id
    }

    response = requests.get(request_url, params=payload, timeout=timeout)
    response.raise_for_status()

    return float(response.json()["price"]["current_retail"])

In [8]:
# Scrape product
def scrape_product_details(product_url: str):
    """Scrape one product page into an ``{"item": ..., "price": ...}`` dict.

    Item data comes from the page's ``application/ld+json`` script, attributes
    from ``get_product_attributes`` and the price from ``get_product_price``.

    Args:
        product_url: URL containing ``https://www.target.com/p/``.

    Returns:
        dict: ``"item"`` holds the product description (attributes are stored
        as a JSON string); ``"price"`` holds the price, an availability flag
        and a timezone-aware UTC observation timestamp.

    Raises:
        ValueError: If the URL is not a product URL or the page carries no
            JSON-LD metadata.
    """
    # Ignore wrong urls
    if "https://www.target.com/p/" not in product_url:
        raise ValueError("Provided url is not that of a product")

    # Send request
    product_request = requests.get(product_url, timeout=30)

    # Parse product
    product_soup = BeautifulSoup(product_request.text, "html.parser")

    # Find script tag; error pages and changed layouts have none, so report
    # that explicitly instead of failing on a None attribute access.
    script_tag = product_soup.find("script", attrs={"type": "application/ld+json"})
    if script_tag is None or not script_tag.string:
        raise ValueError(
            f"No JSON-LD metadata found at {product_url} "
            f"(HTTP {product_request.status_code})"
        )

    # Get product json
    product_json = json.loads(script_tag.string)

    # Get product attributes
    product_attributes = get_product_attributes(product_soup)

    # Get product price
    product_price = get_product_price(product_url)

    # First graph node describes the product, second one the breadcrumb list.
    product_node = product_json["@graph"][0]
    breadcrumbs = product_json["@graph"][1]["itemListElement"]
    offer = product_node["offers"]

    product_object = {
        "item": {
            "source": "US_target",
            "source_key": product_node["sku"],
            "name": product_node["name"],
            "brand": product_node["brand"],
            "currency": offer["priceCurrency"],
            # The last breadcrumb is used as the category.
            "category": breadcrumbs[-1]["item"]["name"].lower(),
            "country": "US",
            "upc": product_node["gtin13"],
            "url": offer["url"],
            "image_url": product_node["image"],
            "attributes": json.dumps(product_attributes)
        },
        "price": {
            "price": product_price,
            "available": "InStock" == offer["availability"],
            # Aware UTC timestamp without the deprecated datetime.utcnow().
            "observed_date": datetime.now(utc)
        }
    }

    return product_object

In [9]:
# Get urls for all products for category
def get_products_urls(
    category_url: str,
    api_key: str = "ff457966e64d5e877fdbad070f276d18ecec4a01",
    page_delay: float = 1.0,
    timeout: float = 30.0,
):
    """Collect the buy URLs of all products in a category via ``plp_search_v1``.

    The category id is the text after the last ``-`` in ``category_url``.
    Pages are requested one after another, using the number of URLs collected
    so far as the offset, until the reported current page reaches the reported
    total or a page comes back empty. The request uses the module-level
    ``store_id`` and ``visitor_id``.

    Args:
        category_url: Canonical category URL ending in ``-<category id>``.
        api_key: Search API key. The default is the value the original author
            hard-coded and noted as likely to change; pass a fresh one when it
            stops working.
        page_delay: Seconds to sleep between page requests.
        timeout: Seconds to wait for each request before giving up.

    Returns:
        list: Product buy URLs in the order the API returned them.

    Raises:
        requests.HTTPError: If a search request answers with an error status.
        KeyError: If a response does not have the expected structure.
    """
    search_url = "https://redsky.target.com/redsky_aggregations/v1/web/plp_search_v1"
    category_id = category_url.split("-")[-1]

    # Values are given unencoded: requests percent-encodes `params` itself, so
    # pre-encoded strings would reach the server double-encoded.
    params = {
        "key": api_key,
        "category": category_id,
        "channel": "WEB",
        "count": 24,
        "default_purchasability_filter": "false",
        "include_sponsored": "true",
        "page": f"/c/{category_id}",
        "platform": "desktop",
        "offset": 0,
        "pricing_store_id": store_id,
        "useragent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36"
        ),
        "visitor_id": visitor_id
    }

    products_urls = []
    while True:
        params['offset'] = len(products_urls)
        products_req = requests.get(search_url, params=params, timeout=timeout)
        products_req.raise_for_status()
        search = products_req.json()['data']['search']

        page_urls = [product['item']['enrichment']['buy_url'] for product in search['products']]
        products_urls.extend(page_urls)

        page_info = search['search_response']['typed_metadata']
        current_page = int(page_info['current_page'])
        last_page = page_info['total_pages']
        print(f"Page {current_page}/{last_page}")

        # An empty page means the offset can no longer advance; stop rather
        # than request the same page forever.
        if current_page >= last_page or not page_urls:
            break
        time.sleep(page_delay)

    print(f"Found {len(products_urls)} products in {category_url}")
    return products_urls

In [1]:
def main():
    """Scrape every product of the configured ``CATEGORIES``.

    Resolves each configured category to its sub-category URLs, collects the
    product URLs of every sub-category, then scrapes the products one by one
    with a one-second pause before each request.

    Returns:
        list: One product dict per successfully scraped product URL.

    Raises:
        KeyError: If an entry of ``CATEGORIES`` is not among the fetched categories.
    """
    products_urls = []
    categories = get_categories()
    for category in CATEGORIES:
        print(f"Scraping: {category}")
        category_urls = categories[category]
        for category_url in category_urls['categories']:
            print(f"Scraping sub: {category_url}")
            products_urls.extend(get_products_urls(category_url))

    print(f"Found {len(products_urls)} products in all categories")
    all_objects = []
    for index, product_url in enumerate(products_urls):
        print(f"Scraping {product_url} [{index + 1}/{len(products_urls)}]")
        time.sleep(1)
        try:
            all_objects.append(scrape_product_details(product_url))
        except ValueError as error:
            # scrape_product_details raises ValueError for URLs it cannot
            # handle; skip those instead of losing everything scraped so far.
            print(f"Skipping {product_url}: {error}")

    # upsert(all_objects, "US_target")
    return all_objects
    

In [None]:
# Run the scraper. Binding the result to a name keeps the cell from echoing
# main()'s return value as its output and leaves it available to later cells.
scraped_products = main()