## Extracting article metadata without downloading the article content
* use keywords and industry codes discussed with Sophie
* check the capabilities and limits of the Snapshots API

In [11]:
import requests
import json
import logging
import os
from pprint import pprint
from time import sleep, time


# Read the Dow Jones API key from the environment instead of committing it to
# the notebook. Export DOWJONES_USER_KEY before starting the kernel.
# NOTE(review): a literal key used to be stored on this line; if the notebook
# was ever shared or committed with it, that key should be rotated.
user_key = os.environ.get("DOWJONES_USER_KEY", "")

base_url = "https://api.dowjones.com"
# Headers sent with every API request made from this notebook.
headers = {
    "user-key": user_key,
    "Content-Type": "application/json",
    "X-API-VERSION": "3.0",
}
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

if not user_key:
    # Surface the misconfiguration early instead of on the first failed request.
    logging.warning("DOWJONES_USER_KEY is not set; API requests will be sent without a key.")

def check_response(response):
    """
    Return the decoded JSON body of a successful response, otherwise None.

    Only status codes 200 and 201 count as success. For any other status the
    code and the raw response text are printed before None is returned.
    """
    if response.status_code not in (200, 201):
        print(f"Error: {response.status_code}")
        print(response.text)
        return None
    return response.json()

In [2]:
def create_explain(query_body):
    """
    Submit `query_body` as an explain request, which reports how many
    documents match a query.

    The body is serialized with json.dumps and POSTed to
    /extractions/documents/_explain. Returns whatever check_response yields:
    the parsed JSON on success, None otherwise.
    """
    payload = json.dumps(query_body)
    endpoint = f"{base_url}/extractions/documents/_explain"
    return check_response(requests.post(endpoint, headers=headers, data=payload))

def check_explain_status(job_id):
    """
    Fetch the current status of the explain job identified by `job_id`.

    Issues a GET to /extractions/documents/{job_id}/_explain and returns the
    result of check_response (parsed JSON, or None on a non-success status).
    """
    endpoint = f"{base_url}/extractions/documents/{job_id}/_explain"
    return check_response(requests.get(endpoint, headers=headers))

def wait_for_explain_completion(job_id, max_retries=100, sleep_time=10):
    """
    Poll an explain job until it finishes.

    Args:
        job_id: Identifier of the explain job to poll.
        max_retries: Maximum number of status requests to make.
        sleep_time: Seconds to wait between two consecutive status requests.

    Returns:
        The final status payload once the job reports JOB_STATE_DONE, or None
        when a status request fails, the job reports JOB_STATE_FAILED, or the
        job is still not done after `max_retries` polls.
    """
    for i in range(max_retries):
        status = check_explain_status(job_id)
        if not status:
            return None

        current_state = status['data']['attributes']['current_state']
        print(f"Current state: {current_state}")

        if current_state == "JOB_STATE_DONE":
            return status

        # A failed job never reaches DONE; stop right away instead of polling
        # until the retries run out (mirrors the extraction poller).
        if current_state == "JOB_STATE_FAILED":
            print("Explain job failed.")
            return None

        # No point in sleeping after the last poll.
        if i < max_retries - 1:
            print(f"Waiting {sleep_time} seconds...")
            sleep(sleep_time)

    print("Max retries reached. Job might still be running.")
    return None

def run_explain(input_query, num_samples=5):
    """
    Run an explain job end to end: submit it, wait for it, then fetch samples.

    Args:
        input_query: Query body passed to create_explain.
        num_samples: Number of sample documents to request for the finished job.

    Returns:
        A `(final_status, samples)` tuple. An element is None when its step did
        not succeed, so callers can always unpack the result:
        `(None, None)` if the job could not be created, `(final_status, None)`
        if the job did not finish with a document count or the samples request
        failed.
    """
    explain_result = create_explain(input_query)
    if not explain_result:
        return None, None

    job_id = explain_result['data']['id']
    print("\nExplain job created:")
    print(f"Job ID: {job_id}")

    final_status = wait_for_explain_completion(job_id)
    if not final_status or 'counts' not in final_status['data']['attributes']:
        print("Explain job did not finish with a document count; skipping samples.")
        return final_status, None

    print(f"\nGetting samples for job ID: {job_id}")
    count = final_status['data']['attributes']['counts']
    print(f"\nNumber of documents matching the query: {count}")

    # Build the URL from base_url like every other request in this notebook and
    # route the response through check_response so HTTP errors are reported
    # instead of being parsed as if they were sample data.
    samples_url = f"{base_url}/extractions/samples/{job_id}?num_samples={num_samples}"
    samples = check_response(requests.get(samples_url, headers=headers))
    if samples is not None:
        pprint(samples, compact=True)
    return final_status, samples

