In [None]:
from playwright.async_api import async_playwright, TimeoutError
from bs4 import BeautifulSoup

URL = "https://apps.supremecourt.az.gov/publicaccess/caselookup.aspx"

pw = await async_playwright().start()
browser = await pw.chromium.launch(headless=False)
context = await browser.new_context()
page = await context.new_page()

await page.goto(url=URL)
# await page.wait_for_selector('input[value="Search"]', state='attached', timeout=999999999)

In [None]:
import os
from dotenv import load_dotenv

# Read environment variables from the .env file one directory above the
# current working directory, then pick out the 2Captcha API key.
load_dotenv(dotenv_path='../.env')
TWOCAPTCHA_API_KEY = os.environ.get('TWOCAPTCHA_API_KEY')

# Shared 2Captcha client used by solve_captcha() below.
from twocaptcha import TwoCaptcha
solver = TwoCaptcha(TWOCAPTCHA_API_KEY)

In [None]:
# Case number used by the exploratory search cells: the prefix goes into the
# first case-number box of the form and the numeric part into the second.
number = 2024_000_140
PREFIX = "TR"

In [None]:
async def solve_captcha(page, max_attempts=10):
    """Solve the captcha challenge on ``page``, if one is shown.

    Each attempt waits up to 3 seconds for the captcha image to be attached.
    When it is, the image is screenshotted to ``Temp/captcha.png``, handed to
    the notebook-level ``solver`` (a 2Captcha client), and the returned code
    is typed into the captcha box and submitted.

    Args:
        page: Playwright page currently showing the case lookup site.
        max_attempts: Maximum number of solve-and-submit rounds (default 10).

    Returns:
        None. Returns as soon as a Playwright ``TimeoutError`` is raised, which
        is how "no captcha on the page" is detected. If every attempt is used
        up, a failure message is printed instead.
    """
    for _ in range(max_attempts):
        try:
            captcha_image = await page.wait_for_selector(
                "#caselookup_ctl00_contentplaceholder1_samplecaptcha_CaptchaImage",
                state='attached',
                timeout=3000,
            )
            await captcha_image.screenshot(path="Temp/captcha.png")
            captcha_text = solver.normal("Temp/captcha.png")["code"]
            await page.fill("#ctl00_ContentPlaceHolder1_CaptchaCodeTextBox", captcha_text)
            await page.click('#ctl00_ContentPlaceHolder1_btnCaptcha')
        except TimeoutError:
            # The captcha image did not show up (or a later step timed out):
            # treat the page as captcha-free and stop.
            return
    print(f"Captcha Failed To Solve After {max_attempts} Tries")
    


In [None]:
await solve_captcha(page)
await page.fill("#ctl00_ContentPlaceHolder1_txtCNum1", PREFIX)
await page.fill("#ctl00_ContentPlaceHolder1_txtCNum2", str(number))
await page.locator("#ctl00_ContentPlaceHolder1_btnGoNum").click(force=True)

In [None]:
table = await page.wait_for_selector("#ctl00_ContentPlaceHolder1_gvSearchResults", state='attached', timeout=5000)
rows = await table.query_selector_all("tr")
header_row = rows[0]
columns_elements = await header_row.query_selector_all("th")
columns = [await column.inner_text() for column in columns_elements]
columns

