In [1]:
import gspread
from oauth2client.service_account import ServiceAccountCredentials
from time import sleep
import datetime
import pandas as pd
import json
from pygame import mixer
import AppKit
import usaddress
import pprint


pygame 2.2.0 (SDL 2.0.22, Python 3.9.15)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (
    NoSuchElementException,
    NoAlertPresentException,
    UnexpectedAlertPresentException,
    ElementClickInterceptedException,
    StaleElementReferenceException,
)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

In [3]:
options = Options()
# options.headless = True
driver = webdriver.Chrome(
    service=Service(ChromeDriverManager().install()), options=options
)
driver.set_window_position(0, 0)
driver.set_window_size(1400, 768)

In [17]:
sa = gspread.service_account(filename="../credentials/jp-macdonnel-b7aa48547774.json")
sh = sa.open("green-lights")
sheet = sh.worksheet("2023-08-26")
master = sh.worksheet("master")
df_master = pd.DataFrame(master.get_all_records())


class InvalidAddressException(Exception):
    "Raised when an address can't be parsed."
    ...


class InvalidStateError(Exception):
    "Raised when state can't be converted to ISO format."
    ...


us_state_to_abbrev = {
    "Alabama": "AL",
    "Alaska": "AK",
    "Arizona": "AZ",
    "Arkansas": "AR",
    "California": "CA",
    "Colorado": "CO",
    "Connecticut": "CT",
    "Delaware": "DE",
    "Florida": "FL",
    "Georgia": "GA",
    "Hawaii": "HI",
    "Idaho": "ID",
    "Illinois": "IL",
    "Indiana": "IN",
    "Iowa": "IA",
    "Kansas": "KS",
    "Kentucky": "KY",
    "Louisiana": "LA",
    "Maine": "ME",
    "Maryland": "MD",
    "Massachusetts": "MA",
    "Michigan": "MI",
    "Minnesota": "MN",
    "Mississippi": "MS",
    "Missouri": "MO",
    "Montana": "MT",
    "Nebraska": "NE",
    "Nevada": "NV",
    "New Hampshire": "NH",
    "New Jersey": "NJ",
    "New Mexico": "NM",
    "New York": "NY",
    "North Carolina": "NC",
    "North Dakota": "ND",
    "Ohio": "OH",
    "Oklahoma": "OK",
    "Oregon": "OR",
    "Pennsylvania": "PA",
    "Rhode Island": "RI",
    "South Carolina": "SC",
    "South Dakota": "SD",
    "Tennessee": "TN",
    "Texas": "TX",
    "Utah": "UT",
    "Vermont": "VT",
    "Virginia": "VA",
    "Washington": "WA",
    "West Virginia": "WV",
    "Wisconsin": "WI",
    "Wyoming": "WY",
    "District of Columbia": "DC",
    "American Samoa": "AS",
    "Guam": "GU",
    "Northern Mariana Islands": "MP",
    "Puerto Rico": "PR",
    "United States Minor Outlying Islands": "UM",
    "U.S. Virgin Islands": "VI",
}


def beep_5_times():
    for _ in range(5):
        AppKit.NSBeep()
        sleep(0.2)