In [3]:
def create_extraction(query_body):
    """
    Submit a new Snapshot Extraction job.

    Endpoint: POST /extractions/documents

    Args:
        query_body: Query definition sent as the JSON request body.

    Returns:
        The ID of the newly created extraction job.

    Raises:
        RuntimeError: If the API did not answer with a success status, so no
            job ID is available.
    """
    url = f"{base_url}/extractions/documents"
    logging.info("Creating extraction job...")
    resp = requests.post(url, headers=headers, json=query_body)
    data = check_response(resp)
    if not data:
        # check_response already printed the status and body; fail with a clear
        # message instead of a TypeError from indexing None below.
        raise RuntimeError(
            f"Extraction job could not be created (HTTP {resp.status_code})."
        )
    job_id = data['data']['id']
    logging.info(f"Extraction job created: {job_id}")
    return job_id


def check_extraction_status(job_id):
    """
    Fetch the current state of the extraction job identified by `job_id`.

    Endpoint: GET /extractions/documents/{job_id}

    Returns the result of check_response: the parsed JSON payload, or None when
    the request did not succeed.
    """
    endpoint = f"{base_url}/extractions/documents/{job_id}"
    return check_response(requests.get(endpoint, headers=headers))

def wait_for_extraction_completion(job_id, max_retries=60, initial_sleep=10):
    """
    Poll the extraction job until it reaches JOB_STATE_DONE or JOB_STATE_FAILED.

    The wait between polls starts at `initial_sleep` seconds and grows by a
    factor of 1.5 after every poll, capped at 300 seconds, to avoid hammering
    the API.

    Args:
        job_id: Identifier of the extraction job to poll.
        max_retries: Maximum number of status requests to make.
        initial_sleep: Seconds to wait after the first unsuccessful poll.

    Returns:
        The `attributes` dict of the job's status payload once it is done.

    Raises:
        RuntimeError: If a status request fails or the job reports
            JOB_STATE_FAILED.
        TimeoutError: If the job is not done after `max_retries` polls.
    """
    sleep_time = initial_sleep
    for attempt in range(1, max_retries + 1):
        status = check_extraction_status(job_id)
        if not status:
            # check_response already printed the HTTP error details.
            raise RuntimeError(f"Could not retrieve status for extraction job {job_id}.")
        state = status['data']['attributes']['current_state']
        logging.info(f"[Attempt {attempt}/{max_retries}] State: {state}")
        if state == "JOB_STATE_DONE":
            return status['data']['attributes']
        if state == "JOB_STATE_FAILED":
            # The payload may not carry an 'errors' list; never let the lookup
            # mask the real failure with a KeyError/IndexError.
            errors = status.get('errors') or [{}]
            detail = errors[0].get('detail', 'No detail provided')
            raise RuntimeError(f"Extraction job {job_id} failed: {detail}")
        if attempt == max_retries:
            # Last poll done; sleeping again would only delay the timeout.
            break
        logging.info(f"Sleeping for {sleep_time}s before retry...")
        sleep(sleep_time)
        sleep_time = min(sleep_time * 1.5, 300)  # cap back-off at 5 minutes
    raise TimeoutError(f"Job {job_id} did not complete after {max_retries} attempts.")

def download_files(files, snapshot_id, target_root="../../extractions"):
    """
    Stream-download each file URI into {target_root}/{snapshot_id}/.

    Every file is first written to `<name>.tmp` and renamed onto its final name
    only after the download finished, so a final file is never partial. Each
    file is attempted up to 3 times, waiting 5 s and then 10 s between attempts.

    Args:
        files: Iterable of dicts, each with a "uri" key pointing at one file.
        snapshot_id: Name of the sub-directory created under `target_root`.
        target_root: Directory that holds all extraction downloads.

    Raises:
        Exception: Whatever the third failed attempt for a file raised. The
            partial `.tmp` file of that download is removed first.
    """
    download_dir = os.path.abspath(os.path.join(target_root, snapshot_id))
    os.makedirs(download_dir, exist_ok=True)
    logging.info(f"Downloading {len(files)} files into '{download_dir}'")

    for file_meta in files:
        uri = file_meta["uri"]
        file_name = os.path.basename(uri)
        dest_path = os.path.join(download_dir, file_name)
        tmp_path = dest_path + ".tmp"
        logging.info(f"-> Downloading {file_name}")

        # Attempt download with up to 3 retries
        for attempt in range(1, 4):
            try:
                # The API headers (including the user key) are sent with the file request.
                with requests.get(uri, headers=headers, stream=True, timeout=120) as r:
                    r.raise_for_status()
                    with open(tmp_path, "wb") as tmpf:
                        for chunk in r.iter_content(chunk_size=8192):
                            if chunk:
                                tmpf.write(chunk)
                # Atomic rename on success
                os.replace(tmp_path, dest_path)
                logging.info(f"   Saved to {dest_path}")
                break
            except Exception as e:
                logging.warning(f"   Attempt {attempt}/3 failed: {e}")
                if attempt == 3:
                    logging.error(f"   Giving up on {file_name}")
                    # Do not leave a partial download behind.
                    try:
                        os.remove(tmp_path)
                    except OSError:
                        pass  # no temp file was created, or it is already gone
                    raise
                sleep(5 * attempt)

