# Webscraper

#### Install/Import

In [3]:
import time
import requests
import random
import os
import csv
import json
import jsonlines
import pandas as pd 
import geopandas as gpd
import numpy as np
import shutil
from selenium import webdriver
#from seleniumwire import webdriver as wd
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import ElementNotInteractableException
import undetected_chromedriver as uc



#### Web Scraper Helpers

##### 1. Set up chrome driver

In [4]:
def setup_undetected_driver(version_main=126, script_timeout=30, page_load_timeout=30):
    """Create an undetected-chromedriver Chrome instance with performance logging.

    Args:
        version_main: Value forwarded to ``uc.Chrome(version_main=...)``.
            Defaults to 126, the value that used to be hard-coded here, so
            existing callers are unaffected. Override it when the locally
            installed Chrome has a different major version.
        script_timeout: Seconds passed to ``driver.set_script_timeout``.
        page_load_timeout: Seconds passed to ``driver.set_page_load_timeout``.

    Returns:
        The configured driver instance.
    """
    chrome_options = uc.ChromeOptions()

    # Performance logging is what lets the network traffic be read back later
    # through driver.get_log("performance").
    chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
    chrome_options.add_argument("--enable-logging")

    driver = uc.Chrome(options=chrome_options, version_main=version_main)

    # Bound script execution and page loads so a stalled page cannot hang the run.
    driver.set_script_timeout(script_timeout)
    driver.set_page_load_timeout(page_load_timeout)

    return driver

##### 2. Query Address

In [5]:
# Address search function

def search_address(driver, address, max_attempts = 10):
    """Type *address* into the page's address search box and submit it.

    Each attempt locates the input with id "input-addressInput" (waiting up
    to 5 seconds), types the address, and checks that the field's value
    equals what was typed. On a mismatch, or when the element is not
    interactable, the page is refreshed and the next attempt starts, up to
    *max_attempts* attempts.

    Returns:
        True once the address has been submitted, False when the attempts
        are exhausted, and None when any other exception occurs.
    """
    final_attempt = max_attempts - 1

    for attempt in range(max_attempts):
        try:
            # Locate the address search bar.
            search_box = WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.ID, "input-addressInput"))
            )

            # Type the address and read back what actually landed in the field.
            search_box.send_keys(address)
            typed_value = search_box.get_attribute("value")
            print(typed_value)

            if typed_value == address:
                # Field content is correct: submit and report success.
                search_box.send_keys(Keys.RETURN)
                return True

            print(f"Entered Text: {typed_value}")
            print(f"Address: {address}")
            print("address entered incorrectly")

            if attempt >= final_attempt:
                print("Max attempts reached. Could not enter address.")
                return False

            print(f"Attempt {attempt + 1} failed. Retrying...")
            driver.refresh()  # reload the page; the loop moves on to the next attempt

        except ElementNotInteractableException:
            # The input exists but cannot be typed into (e.g. covered by a pop-up).
            if attempt >= final_attempt:
                print("Max attempts reached. Could not enter address.")
                return False

            print(f"Attempt {attempt + 1} failed. Retrying...")
            driver.refresh()

        # Open question kept from the original author: if the page did not
        # load (access denied), should the driver be rebuilt and retried?
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None

##### Collect API or Session Storage Data

In [6]:
# gather session storage data where applicable
def get_session_storage_data(driver, key):
    """Return the value stored under *key* in the page's ``sessionStorage``.

    Args:
        driver: Object exposing ``execute_script(script, *args)``.
        key: Name of the sessionStorage entry to read.

    Returns:
        Whatever ``driver.execute_script`` returns for the lookup.
    """
    # Hand the key over as a script argument rather than splicing it into the
    # JavaScript source, so quotes or backslashes in the key cannot break or
    # alter the script.
    return driver.execute_script(
        "return window.sessionStorage.getItem(arguments[0]);", key
    )

