In [183]:
from selenium import webdriver
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import Select

from bs4 import BeautifulSoup as bs

from datetime import timedelta
import time
import threading 
import _thread as thread

import json
import os
import platform
import sys
import csv
from pydub import AudioSegment
from pydub.playback import play

import smtplib
from email.message import EmailMessage as EmailMessage

import logging as log

In [184]:
#-------------------------------------
#         System Methods
#------------------------------------- 

def playSound():
    """Play the notification sound by shelling out to Windows PowerShell.

    Best effort: this function never raises. ``os.system`` reports a failed
    command through its return value instead of raising, so the exit status
    is checked explicitly and a non-zero status is reported the same way as
    any other failure.
    """
    # log.info('Playing Sound.')
    try:
        wd = os.getcwd()
        NOTIFICATION_FILE_PATH = f'{wd}\\assets\\youGotmail.wav'
        command = f'powershell -c (New-Object Media.SoundPlayer "{NOTIFICATION_FILE_PATH}").PlaySync()'
        exitStatus = os.system(command)
        if exitStatus != 0:
            # Non-zero status: the shell could not run the command successfully.
            print('Something wrong happened./')
    except Exception:
        # Exception (not a bare except) so Ctrl+C / SystemExit still propagate.
        # log.exception('Something wrong happened when playing sound')
        print('Something wrong happened./')

#-------------------------------------
#         Selection Methods
#-------------------------------------
def getSelectionIfExists(soup, cssSelection, index=0):
    """Return the ``index``-th element matched by a CSS selector, or None.

    Args:
        soup: Object exposing ``select(cssSelection)`` that returns a sequence
            of matches.
        cssSelection: CSS selector passed to ``soup.select``.
        index: Position of the wanted match within the result (default 0).

    Returns:
        The selected element, or None when nothing matched or ``index`` does
        not address an existing match.
    """
    result = soup.select(cssSelection)
    if not result:
        return None

    try:
        return result[index]
    except (IndexError, TypeError):
        log.debug(f'Possibly out of range. Selection: {cssSelection}')
        return None

def getSelectionText(soup, cssSelection, property='', index = 0):
    """Return the ``.text`` of the selected element, or '' when there is none.

    ``property`` is accepted but not used by this function.
    """
    match = getSelectionIfExists(soup, cssSelection, index)
    if match is None:
        return ''
    return match.text

def getSelectionPropValue(soup, cssSelection, property='', index = 0):
    """Return one attribute value of the selected element, or '' on any miss.

    Args:
        soup: Object searched through ``getSelectionIfExists``.
        cssSelection: CSS selector identifying the element.
        property: Attribute name looked up with ``selection[property]``.
        index: Which match to use when several elements match (default 0).

    Returns:
        The attribute value, or '' when no element was selected or the
        attribute lookup failed.
    """
    selection = getSelectionIfExists(soup, cssSelection, index)
    if selection is None:
        log.debug (f'Selection not found. Selection: {cssSelection}')
        return ''

    try:
        return selection[property]
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit are no longer swallowed by a failed attribute lookup.
        log.debug(f'Property not found. Selection: {cssSelection}')
        return ''
    
#-------------------------------------
#         General
#-------------------------------------      
    
class Listing:
    """Plain record holding the fields scraped for one product listing."""

    def __init__(self, store, listName = '', price = '', listingUrl='', fulfillmentSummary='', cartButtonText='', skuValue='', skuId=''):
        # Every field except store defaults to an empty string.
        (self.store, self.listName, self.price, self.listingUrl) = (store, listName, price, listingUrl)
        (self.fulfillmentSummary, self.cartButtonText) = (fulfillmentSummary, cartButtonText)
        (self.skuValue, self.skuId) = (skuValue, skuId)

def getProductDicts():
    """Load ``products.csv`` from the current working directory.

    Returns:
        A list with one plain dict per data row, keyed by the CSV header.
    """
    # The with-block closes the file handle; newline='' is the mode the csv
    # module asks for so embedded newlines in quoted fields are parsed correctly.
    with open("products.csv", newline='') as productsFile:
        return [dict(row) for row in csv.DictReader(productsFile)]