def run_extraction(query_body):
    """
    Run a complete extraction: create the job, wait for it to finish and
    download the files it produced.

    Returns the ID of the extraction job. A warning is logged when the finished
    job lists no files.
    """
    job_id = create_extraction(query_body)
    job_attrs = wait_for_extraction_completion(job_id)

    produced_files = job_attrs.get("files", [])
    if produced_files:
        download_files(produced_files, job_id)
    else:
        logging.warning(f"No files found for job {job_id}")

    logging.info(f"Extraction {job_id} complete.")
    return job_id


### Making explain queries
v0: a large set of keywords and industry codes – leads to a lot of irrelevant results

In [4]:
# Industry codes used to restrict the search.
industry_codes = [
    "i814", "i815", "i82", "i831", "iabls", "ialtinv", "ibnk", "ibusdev", "icaslty",
    "ichalbk", "iclins", "icrowfd", "iextrfu", "ifinal", "ifmsoft", "igovspon",
    "ihedge", "iibnk", "iinsurt", "iinv", "ipension", "iplastic", "ippf", "ipricr",
    "iprivhea", "irbank", "iresinv", "isover", "iventure", "iwealth"
]

data_providers_industry_codes = ["i8395463", "ifinal", "iplastic", "i83102","i83109","iadmin"]

industry_codes.extend(data_providers_industry_codes)
# Remove duplicates; sorting keeps the request payload identical between runs
# (a bare set() yields a different order in every Python process).
industry_codes = sorted(set(industry_codes))

# Phrases that hint at a carve-out / divestment situation.
carveout_keywords = [
    "portfolio", "strategic actions",
    "review of strategic alternatives", "divestment", "divestiture", "divest",
    "divesting", "disposal", "non-core asset", "focus on core assets",
    "refocus on core assets", "reduce leverage", "improve liquidity",
    "bonds maturing", "notes maturing", "debt maturing", "rationalize costs",
    "restructuring program", "in talks to divest", "capital return",
    "special dividend",  "simplification",
    "carve-out", "spin-off", "strategic review", "portfolio optimization",
    "asset sale", "streamline operations", "exit markets", "discontinue operations",
    "strategic disposal", "non-strategic assets", "shed assets", "optimize footprint",
    "new CEO", "management change", "leadership change", "new management team",
    #TODO: add more keywords related to legal, management, strategy
]

# One "body LIKE '%keyword%'" test per keyword, OR-ed together.
# The keywords are interpolated verbatim, so they must not contain a single quote.
keyword_conditions = " OR ".join([f"body LIKE '%{keyword}%'" for keyword in carveout_keywords])

query = {
    "query": {
        "where": f"language_code='en' AND publication_datetime >= '2024-05-01 00:00:00' AND ({keyword_conditions})",
        "includes": {
            "industry_codes": industry_codes,
            "region_codes": ["eurz"]
        },
    }
}

run_explain(query)


Explain job created:
Job ID: aaa2379e-49c3-43d8-9636-17d76fbdb689
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_DONE

Getting samples for job ID: aaa2379e-49c3-43d8-9636-17d76fbdb689