In [7]:
def collect_api_response(driver, api_name="service-address-validation", time_out=60, poll_interval=0.5):
    """Poll the browser for the availability API response, with a session-storage fallback.

    The performance log is scanned for ``Network.requestWillBeSent`` entries
    whose URL contains *api_name* (compared case-insensitively) and for the
    ``Network.responseReceived`` entries with the same request id. For every
    such pair the body is fetched with the ``Network.getResponseBody`` CDP
    command and parsed as JSON. If no body can be read, the
    "qualificationInfo" session-storage entry is tried instead.

    Args:
        driver: Driver exposing ``get_log`` and ``execute_cdp_cmd``.
        api_name: Substring identifying the API request URL.
        time_out: Maximum number of seconds to keep polling.
        poll_interval: Seconds to pause between polls so the loop does not
            spin at full speed while waiting for the response.

    Returns:
        The parsed JSON payload, or None if nothing was found in time.
    """
    # Lower-case once: URLs are lower-cased before comparison, so a
    # mixed-case api_name could otherwise never match.
    api_name_lower = api_name.lower()
    deadline = time.monotonic() + time_out
    request_response_pairs = {}

    while time.monotonic() < deadline:
        try:
            for entry in driver.get_log("performance"):
                log = json.loads(entry['message'])['message']
                method = log.get('method')
                if method == 'Network.requestWillBeSent':
                    params = log.get('params', {})
                    request_url = params.get('request', {}).get('url', '')
                    if api_name_lower in request_url.lower():
                        request_response_pairs[params["requestId"]] = {'request': request_url}
                elif method == 'Network.responseReceived':
                    params = log.get('params', {})
                    response_request_id = params.get('requestId')
                    if response_request_id in request_response_pairs:
                        request_response_pairs[response_request_id]['response'] = params

            # Try every complete request/response pair. One unreadable body
            # must not block the other matching requests, so move on to the
            # next pair instead of giving up.
            for request_id, data in request_response_pairs.items():
                if 'response' not in data:
                    continue
                try:
                    response = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
                    return json.loads(response.get("body", ""))
                except Exception:
                    continue

            # Nothing usable from the network log yet: fall back to session storage.
            session_storage_data = get_session_storage_data(driver, "qualificationInfo")
            if session_storage_data:
                return json.loads(session_storage_data)

        except Exception as e:
            print(f"An error occurred while processing logs: {type(e).__name__}")

        # Pause before polling again, without sleeping past the deadline.
        remaining = deadline - time.monotonic()
        if remaining > 0 and poll_interval > 0:
            time.sleep(min(poll_interval, remaining))

    print("No API response or session storage data found within the timeout period")
    return None

##### Parsing and Saving Data

In [8]:
# close matches often occur when an address is given without a subaddress. 
# this function selects a random subaddress and returns an address including the subaddress to requery
def handling_close_matches(data):
    """Build a full address string from a close-match response's alternate address.

    ``data["content"]["alternateAddress"]`` may be a single dict or a list of
    dicts; when it is a list, one entry is picked at random. The unit comes
    from the entry's ``subAddress`` string when present, otherwise from
    ``geographicSubAddress -> subUnitType`` (``addrType`` + ``value``).
    NOTE(review): the shape of ``geographicSubAddress`` is taken from the
    lookups the previous version attempted; confirm against a real response.

    Empty or missing components are skipped, so the result carries no stray
    spaces.

    Returns:
        A string like "123 N Main St APT 4, Austin, TX 78701", or None when
        there is no alternate address to build one from.
    """
    content = data.get("content") or {}
    alternates = content.get("alternateAddress")

    if isinstance(alternates, list):
        candidates = [entry for entry in alternates if isinstance(entry, dict)]
        if not candidates:
            return None
        alt_add = random.choice(candidates)
    elif isinstance(alternates, dict):
        alt_add = alternates
    else:
        return None

    def text_of(key):
        # Treat missing and null values as empty so joining never fails.
        value = alt_add.get(key)
        return "" if value is None else str(value).strip()

    # Preferred source of the unit: a ready-made subAddress string.
    sub_address = alt_add.get("subAddress")
    unit = sub_address.strip() if isinstance(sub_address, str) else ""

    # Fallback: assemble the unit from the chosen entry's structured sub-address.
    if not unit:
        geo = alt_add.get("geographicSubAddress")
        sub_unit_type = geo.get("subUnitType") if isinstance(geo, dict) else None
        if isinstance(sub_unit_type, dict):
            unit_parts = (sub_unit_type.get("addrType"), sub_unit_type.get("value"))
            unit = " ".join(
                str(part).strip() for part in unit_parts if part not in (None, "")
            )

    street_parts = (
        text_of("streetNr"),
        text_of("streetDirection"),
        text_of("streetName"),
        text_of("streetType"),
        unit,
    )
    street = " ".join(part for part in street_parts if part)
    region = " ".join(
        part for part in (text_of("stateOrProvince"), text_of("postcode")) if part
    )

    full_address = ", ".join(
        part for part in (street, text_of("city"), region) if part
    )
    return full_address or None