In [None]:
cases = []
index = 1
while index < len(rows):
    table = await page.query_selector("#ctl00_ContentPlaceHolder1_gvSearchResults")
    rows = await table.query_selector_all("tr")
    case_row = rows[index]
    case_elements = await case_row.query_selector_all("td")
    case = {column: await case_element.inner_text() for column, case_element in zip(columns,case_elements)}
    
    # Go to the link and scrape detailed info
    link = await case_elements[0].query_selector("a")
    await link.click(force=True)
    await solve_captcha(page)
    back_to_results = await page.wait_for_selector("#ctl00_ContentPlaceHolder1_lbToResults", state='attached', timeout=5000)
    # Get Filing and Disposition Dates
    filing_date = await page.query_selector("#ctl00_ContentPlaceHolder1_gvCaseInfo_ctl02_tcFileDate")
    filing_date = await filing_date.inner_text() if filing_date else None
    disp_date = await page.query_selector("#ctl00_ContentPlaceHolder1_gvCaseInfo_ctl02_tcDispDate")
    disp_date = await disp_date.inner_text() if disp_date else None
    case["filing_date"] = filing_date
    case["disp_date"] = disp_date
    # Get Charges Info
    charges = []
    charges_table = await page.query_selector("#ctl00_ContentPlaceHolder1_gvPartyInfo_ctl02_gvCounts")
    if charges_table:
        charge_keys = await charges_table.query_selector_all("th")
        charge_keys = [await charge_key.inner_text() for charge_key in charge_keys]
        charges_rows = await charges_table.query_selector_all("tr")
        charges_rows = charges_rows[1:]
        for charge_row in charges_rows:
            charge_elements = await charge_row.query_selector_all("td")
            charge = {charge_key: await charge_element.inner_text() for charge_key, charge_element in zip(charge_keys,charge_elements)}
            charges.append(charge)
    case["charges"] = charges
    
    cases.append(case)

    # Go back to search results
    await back_to_results.click(force=True)
    await solve_captcha(page)
    await page.wait_for_selector("#ctl00_ContentPlaceHolder1_gvSearchResults", state='attached', timeout=5000)
    index += 1


In [None]:
# Display the first scraped case record as a spot check of the loop above.
cases[0]

In [None]:
import sys
sys.path.append("..")

from playwright.async_api import async_playwright, TimeoutError
import pandas as pd
from models.cases import Case
from models.scraper import ScraperBase
from datetime import date, datetime, time
from tempfile import NamedTemporaryFile
from rich.console import Console
from models.leads import Lead
from models.scraper import ScraperBase
from rich.progress import Progress
import re

import os
from dotenv import load_dotenv
from twocaptcha import TwoCaptcha
# Load environment variables. A .env in the current working directory is read
# first; the parent-directory file (the path the earlier setup cell uses, and
# the directory added to sys.path above) then fills in anything still missing.
# load_dotenv does not override variables that are already set by default, so
# the second call never replaces a value provided by the first.
load_dotenv(dotenv_path='.env')
load_dotenv(dotenv_path='../.env')
TWOCAPTCHA_API_KEY = os.getenv('TWOCAPTCHA_API_KEY')

# Rich console used for log output by the scraper class below.
console = Console()


