In [1]:
import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt # for plotting price data
# from pandas.errors import EmptyDataError # error produced if empty csv if parsed

from bs4 import BeautifulSoup
import requests # fetches html content of a website, instead of urllib2 previously
from urllib.request import HTTPError # for catching timeout for website response
from urllib.request import urlopen
from urllib.request import URLError

import time # for sleep function
from datetime import datetime # for timestamp

import os # for creation of directories
import re # for regular expressions

### Tracker
- price_hist
- tracked items
    - deploy
    - current_prices
    - add and remove items
        - if item already exists, ask if it should be overwritten
    - fetch prices
    - reset
    - retrieve price hist and items from file
    - log activity
#### Components (sub-classes)
- notify
    - daily/weekly etc. (incl. plot?)
    - notify if price hike or drop for a certain item
    - notify if tracker goes down somehow
- connectivity
    - check connectivity
- item
    - price
    - url
    - ASIN
    - nickname
    - name
- scraper
    - find price
    - find items left
    - find prices of other vendors?
- visualise
    - different options to plot

### To Dos:
- Prevent addition of duplicates
- add output of messages and write them to the log file
- maybe standardise product URLs after input, i.e. store every product as amazon.<top-level domain>/<path segment>/<ASIN>
- notification functionality via email
- remove eval methods and change how the items are stored
- catch error between individual fetches or between updates and act accordingly, e.g. only fetch missing item again
- add documentation and better commenting
- add more prompts/log opportunities

