In [None]:
import re
import os
import time
import shutil
import config
from word2number import w2n
from selenium import webdriver
from tqdm.notebook import tqdm
import pandas as pd
# for browser interaction & load waiting:
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By

# SCerIS scraper

In [None]:
# returns target element once it is loaded
def wait_for(element, by, timeout=10, multi=False):
    """Wait for a locator to be present on the page, then look it up.

    Uses the module-level webdriver ``d``.

    Args:
        element: Locator string, interpreted according to ``by``.
        by: Selenium locator strategy (e.g. ``By.CSS_SELECTOR``, ``By.XPATH``).
        timeout: Maximum number of seconds to wait for the element to appear.
        multi: When true, return every matching element instead of the first.

    Returns:
        A list of matching elements when ``multi`` is true (possibly empty),
        otherwise the first matching element.

    Raises:
        The lookup is still attempted after a timeout, so a missing single
        element surfaces from ``d.find_element`` (Selenium documents this as
        ``NoSuchElementException``).
    """
    # Short fixed pause before polling starts.
    time.sleep(.3)
    try:
        WebDriverWait(d, timeout).until(
            EC.presence_of_element_located((by, element)))
    except TimeoutException:
        print(f"Timeout limit reached ({timeout} s)")
    # The lookup lives outside any `finally` clause: returning from `finally`
    # would discard every other in-flight exception (including
    # KeyboardInterrupt) instead of letting it propagate.
    if multi:
        return d.find_elements(by, element)
    return d.find_element(by, element)

In [None]:
def launch_new_search(verb=True):
    """Bring the browser to a fresh search panel and return the driver.

    When the module-level driver ``d`` is truthy, the page's back button is
    clicked; otherwise a new Chrome driver is started, stored in ``d`` and
    pointed at the home page. In both cases the search panel link is then
    clicked.

    Args:
        verb: When true, print a progress message.

    Returns:
        The module-level webdriver ``d``.
    """
    global d
    url = 'https://scerisecm.boston.gov/ScerIS/CmPublic/#/Home'
    # Absolute XPath of the link that opens the search panel.
    open_search = '/html/body/div[1]/div/div/div[2]/div[3]/search-dashboard/div/div/div[2]/div/div/div/div/ul/li[3]/dashboard-folders-renderer/div/div/div/a[3]'
    if d:
        # Existing session: go back from the previous page.
        wait_for("button[ng-click='back()']", By.CSS_SELECTOR).click()
    else:
        d = webdriver.Chrome()
        d.get(url)
    # Honour `verb` like the other helpers do; it was previously ignored.
    if verb:
        print('Launching new search...')
    time.sleep(.8)
    wait_for(open_search, By.XPATH).click()
    return d

In [None]:
def check_all_button(verb=True):
    """Click the "display all in results" checkbox twice.

    Args:
        verb: When true, print a progress message afterwards.
    """
    checkbox_css = "input[data-ng-model='displayAllInResults.displayAll']"
    # NOTE(review): two consecutive clicks are issued on the same checkbox;
    # presumably this leaves it in the intended state - confirm against the page.
    for _ in range(2):
        wait_for(checkbox_css, By.CSS_SELECTOR).click()
    if verb:
        print('> Querying all parameters')

In [None]:
def get_search_params(verb=True):
    """Collect the search fields on the page and their input elements.

    Args:
        verb: Accepted for signature parity with the other helpers; unused.

    Returns:
        A dict mapping each field's name (the first line of the field
        container's text) to ``{'title': <container element>,
        'input': <input element>}``.
    """
    # The attribute selector is now closed with `]`; it was previously left open.
    params = wait_for("div[ng-repeat='dynamicTerm in dynamicSearchFieldTerms']",
                      By.CSS_SELECTOR, multi=True)
    names = [p.text.split('\n')[0] for p in params]
    input_containers = wait_for('input-with-help', By.CLASS_NAME, multi=True)
    # find_element(By.TAG_NAME, ...) replaces find_element_by_tag_name, which
    # Selenium 4 removed, and matches the locator style used by wait_for.
    inputs = [c.find_element(By.TAG_NAME, 'input') for c in input_containers]

    # zip() stops at the shortest list, so names, containers and inputs are
    # paired positionally and any surplus items are dropped.
    return {n: {'title': p, 'input': i}
            for n, p, i in zip(names, params, inputs)}

