In [None]:
# Install pipeline-sec-filings
!git clone https://github.com/Unstructured-IO/pipeline-sec-filings.git --depth=1
%cd pipeline-sec-filings

Cloning into 'pipeline-sec-filings'...
remote: Enumerating objects: 69, done.[K
remote: Counting objects: 100% (69/69), done.[K
remote: Compressing objects: 100% (63/63), done.[K
remote: Total 69 (delta 2), reused 42 (delta 1), pack-reused 0[K
Receiving objects: 100% (69/69), 216.03 KiB | 3.38 MiB/s, done.
Resolving deltas: 100% (2/2), done.
/content/pipeline-sec-filings/pipeline-sec-filings/pipeline-sec-filings


In [None]:
# Install Python requirements
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q ratelimit unstructured==0.4.6
# upgrade to the latest, though has not been tested
# %pip install -q --upgrade ratelimit unstructured

In [None]:
# Install NLTK Data
import nltk
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [None]:
# -p keeps the cell re-runnable: no error when the directory already exists.
!mkdir -p clean

In [None]:
# -p keeps the cell re-runnable: no error when the directory already exists.
!mkdir -p unclean

In [None]:
# -p also creates the parent directory and does not fail on re-run.
!mkdir -p unclean/AAPL

In [None]:
import os

In [None]:
import io


# pipeline-api
class timeout:
    """Context manager that interrupts the enclosed block with ``TimeoutError``.

    On entry a ``SIGALRM`` handler is installed and an alarm is armed for ``seconds``
    (whole seconds, as required by ``signal.alarm``). On exit the alarm is cancelled and
    the handler that was active before entry is put back, so the handler does not leak
    into later code.

    If ``signal.signal`` raises ``ValueError`` the block simply runs without a timeout
    (deliberate best-effort behaviour).
    """
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        # Handler that was active before __enter__; None until a handler is installed.
        self._previous_handler = None
        # True only while our handler is the installed one.
        self._installed = False
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        try:
            self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
            self._installed = True
            signal.alarm(self.seconds)
        except ValueError:
            # Handler could not be installed; continue without a timeout.
            pass
    def __exit__(self, type, value, traceback):
        try:
            signal.alarm(0)
            # signal.signal() returns None for handlers not installed from Python;
            # those cannot be re-installed, so only restore real handler values.
            if self._installed and self._previous_handler is not None:
                signal.signal(signal.SIGALRM, self._previous_handler)
        except ValueError:
            pass
        finally:
            self._installed = False

# pipeline-api
def get_regex_enum(section_regex):
    """Wrap a custom regular expression in a one-member enum.

    The returned member is named ``CUSTOM``; its value and its ``pattern`` property are
    both the compiled form of ``section_regex``.
    """
    compiled_regex = re.compile(section_regex)

    class CustomSECSection(Enum):
        CUSTOM = compiled_regex

        @property
        def pattern(self):
            # Expose the compiled regex under the attribute name ``pattern``.
            return self.value

    return CustomSECSection["CUSTOM"]

# pipeline-api
import csv
from typing import Dict
from unstructured.documents.elements import Text, NarrativeText, Title, ListItem
def convert_to_isd_csv(results:dict) -> str:
    """
    Serialise section narratives to CSV text.

    ``results`` maps a section name to that section's elements. Each section is converted
    with ``convert_to_isd`` and every resulting row contributes one CSV line with the
    columns ``section``, ``element_type`` and ``text``. A header line is always written.
    """
    header = ["section", "element_type", "text"]
    flattened = [
        {"section": name, "element_type": item["type"], "text": item["text"]}
        for name, narrative in results.items()
        for item in convert_to_isd(narrative)
    ]

    with io.StringIO() as sink:
        writer = csv.DictWriter(sink, fieldnames=header)
        writer.writeheader()
        writer.writerows(flattened)
        csv_text = sink.getvalue()
    return csv_text

# pipeline-api
from unstructured.staging.label_studio import stage_for_label_studio

# List of valid response schemas
LABELSTUDIO = "labelstudio"
ISD = "isd"

def pipeline_api(text, response_type="application/json", response_schema="isd", m_section=[], m_section_regex=[]):
    """Extract the narrative text of selected sections from an SEC filing.

    Many supported sections including: RISK_FACTORS, MANAGEMENT_DISCUSSION, and many more

    Args:
        text: Filing contents; parsed with ``SECDocument.from_string``.
        response_type: ``"application/json"`` (default) or ``"text/csv"``.
        response_schema: ``"isd"`` (default) or ``"labelstudio"``; CSV output only
            supports ``"isd"``.
        m_section: Section names to extract. ``[ALL_SECTIONS]`` is expanded to every
            section listed for the document's filing type. The list is only read,
            never mutated.
        m_section_regex: Regular expressions for custom sections; each result is stored
            under the key ``REGEX_<position>``.

    Returns:
        For JSON: a dict mapping each section key to its staged elements.
        For CSV: the string produced by ``convert_to_isd_csv``.

    Raises:
        ValueError: unsupported filing type, response type, or schema/type combination.
        TimeoutError: raised by the ``timeout`` helper when a custom-regex section takes
            longer than 5 seconds.
    """
    validate_section_names(m_section)

    sec_document = SECDocument.from_string(text)
    if sec_document.filing_type not in VALID_FILING_TYPES:
        raise ValueError(
            f"SEC document filing type {sec_document.filing_type} is not supported, "
            f"must be one of {','.join(VALID_FILING_TYPES)}"
        )
    results = {}
    if m_section == [ALL_SECTIONS]:
        filing_type = sec_document.filing_type
        if filing_type in REPORT_TYPES:
            if filing_type.startswith("10-K"):
                m_section = [enum.name for enum in SECTIONS_10K]
            elif filing_type.startswith("10-Q"):
                # No visible cell imports SECTIONS_10Q, so this branch would otherwise
                # fail with NameError; import it lazily, only when it is needed.
                from prepline_sec_filings.sections import SECTIONS_10Q
                m_section = [enum.name for enum in SECTIONS_10Q]
            else:
                raise ValueError(f"Invalid report type: {filing_type}")

        else:
            # Same reasoning as for SECTIONS_10Q above.
            from prepline_sec_filings.sections import SECTIONS_S1
            m_section = [enum.name for enum in SECTIONS_S1]
    for section in m_section:
        results[section] = sec_document.get_section_narrative(
            section_string_to_enum[section]
        )
    for i, section_regex in enumerate(m_section_regex):
        regex_enum = get_regex_enum(section_regex)
        # User-supplied patterns are bounded by a 5 second alarm.
        with timeout(seconds=5):
            section_elements = sec_document.get_section_narrative(regex_enum)
            results[f"REGEX_{i}"] = section_elements
    if response_type == "application/json":
        if response_schema == LABELSTUDIO:
            return {section:stage_for_label_studio(section_narrative) for section, section_narrative in results.items()}
        elif response_schema == ISD:
            return {section:convert_to_isd(section_narrative) for section, section_narrative in results.items()}
        else:
            raise ValueError(f"output_schema '{response_schema}' is not supported for {response_type}")
    elif response_type == "text/csv":
        if response_schema != ISD:
            raise ValueError(f"output_schema '{response_schema}' is not supported for {response_type}")
        return convert_to_isd_csv(results)
    else:
        raise ValueError(f"response_type '{response_type}' is not supported")

def _get_session(company: Optional[str] = None, email: Optional[str] = None) -> requests.Session:
    """Build a ``requests.Session`` whose headers identify the caller to SEC.

    Missing arguments fall back to the ``SEC_API_ORGANIZATION`` and ``SEC_API_EMAIL``
    environment variables; both values must end up non-empty. If these headers are not
    set, SEC will reject your request.
    ref: https://www.sec.gov/os/accessing-edgar-data"""
    organization = os.environ.get("SEC_API_ORGANIZATION") if company is None else company
    contact = os.environ.get("SEC_API_EMAIL") if email is None else email
    assert organization
    assert contact

    identifying_headers = {
        "User-Agent": f"{organization} {contact}",
        "Content-Type": "text/html",
    }
    sec_session = requests.Session()
    sec_session.headers.update(identifying_headers)
    return sec_session

session = _get_session("IITM", "21f1001906@ds.study.iitm.ac.in")

def _drop_dashes(accession_number: Union[str, int]) -> str:
    """Converts the accession number to the no dash representation."""
    accession_number = str(accession_number).replace("-", "")
    return accession_number.zfill(18)

from datetime import datetime
from bs4 import BeautifulSoup

def get_10k_filings_by_ticker_with_years(ticker: str,
                                         company: Optional[str] = "IITM",
                                         email: Optional[str] = "21f1001906@ds.study.iitm.ac.in") -> List[Tuple[int, str]]:
    """Download every form of type exactly ``"10-K"`` for ``ticker``.

    Returns a list of ``(year, filing_text)`` tuples in the order the forms are listed by
    ``get_forms_by_cik``; ``year`` comes from ``extract_filing_year``. ``company`` and
    ``email`` are used for the session headers and passed on to ``get_filing``.
    """
    sec_session = _get_session(company, email)
    company_cik = get_cik_by_ticker(sec_session, ticker)
    forms_by_accession = get_forms_by_cik(sec_session, company_cik)

    def _download(accession):
        # Fetch one filing and pair it with the year found in its header.
        filing_text = get_filing(company_cik, _drop_dashes(accession), company, email)
        return extract_filing_year(filing_text), filing_text

    return [
        _download(accession)
        for accession, kind in forms_by_accession.items()
        if kind == "10-K"
    ]

def extract_filing_year(text: str) -> int:
    """Return the year found in the filing's ``CONFORMED PERIOD OF REPORT`` header.

    The header value must be eight consecutive digits; the first four are returned as an
    ``int``. Raises ``ValueError`` when no such header is present.
    """
    found = re.search(r"CONFORMED PERIOD OF REPORT:\s*(\d{4})\d{4}", text)
    if found is None:
        raise ValueError("Unable to extract filing year")
    return int(found.group(1))

# Example usage: for each ticker, store every raw 10-K under unclean/<ticker>/<year>.txt
# and the extracted section narratives (CSV) under clean/<ticker>/<year>.csv.
tickers = ["AAPL","RGLD","IBM"]

for tick in tickers:
    ten_k_filings_with_years = get_10k_filings_by_ticker_with_years(tick)
    print("==========================")
    print(tick)
    for year, filing_text in ten_k_filings_with_years:
        print(f"Year: {year}")

        # exist_ok=True replaces the separate exists() check and is safe on re-runs.
        os.makedirs(f'unclean/{tick}', exist_ok=True)

        # Explicit encoding so the result does not depend on the platform default.
        with open(f'unclean/{tick}/{year}.txt', 'w', encoding='utf-8') as out:
            out.write(filing_text)

        all_narratives = pipeline_api(filing_text, response_type="text/csv", m_section=["_ALL"])

        os.makedirs(f'clean/{tick}', exist_ok=True)

        # newline='' keeps the line endings produced by the csv writer untouched.
        with open(f'clean/{tick}/{year}.csv', 'w', encoding='utf-8', newline='') as out:
            out.write(all_narratives)

AAPL
Year: 2023
sec
Year: 2022


KeyboardInterrupt: 

In [None]:
import io


# pipeline-api
class timeout:
    """Context manager that interrupts the enclosed block with ``TimeoutError``.

    On entry a ``SIGALRM`` handler is installed and an alarm is armed for ``seconds``
    (whole seconds, as required by ``signal.alarm``). On exit the alarm is cancelled and
    the handler that was active before entry is put back, so the handler does not leak
    into later code.

    If ``signal.signal`` raises ``ValueError`` the block simply runs without a timeout
    (deliberate best-effort behaviour).
    """
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        # Handler that was active before __enter__; None until a handler is installed.
        self._previous_handler = None
        # True only while our handler is the installed one.
        self._installed = False
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        try:
            self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
            self._installed = True
            signal.alarm(self.seconds)
        except ValueError:
            # Handler could not be installed; continue without a timeout.
            pass
    def __exit__(self, type, value, traceback):
        try:
            signal.alarm(0)
            # signal.signal() returns None for handlers not installed from Python;
            # those cannot be re-installed, so only restore real handler values.
            if self._installed and self._previous_handler is not None:
                signal.signal(signal.SIGALRM, self._previous_handler)
        except ValueError:
            pass
        finally:
            self._installed = False

# pipeline-api
def get_regex_enum(section_regex):
    """Wrap a custom regular expression in a one-member enum.

    The returned member is named ``CUSTOM``; its value and its ``pattern`` property are
    both the compiled form of ``section_regex``.
    """
    compiled_regex = re.compile(section_regex)

    class CustomSECSection(Enum):
        CUSTOM = compiled_regex

        @property
        def pattern(self):
            # Expose the compiled regex under the attribute name ``pattern``.
            return self.value

    return CustomSECSection["CUSTOM"]

# pipeline-api
import csv
from typing import Dict
from unstructured.documents.elements import Text, NarrativeText, Title, ListItem
def convert_to_isd_csv(results:dict) -> str:
    """
    Serialise section narratives to CSV text.

    ``results`` maps a section name to that section's elements. Each section is converted
    with ``convert_to_isd`` and every resulting row contributes one CSV line with the
    columns ``section``, ``element_type`` and ``text``. A header line is always written.
    """
    header = ["section", "element_type", "text"]
    flattened = [
        {"section": name, "element_type": item["type"], "text": item["text"]}
        for name, narrative in results.items()
        for item in convert_to_isd(narrative)
    ]

    with io.StringIO() as sink:
        writer = csv.DictWriter(sink, fieldnames=header)
        writer.writeheader()
        writer.writerows(flattened)
        csv_text = sink.getvalue()
    return csv_text

# pipeline-api
from unstructured.staging.label_studio import stage_for_label_studio

# List of valid response schemas
LABELSTUDIO = "labelstudio"
ISD = "isd"

def pipeline_api(text, response_type="application/json", response_schema="isd", m_section=[], m_section_regex=[]):
    """Extract the narrative text of selected sections from an SEC filing.

    Many supported sections including: RISK_FACTORS, MANAGEMENT_DISCUSSION, and many more

    Args:
        text: Filing contents; parsed with ``SECDocument.from_string``.
        response_type: ``"application/json"`` (default) or ``"text/csv"``.
        response_schema: ``"isd"`` (default) or ``"labelstudio"``; CSV output only
            supports ``"isd"``.
        m_section: Section names to extract. ``[ALL_SECTIONS]`` is expanded to every
            section listed for the document's filing type. The list is only read,
            never mutated.
        m_section_regex: Regular expressions for custom sections; each result is stored
            under the key ``REGEX_<position>``.

    Returns:
        For JSON: a dict mapping each section key to its staged elements.
        For CSV: the string produced by ``convert_to_isd_csv``.

    Raises:
        ValueError: unsupported filing type, response type, or schema/type combination.
        TimeoutError: raised by the ``timeout`` helper when a custom-regex section takes
            longer than 5 seconds.
    """
    validate_section_names(m_section)

    sec_document = SECDocument.from_string(text)
    if sec_document.filing_type not in VALID_FILING_TYPES:
        raise ValueError(
            f"SEC document filing type {sec_document.filing_type} is not supported, "
            f"must be one of {','.join(VALID_FILING_TYPES)}"
        )
    results = {}
    if m_section == [ALL_SECTIONS]:
        filing_type = sec_document.filing_type
        if filing_type in REPORT_TYPES:
            if filing_type.startswith("10-K"):
                m_section = [enum.name for enum in SECTIONS_10K]
            elif filing_type.startswith("10-Q"):
                # No visible cell imports SECTIONS_10Q, so this branch would otherwise
                # fail with NameError; import it lazily, only when it is needed.
                from prepline_sec_filings.sections import SECTIONS_10Q
                m_section = [enum.name for enum in SECTIONS_10Q]
            else:
                raise ValueError(f"Invalid report type: {filing_type}")

        else:
            # Same reasoning as for SECTIONS_10Q above.
            from prepline_sec_filings.sections import SECTIONS_S1
            m_section = [enum.name for enum in SECTIONS_S1]
    for section in m_section:
        results[section] = sec_document.get_section_narrative(
            section_string_to_enum[section]
        )
    for i, section_regex in enumerate(m_section_regex):
        regex_enum = get_regex_enum(section_regex)
        # User-supplied patterns are bounded by a 5 second alarm.
        with timeout(seconds=5):
            section_elements = sec_document.get_section_narrative(regex_enum)
            results[f"REGEX_{i}"] = section_elements
    if response_type == "application/json":
        if response_schema == LABELSTUDIO:
            return {section:stage_for_label_studio(section_narrative) for section, section_narrative in results.items()}
        elif response_schema == ISD:
            return {section:convert_to_isd(section_narrative) for section, section_narrative in results.items()}
        else:
            raise ValueError(f"output_schema '{response_schema}' is not supported for {response_type}")
    elif response_type == "text/csv":
        if response_schema != ISD:
            raise ValueError(f"output_schema '{response_schema}' is not supported for {response_type}")
        return convert_to_isd_csv(results)
    else:
        raise ValueError(f"response_type '{response_type}' is not supported")

def _get_session(company: Optional[str] = None, email: Optional[str] = None) -> requests.Session:
    """Build a ``requests.Session`` whose headers identify the caller to SEC.

    Missing arguments fall back to the ``SEC_API_ORGANIZATION`` and ``SEC_API_EMAIL``
    environment variables; both values must end up non-empty. If these headers are not
    set, SEC will reject your request.
    ref: https://www.sec.gov/os/accessing-edgar-data"""
    organization = os.environ.get("SEC_API_ORGANIZATION") if company is None else company
    contact = os.environ.get("SEC_API_EMAIL") if email is None else email
    assert organization
    assert contact

    identifying_headers = {
        "User-Agent": f"{organization} {contact}",
        "Content-Type": "text/html",
    }
    sec_session = requests.Session()
    sec_session.headers.update(identifying_headers)
    return sec_session

session = _get_session("IITM", "21f1001906@ds.study.iitm.ac.in")

def _drop_dashes(accession_number: Union[str, int]) -> str:
    """Converts the accession number to the no dash representation."""
    accession_number = str(accession_number).replace("-", "")
    return accession_number.zfill(18)

from datetime import datetime
from bs4 import BeautifulSoup

def get_10k_filings_by_ticker_with_years(ticker: str,
                                         company: Optional[str] = "IITM",
                                         email: Optional[str] = "21f1001906@ds.study.iitm.ac.in") -> List[Tuple[int, str]]:
    """Download every form of type exactly ``"10-K"`` for ``ticker``.

    Returns a list of ``(year, filing_text)`` tuples in the order the forms are listed by
    ``get_forms_by_cik``; ``year`` comes from ``extract_filing_year``. ``company`` and
    ``email`` are used for the session headers and passed on to ``get_filing``.
    """
    sec_session = _get_session(company, email)
    company_cik = get_cik_by_ticker(sec_session, ticker)
    forms_by_accession = get_forms_by_cik(sec_session, company_cik)

    def _download(accession):
        # Fetch one filing and pair it with the year found in its header.
        filing_text = get_filing(company_cik, _drop_dashes(accession), company, email)
        return extract_filing_year(filing_text), filing_text

    return [
        _download(accession)
        for accession, kind in forms_by_accession.items()
        if kind == "10-K"
    ]

def extract_filing_year(text: str) -> int:
    """Return the year found in the filing's ``CONFORMED PERIOD OF REPORT`` header.

    The header value must be eight consecutive digits; the first four are returned as an
    ``int``. Raises ``ValueError`` when no such header is present.
    """
    found = re.search(r"CONFORMED PERIOD OF REPORT:\s*(\d{4})\d{4}", text)
    if found is None:
        raise ValueError("Unable to extract filing year")
    return int(found.group(1))

# Example usage: for each ticker, store every raw 10-K under unclean/<ticker>/<year>.txt
# and the extracted section narratives (CSV) under clean/<ticker>/<year>.csv.
tickers = ["AAPL","RGLD","IBM"]

for tick in tickers:
    ten_k_filings_with_years = get_10k_filings_by_ticker_with_years(tick)
    print("==========================")
    print(tick)
    for year, filing_text in ten_k_filings_with_years:
        print(f"Year: {year}")

        # exist_ok=True replaces the separate exists() check and is safe on re-runs.
        os.makedirs(f'unclean/{tick}', exist_ok=True)

        # Explicit encoding so the result does not depend on the platform default.
        with open(f'unclean/{tick}/{year}.txt', 'w', encoding='utf-8') as out:
            out.write(filing_text)

        all_narratives = pipeline_api(filing_text, response_type="text/csv", m_section=["_ALL"])

        os.makedirs(f'clean/{tick}', exist_ok=True)

        # newline='' keeps the line endings produced by the csv writer untouched.
        with open(f'clean/{tick}/{year}.csv', 'w', encoding='utf-8', newline='') as out:
            out.write(all_narratives)

AAPL
Year: 2023
Year: 2022
Year: 2021


KeyboardInterrupt: 

In [None]:
from prepline_sec_filings.sections import section_string_to_enum, validate_section_names, SECSection
from prepline_sec_filings.sec_document import SECDocument, REPORT_TYPES, VALID_FILING_TYPES
from prepline_sec_filings.fetch import *
import pandas as pd

In [None]:
# Ordered 10-K sections; the trailing comment on each entry gives its item number in the form.
# NOTE(review): another cell imports a name SECTIONS_10K from prepline_sec_filings.sections;
# whichever cell runs last decides which definition pipeline_api and get_all_section_text see.
SECTIONS_10K = (
    SECSection.BUSINESS,  # ITEM 1
    SECSection.RISK_FACTORS,  # ITEM 1A
    SECSection.UNRESOLVED_STAFF_COMMENTS,  # ITEM 1B
    SECSection.PROPERTIES,  # ITEM 2
    SECSection.LEGAL_PROCEEDINGS,  # ITEM 3
    SECSection.MINE_SAFETY,  # ITEM 4
    SECSection.MARKET_FOR_REGISTRANT_COMMON_EQUITY,  # ITEM 5
    # NOTE(robinson) - ITEM 6 is "RESERVED"
    SECSection.MANAGEMENT_DISCUSSION,  # ITEM 7
    SECSection.MARKET_RISK_DISCLOSURES,  # ITEM 7A
    SECSection.FINANCIAL_STATEMENTS,  # ITEM 8
    SECSection.ACCOUNTING_DISAGREEMENTS,  # ITEM 9
    SECSection.CONTROLS_AND_PROCEDURES,  # ITEM 9A
    # NOTE(robinson) - ITEM 9B is other information
    SECSection.FOREIGN_JURISDICTIONS,  # ITEM 9C
    SECSection.MANAGEMENT,  # ITEM 10
    SECSection.COMPENSATION,  # ITEM 11
    SECSection.PRINCIPAL_STOCKHOLDERS,  # ITEM 12
    SECSection.RELATED_PARTY_TRANSACTIONS,  # ITEM 13
    SECSection.ACCOUNTING_FEES,  # ITEM 14
    SECSection.EXHIBITS,  # ITEM 15
    SECSection.FORM_SUMMARY,  # ITEM 16
)

In [None]:
def get_all_section_text(text):
  """
  Collect the narrative text of every section in SECTIONS_10K into one table.

  text: text as string type
  returns:
    Pandas dataframe
    columns: Title, Text
    one row per narrative element, labelled with the name of the section it came from
  """
  sec_document = SECDocument.from_string(text)

  # Gather plain dicts and build the frame once at the end; pd.concat inside the loop
  # would copy the whole frame for every added row.
  records = []
  # One pass per section: a second loop over SECTIONS_10K nested in here would label
  # every section's text with every title and extract each section once per title.
  for sec in SECTIONS_10K:
    section_title = sec.name
    for element in sec_document.get_section_narrative(sec):
      records.append({'Title': section_title, 'Text': str(element)})

  return pd.DataFrame(records, columns=['Title', 'Text'])

In [None]:
%%time

df = get_all_section_text(text)

KeyboardInterrupt: 

In [None]:
sec_document = SECDocument.from_string(text)

In [None]:
# Install pipeline-sec-filings
!git clone https://github.com/Unstructured-IO/pipeline-sec-filings.git --depth=1
%cd pipeline-sec-filings

Cloning into 'pipeline-sec-filings'...
remote: Enumerating objects: 69, done.[K
remote: Counting objects: 100% (69/69), done.[K
remote: Compressing objects: 100% (63/63), done.[K
remote: Total 69 (delta 2), reused 43 (delta 1), pack-reused 0[K
Receiving objects: 100% (69/69), 216.03 KiB | 3.38 MiB/s, done.
Resolving deltas: 100% (2/2), done.
/content/pipeline-sec-filings


In [None]:
# Install Python requirements
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q ratelimit unstructured==0.4.6
# upgrade to the latest, though has not been tested
# %pip install -q --upgrade ratelimit unstructured

  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m420.8/420.8 kB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m239.6/239.6 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m471.6/471.6 kB[0m [31m23.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.9/75.9 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.1/17.1 MB[0m [31m24.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m159.9/159.9 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import nltk
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


True

In [None]:
# pipeline-api
from enum import Enum
import re
import signal

from unstructured.staging.base import convert_to_isd
from prepline_sec_filings.sections import (
    ALL_SECTIONS,
    SECTIONS_10K
)
from prepline_sec_filings.sections import section_string_to_enum, validate_section_names, SECSection
from prepline_sec_filings.sec_document import SECDocument, REPORT_TYPES, VALID_FILING_TYPES
from prepline_sec_filings.fetch import *

import io

In [None]:
text = get_form_by_ticker(
    'rgld',
    '10-K',
    company='IITM',
    email='21f1001906@ds.study.iitm.ac.in'
)

In [None]:
# pipeline-api
class timeout:
    """Context manager that interrupts the enclosed block with ``TimeoutError``.

    On entry a ``SIGALRM`` handler is installed and an alarm is armed for ``seconds``
    (whole seconds, as required by ``signal.alarm``). On exit the alarm is cancelled and
    the handler that was active before entry is put back, so the handler does not leak
    into later code.

    If ``signal.signal`` raises ``ValueError`` the block simply runs without a timeout
    (deliberate best-effort behaviour).
    """
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        # Handler that was active before __enter__; None until a handler is installed.
        self._previous_handler = None
        # True only while our handler is the installed one.
        self._installed = False
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        try:
            self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
            self._installed = True
            signal.alarm(self.seconds)
        except ValueError:
            # Handler could not be installed; continue without a timeout.
            pass
    def __exit__(self, type, value, traceback):
        try:
            signal.alarm(0)
            # signal.signal() returns None for handlers not installed from Python;
            # those cannot be re-installed, so only restore real handler values.
            if self._installed and self._previous_handler is not None:
                signal.signal(signal.SIGALRM, self._previous_handler)
        except ValueError:
            pass
        finally:
            self._installed = False

In [None]:
# pipeline-api
def get_regex_enum(section_regex):
    """Wrap a custom regular expression in a one-member enum.

    The returned member is named ``CUSTOM``; its value and its ``pattern`` property are
    both the compiled form of ``section_regex``.
    """
    compiled_regex = re.compile(section_regex)

    class CustomSECSection(Enum):
        CUSTOM = compiled_regex

        @property
        def pattern(self):
            # Expose the compiled regex under the attribute name ``pattern``.
            return self.value

    return CustomSECSection["CUSTOM"]

In [None]:
# pipeline-api
import csv
from typing import Dict
from unstructured.documents.elements import Text, NarrativeText, Title, ListItem
def convert_to_isd_csv(results:dict) -> str:
    """
    Serialise section narratives to CSV text.

    ``results`` maps a section name to that section's elements. Each section is converted
    with ``convert_to_isd`` and every resulting row contributes one CSV line with the
    columns ``section``, ``element_type`` and ``text``. A header line is always written.
    """
    header = ["section", "element_type", "text"]
    flattened = [
        {"section": name, "element_type": item["type"], "text": item["text"]}
        for name, narrative in results.items()
        for item in convert_to_isd(narrative)
    ]

    with io.StringIO() as sink:
        writer = csv.DictWriter(sink, fieldnames=header)
        writer.writeheader()
        writer.writerows(flattened)
        csv_text = sink.getvalue()
    return csv_text

In [None]:
# pipeline-api
from unstructured.staging.label_studio import stage_for_label_studio

# List of valid response schemas
LABELSTUDIO = "labelstudio"
ISD = "isd"

def pipeline_api(text, response_type="application/json", response_schema="isd", m_section=[], m_section_regex=[]):
    """Extract the narrative text of selected sections from an SEC filing.

    Many supported sections including: RISK_FACTORS, MANAGEMENT_DISCUSSION, and many more

    Args:
        text: Filing contents; parsed with ``SECDocument.from_string``.
        response_type: ``"application/json"`` (default) or ``"text/csv"``.
        response_schema: ``"isd"`` (default) or ``"labelstudio"``; CSV output only
            supports ``"isd"``.
        m_section: Section names to extract. ``[ALL_SECTIONS]`` is expanded to every
            section listed for the document's filing type. The list is only read,
            never mutated.
        m_section_regex: Regular expressions for custom sections; each result is stored
            under the key ``REGEX_<position>``.

    Returns:
        For JSON: a dict mapping each section key to its staged elements.
        For CSV: the string produced by ``convert_to_isd_csv``.

    Raises:
        ValueError: unsupported filing type, response type, or schema/type combination.
        TimeoutError: raised by the ``timeout`` helper when a custom-regex section takes
            longer than 5 seconds.
    """
    validate_section_names(m_section)

    sec_document = SECDocument.from_string(text)
    if sec_document.filing_type not in VALID_FILING_TYPES:
        raise ValueError(
            f"SEC document filing type {sec_document.filing_type} is not supported, "
            f"must be one of {','.join(VALID_FILING_TYPES)}"
        )
    results = {}
    if m_section == [ALL_SECTIONS]:
        filing_type = sec_document.filing_type
        if filing_type in REPORT_TYPES:
            if filing_type.startswith("10-K"):
                m_section = [enum.name for enum in SECTIONS_10K]
            elif filing_type.startswith("10-Q"):
                # No visible cell imports SECTIONS_10Q, so this branch would otherwise
                # fail with NameError; import it lazily, only when it is needed.
                from prepline_sec_filings.sections import SECTIONS_10Q
                m_section = [enum.name for enum in SECTIONS_10Q]
            else:
                raise ValueError(f"Invalid report type: {filing_type}")

        else:
            # Same reasoning as for SECTIONS_10Q above.
            from prepline_sec_filings.sections import SECTIONS_S1
            m_section = [enum.name for enum in SECTIONS_S1]
    for section in m_section:
        results[section] = sec_document.get_section_narrative(
            section_string_to_enum[section]
        )
    for i, section_regex in enumerate(m_section_regex):
        regex_enum = get_regex_enum(section_regex)
        # User-supplied patterns are bounded by a 5 second alarm.
        with timeout(seconds=5):
            section_elements = sec_document.get_section_narrative(regex_enum)
            results[f"REGEX_{i}"] = section_elements
    if response_type == "application/json":
        if response_schema == LABELSTUDIO:
            return {section:stage_for_label_studio(section_narrative) for section, section_narrative in results.items()}
        elif response_schema == ISD:
            return {section:convert_to_isd(section_narrative) for section, section_narrative in results.items()}
        else:
            raise ValueError(f"output_schema '{response_schema}' is not supported for {response_type}")
    elif response_type == "text/csv":
        if response_schema != ISD:
            raise ValueError(f"output_schema '{response_schema}' is not supported for {response_type}")
        return convert_to_isd_csv(results)
    else:
        raise ValueError(f"response_type '{response_type}' is not supported")

In [None]:
risk_narrative = pipeline_api(text, m_section=["RISK_FACTORS"])["RISK_FACTORS"]
risk_narrative[:5]

[]

In [None]:
all_narratives = pipeline_api(text, m_section=["_ALL"])
for section, elems in all_narratives.items():
    print(section)
    print(elems[:4])
    print("---------------")

In [None]:
import time

start_time = time.time()

In [None]:
# Extract every section of the filing as CSV text and save it next to the notebook.
all_narratives = pipeline_api(text, response_type="text/csv", m_section=["_ALL"])

# Explicit encoding avoids depending on the platform default; newline='' keeps the line
# endings produced by the csv writer untouched.
with open('all_narratives.csv', 'w', encoding='utf-8', newline='') as out:
    out.write(all_narratives)

In [None]:
end_time = time.time()

# Calculate the execution time
execution_time = end_time - start_time
print("Execution time:", execution_time, "seconds")

Execution time: 110.09461545944214 seconds


In [None]:
print(text[1375:3284])

LSchema-instance" xmlns:iso4217="http://www.xbrl.org/2003/iso4217" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:ixt="http://www.xbrl.org/inlineXBRL/transformation/2020-02-12" xmlns:us-gaap="http://fasb.org/us-gaap/2023" xmlns:nvda="http://www.nvidia.com/20240128" xmlns:country="http://xbrl.sec.gov/country/2023" xmlns:xbrldi="http://xbrl.org/2006/xbrldi" xmlns="http://www.w3.org/1999/xhtml" xmlns:srt="http://fasb.org/srt/2023" xmlns:stpr="http://xbrl.sec.gov/stpr/2023" xmlns:ecd="http://xbrl.sec.gov/ecd/2023" xmlns:dei="http://xbrl.sec.gov/dei/2023" xmlns:ix="http://www.xbrl.org/2013/inlineXBRL" xmlns:link="http://www.xbrl.org/2003/linkbase" xmlns:xbrli="http://www.xbrl.org/2003/instance" xmlns:ixt-sec="http://www.sec.gov/inlineXBRL/transformation/2015-08-31" xml:lang="en-US"><head><meta http-equiv="Content-Type" content="text/html"/>


<title>nvda-20240128</title></head><body><div style="display:none"><ix:header><ix:hidden><ix:nonNumeric contextRef="c-1" name="dei:EntityCentralInde

In [None]:
recent = get_recent_cik_and_acc_by_ticker(
    'rgld',
    '10-K',
    company='IITM',
    email='21f1001906@ds.study.iitm.ac.in'
)

In [None]:
cik = recent[0]
cik

'0000085535'

In [None]:
def _get_session(company: Optional[str] = None, email: Optional[str] = None) -> requests.Session:
    """Build a ``requests.Session`` whose headers identify the caller to SEC.

    Missing arguments fall back to the ``SEC_API_ORGANIZATION`` and ``SEC_API_EMAIL``
    environment variables; both values must end up non-empty. If these headers are not
    set, SEC will reject your request.
    ref: https://www.sec.gov/os/accessing-edgar-data"""
    organization = os.environ.get("SEC_API_ORGANIZATION") if company is None else company
    contact = os.environ.get("SEC_API_EMAIL") if email is None else email
    assert organization
    assert contact

    identifying_headers = {
        "User-Agent": f"{organization} {contact}",
        "Content-Type": "text/html",
    }
    sec_session = requests.Session()
    sec_session.headers.update(identifying_headers)
    return sec_session

In [None]:
session = _get_session("IITM", "21f1001906@ds.study.iitm.ac.in")

In [None]:
form = get_forms_by_cik(session=session, cik=cik)

In [None]:
recent

('0000085535', '000155837024001301', '10-K/A')

In [None]:
filtered_dict = {key: value for key, value in form.items() if value == '10-K'}

print(filtered_dict)

{'0001558370-24-001192': '10-K', '0001558370-23-001391': '10-K', '0001558370-21-011343': '10-K', '0001558370-20-009452': '10-K', '0001558370-19-007532': '10-K', '0001558370-18-006805': '10-K', '0001558370-17-006462': '10-K', '0001047469-16-014916': '10-K', '0001047469-15-006623': '10-K', '0001047469-14-006723': '10-K', '0001047469-13-008238': '10-K', '0001047469-12-008035': '10-K', '0001047469-11-007500': '10-K', '0001047469-10-007729': '10-K'}


In [None]:
def _drop_dashes(accession_number: Union[str, int]) -> str:
    """Converts the accession number to the no dash representation."""
    accession_number = str(accession_number).replace("-", "")
    return accession_number.zfill(18)

In [None]:
# -p keeps the cell re-runnable: no error when the directory already exists.
!mkdir -p RGLD

In [None]:
from datetime import datetime
from bs4 import BeautifulSoup

def get_10k_filings_by_ticker_with_years(ticker: str,
                                         company: Optional[str] = "IITM",
                                         email: Optional[str] = "21f1001906@ds.study.iitm.ac.in") -> List[Tuple[int, str]]:
    """Download every form of type exactly ``"10-K"`` for ``ticker``.

    Returns a list of ``(year, filing_text)`` tuples in the order the forms are listed by
    ``get_forms_by_cik``; ``year`` comes from ``extract_filing_year``. ``company`` and
    ``email`` are used for the session headers and passed on to ``get_filing``.
    """
    sec_session = _get_session(company, email)
    company_cik = get_cik_by_ticker(sec_session, ticker)
    forms_by_accession = get_forms_by_cik(sec_session, company_cik)

    def _download(accession):
        # Fetch one filing and pair it with the year found in its header.
        filing_text = get_filing(company_cik, _drop_dashes(accession), company, email)
        return extract_filing_year(filing_text), filing_text

    return [
        _download(accession)
        for accession, kind in forms_by_accession.items()
        if kind == "10-K"
    ]

def extract_filing_year(text: str) -> int:
    """Return the year found in the filing's ``CONFORMED PERIOD OF REPORT`` header.

    The header value must be eight consecutive digits; the first four are returned as an
    ``int``. Raises ``ValueError`` when no such header is present.
    """
    found = re.search(r"CONFORMED PERIOD OF REPORT:\s*(\d{4})\d{4}", text)
    if found is None:
        raise ValueError("Unable to extract filing year")
    return int(found.group(1))

# Example usage: write the section narratives of older 10-K filings to <ticker>/<year>.csv
ticker = "RGLD"  # ticker symbol whose 10-K filings are processed
LAST_YEAR_TO_PROCESS = 2015  # filings with a later report year are skipped
ten_k_filings_with_years = get_10k_filings_by_ticker_with_years(ticker)

# Create the output directory here so the cell does not depend on a separate mkdir cell.
os.makedirs(ticker, exist_ok=True)

for year, filing_text in ten_k_filings_with_years:
    if int(year) > LAST_YEAR_TO_PROCESS:
        continue
    print(f"Year: {year}")
    all_narratives = pipeline_api(filing_text, response_type="text/csv", m_section=["_ALL"])

    # Explicit encoding; newline='' keeps the line endings produced by the csv writer.
    with open(f'{ticker}/{year}.csv', 'w', encoding='utf-8', newline='') as out:
        out.write(all_narratives)
    #print(f"Filing Text: {filing_text[:5000]}...")
    #print("=" * 50)
    #print(f"Filing Text: {filing_text[:5000]}...")
    #print("=" * 50)

Year: 2015
Year: 2014
Year: 2013
Year: 2012
Year: 2011
Year: 2010


In [None]:
# Define the folder you want to zip
# NOTE(review): absolute, hard-coded path; it assumes the repository lives at
# /content/pipeline-sec-filings — confirm before running in another environment.
folder_to_zip = '/content/pipeline-sec-filings/RGLD'

# Define the destination path and name for the zip file
# NOTE(review): placeholder file name; rename to something descriptive before sharing.
destination_zip = '/content/your_zip_file_name.zip'

# Use the zip command to compress the folder
# The two $names are filled in from the Python variables defined above. Because the folder
# is given as an absolute path, archive entries are stored under
# content/pipeline-sec-filings/RGLD/ (see the cell output).
!zip -r "$destination_zip" "$folder_to_zip"

  adding: content/pipeline-sec-filings/RGLD/ (stored 0%)
  adding: content/pipeline-sec-filings/RGLD/2014.csv (deflated 74%)
  adding: content/pipeline-sec-filings/RGLD/2018.csv (deflated 75%)
  adding: content/pipeline-sec-filings/RGLD/2020.csv (deflated 75%)
  adding: content/pipeline-sec-filings/RGLD/2017.csv (deflated 75%)
  adding: content/pipeline-sec-filings/RGLD/2019.csv (deflated 76%)
  adding: content/pipeline-sec-filings/RGLD/2022.csv (deflated 75%)
  adding: content/pipeline-sec-filings/RGLD/2021.csv (deflated 76%)
  adding: content/pipeline-sec-filings/RGLD/2011.csv (deflated 73%)
  adding: content/pipeline-sec-filings/RGLD/2010.csv (stored 0%)
  adding: content/pipeline-sec-filings/RGLD/2012.csv (deflated 74%)
  adding: content/pipeline-sec-filings/RGLD/2015.csv (deflated 74%)
  adding: content/pipeline-sec-filings/RGLD/2023.csv (deflated 75%)
  adding: content/pipeline-sec-filings/RGLD/2013.csv (deflated 74%)


Wow! We're able to pull in the document, but it's really messy.
To help, we'll apply Unstructured Bricks to extract the information we're most interested in. Ultimately, we want to be able to ask the API
for a section and get back the narrative text within that section like the JSON file below. Once
we extract the narrative text, we can spin up a labeling task or send
it to a downstream ML service for inference.

```json
[
  {
    "text": "You should carefully consider the risks described in this section. Our future performance is subject to risks and uncertainties that could have a material adverse effect on our business, results of operations, and financial condition and the trading price of our common stock. We may be subject to other risks and uncertainties not presently known to us. In addition, please see our note about forward-looking statements included in the MD&A.",
    "type": "NarrativeText"
  },
  {
    "text": "Our revenue is subject to volatility in metal prices, which could negatively affect our results of operations or cash flow.",
    "type": "NarrativeText"
  },
  {
    "text": "Market prices for gold, silver, copper, nickel, and other metals may fluctuate widely over time and are affected by numerous factors beyond our control. These factors include metal supply and demand, industrial and jewelry fabrication, investment demand, central banking actions, inflation expectations, currency values, interest rates, forward sales by metal producers, and political, trade, economic, or banking conditions.",
    "type": "NarrativeText"
  }
]
```

In [None]:
from unstructured.documents.html import HTMLDocument

# Parse the raw filing text as HTML, then request the cleaned document with
# header/footer skipping and table-text skipping both enabled.
html_document = HTMLDocument.from_string(text).doc_after_cleaners(
    skip_headers_and_footers=True,
    skip_table_text=True,
)

In [None]:
# Print elements 71-74 of the first page, each followed by two blank lines.
for element in html_document.pages[0].elements[71:75]:
    print(element, end="\n\n\n")

In [None]:
print(html_document.pages[7])

Table of Contents

tier-1 suppliers, and start-ups. Our AV solution also includes the GPU-based hardware required to train the neural networks before their in-vehicle deployment, as well as to re-simulate their operation prior to any over-the-air software updates. We believe our comprehensive, top-to-bottom and end-to-end approach will enable the transportation industry to solve the complex problems arising from the shift to autonomous driving.

Leveraging our intellectual property, or IP.

We believe our IP is a valuable asset that can be accessed by our customers and partners through license and development agreements when they desire to build such capabilities directly into their own products or have us do so through a custom development. Such license and development arrangements can further enhance the reach of our technology.

Sales and Marketing

Our worldwide sales and marketing strategy is key to achieving our objective of providing markets with our high-performance and efficie

In [None]:
html_document.pages[0].elements[71:75]

[]

In [None]:
from unstructured.nlp.partition import is_possible_title

is_possible_title("Regulation")

True

In [None]:
is_possible_title("""Operators of the mines that are subject to our
stream and royalty interests must comply with numerous environmental,
mine safety, land use, waste disposal, remediation and public health
laws and regulations promulgated by federal, state, provincial and
local governments in the United States, Canada, Chile, the Dominican
Republic, Ghana, Mexico, Botswana, Australia and other countries where
we hold interests. Although we, as a stream or royalty interest owner,
are not""")

False

In [None]:
from unstructured.nlp.partition import is_possible_narrative_text

is_possible_narrative_text("Regulation")

False

In [None]:
is_possible_narrative_text("""Operators of the mines that are subject to our
stream and royalty interests must comply with numerous environmental,
mine safety, land use, waste disposal, remediation and public health
laws and regulations promulgated by federal, state, provincial and
local governments in the United States, Canada, Chile, the Dominican
Republic, Ghana, Mexico, Botswana, Australia and other countries where
we hold interests. Although we, as a stream or royalty interest owner,
are not""")

True

In [None]:
import re
from unstructured.documents.elements import Title

In [None]:
# Matches the label that opens a 10-K item heading: the word "item" (any case),
# whitespace, a 1-3 digit number, an optional letter suffix written either bare
# ("1A") or parenthesised ("7(a)"), then an optional "." and an optional ":".
#
# The separator is `\s+` rather than a literal space so that headings written
# with a non-breaking space or with repeated spaces are recognised without
# first normalising the text. Every string the single-space form matched is
# still matched.
ITEM_TITLE_RE = re.compile(
    r"item\s+\d{1,3}(?:[a-z]|\([a-z]\))?\.?:?",
    re.IGNORECASE,
)

In [None]:
def is_10k_item_title(title: str) -> bool:
    """Report whether ``title`` begins with a 10-K item label.

    The check is anchored at the start of the string: ``ITEM_TITLE_RE`` must
    match from the first character of ``title``.
    """
    leading_label = ITEM_TITLE_RE.match(title)
    return bool(leading_label)

In [None]:
# List every Title element whose text starts with a 10-K item label.
for element in html_document.elements:
    if not isinstance(element, Title):
        continue
    if is_10k_item_title(element.text):
        print(element)

Item 1. Business
Item 1A. Risk Factors
Item 1B. Unresolved Staff Comments
Item 1C. Cybersecurity
Item 2. Properties
Item 3. Legal Proceedings
Item 4. Mine Safety Disclosures
Item 7A. Quantitative and Qualitative Disclosures about Market Risk
Item 9C.  Disclosure Regarding Foreign Jurisdictions that Prevent Inspections
Item 10. Directors, Executive Officers and Corporate Governance
Item 11. Executive Compensation
Item 13. Certain Relationships and Related Transactions, and Director Independence
Item 14. Principal Accountant Fees and Services


In [None]:
# NOTE(review): this cell repeats the preceding cell's item-title listing and
# its saved output is identical; it looks like a leftover from iteration.
# Prints every Title element whose text passes is_10k_item_title.
for element in html_document.elements:
    if isinstance(element, Title) and is_10k_item_title(element.text):
        print(element)

Item 1. Business
Item 1A. Risk Factors
Item 1B. Unresolved Staff Comments
Item 1C. Cybersecurity
Item 2. Properties
Item 3. Legal Proceedings
Item 4. Mine Safety Disclosures
Item 7A. Quantitative and Qualitative Disclosures about Market Risk
Item 9C.  Disclosure Regarding Foreign Jurisdictions that Prevent Inspections
Item 10. Directors, Executive Officers and Corporate Governance
Item 11. Executive Compensation
Item 13. Certain Relationships and Related Transactions, and Director Independence
Item 14. Principal Accountant Fees and Services


In [None]:
from unstructured.cleaners.core import clean_extra_whitespace

In [None]:
# Normalise whitespace on every element (in place), then collect and print the
# Title elements that open a 10-K item.
titles = []
for element in html_document.elements:
    element.text = clean_extra_whitespace(element.text)
    if not (isinstance(element, Title) and is_10k_item_title(element.text)):
        continue
    titles.append(element)
    print(element)

Item 1. Business
Item 1A. Risk Factors
Item 1B. Unresolved Staff Comments
Item 1C. Cybersecurity
Item 2. Properties
Item 3. Legal Proceedings
Item 4. Mine Safety Disclosures
Item 7A. Quantitative and Qualitative Disclosures about Market Risk
Item 8. Financial Statements and Supplementary Data
Item 9. Changes in and Disagreements with Accountants on Accounting and Financial Disclosure
Item 9A. Controls and Procedures
Item 9B. Other Information
Item 9C. Disclosure Regarding Foreign Jurisdictions that Prevent Inspections
Item 10. Directors, Executive Officers and Corporate Governance
Item 11. Executive Compensation
Item 13. Certain Relationships and Related Transactions, and Director Independence
Item 14. Principal Accountant Fees and Services
Item 15. Exhibit and Financial Statement Schedules
Item 16. Form 10-K Summary


In [None]:
import pandas as pd

In [None]:
# Walk the document once, tagging every non-heading element with the 10-K item
# heading it falls under. Elements seen before the first item heading are
# filed under "Misc.".
titles = []
section_rows = []
current_title = "Misc."

for element in html_document.elements:
    # Whitespace is normalised in place before the heading test is applied.
    element.text = clean_extra_whitespace(element.text)
    if isinstance(element, Title) and is_10k_item_title(element.text):
        titles.append(element)
        print(element)
        current_title = element.text
    else:
        section_rows.append({'Title': current_title, 'Text': element.text})

# Build the frame once from the collected records; concatenating a one-row
# frame per element re-copies the whole frame on every iteration.
# Passing `columns` keeps the two columns even when no rows were collected.
df = pd.DataFrame(section_rows, columns=['Title', 'Text'])

Item 1. Business
Item 1A. Risk Factors
Item 1B. Unresolved Staff Comments
Item 1C. Cybersecurity
Item 2. Properties
Item 3. Legal Proceedings
Item 4. Mine Safety Disclosures
Item 7A. Quantitative and Qualitative Disclosures about Market Risk
Item 8. Financial Statements and Supplementary Data
Item 9. Changes in and Disagreements with Accountants on Accounting and Financial Disclosure
Item 9A. Controls and Procedures
Item 9B. Other Information
Item 9C. Disclosure Regarding Foreign Jurisdictions that Prevent Inspections
Item 10. Directors, Executive Officers and Corporate Governance
Item 11. Executive Compensation
Item 13. Certain Relationships and Related Transactions, and Director Independence
Item 14. Principal Accountant Fees and Services
Item 15. Exhibit and Financial Statement Schedules
Item 16. Form 10-K Summary


In [None]:
df.Title.value_counts()

Title
Item 15. Exhibit and Financial Statement Schedules                                              436
Item 1A. Risk Factors                                                                           245
Item 4. Mine Safety Disclosures                                                                 168
Item 1. Business                                                                                154
Item 16. Form 10-K Summary                                                                      140
Misc.                                                                                            49
Item 9A. Controls and Procedures                                                                 17
Item 10. Directors, Executive Officers and Corporate Governance                                  14
Item 7A. Quantitative and Qualitative Disclosures about Market Risk                              13
Item 1C. Cybersecurity                                                                        

In [None]:
df.Text

In [None]:
# Show what str() returns for the first element of the document.
# Only the first element is inspected, and `titles` is intentionally not reset
# here: the next cell indexes titles[0], which would fail on an emptied list.
for element in html_document.elements[:1]:
    print(type(str(element)))

<class 'str'>


In [None]:
# Find where the first collected item heading sits in the document and print
# it together with the elements that follow (at most 10 elements in total).
first_title_id = titles[0].id
first_title_index = next(
    (idx for idx, el in enumerate(html_document.elements) if el.id == first_title_id),
    None,
)
if first_title_index is None:
    # Fail loudly instead of silently falling back to the last element.
    raise ValueError("titles[0] was not found in html_document.elements")

# Slicing stops at the end of the list, so a heading near the end of the
# document cannot trigger an IndexError.
for el in html_document.elements[first_title_index:first_title_index + 10]:
    print(el)

Item 1
Note About Forward-Looking Statements
This report includes estimates, projections, statements relating to our business plans, objectives, and expected operating results that are “forward-looking statements” within the meaning of the Private Securities Litigation Reform Act of 1995, Section 27A of the Securities Act of 1933, and Section 21E of the Securities Exchange Act of 1934. Forward-looking statements may appear throughout this report, including the following sections: “Business” (Part I, Item 1 of this Form 10-K), “Risk Factors” (Part I, Item 1A of this Form 10-K), and “Management’s Discussion and Analysis of Financial Condition and Results of Operations” (Part II, Item 7 of this Form 10-K). These forward-looking statements generally are identified by the words “believe,” “project,” “expect,” “anticipate,” “estimate,” “intend,” “strategy,” “future,” “opportunity,” “plan,” “may,” “should,” “will,” “would,” “will be,” “will continue,” “will likely result,” and similar express

In [None]:
{type(el) for el in html_document.elements}

{unstructured.documents.html.HTMLListItem,
 unstructured.documents.html.HTMLNarrativeText,
 unstructured.documents.html.HTMLText,
 unstructured.documents.html.HTMLTitle}

In [None]:
from unstructured.documents.html import HTMLListItem

# Locate the first list item and print it with one element of leading context
# and the elements that follow (at most 11 elements in total), each with its
# concrete type.
first_list_item_idx = next(
    (idx for idx, el in enumerate(html_document.elements) if isinstance(el, HTMLListItem)),
    None,
)
if first_list_item_idx is None:
    # Fail loudly instead of silently treating the last element as a match.
    raise ValueError("html_document.elements contains no HTMLListItem")

# Clamp the start at 0 so a list item in first position does not wrap around
# to the end of the document; slicing also stops safely at the list's end.
window_start = max(first_list_item_idx - 1, 0)
for el in html_document.elements[window_start:first_list_item_idx + 10]:
    print(el, type(el))

First Quarter 2023: <class 'unstructured.documents.html.HTMLTitle'>
iPad and iPad Pro; <class 'unstructured.documents.html.HTMLListItem'>
Next-generation Apple TV 4K; and <class 'unstructured.documents.html.HTMLListItem'>
MLS Season Pass, a Major League Soccer subscription streaming service. <class 'unstructured.documents.html.HTMLListItem'>
Second Quarter 2023: <class 'unstructured.documents.html.HTMLTitle'>
MacBook Pro 14”, MacBook Pro 16” and Mac mini; and <class 'unstructured.documents.html.HTMLListItem'>
Second-generation HomePod. <class 'unstructured.documents.html.HTMLListItem'>
Third Quarter 2023: <class 'unstructured.documents.html.HTMLTitle'>
MacBook Air 15”, Mac Studio and Mac Pro; <class 'unstructured.documents.html.HTMLListItem'>
Apple Vision Pro™, the Company’s first spatial computer featuring its new visionOS™, expected to be available in early calendar year 2024; and <class 'unstructured.documents.html.HTMLListItem'>
iOS 17, macOS Sonoma, iPadOS 17, tvOS 17 and watchOS 

In [None]:
# pipeline-api
from prepline_sec_filings.sections import section_string_to_enum, validate_section_names, SECSection
from prepline_sec_filings.sec_document import SECDocument, REPORT_TYPES, VALID_FILING_TYPES

In [None]:
# Parse the filing text with SECDocument, then request the narrative elements
# of the Risk Factors section.
# NOTE(review): the saved output of this cell is a KeyboardInterrupt, i.e. the
# recorded run was stopped before it finished — confirm how long this takes
# before relying on "Run All"; later cells need `risk_narrative` to exist.
sec_document = SECDocument.from_string(text)
risk_narrative = sec_document.get_section_narrative(SECSection.RISK_FACTORS)

KeyboardInterrupt: 

In [None]:
# Show the first three risk-factor elements, each followed by two blank lines.
for element in risk_narrative[:3]:
    print(element, end="\n\n\n")

The Company’s business, reputation, results of operations, financial condition and stock price can be affected by a number of factors, whether currently known or unknown, including those described below. When any one or more of these risks materialize from time to time, the Company’s business, reputation, results of operations, financial condition and stock price can be materially and adversely affected.


Because of the following factors, as well as other factors affecting the Company’s results of operations and financial condition, past financial performance should not be considered to be a reliable indicator of future performance, and investors should not use historical trends to anticipate results or trends in future periods. This discussion of risk factors contains forward-looking statements.


This section should be read in conjunction with Part II, Item 7, “Management’s Discussion and Analysis of Financial Condition and Results of Operations” and the consolidated financial state

In [None]:
from unstructured.staging.label_studio import stage_for_label_studio

In [None]:
label_studio_data = stage_for_label_studio(risk_narrative)
label_studio_data[:5]

[{'data': {'text': 'You should carefully consider the risks described in this section. Our future performance is subject to risks and uncertainties that could have a material adverse effect on our business, results of operations, and financial condition and the trading price of our common stock. We may be subject to other risks and uncertainties not presently known to us. In addition, please see our note about forward-looking statements included in the MD&A.',
   'ref_id': '7a912bb639b547404be4ceaf5d9083a9'}},
 {'data': {'text': 'Our revenue is subject to volatility in metal prices, which could negatively affect our results of operations or cash flow.',
   'ref_id': 'd4cc8e0e0c2b68ef69282c5250b721c9'}},
 {'data': {'text': 'Market prices for gold, silver, copper, nickel, and other metals may fluctuate widely over time and are affected by numerous factors beyond our control. These factors include metal supply and demand, industrial and jewelry fabrication, investment demand, central bank

In [None]:
# pipeline-api
from enum import Enum
import re
import signal

from unstructured.staging.base import convert_to_isd
from prepline_sec_filings.sections import (
    ALL_SECTIONS,
    SECTIONS_10K,
    SECTIONS_10Q,
    SECTIONS_S1,
)

In [None]:
# pipeline-api
class timeout:
    """Context manager that interrupts its body with TimeoutError via SIGALRM.

    On entry a SIGALRM handler is installed and an alarm is scheduled for
    ``seconds`` seconds; if the alarm fires, ``TimeoutError(error_message)`` is
    raised from the handler. On exit the alarm is cancelled and the handler
    that was active before entry is put back.

    Arming is best-effort: a ValueError raised while installing the handler or
    while resetting it is ignored, in which case the body simply runs without
    a time limit.

    Args:
        seconds: Delay passed to ``signal.alarm``.
        error_message: Message carried by the raised TimeoutError.
    """

    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        # Handler that was active before __enter__; None when nothing to restore.
        self._previous_handler = None

    def handle_timeout(self, signum, frame):
        """Signal handler: abort the guarded block with TimeoutError."""
        raise TimeoutError(self.error_message)

    def __enter__(self):
        self._previous_handler = None
        try:
            # signal.signal returns the handler it replaces; keep it so that
            # __exit__ can undo the installation.
            self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
            signal.alarm(self.seconds)
        except ValueError:
            pass

    def __exit__(self, exc_type, exc_value, exc_traceback):
        try:
            # Cancel any pending alarm first so it cannot fire mid-restore.
            signal.alarm(0)
            if self._previous_handler is not None:
                signal.signal(signal.SIGALRM, self._previous_handler)
                self._previous_handler = None
        except ValueError:
            pass

In [None]:
# pipeline-api
def get_regex_enum(section_regex):
    """Wrap a user-supplied regex in a one-member Enum and return that member.

    A fresh ``CustomSECSection`` enum is created on every call. Its only
    member, ``CUSTOM``, holds the compiled form of ``section_regex`` as its
    value and exposes it again through the ``pattern`` property.

    Args:
        section_regex: Regular expression source passed to ``re.compile``.

    Returns:
        The ``CUSTOM`` member of the newly created enum.
    """
    compiled_regex = re.compile(section_regex)

    class CustomSECSection(Enum):
        CUSTOM = compiled_regex

        @property
        def pattern(self):
            # The member's value is the compiled regex itself.
            return self.value

    return CustomSECSection["CUSTOM"]

In [None]:
# pipeline-api
def pipeline_api(text, m_section=[], m_section_regex=[]):
    """Many supported sections including: RISK_FACTORS, MANAGEMENT_DISCUSSION, and many more

    Parses ``text`` as an SEC filing and extracts the narrative of each
    requested section.

    Args:
        text: Raw filing text handed to ``SECDocument.from_string``.
        m_section: Section names to extract. The single-item list
            ``[ALL_SECTIONS]`` expands to every section defined for the
            filing's type (10-K, 10-Q, or the S-1 set for non-report types).
        m_section_regex: Regex sources; each one is extracted as a custom
            section and stored under the key ``REGEX_<position>``.

    Returns:
        A dict mapping each section key to ``convert_to_isd`` applied to that
        section's narrative elements.

    Raises:
        ValueError: If the filing type is not in ``VALID_FILING_TYPES``, or if
            all sections are requested for a report type that is neither 10-K
            nor 10-Q.
        TimeoutError: Raised by ``timeout`` when a regex section extraction
            is still running after 5 seconds.
    """
    # The list defaults are never mutated in place below (m_section is only
    # rebound), so sharing them between calls is harmless.
    validate_section_names(m_section)

    sec_document = SECDocument.from_string(text)
    filing_type = sec_document.filing_type
    if filing_type not in VALID_FILING_TYPES:
        raise ValueError(
            f"SEC document filing type {filing_type} is not supported, "
            f"must be one of {','.join(VALID_FILING_TYPES)}"
        )

    # Expand the "all sections" request into the names defined for this filing type.
    if m_section == [ALL_SECTIONS]:
        if filing_type not in REPORT_TYPES:
            section_enums = SECTIONS_S1
        elif filing_type.startswith("10-K"):
            section_enums = SECTIONS_10K
        elif filing_type.startswith("10-Q"):
            section_enums = SECTIONS_10Q
        else:
            raise ValueError(f"Invalid report type: {filing_type}")
        m_section = [member.name for member in section_enums]

    # Named sections first, in request order.
    results = {
        section: sec_document.get_section_narrative(section_string_to_enum[section])
        for section in m_section
    }

    # Custom regex sections follow; only the extraction itself is time-limited.
    for position, section_regex in enumerate(m_section_regex):
        regex_enum = get_regex_enum(section_regex)
        with timeout(seconds=5):
            results[f"REGEX_{position}"] = sec_document.get_section_narrative(regex_enum)

    return {name: convert_to_isd(narrative) for name, narrative in results.items()}

In [None]:
risk_narrative = pipeline_api(text, ["RISK_FACTORS"])["RISK_FACTORS"]
risk_narrative[:5]

[{'text': 'You should carefully consider the risks described in this section. Our future performance is subject to risks and uncertainties that could have a material adverse effect on our business, results of operations, and financial condition and the trading price of our common stock. We may be subject to other risks and uncertainties not presently known to us. In addition, please see our note about forward-looking statements included in the MD&A.',
  'type': 'NarrativeText'},
 {'text': 'Our revenue is subject to volatility in metal prices, which could negatively affect our results of operations or cash flow.',
  'type': 'NarrativeText'},
 {'text': 'Market prices for gold, silver, copper, nickel, and other metals may fluctuate widely over time and are affected by numerous factors beyond our control. These factors include metal supply and demand, industrial and jewelry fabrication, investment demand, central banking actions, inflation expectations, currency values, interest rates, for

In [None]:
# Extract every section defined for this filing type and preview the first
# four elements of each, separated by a dashed rule.
all_narratives = pipeline_api(text, ["_ALL"])
for section, elems in all_narratives.items():
    print(section, elems[:4], "---------------", sep="\n")

BUSINESS
[]
---------------
RISK_FACTORS
[{'text': 'You should carefully consider the risks described in this section. Our future performance is subject to risks and uncertainties that could have a material adverse effect on our business, results of operations, and financial condition and the trading price of our common stock. We may be subject to other risks and uncertainties not presently known to us. In addition, please see our note about forward-looking statements included in the MD&A.', 'type': 'NarrativeText'}, {'text': 'Our revenue is subject to volatility in metal prices, which could negatively affect our results of operations or cash flow.', 'type': 'NarrativeText'}, {'text': 'Market prices for gold, silver, copper, nickel, and other metals may fluctuate widely over time and are affected by numerous factors beyond our control. These factors include metal supply and demand, industrial and jewelry fabrication, investment demand, central banking actions, inflation expectations, c