In [None]:
#import libraries
import requests
from bs4 import BeautifulSoup
import pandas as pd
import random
import time
from itertools import cycle
import json
import logging
from utils import setup_logging

In [None]:
# Set up logging through the project's `utils.setup_logging` helper, then
# create the module-level logger (named after this module) that the functions
# and classes below use to report errors.
setup_logging()
logger = logging.getLogger(__name__)

In [None]:
def generate_payload_or_header(type, **kwargs):
    """
    generate_payload_or_header(type, **kwargs) -> dict

    Build the payload or the header of a request by filling in the JSON
    templates `payload_product` / `product_header`.

    Parameters:
        type(str): the kind of request data to build, either "payload" or "header"
        **kwargs: additional keyword arguments required based on the type
        For payload:
            query(str): the search query (`search_query` is accepted as an alias)
            page(int): the page number (`page_number` is accepted as an alias)
        For header:
            cookie(str): the cookie
            user_agent(str): the user agent
    Returns:
        `dict`: a dictionary containing the payload or header for the request

    Raises:
        ValueError: if the type is invalid or a required parameter is missing
    """

    if type == "payload":
        # Callers in this file pass `search_query` / `page_number`, while the
        # documented names are `query` / `page`; accept both spellings so
        # neither style fails with a "Missing Parameters" error.
        search_query = kwargs.get("query")
        if search_query is None:
            search_query = kwargs.get("search_query")

        page_number = kwargs.get("page")
        if page_number is None:
            page_number = kwargs.get("page_number")

        # Validate before parsing the template so bad calls fail fast.
        if search_query is None or page_number is None:
            logger.error("Missing Parameters: 'query' and 'page' are required.")
            raise ValueError("Missing Parameters: 'query' and 'page' are required.")

        # A fresh dict is parsed on every call, so the template is never mutated.
        payload = json.loads(payload_product)
        payload["query"] = search_query
        payload["page"] = page_number

        return payload

    elif type == "header":
        cookie = kwargs.get("cookie")
        user_agent = kwargs.get("user_agent")
        if cookie is None or user_agent is None:
            logger.error("Missing Parameters: 'cookie' and 'user_agent' are required.")
            raise ValueError("Missing Parameters: 'cookie' and 'user_agent' are required.")

        header = json.loads(product_header)
        header["User-Agent"] = user_agent
        header["cookie"] = cookie

        return header

    else:
        logger.warning("Invalid type. Choose 'payload' or 'header'.")
        raise ValueError("Invalid type. Choose 'payload' or 'header'.")