class GreenLight:
    def __init__(self, sheet, ignore_master=False):
        self.sheet = sheet
        self.ignore_master = ignore_master
        self.df = pd.DataFrame(sheet.get_all_records())
        self.filled_df = self.df[
            [
                "Customer Name",
                "First Name",
                "Last Name",
                "Job Title",
                "Work Phone",
                "Email",
                "Address Line 1",
                "City",
                "State",
                "Zip Code",
            ]
        ]
        self.username = None
        self.password = None
        self.logged_in = False
        self.attempts = 0
        self.input_names = {
            "customer_name": "s_1_1_87_0",
            "first_name": "s_1_1_79_0",
            "last_name": "s_1_1_78_0",
            "job_title": "s_1_1_81_0",
            "email": "s_1_1_100_0",
            "work_phone": "s_1_1_83_0",
            "address_line_1": "s_1_1_95_0",
            "city": "s_1_1_96_0",
            "state": "s_1_1_97_0",
            "zip_code": "s_1_1_98_0",
            "acct_team_owner": "s_1_1_91_0",
            "franchise_name": "s_1_1_62_0",
            "primary_opportunity": "s_1_1_72_0",
            "expected_quantity": "s_1_1_73_0",
            "updated": "s_1_1_75_0",
        }
        self.seen_companies = set()
        for item in df_master["Customer Name"].values.tolist():
            self.seen_companies.add(item)
        mixer.init()
        self.alarm = mixer.Sound("../driftveil.mp3")

    def parse_address(self, address):
        try:
            parsed = usaddress.tag(
                address.replace("United States, ", "")
                .replace(", United States", "")
                .replace("United States", "")
            )[0]
            return {
                "address": " ".join(
                    (
                        list(
                            filter(
                                lambda x: x is not None,
                                [
                                    parsed.get("BuildingName", None),
                                    parsed.get("AddressNumber", None),
                                    parsed.get("StreetNamePreDirectional", None),
                                    parsed.get("StreetName", None),
                                    parsed.get("StreetNamePostType", None),
                                ],
                            )
                        )
                    )
                ),
                "city": parsed["PlaceName"],
                "state": parsed["StateName"],
                "zip_code": parsed["ZipCode"],
            }
        except:
            print("Couldn't parse address. Skipping Row.")
            raise InvalidAddressException

    def parse_state(self, state):
        try:
            return us_state_to_abbrev[state]
        except:
            print("Couldn't convert state to ISO. Skipping Row.")
            raise InvalidStateError

    def attempt(self, callback):
        if self.attempts >= 50:
            self.attempts = 0
            raise Exception("attempt limit exceeded")
        print(f"attempting {callback.__name__} for the {self.attempts + 1} time")
        sleep(1)
        self.attempts += 1
        return callback()

    def get_credentials(self):
        file = open("../credentials/unishipper-credentials.json")
        data = json.load(file)
        self.username = data["username"]
        self.password = data["password"]

    def login(self):
        if not self.username or not self.password:
            self.get_credentials()
        driver.get("http://uone.unishippers.com")
        sleep(3)
        try:
            input_username = driver.find_element(By.ID, "s_swepi_1")
            input_password = driver.find_element(By.ID, "s_swepi_2")
            input_username.send_keys(self.username)
            input_password.send_keys(self.password)
            button_login = driver.find_element(By.ID, "s_swepi_22")
            button_login.click()
            self.attempts = 0
        except NoSuchElementException:
            if self.attempts > 3:
                print("cant login. try again")
                return
            if (
                "The server you are trying to access is either busy or experiencing difficulties."
                in driver.find_element(By.TAG_NAME, "body").text
            ):
                sleep(5)
                self.attempt(self.login)
            else:
                self.attempt(self.login)

    def open_leads_tab(self):
        print("opening leads tab")
        self.attempts = 45 if self.attempts == 0 else self.attempts
        try:
            applet = driver.find_element(By.CLASS_NAME, "AppletTitle")
            if applet:
                print("applet")
                tabs = driver.find_elements(By.CLASS_NAME, "ui-corner-top")
                for tab in tabs:
                    if tab.text == "Leads":
                        tab.click()
                        self.attempts = 0
                        break
        except NoSuchElementException:
            self.attempt(self.open_leads_tab)

    def start_new_lead(self):
        try:
            button = driver.find_element(By.ID, "s_1_1_111_0_Ctrl")
            print("starting new lead")
            button.click()
            self.attempts = 0
        except NoSuchElementException:
            self.attempt(self.start_new_lead())

    def cancel_lead(self):
        try:
            button = driver.find_element(By.ID, "s_1_1_113_0_Ctrl")
            print("canceling lead")
            button.click()
            self.attempts = 0
        except NoSuchElementException:
            self.attempt(self.cancel_lead())

    def compare_customer_to_master(self, customer_name):
        return not self.ignore_master and customer_name in self.seen_companies

    def fill(
        self,
        customer_name,
        first_name,
        last_name,
        job_title,
        work_phone,
        email,
        address,
        city,
        state,
        zip_code,
    ):
        print("filling form")

        if not (address and city and state and zip_code):
            raise InvalidAddressException

        inputs = driver.find_elements(By.TAG_NAME, "input")

        for input in inputs:
            try:
                if input.get_attribute("name") == self.input_names["customer_name"]:
                    input.send_keys(customer_name)
                elif input.get_attribute("name") == self.input_names["first_name"]:
                    input.send_keys(first_name)
                elif input.get_attribute("name") == self.input_names["last_name"]:
                    input.send_keys(last_name)
                elif input.get_attribute("name") == self.input_names["job_title"]:
                    if len(job_title) > 30:
                        job_title = job_title[:30]
                    input.send_keys(job_title)
                elif input.get_attribute("name") == self.input_names["email"] and email:
                    input.send_keys(email)
                elif input.get_attribute("name") == self.input_names["work_phone"]:
                    input.send_keys(work_phone)
                elif input.get_attribute("name") == self.input_names["address_line_1"]:
                    if len(address) > 30:
                        address = address[:30]
                    input.send_keys(address)
                elif input.get_attribute("name") == self.input_names["city"]:
                    input.send_keys(city)
                elif input.get_attribute("name") == self.input_names["state"]:
                    input.send_keys(state)
                elif input.get_attribute("name") == self.input_names["zip_code"]:
                    input.send_keys(zip_code)
                elif input.get_attribute("name") == self.input_names["acct_team_owner"]:
                    input.send_keys("JP.MacDonell")
                elif input.get_attribute("name") == self.input_names["franchise_name"]:
                    input.send_keys("Unishippers 1578")
                elif (
                    input.get_attribute("name")
                    == self.input_names["primary_opportunity"]
                ):
                    input.send_keys("US Express")
                elif (
                    input.get_attribute("name") == self.input_names["expected_quantity"]
                ):
                    input.send_keys(100)
                elif input.get_attribute("name") == self.input_names["updated"]:
                    input.send_keys(
                        datetime.datetime.now().strftime("%-m/%-d/%Y %-I:%M:%S %p")
                    )
            except TypeError as e:
                print(e.__class__.__name__, " while attempting to self.fill")
            except StaleElementReferenceException as e:
                print(e.__class__.__name__, " while attempting to self.fill")

    def accept_alert(self):
        try:
            alert = driver.switch_to.alert
            self.attempts = 45 if self.attempts == 0 else self.attempts
            status = alert.text
            alert.accept()
            print("accepting alert")
            return status
        except NoAlertPresentException:
            return self.attempt(self.accept_alert)

    def check_loading(self):
        html = driver.find_element(By.TAG_NAME, "html")
        if html.get_attribute("class") == "siebui-busy":
            sleep(2.5)
            print("still saving")
            self.check_loading()
        else:
            return

    def save_form(self):
        print("saving form")
        action = ActionChains(driver)
        action.key_down(Keys.CONTROL).send_keys("S").key_up(Keys.CONTROL).perform()
        self.check_loading()

    def check_protection_status(self):
        print("checking protection status")
        sleep(0.5)
        input = driver.find_element(By.CSS_SELECTOR, "input[name='s_1_1_34_0']")
        val = input.get_attribute("value")
        if val == "<Case Required>":
            val = None
        elif val:
            self.attempts = 0
            print("protection status: ", val)
            return val
        else:
            if self.attempts > 10:
                self.attempts = 0
                print("Unable to get protection status")
                return None
            return self.attempt(self.check_protection_status)

    def check_protected_until(self):
        sleep(0.5)
        # fix next line
        input = driver.find_element(By.CSS_SELECTOR, "input[name='s_1_1_33_0']")
        val = input.get_attribute("value")
        if val:
            self.attempts = 0
            print("protected until: ", val)
            return val
        else:
            if self.attempts > 4:
                self.attempts = 0
                status = "Unable to get protected until date"
                print(status)
                return status
            return self.attempt(self.check_protected_until)

    def update_sales_stage(self):
        print("updating sales stage")
        input = driver.find_element(By.CSS_SELECTOR, "input[name='s_1_1_92_0']")
        input.clear()
        input.send_keys("Prospect")

    def request_green_light(self):
        print("requesting green light")
        driver.find_element(By.ID, "s_1_1_48_0_Ctrl").click()

    def determine_status_and_protected_until(self):
        self.save_form()
        sleep(0.5)
        print("checking protection status")
        protection_status = self.check_protection_status()
        sleep(0.5)
        print("checking protected until")
        protected_until = self.check_protected_until()
        new_status = ""
        if protection_status and "protected elsewhere" not in protection_status.lower():
            print("not protected elsewhere")
            self.update_sales_stage()
            sleep(0.5)
            self.save_form()
            sleep(0.5)
            self.request_green_light()
            sleep(0.5)
            new_status = self.accept_alert()
            self.attempts = 0
        else:
            new_status = (
                protection_status
                if protection_status
                else "Could not determine protection status"
            )
        print(new_status)
        return [new_status, protected_until]

    def update_sheet(self, i, status, protected_until=""):
        today = datetime.datetime.now()
        self.sheet.update(f"K{i+2}", status)
        self.sheet.update(f"L{i+2}", today.strftime("%B %d, %Y – %H:%M:%S"))
        if protected_until:
            self.sheet.update(f"M{i+2}", protected_until)

    def close_popup(self, text="close"):
        for el in driver.find_elements(By.CLASS_NAME, "ui-button-text"):
            if el.text == text:
                el.click()

    def full_form(
        self,
        i,
        customer_name,
        first_name,
        last_name,
        job_title,
        work_phone,
        email,
        address,
        city,
        state,
        zip_code,
        status,
    ):
        try:
            if status:
                print(f"{i}", "\nstatus already exists:", status)
            elif not (customer_name and address):
                status = "Insufficient info. Skipping row."
                print(f"{i}", f"\n{status}")
                self.update_sheet(i, status)
            elif self.compare_customer_to_master(customer_name):
                status = f"Customer already exists in master: {customer_name}"
                print(f"{i}", f"\n{status}")
                self.update_sheet(i, status)
            else:
                self.seen_companies.add(customer_name)
                print(f"{i}", "\nno status, filling in fields")
                work_phone = (
                    work_phone
                    if work_phone and work_phone != "#ERROR!"
                    else "(310) 000-0000"
                )
                first_name = first_name if first_name else "FakeFirstName"
                last_name = last_name if last_name else "FakeLastName"
                if not (city or state or zip_code):
                    print(
                        "no city or state or zip code. Atempting to parse address.",
                    )
                    parsed = self.parse_address(address)
                    address = parsed["address"]
                    city = parsed["city"]
                    state = parsed["state"]
                    zip_code = parsed["zip_code"]
                sleep(1)
                if not (address or city or state or zip_code):
                    status = "Insufficient address info. Skipping row."
                else:
                    if len(state) > 2:
                        state = self.parse_state(state)
                    self.fill(
                        customer_name,
                        first_name,
                        last_name,
                        job_title,
                        work_phone,
                        email,
                        address,
                        city,
                        state,
                        zip_code,
                    )
                    status, protected_until = self.determine_status_and_protected_until()
                sleep(0.5)
                self.update_sheet(i, status, protected_until)
                sleep(0.5)
                self.start_new_lead()
        except UnexpectedAlertPresentException as e:
            print(e.__class__.__name__)
            print("switching to alert")
            print(e.msg)
            self.update_sheet(
                i,
                f"UnexpectedAlertPresentException: {e.msg}. \n\nWill still attempt to request green light, however row may have been skipped.",
            )
            print("moving on to next lead")
            self.start_new_lead()
        except ElementClickInterceptedException as e:
            print(e.__class__.__name__)
            self.close_popup()
            sleep(0.5)
            status = self.determine_status_and_protected_until()
            self.update_sheet(i, status)
            sleep(0.5)
            self.start_new_lead()
            try:
                self.request_green_light()
            except:
                self.update_sheet(
                    i, "Element Click Intercepted Exception. Row likely skipped."
                )
        except InvalidStateError as e:
            print(e.__class__.__name__)
            self.update_sheet(
                i,
                "Key Error. Unable to convert state name to ISO code. Row likely skipped.",
            )
        except InvalidAddressException as e:
            print(e.__class__.__name__)
            self.update_sheet(i, "Invalid Address. Row likely skipped.")
        except Exception as e:
            beep_5_times()
            ex_type = e.__class__.__name__
            print(ex_type)
            self.skip_row_with_unexpected_exception(i, ex_type)

    def skip_row_with_unexpected_exception(self, i, ex_type):
        print("attempting to skip row due to unexpected exception")
        try:
            sleep(0.5)
            alert = driver.switch_to.alert
            self.update_sheet(
                i,
                f"Unexpected exception: {ex_type}; with alert present: {alert.text}. \n\nRow likely skipped.",
            )
            self.accept_alert()
            sleep(0.5)
            self.cancel_lead()
            sleep(0.5)
            self.start_new_lead()
        except Exception as e:
            print(e.__class__.__name__)
            raise e

    def iterate(self, start=0, end=None):
        # changed unishippers to self might need to change back
        df = self.df
        if end:
            df = df[start:end]
        for (
            i,
            customer_name,
            first_name,
            last_name,
            job_title,
            work_phone,
            email,
            address,
            city,
            state,
            zip_code,
            status,
            updated,
            protected_until,
        ) in df.itertuples():
            print(datetime.datetime.now().strftime("%H:%M:%S"))
            self.full_form(
                i,
                customer_name,
                first_name,
                last_name,
                job_title,
                work_phone,
                email,
                address,
                city,
                state,
                zip_code,
                status,
            )
            print("\n\n")

    def iterate_with_alarm(self):
        try:
            self.iterate()
        except Exception as e:
            self.alarm.play()
            raise e