#-------------------------------------
#         Best Buy Specific Methods
#-------------------------------------  

def getBestBuySite(url: str):
    """Turn a site-relative path into an absolute https://www.bestbuy.com URL.

    A leading '/' is added to ``url`` when it does not already start with one.
    """
    path = url if url.startswith('/') else '/' + url
    return 'https://www.bestbuy.com' + path

def getDataFromListingSoups_bestbuy(soup):
    """Build a ``Listing`` from the soup of one Best Buy listing element.

    Args:
        soup: Parsed element for a single listing; it is queried with CSS
            selectors and read for its ``data-sku-id`` attribute.

    Returns:
        A ``Listing`` for ``BESTBUY_STORE``. Fields that cannot be found are
        left as empty strings; the URL is always prefixed with the Best Buy
        host by ``getBestBuySite``.
    """
    headerLinkSelector = 'h4[class="sku-header"] > a'

    name = getSelectionText(soup, headerLinkSelector)
    listingUrl = getBestBuySite(getSelectionPropValue(soup, headerLinkSelector, 'href'))
    fulfillmentSummary = getSelectionText(soup, 'div[class="fulfillment-fulfillment-summary"]')
    addToCartText = getSelectionText(soup, 'button[class*="add-to-cart-button"]')
    # The second sku-value span is the one used (index=1).
    skuValue = getSelectionText(soup, 'span[class*="sku-value"]', index=1)
    # .get() keeps a listing without the attribute from aborting the whole
    # page parse; like the other fields it falls back to ''.
    skuId = soup.get('data-sku-id', '')

    # Keep only what follows the last '$' of the first screen-reader span.
    price = getSelectionText(soup, 'span[class="sr-only"]').split('$')[-1]
    return Listing(BESTBUY_STORE, name, price, listingUrl, fulfillmentSummary, addToCartText, skuValue, skuId)

#Function to keep only the listings that can be added to the cart
def getListingsInStock(listingsList):   # TODO: add type hints
    """Return the listings whose cart button text is 'add to cart' (case-insensitive)."""
    return [listing for listing in listingsList
            if listing.cartButtonText.lower() == 'add to cart']

#-------------------------------------
#         General Methods
#------------------------------------- 

#Parse the listing soups of a page into Listing objects
def getListingData(store, listings):
    """Convert listing soups into ``Listing`` objects for a supported store.

    Returns a list with one entry per item in ``listings`` when ``store`` is
    ``BESTBUY_STORE``; returns None for any other store.
    """
    log.info('Parsing data from listings in page')

    # Best Buy is the only store with a parser.
    if store != BESTBUY_STORE:
        return None

    # For testing, a fake in-stock listing can be appended to the result:
    # Listing(BESTBUY_STORE, 'Test PS5', '599.99', 'facebook.com', 'we got it', 'Add To Cart', 'randomNumber', 'anotherRandomNumer')
    return [getDataFromListingSoups_bestbuy(listingSoup) for listingSoup in listings]


#Function to make email notification
def getEmailMessageForInStockItem(listing: Listing):
    """Build the in-stock notification e-mail for one listing.

    The message is addressed from and to ``info['email_address']``; the body
    carries the listing URL.
    """
    subject = f'!!! {listing.listName} in stock! @ {listing.store.capitalize()}'
    notification = EmailMessage()
    notification['Subject'] = subject
    # Sender and recipient are the same configured address.
    for header in ('From', 'To'):
        notification[header] = info['email_address']
    notification.set_content(f'listing URL is here: {listing.listingUrl}')
    return notification

