This notebook downloads the collected PDF reports and their accompanying TXT files from Orbit and saves them locally.

##### Requires Orbit API access (S3 keys provided by Orbit)

In [1]:
import os
import json
import boto3
import requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
# Authenticate with Orbit-provided keys ---
# The keys are read from environment variables instead of being pasted into
# the notebook, so they are never saved or committed alongside it. When a
# variable is unset, None is passed and boto3 falls back to its default
# credential chain.
s3_client = boto3.client(
    "s3",
    aws_access_key_id=os.environ.get("ORBIT_AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.environ.get("ORBIT_AWS_SECRET_ACCESS_KEY"),
)

bucket_name = "orbit-data-provider"   # source S3 bucket
prefix = "clients/eagle-alpha/"       # key prefix scanned for JSON report indexes
base_dir = "data/reports"             # local root folder for downloaded files
os.makedirs(base_dir, exist_ok=True)

In [None]:
# Build a set of already-downloaded file paths once, so later checks are O(1).
def build_existing_files(base_dir="data/reports"):
    """Return the set of paths of every file found under *base_dir*.

    The tree is walked recursively with ``os.walk``. Each entry is
    ``os.path.join(<directory>, <filename>)``. A missing *base_dir* yields
    an empty set.
    """
    return {
        os.path.join(dirpath, name)
        for dirpath, _subdirs, filenames in os.walk(base_dir)
        for name in filenames
    }

# Snapshot of files already on disk under base_dir. download_file checks this
# set to skip existing files and adds each newly downloaded path to it.
existing_files = build_existing_files(base_dir)

In [None]:
# Function to download one file
def download_file(presigned_url, file_path):
    """Download *presigned_url* to *file_path*, skipping files already on disk.

    The skip check uses the module-level ``existing_files`` set, which is
    updated after each successful download.

    The response body is streamed into a sibling ``<file_path>.part`` file.
    That file is renamed onto *file_path* only after the whole body has been
    written. As a result, a failed or interrupted download never leaves a
    truncated file at the final path, which a later run would otherwise treat
    as "already downloaded" and skip forever.

    Returns:
        *file_path* on success or when skipped. ``None`` on failure; the
        error is printed, not raised, so sibling downloads keep going.
    """
    if file_path in existing_files:   # O(1) set lookup
        print(f"⏩ Skipping {file_path}, already exists")
        return file_path

    tmp_path = file_path + ".part"
    try:
        # Using the response as a context manager releases the streamed
        # connection even if writing or iteration fails midway.
        with requests.get(presigned_url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Publish the completed file in a single rename.
        os.replace(tmp_path, file_path)
        existing_files.add(file_path)   # update the set
        print(f"Downloaded {file_path}")
        return file_path
    except Exception as e:
        # Best-effort: report, clean up any partial data, and carry on.
        print(f"❌ Failed {file_path}: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return None

In [None]:
# Save report given JSON index (parallel for files)

# Maps each S3 path field of a ``report_files`` entry to its local filename
# suffix. Insertion order sets the order in which downloads are submitted.
_REPORT_FILE_SUFFIXES = {
    "s3_path_file": "_file.pdf",
    "s3_path_pages": "_pages.txt",
    "s3_path_blocks": "_blocks.txt",
}


def _parse_s3_uri(s3_path):
    """Split ``s3://bucket/key`` (the scheme is optional) into ``(bucket, key)``."""
    # Strip the scheme only as a prefix; str.replace would also remove it
    # from the middle of a key.
    if s3_path.startswith("s3://"):
        s3_path = s3_path[len("s3://"):]
    src_bucket, key_path = s3_path.split("/", 1)
    return src_bucket, key_path


def _first_or_none(value):
    """Return the first element of a list/tuple (None if empty), else *value* as-is."""
    if isinstance(value, (list, tuple)):
        return value[0] if value else None
    return value


def save_report(index_data, base_dir="data/reports", max_workers=4):
    """Download all files referenced by one report index and return its metadata.

    Files go to ``<base_dir>/<company_name with ' ' and '/' replaced by '_'>/``
    as ``<report_id>_file.pdf``, ``<report_id>_pages.txt`` and
    ``<report_id>_blocks.txt``. Downloads run in parallel on up to
    *max_workers* threads via ``download_file``.

    Returns:
        dict with report_id, company_name, isin, ticker, reported_at and the
        three expected local paths. The paths are returned whether or not each
        download succeeded; ``download_file`` prints failures.
    """
    company = index_data["company_info"][0]
    company_name = company["company_name"]
    safe_name = company_name.replace(" ", "_").replace("/", "_")
    company_folder = os.path.join(base_dir, safe_name)
    os.makedirs(company_folder, exist_ok=True)

    report_id = index_data["report_id"]
    futures = []
    submitted_paths = set()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for rf in index_data.get("report_files", []):
            for s3_key, suffix in _REPORT_FILE_SUFFIXES.items():
                s3_path = rf.get(s3_key)
                if not s3_path:
                    continue

                file_path = os.path.join(company_folder, f"{report_id}{suffix}")
                # The local name depends only on report_id, so several
                # report_files entries can map to the same path. Submitting
                # them all would have two threads writing one file at once.
                # NOTE(review): confirm whether multiple entries are expected;
                # if so, the naming scheme loses all but one of them.
                if file_path in submitted_paths:
                    continue
                submitted_paths.add(file_path)

                src_bucket, key_path = _parse_s3_uri(s3_path)
                presigned_url = s3_client.generate_presigned_url(
                    "get_object",
                    Params={"Bucket": src_bucket, "Key": key_path},
                    ExpiresIn=3600,
                )
                futures.append(executor.submit(download_file, presigned_url, file_path))

        # Re-raise anything unexpected from a worker thread.
        for f in as_completed(futures):
            f.result()

    return {
        "report_id": report_id,
        "company_name": company_name,
        "isin": _first_or_none(company.get("isin")),
        "ticker": _first_or_none(company.get("ticker")),
        "reported_at": index_data.get("reported_at"),
        "pdf_path": os.path.join(company_folder, f"{report_id}_file.pdf"),
        "pages_path": os.path.join(company_folder, f"{report_id}_pages.txt"),
        "blocks_path": os.path.join(company_folder, f"{report_id}_blocks.txt"),
    }

In [None]:
# Iterate over JSON indexes in S3 (serial per report, parallel per file)
def collect_all_reports(s3_client, bucket, prefix, base_dir="data/reports", max_workers=4):
    """Download every report described by a JSON index under ``s3://bucket/prefix``.

    Every object under *prefix* is listed, with pagination handled by boto3's
    ``list_objects_v2`` paginator. Each ``.json`` object is loaded as a report
    index and passed to ``save_report``. Reports are processed one at a time;
    files within a report are downloaded by up to *max_workers* threads.

    A failure on one index is printed and skipped, so the remaining indexes
    are still processed.

    Returns:
        pandas.DataFrame with one row per processed index (empty if none).
    """
    records = []
    paginator = s3_client.get_paginator("list_objects_v2")

    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if not key.endswith(".json"):
                continue

            try:
                resp = s3_client.get_object(Bucket=bucket, Key=key)
                index_data = json.loads(resp["Body"].read().decode("utf-8"))
                meta = save_report(index_data, base_dir, max_workers=max_workers)
            except Exception as e:
                # Best-effort per index: report it and move on to the next one.
                print(f"❌ Failed on {key}: {e}")
                continue

            if meta:
                records.append(meta)

    return pd.DataFrame(records)

In [None]:
# Run the full collection, then parse dates and persist the metadata table.
meta_df = collect_all_reports(s3_client, bucket_name, prefix, base_dir)

if meta_df.empty:
    print("No new reports to process.")
else:
    # Convert the raw reported_at values into pandas datetimes before saving.
    meta_df["reported_at"] = pd.to_datetime(meta_df["reported_at"])
    print("✅ Downloaded files and built meta_df:", meta_df.shape)
    meta_df.to_csv("meta_reports.csv", index=False)