# Tutti.ch Webscraper
## Mini Challenge 5 - wdb @ FHNW BSc Data Science
### Author: Lukas Reber

Script für Webscraping von tutti.ch mittels Selenium. Das Script sucht sämtliche Inserate, welche mit dem definierten Suchbegriff gefunden werden, und speichert folgende Attribute:

* Titel
* Beschreibung
* Kategorie
* Preis
* Anzahl Aufrufe
* Benutzername
* Datum des Inserats

Damit das Script funktioniert, müssen Chrome und der entsprechende ChromeDriver installiert sein.

In [1]:
# using ChromeDriver 89

from selenium import webdriver
from selenium.webdriver.common.keys import Keys # used to send keys

# https://selenium-python.readthedocs.io/waits.html
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

import re
import pandas as pd
from time import sleep
from datetime import date,timedelta
import requests
import json

In [2]:

def get_tutti_ad_links(searchterm):
    """Collect the links of all tutti.ch ads found for a search term.

    Opens tutti.ch in Chrome, accepts the cookie banner, submits the search
    term and walks through the result pages, collecting the ``href`` of every
    ad title. The returned list can then be used to scrape the individual ads.

    Args:
        searchterm (String): String to search ads on tutti.ch

    Raises:
        Exception: Navigation could not access search bar
        Exception: Navigation could not access the number of search results

    Returns:
        list: List of links to all Tutti ads returned from the specific search term.
            If a result page cannot be read, collecting stops and the links
            gathered up to that point are returned.
    """
    # defining the browser to be used
    PATH = './ChromeDriver/chromedriver'
    driver = webdriver.Chrome(PATH)

    # try/finally guarantees the browser is shut down on every exit path
    try:
        # Website to be used - this is hardcoded since the function wouldn't work for another website anyways
        driver.get("https://tutti.ch")

        # accept cookie disclaimer (wait for it, the banner is injected after page load)
        cookie = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, 'onetrust-accept-btn-handler'))
        )
        cookie.click()

        # access the search bar to search for defined term
        try:
            root = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, "root"))
            )
            search = root.find_element(By.NAME, 'search')
            search.clear()
            search.send_keys(searchterm)
            search.send_keys(Keys.RETURN)
        except Exception as err:
            raise Exception('Excecution failed on search bar navigation') from err

        # getting the number of search results the search term delivered
        try:
            root = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, "root"))
            )

            # wait some more for everything to load
            sleep(5)

            # the result count is shown in the page's <h1>
            num_results = root.find_element(By.CSS_SELECTOR, 'h1')

            # using regex to only get the number from the string
            n = re.findall(r'\d+', num_results.text)

            # report the number of search results
            print('number of search results: {0}'.format(''.join(n)))
        except Exception as err:
            raise Exception('Excecution failed on getting the number of search results: no results') from err

        # getting the links from all ads, one result page at a time
        links = []
        while True:
            try:
                root = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.ID, "root"))
                )
                # find all titles by xpath search
                ads = root.find_elements(By.XPATH, '//a[@data-automation="ad-subject"]')

                # from each title get the href link
                links.extend(ad.get_property('href') for ad in ads)
            except Exception as err:
                # The page could not be read: stop instead of retrying forever
                # and hand back what has been collected so far.
                print('stopped collecting links after {0} ads: {1}'.format(len(links), err))
                break

            # move to next page until there are no more pages left
            try:
                element = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, '/html/body/div[2]/div/div/div/div[2]/div/div[1]/div/div/div/div[1]/div/div[6]/nav/ul/li[last()-1]/a'))
                )
                element.click()

                # do some more waiting, just to be sure
                sleep(5)
            except Exception:
                # no clickable "next page" link -> last page reached
                break
    finally:
        # close browser when everything is done (or when something failed)
        driver.quit()

    # return list of links
    return links




