In [1]:
import requests
from lxml import html

import unicodecsv as csv
import pandas as pd
import numpy as np

import time
import random
import re
import json

from IPython.display import display, Markdown, HTML
import IPython.display as dsp

pd.set_option('display.width', None)
pd.set_option('display.max_rows', 120)
pd.set_option('display.max_info_rows', 120)
pd.set_option('display.max_info_columns', 120)
pd.options.display.max_colwidth = 200  # pd.set_option('display.max_colwidth', 200)
pd.set_option('display.float_format', lambda fp: f'{fp:,.3f}')


import missingno as msno
import seaborn as sns
import matplotlib.pyplot as plt

import sklearn.preprocessing
import sklearn.impute
import sklearn.neighbors
import sklearn.cluster

import scipy.stats

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support.ui import Select
from time import sleep

LINE_UP, LINE_CLEAR = None, None


In [2]:
def write(name, data):
    with open(name, 'w') as f:
        f.write(data)

In [3]:
def random_ua():
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.3',
        #'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
        #'Mozilla/5.0 (X11; CrOS x86_64 8172.45.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.64 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0',
    ]
    selected_ua = random.choice(user_agents)
    print(f'Selected UA: {selected_ua}')
    return selected_ua

In [4]:
def countdown( min, max=None, buffer=None, prefix='Sleeping', postfix='...'):
    '''Print a countdown to screen.  Erase it when it is done.  
    Accepts either the time to countdown OR a min and max for randint for random sleeps.'''
    if max is None:
        max = min
    
    delay = random.randint(min, max)
    for i in range(delay):
        msg = f'{prefix} {delay-i:<2}s'
        msg = f'{msg:<14}s / {delay:>2}s {postfix}'
        if buffer is None:
            print(msg, end='')
        else:
            buffer.update(Markdown(msg))

        time.sleep(1)
        if buffer is None:
            print(f'\r                      ', end='\r')
        #else:
            #buffer.update(Markdown(f' '))
    if buffer is None:
        print(LINE_UP, LINE_CLEAR, end='')
    else:
        buffer.update(Markdown('Sleeping complete ...'))
    

In [5]:
def write_csv(data, target, fieldnames=None):
    # Save data to csv

    with open(f'properties-{target}.csv', 'wb') as csvfile:
        if fieldnames is None:
            fieldnames = list(data[0].keys())
        print(f'Fieldnames: {fieldnames}')
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for row in data:
            writer.writerow(row)

In [6]:
def pr(msg, buffer=None, end='\n', auto_format=True):
    #format = dict(top='## {}', mid='{}', bot='{}')  # buffers are IPython.display.display objects with a 'name' attribute added

    if not buffer:
        print(msg, end=end)
    
    else:
       #print('PR: msg type: ', type(msg))
       if isinstance(msg, str):
        if auto_format:
            try:
              msg = buffer.format.format(msg)
            except Exception as e:
              #print('In pr(): buffer formatting failed')
              pass
        buffer.update(Markdown(msg))
       
       else:
          #print('PR: msg type: ',type(msg), 'auto_format: ', auto_format)
          buffer.update(msg)


In [None]:
def get_display_handles():
    dh_top = display('...', display_id=True)
    dh_mid = display('...', display_id=True)
    dh_bot = display('...', display_id=True)

    dh_top.name = 'top'
    dh_mid.name = 'mid'
    dh_bot.name = 'bot'

    dh_top.format = '#### {}'
    dh_bot.format = '{}'
    dh_mid.format = '{}'
    
    return (dh_top, dh_mid, dh_bot)

dh_top, dh_mid, dh_bot = get_display_handles()

'...'

'...'

'...'

In [8]:
def ALCO_fetch_data(target=None, filter=''):
    base_url = 'https://www2.alleghenycounty.us/RealEstate{}'
    if target is None:
        target = '/Default.aspx'
        #url = 'https://www.redfin.com/county/2362/PA/Allegheny-County/'
    else:
        if target[0] != '/':
            target = '/' + target
        if target[-1] != '/':
            target += '/'
        if filter != '':
            if filter[0] == '/':
                filter = filter[1:]
            if filter[-1] != '/':
                filter += '/'
    url = base_url.format(target+filter)

    headers = {
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'max-age=0',
        'sec-ch-ua': '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
        'sec-ch-ua-mobile':'?',
        'sec-ch-ua-platform':'"Windows"',
        'sec-fetch-dest':'document',
        'sec-fetch-mode':'navigate',
        'sec-fetch-site':'none',
        'sec-fetch-user':'?1',
        'upgrade-insecure-requests':'1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0',
    }
    s = requests.Session()
    s.headers.update(headers)

    #options = webdriver.FirefoxOptions()
    #service = webdriver.FirefoxService(executable_path='/usr/bin/firefox')
    #driver = webdriver.Firefox(service=service, options=options)
    driver = webdriver.Firefox()
    df = None
    dh_top, dh_mid, dh_bot = get_display_handles()
    print(dh_top, dh_mid, dh_bot)
    #input()
    # Loop
    while url is not None:
        response = s.get(url=url)
        driver.get(url)
        parser = html.fromstring(response.text)
        return s, driver, parser
        pr(f'**{response.status_code}**: {url}', buffer=dh_top)
        
        new_data = RF_get_df(parser, index='propertyId')
        if df is None:
            df = new_data
        else:
            df = pd.concat((df, new_data))
            pr(f'Page {url[-1]}: Added {df.shape[0]:>5} new rows.', buffer=dh_bot)

        last_url = url
        next_suffix = RF_next_url_suffix(parser)
        url = base_url.format(next_suffix) if next_suffix is not None else None
        if url is None or last_url == url:
            pr(f'**BREAKING**: No more URLs. Last URL: {last_url}', buffer=dh_mid)
            break

        countdown(min=3, max=7, buffer=dh_mid, postfix=f'Next URL: {url}')   # Sleep
        pr(f'Looping from beginning.  Next URL: {url}', buffer=dh_mid)
    
    return df