In [61]:
class AmazonPriceTracker:
    """Track the prices of Amazon items inside a per-tracker directory.

    Tracked items are stored in ``<name>/tracked_items.txt``, one
    ``nickname : title : url : asin`` line per item. Fetched prices are stored
    in ``<name>/price_history.csv``: one row per fetch, starting with the
    columns year, month, day, hour, minute, followed by one column per
    item nickname.
    """

    # Leading columns of every price-history row.
    _DATETIME_COLUMNS = ["year", "month", "day", "hour", "minute"]

    def __init__(self, tracker_name="tracker"):
        """Create the tracker directory (or reuse it) and load stored state.

        Args:
            tracker_name: Name of the tracker; also the directory name.
        """
        self.items = {"nicknames": [], "names": [], "asins": [], "urls": []}
        self.name = tracker_name
        self.PATH = "./" + self.name + "/"
        try:
            os.mkdir(str(self.name))
        except FileExistsError:
            print("This tracker already exists. Using the existing one instead.")

        self.__retrieve_items()

        # Start from an empty frame; it is replaced if a stored history exists.
        self.price_history = pd.DataFrame(
            columns=self._DATETIME_COLUMNS + self.items["nicknames"])
        self.__retrieve_price_hist()
        self.latest_prices = self.price_history.tail(1)

    def __webpage2html(self, URL, parser="html.parser"):
        """Download ``URL`` and return it parsed as a BeautifulSoup object.

        Raises:
            requests.exceptions.HTTPError: If the server answers with an
                error status (raised by ``raise_for_status``).
        """
        headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
        }
        res = requests.get(URL, headers=headers)
        res.raise_for_status()

        soup = BeautifulSoup(res.text, parser)
        return soup

    def __asin_position(self, url_parts):
        """Return the index of the ASIN within the "/"-split URL.

        The ASIN is taken to be the segment following "dp" or "product";
        if neither marker is present the original positional rule (index 4,
        i.e. ``https://host/dp/ASIN``) is used.
        """
        for marker in ("dp", "product"):
            if marker in url_parts[1:-1]:
                return url_parts.index(marker, 1) + 1
        return 4

    def __extract_title(self, soup):
        """Return the product title found in ``soup`` or None if absent."""
        for element in soup.find_all("span"):
            markup = str(element)
            if "productTitle" in markup:
                raw_title = markup[markup.find(">") + 1:markup.find("</")]
                return raw_title.replace("\n", "").replace("  ", "")
        return None

    def add_item(self, URL, nickname):
        """Start tracking the item behind ``URL`` under ``nickname``.

        The product title is scraped from the page; the item is appended to
        the in-memory lists and to ``tracked_items.txt``. Nothing is added if
        the URL is not an Amazon product URL, if the ASIN is already tracked,
        or if the page cannot be fetched or holds no title.
        """
        if "amazon" not in URL:
            print("This is not a valid amazon url.")
            return

        # Query parameters are irrelevant for identifying the product.
        url_parts = URL.split("?")[0].split("/")
        asin_idx = self.__asin_position(url_parts)
        if asin_idx >= len(url_parts) or not url_parts[asin_idx]:
            print("This is not a valid amazon url.")
            return
        ASIN = url_parts[asin_idx]
        URL = "/".join(url_parts[:asin_idx + 1])

        if ASIN in self.items["asins"]:
            print("This item is already being tracked.")
            return

        print("Adding item to list of tracked items.")
        try:
            soup = self.__webpage2html(URL)
        except (HTTPError, requests.exceptions.RequestException):
            # requests raises its own exception hierarchy, not urllib's.
            print("HTTP 503 Error, try to add item again later.")
            return

        title = self.__extract_title(soup)
        if title is None:
            print("The product title could not be found, the item was not added.")
            return

        # The file and the in-memory lists are always written together so
        # that they describe the same set of items.
        with open(self.PATH + "tracked_items.txt", "a", newline="\n") as f:
            f.write(nickname + " : " + title + " : " + URL + " : " + ASIN + "\n")

        self.items["names"].append(title)
        self.items["urls"].append(URL)
        self.items["nicknames"].append(nickname)
        self.items["asins"].append(ASIN)
        print("{} was succesfully added to list of tracked items.".format(nickname))

    def __retrieve_items(self):
        """Load the tracked items from ``tracked_items.txt`` into ``self.items``.

        Creates an empty file if none exists. Lines whose ASIN is already
        loaded are skipped.

        Raises:
            ValueError: If a non-blank line does not hold four " : " fields.
        """
        items_file = self.PATH + "tracked_items.txt"
        try:
            with open(items_file, "r") as f:
                lines = f.readlines()
        except FileNotFoundError:
            with open(items_file, "x"):
                pass
            return

        if "".join(lines) == "":
            print("No items are being tracked so far. " + " " * 16 +
                  "Please add an item to be tracked using .add_item().")
            return

        for line in lines:
            line = line.rstrip("\n")
            if not line.strip():
                continue
            # The title is free text and may itself contain " : ", so the
            # nickname is split from the left and url/asin from the right.
            nickname, rest = line.split(" : ", 1)
            title, url, asin = rest.rsplit(" : ", 2)
            if asin not in self.items["asins"]:
                self.items["names"].append(title)
                self.items["urls"].append(url)
                self.items["nicknames"].append(nickname)
                self.items["asins"].append(asin)

    def __retrieve_price_hist(self):
        """Load ``price_history.csv`` into ``self.price_history``.

        Creates an empty file if none exists. An existing but empty file
        leaves ``self.price_history`` untouched.
        """
        history_file = self.PATH + "price_history.csv"
        try:
            self.price_history = pd.read_csv(history_file)
        except FileNotFoundError:
            with open(history_file, "x"):
                pass
        except pd.errors.EmptyDataError:
            if len(self.items["names"]) > 0:
                print("The price history is empty so far. " + " " * 16 +
                      "Please fetch prices using .fetch_prices() first.")

    def wipe_database(self):
        """Truncate both storage files. In-memory state is left unchanged."""
        for file_name in ("tracked_items.txt", "price_history.csv"):
            with open(self.PATH + file_name, "w") as f:
                f.write("")

    def fetch_prices(self, URLs=None):
        """Fetch prices and append them as one new row to the price history.

        Args:
            URLs: URLs to fetch; defaults to all tracked URLs. Each URL is
                matched to its nickname through the tracked items, so a
                subset (e.g. a retry of failed items) is recorded under the
                correct columns.

        Returns:
            None if every request succeeded, True if at least one request
            failed.
        """
        if URLs is None:
            URLs = self.items["urls"]
        error_status = None
        delay = 1 # delay between fetching items in s

        if len(self.items["names"]) == 0:
            print("There is no items to fetch a price for. Please add items using .add_item() first.")
            return error_status

        nickname_by_url = dict(zip(self.items["urls"], self.items["nicknames"]))
        record = {}
        for n, URL in enumerate(URLs):
            item_name = nickname_by_url.get(URL)
            if item_name is None:
                # Unknown URL: fall back to the positional pairing.
                item_name = self.items["nicknames"][n]
            try:
                print("Fetching price for {}.".format(item_name))
                soup = self.__webpage2html(URL, "html.parser")
                time.sleep(delay)
                try:
                    price_str = soup.select("#priceblock_ourprice")[0].text.replace(",", ".")
                    record[item_name] = float(price_str[:price_str.index(".") + 3])
                except (IndexError, ValueError):
                    # No price element, or its text is not a parsable number.
                    print("The item or price is currently unavailable.")
                    record[item_name] = np.nan
            except (HTTPError, requests.exceptions.RequestException):
                record[item_name] = np.nan
                print("\n A price for {} could not be fetched.".format(item_name))
                error_status = True

        record.update(zip(self._DATETIME_COLUMNS, datetime.now().timetuple()[0:5]))
        new_row = pd.DataFrame(
            [record], columns=self._DATETIME_COLUMNS + self.items["nicknames"])

        if len(self.price_history) == 0:
            self.price_history = new_row
        else:
            self.price_history = pd.concat(
                [self.price_history, new_row], sort=False, ignore_index=True)
        self.latest_prices = self.price_history.tail(1)

        # save price history
        self.price_history.to_csv(self.PATH + "price_history.csv", index_label=False, index=False)

        return error_status

    def remove_item(self):
        """Interactively remove one tracked item and its price column."""
        print("The items currently being tracked are: \n")
        for i, nickname in enumerate(self.items["nicknames"]):
            print("[" + str(i) + "] --> " + nickname)
        Input = input("\n To remove an item from tracking enter the corresponding number." +
                      " " * 8 + "\n To cancel, press 'Enter'. ")

        if Input == "":
            print("The action has been canceled.")
            return
        if not Input.isdigit():
            print("The input is not valid.")
            return
        item2delete_idx = int(Input)
        if item2delete_idx >= len(self.items["nicknames"]):
            print("The input does not correspond to an item.")
            return

        item_name = self.items["nicknames"][item2delete_idx]

        # An item that was never fetched has no column yet.
        self.price_history = self.price_history.drop(item_name, axis=1, errors="ignore")
        self.latest_prices = self.price_history.tail(1)

        for key in ("names", "nicknames", "urls", "asins"):
            self.items[key].pop(item2delete_idx)

        # Rewrite the items file from memory so that file and lists cannot
        # drift apart.
        with open(self.PATH + "tracked_items.txt", "w", newline="\n") as f:
            for nickname, title, url, asin in zip(self.items["nicknames"],
                                                  self.items["names"],
                                                  self.items["urls"],
                                                  self.items["asins"]):
                f.write(nickname + " : " + title + " : " + url + " : " + asin + "\n")

        # index=False keeps the layout identical to the one fetch_prices writes.
        self.price_history.to_csv(self.PATH + "price_history.csv", index_label=False, index=False)

        print("Item was removed.")

    def plot_prices(self, timescale="day"):
        """Plot every item's price against the ``timescale`` column.

        NOTE(review): relies on ``plt`` (matplotlib.pyplot) being available
        in the namespace; its import at the top of the file is commented out.
        """
        plt.figure(figsize=(10,6))
        time_axis = self.price_history[timescale]
        tracked_items = list(self.price_history.columns)[5:]
        for item in tracked_items:
            plt.plot(time_axis,self.price_history[item], "-o" , label=item)

        plt.legend()
        plt.grid()
        plt.xlabel(timescale + "s")
        plt.ylabel("Price in €")
        plt.show()

    def current_prices(self):
        """Fetch prices now and return the newest history row."""
        self.fetch_prices()
        return self.latest_prices

    def __internet_on(self):
        """Return True if the probe address answers within one second."""
        try:
            urlopen('http://216.58.192.142', timeout=1)
            return True
        except URLError:
            return False

    def __last_recorded_day(self):
        """Return the day of month of the newest history row, or -1 if none."""
        history = self.price_history
        if len(history) == 0 or "day" not in history.columns:
            return -1
        try:
            return int(history["day"].iloc[-1])
        except (TypeError, ValueError):
            return -1

    def deploy(self):
        """Run forever, fetching all prices once per day shortly after midnight.

        The clock is checked every 59 minutes. A fetch with failures is
        retried up to nine times in 10-minute intervals, each retry covering
        only the items that still have no price.
        """
        while True:
            now = datetime.now()
            today, hour, minute = now.day, now.hour, now.minute
            prev_day = self.__last_recorded_day()
            print("Checking time...")
            if hour == 0 and (minute < 59 and minute > 0):
                if prev_day != today:
                    nickname_by_url = dict(zip(self.items["urls"], self.items["nicknames"]))
                    pending = list(self.items["urls"])
                    attempt = 1
                    while attempt < 10:
                        try:
                            print("Attempt {} to fetch prices.".format(attempt))
                            status = self.fetch_prices(pending)
                            if status is None:
                                print("Fetching was a success!")
                                print("...waiting for next fetch.")
                                break
                            # Only the URLs just requested are judged; the
                            # other columns of the newest row are empty by
                            # construction.
                            latest = self.price_history.iloc[-1]
                            pending = [url for url in pending
                                       if pd.isna(latest.get(nickname_by_url[url], np.nan))]
                            attempt += 1
                            nicknames_of_fails = [nickname_by_url[url] for url in pending]
                            print("Encountered an error while fetching prices for {}. Trying again in 10 min.".format(list(nicknames_of_fails)))
                            time.sleep(10*60)
                        except (HTTPError, requests.exceptions.RequestException):
                            print("HTTP 503 Error, trying again in 10 minutes.")
                            attempt += 1
                            time.sleep(10*60)
                else:
                    print("Item prices have already been updated today.")
            time.sleep(59*60)