In [14]:
# helper function for writing results as dictionaries to JSON lines (jsonl) files
def append_result_to_jsonl(results_dict, filename = "att_fiber_availability_data.jsonl"):
    """Append *results_dict* to *filename* as a single JSON line.

    The file is opened in append mode, the value is serialized with
    ``json.dump``, a newline is written after it, and a confirmation
    message is printed.
    """
    with open(filename, mode="a") as jsonl_file:
        json.dump(results_dict, fp=jsonl_file)
        jsonl_file.write("\n")  # one record per line
    print(f"Result appended to {filename}")
  

In [9]:
# Parses the JSON API response, keeps only the fields that are needed, and appends the result to a JSON Lines output file.

def parse_and_save_available_resp(address, driver, output_file, api_name = "service-address-validation", time_out = 30):
    """Collect the availability response for *address*, parse it, and save the result.

    Args:
        address: The address that was queried (stored in the output record).
        driver: Browser driver handed to ``collect_api_response``.
        output_file: JSON Lines file the parsed record is appended to.
        api_name: Passed through to ``collect_api_response``.
        time_out: Passed through to ``collect_api_response``.

    Returns:
        A two-item list ``[value, invalid]``:
          * ``[None, None]``        - record parsed and appended to *output_file*.
          * ``[address, True]``     - status "nomatch", or a close match for
                                      which no alternate address could be built.
          * ``[new_address, False]``- close match; *new_address* should be requeried.
          * ``[1, True]``           - no data was collected or an unexpected
                                      error occurred; the address should be retried.
    """
    try:
        # 1. Collect the response indicating fiber availability.
        data = collect_api_response(driver, api_name, time_out)

        if not data:
            # Without this the function used to fall off the end and return
            # None, which callers indexing the result could not handle.
            # Report it as a retryable failure instead.
            return [1, True]

        content = data["content"]
        status = content["status"]

        # 2.1. Address not in the system.
        if status == "nomatch":
            return [address, True]

        # 2.2. Close matches: may help with omitted sub-addresses.
        if status in ("closematch-mdu", "closematch"):
            new_add = handling_close_matches(data)
            if not new_add:
                return [address, True]  # no alternate address found
            return [new_add, False]  # valid; this address will be requeried

        # 3. Look through serviceQualification for the FIBER service entry.
        fiber_available = False
        upload_speed = 0
        download_speed = 0
        for cat in content["serviceQualification"]:
            if cat["category"]["name"] == "INTERNET":
                for service in cat["category"]["services"]:
                    if service["name"] == "FIBER":
                        fiber_available = service["QualificationResult"] == "qualified"
                        upload_speed = service.get('MaxUploadSpeedMbps', 0)
                        download_speed = service.get('MaxDownloadSpeedMbps', 0)
                        break  # first FIBER entry of this category is enough

        # 4-5. Add the remaining fields and compile the record.
        results_dict = {
            'address': address,
            'fiber_available': fiber_available,
            'services_enabled': content.get('enabled', []),
            'existing_services': content.get('existingServices', []),
            'upload_speed': upload_speed,
            'download_speed': download_speed
        }

        # 6. Append the record to the JSON Lines output file.
        print("appending data to outputfile...")
        append_result_to_jsonl(results_dict, filename = output_file)
        return [None, None]

    except Exception as e:
        # Any malformed response (missing keys, wrong types) ends up here and
        # is reported as a retryable failure.
        print(f"An error occurred: {type(e).__name__}, {str(e)}")
        return [1, True]
    