class ArizonaCountyScraper(ScraperBase):
    """Scraper for the Arizona public-access case lookup site.

    Drives a headless Chromium page through the lookup form, solves captcha
    challenges with 2Captcha, and turns every result row into a ``Case`` and a
    ``Lead`` that are handed to ``self.insert_case`` / ``self.insert_lead``
    (not defined in this class; presumably inherited from ``ScraperBase``).
    """

    # Translates result-grid header text into the keyword names used when
    # building Case/Lead objects. "\xa0" is a non-breaking space.
    field_mapping = {
        "Case Number": "case_id",
        "Name": "name",
        "Party\xa0Type": "party_type",
        "Birth\xa0Date": "birth_date",
        "Address": "address_line_1",
        "Court": "court_id",
        "Filing Date": "filing_date",
        "Disposition Date": "disp_date",
        "Charges": "charges",
    }
    # One 2Captcha client shared by all instances.
    solver = TwoCaptcha(TWOCAPTCHA_API_KEY)

    def split_full_name(self, name):
        """Split a full name into ``(first_name, middle_name, last_name)``.

        The name is split on runs of whitespace, commas, hyphens and periods.
        Empty fragments (from leading/trailing delimiters) are discarded, and
        ``None`` is treated like an empty string.

        Args:
            name: Full name string, or None.

        Returns:
            Tuple of three strings; parts that are not present are ``''``.
        """
        # NOTE(review): tokens are assigned purely by position. If the site
        # lists names as "LAST, FIRST" the first/last values come out swapped
        # -- confirm against real data.
        parts = [part for part in re.split(r'[\s,\-\.]+', name or '') if part]

        first_name = middle_name = last_name = ''
        if len(parts) > 2:
            first_name = parts[0]
            # Everything between the first and last token is the middle name.
            middle_name = ' '.join(parts[1:-1])
            last_name = parts[-1]
        elif len(parts) == 2:
            first_name, last_name = parts
        elif len(parts) == 1:
            first_name = parts[0]

        return first_name, middle_name, last_name

    async def solve_captcha(self, max_attempts=5):
        """Solve the captcha challenge on the current page, if one is shown.

        Each attempt waits up to 3 seconds for the captcha image. When it is
        present, it is screenshotted to a temporary PNG, sent to 2Captcha, and
        the returned code is typed in and submitted. The temporary file is
        removed after every attempt.

        Args:
            max_attempts: Maximum number of solve-and-submit rounds.

        Returns:
            None. Returns as soon as a Playwright ``TimeoutError`` is raised
            (the captcha image is not on the page); logs a message when all
            attempts were used up.
        """
        for _ in range(max_attempts):
            try:
                captcha_image = await self.page.wait_for_selector(
                    "#caselookup_ctl00_contentplaceholder1_samplecaptcha_CaptchaImage",
                    state='attached',
                    timeout=3000,
                )
                # Reserve a unique path and close the handle before the
                # screenshot is written to it.
                with NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
                    temp_file_path = temp_file.name
                try:
                    await captcha_image.screenshot(path=temp_file_path)
                    captcha_text = self.solver.normal(temp_file_path)["code"]
                finally:
                    # delete=False means nothing else cleans this file up.
                    os.unlink(temp_file_path)
                await self.page.fill("#ctl00_ContentPlaceHolder1_CaptchaCodeTextBox", captcha_text)
                await self.page.click('#ctl00_ContentPlaceHolder1_btnCaptcha')
            except TimeoutError:
                # Playwright's TimeoutError: no captcha image appeared (or a
                # later step timed out), so stop quietly.
                return
        console.log(f"Captcha Failed To Solve After {max_attempts} Tries")

    async def search_by_case_number(self, prefix, number):
        """Submit a case-number search and read the results grid.

        Args:
            prefix: Text for the first case-number box.
            number: Value for the second case-number box (converted with str()).

        Returns:
            ``(headers, rows)``: the header texts of the first row, and the
            element handles of every ``<tr>`` in the grid (header row included).
        """
        await self.page.fill("#ctl00_ContentPlaceHolder1_txtCNum1", prefix)
        await self.page.fill("#ctl00_ContentPlaceHolder1_txtCNum2", str(number))
        await self.page.locator("#ctl00_ContentPlaceHolder1_btnGoNum").click(force=True)
        table = await self.page.wait_for_selector(
            "#ctl00_ContentPlaceHolder1_gvSearchResults",
            state='attached',
            timeout=5000,
        )
        rows = await table.query_selector_all("tr")
        header_cells = await rows[0].query_selector_all("th")
        headers = [await header_cell.inner_text() for header_cell in header_cells]
        return headers, rows

    async def init_browser(self):
        """Start Playwright, launch headless Chromium and open ``self.url``.

        Sets ``self.browser`` and ``self.page``; the Playwright driver is kept
        on ``self._playwright`` so it can be stopped later.
        """
        self._playwright = await async_playwright().start()
        self.browser = await self._playwright.chromium.launch(headless=True)
        context = await self.browser.new_context()
        self.page = await context.new_page()
        await self.page.goto(self.url)

    async def _close_browser(self):
        """Close the browser and stop the Playwright driver, if they were started."""
        browser = getattr(self, "browser", None)
        playwright = getattr(self, "_playwright", None)
        try:
            if browser is not None:
                await browser.close()
        finally:
            if playwright is not None:
                await playwright.stop()
                self._playwright = None

    async def get_basic_info(self, headers, index):
        """Read one row of the results grid.

        Args:
            headers: Keys for the row's cells, in column order. ``None``
                entries (columns without a field mapping) are skipped.
            index: Index of the ``<tr>`` to read within the grid.

        Returns:
            ``(case, link)``: a dict of cell texts keyed by header, and the
            ``<a>`` element inside the first cell (None when the row has no
            cells or no anchor).
        """
        table = await self.page.query_selector("#ctl00_ContentPlaceHolder1_gvSearchResults")
        rows = await table.query_selector_all("tr")
        case_elements = await rows[index].query_selector_all("td")

        case = {}
        for header, case_element in zip(headers, case_elements):
            if header is None:
                # A None key could not be expanded into Case(**case)/Lead(**case).
                continue
            case[header] = await case_element.inner_text()

        link = await case_elements[0].query_selector("a") if case_elements else None
        return case, link

    async def _inner_text_or_none(self, selector):
        """Return the text of the first element matching ``selector``, or None if absent."""
        element = await self.page.query_selector(selector)
        return await element.inner_text() if element else None

    async def get_detailed_info(self, link):
        """Open a case's detail page and extract dates and charges.

        Args:
            link: Element handle of the anchor that leads to the detail page.

        Returns:
            ``(filing_date, disp_date, charges, back_to_results)``: the two
            date texts (None when the cell is missing), a list with one dict
            per charge row keyed by the charges table's header texts, and the
            element handle that navigates back to the results grid.
        """
        await link.click(force=True)
        await self.solve_captcha()
        back_to_results = await self.page.wait_for_selector(
            "#ctl00_ContentPlaceHolder1_lbToResults",
            state='attached',
            timeout=5000,
        )

        filing_date = await self._inner_text_or_none(
            "#ctl00_ContentPlaceHolder1_gvCaseInfo_ctl02_tcFileDate"
        )
        disp_date = await self._inner_text_or_none(
            "#ctl00_ContentPlaceHolder1_gvCaseInfo_ctl02_tcDispDate"
        )

        charges = []
        charges_table = await self.page.query_selector(
            "#ctl00_ContentPlaceHolder1_gvPartyInfo_ctl02_gvCounts"
        )
        if charges_table:
            charge_header_cells = await charges_table.query_selector_all("th")
            charge_keys = [await header_cell.inner_text() for header_cell in charge_header_cells]
            charges_rows = await charges_table.query_selector_all("tr")
            # Skip the first row, which holds the headers.
            for charge_row in charges_rows[1:]:
                charge_elements = await charge_row.query_selector_all("td")
                charge = {}
                for charge_key, charge_element in zip(charge_keys, charge_elements):
                    charge[charge_key] = await charge_element.inner_text()
                charges.append(charge)

        return filing_date, disp_date, charges, back_to_results

    async def scrape(self, search_parameters):
        """Search by case number and store every result as a Case and a Lead.

        Args:
            search_parameters: Mapping with ``case_prefix`` and ``case_number``.

        The browser and the Playwright driver are shut down when the method
        finishes, whether it completes or raises.
        """
        prefix = search_parameters.get("case_prefix")
        number = search_parameters.get("case_number")

        self.url = "https://apps.supremecourt.az.gov/publicaccess/caselookup.aspx"
        # Reset so the cleanup in `finally` only touches what this run started.
        self.browser = None
        self._playwright = None
        try:
            await self.init_browser()
            await self.solve_captcha()
            headers, rows = await self.search_by_case_number(prefix, number)
            # Unmapped headers become None and are skipped by get_basic_info.
            headers = [self.field_mapping.get(header) for header in headers]

            # Row 0 is the header row.
            for index in range(1, len(rows)):
                # Basic case info and the link to the detail page.
                case_dict, link = await self.get_basic_info(headers, index)
                if link is None:
                    console.log(f"Row {index} has no detail link; skipping")
                    continue

                # Detailed case info.
                filing_date, disp_date, charges, back_to_results = await self.get_detailed_info(link)
                case_dict['filing_date'] = filing_date
                case_dict['disp_date'] = disp_date
                case_dict['charges'] = charges

                # First, middle, and last name.
                (
                    case_dict['first_name'],
                    case_dict['middle_name'],
                    case_dict['last_name'],
                ) = self.split_full_name(case_dict.get("name"))

                case = Case(**case_dict)
                lead = Lead(**case_dict)
                self.insert_case(case)
                self.insert_lead(lead)

                # Go back to the search results before the next row.
                await back_to_results.click(force=True)
                await self.solve_captcha()
                await self.page.wait_for_selector(
                    "#ctl00_ContentPlaceHolder1_gvSearchResults",
                    state='attached',
                    timeout=5000,
                )
        finally:
            await self._close_browser()
        

In [None]:
acscraper = ArizonaCountyScraper()
await acscraper.scrape(search_parameters={"case_prefix": "TR", "case_number": 2024000140})