#Function to send email
def sendEmail(listing: Listing, message, debug = True):
    """Send a notification e-mail; failures are logged, never raised.

    Args:
        listing: Listing the notification is about (used for logging only).
        message: Message object handed to ``smtp.send_message``.
        debug: When True, use plain SMTP on localhost:25; otherwise use
            SMTP over SSL on smtp.gmail.com:465.

    The connection logs in with ``EMAIL_ADDRESS`` / ``PASSWORD`` in both modes.
    """
    log.info(f'Sending Email Notification for {listing.listName}')

    # Pick the connection class together with its endpoint so the send logic
    # is written once; ports are integers as smtplib documents them.
    if debug:
        smtpFactory, mailServer, mailServerPort = smtplib.SMTP, 'localhost', 25
    else:
        smtpFactory, mailServer, mailServerPort = smtplib.SMTP_SSL, 'smtp.gmail.com', 465

    try:
        with smtpFactory(mailServer, mailServerPort) as smtp:
            smtp.login(EMAIL_ADDRESS, PASSWORD)
            smtp.send_message(message)
        log.info('Email Sent')

    except Exception:
        # log.exception records the traceback, so the cause of the failure is
        # no longer lost; Exception lets KeyboardInterrupt/SystemExit through.
        log.exception('Something wrong happened when sending notication')
def processListingData(listings: list):
    """Report on the parsed listings and notify for each one in stock.

    Returns None when ``listings`` is None or empty. Otherwise plays the
    notification sound and e-mails every in-stock listing (if any), then
    returns a dict with the keys 'listingCount' and 'listingsInStock'.
    """
    if listings is None or not len(listings):
        log.info('No listings found.')
        return None

    totalCount = len(listings)
    log.info(f'{totalCount} listing(s) found')

    available = getListingsInStock(listings)
    availableCount = len(available)
    log.info(f'{availableCount} listing(s) in stock')

    if available:
        playSound()
        for item in available:
            # The third argument (debug=False) selects the non-debug mail path.
            sendEmail(item, getEmailMessageForInStockItem(item), False)

    return {'listingCount': totalCount, 'listingsInStock': availableCount}

def checkAndAddArguments(options):
    """Forward every command-line argument starting with '--' to ``options``.

    Each forwarded argument is logged and passed to ``options.add_argument``;
    the same ``options`` object is returned.
    """
    doubleDashArgs = (candidate for candidate in sys.argv if candidate.startswith('--'))
    for flag in doubleDashArgs:
        log.info(f'Adding following argument :=> {flag}')
        options.add_argument(flag)
    return options

def getOptions():
    """Create browser options for the driver chosen on the command line.

    '-chrome' takes precedence over '-firefox'. Raises ``BaseException`` with
    the message 'No Driver Selected' when neither flag is in ``sys.argv``.
    The options are returned after ``checkAndAddArguments`` has processed them.
    """
    wantsChrome = '-chrome' in sys.argv
    if not wantsChrome and '-firefox' not in sys.argv:
        raise BaseException('No Driver Selected')

    browserOptions = webdriver.ChromeOptions() if wantsChrome else FirefoxOptions()
    return checkAndAddArguments(browserOptions)

def getDriver(driver = None):
    """Return ``driver`` if given, otherwise start a new browser driver.

    The browser is chosen by the '-chrome' / '-firefox' command-line flags
    (see ``getOptions``). Chrome is started from ``DRIVER_FILE_PATH``; Firefox
    relies on a geckodriver found on PATH.

    Raises:
        Whatever prevented the driver from starting; the error is logged with
        its traceback and re-raised unchanged.
    """
    if driver is not None:
        return driver

    try:
        options = getOptions()

        log.info(f'{options}')
        log.info(f'Driver arguments are {options.arguments}')

        if '-chrome' in sys.argv:
            log.info(f'Chrome Driver Path is {DRIVER_FILE_PATH}')
            driver = webdriver.Chrome(executable_path=DRIVER_FILE_PATH, options=options)

        elif '-firefox' in sys.argv:
            log.info('Using Geckodriver in PATH')
            driver = webdriver.Firefox(options=options)

    except BaseException as err:
        # Format the exception itself: err.args can be empty (for example for
        # KeyboardInterrupt), and indexing it would raise IndexError here and
        # hide the real failure.
        log.exception(f'Unable to stand up new driver => {err}')
        raise

    return driver