unishippers = GreenLight(sheet, ignore_master=True)

In [5]:
unishippers.login()


In [6]:
unishippers.open_leads_tab()


opening leads tab
applet


In [7]:
unishippers.start_new_lead()


starting new lead


AttributeError: 'NoneType' object has no attribute '__name__'

In [18]:
unishippers.iterate_with_alarm()


13:51:00
0 
status already exists: Your request will be sent to UPS in the next batch(SBL-EXL-00151)



13:51:00
1 
status already exists: Your request will be sent to UPS in the next batch(SBL-EXL-00151)



13:51:00
2 
status already exists: Your request will be sent to UPS in the next batch(SBL-EXL-00151)



13:51:00
3 
status already exists: Invalid Address. Row likely skipped.



13:51:00
4 
no status, filling in fields
no city or state or zip code. Atempting to parse address.
filling form
saving form
still saving
checking protection status
checking protection status
protection status:  Protected
checking protected until
protected until:  9/2/2023 12:00:00 AM
not protected elsewhere
updating sales stage
saving form
still saving
requesting green light
accepting alert
Your request will be sent to UPS in the next batch(SBL-EXL-00151)
starting new lead



13:51:20
5 
status already exists: Key Error. Unable to convert state name to ISO code. Row likely skipped.



13:51:20
6 
status al

In [19]:
unishippers.alarm.fadeout(1200)


In [None]:
unishippers.cancel_lead()
unishippers.start_new_lead()


In [20]:
driver.quit()
