# Amazon Price Tracker

**Data Science with Raghav**

### Price Tracking on Amazon

This notebook demonstrates how to build an automated Python script that alerts us whenever the price of an item we are interested in changes. In addition to sending alerts, the script stores the price history in a pandas DataFrame for further analysis.

To get the prices, we scrape the product pages from Amazon using the BeautifulSoup library.

**Dependencies**

In [269]:
from bs4 import BeautifulSoup
import requests
import random
import time
import pandas as pd

from os import path

from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
import ssl


**Read Target URLs from a File**

In [270]:
# target_products.txt is read as a CSV file whose header row includes
# 'Name' and 'URL' columns (one product per line).
target_URL = []
target_name = []
file_name = 'target_products.txt'
def get_target_products_from_file(file_name):
    target_df = pd.read_csv(file_name)
    print(f'Total products to be scraped: {target_df.shape[0]}')
    # extend() mutates the module-level lists, so no `global` statement is needed
    target_URL.extend(target_df['URL'].to_list())
    target_name.extend(target_df['Name'].to_list())

In [271]:
get_target_products_from_file('target_products.txt')

Total products to be scraped: 5
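`pd.read_csv` only requires `target_products.txt` to be comma-separated, with a header row that names the `Name` and `URL` columns. The sketch below shows that assumed layout and reads it with the standard-library `csv` module instead of pandas. The product names and `EXAMPLE00x` URLs are hypothetical placeholders, not the products tracked in this notebook.

```python
import csv
import io

# Hypothetical contents of target_products.txt: a header row, then one product per line.
SAMPLE_TARGETS = """Name,URL
Example Item A,https://www.amazon.com/dp/EXAMPLE001
Example Item B,https://www.amazon.com/dp/EXAMPLE002
"""

def read_targets(text):
    """Return parallel lists (names, urls) from CSV text with 'Name' and 'URL' columns."""
    reader = csv.DictReader(io.StringIO(text))
    names, urls = [], []
    for record in reader:
        names.append(record['Name'].strip())
        urls.append(record['URL'].strip())
    return names, urls

names, urls = read_targets(SAMPLE_TARGETS)
print(f'Total products to be scraped: {len(urls)}')
```

Because `csv.DictReader` looks up columns by header name, the column order in the file does not matter. A URL that itself contains a comma would need to be wrapped in double quotes.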


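**Browser Headers**

In [272]:
# Request headers sent with every call to Amazon. A browser-like user agent
# makes it less likely that the request is rejected as coming from a script.
headers = {
    'user-agent': 'Chrome/83.0.4103.97',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8'
}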

**Get URL HTML**

In [273]:
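def get_html(url):
    # Return the raw page bytes, or False if the request fails or is not a 200 OK.
    # The timeout (in seconds) keeps a stalled connection from hanging the main loop.
    try:
        response = requests.get(url, headers=headers, timeout=10)
    except requests.RequestException:
        return False
    if response.status_code == 200:
        return response.content
    return False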
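`get_html` gives up after a single non-200 response. If Amazon occasionally answers with a temporary error (for example HTTP 503), one option is to retry a few times with an increasing delay. The helper below is a hypothetical sketch. The names `get_html_with_retries`, `max_attempts` and `base_delay` are illustrative, and `fetch` is any callable that returns `(status_code, content)`, which keeps the retry logic testable without network access.

```python
import time

def get_html_with_retries(fetch, url, max_attempts=3, base_delay=2.0, sleep=time.sleep):
    """Call fetch(url) -> (status_code, content) until it returns 200 or attempts run out.

    fetch is a hypothetical callable (for example a thin wrapper around requests.get).
    The delay doubles after each failed attempt: base_delay, 2 * base_delay, ...
    """
    for attempt in range(max_attempts):
        status_code, content = fetch(url)
        if status_code == 200:
            return content
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return False  # same failure value that get_html uses
```

To use it with real requests, `fetch` can be a small function that calls `requests.get(url, headers=headers, timeout=10)` and returns `(response.status_code, response.content)`.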

**Get HTML Soup**

In [274]:
def get_html_soup(content):
    soup = BeautifulSoup(content,"html.parser")
    return soup

**Get Price**

In [275]:
def get_value_by_id(soup, element_id):
    # element_id is a CSS selector such as '#priceblock_ourprice'
    element = soup.select_one(element_id)
    if element is None:
        return False
    return element.get_text().strip()

def get_price(soup):
    # Selectors for the regular price and the deal price; the first one present wins
    element_ids = ['#priceblock_ourprice', '#priceblock_dealprice']
    for element_id in element_ids:
        price = get_value_by_id(soup, element_id)
        if price:
            return price
    return 'Not Found'

**Get Title**

In [286]:
def get_title(soup):
    # get_value_by_id returns False when the '#title' element is missing
    title = get_value_by_id(soup, '#title')
    return title.strip() if title else 'Not Found'

**Get Current Date Time**

In [287]:
import datetime
def get_current_datetime():
    # Timestamp formatted as YYYYMMDD_HHMMSS, e.g. '20201129_094904'
    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
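Because `get_current_datetime` produces fixed-width, zero-padded `YYYYMMDD_HHMMSS` strings, sorting the `crawled_datetime` column as text also sorts it chronologically. `datetime.strptime` with the same format string turns a stored value back into a `datetime` for analysis. The small sketch below uses the timestamp from the first row of the price history; the helper names are ours, not the notebook's.

```python
import datetime

TIMESTAMP_FORMAT = "%Y%m%d_%H%M%S"  # same format string as get_current_datetime()

def format_timestamp(moment):
    return moment.strftime(TIMESTAMP_FORMAT)

def parse_timestamp(text):
    return datetime.datetime.strptime(text, TIMESTAMP_FORMAT)

stamp = format_timestamp(datetime.datetime(2020, 11, 29, 9, 49, 4))
print(stamp)                   # 20201129_094904
print(parse_timestamp(stamp))  # 2020-11-29 09:49:04
```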

**DataFrame to Maintain Price History**

In [288]:
if path.exists('price_history.csv'):
    print('Loading Existing Price History')
    price_history_df = pd.read_csv('price_history.csv')
    print(f'Price History initialized with {price_history_df.shape[0]} records.')
else:
    price_history_df = pd.DataFrame(columns=['crawled_datetime','url','product_name','title','price','price_changed_flag'])

Loading Existing Price History
Price History initialized with 6 records.


**Add row to the DataFrame**

In [289]:
def append_row_to_price_history_df(row):
    global price_history_df
    df_length = len(price_history_df) 
    price_history_df.loc[df_length] = row

**Check if price has changed since last run**

In [290]:
def has_price_changed(row):
    # row layout: [crawled_datetime, url, product_name, title, price]
    product_name = row[2]
    last_price = get_last_price(product_name)
    # No earlier price for this product (or it was never found): nothing to compare
    if last_price == 'Not Found':
        return False
    if last_price != row[4]:
        print('Price Changed Detected')
        return True
    return False
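This comparison has one subtlety. Once `price_history.csv` contains a `'Not Found'` entry, `pd.read_csv` loads the whole `price` column as strings. On the next run, a reloaded `'89.99'` is then compared against the float `89.99`, and `!=` reports a change that did not happen. The sketch below shows a type-normalizing comparison that could replace the `last_price != row[4]` test; the helper names are hypothetical.

```python
def normalize_price(value):
    """Return value as a float when it looks numeric, otherwise None (e.g. 'Not Found')."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

def prices_differ(last_price, current_price):
    """True only when both prices are known numbers and they are not equal."""
    last, current = normalize_price(last_price), normalize_price(current_price)
    if last is None or current is None:
        return False
    return last != current
```

This sketch deliberately treats a price that becomes `'Not Found'` as no change. The notebook above, by contrast, flags it (see the Airpods row in the price history). If an item disappearing should also trigger an alert, handle that case separately.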

**Get Previous Run's price**

In [291]:
def get_last_price(product_name):
    # All earlier rows for this product; the last one comes from the most recent run
    product_rows = price_history_df.loc[price_history_df['product_name'] == product_name]
    if len(product_rows) > 0:
        return product_rows.iloc[-1]['price']
    return 'Not Found'

**Convert Price to Number**

In [292]:
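def get_price_in_num(price):
    # Convert a price string such as '$1,299.99' to a float; return 'Not Found' otherwise
    price = price.strip()
    if price.startswith('$'):
        try:
            return float(price[1:].replace(',', ''))
        except ValueError:  # e.g. a price range such as '$59.99 - $89.99'
            pass
    return 'Not Found'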
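`get_price_in_num` expects the scraped text to start with `$`. If the price element ever contains extra text or a range such as `$59.99 - $89.99`, a regular expression can pull out the first dollar amount instead. This sketch assumes US-dollar formatting (comma as thousands separator, dot as decimal point). Returning the low end of a range is an arbitrary choice.

```python
import re

# A '$', optional spaces, then digits with optional thousands commas and decimals
PRICE_PATTERN = re.compile(r'\$\s*([\d,]*\d(?:\.\d+)?)')

def extract_first_price(text):
    """Return the first dollar amount in text as a float, or 'Not Found'."""
    match = PRICE_PATTERN.search(text or '')
    if match is None:
        return 'Not Found'
    return float(match.group(1).replace(',', ''))
```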

**Function to wait a random number of seconds before hitting Amazon.com**

In [293]:
def random_wait():
    sleep_times= [1,2,3,4]
    sleep_time = random.choice(sleep_times)
    print(f'Sleeping for {sleep_time} seconds before hitting Amazon again ')
    time.sleep(sleep_time)
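`random_wait` picks one of four whole-second pauses. A variant that draws a continuous delay with `random.uniform` makes the spacing between requests less regular. In the sketch below, `random_wait_uniform` is a hypothetical name. Passing `sleep` and `rng` in as parameters is an assumption added so the function can be exercised without actually waiting.

```python
import random
import time

def random_wait_uniform(min_seconds=1.0, max_seconds=4.0, sleep=time.sleep, rng=random):
    """Sleep for a random, non-integer number of seconds in [min_seconds, max_seconds]."""
    sleep_time = rng.uniform(min_seconds, max_seconds)
    print(f'Sleeping for {sleep_time:.1f} seconds before hitting Amazon again')
    sleep(sleep_time)
    return sleep_time
```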

**Main Loop**

In [294]:
last_run_index = price_history_df.shape[0]  # rows appended from here on belong to this run
for index, url in enumerate(target_URL):
    print(f'Getting prices for {target_name[index]}')
    scraped_date_time = get_current_datetime()
    content = get_html(url)
    if content:
        soup = get_html_soup(content)
        price = get_price_in_num(get_price(soup))
        title = get_title(soup)
    else:
        print(f'Request failed - {url}')
        price = 'Not Found'
        title = 'Not Found'
    row = [scraped_date_time, url, target_name[index], title, price]
    price_changed_flag = 1 if has_price_changed(row) else 0
    row.append(price_changed_flag)
    append_row_to_price_history_df(row)
    # Pause between requests, but not after the last product
    if index < len(target_URL) - 1:
        random_wait()
    else:
        print('Done.')

Getting prices for Kindle Reader
Price Changed Detected
Sleeping for 4 seconds before hitting Amazon again 
Getting prices for Airpods
Price Changed Detected
Sleeping for 1 seconds before hitting Amazon again 
Getting prices for Apple Watch Series 6
Price Changed Detected
Sleeping for 3 seconds before hitting Amazon again 
Getting prices for Bose SoundLink Micro
Sleeping for 4 seconds before hitting Amazon again 
Getting prices for Bose SoundLink Color
Done.


In [295]:
price_history_df

| | crawled_datetime | url | product_name | title | price | price_changed_flag |
|---|---|---|---|---|---|---|
| 0 | 20201129_094904 | https://www.amazon.com/Kindle-Now-with-Built-i... | Kindle Reader | Kindle - Now with a Built-in Front Light - Bla... | 59.99 | 0 |
| 1 | 20201129_094909 | https://www.amazon.com/Apple-AirPods-Charging-... | Airpods | Apple AirPods with Charging Case (Wired) | 109.99 | 0 |
| 2 | 20201129_094911 | https://www.amazon.com/dp/B08J5MK16F/ref=fs_a_... | Apple Watch Series 6 | New Apple Watch Series 6 (GPS, 44mm) - (PRODUC... | 409 | 0 |
| 3 | 20201129_095011 | https://www.amazon.com/Kindle-Now-with-Built-i... | Kindle Reader | Kindle - Now with a Built-in Front Light - Bla... | 59.99 | 0 |
| 4 | 20201129_095017 | https://www.amazon.com/Apple-AirPods-Charging-... | Airpods | Apple AirPods with Charging Case (Wired) | 109.99 | 0 |
| 5 | 20201129_095019 | https://www.amazon.com/dp/B08J5MK16F/ref=fs_a_... | Apple Watch Series 6 | New Apple Watch Series 6 (GPS, 44mm) - (PRODUC... | 409 | 0 |
| 6 | 20201205_090702 | https://www.amazon.com/Kindle-Now-with-Built-i... | Kindle Reader | Kindle - Now with a Built-in Front Light - Bla... | 89.99 | 1 |
| 7 | 20201205_090708 | https://www.amazon.com/Apple-AirPods-Charging-... | Airpods | Apple AirPods with Charging Case (Wired) | Not Found | 1 |
| 8 | 20201205_090710 | https://www.amazon.com/dp/B08J5MK16F/ref=fs_a_... | Apple Watch Series 6 | New Apple Watch Series 6 (GPS, 44mm) - Space G... | 414 | 1 |
| 9 | 20201205_090715 | https://www.amazon.com/Bose-SoundLink-Micro-Bl... | Bose SoundLink Micro | Not Found | Not Found | 0 |



**Detect Changes and Send Email**

In [296]:

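def get_pwd():
    # The sender's password is kept in a local pwd.txt file rather than in the notebook
    with open('pwd.txt', 'r') as f:
        return f.readline().strip()  # readline() keeps the trailing newline; drop it
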
def prepare_html_msg(df):
    msg = MIMEMultipart()
    msg['Subject'] = "Price Change Detected"
    msg['From'] = 'raagabot@gmail.com'
    html = """\
            <html>
            <head></head>
            <body>
            {0}
            </body>
            </html>
            """.format(df.to_html())
    part1 = MIMEText(html, 'html')
    msg.attach(part1)
    return msg
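`prepare_html_msg` sends only an HTML part. The sketch below uses the standard library's newer `EmailMessage` API and also includes a plain-text fallback for mail clients that do not render HTML. `prepare_price_alert` and its parameters are hypothetical, and the caller is assumed to pass something like `changed_df.to_html()` and `changed_df.to_string()`.

```python
from email.message import EmailMessage

def prepare_price_alert(html_table, text_table, sender, recipients):
    """Build a multipart/alternative email with a plain-text part and an HTML part."""
    msg = EmailMessage()
    msg['Subject'] = 'Price Change Detected'
    msg['From'] = sender
    msg['To'] = ', '.join(recipients)
    msg.set_content(text_table)  # plain-text fallback, becomes the first part
    msg.add_alternative(f'<html><body>{html_table}</body></html>', subtype='html')
    return msg
```

A message built this way can be sent with `server.send_message(msg)` inside the same `smtplib.SMTP_SSL` block that `send_email` uses. When no explicit addresses are passed, `send_message` takes the sender and recipients from the message's `From` and `To` headers.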

def send_email(msg):
    recipients = ['raghav.atal@gmail.com']
    # Allow entries such as 'a@x.com, b@y.com' and flatten them into one address list
    emaillist = [addr.strip() for elem in recipients for addr in elem.split(',')]
    sender = 'raagabot@gmail.com'
    msg['To'] = ', '.join(emaillist)
    # Create a secure (SSL) connection with Gmail's SMTP server and send the email
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server:
        server.login(sender, get_pwd())
        server.sendmail(sender, emaillist, msg.as_string())
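`get_pwd` reads the password from a plain-text `pwd.txt` next to the notebook. A common alternative is to read it from an environment variable first and fall back to the file only if the variable is unset. The variable name `PRICE_TRACKER_SMTP_PASSWORD` and the function `get_smtp_password` below are hypothetical.

```python
import os

def get_smtp_password(env_var='PRICE_TRACKER_SMTP_PASSWORD', fallback_file='pwd.txt'):
    """Prefer an environment variable; otherwise read the first line of fallback_file."""
    password = os.environ.get(env_var)
    if password:
        return password
    with open(fallback_file, 'r') as f:
        return f.readline().strip()  # drop the trailing newline that readline() keeps
```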
        
def check_and_send_email():
    # Only look at rows appended during this run (see last_run_index in the main loop)
    df = price_history_df.iloc[last_run_index:]
    changed_df = df[df['price_changed_flag'] == 1]
    if changed_df.shape[0] > 0:
        msg = prepare_html_msg(changed_df)
        send_email(msg)
        print('Email Sent.')
    else:
        print('No Price Change Detected')

In [297]:
check_and_send_email()

Email Sent.


**Save price history to disk**

In [298]:
price_history_df.to_csv('price_history.csv',index=False)
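Saving with `to_csv` rewrites the whole history file, and only at the end of the run. If `check_and_send_email()` raises (for example on a failed SMTP login), this cell never runs and the rows from the current run are not saved. The sketch below is an alternative that appends only new rows with the `csv` module. `append_rows_to_csv` is a hypothetical helper, and its column list mirrors the DataFrame defined earlier.

```python
import csv
import os

COLUMNS = ['crawled_datetime', 'url', 'product_name', 'title', 'price', 'price_changed_flag']

def append_rows_to_csv(rows, csv_path='price_history.csv'):
    """Append rows to csv_path, writing the header only when the file is new or empty."""
    write_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    with open(csv_path, 'a', newline='') as f:
        writer = csv.writer(f)
        if write_header:
            writer.writerow(COLUMNS)
        writer.writerows(rows)
```

Used instead of the `to_csv` call, and placed right after the main loop, `append_rows_to_csv(price_history_df.iloc[last_run_index:].values.tolist())` would write just this run's rows before any email is attempted.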

In [299]:
!jupyter nbconvert --to script V9_Amazon_Price_Tracker.ipynb

[NbConvertApp] Converting notebook V9_Amazon_Price_Tracker.ipynb to script
[NbConvertApp] Writing 7082 bytes to V9_Amazon_Price_Tracker.py
