# Senate Website 

> We create a class for the Senate Website from which we can extract various information including Senate Bills and other documents later on. In order to be responsible users of the public senate website, we make sure to cache our page requests so that we don't overload the website whenever we are doing our analysis. 

In [2]:
# | default_exp senate.website
# | export
import datetime
import re
import pickle

from requests import Response
from datetime import timedelta
from bs4 import BeautifulSoup, NavigableString
from itertools import islice, chain
from urllib.parse import urljoin, urlencode, parse_qsl, urlparse
from more_itertools import split_when, grouper
from pathlib import Path
from diskcache import Cache
from typing import List, Generator, Union

from legisph.website import Website, NotFoundError, ServerError, Link
from legisph.core import logger
from legisph.senate.models import SenateBill, Senator, SenateCommittee

In [17]:
from typing import List, Generator, Union
from nbdev.showdoc import show_doc
from rich import print

from legisph.website import Website, NotFoundError, ServerError, Link
from legisph.core import logger
from legisph.senate.models import SenateBill, Senator, SenateCommittee

In [18]:
# | export
class SenateWebsite(Website):
    """
    The Senate Website, accessible at [senate.gov.ph](https://senate.gov.ph/).
    """

    def __init__(self, cache_dir=Path(".cache/senate"), **kwargs):
        super().__init__("senate_requests", cache_dir=cache_dir, **kwargs)
        self.cache = Cache(str(cache_dir / "senate_cache"))
        if hasattr(self, "fetch_bills"):
            self.fetch_bills = self.cache.memoize(ignore=["self"])(self.fetch_bills)

    def fetch_bill(
        self,
        congress: int,  # Congress Number
        billno: str,  # Bill number (in the format SBN-XXXX)
        expire_after: Union[timedelta, int] = -1,  # Passed to CachedSession.get
    ) -> Response:
        """
        Fetch and parse a bill from the Senate Website and return a `SenateBill` object.
        """

        # Initial parameters
        url = "http://legacy.senate.gov.ph/lis/bill_res.aspx"
        params = {"q": billno, "congress": congress}

        # Initial call to get form session parameters
        resp = self.session.get(url=url, params=params, expire_after=expire_after)
        html = BeautifulSoup(resp.text, features="html5lib")

        # Handle error cases
        content_str = html.find("td", {"id": "content"}).text.strip()
        resource = f"Senate Bill {billno} in Congress {congress}"
        match content_str:
            case "Not found.":
                raise NotFoundError(f"{resource} not found", resource)
            case "An error has occured. Exception has been logged.":
                raise ServerError(f"Internal Error for {resource}", resource)

        # Access the form data
        form = html.find(name="form", attrs={"name": "form1"})
        inputs = form.find_all(name="input")
        data = {i["id"]: i.attrs.get("value", "") for i in inputs}

        # Set to fetch all information
        data.update({"__EVENTTARGET": "lbAll", "__EVENTARGUMENT": ""})

        # Fetch bill information
        resp = self.session.post(url=url, params=params, data=data)

        # Break into parts
        html = BeautifulSoup(resp.text, features="html5lib")
        content = html.find("td", attrs={"id": "content"})
        title = list(islice(content.children, 5))
        data = {
            item.find_previous().text.strip(): item
            for item in content.find_all("blockquote", recursive=False)
        }

        # Parse through complex votes table
        votes = data.get("Vote(s)")
        if votes:

            def parse_tally(votes):
                tally = {}
                for vote in votes:
                    elems = vote.find_all("td")
                    tally = tally | {
                        vote.text.strip(): (
                            [
                                Senator(name=voter.text.strip())
                                for voter in voters.find("blockquote").children
                                if voter.text.strip() != ""
                            ]
                            if vote.text.strip() != "Abstained"
                            else [
                                Senator(name=f"{senator[0]}, {senator[1]}")
                                for senator in grouper(
                                    voters.text.strip().split(", "), 2
                                )
                            ]
                        )
                        for vote, voters in zip(
                            elems[0 : (len(elems) // 2)],
                            elems[len(elems) // 2 : len(elems) + 1],
                        )
                    }
                return tally

            elems = (e for e in votes.children if not isinstance(e, NavigableString))
            votes = list(split_when(elems, lambda _, y: y.name == "blockquote"))
            votes = [
                SenateBill.Vote(
                    type=vote[0].text.split("(")[0].strip(),
                    date=datetime.datetime.strptime(
                        vote[0].text.split("(")[1].replace(")", ""), "%m/%d/%Y"
                    ),
                    tally=parse_tally(vote[1:]),
                )
                for vote in votes
            ]

        # Parse subtitle
        subtitle = title[4].text.strip()
        subtitle = subtitle.split("Filed on ")[1].split(" by")

        # Construct Senate Bill
        bill = SenateBill(
            url=url + "?" + urlencode(params),
            congress=congress,
            billno=billno,
            congress_text=title[0].text.strip(),
            billno_text=title[2].text.strip(),
            title=content.find("div", class_="lis_doctitle").text.strip(),
            long_title=data["Long title"].text.strip(),
            filing_date=datetime.datetime.strptime(subtitle[0], "%B %d, %Y"),
            filers=(
                [
                    Senator(name=f"{s[0]}, {s[1]}".strip())
                    for s in grouper(subtitle[1].split(", "), 2)
                ]
                if subtitle[1] != ""
                else None
            ),
            links=(
                [
                    Link(url=urljoin(url, t["href"]), name=t.text.strip())
                    for t in links.find_all("a")
                ]
                if (links := content.find("div", id="lis_download"))
                else None
            ),
            scope=data["Scope"].text.strip(),
            legislative_status=SenateBill.SenateBillStatus(
                date=datetime.datetime.strptime(
                    re.findall(
                        r"\((.+)\)", (s := data["Legislative status"].text.strip())
                    )[0],
                    "%m/%d/%Y",
                ),
                item=re.findall(r"(.+) \(", s)[0],
            ),
            subjects=(
                [
                    SenateBill.Subject(name=subject.text)
                    for subject in subjects
                    if subject.text != ""
                ]
                if (subjects := data.get("Subject(s)"))
                else None
            ),
            primary_committee=(
                [
                    SenateCommittee(name=committee.text)
                    for committee in committees.children
                    if committee.text != ""
                ]
                if (committees := data.get("Primary committee"))
                else None
            ),
            secondary_committee=(
                [
                    SenateCommittee(name=committee.text)
                    for committee in committees.children
                    if committee.text != ""
                ]
                if (committees := data.get("Secondary committee"))
                else None
            ),
            committee_reports=(
                [
                    Link(url=urljoin(url, report["href"]), name=report.text)
                    for report in reports.find_all("a")
                ]
                if (reports := data.get("Committee report"))
                else None
            ),
            sponsors=(
                [
                    Senator(name=f"{senator[0]}, {senator[1]}")
                    for senator in grouper(sponsors.text.split(", "), 2)
                ]
                if (sponsors := data.get("Sponsor(s)"))
                else None
            ),
            cosponsors=(
                [
                    Senator(name=f"{senator[0]}, {senator[1]}")
                    for senator in grouper(cosponsors.text.split(", "), 2)
                ]
                if (cosponsors := data.get("Co-sponsor(s)"))
                else None
            ),
            document_certification=(
                certification.text
                if (certification := data.get("Document certification"))
                else None
            ),
            floor_activity=(
                [
                    SenateBill.FloorActivity(
                        date=datetime.datetime.strptime(
                            cols[0].text.strip(), "%m/%d/%Y"
                        ),
                        parliamentary_status=cols[1].text.strip(),
                        senators=(
                            [
                                Senator(name=senator.text.strip())
                                for senator in cols[2].children
                                if senator.text.strip() != ""
                            ]
                            if len(list(cols[2].children)) > 0
                            else None
                        ),
                    )
                    for cols in [
                        row.find_all("td") for row in floor_activity.find_all("tr")
                    ][1:]
                ]
                if (floor_activity := data.get("Floor activity"))
                else None
            ),
            votes=votes,
            legislative_history=[
                SenateBill.SenateBillStatus(
                    date=datetime.datetime.strptime(row[0].text.strip(), "%m/%d/%Y"),
                    item=row[1].text.strip(),
                )
                for row in (
                    row.find_all("td")
                    for row in data["Legislative History"].find_all("tr")
                )
                if len(row) == 2
            ],
        )

        return bill

    def generate_bills(
        self,
        congress: int,  # Number of the congress from which to fetch bills
        expire_after: Union[int, timedelta] = -1,  # Cache expiration
    ) -> Generator[SenateBill | Exception, None, None]:
        """
        Generator function that eventually produces all bills from a congress.
        """

        page = 1
        while True:
            # Fetch and parse the page
            resp = self.session.get(
                url="http://legacy.senate.gov.ph/lis/leg_sys.aspx",
                params={"type": "bill", "congress": congress, "p": page},
                expire_after=expire_after,
            )
            html = BeautifulSoup(resp.text, features="html5lib")
            bills = [
                dict(parse_qsl(urlparse(a["href"]).query))
                for a in html.find("div", class_="alight").find_all("a")
            ]
            # Extract and yield each bill, handle exceptions
            for b in bills:
                try:
                    yield self.fetch_bill(
                        congress=b["congress"], billno=b["q"], expire_after=expire_after
                    )
                except (NotFoundError, ServerError) as exc:
                    logger.error(exc.message)
                    yield exc
                except Exception as exc:
                    logger.error(f"Exception at {b['q']}, congress {b['congress']}")
                    raise exc
            # Try to find the next button and advance if found
            if html.find("a", string="Next\n"):
                page += 1
            else:
                break

    def fetch_bills(
        self,
        congress: int,  # Congress from which to fetch bills
        expire_after: Union[int, timedelta] = -1,  # Cache expiration
    ) -> List[SenateBill]:
        "Fetch all the bills from a certain congress"
        return list(self.generate_bills(congress, expire_after=expire_after))

    def get_congresses(self, expire_after=0) -> List[int]:  # Cache expiration time
        "Retrieve a list of all congresses available on the website."
        resp = self.session.get(
            "https://legacy.senate.gov.ph/lis/leg_sys.aspx",
            headers=self.session.headers,
            expire_after=expire_after,
        )
        html = BeautifulSoup(resp.content, features="html5lib")
        congresses = [
            int(dict(parse_qsl(link["href"].split("?")[1]))["congress"])
            for link in html.find(id="div_ChangeCongress").find("ul").find_all("a")
        ]
        return congresses

    def fetch_all_bills(
        self,
        refresh_current_congress: bool = True,  # Do not use the cache for the current congress
    ):
        "Returns all bills from all congresses."
        congresses = self.get_congresses()
        if refresh_current_congress:
            current_congress = max(congresses)
            congresses.remove(current_congress)
            return list(
                chain(
                    self.fetch_bills(current_congress, expire_after=0),
                    *(self.fetch_bills(congress) for congress in congresses),
                )
            )
        else:
            return list(chain(*(self.fetch_bills(congress) for congress in congresses)))

## Fetching Senate Bills

We create a method that:

1. Accesses the "All Information" tab of a senate bill given its congress number and Senate Bill number. This requires first retrieving the main page, then sending a POST request with some session parameters.
2. Breaks up the HTML source into its component parts, in order to generate a SenateBill object.
3. Handles the case where the server returns a 200 response but the body says that there was an internal error on the server's side.

In [19]:
show_doc(SenateWebsite.fetch_bill)

---

### SenateWebsite.fetch_bill

>      SenateWebsite.fetch_bill (congress:int, billno:str,
>                                expire_after:Union[datetime.timedelta,int]=-1)

Fetch and parse a bill from the Senate Website and return a `SenateBill` object.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| congress | int |  | Congress Number |
| billno | str |  | Bill number (in the format SBN-XXXX) |
| expire_after | Union | -1 | Passed to CachedSession.get |
| **Returns** | **Response** |  |  |

We provide this example of SBN-2332 in the 18th congress, which was a particularly complex one, leading to an abstention vote on Third Reading but eventually still being passed into law:

In [20]:
# Fetch bill SBN-2332 of the 18th Congress and print the parsed result.
senate = SenateWebsite()
print(senate.fetch_bill(18, "SBN-2332"))

We then want to extract a series of Senate Bills from a particular congress. The listing is paginated, so we create a generator function that eventually returns all of the bills in that congress:

In [21]:
show_doc(SenateWebsite.generate_bills)

---

### SenateWebsite.generate_bills

>      SenateWebsite.generate_bills (congress:int,
>                                    expire_after:Union[int,datetime.timedelta]=
>                                    -1)

Generator function that eventually produces all bills from a congress.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| congress | int |  | Number of the congress from which to fetch bills |
| expire_after | Union | -1 | Cache expiration |
| **Returns** | **Generator** |  |  |

In [26]:
# Take only the first two items from the generator for the 19th Congress,
# so the rest of the listing is not fetched.
senate = SenateWebsite()
print(list(islice(senate.generate_bills(19), 2)))

We then create a simple function to materialize that generator into the final list of bills. Because we don't expect a congress's data to change once that congress is complete, we memoize this function to disk to improve performance and to set the stage for a continued pipeline in the future.

In [None]:
show_doc(SenateWebsite.fetch_bills)

---

### SenateWebsite.fetch_bills

>      SenateWebsite.fetch_bills (congress:int,
>                                 expire_after:Union[int,datetime.timedelta]=-1)

Fetch all the bills from a certain congress

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| congress | int |  | Congress from which to fetch bills |
| expire_after | Union | -1 | Cache expiration |
| **Returns** | **List** |  |  |

In order to retrieve all bills from all congresses, we first need to fetch all congress numbers. We retrieve them by scraping the dropdown menu in the LIS.

In [None]:
show_doc(SenateWebsite.get_congresses)

---

### SenateWebsite.get_congresses

>      SenateWebsite.get_congresses (expire_after=0)

Retrieve a list of all congresses available on the website.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| expire_after | int | 0 |  |
| **Returns** | **List** |  | **Cache expiration time** |

In [None]:
# List the congress numbers found on the website.
senate = SenateWebsite()
senate.get_congresses()

[19, 18, 17, 16, 15, 14, 13]

Finally, fetching all bills requires chaining together all of those generator functions into one very big list of Senate Bills.  

In [23]:
show_doc(SenateWebsite.fetch_all_bills)

---

### SenateWebsite.fetch_all_bills

>      SenateWebsite.fetch_all_bills (refresh_current_congress:bool=True)

Returns all bills from all congresses.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| refresh_current_congress | bool | True | Do not use the cache for the current congress |

Now that we have all of that infra, we can start extracting the data.

In [24]:
# Collect every bill from every congress; failed fetches come back as
# exception objects mixed into the same list, so count them separately.
senate = SenateWebsite()
bills = senate.fetch_all_bills()
errors = [item for item in bills if isinstance(item, Exception)]
print(
    f"""
    {len(bills)} bills attempted.
    {len(errors)} server errors found.
    {len(bills) - len(errors)} bills successfully collected.
    """
)

Now that we have all of the bills, we first persist them as a pickle file for
future transformations.

In [25]:
# Serialize the collected bills with the highest pickle protocol and write
# the bytes to the cache directory.
pickle_path = Path(".cache/senate/senate_bills.pickle")
pickle_path.write_bytes(pickle.dumps(bills, protocol=pickle.HIGHEST_PROTOCOL))