In [None]:
class aliexpress_scraper:
    """
    Fetch product listings from the AliExpress search endpoint, switching to
    the next user agent / cookie pair every `page_rotation_interval` pages.
    """

    # Seconds to wait on a request before giving up, so a stalled connection
    # cannot block the scraper forever.
    request_timeout = 30

    def __init__(self, cookie_and_user_agent_review_dict):
        """
        Parameters:
            cookie_and_user_agent_review_dict(dict): maps each user agent (key)
                to the cookie (value) that is sent together with it

        Raises:
            ValueError: if the mapping is empty
        """
        # The pairs are cycled endlessly; an empty mapping would make the
        # first `next()` call fail with an unhelpful StopIteration.
        if not cookie_and_user_agent_review_dict:
            raise ValueError("At least one user agent / cookie pair is required.")

        self.url = "https://www.aliexpress.com/fn/search-pc/index"
        self.user_agent = cycle(cookie_and_user_agent_review_dict.keys())
        self.cookies_cycle = cycle(cookie_and_user_agent_review_dict.values())
        self.page_rotation_interval = 5
        self.page_count = 0
        self.current_user_agent = next(self.user_agent)
        self.current_cookie = next(self.cookies_cycle)

    # rotate headers
    def rotate_headers(self):
        """Advance to the next user agent / cookie pair when `page_count` is a
        multiple of `page_rotation_interval`."""
        if self.page_count % self.page_rotation_interval == 0:
            self.current_user_agent = next(self.user_agent)
            self.current_cookie = next(self.cookies_cycle)

    # extract product details
    @staticmethod
    def extract_product_details(data):
        """
        Flatten one decoded search response into a list of product tuples.

        Parameters:
            data(dict): decoded JSON of a search response

        Returns:
            `list`: one tuple per product, in the order (productId,
            display_title, product_url, image_url, store_name, store_url,
            original_price, sale_price, discount). Missing text fields are
            None and missing price fields are 0.

        Raises:
            KeyError: if the `data.result.mods.itemList.content` path is absent
        """
        contents = data["data"]["result"]["mods"]["itemList"]["content"]
        products = []

        for content in contents:
            # `or {}` keeps a missing or null sub-object from breaking the
            # chained lookups below.
            title = content.get("title") or {}
            image = content.get("image") or {}
            store = content.get("store") or {}
            prices = content.get("prices") or {}
            original = prices.get("originalPrice") or {}
            sale = prices.get("salePrice") or {}

            # NOTE(review): the original code read "formatedPrice" for the
            # original price and "formattedPrice" for the sale price. Both
            # spellings are accepted until the real key is confirmed.
            original_price = original.get("formattedPrice", original.get("formatedPrice", 0))
            sale_price = sale.get("formattedPrice", 0)
            discount = sale.get("discount", 0)

            products.append((
                content.get("productId"),
                title.get("displayTitle"),
                content.get("productDetailUrl"),
                image.get("imgUrl"),
                store.get("storeName"),
                store.get("storeUrl"),
                original_price,
                sale_price,
                discount,
            ))

        return products

    def _post_search(self, query, page):
        """Send the search request for one page with the current user agent
        and cookie, and return the raw response."""
        headers = generate_payload_or_header(type="header", user_agent=self.current_user_agent, cookie=self.current_cookie)
        payload = generate_payload_or_header(type="payload", query=query, page=page)

        # NOTE(review): a dict passed via `data=` is form-encoded by requests;
        # the payload is serialized here on the assumption that the endpoint
        # expects a JSON body - confirm against the header template.
        return requests.request("POST", self.url, headers=headers, data=json.dumps(payload), timeout=self.request_timeout)

    @staticmethod
    def _decode_json(response):
        """Return the decoded JSON body of `response`, or None if it is not valid JSON."""
        try:
            return response.json()
        except ValueError as e:
            # The JSON decode errors raised by requests derive from ValueError.
            logger.error(f"Error decoding product listings response: {e}")
            return None

    def get_products_listings(self, query):
        """
        Fetch every result page for a search query.

        Parameters:
            query(str): the search query

        Returns:
            `list`: product tuples from all pages that could be fetched (see
            `extract_product_details`), or None if the first page cannot be
            fetched or parsed, or if any request raises a requests exception
        """
        self.page_count = 1
        self.rotate_headers()

        all_page_results = []

        try:
            response = self._post_search(query, self.page_count)
            time.sleep(random.randint(1, 5))
            response.raise_for_status()

            page_1_data = self._decode_json(response)
            if page_1_data is None:
                return None

            try:
                total_page_number = page_1_data["data"]["result"]["pageInfo"]["totalPage"]
                all_page_results.extend(self.extract_product_details(page_1_data))
            except (KeyError, TypeError, AttributeError) as e:
                logger.error(f"Unexpected product listings response structure: {e!r}")
                return None

            for page in range(2, total_page_number + 1):
                self.page_count += 1
                self.rotate_headers()

                response = self._post_search(query, page)
                time.sleep(random.randint(3, 7))

                if response.status_code != 200:
                    logger.error(f"Error fetching product listings: {response.status_code}")
                    print(f"Error fetching product listings: {response.status_code}")
                    continue

                results = self._decode_json(response)
                if results is None:
                    continue

                # A malformed page is skipped so the pages already collected
                # are not lost.
                try:
                    all_page_results.extend(self.extract_product_details(results))
                except (KeyError, TypeError, AttributeError) as e:
                    logger.error(f"Unexpected product listings response structure: {e!r}")

        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching product listings: {e}")
            print(f"Error fetching product listings: {e}")
            return None

        return all_page_results
   