In [8]:
def get_ad_data(links):
    """Scrape the defined attributes of each tutti ad and send them to the database.

    Every link is opened in Chrome and the ad's attributes are read from the
    page. The ad's user is created in the database if `search_user` does not
    find it, and the ad itself is sent with `create_ad`. An ad that cannot be
    scraped or sent is reported on stdout and skipped; it does not abort the run.

    Args:
        links (List): List of tutti urls for each ad to be scraped

    Returns:
        DataFrame: DataFrame containing id, title, price, zipcode, user,
            description, category, dateadded and views of each scraped ad
    """
    # defining the browser to be used
    PATH = './ChromeDriver/chromedriver'
    driver = webdriver.Chrome(PATH)

    columns = ['id', 'title', 'price', 'zipcode', 'user', 'description', 'category', 'dateadded', 'views']
    # rows are collected in a list and turned into a DataFrame once at the end;
    # DataFrame.append was removed in pandas 2.0 and copied the frame on every call
    rows = []

    # The ad page is addressed with absolute XPaths that share long prefixes.
    page = '/html/body/div[2]/div/div/div/div[2]/div/div/div/div[1]'
    main = page + '/div[1]/div[1]'
    header = main + '/div/div[1]'
    table = main + '/table/tbody'
    seller = page + '/div[2]/div[2]/div[1]/div'

    # everything except word characters, digits, '-', '.' and spaces is stripped from free text
    special_chars = re.compile(r'[^\w\d\-\. ]+')

    def text_at(root, xpath):
        """Return the text of the element found at `xpath`."""
        return root.find_element(By.XPATH, xpath).text

    # try/finally guarantees the browser is shut down on every exit path
    try:
        # Open website to accept cookie message
        driver.get("https://tutti.ch")
        sleep(5)

        # accept cookie disclaimer
        driver.find_element(By.ID, 'onetrust-accept-btn-handler').click()

        # iterate through all links and get the defined information
        for link in links:
            try:
                driver.get(link)
                root = WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.ID, "root"))
                )
                ad = {}
                ad['id'] = link.split('/')[-1]
                ad['title'] = special_chars.sub('', text_at(root, header + '/div[1]/div[1]/h1'))
                # drop the thousands separators and the last two characters of the price text
                ad['price'] = text_at(root, header + '/div[2]/div[1]/h2').replace('\'', '')[:-2]
                ad['zipcode'] = text_at(root, table + '/tr[4]/td/dd')
                # if its an external ad:
                if ad['zipcode'] == 'Mehr Details':
                    ad['zipcode'] = text_at(root, table + '/tr[3]/td[2]/dd')
                # there is a difference in design if the user is a pro user
                try:
                    ad['user'] = text_at(root, seller + '/div[2]/h4')
                except Exception:
                    try:
                        ad['user'] = text_at(root, seller + '/div/h4')
                    except Exception:
                        # NOTE(review): a missing user is passed on as None and ends up
                        # as the literal name "None" in the GraphQL calls below - confirm
                        # this is the intended handling.
                        ad['user'] = None
                ad['description'] = special_chars.sub('', text_at(root, table + '/tr[2]/td'))
                ad['category'] = text_at(root, table + '/tr[1]/td[1]/div/a')
                ad['dateadded'] = text_at(root, header + '/div[2]/div[2]/div[1]/span')
                # relative dates are replaced by the absolute date
                if 'Heute' in ad['dateadded']:
                    ad['dateadded'] = date.today().strftime("%d.%m.%Y")
                elif 'Gestern' in ad['dateadded']:
                    ad['dateadded'] = (date.today() - timedelta(days=1)).strftime("%d.%m.%Y")
                ad['views'] = text_at(root, header + '/div[2]/div[2]/div[2]/span')

                rows.append(ad)

                # check if user exists
                if not search_user(ad['user']):
                    create_aduser(ad['user'])

                # send ad to database (description is cut to 5000 characters)
                status, reason = create_ad(ad['id'], ad['title'], ad['description'][0:5000], ad['price'], ad['zipcode'], ad['category'], ad['dateadded'], link, ad['views'], ad['user'])

                print('send ad {0} to database, status: {1} {2}'.format(ad['id'], status, reason))

            except Exception:
                # `Exception` (not a bare except) so Ctrl-C still stops the run
                print('unable to scrape ad: {0}'.format(link))
    finally:
        # close open connection
        driver.quit()

    return pd.DataFrame(rows, columns=columns)