In [None]:
def set_search(param, keys, verb=True):
    """Type ``keys`` into the input registered under ``param``.

    The field is looked up in the module-level ``search_param`` dict.

    Args:
        param: Field name, a key of ``search_param``.
        keys: Text sent to the field's input element.
        verb: When true, scroll the field's title element into view first.
    """
    entry = search_param[param]
    if verb:
        d.execute_script("arguments[0].scrollIntoView();", entry['title'])
    entry['input'].send_keys(keys)

In [None]:
def organize_file(newname, verb=True, download_dir='../../Downloads', target_dir='sceris_data/',
                  settle=1, timeout=30):
    """Move the newest finished download into ``target_dir`` as ``<newname>_COs.csv``.

    The most recently created regular file in ``download_dir`` is taken to be
    the export that was just requested. The function waits until that file is
    no longer a browser partial-download file before moving it.

    Args:
        newname: Base name for the saved file; ``'_COs.csv'`` is appended.
        verb: When true, print a confirmation message.
        download_dir: Directory the browser downloads into.
        target_dir: Directory the renamed file is moved to (created if missing).
        settle: Seconds to sleep before the first scan of ``download_dir``.
        timeout: Maximum seconds to keep waiting for a finished download.

    Raises:
        TimeoutError: If no finished download is found within ``timeout``.
    """
    # Suffixes of in-progress downloads: '.part' was the only one checked
    # before; '.crdownload' is what Chrome (the driver used here) writes.
    partial_suffixes = ('.part', '.crdownload')
    newname = newname + '_COs.csv'

    time.sleep(settle)
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Only regular files: a freshly created sub-directory must not be
            # mistaken for the download.
            files = [f for f in os.listdir(download_dir)
                     if os.path.isfile(os.path.join(download_dir, f))]
            newest = max(files,
                         key=lambda f: os.path.getctime(os.path.join(download_dir, f)),
                         default=None)
        except OSError:
            # A file disappeared mid-scan (e.g. a partial file was renamed
            # on completion); rescan.
            newest = None
        if newest is not None and not newest.endswith(partial_suffixes):
            break
        if time.monotonic() >= deadline:
            # Moving a partial file would silently store truncated data that
            # later runs treat as already downloaded, so fail loudly instead.
            raise TimeoutError(
                f"No completed download found in '{download_dir}' after {timeout} s")
        time.sleep(.25)

    os.makedirs(target_dir, exist_ok=True)
    shutil.move(os.path.join(download_dir, newest), os.path.join(target_dir, newname))
    if verb: print(f"> File {newname} saved to '{target_dir}'")

# Partial credit for organize_file: dmb, https://stackoverflow.com/questions/34548041

In [None]:
def download_if_data(verb=True, st_name=None):
    """Export the current search results to a file, if there are any.

    Args:
        verb: When true, print progress messages. For backward compatibility
            a string passed here is treated as the street name, because the
            existing call site passes the street name positionally.
        st_name: Street name used to name the saved file. When omitted, the
            module-level ``st_name`` variable is used if it exists.

    Returns:
        None.

    Raises:
        ValueError: If results are present but no street name is available
            to name the downloaded file.
    """
    # Legacy call style: download_if_data(street_name).
    if isinstance(verb, str):
        if st_name is None:
            st_name = verb
        verb = True
    # Fall back to the notebook-level variable this function used to rely on.
    if st_name is None:
        st_name = globals().get('st_name')

    # any results?
    wait_for('ui-grid-viewport', By.CLASS_NAME)
    # find_elements(By.CLASS_NAME, ...) replaces find_elements_by_class_name,
    # which Selenium 4 removed.
    records = [t.text for t in
               d.find_elements(By.CLASS_NAME, 'ui-grid-viewport')]

    if 'No records found.' in ''.join(records):
        if verb: print('> No records found.')
        return

    # if any records:
    if st_name is None:
        raise ValueError('download_if_data needs a street name to name the file')
    downloader = "button[ng-click='exportSearchResult()']"
    if verb: print('> Downloading data...')
    wait_for(downloader, By.CSS_SELECTOR).click()
    wait_for('btnDownload', By.ID).click()
    if verb: print("> Clicked 'download'")
    organize_file(st_name, verb=verb)