In [10]:
# load addresses from files instead of lists and back up results
def load_addresses(filename):
    """Read addresses from a JSON Lines file.

    Each line is parsed as JSON. A JSON string is used as the address
    directly; a JSON object contributes its 'address' value when it has
    one. Lines that are not valid JSON are reported and skipped; any other
    JSON value is ignored.

    Returns:
        The list of addresses in file order.
    """
    loaded = []
    with open(filename, 'r') as source:
        for raw_line in source:
            text = raw_line.strip()
            try:
                record = json.loads(text)
            except json.JSONDecodeError:
                print(f"Skipping invalid JSON line: {text}")
                continue

            if isinstance(record, str):
                loaded.append(record)
            elif isinstance(record, dict) and 'address' in record:
                loaded.append(record['address'])
    return loaded

##### Full Web Scraper

In [15]:
# Final web scraper function built from the helpers above; it handles multiple addresses and saves results to JSON Lines files.

def _scrape_one_address(address, output_file_1):
    """Query one address in a fresh browser session and classify the outcome.

    Returns:
        A pair ``(outcome, value)`` where *outcome* is "success", "invalid"
        or "requery", and *value* is the address to record for that outcome
        (None for "success"). The browser is always closed before returning.
    """
    driver = None
    try:
        # Start a new browser and open the landing page.
        try:
            driver = setup_undetected_driver()
            driver.get("https://www.att.com/internet/fiber/")
            print("New Driver initialized")
        except TimeoutException:
            print(f"Timeout occurred while loading the initial page for address {address}")
            return "requery", address

        # Enter and submit the address.
        if not search_address(driver, address):
            return "requery", address
        print("Address successfully queried")

        # Gather and parse the availability data.
        parsing_result = parse_and_save_available_resp(address, driver, output_file_1)
        print("parsing result:", parsing_result)

        if not parsing_result:
            # Nothing came back at all: treat it as a retryable failure.
            print("No availability data collected. Retry Address.")
            return "requery", address

        value, invalid = parsing_result[0], parsing_result[1]
        if not value:
            print("parsed network/session storage data")
            return "success", None
        if invalid and value != 1:
            print("Invalid address")
            return "invalid", value
        if invalid:
            print("Unexpected error. Retry Address.")
            return "requery", address

        # Close match: requery the modified address, not the original one.
        print("address modified for requerying")
        return "requery", value

    except Exception as e:
        print("reached web scraper except block")
        print(f"An unexpected error occurred: {type(e).__name__}, {e}")
        return "requery", address

    finally:
        if driver is not None:
            try:
                driver.quit()
                print("Browser closed successfully")
            except Exception as e:
                print(f"Error closing browser: {str(e)}")


def att_web_scraper(address_file, output_file_1, output_file_2):
    """Scrape fiber availability for every address in *address_file*.

    Each address is queried in its own browser session. Parsed records are
    appended to *output_file_1*; every address that has to be queried again
    is appended to *output_file_2*. A random 3-14 second pause is taken
    between addresses (not after the last one).

    Differences from the previous version:
      * The pause now runs after every address; before, nearly every path
        skipped it with ``continue``.
      * An empty address list no longer fails on an unassigned ``driver``.
      * For a close match, the modified address is written to
        *output_file_2*, matching what is put in the returned requery list.

    Args:
        address_file: JSON Lines file of addresses to query.
        output_file_1: JSON Lines file receiving parsed availability records.
        output_file_2: JSON Lines file receiving addresses to requery.

    Returns:
        ``(adds_to_requery, invalid_adds, adds_successfully_queried)``: the
        addresses to retry, the addresses reported as invalid, and the
        number of addresses parsed successfully.
    """
    address_list = load_addresses(address_file)
    adds_to_requery = []
    invalid_adds = []  # addresses that are not in the system
    adds_successfully_queried = 0

    last_index = len(address_list) - 1
    for i, address in enumerate(address_list):
        print(f"Processing address number {i}: {address}")

        outcome, value = _scrape_one_address(address, output_file_1)
        if outcome == "success":
            adds_successfully_queried += 1
        elif outcome == "invalid":
            invalid_adds.append(value)
        else:
            adds_to_requery.append(value)
            append_result_to_jsonl(value, output_file_2)

        if i < last_index:  # Don't wait after the last address
            print("Waiting before processing the next address...")
            time.sleep(random.randint(3, 14))

    print("Finished all address queries!")
    return adds_to_requery, invalid_adds, adds_successfully_queried