def navigateToPage(driver, searchUrl):
    """Load ``searchUrl`` in ``driver``.

    Returns None on success. Any error raised by ``driver.get`` is logged and
    re-raised unchanged. Only the first 20 characters of the URL are logged.
    """
    log.info(f'Navigating to {searchUrl[0:20]}...')
    try:
        driver.get(searchUrl)

    except BaseException as err:
        # Format the exception itself: err.args may be empty, and indexing it
        # would replace the real error with an IndexError.
        log.error(f'Driver navigation fail => {err}')
        raise

    
def recycleDriver(driver):
    """Close and quit ``driver``; failures are logged, never raised.

    ``close()`` and ``quit()`` are attempted independently so that a failing
    ``close()`` no longer prevents ``quit()`` from running. The caller's
    reference is not modified; callers must drop or replace it themselves.
    """
    log.info('Quiting driver')
    for step in ('close', 'quit'):
        try:
            getattr(driver, step)()
        except Exception as err:
            # Format the exception itself; err.args may be empty.
            log.error(f'Quiting driver fail => {err}')
    
    
def has_connection(driver):
    """Return False when the current page contains the error-heading span.

    The span is matched by its ``jsselect``/``jsvalues`` attributes —
    presumably the heading of the browser's own network-error page; confirm
    against the target browser. Returns True when the element cannot be
    located, including when the lookup itself fails.
    """
    try:
        # find_element(By.XPATH, ...) is available in both Selenium 3 and 4;
        # the find_element_by_xpath shortcut was removed in Selenium 4, which
        # made the original lookup fail and always report a connection.
        driver.find_element(By.XPATH, '//span[@jsselect="heading" and @jsvalues=".innerHTML:msg"]')
        return False

    except Exception:
        return True

def logRunReport(reportDict, storeInfoDict, timeElapsed=None):
    """Log the outcome of one scrape run.

    Args:
        reportDict: Dict with 'listingCount' and 'listingsInStock', or None
            to skip the listing summary.
        storeInfoDict: Dict providing 'store' and 'product' for the summary.
        timeElapsed: Run duration in seconds. Optional, so callers that do
            not measure time can omit it; the run-time line is then skipped.
    """
    if reportDict is not None:
        log.info(f"Run report => Store: {storeInfoDict['store']} Product: {storeInfoDict['product']} ListingsFound: {reportDict['listingCount']} In-Stock {reportDict['listingsInStock']}")
    if timeElapsed is not None:
        log.info(f'Run time => {str(timedelta(seconds=timeElapsed))}')

def runScrapForSearchUrl(storeInfoDict):
    """Thread worker: poll one search URL until ``storeInfoDict['stop']`` is set.

    Each pass loads ``storeInfoDict['url']`` in ``storeInfoDict['driver']``,
    parses the ``li.sku-item`` listings, processes them and logs a run report.
    When a pass fails the driver is recycled, replaced with a new one, and the
    next pass starts immediately. On exit the driver is recycled.

    Args:
        storeInfoDict: Dict with the keys 'stop', 'driver', 'url', 'store'
            and 'product'. 'driver' is replaced in place after a failure.
    """
    pollSeconds = 30  # pause between successful passes

    while storeInfoDict['stop'] is False:
        startTime = time.time()
        try:
            # navigateToPage raises on failure, so no return value is checked.
            navigateToPage(storeInfoDict['driver'], storeInfoDict['url'])

            pageSoup = bs(storeInfoDict['driver'].page_source, 'html.parser')
            listings = pageSoup.find_all('li', {"class": "sku-item"})
            data = getListingData(storeInfoDict['store'], listings)
            reportDict = processListingData(data)
            # logRunReport needs the elapsed time as its third argument;
            # omitting it raised TypeError on every pass.
            logRunReport(reportDict, storeInfoDict, time.time() - startTime)

        except Exception as err:
            # Format the exception itself; err.args may be empty.
            log.error(f'Something happened. => {err}')
            recycleDriver(storeInfoDict['driver'])
            storeInfoDict['driver'] = getDriver()
            continue

        # Constructing a WebDriverWait without calling .until() does not wait,
        # so the pause is done explicitly, in one-second steps so that a stop
        # request is honoured promptly.
        waited = 0
        while waited < pollSeconds and storeInfoDict['stop'] is False:
            time.sleep(1)
            waited += 1

    log.error('Stop called..')
    recycleDriver(storeInfoDict['driver'])
    log.info(f'Exiting Thread for {storeInfoDict["product"]}')