In [None]:
class extract_reviews():
    """
    Fetch product reviews from the AliExpress feedback endpoint, switching to
    the next user agent / cookie pair every `rotation_interval` pages.
    """

    # Seconds to wait on a request before giving up, so a stalled connection
    # cannot block the scraper forever.
    request_timeout = 30

    # (response key, default) for every review field, in output tuple order.
    _REVIEW_FIELDS = (
        ("aigc", False),
        ("anonymous", False),
        ("buyerCountry", "Unknown"),
        ("buyerEval", 0),
        ("buyerFeedback", None),
        ("buyerGender", None),
        ("buyerTranslationFeedback", None),
        ("evalDate", None),
        ("downVoteCount", 0),
        ("upVoteCount", 0),
        ("logistics", None),
        ("reviewLabel1", None),
        ("reviewLabel2", None),
        ("reviewLabel3", None),
        ("reviewLabelValue1", None),
        ("reviewLabelValue2", None),
        ("reviewLabelValue3", None),
        ("skuInfo", None),
        ("evaluationIdStr", None),
    )

    def __init__(self, cookie_and_user_agent_dict=None):
        """
        Parameters:
            cookie_and_user_agent_dict(dict): optional mapping of user agent
                (key) to cookie (value). When omitted, the module-level
                `cookie_and_user_agent_review_dict` is used.

        Raises:
            ValueError: if the mapping is empty
        """
        if cookie_and_user_agent_dict is None:
            # Default kept for existing callers that construct the class
            # without arguments.
            cookie_and_user_agent_dict = cookie_and_user_agent_review_dict
        # The pairs are cycled endlessly; an empty mapping would make the
        # first `next()` call fail with an unhelpful StopIteration.
        if not cookie_and_user_agent_dict:
            raise ValueError("At least one user agent / cookie pair is required.")

        self.url = "https://feedback.aliexpress.com/pc/searchEvaluation.do?productId={product_id}&lang=en_US&country=UK&page={page}&pageSize=10&filter=all&sort=complex_default"
        self.user_agent_cycle = cycle(cookie_and_user_agent_dict.keys())
        self.cookies_cycle = cycle(cookie_and_user_agent_dict.values())
        self.page_number = 0
        self.rotation_interval = 2
        self.current_user_agent = next(self.user_agent_cycle)
        self.current_cookies = next(self.cookies_cycle)

    # rotate headers
    def rotate_headers(self):
        """Advance to the next user agent / cookie pair when `page_number` is
        a multiple of `rotation_interval`."""
        if self.page_number % self.rotation_interval == 0:
            self.current_user_agent = next(self.user_agent_cycle)
            self.current_cookies = next(self.cookies_cycle)

    @staticmethod
    def extract_products_reviews(productIDs):
        """
        Flatten one decoded review response into a list of review tuples.

        Parameters:
            productIDs(dict): decoded JSON of a review response; reviews are
                read from `productIDs["data"]["evaViewList"]`

        Returns:
            `list`: one tuple per review, in the order (aigc, anonymous,
            buyerCountry, buyerEval, buyerFeedback, buyerGender,
            buyerTranslationFeedback, evalDate, downVoteCount, upVoteCount,
            logistics, reviewLabel1-3, reviewLabelValue1-3, skuInfo,
            evaluationIdStr). Empty when the response carries no reviews.
        """
        data = (productIDs.get("data") or {}).get("evaViewList") or []
        return [
            tuple(review.get(key, default) for key, default in extract_reviews._REVIEW_FIELDS)
            for review in data
        ]

    @staticmethod
    def _total_pages(results):
        """Return the number of review pages announced by a decoded response (1 if unknown)."""
        # NOTE(review): the original code read a top-level "totalPage", while
        # the reviews themselves are nested under "data"; both locations are
        # checked until the real response layout is confirmed.
        total = results.get("totalPage")
        if total is None:
            total = (results.get("data") or {}).get("totalPage")
        try:
            return int(total)
        except (TypeError, ValueError):
            return 1

    def _get_review_page(self, product_id, delay_range):
        """Fetch page `self.page_number` for one product and return the decoded
        JSON, or None if the status is not 200 or the body is not valid JSON."""
        headers = generate_payload_or_header(type="header", user_agent=self.current_user_agent, cookie=self.current_cookies)
        url = self.url.format(product_id=product_id, page=self.page_number)

        response = requests.request("GET", url, headers=headers, timeout=self.request_timeout)
        time.sleep(random.randint(*delay_range))

        if response.status_code != 200:
            logger.error(f"Error fetching product reviews: {response.status_code}")
            print(f"Error fetching product reviews: {response.status_code}")
            return None

        try:
            return response.json()
        except ValueError as e:
            # The JSON decode errors raised by requests derive from ValueError.
            logger.error(f"Error decoding product reviews response: {e}")
            return None

    def _fetch_reviews_for_product(self, product_id):
        """Collect the review tuples from every review page of one product."""
        self.page_number = 1
        self.rotate_headers()

        reviews = []

        # extract first page
        results = self._get_review_page(product_id, (1, 3))
        if results is None:
            return reviews

        total_page_number = self._total_pages(results)
        reviews.extend(self.extract_products_reviews(results))

        for _ in range(2, total_page_number + 1):
            self.page_number += 1
            self.rotate_headers()

            results = self._get_review_page(product_id, (1, 5))
            if results is None:
                continue
            reviews.extend(self.extract_products_reviews(results))

        return reviews

    def get_product_reviews(self, page, product_ids):
        """
        Fetch all reviews for one or more products.

        Parameters:
            page: unused; kept in the signature for backward compatibility
                (fetching always starts at page 1)
            product_ids(str | int | iterable): a single product ID, or an
                iterable of product IDs

        Returns:
            `list`: review tuples (see `extract_products_reviews`) of all
            requested products in one flat list, or None if any request
            raises a requests exception
        """
        # NOTE(review): the original body formatted an undefined `product_id`
        # into the URL; a single ID and a collection of IDs are both accepted
        # because the parameter name is plural.
        if isinstance(product_ids, (str, int)):
            product_id_list = [product_ids]
        else:
            product_id_list = list(product_ids)

        all_page_reviews = []

        try:
            for product_id in product_id_list:
                all_page_reviews.extend(self._fetch_reviews_for_product(product_id))
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching product reviews: {e}")
            print(f"Error fetching product reviews: {e}")
            return None

        return all_page_reviews