In [246]:
def create_ad(nr,title,description,price,zipcode,category,dateadded,url,views,user):
    """Send an ad to the database over the GraphQL API.

    String values are embedded as escaped string literals, so quotes,
    backslashes or line breaks in scraped text cannot break (or inject into)
    the mutation. `price`, `zipcode` and `views` are embedded unquoted.

    Args:
        nr (String): Ad number
        title (String): Ad title
        description (String): Ad description
        price (Integer): Ad price
        zipcode (Integer): Ad Users Zipcode
        category (String): Ad category
        dateadded (String): Ad published date
        url (String): Ad direct link
        views (Integer): Ad views
        user: (String): Ad User

    Returns:
        Int: HTTP Status Code HTTP Request
        String: HTTP Reason of HTTP Request
    """

    def quoted(value):
        # JSON string escaping (", \, control characters) is also valid for
        # GraphQL string literals; json.dumps adds the surrounding quotes.
        return json.dumps(str(value), ensure_ascii=False)

    # NOTE(review): price, zipcode and views are inserted as-is; a non-numeric
    # value produces an invalid mutation. Their schema types are not visible
    # here, so they are left unvalidated - confirm against the backend.
    query = """
    mutation {
    createAd(
        nr: %s,
        title: %s,
        description: %s,
        price: %s,
        zipcode: %s,
        category: %s,
        dateadded: %s,
        url: %s,
        views: %s,
        userName: %s
    ) {
        id
        nr
        title
        description
        price
        zipcode
        category
        dateadded
        url
        views
        user {
        id
        }
    }  
    }
    """ % (
        quoted(nr),
        quoted(title),
        quoted(description),
        price,
        zipcode,
        quoted(category),
        quoted(dateadded),
        quoted(url),
        views,
        quoted(user),
    )

    # separate name so the `url` argument (the ad link) is not shadowed
    endpoint = 'http://127.0.0.1:8000/graphql/'
    r = requests.post(endpoint, json={'query': query})
    return r.status_code,r.reason



In [4]:
def search_user(user):
    """Query the database over the GraphQL API to check if a user is already present.

    The name is embedded as an escaped string literal, so quotes or
    backslashes in a user name cannot break the query.

    Args:
        user: (String): Ad User

    Returns:
        Boolean: True if User is in Database
    """

    query = """
    {
        adusers(name: %s) {
            id
            name
        }
    }
    """ % json.dumps(str(user), ensure_ascii=False)

    endpoint = 'http://127.0.0.1:8000/graphql/'
    r = requests.post(endpoint, json={'query': query})
    # a non-empty `adusers` list means at least one user with that name exists
    return len(json.loads(r.text)['data']['adusers']) > 0

In [70]:
def create_aduser(name):
    """Create an ad user in the database over the GraphQL API.

    The name is embedded as an escaped string literal, so quotes or
    backslashes in a user name cannot break the mutation.

    Args:
        name: (String): Ad User

    Returns:
        Int: HTTP Status Code HTTP Request
    """

    query = """
    mutation {
        createAduser(
            name: %s
        ) {
            name
        }  
    }
    """ % json.dumps(str(name), ensure_ascii=False)

    endpoint = 'http://127.0.0.1:8000/graphql/'
    r = requests.post(endpoint, json={'query': query})
    return r.status_code


In [251]:
# Collect the links of all tutti.ch ads found for the search term 'macbook'
links = get_tutti_ad_links('macbook')

number of search results: 1755


In [257]:
# Scrape the attributes of every collected ad; get_ad_data also sends each ad to the database
data = get_ad_data(links)

nd ad 40482313 to database, status: 200 OK
send ad 6928319 to database, status: 200 OK
send ad 40469773 to database, status: 200 OK
send ad 43209006 to database, status: 200 OK
send ad 41819509 to database, status: 200 OK
send ad 41886574 to database, status: 200 OK
send ad 42387131 to database, status: 200 OK
send ad 41895541 to database, status: 200 OK
send ad 39860087 to database, status: 200 OK
send ad 21832048 to database, status: 200 OK
send ad 41880419 to database, status: 200 OK
send ad 26220685 to database, status: 200 OK
send ad 41716400 to database, status: 200 OK
send ad 41673745 to database, status: 200 OK
send ad 12547804 to database, status: 200 OK
send ad 12670629 to database, status: 200 OK
send ad 43210170 to database, status: 200 OK
send ad 41798502 to database, status: 200 OK
unable to scrape ad: https://www.tutti.ch/de/vi/aargau/computer-zubehoer/komponenten-zubehoer/macbook-screen-protector-fuer-retina-mac-book-ivisor-12/43208242
send ad 43207113 to database, stat

In [258]:
# Write all scraped ads to 'ads.csv' (pandas also writes the DataFrame index as the first column by default)
data.to_csv('ads.csv')

In [3]:
# ipytest is used to run the pytest test in the following notebook cell
import ipytest
ipytest.autoconfig()

In [4]:
%%run_pytest

# Smoke test: the link collection must yield at least one result for a known search
def test_get_tutti_ad_links():
    """Check that a search on tutti.ch returns a non-empty list of ad links."""
    found_links = get_tutti_ad_links('fiat 500l oldtimer')
    assert len(found_links) > 0


.                                                                        [100%]
1 passed in 43.50s