In [185]:


#Worker functions
def doWork_Threads(searchInfos):
    """Run one scraping thread per search entry until Ctrl+C.

    Every entry gets 'stop' set to False and its own driver under 'driver',
    then a thread running ``runScrapForSearchUrl`` is started for it. The
    calling thread logs a heartbeat every 5 seconds; on KeyboardInterrupt it
    sets 'stop' to True on every entry and joins all threads.
    """
    workers = []
    for searchInfo in searchInfos:
        searchInfo['stop'] = False
        product, store = searchInfo['product'], searchInfo['store']

        log.info(f'Getting driver for {product} => {store}')
        searchInfo['driver'] = getDriver()
        log.info(f'Starting thread for {product} => {store}')
        worker = threading.Thread(target=runScrapForSearchUrl, args=(searchInfo,))
        workers.append(worker)
        worker.start()

    # Heartbeat loop; only a KeyboardInterrupt ends it.
    keepWatching = True
    while keepWatching:
        try:
            log.info('Things are going well..')
            time.sleep(5)
        except KeyboardInterrupt:
            for searchInfo in searchInfos:
                searchInfo['stop'] = True
            keepWatching = False

    for worker in workers:
        worker.join()


def doWork_Single(searchInfos):
    """Poll every search entry in turn with a single driver until Ctrl+C.

    Each pass handles one entry (round-robin over ``searchInfos``): load its
    'url', parse the ``li.sku-item`` listings, process them, log a run report
    and pause. The round-robin counter only advances after a successful pass,
    so a failing entry is retried with a fresh driver.

    Args:
        searchInfos: Sequence of dicts with the keys 'url', 'store' and
            'product'. An empty sequence is reported and nothing is started.
    """
    if not searchInfos:
        # Without this guard the modulo below raised ZeroDivisionError on
        # every pass and the loop recycled drivers forever.
        log.error('No search infos supplied; nothing to scrape.')
        return

    pollSeconds = 30  # pause between successful passes
    runCounter = 0
    driver = getDriver()
    while True:
        try:
            startTime = time.time()

            indexForSearch = runCounter % len(searchInfos)
            storeInfoDict = searchInfos[indexForSearch]

            # navigateToPage raises on failure, so no return value is checked.
            navigateToPage(driver, storeInfoDict['url'])

            pageSoup = bs(driver.page_source, 'html.parser')
            listings = pageSoup.find_all('li', {"class": "sku-item"})
            data = getListingData(storeInfoDict['store'], listings)
            reportDict = processListingData(data)

            endTime = time.time()

            logRunReport(reportDict, storeInfoDict, endTime - startTime)

            runCounter += 1
            # Constructing a WebDriverWait without calling .until() does not
            # wait, so the pause is done explicitly.
            time.sleep(pollSeconds)

        except KeyboardInterrupt:
            # log.Error does not exist (AttributeError); log.error is meant.
            log.error('Stop Called')
            recycleDriver(driver)
            # Leave the loop; otherwise it kept running with a closed driver.
            break

        except Exception as err:
            # Format the exception itself; err.args may be empty.
            log.exception(f'Something happened. => {err}')
            recycleDriver(driver)
            driver = getDriver()

In [186]:
#Configuring Logging
log.basicConfig(format='%(asctime)s: %(levelname)s: %(funcName)s => %(message)s', filename='log.log', level=log.INFO)

#Constants
isWindows = 'windows' in platform.platform().lower() 
DRIVER_FILE_NAME = 'chromedriver.exe' if isWindows else '/usr/lib/chromium-browser/chromedriver'
# When DRIVER_FILE_NAME is absolute (the non-Windows value), os.path.join
# drops the working-directory prefix and returns DRIVER_FILE_NAME itself.
DRIVER_FILE_PATH = os.path.join(os.getcwd(), DRIVER_FILE_NAME)
BESTBUY_STORE = 'best_buy'

