In [41]:
# Install required libraries
!pip install pandas numpy requests pydantic lxml pymupdf mistralai fastapi pytest pytest-asyncio httpx uvicorn openpyxl pillow nest_asyncio


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Background Information: SEC ADV Filings

The SEC Form ADV is a key regulatory document used by investment advisers to register with the U.S. Securities and Exchange Commission (SEC). It serves as a comprehensive disclosure form that provides detailed information about an investment adviser’s business, including its operations, services, and any potential conflicts of interest.

**Key Components of the SEC Form ADV:**

1. **Part 1A:** This section contains information about the adviser's business, ownership, clients, employees, business practices, affiliations, and any disciplinary events of the adviser or its employees. It also includes details about the private funds managed by the adviser.

2. **Part 2A (Brochure):** This section provides a narrative description of the adviser’s business practices, fees, conflicts of interest, and disciplinary information. It is designed to be a plain-English disclosure document that is given to clients.

3. **Part 2B (Brochure Supplement):** This section includes information about the advisory personnel on whom clients rely for investment advice, including their education, business experience, and any disciplinary history.

**Why is the SEC Form ADV Important?**

- **Transparency:** The form promotes transparency by providing clients and regulators with essential information about the adviser’s business practices and any potential conflicts of interest.
- **Regulatory Compliance:** Filing the Form ADV is a regulatory requirement for investment advisers, ensuring they operate within the legal framework set by the SEC and state authorities.
- **Investor Protection:** By disclosing detailed information about their operations and practices, advisers help protect investors from fraud and misrepresentation.

**Private Fund Reporting:**

Within Part 1A of the SEC Form ADV, Sections 7.B.(1) and 7.B.(2) of Schedule D cover private fund reporting. These sections require advisers to provide detailed information about the private funds they manage or advise, including each fund’s name, identification number, type, gross asset value, and regulatory status. This information helps regulators and investors understand the scope and nature of the adviser’s involvement with private funds.

**Task Context:**

For this task, you will be working with data extracted from SEC ADV filings. You will download metadata and PDFs for specific firms, extract relevant information, and analyze it to identify top-performing funds. This exercise will help you understand how regulatory filings can be used to gather critical information about investment advisers and their managed funds.

# Code

In [7]:
from typing import List

from pydantic import BaseModel, Field, field_validator


class RunConfiguration(BaseModel):
    """
    Configuration for MLP pipeline runs. Defined as a Pydantic model so the
    configuration can be serialized to and deserialized from JSON.
    """
    firm_crd_numbers: List[int]
    input_dir: str
    output_dir: str
    db_path: str
    raise_error: bool = Field(default=False, description="Raise error upon failure")
    verbose: bool = Field(default=False, description="Enable verbose logging")

    @field_validator('firm_crd_numbers')
    @classmethod
    def check_firm_crd_numbers(cls, v):
        # Reject an empty list so a run always has at least one firm to process
        if not v:
            raise ValueError("At least one firm CRD number must be provided")
        return v


In [8]:
import logging
from abc import ABC


class BaseSource(ABC):
    def __init__(self):
        self.logger = logging.getLogger(__name__)


In [9]:
"""
Data source for SEC ADV metadata
"""

import gzip
import io
import os
from datetime import datetime
from typing import Dict, List

import pandas as pd
import requests


class Source(BaseSource):
    """Data source for SEC ADV metadata"""

    def __init__(self, config: RunConfiguration):
        super().__init__()
        self.config = config

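    def run(self, as_of_date: datetime, firm_crd_numbers: List[int]) -> Dict[int, str]:
        df_metadata = self.download_metadata(as_of_date)
        if df_metadata is None:
            # download_metadata logs the failure and returns None when raise_error is False
            self.logger.error("No metadata available; skipping PDF downloads")
            return {}
        return self.download_pdf(df_metadata, firm_crd_numbers)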

    def download_metadata(self, as_of_date: datetime) -> "pd.DataFrame | None":
        """Download metadata from SEC; returns None on failure unless raise_error is set"""
        as_of_date_str = as_of_date.strftime('%m_%d_%Y')
        self.logger.info(f"Downloading metadata for {as_of_date_str}")

        path = rf"https://reports.adviserinfo.sec.gov/reports/CompilationReports/IA_FIRM_SEC_Feed_{as_of_date_str}.xml.gz"

        try:
            # Set a timeout so a stalled connection cannot hang the run
            response = requests.get(path, timeout=60)

            if response.status_code == 200:
                # Unzip the content
                with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as gz:
                    # Ensure the content is read as a string with the correct encoding
                    xml_content = gz.read().decode('ISO-8859-1')

                # Read the data into a dataframe specifying the encoding explicitly if needed
                df = pd.read_xml(io.StringIO(xml_content), xpath='//Info', encoding='ISO-8859-1')

                # Apply metadata transformer
                return self._transform_metadata(df)
            else:
                self.logger.error(f"Failed to download metadata. Status code: {response.status_code}")
                if self.config.raise_error:
                    raise Exception(f"Failed to download metadata. Status code: {response.status_code}")

        except Exception as e:
            self.logger.error(f"Error downloading metadata: {str(e)}")
            if self.config.raise_error:
                raise

    def _transform_metadata(self, data: pd.DataFrame) -> pd.DataFrame:
        """Transform SEC ADV metadata"""
        self.logger.info("Transforming SEC ADV metadata...")

        # Create a column with paths for the PDFs
        data['DownloadPath'] = data['FirmCrdNb'].apply(
            lambda x: f"https://reports.adviserinfo.sec.gov/reports/ADV/{x}/PDF/{x}.pdf"
        )
        data.set_index('FirmCrdNb', inplace=True)

        return data

    def download_pdf(self, df_metadata: pd.DataFrame, firm_crd_numbers: List[int]) -> Dict[int, str]:
        """Download PDF files for the specified firm CRD numbers"""
        self.logger.info("Downloading PDF files...")
        result_paths = {}

        for firm_crd in firm_crd_numbers:
            try:
                # Validate the firm CRD exists in metadata
                if firm_crd not in df_metadata.index:
                    self.logger.warning(f"Firm CRD {firm_crd} not found in metadata")
                    continue

                # Prepare file paths
                url = df_metadata.loc[firm_crd, 'DownloadPath']
                file_name = url.split('/')[-1]
                os.makedirs(self.config.input_dir, exist_ok=True)  # ensure the target directory exists
                save_path = os.path.join(self.config.input_dir, file_name)

                # Check if the file already exists
                if os.path.isfile(save_path):
                    self.logger.info(f"File {file_name} already exists. Using existing file.")
                    result_paths[firm_crd] = save_path
                else:
                    # Download the file
                    if self._download_file(url, save_path):
                        result_paths[firm_crd] = save_path
                    elif self.config.raise_error:
                        self.logger.error(f"Failed to download file for Firm CRD {firm_crd}")
                        raise Exception(f"Failed to download file for Firm CRD {firm_crd}")

            except Exception as e:
                self.logger.error(f"Error processing Firm CRD {firm_crd}: {str(e)}")
                if self.config.raise_error:
                    raise

        return result_paths

    def _download_file(self, url: str, save_path: str) -> bool:
        """Download a file from a URL and save it locally"""
        try:
            # Request the file; the timeout keeps a stalled connection from hanging the run
            response = requests.get(url, timeout=60)

            # Check response status
            if response.status_code == 200:
                # Write file to disk
                with open(save_path, 'wb') as file:
                    file.write(response.content)
                self.logger.info(f"Downloaded {save_path}")
                return True
            else:
                self.logger.error(f"Failed to download from {url}. Status code: {response.status_code}")
                return False
        except Exception as e:
            self.logger.error(f"Error downloading {url}: {str(e)}")
            return False
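
The download logic above can be exercised offline. The following is a minimal sketch on a synthetic feed: the `Report`/`Firms`/`Firm` wrapper elements and the `BusNm` attribute are made up for illustration, and only the `Info` element, the `FirmCrdNb` attribute, and the two URL templates come from the `Source` class. It uses the standard library's `xml.etree.ElementTree` in place of `pd.read_xml` so that it runs without pandas. The helper names `feed_url` and `parse_feed` are hypothetical.

```python
import gzip
import io
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict

# URL templates copied from the Source class
FEED_URL = "https://reports.adviserinfo.sec.gov/reports/CompilationReports/IA_FIRM_SEC_Feed_{date}.xml.gz"
PDF_URL = "https://reports.adviserinfo.sec.gov/reports/ADV/{crd}/PDF/{crd}.pdf"


def feed_url(as_of_date: datetime) -> str:
    """Build the feed URL for a given date (hypothetical helper)."""
    return FEED_URL.format(date=as_of_date.strftime("%m_%d_%Y"))


def parse_feed(gz_bytes: bytes) -> Dict[int, str]:
    """Map each firm CRD number in a gzipped feed to its PDF URL (hypothetical helper)."""
    with gzip.GzipFile(fileobj=io.BytesIO(gz_bytes)) as gz:
        xml_text = gz.read().decode("ISO-8859-1")
    root = ET.fromstring(xml_text)
    return {
        int(info.attrib["FirmCrdNb"]): PDF_URL.format(crd=info.attrib["FirmCrdNb"])
        for info in root.iter("Info")
    }


# Synthetic sample, not real SEC data
sample_xml = (
    '<Report><Firms>'
    '<Firm><Info FirmCrdNb="12345" BusNm="Example Advisers LLC"/></Firm>'
    '<Firm><Info FirmCrdNb="67890" BusNm="Sample Capital LP"/></Firm>'
    '</Firms></Report>'
)
pdf_urls = parse_feed(gzip.compress(sample_xml.encode("ISO-8859-1")))
print(feed_url(datetime(2025, 3, 4)))
print(pdf_urls[12345])
```

Keeping the parsing step separate from the HTTP request makes it possible to test the CRD-to-URL mapping without network access.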


In [10]:
"""
Base transformer class for MLP pipeline
"""

import logging
from abc import ABC, abstractmethod
from typing import Any


class BaseTransformer(ABC):
    """Base transformer class for MLP pipeline"""

    def __init__(self, config: RunConfiguration):
        self.config = config
        self.logger = logging.getLogger(__name__)

    @abstractmethod
    def transform(self, data: Any) -> Any:
        """Transform the input data"""
        pass


In [12]:
import logging
from abc import ABC, abstractmethod


# Derive from ABC so that @abstractmethod is actually enforced
class BaseOCRReader(ABC):
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO)

    @abstractmethod
    def read(self, pdf_path: str):
        pass
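
Abstract methods are only enforced when a class derives from `ABC` (or otherwise uses the `ABCMeta` metaclass). A minimal sketch of the difference, using hypothetical class names that are not part of the pipeline:

```python
from abc import ABC, abstractmethod


class PlainReader:
    """No ABC base: the decorator is not enforced."""

    @abstractmethod
    def read(self, pdf_path: str):
        pass


class StrictReader(ABC):
    """ABC base: instantiation fails until read() is overridden."""

    @abstractmethod
    def read(self, pdf_path: str):
        pass


plain = PlainReader()  # succeeds even though read() is marked abstract

try:
    StrictReader()
    strict_blocked = False
except TypeError:
    # Python refuses to instantiate a class with unimplemented abstract methods
    strict_blocked = True

print(strict_blocked)
```

With the `ABC` base, a reader subclass that forgets to implement `read` fails at construction time rather than silently returning `None` later.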


In [13]:
import logging

import fitz


class FitzOCRReader(BaseOCRReader):
    def __init__(self):
        super().__init__()

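    def read(self, pdf_path: str) -> str:
        # get_text("text") returns the embedded text layer; no OCR is performed.
        # The context manager closes the document handle after reading.
        with fitz.open(pdf_path) as doc:
            return "\n".join(page.get_text("text") for page in doc)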


In [15]:
import logging
import re
from typing import Dict, Optional, Any, List

import fitz
import numpy as np
from PIL import Image


class PdfTransformer(BaseTransformer):
    """Transform extracted text from SEC ADV PDF files"""

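    def __init__(self, config: RunConfiguration, ocr_reader: Optional[BaseOCRReader] = None):
        super().__init__(config)
        logging.basicConfig(level=logging.INFO)
        # Create the default reader per instance rather than once at function definition time
        self.ocr_reader = ocr_reader if ocr_reader is not None else FitzOCRReader()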

    def transform(self, firm_crd_number: int) -> Dict[str, Any]:
        """Extract text from a PDF file and parse relevant information"""
        pdf_path = f"{self.config.input_dir}/{firm_crd_number}.pdf"
        self.logger.info(f"Transforming PDF content at {pdf_path}...")

        full_text = self.ocr_reader.read(pdf_path)

        return {
            'firm_crd_nb': self._extract_firm_crd_number(full_text),
            'sec_nb': self._extract_sec_number(full_text),
            'business_name': self._extract_business_name(full_text),
            'full_legal_name': self._extract_full_legal_name(full_text),
            'address': self._extract_address(full_text),
            'phone_number': self._extract_phone_number(full_text),
            'compensation_arrangements': self._extract_compensation_arrangements(pdf_path),
            'employee_count': self._extract_investment_advisory_employee_count(full_text),
            'client_types': self._extract_client_types(full_text),
            'private_funds': self._extract_private_funds_and_ids(full_text),
            'signatory': self._extract_signatory(full_text)
        }

    def _extract_with_regex(self, text: str, pattern: str) -> Optional[str]:
        """Extract information using regex and return the first capturing group"""
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(1).strip()
        self.logger.debug(f"No match found for pattern: {pattern}")
        return None

    def _extract_firm_crd_number(self, text: str) -> Optional[int]:
        """Extract firm CRD number"""
        patterns = [
            r'CRD Number:\s*(\d+)',
            r'your CRD number:\s*(\d+)',
            r'CRD number:\s*(\d+)',
            r'CRD Number\D*(\d+)'
        ]

        for pattern in patterns:
            result = self._extract_with_regex(text, pattern)
            if result:
                try:
                    return int(result)
                except ValueError:
                    self.logger.warning("Failed to convert extracted firm CRD number to int.")
                    return None

        self.logger.warning("Failed to extract Firm CRD Number.")
        return None

    def _extract_sec_number(self, text: str) -> Optional[str]:
        """Extract SEC file number"""
        patterns = [
            r'SEC file number:\s*([\d-]+)',
            r'SEC File Number:\s*([\d-]+)',
            r'you are registered with the SEC as an investment adviser, your SEC file number:\s*([\d-]+)',
            r'your SEC file number:\s*([\d-]+)'
        ]

        for pattern in patterns:
            result = self._extract_with_regex(text, pattern)
            if result:
                return result

        self.logger.warning("Failed to extract SEC Number.")
        return None

    def _extract_business_name(self, text: str) -> Optional[str]:
        """Extract primary business name"""
        patterns = [
            r'Primary Business Name:\s*([^\n]+)',
            r'Name under which you primarily conduct your advisory business[^:]*:\s*([^\n]+)'
        ]

        for pattern in patterns:
            result = self._extract_with_regex(text, pattern)
            if result:
                return result

        self.logger.warning("Failed to extract Business Name.")
        return None

    def _extract_full_legal_name(self, text: str) -> Optional[str]:
        """Extract full legal name"""
        patterns = [
            r'Your full legal name.*?:\s*([^\n]+)',
            r'full legal name.*?:\s*([^\n]+)',
            r'A\.\s+Your full legal name.*?:\s*([^\n]+)'
        ]

        for pattern in patterns:
            result = self._extract_with_regex(text, pattern)
            if result:
                return result

        self.logger.warning("Failed to extract Full Legal Name.")
        return None

    def _extract_phone_number(self, text: str) -> Optional[str]:
        """Extract phone number"""
        patterns = [
            r'Telephone Number:\s*([\d\- \(\)\+]+)',
            r'Telephone number at this location:\s*([\d\- \(\)\+]+)'
        ]

        for pattern in patterns:
            result = self._extract_with_regex(text, pattern)
            if result:
                return result

        self.logger.warning("Failed to extract Phone Number.")
        return None

    def _extract_investment_advisory_employee_count(self, text: str) -> Optional[int]:
        """Extract the number of employees performing investment advisory functions"""
        pattern = r'Approximately how many of the employees reported in 5\.A\. perform investment advisory functions.*?\n(\d+)'
        match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)

        if match:
            try:
                return int(match.group(1).strip())
            except ValueError:
                self.logger.warning("Failed to convert extracted employee count to int.")
                return None

        self.logger.warning("Investment advisory employee count not found.")
        return None

    def _extract_address(self, text: str) -> Optional[str]:
        """Extract and format address from text input"""
        components = {}
        patterns = {
            'street1': r'Number and Street 1:\s*(.+?)(?:\s*Number and Street 2:|$)',
            'street2': r'Number and Street 2:\s*(.+?)(?:\s*City:|$)',
            'city': r'City:\s*(.+?)(?:\s*State:|$)',
            'state': r'State:\s*(.+?)(?:\s*Country:|$)',
            'country': r'Country:\s*(.+?)(?:\s*ZIP\+4/Postal Code:|$)',
            'postal_code': r'ZIP\+4/Postal Code:\s*(.+?)(?:\s*If this address is|$)'
        }

        for key, pattern in patterns.items():
            match = re.search(pattern, text, re.DOTALL)
            components[key] = match.group(1).strip() if match and match.group(1).strip() else None

        address_parts = []

        street_parts = []
        if components['street1']:
            street_parts.append(components['street1'])
        if components['street2']:
            street_parts.append(components['street2'])
        if street_parts:
            address_parts.append(', '.join(street_parts))

        if components['country'] == 'United States':
            city_state = []
            if components['city']:
                city_state.append(components['city'])
            if components['state']:
                city_state.append(components['state'])
            if city_state:
                address_parts.append(', '.join(city_state))
        elif components['city']:
            address_parts.append(components['city'])

        if components['country'] == 'United States' and components['postal_code']:
            if address_parts and components['state']:
                last_part = address_parts[-1]
                address_parts[-1] = f"{last_part} {components['postal_code']}"
            else:
                address_parts.append(components['postal_code'])
        elif components['postal_code']:
            address_parts.append(components['postal_code'])

        if components['country']:
            address_parts.append(components['country'])

        if not address_parts:
            self.logger.warning("No valid address components found.")
            return None

        return ', '.join(address_parts)

    def _is_checkbox_checked(self, page, rect, threshold=150):
        """Determine if a checkbox is checked based on pixel intensity."""
        pix = page.get_pixmap(matrix=fitz.Matrix(2, 2), clip=rect, colorspace="gray")  # High-res grayscale
        img = Image.frombytes("L", [pix.width, pix.height], pix.samples)  # Convert to PIL
        img_array = np.array(img)  # Convert to NumPy

        # Determine black pixel ratio
        return np.sum(img_array < threshold) / img_array.size > 0.12  # Adjust sensitivity if needed

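    def _extract_compensation_arrangements(self, pdf_path: str) -> List[str]:
        """Return the labels of the compensation options whose checkbox is ticked"""
        doc = fitz.open(pdf_path)
        page = None
        for page_num in range(len(doc)):
            target_page = doc[page_num]
            found_rects = target_page.search_for("Compensation Arrangements")
            if found_rects:
                page = target_page
                break  # Stop at the first occurrence

        if page is None:
            # Without this guard, page.search_for below would raise AttributeError
            self.logger.warning("Compensation Arrangements section not found.")
            return []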

        compensation_options = {
            "A percentage of assets under your management": "(1)",
            "Hourly charges": "(2)",
            "Subscription fees (for a newsletter or periodical)": "(3)",
            "Fixed fees (other than subscription fees)": "(4)",
            "Commissions": "(5)",
            "Performance-based fees": "(6)",
            "Other (specify):": "(7)"
        }

        checked_options = []

        for label, option_number in compensation_options.items():
            rects = page.search_for(label)
            if not rects:
                continue

            label_rect = rects[0]
            checkbox_rect = fitz.Rect(label_rect.x0 - 44, label_rect.y0 + 1, label_rect.x0 - 38, label_rect.y1 - 1)

            # Draw rectangles to visualize
            # page.draw_rect(label_rect, color=(1, 0, 0), width=0.5)  # Red for labels
            # page.draw_rect(checkbox_rect, color=(0, 1, 0), width=1.0)  # Green for checkboxes

            if self._is_checkbox_checked(page, checkbox_rect):
                checked_options.append(label)

        # for debugging pixel map
        # pix = page.get_pixmap()
        # img = Image.open(io.BytesIO(pix.tobytes()))
        # img.save("test.png")

        return checked_options

    def _extract_client_types(self, text: str) -> dict:
        """Extract types of clients served and their AUM"""
        section_pattern = r'Type of Client.*?under Management(.*?)(?:Item|Section|\Z)'
        section_match = re.search(section_pattern, text, re.DOTALL)

        if not section_match:
            section_pattern = r'Type of Client(.*?)(?:Item|\Z)'
            section_match = re.search(section_pattern, text, re.DOTALL)

            if not section_match:
                self.logger.warning("Client types section not found.")
                return {}

        client_section = section_match.group(1)
        aum_details = {}

        client_mapping = {
            'a': 'Individuals',
            'b': 'High Net Worth Individuals',
            'c': 'Banking Institutions',
            'd': 'Investment Companies',
            'e': 'Business Development Companies',
            'f': 'Pooled Investment Vehicles',
            'g': 'Pension Plans',
            'h': 'Charitable Organizations',
            'i': 'State Entities',
            'j': 'Other Advisers',
            'k': 'Insurance Companies',
            'l': 'Sovereign Wealth Funds',
            'm': 'Corporations',
            'n': 'Other'
        }

        letter_sequence = list(client_mapping.keys())

        for i, letter in enumerate(letter_sequence):
            client_type = client_mapping[letter]
            next_pattern = rf'\({letter_sequence[i + 1]}\)' if i < len(letter_sequence) - 1 else r'Item|\Z'
            client_pattern = rf'\({letter}\)(.*?)(?:{next_pattern})'
            client_match = re.search(client_pattern, client_section, re.DOTALL)

            if client_match:
                line_text = client_match.group(1)

                if letter == 'n' and ':' in line_text:
                    other_desc_match = re.search(r'Other:\s*([^\n$]+)', line_text)
                    if other_desc_match:
                        other_desc = other_desc_match.group(1).strip()
                        client_type = f"{client_type}: {other_desc}"

                aum_value = 0
                aum_match = re.search(r'\$\s*([\d,]+)', line_text)
                if aum_match:
                    aum_str = aum_match.group(1).replace(',', '')
                    if aum_str:
                        aum_value = int(aum_str)

                if aum_value > 0:
                    aum_details[client_type] = aum_value

        return aum_details

    def _extract_private_funds_and_ids(self, text: str) -> dict:
        """Extract private fund names and their identification numbers"""
        pattern = r"Name of the private fund:\s*([^\n]+?)\s*\n\s*\(b\) Private fund identification number:\s*\(include the \"805-\" prefix also\)\s*(805-\d+)"
        matches = re.findall(pattern, text)

        results = {}
        for fund_name, fund_id in matches:
            clean_name = fund_name.strip()
            clean_id = fund_id.strip()
            results[clean_name] = clean_id

        return results

    def _extract_signatory(self, text: str) -> Optional[str]:
        """Extract signatory information from the form"""
        patterns = [
            r'Printed Name:\s*\n([^\n]+)',
            r'Printed Name:\s*([^:\n]+?)(?=\s*\n)',
            r'Signature:\s*\n([^\n]+)',
            r'Signature:\s*([^:\n]+?)(?=\s*\n)'
        ]

        for pattern in patterns:
            result = self._extract_with_regex(text, pattern)
            if result:
                cleaned_result = result.strip()
                if "Title:" in cleaned_result:
                    cleaned_result = cleaned_result.split("Title:")[0].strip()

                if cleaned_result and not cleaned_result.isspace():
                    return cleaned_result

        self.logger.warning("Failed to extract Signatory.")
        return None
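
The private fund extraction can be tried without a PDF. The sketch below reuses the pattern from `_extract_private_funds_and_ids` on synthetic text that imitates the extracted layout of Section 7.B.(1). The sample text and fund names are invented, and the standalone function name `extract_private_funds` is hypothetical.

```python
import re
from typing import Dict

# Same pattern as PdfTransformer._extract_private_funds_and_ids
FUND_PATTERN = re.compile(
    r'Name of the private fund:\s*([^\n]+?)\s*\n\s*'
    r'\(b\) Private fund identification number:\s*'
    r'\(include the "805-" prefix also\)\s*(805-\d+)'
)

# Synthetic text that imitates the extracted layout; not taken from a real filing
sample_text = """
A. PRIVATE FUND
Information About the Private Fund
1. (a) Name of the private fund:
EXAMPLE GROWTH FUND I, L.P.
(b) Private fund identification number:
(include the "805-" prefix also)
805-1234567890
1. (a) Name of the private fund:
EXAMPLE CREDIT FUND II, L.P.
(b) Private fund identification number:
(include the "805-" prefix also)
805-9876543210
"""


def extract_private_funds(text: str) -> Dict[str, str]:
    """Return {fund name: fund id} for every match in the text."""
    return {name.strip(): fund_id for name, fund_id in FUND_PATTERN.findall(text)}


funds = extract_private_funds(sample_text)
print(funds)
```

Because the result is keyed by fund name, two funds that share a name would collapse into one entry. Keying by the `805-` identifier instead would avoid that if it turns out to matter for the filings being processed.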


In [16]:
"""
Fund analysis transformer for MLP pipeline
"""

from typing import Dict

import pandas as pd


class FundAnalysisTransformer(BaseTransformer):
    """Transformer for analyzing fund performance data"""

    def __init__(self, config: RunConfiguration):
        super().__init__(config)

    def transform(self, data: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        """
        Transform and analyze the joined data from database

        Args:
            data: DataFrame from query_all() with joined table data

        Returns:
            Dictionary of DataFrames with analysis results
        """
        self.logger.info("Starting fund analysis transformation...")

        # Clean the data and prepare for analysis
        cleaned_data = self._clean_data(data)

        # Perform analyses
        results = {}
        results["top_funds"] = self.get_top_funds(cleaned_data)
        results["aum_by_client_type"] = self.analyze_client_distribution(cleaned_data)

        return results

    def _clean_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Clean and prepare the data for analysis"""
        # Make a copy to avoid modifying the original
        df = data.copy()

        # Convert AUM values to numeric
        if 'aum_value' in df.columns:
            df['aum_value'] = pd.to_numeric(
                df['aum_value'].astype(str).str.replace(r'[^\d.]', '', regex=True),
                errors='coerce'
            )

        return df

    def get_top_funds(self, data: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
        """
        Rank funds by the total AUM of the firm that manages them.

        The AUM figure is summed from the firm's aum_value rows, not taken from
        a per-fund value, so every fund of a firm shares the same total.

        Args:
            data: DataFrame with fund data
            top_n: Number of top funds to return

        Returns:
            DataFrame with the top funds, sorted by total firm AUM
        """
        self.logger.info(f"Identifying top {top_n} funds by AUM...")

        # Keep only rows that carry fund information
        fund_data = data.dropna(subset=['fund_name', 'fund_id'])

        # Calculate total AUM per firm
        firm_aum = {}
        for firm_crd in fund_data['firm_crd_nb'].unique():
            firm_rows = data[data['firm_crd_nb'] == firm_crd]
            firm_aum[firm_crd] = firm_rows['aum_value'].sum()

        # Create fund performance DataFrame
        fund_metrics = []
        for (firm_crd, fund_name), group in fund_data.groupby(['firm_crd_nb', 'fund_name']):
            fund_metrics.append({
                'firm_crd_nb': firm_crd,
                'business_name': group['business_name'].iloc[0],
                'fund_name': fund_name,
                'fund_id': group['fund_id'].iloc[0],
                'total_firm_aum': firm_aum.get(firm_crd, 0)
            })

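        # Convert to DataFrame and sort by the firm's total AUM
        performance_df = pd.DataFrame(fund_metrics)
        if performance_df.empty:
            # sort_values would raise KeyError on a frame with no columns
            return performance_df
        return performance_df.sort_values('total_firm_aum', ascending=False).head(top_n)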

    def analyze_client_distribution(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Analyze how AUM is distributed across different client types

        Args:
            data: DataFrame with client type data

        Returns:
            DataFrame with AUM by client type
        """
        self.logger.info("Analyzing AUM distribution by client type...")

        # Filter for rows with client type information
        client_data = data.dropna(subset=['client_type'])
        client_data = client_data[client_data['client_type'] != 'None']

        if client_data.empty:
            return pd.DataFrame()

        # Group by client type and calculate total AUM
        aum_by_type = client_data.groupby(['client_type'])['aum_value'].sum().reset_index()
        aum_by_type = aum_by_type.sort_values('aum_value', ascending=False)

        # Calculate percentage of total AUM
        total_aum = aum_by_type['aum_value'].sum()
        aum_by_type['percentage'] = (aum_by_type['aum_value'] / total_aum * 100).round(2)

        return aum_by_type
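
Both analyses sum `aum_value` over the joined frame. `query_all()` is not shown in this section, so the following is only a sketch of a pitfall worth checking for: if that query joins client-type rows and private-fund rows on `firm_crd_nb`, each client-type row repeats once per fund and the sums are inflated. The two-table schema below is a simplified stand-in used for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE client_types (firm_crd_nb INTEGER, client_type TEXT, aum_value INTEGER);
CREATE TABLE private_funds (firm_crd_nb INTEGER, fund_name TEXT, fund_id TEXT);
INSERT INTO client_types VALUES
    (1, 'Pooled Investment Vehicles', 100),
    (1, 'Individuals', 50);
INSERT INTO private_funds VALUES
    (1, 'Fund A', '805-1'),
    (1, 'Fund B', '805-2');
""")

# Joining the two child tables yields 2 client rows x 2 fund rows = 4 rows
joined_sum = conn.execute("""
    SELECT SUM(c.aum_value)
    FROM client_types c
    JOIN private_funds p ON p.firm_crd_nb = c.firm_crd_nb
""").fetchone()[0]

# Summing the client_types table on its own counts each row once
true_sum = conn.execute(
    "SELECT SUM(aum_value) FROM client_types WHERE firm_crd_nb = 1"
).fetchone()[0]

print(joined_sum, true_sum)
conn.close()
```

If the joined data does fan out this way, one option is to drop duplicate `(firm_crd_nb, client_type)` rows before summing, or to aggregate AUM from the client-type table separately.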


In [17]:
"""
Database manager for MLP pipeline
"""

import logging
from abc import ABC, abstractmethod


# Derive from ABC so that @abstractmethod is actually enforced
class DatabaseManager(ABC):
    """Manager for SQLite database operations"""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)

    @abstractmethod
    def setup(self):
        pass


In [18]:
"""
Database manager for MLP pipeline with improved schema design
"""

import sqlite3
from typing import Dict, Any, Union, List

import pandas as pd


class DatabaseSink(DatabaseManager):
    """Manager for SQLite database operations with improved schema for nested data"""

    def __init__(self, config: RunConfiguration):
        super().__init__(config.db_path)
        self.setup()

    def setup(self):
        """Initialize the database schema with proper relationships"""
        self.logger.info(f"Initializing database at {self.db_path}")

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            # Create the main firms table
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS firms (
                firm_crd_nb INTEGER PRIMARY KEY,
                sec_nb TEXT,
                business_name TEXT,
                full_legal_name TEXT,
                address TEXT,
                phone_number TEXT,
                employee_count INTEGER,
                signatory TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            ''')

            # Create compensation arrangements table with FK relationship
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS compensation_arrangements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                firm_crd_nb INTEGER NOT NULL,
                arrangement TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (firm_crd_nb) REFERENCES firms(firm_crd_nb) ON DELETE CASCADE
            )
            ''')

            # Create client types table with FK relationship
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS client_types (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                firm_crd_nb INTEGER NOT NULL,
                client_type TEXT NOT NULL,
                aum_value INTEGER NOT NULL DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (firm_crd_nb) REFERENCES firms(firm_crd_nb) ON DELETE CASCADE
            )
            ''')

            # Create private funds table with FK relationship
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS private_funds (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                firm_crd_nb INTEGER NOT NULL,
                fund_name TEXT NOT NULL,
                fund_id TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (firm_crd_nb) REFERENCES firms(firm_crd_nb) ON DELETE CASCADE,
                UNIQUE(firm_crd_nb, fund_id)
            )
            ''')

            # Create indexes for better query performance
            cursor.execute(
                'CREATE INDEX IF NOT EXISTS idx_compensation_firm_crd ON compensation_arrangements(firm_crd_nb)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_client_types_firm_crd ON client_types(firm_crd_nb)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_private_funds_firm_crd ON private_funds(firm_crd_nb)')

            # Enable foreign keys (a per-connection setting in SQLite, so write() enables it again)
            cursor.execute('PRAGMA foreign_keys = ON')

            conn.commit()

    def write(self, firm_data: Dict[int, Dict[str, Any]]):
        """Store firm data in the database with proper handling of nested fields

        Args:
            firm_data: Dictionary with firm CRD as key and firm data as value
        """
        self.logger.info(f"Storing data for {len(firm_data)} firms")

        with sqlite3.connect(self.db_path) as conn:
            # Enable foreign keys
            conn.execute('PRAGMA foreign_keys = ON')
            cursor = conn.cursor()

            for firm_crd, data in firm_data.items():
                # Start a transaction
                cursor.execute('BEGIN TRANSACTION')
                try:
                    # Insert or update firm data in the firms table
                    cursor.execute('''
                    INSERT INTO firms
                    (firm_crd_nb, sec_nb, business_name, full_legal_name, address, phone_number, employee_count, signatory, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                    ON CONFLICT(firm_crd_nb) DO UPDATE SET
                        sec_nb = excluded.sec_nb,
                        business_name = excluded.business_name,
                        full_legal_name = excluded.full_legal_name,
                        address = excluded.address,
                        phone_number = excluded.phone_number,
                        employee_count = excluded.employee_count,
                        signatory = excluded.signatory,
                        updated_at = CURRENT_TIMESTAMP
                    ''', (
                        firm_crd,
                        data.get('sec_nb'),
                        data.get('business_name'),
                        data.get('full_legal_name'),
                        data.get('address'),
                        data.get('phone_number'),
                        data.get('employee_count'),
                        data.get('signatory')
                    ))

                    # Handle compensation arrangements (expects a string that can be split)
                    self._write_compensation_arrangements(cursor, firm_crd, data.get('compensation_arrangements', ''))

                    # Handle client types (expects a dictionary)
                    self._write_client_types(cursor, firm_crd, data.get('client_types', {}))

                    # Handle private funds (expects a dictionary of fund name -> fund id)
                    self._write_private_funds(cursor, firm_crd, data.get('private_funds', {}))

                    # Commit the transaction
                    cursor.execute('COMMIT')
                except Exception as e:
                    # Rollback in case of error
                    cursor.execute('ROLLBACK')
                    self.logger.error(f"Error storing data for firm {firm_crd}: {str(e)}")
                    raise e

    def _write_compensation_arrangements(self, cursor: sqlite3.Cursor, firm_crd: int, arrangements: Union[str, list]):
        """Store compensation arrangements in the database

        Args:
            cursor: SQLite cursor
            firm_crd: Firm CRD number
            arrangements: List of compensation arrangements or a comma-separated string
        """
        # Delete existing arrangements for this firm
        cursor.execute('DELETE FROM compensation_arrangements WHERE firm_crd_nb = ?', (firm_crd,))

        # Ensure arrangements are always a list
        if isinstance(arrangements, list):
            arrangement_list = [arr.strip() for arr in arrangements if isinstance(arr, str)]
        elif isinstance(arrangements, str):
            arrangement_list = [arr.strip() for arr in arrangements.split(',') if arr.strip()]
        else:
            arrangement_list = []

        # Insert into DB
        for arrangement in arrangement_list:
            cursor.execute(
                '''INSERT INTO compensation_arrangements (firm_crd_nb, arrangement) VALUES (?, ?)''',
                (firm_crd, arrangement)
            )

    def _write_client_types(self, cursor: sqlite3.Cursor, firm_crd: int, client_types: Dict[str, Union[int, str]]):
        """Store client types and AUM values in the database

        Args:
            cursor: SQLite cursor
            firm_crd: Firm CRD number
            client_types: Dictionary of client type -> AUM value
        """
        # Delete existing client types for this firm
        cursor.execute('DELETE FROM client_types WHERE firm_crd_nb = ?', (firm_crd,))

        # Insert new client types
        for client_type, aum_value in client_types.items():
            # Convert AUM value to integer if it's not already
            if isinstance(aum_value, str):
                try:
                    aum_value = int(aum_value.replace(',', ''))
                except ValueError:
                    aum_value = 0

            cursor.execute('''
            INSERT INTO client_types (firm_crd_nb, client_type, aum_value)
            VALUES (?, ?, ?)
            ''', (firm_crd, client_type, aum_value))

    def _write_private_funds(self, cursor: sqlite3.Cursor, firm_crd: int, private_funds: Dict[str, str]):
        """Store private funds in the database

        Args:
            cursor: SQLite cursor
            firm_crd: Firm CRD number
            private_funds: Dictionary of fund name -> fund ID
        """
        # Delete existing private funds for this firm
        cursor.execute('DELETE FROM private_funds WHERE firm_crd_nb = ?', (firm_crd,))

        # Insert new private funds
        for fund_name, fund_id in private_funds.items():
            cursor.execute('''
            INSERT INTO private_funds (firm_crd_nb, fund_name, fund_id)
            VALUES (?, ?, ?)
            ''', (firm_crd, fund_name, fund_id))

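    def query_all(self) -> pd.DataFrame:
        """Query all data for reporting by joining the child tables onto firms.

        The result is denormalized: a firm appears once per combination of
        arrangement, client type, and private fund.
        """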
        self.logger.info("Querying data from all tables...")

        with sqlite3.connect(self.db_path) as conn:
            query = '''
            SELECT
                f.firm_crd_nb,
                f.sec_nb,
                f.business_name,
                f.full_legal_name,
                f.address,
                f.phone_number,
                f.employee_count,
                f.signatory,
                f.created_at,
                f.updated_at,
                ca.arrangement,
                ct.client_type,
                ct.aum_value,
                pf.fund_name,
                pf.fund_id
            FROM firms f
            LEFT JOIN compensation_arrangements ca ON f.firm_crd_nb = ca.firm_crd_nb
            LEFT JOIN client_types ct ON f.firm_crd_nb = ct.firm_crd_nb
            LEFT JOIN private_funds pf ON f.firm_crd_nb = pf.firm_crd_nb
            ORDER BY f.firm_crd_nb
            '''

            result = pd.read_sql_query(query, conn)

            # Fill NaN values for string columns only (assign back rather than
            # using inplace=True on a column, which pandas warns about)
            for col in result.select_dtypes(include=['object']).columns:
                result[col] = result[col].fillna('None')

            return result

    def fetch_compensation(self, firm_crd_nb: int) -> List[str]:
        """Fetch compensation arrangements for a firm"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT arrangement FROM compensation_arrangements WHERE firm_crd_nb = ?", (firm_crd_nb,))
            return [row[0] for row in cursor.fetchall()]

    def fetch_client_types(self, firm_crd_nb: int) -> Dict[str, int]:
        """Fetch client types and AUM"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT client_type, aum_value FROM client_types WHERE firm_crd_nb = ?", (firm_crd_nb,))
            return {row[0]: row[1] for row in cursor.fetchall()}

    def fetch_private_funds(self, firm_crd_nb: int) -> List[Dict[str, str]]:
        """Fetch private funds"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT fund_name, fund_id FROM private_funds WHERE firm_crd_nb = ?", (firm_crd_nb,))
            return [{"name": row[0], "identification_number": row[1]} for row in cursor.fetchall()]
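
Because `query_all` left-joins three child tables on the same key, a firm with several arrangements, client types, and funds appears once per combination, so summing `aum_value` over the joined rows would overcount. Below is a minimal sketch of aggregating before joining. It uses an in-memory database with a trimmed-down version of the schema above; the sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE firms (firm_crd_nb INTEGER PRIMARY KEY, business_name TEXT);
CREATE TABLE compensation_arrangements (firm_crd_nb INTEGER, arrangement TEXT);
CREATE TABLE client_types (firm_crd_nb INTEGER, client_type TEXT, aum_value INTEGER);
INSERT INTO firms VALUES (1, 'Example Firm');
INSERT INTO compensation_arrangements VALUES (1, 'Percentage of AUM'), (1, 'Performance-based fees');
INSERT INTO client_types VALUES (1, 'Individuals', 100), (1, 'Corporations', 200);
""")

# Joining both child tables yields 2 x 2 = 4 rows for the single firm.
joined_rows = conn.execute("""
    SELECT f.firm_crd_nb, ca.arrangement, ct.client_type, ct.aum_value
    FROM firms f
    LEFT JOIN compensation_arrangements ca ON f.firm_crd_nb = ca.firm_crd_nb
    LEFT JOIN client_types ct ON f.firm_crd_nb = ct.firm_crd_nb
""").fetchall()

# Summing over the joined rows counts every client type once per arrangement.
naive_total = sum(row[3] for row in joined_rows)

# Aggregating client_types first keeps exactly one row per firm.
safe_total = conn.execute("""
    SELECT COALESCE(t.total_aum, 0)
    FROM firms f
    LEFT JOIN (
        SELECT firm_crd_nb, SUM(aum_value) AS total_aum
        FROM client_types
        GROUP BY firm_crd_nb
    ) t ON f.firm_crd_nb = t.firm_crd_nb
    WHERE f.firm_crd_nb = 1
""").fetchone()[0]

conn.close()
print(len(joined_rows), naive_total, safe_total)  # prints: 4 600 300
```

`prepare_tables_for_excel` sidesteps the same problem by calling `drop_duplicates` on each per-table slice before counting or summing.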


In [37]:
from typing import Dict

import pandas as pd


def prepare_tables_for_excel(df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    """
    Split the joined query result into separate tables for Excel output

    Args:
        df: DataFrame output from joining all tables together

    Returns:
        Dictionary mapping sheet names to DataFrames for each table
    """

    if df.empty:
        return {"Empty_Result": pd.DataFrame()}

    # Create separate dataframes for each table
    result = {}

    # Firms table (main table)
    firms_cols = [
        'firm_crd_nb', 'sec_nb', 'business_name', 'full_legal_name',
        'address', 'phone_number', 'employee_count', 'signatory',
        'created_at', 'updated_at'
    ]
    firms_df = df[firms_cols].drop_duplicates(subset=['firm_crd_nb'])
    result['Firms'] = firms_df

    # Compensation arrangements table
    comp_cols = ['firm_crd_nb', 'business_name', 'arrangement']
    comp_df = df[comp_cols].dropna(subset=['arrangement'])
    comp_df = comp_df[comp_df['arrangement'] != 'None'].drop_duplicates()
    if not comp_df.empty:
        result['Compensation_Arrangements'] = comp_df

    # Client types table
    client_cols = ['firm_crd_nb', 'business_name', 'client_type', 'aum_value']
    client_df = df[client_cols].dropna(subset=['client_type'])
    client_df = client_df[client_df['client_type'] != 'None'].drop_duplicates()
    if not client_df.empty:
        result['Client_Types'] = client_df

    # Private funds table
    fund_cols = ['firm_crd_nb', 'business_name', 'fund_name', 'fund_id']
    fund_df = df[fund_cols].dropna(subset=['fund_name'])
    fund_df = fund_df[fund_df['fund_name'] != 'None'].drop_duplicates()
    if not fund_df.empty:
        result['Private_Funds'] = fund_df

    # Generate summary table
    summary_data = []
    for firm_crd in firms_df['firm_crd_nb'].unique():
        firm_name = firms_df[firms_df['firm_crd_nb'] == firm_crd]['business_name'].iloc[0]

        # Count metrics for this firm
        comp_count = len(comp_df[comp_df['firm_crd_nb'] == firm_crd]) if 'Compensation_Arrangements' in result else 0
        client_count = len(client_df[client_df['firm_crd_nb'] == firm_crd]) if 'Client_Types' in result else 0
        fund_count = len(fund_df[fund_df['firm_crd_nb'] == firm_crd]) if 'Private_Funds' in result else 0

        # Calculate total AUM if available
        total_aum = 0
        if 'Client_Types' in result:
            firm_clients = client_df[client_df['firm_crd_nb'] == firm_crd]
            if not firm_clients.empty:
                # Convert to numeric, coercing errors to NaN, then sum
                total_aum = pd.to_numeric(firm_clients['aum_value'], errors='coerce').fillna(0).sum()

        summary_data.append({
            'firm_crd_nb': firm_crd,
            'business_name': firm_name,
            'total_arrangements': comp_count,
            'total_client_types': client_count,
            'total_funds': fund_count,
            'total_aum': total_aum
        })

    if summary_data:
        result['Summary'] = pd.DataFrame(summary_data)

    return result


In [24]:
"""
Excel writer for MLP pipeline
"""

import logging
import os
from datetime import datetime
from typing import Dict

import pandas as pd


class ExcelWriter:
    """Excel writer for MLP pipeline that writes tables to separate sheets"""

    def __init__(self, config: RunConfiguration):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def write(self, data_dict: Dict[str, pd.DataFrame]) -> str:
        """
        Write multiple dataframes to separate sheets in an Excel file

        Args:
            data_dict: Dictionary mapping sheet names to pandas DataFrames

        Returns:
            Path to the created Excel file
        """
        self.logger.info(f"Writing {len(data_dict)} tables to Excel file...")

        # Ensure output directory exists
        os.makedirs(self.config.output_dir, exist_ok=True)

        # Format timestamp for filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_path = os.path.join(self.config.output_dir, f'sec_adv_report_{timestamp}.xlsx')

        # Create Excel writer
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            # Process each dataframe
            for sheet_name, df in data_dict.items():
                # Skip empty dataframes
                if df.empty:
                    continue

                # Format specific columns
                df_formatted = self._format_dataframe(df, sheet_name)

                # Write to Excel
                self.logger.info(f"Writing sheet: {sheet_name} with {len(df_formatted)} rows")
                df_formatted.to_excel(writer, sheet_name=sheet_name, index=False)

                # Auto-adjust columns width
                self._adjust_column_width(writer, sheet_name, df_formatted)

        self.logger.info(f"Excel report generated at {output_path}")
        return output_path

    def _format_dataframe(self, df: pd.DataFrame, sheet_name: str) -> pd.DataFrame:
        """Format specific columns based on sheet name"""
        df_copy = df.copy()

        if sheet_name == 'Client_Types' and 'aum_value' in df_copy.columns:
            # Format AUM as currency
            df_copy['aum_value'] = df_copy['aum_value'].apply(
                lambda x: f"${int(x):,}" if pd.notna(x) and x != 'None' else "$0"
            )

        # Format date columns across all tables
        for col in df_copy.columns:
            if 'date' in col.lower() or col in ('created_at', 'updated_at'):
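                # errors='ignore' is deprecated in pandas, so handle parse failures explicitly
                try:
                    df_copy[col] = pd.to_datetime(df_copy[col])
                except (ValueError, TypeError):
                    continue  # Leave the column unchanged if it cannot be parsed
                if df_copy[col].dtype.kind == 'M':  # If successfully converted to datetime
                    df_copy[col] = df_copy[col].dt.strftime('%Y-%m-%d %H:%M:%S')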

        return df_copy

    def _adjust_column_width(self, writer, sheet_name: str, df: pd.DataFrame):
        """Adjust column width based on content"""
        worksheet = writer.sheets[sheet_name]
        for i, col in enumerate(df.columns):
            # Calculate maximum column width
            max_length = max(
                df[col].astype(str).apply(len).max(),
                len(str(col))
            ) + 2  # Add a little extra space

            # Map the zero-based index to a column letter (valid through column ZZ)
            # and cap the width at 50 characters
            col_letter = chr(65 + i) if i < 26 else chr(64 + i // 26) + chr(65 + i % 26)
            worksheet.column_dimensions[col_letter].width = min(max_length, 50)
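
The column-letter expression in `_adjust_column_width` covers one- and two-letter columns (A through ZZ). As a sketch of the general conversion, the helper below (a hypothetical name, not part of the pipeline) treats Excel letters as a bijective base-26 number. openpyxl also provides `openpyxl.utils.get_column_letter`, which takes a 1-based index and could replace the hand-rolled expression.

```python
def column_letter(index: int) -> str:
    """Convert a zero-based column index to an Excel column letter (0 -> 'A', 26 -> 'AA')."""
    if index < 0:
        raise ValueError("index must be non-negative")
    letters = ""
    n = index + 1  # Excel letters behave like a 1-based, bijective base-26 number
    while n > 0:
        n, remainder = divmod(n - 1, 26)
        letters = chr(65 + remainder) + letters
    return letters


print(column_letter(0), column_letter(25), column_letter(26), column_letter(701), column_letter(702))
# prints: A Z AA ZZ AAA
```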


In [19]:
import logging

from fastapi import FastAPI, HTTPException, Query


class APIService:
    """FastAPI Service for SEC ADV Data"""

    def __init__(self, config: RunConfiguration):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.db_manager = DatabaseSink(config)
        self.app = FastAPI(title="SEC ADV Data API", description="API for SEC ADV Data")
        self.setup_routes()

    def setup_routes(self):
        """Setup API routes"""

        @self.app.get("/firms", tags=["Firms"])
        async def get_firms(limit: int = Query(10, ge=1, le=100)):
            """Get up to `limit` rows of joined firm data (one firm can span several rows)"""
            try:
                df = self.db_manager.query_all().head(limit)

                return df.to_dict(orient="records")
            except Exception as e:
                self.logger.error(f"Error getting firms: {str(e)}")
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/firms/{firm_crd_nb}", tags=["Firms"])
        async def get_firm(firm_crd_nb: int):
            """Get a specific firm by CRD number"""
            try:
                df = self.db_manager.query_all()
                firm = df[df["firm_crd_nb"] == firm_crd_nb]

                if firm.empty:
                    raise HTTPException(status_code=404, detail=f"Firm with CRD {firm_crd_nb} not found")

                return firm.iloc[0].to_dict()
            except HTTPException as e:
                raise e
            except Exception as e:
                self.logger.error(f"Error getting firm {firm_crd_nb}: {str(e)}")
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/compensation/{firm_crd_nb}", tags=["Compensation"])
        async def get_compensation(firm_crd_nb: int):
            """Fetch compensation arrangements for a firm"""
            try:
                data = self.db_manager.fetch_compensation(firm_crd_nb)
                if not data:
                    return {"firm_crd_nb": firm_crd_nb, "compensation": [], "message": "No compensation data found."}
                return {"firm_crd_nb": firm_crd_nb, "compensation": data}
            except Exception as e:
                self.logger.error(f"Error fetching compensation data for {firm_crd_nb}: {str(e)}")
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/client_types/{firm_crd_nb}", tags=["Client Types"])
        async def get_client_types(firm_crd_nb: int):
            """Fetch client types and AUM"""
            try:
                data = self.db_manager.fetch_client_types(firm_crd_nb)
                if not data:
                    return {"firm_crd_nb": firm_crd_nb, "client_types": {}, "message": "No client type data found."}
                return {"firm_crd_nb": firm_crd_nb, "client_types": data}
            except Exception as e:
                self.logger.error(f"Error fetching client types for {firm_crd_nb}: {str(e)}")
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/private_funds/{firm_crd_nb}", tags=["Private Funds"])
        async def get_private_funds(firm_crd_nb: int):
            """Fetch private funds for a firm"""
            try:
                data = self.db_manager.fetch_private_funds(firm_crd_nb)
                if not data:
                    return {"firm_crd_nb": firm_crd_nb, "private_funds": [], "message": "No private funds found."}
                return {"firm_crd_nb": firm_crd_nb, "private_funds": data}
            except Exception as e:
                self.logger.error(f"Error fetching private funds for {firm_crd_nb}: {str(e)}")
                raise HTTPException(status_code=500, detail=str(e))


In [33]:
import os
from datetime import datetime

as_of_date = datetime(2025, 3, 17)  # Revised working date
working_dir = os.getcwd()

# Define the input, output, and database paths relative to the script directory
input_dir = os.path.join(working_dir, 'data', 'input')
output_dir = os.path.join(working_dir, 'data', 'output')
db_path = os.path.join(working_dir, 'data', 'sec_adv.db')

# Create directories
os.makedirs(input_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)

config = RunConfiguration(
    input_dir=input_dir,
    output_dir=output_dir,
    firm_crd_numbers=[160882, 160021, 317731],
    db_path=db_path,
    raise_error=False,
    verbose=True
)

In [29]:
import logging

logger = logging.getLogger(__name__)
source = Source(config)
pdf_transformer = PdfTransformer(config)
fund_analysis_transformer = FundAnalysisTransformer(config)
db_manager = DatabaseSink(config)
excel_writer = ExcelWriter(config)
api_service = APIService(config)
logger.info("Starting MLP pipeline...")

INFO:__main__:Initializing database at /Users/kyleloomis/development/kyleloomis/mlp/data/sec_adv.db
INFO:__main__:Initializing database at /Users/kyleloomis/development/kyleloomis/mlp/data/sec_adv.db
INFO:__main__:Starting MLP pipeline...


# Task:

## 1. Download Metadata and PDFs (Estimated Time: 30-45 minutes):
Using the starter code provided, download the metadata and PDFs for the following FirmCrdNb values. Use the provided URLs to download the files and save them locally. Implement error handling and logging:
* 160882
* 160021
* 1679500

In [34]:
pdf_paths = source.run(as_of_date, config.firm_crd_numbers)
pdf_paths

INFO:__main__:Downloading metadata for 03_17_2025
INFO:__main__:Transforming SEC ADV metadata...
INFO:__main__:Downloading PDF files...
INFO:__main__:Downloaded /Users/kyleloomis/development/kyleloomis/mlp/data/input/160882.pdf
INFO:__main__:Downloaded /Users/kyleloomis/development/kyleloomis/mlp/data/input/160021.pdf
INFO:__main__:Downloaded /Users/kyleloomis/development/kyleloomis/mlp/data/input/317731.pdf


{160882: '/Users/kyleloomis/development/kyleloomis/mlp/data/input/160882.pdf',
 160021: '/Users/kyleloomis/development/kyleloomis/mlp/data/input/160021.pdf',
 317731: '/Users/kyleloomis/development/kyleloomis/mlp/data/input/317731.pdf'}

## 2. Extract and Store Information (Estimated Time: 1-1.5 hours):
* Task: Extract specific information from the downloaded PDFs and store it in a local SQLite database.
* Details: Extract fields such as FirmCrdNb, SECNb, Business Name, Full Legal Name, Address, Phone Number, Compensation Arrangements, Number of employees performing investment advisory functions, Type of Client and Amount of Regulatory Assets Under Management, Names of Private Fund and Private Fund Identification Number, and Signatory of the PDF.
* In practice, you will deal with tens of thousands of files. Your code should systematically parse the text and extract the relevant information, as scalability is an important factor.

In [35]:
firm_data = {
    firm_crd_number: pdf_transformer.transform(firm_crd_number)
    for firm_crd_number in pdf_paths
}
db_manager.write(firm_data)

INFO:__main__:Transforming PDF content at /Users/kyleloomis/development/kyleloomis/mlp/data/input/160882.pdf...
INFO:__main__:Transforming PDF content at /Users/kyleloomis/development/kyleloomis/mlp/data/input/160021.pdf...
INFO:__main__:Transforming PDF content at /Users/kyleloomis/development/kyleloomis/mlp/data/input/317731.pdf...
INFO:__main__:Storing data for 3 firms


## 3. Data Transformation and Analysis (Estimated Time: 30-45 minutes):
* Task: Perform data transformation and analysis using Pandas.
* Details: Clean and transform the extracted data, and perform basic analysis such as identifying the top-performing funds based on specific criteria (e.g., assets under management).

In [36]:
df = db_manager.query_all()
analysis = fund_analysis_transformer.transform(df)
print(analysis['top_funds'])
print(analysis['aum_by_client_type'])

INFO:__main__:Querying data from all tables...
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  result[col].fillna('None', inplace=True)
INFO:__main__:Starting fund analysis transformation...
INFO:__main__:Identifying top 10 funds by AUM...
INFO:__main__:Analyzing AUM distribution by client type...


   firm_crd_nb                    business_name  \
1       160882  MARBLE BAR ASSET MANAGEMENT LLP   
2       160882  MARBLE BAR ASSET MANAGEMENT LLP   
3       160882  MARBLE BAR ASSET MANAGEMENT LLP   
5       160882  MARBLE BAR ASSET MANAGEMENT LLP   
4       160882  MARBLE BAR ASSET MANAGEMENT LLP   
6       317731            ARROW CAPITAL PTY LTD   
0       160021        THREE BRIDGES CAPITAL, LP   

                                           fund_name         fund_id  \
1                              CHELODINA MASTER FUND  805-1338780197   
2  EAM LONG-ONLY EMERGING MARKETS MASTER FUND LIM...  805-7389231579   
3                                 LEXCOR MASTER FUND  805-8547054532   
5                                  NAVAT MASTER FUND  805-5647299307   
4                           MNAV CAYMAN FUND LIMITED  805-9579883870   
6                                               None            None   
0             THREE BRIDGES EUROPE MASTER FUND, LTD.  805-7679842650   

   total_firm_

## 4. Generate Excel File (Estimated Time: 15-30 minutes):
* Task: Extract the following information from the SQLite database and output it in an **Excel file**. Keep in mind that this Excel file will ultimately be used by BD recruiters, who are considered non-technical users:
* FirmCrdNb
* SECNb
* Business Name
* Full Legal Name
* Address
* Phone Number
* Compensation Arrangements
* Number of employees performing investment advisory functions, including research
* Type of Client and Amount of Regulatory Assets Under Management
* Names of Private Fund and Private Fund Identification Number
* Signatory of the PDF

In [38]:
tables = prepare_tables_for_excel(df)
excel_writer.write(tables)

INFO:__main__:Writing 5 tables to Excel file...
  df_copy[col] = pd.to_datetime(df_copy[col], errors='ignore')
  df_copy[col] = pd.to_datetime(df_copy[col], errors='ignore')
INFO:__main__:Writing sheet: Firms with 3 rows
INFO:__main__:Writing sheet: Compensation_Arrangements with 9 rows
INFO:__main__:Writing sheet: Client_Types with 6 rows
INFO:__main__:Writing sheet: Private_Funds with 6 rows
INFO:__main__:Writing sheet: Summary with 3 rows
INFO:__main__:Excel report generated at /Users/kyleloomis/development/kyleloomis/mlp/data/output/sec_adv_report_20250323_161153.xlsx


'/Users/kyleloomis/development/kyleloomis/mlp/data/output/sec_adv_report_20250323_161153.xlsx'

## 5. Scalability and Performance (Discussion - Estimated Time: 15-30 minutes):
* Task: Optimize the data pipeline for performance.
* Details: Write a brief explanation of how you would handle scalability and performance issues if the dataset were significantly larger.

I designed the data pipeline in three main parts: source, transform, and sink. To improve scalability and performance, I would improve each part separately, introducing frameworks and technologies that let the pipeline scale horizontally. First, the data could be sourced (downloaded) concurrently, using either multi-threading, where each thread requests and saves a PDF, or an asynchronous request pool, whose non-blocking I/O suits calls to external systems. Second, the transformation layer could be broken up into multiple stages. I would leverage better OCR tools for PDF text extraction, such as a fine-tuned version of Mistral's OCR LLM, so that PDFs can be processed more accurately and concurrently in the cloud. The fields could then be extracted from the resulting text concurrently, using either multi-threading or, better yet, a distributed framework like Spark. Pandas is limited to in-memory processing on a single machine, whereas Spark can be deployed on a large compute cluster and scaled horizontally to match the workload. Third, the database is the biggest bottleneck in the sink layer. I would replace SQLite with a cloud data warehouse such as BigQuery or Snowflake, which can handle terabytes or even petabytes of data. Finally, I would tie these pieces together with an orchestration framework such as Airflow, so that the pipeline runs whenever new data arrives and failures are handled properly.
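
As a rough illustration of the first point, concurrent downloads with a thread pool might look like the sketch below. The names `download_all` and `fake_fetch` are hypothetical and not part of the pipeline; `fake_fetch` stands in for a real HTTP download so the example runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable, Tuple


def download_all(
    firm_crds: Iterable[int],
    fetch: Callable[[int], str],
    max_workers: int = 8,
) -> Tuple[Dict[int, str], Dict[int, Exception]]:
    """Run `fetch` for each CRD in a thread pool; return (successes, failures)."""
    paths: Dict[int, str] = {}
    errors: Dict[int, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, crd): crd for crd in firm_crds}
        for future in as_completed(futures):
            crd = futures[future]
            try:
                paths[crd] = future.result()
            except Exception as exc:  # one failed download should not stop the rest
                errors[crd] = exc
    return paths, errors


def fake_fetch(crd: int) -> str:
    """Stand-in for a real download; fails for one CRD to show error capture."""
    if crd == 160021:
        raise IOError("simulated network error")
    return f"data/input/{crd}.pdf"


paths, errors = download_all([160882, 160021, 317731], fake_fetch)
print(sorted(paths), sorted(errors))  # prints: [160882, 317731] [160021]
```

Collecting failures per CRD rather than raising on the first one lets a large batch finish and leaves a clear list of files to retry.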


## 6. Integration with External Systems (Estimated Time: 1-1.5 hours):
* Task: Simulate integration with an external system using FastAPI.
* Details: Write a Python script to fetch additional data from a mock API from your SQLite database using standard Python frameworks and data models. Create three GET and PUSH endpoints and demonstrate data quality checks and validations. Provide two tests for each call. Consider and handle potential edge cases.
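
The data quality checks mentioned above could be sketched as follows. This is only an illustration: `FirmPayload`, its fields, and the identifier pattern are assumptions, and the example sticks to the standard library to stay self-contained. In a FastAPI service the same rules would more naturally live in a pydantic request model.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, List

# Assumed identifier shape: three digits, a dash, then digits (e.g. "801-12345").
ID_PATTERN = re.compile(r"^\d{3}-\d+$")


@dataclass
class FirmPayload:
    """Hypothetical request body for creating or updating a firm."""
    firm_crd_nb: int
    sec_nb: str
    business_name: str
    client_types: Dict[str, int] = field(default_factory=dict)
    private_funds: Dict[str, str] = field(default_factory=dict)

    def validate(self) -> List[str]:
        """Return a list of human-readable problems; an empty list means the payload passed."""
        problems = []
        if not isinstance(self.firm_crd_nb, int) or self.firm_crd_nb <= 0:
            problems.append("firm_crd_nb must be a positive integer")
        if not ID_PATTERN.match(self.sec_nb or ""):
            problems.append("sec_nb must look like '801-12345'")
        if not (self.business_name or "").strip():
            problems.append("business_name must not be blank")
        for client_type, aum in self.client_types.items():
            if not isinstance(aum, int) or aum < 0:
                problems.append(f"AUM for {client_type!r} must be a non-negative integer")
        for fund_name, fund_id in self.private_funds.items():
            if not ID_PATTERN.match(fund_id or ""):
                problems.append(f"fund id for {fund_name!r} is malformed")
        return problems


good = FirmPayload(160882, "801-12345", "Test Firm 1", {"Individuals": 5000000}, {"XYZ Fund": "805-123456"})
bad = FirmPayload(-1, "12345", " ", {"Individuals": -5}, {"XYZ Fund": "n/a"})
print(good.validate())      # prints: []
print(len(bad.validate()))  # prints: 5
```

A write endpoint could reject a payload with a non-empty problem list before anything reaches `DatabaseSink.write`.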

In [47]:
# API and Database tests for MLP pipeline (Notebook Version)
import sqlite3
import tempfile
import pandas as pd
from fastapi.testclient import TestClient

# Create temporary database file (closed right away so SQLite can open it by path)
temp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
temp_db.close()
db_path = temp_db.name

# Set up test configuration
test_config = RunConfiguration(
    firm_crd_numbers=[160882],
    db_path=db_path,
    input_dir="./temp",
    output_dir="./temp",
    verbose=False
)

# Set up database manager
db_manager = DatabaseSink(test_config)

# Set up FastAPI client
api_service = APIService(test_config)
api_client = TestClient(api_service.app)

# Populate test database with sample data
firm_data = {
    160882: {
        'firm_crd_nb': 160882,
        "sec_nb": "801-12345",
        "business_name": "Test Firm 1",
        "full_legal_name": "Test Firm One LLC",
        "address": "123 Test St, Test City, TS 12345",
        "phone_number": "555-1234",
        "employee_count": 50,
        "signatory": "John Doe",
        "compensation_arrangements": ["Percentage of AUM", "Performance-based fees"],
        "client_types": {"Individuals": 5000000, "Corporations": 10000000},
        "private_funds": {
            "XYZ Fund": "805-123456"
        },
    }
}

db_manager.write(firm_data)

# Verify that the firm was inserted
with sqlite3.connect(db_manager.db_path) as conn:
    df = pd.read_sql_query("SELECT * FROM firms", conn)
    assert not df.empty, "Firm data was not written to the database"
    assert "160882" in df["firm_crd_nb"].astype(str).values, "Firm CRD 160882 missing"

# Store test results
test_results = {}

# --- TEST DATABASE INTEGRITY ---
print("Testing Database Integrity...")

# Test 1: Ensure required tables exist
with sqlite3.connect(db_manager.db_path) as conn:
    tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table'", conn)
    required_tables = ["firms", "compensation_arrangements", "client_types", "private_funds"]
    tables_exist = set(required_tables).issubset(set(tables["name"]))
    test_results["tables_exist"] = tables_exist
    assert tables_exist, f"Missing tables. Found: {tables['name'].tolist()}"
    print("✓ Required tables exist")

# Test 2: Ensure firms table contains expected data
with sqlite3.connect(db_manager.db_path) as conn:
    firms = pd.read_sql_query("SELECT * FROM firms", conn)
    firm_data_exists = len(firms) == 1 and "160882" in firms["firm_crd_nb"].astype(str).values
    test_results["firm_data_exists"] = firm_data_exists
    assert firm_data_exists, "Firm data not found in database"
    print("✓ Firm data exists")

# Test 3: Ensure compensation arrangements are stored correctly
comp_data = db_manager.fetch_compensation(160882)
comp_correct = set(comp_data) == {"Percentage of AUM", "Performance-based fees"}
test_results["compensation_correct"] = comp_correct
assert comp_correct, f"Compensation arrangements incorrect. Found: {comp_data}"
print("✓ Compensation arrangements correct")

# Test 4: Ensure client types and AUM are correctly stored
client_data = db_manager.fetch_client_types(160882)
client_correct = client_data == {"Individuals": 5000000, "Corporations": 10000000}
test_results["client_types_correct"] = client_correct
assert client_correct, f"Client types incorrect. Found: {client_data}"
print("✓ Client types correct")

# Test 5: Ensure private funds are correctly stored
fund_data = db_manager.fetch_private_funds(160882)
funds_correct = fund_data == [{"name": "XYZ Fund", "identification_number": "805-123456"}]
test_results["private_funds_correct"] = funds_correct
assert funds_correct, f"Private funds incorrect. Found: {fund_data}"
print("✓ Private funds correct")

# --- TEST API ENDPOINTS ---
print("\nTesting API Endpoints...")

# Test 6: Test GET /firms endpoint
response = api_client.get("/firms")
firms_endpoint = response.status_code == 200 and isinstance(response.json(), list)
test_results["get_firms"] = firms_endpoint
assert firms_endpoint, f"GET /firms failed. Status: {response.status_code}, Response: {response.json()}"
print("✓ GET /firms endpoint works")

# Test 7: Test GET /firms/{firm_crd_nb} endpoint
response = api_client.get("/firms/160882")
firm_by_id = response.status_code == 200 and response.json()["firm_crd_nb"] == 160882
test_results["get_firm_by_id"] = firm_by_id
assert firm_by_id, f"GET /firms/160882 failed. Status: {response.status_code}, Response: {response.json()}"
print("✓ GET /firms/{firm_crd_nb} endpoint works")

# Test 8: Test GET /firms/{firm_crd_nb} with invalid firm
response = api_client.get("/firms/999999")
nonexistent_firm = response.status_code == 404
test_results["nonexistent_firm"] = nonexistent_firm
assert nonexistent_firm, f"GET /firms/999999 should return 404. Got: {response.status_code}"
print("✓ GET /firms/{nonexistent_firm} returns 404")

# Test 9: Test GET /compensation/{firm_crd_nb}
response = api_client.get("/compensation/160882")
compensation_endpoint = (response.status_code == 200 and
                         set(response.json()["compensation"]) == {"Percentage of AUM", "Performance-based fees"})
test_results["get_compensation"] = compensation_endpoint
assert compensation_endpoint, f"GET /compensation/160882 failed. Status: {response.status_code}, Response: {response.json()}"
print("✓ GET /compensation/{firm_crd_nb} endpoint works")

# Test 10: Test GET /client_types/{firm_crd_nb}
response = api_client.get("/client_types/160882")
client_types_endpoint = (response.status_code == 200 and
                        response.json()["client_types"] == {"Individuals": 5000000, "Corporations": 10000000})
test_results["get_client_types"] = client_types_endpoint
assert client_types_endpoint, f"GET /client_types/160882 failed. Status: {response.status_code}, Response: {response.json()}"
print("✓ GET /client_types/{firm_crd_nb} endpoint works")

# Test 11: Test GET /private_funds/{firm_crd_nb}
response = api_client.get("/private_funds/160882")
private_funds_endpoint = (response.status_code == 200 and
                         response.json()["private_funds"] == [{"name": "XYZ Fund", "identification_number": "805-123456"}])
test_results["get_private_funds"] = private_funds_endpoint
assert private_funds_endpoint, f"GET /private_funds/160882 failed. Status: {response.status_code}, Response: {response.json()}"
print("✓ GET /private_funds/{firm_crd_nb} endpoint works")

# --- TEST API RESPONSE VALIDATION ---
print("\nTesting API Response Validation...")

# Test 12: Ensure firm data response structure matches expected format
response = api_client.get("/firms/160882")
firm_data = response.json()
expected_fields = {
    "firm_crd_nb",
    "sec_nb",
    "business_name",
    "full_legal_name",
    "address",
    "phone_number",
    "employee_count",
    "signatory",
}
firm_format_valid = response.status_code == 200 and expected_fields.issubset(firm_data.keys())
test_results["firm_format"] = firm_format_valid
assert firm_format_valid, f"Firm response format invalid. Missing fields: {expected_fields - set(firm_data.keys())}"
print("✓ Firm response format is valid")

# Test 13: Ensure compensation response structure is valid
response = api_client.get("/compensation/160882")
comp_format_valid = (response.status_code == 200 and
                     "compensation" in response.json() and
                     isinstance(response.json()["compensation"], list))
test_results["compensation_format"] = comp_format_valid
assert comp_format_valid, f"Compensation format invalid. Response: {response.json()}"
print("✓ Compensation response format is valid")

# Test 14: Ensure client types response format is correct
response = api_client.get("/client_types/160882")
client_format_valid = (response.status_code == 200 and
                      "client_types" in response.json() and
                      isinstance(response.json()["client_types"], dict))
test_results["client_types_format"] = client_format_valid
assert client_format_valid, f"Client types format invalid. Response: {response.json()}"
print("✓ Client types response format is valid")

# Test 15: Ensure private funds response format is valid
response = api_client.get("/private_funds/160882")
resp_json = response.json()
funds_format_valid = (response.status_code == 200 and
                     "private_funds" in resp_json and
                     isinstance(resp_json["private_funds"], list) and
                     len(resp_json["private_funds"]) > 0 and  # avoid IndexError on an empty list
                     "name" in resp_json["private_funds"][0] and
                     "identification_number" in resp_json["private_funds"][0])
test_results["private_funds_format"] = funds_format_valid
assert funds_format_valid, f"Private funds format invalid. Response: {resp_json}"
print("✓ Private funds response format is valid")

# Clean up
temp_db.close()

# Summary
all_passed = all(test_results.values())
print(f"\nTest Summary: {'All tests passed!' if all_passed else 'Some tests failed!'}")
print(f"Passed: {sum(test_results.values())}/{len(test_results)}")

if not all_passed:
    failed_tests = [test for test, result in test_results.items() if not result]
    print(f"Failed tests: {failed_tests}")

INFO:__main__:Initializing database at /var/folders/v6/1rdtx7yx2rz_g687s_jskfkh0000gn/T/tmp2qvbu6lj.db
INFO:__main__:Initializing database at /var/folders/v6/1rdtx7yx2rz_g687s_jskfkh0000gn/T/tmp2qvbu6lj.db
INFO:__main__:Storing data for 1 firms
INFO:__main__:Querying data from all tables...
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  result[col].fillna('None', inplace=True)
INFO:httpx:HTTP Request: GET http://testserver/firms "HTTP/1.1 200 OK"
INFO:__main__:Querying data from all tables...
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.


Testing Database Integrity...
✓ Required tables exist
✓ Firm data exists
✓ Compensation arrangements correct
✓ Client types correct
✓ Private funds correct

Testing API Endpoints...
✓ GET /firms endpoint works
✓ GET /firms/{firm_crd_nb} endpoint works
✓ GET /firms/{nonexistent_firm} returns 404
✓ GET /compensation/{firm_crd_nb} endpoint works
✓ GET /client_types/{firm_crd_nb} endpoint works
✓ GET /private_funds/{firm_crd_nb} endpoint works

Testing API Response Validation...
✓ Firm response format is valid
✓ Compensation response format is valid
✓ Client types response format is valid
✓ Private funds response format is valid

Test Summary: All tests passed!
Passed: 15/15


In [49]:
import uvicorn
import nest_asyncio

nest_asyncio.apply()
uvicorn.run(api_service.app, host="0.0.0.0", port=9999)

INFO:     Started server process [24945]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:9999 (Press CTRL+C to quit)
INFO:__main__:Querying data from all tables...
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-67' coro=<Server.serve() done, defined at /Users/kyleloomis/development/kyleloomis/mlp/venv/lib/python3.10/site-packages/uvicorn/server.py:68> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "/Users/kyleloomis/development/kyleloomis/mlp/venv/lib/python3.10/site-packages/uvicorn/main.py", line 579, in run
    server.run()
  File "/Users/kyleloomis/development/kyleloomis/mlp/venv/lib/python3.10/site-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
  File "/Users/kyleloomis/development/kyleloomis/mlp/venv/lib/python3.10/site-packages/nest_asyncio.py", line 30, in run
    return loop.run_until_comp

INFO:     127.0.0.1:52687 - "GET /firms HTTP/1.1" 200 OK


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  result[col].fillna('None', inplace=True)


INFO:     127.0.0.1:52687 - "GET /favicon.ico HTTP/1.1" 404 Not Found


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [24945]


## 7. Automated Testing and Data Quality (Discussion - Estimated Time: 15-30 minutes):
* Task: Write a brief explanation of how you would implement automated testing and ensure data quality

For automated testing, I would use pytest to create unit tests for individual components, integration tests for interactions between pipeline stages, and end-to-end tests simulating the full workflow. These tests would run automatically in a CI/CD pipeline. For data quality, I would implement validation at multiple stages: schema validation using Pydantic to enforce data structure, content validation to check for inconsistencies or errors, and statistical validation to identify outliers or anomalous patterns. For monitoring, I would implement dashboards to track key quality metrics and data lineage, with automated alerting for any quality issues. Finally, I would document data quality SLAs and regularly review quality metrics to ensure continuous improvement of both the pipeline and the testing framework.
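
As a rough illustration of the content and statistical validation stages described above, the sketch below checks a client-type-to-AUM mapping for obvious errors and flags outliers with a modified z-score (median and MAD based). The function names, the 3.5 threshold, and the sample figures are assumptions made for this example and are not part of the pipeline built earlier. The `test_` functions use plain asserts, so pytest would collect them without any extra setup.

```python
from statistics import median


def find_content_errors(client_types: dict[str, float]) -> list[str]:
    """Return human-readable problems found in a client type -> AUM mapping."""
    errors = []
    for name, aum in client_types.items():
        if not name.strip():
            errors.append("empty client type name")
        if aum < 0:
            errors.append(f"negative AUM for {name!r}: {aum}")
    return errors


def find_outliers(values: list[float], threshold: float = 3.5) -> list[float]:
    """Flag values whose modified z-score exceeds the (assumed) threshold."""
    if len(values) < 3:
        return []  # too few points to say anything meaningful
    med = median(values)
    mad = median(abs(v - med) for v in values)  # median absolute deviation
    if mad == 0:
        return []  # no spread around the median, so nothing can be scored
    return [v for v in values if 0.6745 * abs(v - med) / mad > threshold]


# pytest-style tests (hypothetical data)
def test_clean_record_has_no_content_errors():
    assert find_content_errors({"Individuals": 5_000_000, "Corporations": 10_000_000}) == []


def test_negative_aum_is_reported():
    assert len(find_content_errors({"Individuals": -1})) == 1


def test_extreme_aum_is_flagged():
    assert find_outliers([5e6, 6e6, 7e6, 8e6, 9e9]) == [9e9]
```

A median-based score is used here instead of a mean-based z-score because a single extreme AUM value would otherwise inflate the standard deviation and hide itself. In a CI run, checks like these would sit alongside the schema validation rather than replace it.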


## 8. Identify Top Performing Funds (Discussion - Estimated Time: 15-30 minutes):
* Describe how you would use the information above to identify top-performing funds.
* List any questions you would ask regarding the task.
* Specify any additional information you would need.

To identify top-performing funds, I would extend the existing database beyond AUM to include risk-adjusted return metrics such as the Sharpe ratio. The Sharpe ratio is useful because it indicates the reward earned per unit of risk taken by the strategy. The maximum drawdown since inception would also be useful for assessing the fund's worst historical loss.

Questions:
* How long has the fund been operating?
* What is the Sharpe ratio over the life of the fund?
* What benchmark (e.g. an index) should the fund be compared against?
* Has the fund maintained consistent performance through different market conditions?
* What is the maximum drawdown the fund has experienced?

Additional data: historical returns (ideally monthly), volatility metrics, benchmark performance data, and information about market conditions during the evaluation period.
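
The two metrics named above can be computed from a series of monthly returns, as in the minimal sketch below. It assumes simple (not log) monthly returns expressed as decimals, approximates the per-period risk-free rate by dividing the annual rate by 12, and annualizes by multiplying by the square root of 12, which implicitly treats monthly returns as independent. The function names and sample numbers are hypothetical. None of this data is available in Form ADV, so it would have to come from another source.

```python
import math
from statistics import mean, stdev


def annualized_sharpe(monthly_returns, annual_risk_free_rate=0.0, periods_per_year=12):
    """Annualized Sharpe ratio from periodic simple returns.

    Raises statistics.StatisticsError for fewer than two returns and
    ValueError when the excess returns have zero volatility.
    """
    rf_per_period = annual_risk_free_rate / periods_per_year  # simple approximation
    excess = [r - rf_per_period for r in monthly_returns]
    volatility = stdev(excess)  # sample standard deviation
    if volatility == 0:
        raise ValueError("Sharpe ratio is undefined for zero volatility")
    return mean(excess) / volatility * math.sqrt(periods_per_year)


def max_drawdown(monthly_returns):
    """Largest peak-to-trough decline of cumulative wealth, as a negative fraction."""
    wealth = 1.0
    peak = 1.0
    worst = 0.0
    for r in monthly_returns:
        wealth *= 1 + r
        peak = max(peak, wealth)
        worst = min(worst, wealth / peak - 1)
    return worst


# Hypothetical monthly returns for one fund
returns = [0.10, -0.50, 0.20]
print(f"Sharpe ratio: {annualized_sharpe(returns):.2f}")
print(f"Max drawdown: {max_drawdown(returns):.0%}")
```

Ranking funds by Sharpe ratio while screening out those whose maximum drawdown exceeds an agreed limit would be one way to combine the two, though the right cutoffs depend on the answers to the questions listed above.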

## 9. Data Visualization (Optional - Estimated Time: 30-45 minutes (if included)):
* Task: Create visualizations using a Python library such as Matplotlib or Seaborn.
* Details: Generate plots to visualize key metrics and insights from the data.

## [NOTE] Please do not spend more than 6 hours on this task. Also, provide the time it took you to complete it.

TOTAL TIME FOR COMPLETION: ~6 hours