In [3]:
#     def notify(self, email):
#             send email with price plot

#     def request_update(self.):
#         update prices and send email with current prices and history of them at the request
    
# add functionality to see how many items are left in stock if possible!!!
# add functionality to receive an email every day with a plot of the price development and whether anything has changed
# add functionality to compare prices to other vendors
# e.g. open with urllib https://www.amazon.de/gp/offer-listing/ (ASIN --> B07SXMZLPK) /ref=dp_olp_new_mbc?ie=UTF8&condition=new
# and scrape webpage for all the prices in soup.select("#olpOfferList")[0].div.div

In [33]:
class Item():
    """A tracked product together with the log of its observed prices.

    An item is persisted as a two-line text file: the first line is the
    ``str()`` of a dict with the item attributes, the second line the
    ``str()`` of the price log with timestamps formatted using
    ``DatetimeFormatStr``.
    """

    def __init__(self, nickname=None, description=None, url=None, asin=None, price=None, currency=None, last_updated=None,created=None):
        self.Nickname = nickname
        self.Description = description
        self.Asin = asin
        self.Url = url
        self.Price = price
        self.Currency = currency
        self.Created = created
        self.Last_updated = last_updated
        self.Price_log = {"timestamp": [last_updated], "price": [price]}
        self.DatetimeFormatStr = "%H:%M, %m/%d/%Y"# "(%Y, %m, %d, %H, %M)" # temporary better: "%H:%M, %m/%d/%Y"

    def __str__(self):
        """Return the attribute dict (dates formatted as text) as a string."""
        return str({
                "Nickname": self.Nickname,
                "Description": self.Description,
                "Asin": self.Asin,
                "Url": self.Url,
                "Price": self.Price,
                "Currency": self.Currency,
                "Created": self.__reformat_date(self.Created),
                "Last_updated": self.__reformat_date(self.Last_updated)
               })

    def __reformat_date(self, date):
        """Format ``date`` with ``DatetimeFormatStr``.

        None and already formatted strings are passed through unchanged.
        """
        if date is None or isinstance(date, str):
            return date
        return datetime.strftime(date, self.DatetimeFormatStr)

    def __parse_date(self, text):
        """Parse a date written by ``__reformat_date``; None stays None."""
        if text is None:
            return None
        return datetime.strptime(str(text), self.DatetimeFormatStr)

    @staticmethod
    def __parse_price(value):
        """Convert a stored price to float; None stays None."""
        return None if value is None else float(value)

    def from_txt(self, file):
        """Populate this item from a file written by ``to_txt``.

        The file content is parsed with ``ast.literal_eval``, which only
        accepts Python literals, instead of ``eval``.
        """
        import ast  # local import: only needed for loading

        with open(file, "r") as f:
            class_attrs = ast.literal_eval(f.readline().strip())
            price_log = ast.literal_eval(f.readline().strip())

        price_log["timestamp"] = [self.__parse_date(ts) for ts in price_log["timestamp"]]
        price_log["price"] = [self.__parse_price(p) for p in price_log["price"]]
        self.Price_log = price_log

        self.Nickname =  class_attrs["Nickname"]
        self.Description = class_attrs["Description"]
        self.Asin = class_attrs["Asin"]
        self.Url = class_attrs["Url"]
        self.Price = self.__parse_price(class_attrs["Price"])
        self.Currency = class_attrs["Currency"]
        self.Created = self.__parse_date(class_attrs["Created"])
        self.Last_updated = self.__parse_date(class_attrs["Last_updated"])

    def to_txt(self, path="./"):
        """Write the item to ``<path><Nickname>.txt``.

        The price log is serialised from a copy, so the datetime objects in
        ``self.Price_log`` stay intact and the item can be updated and saved
        repeatedly.
        """
        serialised_log = dict(self.Price_log)
        serialised_log["timestamp"] = [self.__reformat_date(ts)
                                       for ts in self.Price_log["timestamp"]]
        with open(path + self.Nickname + ".txt", "w") as f:
            f.write(self.__str__() + "\n")
            f.write(str(serialised_log)) # temporary solution