# NOTE(review): playSound builds its own .wav path and does not read this value.
NOTIFICATION_FILE_PATH = './assets/youGotmail.mp3'

# G_INFO must hold a JSON object; the code reads its 'email_address' key.
# Fail with a clear message instead of the TypeError json.loads(None) raises.
_rawInfo = os.environ.get('G_INFO')
if _rawInfo is None:
    raise RuntimeError('Environment variable G_INFO is not set; expected a JSON object with an "email_address" key')
info = json.loads(_rawInfo)

# SMTP login credentials used by sendEmail; None when the variables are unset.
EMAIL_ADDRESS = os.environ.get('G_USE')
PASSWORD = os.environ.get('G_PASS')

searchInfos = getProductDicts()

# doWork_Threads(searchInfos)


In [187]:
def getElemSelector(driver, cssSelector, timeout=10):
    """Wait for an element matching a CSS selector to be present.

    Args:
        driver: WebDriver to search in.
        cssSelector: CSS selector of the wanted element.
        timeout: Maximum number of seconds to wait (default 10).

    Returns:
        The located element, or None when the wait fails for any reason.
    """
    try:
        return WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, cssSelector))
        )
    except Exception:
        # Exception rather than a bare except so Ctrl+C is not swallowed.
        return None

def getElemId(driver, id, timeout=10):
    """Wait for the element with the given id to be present.

    Args:
        driver: WebDriver to search in.
        id: Value of the element's id attribute.
        timeout: Maximum number of seconds to wait (default 10).

    Returns:
        The located element, or None when the wait fails for any reason.
    """
    try:
        return WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.ID, id))
        )
    except Exception:
        # Exception rather than a bare except so Ctrl+C is not swallowed.
        return None
    
def findElemBySelectorAndClick(driver, cssSelector):
    """Click the element located by ``getElemSelector``.

    When no element is found ``getElemSelector`` returns None and the click
    fails with an AttributeError.
    """
    getElemSelector(driver, cssSelector).click()
    
def findElemByIdAndClick(driver, id):
    """Click the element located by ``getElemId``.

    When no element is found ``getElemId`` returns None and the click fails
    with an AttributeError.
    """
    getElemId(driver, id).click()
    
def findElemBySelectorAndSendKeys(driver, cssSelector, text):
    """Type ``text`` into the element located by ``getElemSelector``.

    When no element is found ``getElemSelector`` returns None and the call
    fails with an AttributeError.
    """
    getElemSelector(driver, cssSelector).send_keys(text)
    
def findElemByIdAndSendKeys(driver, id, text):
    """Type ``text`` into the element located by ``getElemId``.

    When no element is found ``getElemId`` returns None and the call fails
    with an AttributeError.
    """
    getElemId(driver, id).send_keys(text)
    
def findSelectByIdAndSelect(driver,id,option):
    """Choose ``option`` by visible text in the select located by ``getElemId``."""
    dropdown = Select(getElemId(driver, id))
    dropdown.select_by_visible_text(option)
    
def findSelectBySelectorAndSelect(driver,cssSelector,option):
    """Choose ``option`` by visible text in the select located by ``getElemSelector``."""
    dropdown = Select(getElemSelector(driver, cssSelector))
    dropdown.select_by_visible_text(option)
    
def isProductInCart(driver):
    """Return True when the current page source contains a ``div`` with class 'dot'.

    Presumably that div is the cart badge shown once an item was added —
    confirm against the live page.
    """
    pageSoup = bs(driver.page_source, 'html.parser')
    return pageSoup.find('div', {'class': 'dot'}) is not None
    