In [16]:
def get_driver(url='https://www2.alleghenycounty.us/RealEstate/Default.aspx'):
    options = webdriver.FirefoxOptions()
    service = webdriver.FirefoxService(executable_path=r'/snap/bin/geckodriver')
    driver = webdriver.Firefox(options=options, service=service)
    driver.get(url)
    return driver

In [10]:
def setup_page(driver):
    # Determine if this is the disclaimer page
    try:
        if driver.find_element(By.TAG_NAME, value='h1').text.lower() == 'legal disclaimer':
            # We need to click to go forward
            btn_continue = driver.find_element(by='id', value='btnContinue')
            btn_continue.click()
    except NoSuchElementException as e:
        print('No h1 tag found to compare for legal disclaimer')
        pass

    try:
        driver.find_element(By.CLASS_NAME, 'beta-button-2').click()
    except NoSuchElementException as e:
        # Not found ... move on
        print('beta popup close button was not found; failing silently')
        pass

In [11]:
def address_search(driver, selectm='Bethel Park'):
    #street_name = driver.find_element(By.ID, 'txtStreetName')
    #street_name.send_keys(' ')
    
    select_muni = Select(driver.find_element(By.ID, 'ddlMuniCode'))
    select_muni.select_by_visible_text(selectm)

    btn_search = driver.find_element(By.ID, 'btnSearch')
    btn_search.click()
    sleep(3)

    pager = None
    while pager is None:
        try:
            pager = driver.find_element(By.ID, 'pnlPageNums')
        except Exception as e:
            pager = None
        driver.find_element(By.ID, 'txtStreetName').send_keys(' ')  
        driver.find_element(By.ID, 'btnSearch').click()
        sleep(10)

    print('loaded')

    link = driver.find_element(By.XPATH, '//table[@id="dgSearchResults"]//tr[2]/td/a')
    link.click()
    
#address_search(driver)

In [82]:
def grab_page(driver, VERBOSE=False, dh=None):
    data = [td.text for td in driver.find_elements(By.XPATH, '//table[@id="Table3"]//td/span')]
    
    next_key = True
    assessments = False
    assessment_no = 0
    assessment_first = True
    
    data1 = []
    data2 = []
    
    for idx, d in enumerate(data):
        if assessments:
            if assessment_no < 4:
                if assessment_first:  # It's a key
                    #print(d, 'is assessment key.')
                    data1.append(d)    
                else:  # It's a value
                    #print(d, 'is assessment value.')
                    d = d.replace('$', '').replace(',', '')

                    data2.append(d)
                    assessment_no += 1
                    
                assessment_first = not assessment_first

        elif next_key:  # Looking for a key
            if d[-1] == ':':  # It's a key
                #print(d, 'is a key')
                d = d[:-2]  # Strip trailing ' :'
                data1.append(d)
                next_key = False
            else:  # Not actually a key
                if d[4:] == ' County Assessed Value (Projected)':
                    #print(d, 'is last header for assessments.')
                    assessments = True
                else:
                    if data[idx-1] == 'Sale Price':
                        d = d.replace('$', '').replace(',', '')
                    elif data[idx-1] == 'Lot Area':
                        d = d.replace(' Acres', '')
                    #print(d, 'is not actually a key.  Skipping ...')
                    next_key = True
        else: # expecting a value
            #print(d, 'is this a value?')
            data2.append(d)
            next_key = True

    
    as_dict = dict(zip(data1, data2))
    try:
        as_dict['PIN'] = driver.find_element(By.ID, 'BasicInfo1_lblParcelID').text
        as_dict['Address'] = driver.find_element(By.ID, 'BasicInfo1_lblAddress').text
        as_dict['Owner'] = driver.find_element(By.ID, 'BasicInfo1_lblOwner').text
    except NoSuchElementException as e:
        dh.update('PIN/Address/Owner not found.  Probably got timed out.')
        
    if VERBOSE:
        if dh is None:
            print(as_dict.get('PIN', 'NO_PIN_FOUND'), end='    ')
        else:
            dh.update(f'Record {i}:\t{as_dict['PIN']}')
    
    record_number = int(driver.find_element(By.ID, 'Header1_lblRecNum').text)
    
    return as_dict, record_number