class Scraper():
    """Downloads web pages and checks connectivity."""

    def __init__(self):
        # Result of the most recent connectivity test.
        self.Online = False

    def webpage2soup(self, url, parser="lxml"):
        """Download ``url`` and return it parsed as a BeautifulSoup object.

        Raises:
            requests.exceptions.HTTPError: If the server answers with an
                error status (raised by ``raise_for_status``).
        """
        headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
        }
        res = requests.get(url, headers=headers)
        res.raise_for_status()

        soup = BeautifulSoup(res.text, parser)
        return soup

    def test_connection(self, url='http://216.58.192.142'):
        """Try to open ``url`` within one second and record the outcome.

        A bare host name such as "amazon.de" is accepted; "https://" is
        prepended because ``urlopen`` rejects URLs without a scheme.

        Returns:
            True if the host answered (an HTTP error status also counts as
            reachable), False otherwise. The result is also stored in
            ``self.Online``.
        """
        if "://" not in url:
            url = "https://" + url
        try:
            urlopen(url, timeout=1)
            self.Online = True
        except HTTPError:
            # The server responded, so the connection itself works.
            self.Online = True
        except (URLError, ValueError, OSError):
            # Unreachable host, malformed URL, or socket timeout.
            self.Online = False
        return self.Online

    def ping_AmazonDE(self):
        """Return True if amazon.de is reachable."""
        return self.test_connection("amazon.de")