In [None]:
def sceris_search(st_name):
    """Run one search for ``st_name`` and download the results, if any.

    Opens a fresh search panel, fills the 'Document Type' field with 'C/O'
    and the 'Street Name' field with ``st_name``, submits the search and
    hands over to ``download_if_data``. The module-level ``search_param``
    dict is rebuilt on every call.

    Args:
        st_name: Street name typed into the 'Street Name' search field.
    """
    global search_param

    divider = '—' * 30
    print(divider)
    print(f"Searching primary st address: {st_name}")

    # prepare the page and index its search fields
    launch_new_search()
    check_all_button()
    search_param = get_search_params()

    # fill in the query, pausing briefly between the two fields
    set_search('Document Type', 'C/O')
    time.sleep(.1)
    set_search('Street Name', st_name)

    # submit
    search_button = wait_for('btnSearch', By.ID)
    search_button.click()

    # NOTE(review): st_name is passed positionally and therefore lands in
    # download_if_data's first parameter, which is declared as `verb`.
    download_if_data(st_name)

# import STR ADCO data

In [None]:
# Load the address table; the path is relative to the notebook's working
# directory. The GPSaddress column is the one parsed further down.
df = pd.read_csv('data_raw/STRbldgsGPSplus.csv')

def parse_st_num(x):
    """Extract the street-number part of an address string.

    Args:
        x: Address string; must contain at least one whitespace-separated token.

    Returns:
        A string:
        * if the first token contains 'One', the result of
          ``w2n.word_to_num`` applied to the whole address, as a string;
        * otherwise '<first>-<second>' when the second token is an integer
          (e.g. '12 34 Main St' -> '12-34');
        * otherwise just the first token.

    Raises:
        IndexError: If ``x`` contains no tokens.
    """
    tokens = x.split()
    f_char = tokens[0]
    if 'One' in f_char:
        # NOTE(review): the full address, not just the first token, is handed
        # to word_to_num - confirm that is the intended input.
        return str(w2n.word_to_num(x))
    try:
        return f'{f_char}-{int(tokens[1])}'
    except (IndexError, ValueError):
        # No second token, or it is not an integer: the number is one token.
        # Only these two errors are expected; anything else should propagate
        # rather than be hidden by a bare except.
        return f_char
        
def parse_st_name(x):
    """Extract the first word of the street name from an address string.

    Args:
        x: Address string with at least two whitespace-separated tokens.

    Returns:
        The third token when the second token is an integer and a third token
        exists (e.g. '12 34 Main St' -> 'Main'); otherwise the second token
        (e.g. '12 Main St' -> 'Main').

    Raises:
        IndexError: If ``x`` has fewer than two tokens.
    """
    tokens = x.split()
    try:
        int(tokens[1])
        return tokens[2]
    except (IndexError, ValueError):
        # Second token is not an integer, or there is no third token.
        # Only these two errors are expected; anything else should propagate
        # rather than be hidden by a bare except.
        return tokens[1]
    
# Derive the street-name and street-number columns from each GPS address.
df['st_name'] = df['GPSaddress'].apply(parse_st_name)
df['st_num'] = df['GPSaddress'].apply(parse_st_num)

# Falsy placeholder for the webdriver: launch_new_search() starts a browser
# when `d` is falsy and stores the driver back into `d`.
d = False

In [None]:
# Scrape every distinct street name, skipping those whose CSV already exists.
# The loop variable must stay named `st_name`: download_if_data reads it as a
# module-level variable.
for st_name in tqdm(df.st_name.unique()):
    already_saved = (st_name + '_COs.csv') in os.listdir('sceris_data')
    if already_saved:
        print(f'[{st_name} -  data already found]')
        continue
    sceris_search(st_name)