Number of documents matching the query: 813808
{'data': {'attributes': {'counts': 5,
                         'current_state': 'JOB_STATE_DONE',
                         'sample': [{'an': 'OHFSEC0020240724ek7n00001',
                                     'company_codes': '',
                                     'company_codes_about': '',
                                     'company_codes_occur': '',
                                     'industry_c

({'data': {'id': 'aaa2379e-49c3-43d8-9636-17d76fbdb689',
   'type': 'explain',
   'attributes': {'counts': 813808, 'current_state': 'JOB_STATE_DONE'}},
  'links': {'self': 'https://api.dowjones.com/extractions/documents/aaa2379e-49c3-43d8-9636-17d76fbdb689/_explain'}},
 {'data': {'id': 'aaa2379e-49c3-43d8-9636-17d76fbdb689',
   'type': 'explain',
   'attributes': {'counts': 5,
    'current_state': 'JOB_STATE_DONE',
    'sample': [{'an': 'OHFSEC0020240724ek7n00001',
      'company_codes': '',
      'company_codes_about': '',
      'company_codes_occur': '',
      'industry_codes': ',i814,i81402,ibnk,ifinal,',
      'ingestion_datetime': '2024-07-24T00:30:08.000Z',
      'modification_datetime': '2024-07-24T00:30:08.000Z',
      'publication_datetime': '2024-07-23T00:00:00.000Z',
      'publisher_name': 'Brownstein & Egusa',
      'region_codes': '',
      'region_of_origin': 'EEURZ EUR HUNG ',
      'source_code': 'OHFSEC',
      'source_name': '150sec',
      'subject_codes': ',c13,cca

### Version 1: intent+action for future-oriented search

In [5]:
# 1) Subject-code tags, already wrapped in LIKE wildcards
action_tags = ["%cactio%", "%cspinoff%", "%cdivest%", "%cmger%", "%crestruc%"]
tag_clause = " OR ".join(f"subject_codes LIKE '{tag}'" for tag in action_tags)

# 2) Intent & action (with extra verbs)
intents = ["plan to", "intend to", "looking to", "considering", "mulling", "weighing"]
actions = ["divest", "sell", "spin-off", "dispose", "carve-out"]
intent_clause = " OR ".join(f"body LIKE '%{phrase}%'" for phrase in intents)
action_clause = " OR ".join(f"body LIKE '%{verb}%'" for verb in actions)

# All conditions must hold: language, date, region, then one match from each
# of the tag, intent and action groups.
where = " AND ".join([
    "language_code='en'",
    "publication_datetime >= '2024-12-01 00:00:00'",
    "region_codes LIKE '%eurz%'",
    f"({tag_clause})",
    f"({intent_clause})",
    f"({action_clause})",
])

query = {
    "query": {
        "where": where,
        "includesList": {
            "industry_codes": ["2ce88edb-3f5e-43c5-bf4b-48eb22624ff1"],
        },
    }
}

status, samples = run_explain(query)


Explain job created:
Job ID: 50e56acc-7ce5-4052-bfb4-999a1e8f1375
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_RUNNING
Waiting 10 seconds...
Current state: JOB_STATE_DONE

Getting samples for job ID: 50e56acc-7ce5-4052-bfb4-999a1e8f1375

Number of documents matching the query: 2911
{'data': {'attributes': {'counts': 5,
                         'current_state': 'JOB_STATE_DONE',
                         'sample': [{'an': 'ACWIRE0020241213ekcc000gq',
                                     'company_codes': ',estind,ftamcg,ftamcg,ftamcg,linkd,onlnfr,pkxwks,rzlefe,rzlefe,seexc,sg,sg,sg,twnit,',
                                     'company_codes_about': ',sg,ftamcg,',
                                     'company_codes_occur': ',sg,rzlefe,ftamcg,',
                                     'industry_codes': ',i25,i814,i81402,ibasicm,ibnk,ifinal,iibnk,',
                                     'ingestion_datetime': '2024-12-13T01:13:35.000Z',
                       

### Writing extraction query and getting the data! **DANGER - do not execute due to limits**
Dangerous cells will be commented out

In [6]:
# WARNING: submits a real extraction job for `query` (see the DANGER note in
# the section heading about limits). Run this cell deliberately.
job_id = create_extraction(query)

2025-04-29 12:27:54 [INFO] Creating extraction job...
2025-04-29 12:28:03 [INFO] Extraction job created: dj-synhub-extraction-lkbi9fy6zepu8rcjuxqhjwkbld52wgt0-l0fy7lkzhf


In [7]:
# Poll the job until it is done; `attrs` is the job's `attributes` dict.
attrs = wait_for_extraction_completion(job_id)


2025-04-29 12:28:18 [INFO] [Attempt 1/60] State: JOB_STATE_RUNNING
2025-04-29 12:28:18 [INFO] Sleeping for 10s before retry...
2025-04-29 12:28:28 [INFO] [Attempt 2/60] State: JOB_STATE_RUNNING
2025-04-29 12:28:28 [INFO] Sleeping for 15.0s before retry...
2025-04-29 12:28:44 [INFO] [Attempt 3/60] State: JOB_STATE_RUNNING
2025-04-29 12:28:44 [INFO] Sleeping for 22.5s before retry...
2025-04-29 12:29:07 [INFO] [Attempt 4/60] State: JOB_STATE_RUNNING
2025-04-29 12:29:07 [INFO] Sleeping for 33.75s before retry...
2025-04-29 12:29:41 [INFO] [Attempt 5/60] State: JOB_STATE_RUNNING
2025-04-29 12:29:41 [INFO] Sleeping for 50.625s before retry...
2025-04-29 12:30:33 [INFO] [Attempt 6/60] State: JOB_STATE_DONE


In [12]:
# Download every file listed for the finished job into ../../extractions/<job_id>/
# (the default target_root of download_files).
files = attrs.get("files", [])
download_files(files, job_id)
logging.info(f"Extraction {job_id} complete.")

2025-04-29 12:34:16 [INFO] Downloading 1 files into '/home/alk/Projects/carveouts_poc/extractions/dj-synhub-extraction-lkbi9fy6zepu8rcjuxqhjwkbld52wgt0-l0fy7lkzhf'
2025-04-29 12:34:16 [INFO] -> Downloading part-000000000000.avro
2025-04-29 12:34:25 [INFO]    Saved to /home/alk/Projects/carveouts_poc/extractions/dj-synhub-extraction-lkbi9fy6zepu8rcjuxqhjwkbld52wgt0-l0fy7lkzhf/part-000000000000.avro
2025-04-29 12:34:25 [INFO] Extraction dj-synhub-extraction-lkbi9fy6zepu8rcjuxqhjwkbld52wgt0-l0fy7lkzhf complete.


## Reading .avro file and converting it to csv

In [None]:
from fastavro import reader
import pandas as pd

def read_avro_file(path):
    """
    Lazily iterate over the records of the Avro file at `path`.

    The file is opened in binary mode and stays open while the generator is
    being consumed; each item is one record as produced by fastavro's reader.
    """
    with open(path, 'rb') as avro_file:
        yield from reader(avro_file)

def records2df(records):
    """
    Build a pandas DataFrame from `records` by passing them to pd.DataFrame.
    """
    return pd.DataFrame(records)
# Usage example: load the downloaded Avro part file and export it as CSV.
# NOTE(review): the extraction ID in this path appears to embed the lower-cased
# API user key - avoid sharing the path outside the team.
extraction_dir = '../../extractions/dj-synhub-extraction-lkbi9fy6zepu8rcjuxqhjwkbld52wgt0-l0fy7lkzhf'
file_path = f'{extraction_dir}/part-000000000000.avro'

records = list(read_avro_file(file_path))
print(f"Read {len(records)} records.")

# Write the CSV next to the Avro file.
records_df = pd.DataFrame.from_records(records)
records_df.to_csv(f'{extraction_dir}/result.csv', index=False)

Read 2911 records.


### Viewing samples

In [None]:
# Fetch 5 sample documents for an explain job by its ID, using the samples
# endpoint directly instead of run_explain.
explain_job_id = "fc06f298-dba6-4002-b4e8-06819322f34b"
samples_url = f"https://api.dowjones.com/extractions/samples/{explain_job_id}?num_samples=5"
samples_response = requests.get(samples_url, headers=headers)
# The body is parsed without checking the HTTP status first.
samples = samples_response.json()

In [None]:
# Display the parsed samples payload as the cell output.
samples

{'data': {'id': 'fc06f298-dba6-4002-b4e8-06819322f34b',
  'type': 'explain',
  'attributes': {'counts': 5,
   'current_state': 'JOB_STATE_DONE',
   'sample': [{'an': 'OHFSEC0020240524ek5m00001',
     'company_codes': ',omniho,omniho,',
     'company_codes_about': '',
     'company_codes_occur': ',omniho,',
     'industry_codes': ',i41,i412,i4122,i4221,i4222,icnp,ifood,',
     'ingestion_datetime': '2024-05-24T00:30:29.000Z',
     'modification_datetime': '2024-05-24T00:30:29.000Z',
     'publication_datetime': '2024-05-22T00:00:00.000Z',
     'publisher_name': 'Brownstein & Egusa',
     'region_codes': ',eurz,uk,weurz,',
     'region_of_origin': 'EEURZ EUR HUNG ',
     'source_code': 'OHFSEC',
     'source_name': '150sec',
     'subject_codes': ',c22,ccat,cemis,centrp,cenvire,cesg,cexpro,cpartn,csmlbs,ncat,nfact,nfcpex,nfcpin,',
     'title': 'UK’s Pet-Tech Venture Launches Cultivated Chicken And Other Meat Products',
     'word_count': 490,
     'newswires_codes': '',
     'restrictor