class Parser():
    """Extracts ASIN, description and price from Amazon URLs and pages."""

    def __init__(self):
        self.Template_Amazon_Url = r"(https://)*(www.)*([a-z_-]+)\.([a-z]+)/([a-z0-9-_]+)/([a-z0-9-_]+)/([a-z0-9-_]+)" # Amazon regex
        self.Template_Amazon_Description = r"(<span\s(class=\"a-size-large\"\s)*(id=\"productTitle\")(\sclass=\"a-size-large\")*>\n\s+(.+)\n\s+</span>)"
        # Number (digits with optional "." / "," separators), whitespace,
        # then the currency symbol.
        self.Template_Amazon_Price = r"([0-9][0-9.,]*)\s(.)"

    def __groupbytemplate(self, string, re_template):
        """Return the groups of the first match of ``re_template``.

        Raises:
            ValueError: If the template does not match ``string``.
        """
        m = re.search(re_template, string)
        if m is None:
            raise ValueError("The text does not match the expected template.")
        return m.groups()

    @staticmethod
    def __normalise_number(raw):
        """Return ``raw`` with "." as decimal separator and no grouping.

        Whichever of "," and "." occurs last is taken as the decimal
        separator; a lone "," is a decimal comma.
        """
        if raw.rfind(",") > raw.rfind("."):
            return raw.replace(".", "").replace(",", ".")
        return raw.replace(",", "")

    def find_attributes(self, html):
        """Return description, price (float) and currency found in ``html``."""
        price, currency = self.find_price(html)
        return {"description": self.find_description(html),
                "currency": currency,
                "price": float(price)}

    def parse_url(self,url):
        """Extract the ASIN and top-level domain from an Amazon product URL.

        The URL is lower-cased before parsing, so the ASIN is returned in
        lower case. The ASIN is the path segment following "dp" or
        "product"; without such a marker the positional rules for .de and
        .com URLs are applied.

        Returns:
            Tuple ``(asin, topleveldomain)``, e.g. ``("b07sxmzlpk", ".de")``.

        Raises:
            ValueError: If the URL does not look like a product URL or the
                ASIN cannot be located.
        """
        m = re.search(self.Template_Amazon_Url, url.lower())
        if m is None:
            raise ValueError("Not a recognisable Amazon product URL: " + url)
        url_slices = m.groups()

        domain = url_slices[3]
        topleveldomain = "." + domain
        path = list(url_slices[4:7])

        for marker in ("dp", "product"):
            if marker in path[:-1]:
                return path[path.index(marker) + 1], topleveldomain

        # No marker found: positional fallback.
        if domain == "de":
            asin = path[2] if path[0] == "gp" else path[1]
        elif domain == "com":
            asin = path[2]
        else:
            # so far only .com and .de supported
            raise ValueError("Unsupported Amazon domain: " + topleveldomain)

        return asin, topleveldomain

    def find_price(self, html):
        """Return ``(price, currency)`` read from the price element.

        ``price`` is a string using "." as decimal separator.

        Raises:
            ValueError: If no price is found.
        """
        price_str = str(html.select("span#priceblock_ourprice"))
        groups = self.__groupbytemplate(price_str, self.Template_Amazon_Price)
        return self.__normalise_number(groups[0]), groups[1]

    def find_description(self, html):
        """Return the product title from the ``productTitle`` span.

        Raises:
            ValueError: If the span is missing or has an unexpected layout.
        """
        # Stays empty if no span matches, which then fails the template.
        title_str = ""
        for element in html.find_all("span"):
            if "productTitle" in str(element):
                title_str = str(element)
                break
        groups = self.__groupbytemplate(title_str, self.Template_Amazon_Description)
        return groups[4]
        