#grab_page(driver)

In [13]:
def next_page(driver):
    driver.find_element(By.ID, 'Header1_btnNextRec').click()
#next_page(driver)

In [20]:
driver = get_driver()
type(driver)

selenium.webdriver.firefox.webdriver.WebDriver

In [70]:
def goto_page(driver, start_record_num, dh):
    current_record = int(driver.find_element(By.ID, 'Header1_lblRecNum').text)
    pages_crossed = 0

    while current_record != start_record_num:
        if pages_crossed == 30:
            pages_crossed = 0
            countdown(min=5, max=7, buffer=dh, prefix=f'On page {current_record}.  Sleeping')
        pages_crossed += 1
        next_page(driver)
        current_record = int(driver.find_element(By.ID, 'Header1_lblRecNum').text)


In [83]:
def ALCO_fetch(muni='Bethel Park', start_record_num=1, num=30):
    driver = get_driver()
    setup_page(driver)
    address_search(driver, selectm=muni)
    dh_top, dh_mid, dh_bot = get_display_handles()

    #  Handle data
    i = 0
    max_records = 30  # int(driver.find_element(By.ID, 'Header1_lblTotRec').text)
    record_nos = range(max_records)
    current_record = int(driver.find_element(By.ID, 'Header1_lblRecNum').text)
    df = None
    if current_record != start_record_num:
        goto_page(driver, start_record_num, dh_bot)
    current_record = int(driver.find_element(By.ID, 'Header1_lblRecNum').text)
    dh_top.update(f'On record **{current_record}**.  Beginning extraction.')
    for i in record_nos:
        vb = (i % 10) == 0
        if vb:
            countdown(min=5, max=30, buffer=dh_bot)
        else:
            countdown(min=1, max=6, buffer=dh_bot)

        data, num = grab_page(driver, VERBOSE=vb, dh=dh_mid)

        if df is None:
            df = pd.DataFrame(data, index=[i])
        else:
            df = pd.concat([df, pd.DataFrame(data, index=[i])])
        next_page(driver)

    return df

df = ALCO_fetch(start_record_num=90)
df

loaded


'On record **90**.  Beginning extraction.'

'PIN/Address/Owner not found.  Probably got timed out.'

Sleeping complete ...

NoSuchElementException: Message: Unable to locate element: [id="Header1_lblRecNum"]; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors#no-such-element-exception
Stacktrace:
RemoteError@chrome://remote/content/shared/RemoteError.sys.mjs:8:8
WebDriverError@chrome://remote/content/shared/webdriver/Errors.sys.mjs:193:5
NoSuchElementError@chrome://remote/content/shared/webdriver/Errors.sys.mjs:511:5
dom.find/</<@chrome://remote/content/shared/DOM.sys.mjs:136:16


In [71]:
first_df = df.copy()

In [None]:
# enter 0 in fifth parcel id textbox
textbox5 = driver.find_element(By.ID, 'txtParcelID5')
textbox5.send_keys('0')

NoSuchElementException: Message: Unable to locate element: [id="txtParcelID5"]; For documentation on this error, please visit: https://www.selenium.dev/documentation/webdriver/troubleshooting/errors#no-such-element-exception
Stacktrace:
RemoteError@chrome://remote/content/shared/RemoteError.sys.mjs:8:8
WebDriverError@chrome://remote/content/shared/webdriver/Errors.sys.mjs:193:5
NoSuchElementError@chrome://remote/content/shared/webdriver/Errors.sys.mjs:511:5
dom.find/</<@chrome://remote/content/shared/DOM.sys.mjs:136:16


In [None]:
btn_search = driver.find_element(By.ID, 'btnSearch')
btn_search.click()
sleep(3)

In [None]:
pager = None
while pager is None:
    try:
        pager = driver.find_element(By.ID, 'pnlPageNums')
    except Exception as e:
        pager = None
    driver.find_element(By.ID, 'btnSearch').click()
    sleep(10)

print('loaded')

loaded


In [None]:
link = driver.find_element(By.XPATH, '//table[@id="dgSearchResults"]//tr[2]/td/a')
link.text

'0001-A-00100-0000-00'

In [None]:
driver.find_element(By.XPATH, '//table[@id="dgSearchResults"]//tr[2]/td/a')

<selenium.webdriver.remote.webelement.WebElement (session="86ae0c1c-1997-4455-b7b9-dd492023d26e", element="fc311bfb-8939-49be-9940-4b445b8e5af3")>