In [188]:
# Checkout walk-through: open a promo page, add one product to the cart and
# fill in the guest checkout form. Statement order matters; every helper call
# acts on the live page state left by the previous one.
# NOTE(review): this cell rebinds the module-level names url, driver, soup and listings.
url = 'https://www.bestbuy.com/site/promo/black-friday-headphone-speaker-deals-1'
driver = webdriver.Chrome(executable_path=DRIVER_FILE_PATH)
driver.get(url)
soup = bs(driver.page_source,'html.parser')
listings = soup.select('li[class="sku-item"]')
# NOTE(review): firstListing is not referenced again in this cell; the index
# lookup raises IndexError when no listing matched.
firstListing = listings[0]

# Take the SKU from the first button on the page whose data-button-state is ADD_TO_CART.
# NOTE(review): if no such button exists the lookup result is presumably None
# and the subscript on the next line fails - confirm and guard.
addToCartButtonSoup = soup.find('button', {'data-button-state':'ADD_TO_CART'})
sku = addToCartButtonSoup['data-sku-id']

# Click the add-to-cart button, wait 3 seconds, then check the cart indicator.
# Gives up with BaseException after the 9th unsuccessful click.
attempt = 0
while True:
    findElemBySelectorAndClick(driver, f'button[data-sku-id="{sku}"]')
    time.sleep(3)
    if isProductInCart(driver):
        break
    else:
        attempt += 1
        if attempt == 9:
            raise BaseException('Unable to add product to cart')

# Navigate to the cart and continue through checkout as a guest
cartUrl = 'https://www.bestbuy.com/cart'
driver.get(cartUrl)
findElemBySelectorAndClick(driver, 'button[data-track="Checkout - Top"]')
findElemBySelectorAndClick(driver, 'button[class*="cia-guest-content__continue"]')

# Contact info
# NOTE(review): the phone number, card data, name and address below are
# hard-coded in the notebook; consider loading them from configuration.
findElemByIdAndSendKeys(driver,'user.emailAddress',info['email_address'])
findElemByIdAndSendKeys(driver,'user.phone','2022308743')
findElemBySelectorAndClick(driver, 'div[class="button--continue"] > button')

# Credit card information
findElemByIdAndSendKeys(driver,'optimized-cc-card-number','4242424242424242')
findElemByIdAndSendKeys(driver,'credit-card-cvv','123')
findSelectBySelectorAndSelect(driver, 'select[name="expiration-month"]', '12')
findSelectBySelectorAndSelect(driver, 'select[name="expiration-year"]', '2025')

# Billing address
findElemByIdAndSendKeys(driver,'payment.billingAddress.firstName','Ike')
findElemByIdAndSendKeys(driver,'payment.billingAddress.lastName','Nwosu')
findElemByIdAndSendKeys(driver,'payment.billingAddress.street','915 Silver Spring Ave')
# Pause, then click the page body before opening the second address line.
# NOTE(review): the recorded output of this cell shows the address-line-2 click
# below failing with ElementClickInterceptedException because a street
# autocomplete item would receive the click, so the pause and body click do
# not reliably clear that list.
time.sleep(2)
findElemBySelectorAndClick(driver, 'body')
findElemBySelectorAndClick(driver, 'button[class*="address-form__showAddress2Link"]')
findElemByIdAndSendKeys(driver,'payment.billingAddress.street2','1135')
findElemByIdAndSendKeys(driver,'payment.billingAddress.city','Silver Spring')
findElemByIdAndSendKeys(driver,'payment.billingAddress.zipcode','20910')
findSelectByIdAndSelect(driver, 'payment.billingAddress.state', 'MD')

ElementClickInterceptedException: Message: element click intercepted: Element <button class="c-button-link address-form__showAddress2Link" type="button">...</button> is not clickable at point (118, 454). Other element would receive the click: <div class="autocomplete__item" role="option" aria-selected="false" id="street-a11y-autocomplete-list-item-0">...</div>
  (Session info: chrome=95.0.4638.69)


In [None]:
# Click the button whose data-track attribute is "Place your Order - Contact Card".
# NOTE(review): presumably this submits the order; it relies on the driver
# session and the filled-in checkout form left by the previous cell.
findElemBySelectorAndClick(driver, 'button[data-track="Place your Order - Contact Card"]')

In [None]:
# doWork_Single(searchInfos)

# log.info('Exiting program')