class Notifier():
    """Prints events to the console and appends them to a log file."""

    def __init__(self, path="./", logfile="events"):
        """Create ``<path><logfile>.log`` if it does not exist yet."""
        self.Last_event = (None, None) # timestamp + event
        # Open in append mode only to create the file; close it right away.
        with open(path + logfile + ".log", "a"):
            pass
        self.Log_path = path
        self.Logfile_name = logfile

    def prompt(self, event=""):
        """Print ``event`` with the current time and remember it.

        Returns:
            Tuple ``(timestamp, event)``.
        """
        timestamp = datetime.now()
        print(timestamp.strftime("%H:%M, %m/%d/%Y") + " -- " + event)
        self.Last_event = (timestamp, event)
        return timestamp, event

    def log(self, event=""):
        """Print ``event`` and append it to the log file."""
        timestamp, event = self.prompt(event)
        with open(self.Log_path + self.Logfile_name + ".log", "a") as f:
            f.write(str(timestamp) + " -- " + event + "\n")

    def send_email(self):
        """Placeholder; not implemented yet."""
        pass
    
class Tracker(Item, Scraper, Notifier, Parser):
    """Tracks a list of ``Item`` objects stored in ``<path><name>/``.

    Combines scraping (``Scraper``), parsing (``Parser``) and logging
    (``Notifier``). Each item is persisted as ``<Nickname>.txt`` inside the
    tracker directory.
    """

    def __init__(self, path="./", name="default_tracker", load=False):
        """Create a tracker, optionally loading previously saved items.

        Args:
            path: Parent directory of the tracker directory.
            name: Name of the tracker and of its directory.
            load: If True, read the items stored in the tracker directory.
        """
        self.Path = path + name + "/"
        self.Name = name
        self.Items = []

        if not load:
            try:
                os.mkdir(self.Path)
            except FileExistsError:
                print("This tracker already exists.") # do you want to load it?

        # The components must be initialised before load(), which logs.
        Scraper.__init__(self)
        Parser.__init__(self)
        Notifier.__init__(self, self.Path)

        if load:
            self.load(self.Path)

    def add_item(self, nickname=None, description=None, url=None, asin=None, price=None, currency=None, last_updated=None, created=None, save=False):
        """Append a new ``Item`` built from the arguments; optionally save it."""
        item = Item(nickname, description, url, asin, price, currency, last_updated, created)
        self.Items.append(item)

        if save:
            item.to_txt(self.Path)

    def __asin(self, item):
        return item.Asin

    def add_item_by_url(self, alias, url, save=False):
        """Scrape ``url`` and track the product under the nickname ``alias``.

        Nothing is added if an item with the same ASIN is already tracked
        by this tracker.
        """
        asin, _ = self.parse_url(url)
        if asin in [self.__asin(item) for item in self.Items]:
            self.log("ASIN matches an item that is already being tracḱed.")
            return

        html = self.webpage2soup(url)
        attributes = self.find_attributes(html)
        created = datetime.now()

        self.add_item(alias, attributes["description"], url, asin,
                      attributes["price"], attributes["currency"],
                      created, created, save)
        self.log(alias + " was successfully added to: " + self.Name + ".")

    def list_items(self):
        """Print the nickname of every tracked item."""
        for item in self.Items:
            print(item.Nickname)

    def fetch_price(self, Item):
        """Scrape and return ``(price, currency)`` for the given item."""
        html = self.webpage2soup(Item.Url)
        price, currency = self.find_price(html)
        self.log("The Price for " + Item.Nickname + " has been successfully fetched.")
        return price, currency

    def update_prices(self, timeb4nextfetch=0):
        """Fetch the current price of every item and append it to its log.

        A failed fetch is recorded as NaN for that item.

        Args:
            timeb4nextfetch: Seconds to wait after each item.
        """
        now = datetime.now()
        for item in self.Items:
            try:
                raw_price, _ = self.fetch_price(item)
                price = float(raw_price)
            except Exception:
                # Best effort: one failing item must not stop the update.
                price = np.nan
                self.log("The Price for " + item.Nickname + " could not be fetched.")

            item.Price = price
            item.Last_updated = now
            item.Price_log["timestamp"].append(now)
            item.Price_log["price"].append(price)
            time.sleep(timeb4nextfetch)

    def deploy(self):
        """Run forever: update, save and export prices every 12 hours.

        If Amazon cannot be reached, the check is repeated after 10 minutes.
        """
        self.log(self.Name + " has been deployed.")
        while(True):
            # urlopen needs a full URL including the scheme.
            if self.test_connection(url="https://www.amazon.de"):
                self.update_prices(5)
                self.log("Prices have been updated.")
                self.save()
                self.log("New Prices have been saved.")
                self.history_to_csv(True)
                self.log("Waiting 12 hours for next update...")
                time.sleep(60*60*12)
            else:
                self.log("Could not establish connection with Amazon, waiting 10min before trying again...")
                time.sleep(60*10)

    def load(self, path):
        """Load all items stored as ``.txt`` files in ``path``.

        The tracker name is taken from the last directory of ``path`` when
        it can be recognised. Items are only read if none are loaded yet.
        """
        self.Path = path
        m = re.search(r"/([a-zA-Z0-9-_]+)/$", path)
        if m is not None:
            self.Name = m.groups()[0]
        if len(self.Items) == 0:
            for file in os.listdir(self.Path):
                if os.path.isfile(os.path.join(self.Path, file)) and file[-4:] == ".txt":
                    item = Item()
                    item.from_txt(self.Path + file)
                    self.Items.append(item)
        self.log(self.Name + " has been successfully loaded.")

    def save(self):
        """Write every item to its text file in the tracker directory."""
        for item in self.Items:
            item.to_txt(self.Path)

    def history_to_csv(self, save=False):
        """Combine all price logs into one table.

        Returns:
            DataFrame indexed by formatted timestamp (index name
            "timestamp", in order of first appearance) with one column per
            item nickname. Several prices of one item under the same
            timestamp are averaged, ignoring missing values; a cell without
            any valid price is NaN.

        Args:
            save: If True, also write the table to ``price_hist.csv`` in
                the tracker directory.
        """
        date_format = Item().DatetimeFormatStr
        labels = []      # timestamps in order of first appearance
        nicknames = []   # column order
        samples = {}     # label -> nickname -> list of prices

        for item in self.Items:
            if item.Nickname not in nicknames:
                nicknames.append(item.Nickname)
            timestamps, prices = item.Price_log["timestamp"], item.Price_log["price"]
            for timestamp, price in zip(timestamps, prices):
                if timestamp is None:
                    continue
                label = timestamp if isinstance(timestamp, str) else timestamp.strftime(date_format)
                if label not in samples:
                    samples[label] = {}
                    labels.append(label)
                try:
                    value = float(price)
                except (TypeError, ValueError):
                    value = np.nan
                samples[label].setdefault(item.Nickname, []).append(value)

        rows = []
        for label in labels:
            row = {}
            for nickname in nicknames:
                valid = [v for v in samples[label].get(nickname, []) if not np.isnan(v)]
                row[nickname] = sum(valid) / len(valid) if valid else np.nan
            rows.append(row)

        df = pd.DataFrame(rows, index=pd.Index(labels, name="timestamp"), columns=nicknames)
        if save:
            df.to_csv(self.Path + "price_hist.csv")
            self.log("Price history has been saved to .csv")

        return df

In [34]:
# Create a tracker with the default arguments (path "./", name "default_tracker").
tracker = Tracker()

This tracker already exists.


In [35]:
# tracker.add_item_by_url("AMD Ryzen 7 3700x", "https://www.amazon.de/gp/product/B07SXMZLPK/ref=ox_sc_saved_title_1?smid=A27FVGL1U6882E&psc=1")
# tracker.add_item_by_url("32GB DDR4 RAM", "https://www.amazon.de/gp/product/B016ORTNI2/ref=ox_sc_saved_title_4?smid=A3JWKAKR8XB7XF&psc=1")
# tracker.add_item_by_url("512 GB M.2 SSD", "https://www.amazon.de/gp/product/B07CJ3RVP3/ref=ox_sc_saved_title_5?smid=A3JWKAKR8XB7XF&psc=1")

In [36]:
# tracker.save()
# Load the items saved in ./default_tracker/, fetch a fresh price for each
# of them, then print the nicknames of all tracked items.
tracker.load("./default_tracker/")
tracker.update_prices()
tracker.list_items()

01:27, 03/30/2020 -- default_tracker has been successfully loaded.
01:27, 03/30/2020 -- The Price for 512 GB M.2 SSD has been successfully fetched.
01:27, 03/30/2020 -- The Price for 32GB DDR4 RAM has been successfully fetched.
01:27, 03/30/2020 -- The Price for AMD Ryzen 7 3700x has been successfully fetched.
512 GB M.2 SSD
32GB DDR4 RAM
AMD Ryzen 7 3700x
