In [11]:
# """
# this is an example of the json structure we're looking for from data.cms.gov
# {
#     "@type": "dcat:Distribution",
#     "downloadURL": "https://data.cms.gov/sites/default/files/2024-07/2f1a77b8-b55b-495a-a78e-2626ca67528d/2024_20240711.zip",
#     "mediaType": "application/zip",
#     "resourcesAPI": "https://data.cms.gov/data-api/v1/dataset-resources/bb625876-08c9-4329-9e17-6930376ca397",
#     "title": "Monthly Prescription Drug Plan Formulary and Pharmacy Network Information : 2024-07-01",
#     "modified": "2024-07-18",
#     "temporal": "2024-07-01/2024-07-31"
# },

# For Formulary Specific data, we can isolate the json by the key-value pair
# "@type": "dcat:Distribution"

# """

In [12]:
global BASE_URL
""" ref https://data.cms.gov/api-docs """
BASE_URL = "https://data.cms.gov/data.json"

In [13]:
import sys

This Docker environment most likely doesn't have pandas or numpy installed. Run the below cell if that's the case.

In [44]:
#!{sys.executable} -m pip install pyarrow



In [None]:
# !{sys.executable} -m pip install pandas

In [16]:
import requests
import json
import pandas as pd 
import os
import numpy as np

pd.set_option('display.max_colwidth', None)

In [17]:
response = json.loads(requests.get(BASE_URL).text)
tmp_df = pd.json_normalize(response)

In [18]:
def get_column_from_nested_json(json_data, column_name):
    """Extracts a specific column from nested JSON data."""

    result = []

    def extract_values(data):
        if isinstance(data, dict):
            for key, value in data.items():
                if key == column_name:
                    result.append(value)
                elif isinstance(value, (dict, list)):
                    extract_values(value)
        elif isinstance(data, list):
            for item in data:
                extract_values(item)

    extract_values(json_data)
    return result

# Extract the "name" column
urls = get_column_from_nested_json(response, "downloadURL")

print("There are {} Downloadable URLs from the CMS Public Open API Data Catalog available for review.".format(len(urls)))

There are 2236 Downloadable URLs from the CMS Public Open API Data Catalog available for review.


In [19]:
# example URL
urls[1]

'https://data.cms.gov/sites/default/files/2024-01/afc09855-5e4b-4baf-bdc4-88a4459a52e5/PY2024_Medicare_Shared_Savings_Program_Participants.csv'

In [20]:
def get_values_from_nested_json(data, key, filter_key, filter_value):
    result = []

    def traverse(data):
        if isinstance(data, dict):
            for k, v in data.items():
                if k == key and filter_key in data and data[filter_key].startswith(filter_value):
                    result.append(v)
                if isinstance(v, (dict, list)):
                    traverse(v)
        elif isinstance(data, list):
            for item in data:
                traverse(item)

    traverse(data)
    return result


formularies = get_values_from_nested_json(response, "downloadURL", "title", "Monthly Prescription Drug Plan Formulary and Pharmacy Network Information")

print("There are {} Monthly Drug Formularies from the CMS Public Open API Data Catalog available for review.".format(len(formularies)))

There are 73 Monthly Drug Formularies from the CMS Public Open API Data Catalog available for review.


In [21]:
# example URL
formularies[:5]

['https://data.cms.gov/sites/default/files/2025-01/b41e18c0-2249-490f-a57b-5ed085e03baf/2025_20250123.zip',
 'https://data.cms.gov/sites/default/files/2024-12/6258bcc2-9d95-4477-8c66-b43a7bc57c23/2025_20241212.zip',
 'https://data.cms.gov/sites/default/files/2024-12/bb6379c7-c39f-4c6c-9f99-58e58065668b/2025_20241219_Nov2024.zip',
 'https://data.cms.gov/sites/default/files/2024-12/0be15bc4-f63d-484a-8c40-4810b35c91aa/2025_20241219_Oct2024.zip',
 'https://data.cms.gov/sites/default/files/2024-09/d40e268c-12ca-4e9b-8e48-7fb2d71e66ab/2024_20240919.zip']

In [22]:
df_formularies = pd.DataFrame({'urls': formularies})
df_formularies = (df_formularies
            .assign(release_date=lambda df_:df_.urls
            .str.split(r'/', expand = True)
            .iloc[:,6])
            .assign(release_date=lambda df_:pd.to_datetime(df_.release_date)
                   )
        )

df_formularies.sample(3)

Unnamed: 0,urls,release_date
23,https://data.cms.gov/sites/default/files/2023-11/345e8284-9a21-4987-91a8-01beae73b695/2023_20230210.zip,2023-11-01
30,https://data.cms.gov/sites/default/files/2023-11/b82114ef-4dfe-4ad7-8c7a-3d89cad470b6/2022_20220701.zip,2023-11-01
28,https://data.cms.gov/sites/default/files/2023-11/78be4985-1061-412f-8186-f370189e7430/2022_20220909.zip,2023-11-01


In [23]:
#df_formularies.release_date.min()
#df_formularies.release_date.max()

In [24]:
df_formularies.head(3)

Unnamed: 0,urls,release_date
0,https://data.cms.gov/sites/default/files/2025-01/b41e18c0-2249-490f-a57b-5ed085e03baf/2025_20250123.zip,2025-01-01
1,https://data.cms.gov/sites/default/files/2024-12/6258bcc2-9d95-4477-8c66-b43a7bc57c23/2025_20241212.zip,2024-12-01
2,https://data.cms.gov/sites/default/files/2024-12/bb6379c7-c39f-4c6c-9f99-58e58065668b/2025_20241219_Nov2024.zip,2024-12-01


## Next Steps

    1. Create a working example of File Extraction. We need to confirm that these are the formulary files we want.
    
    2. First three files have 2024-12 in the url but different DESCRIPTIVE month in the end of the URL. Are these files the same or are they different? 
    
    3. 72 Zip Files up for consideration, we need to see if we need all 72 or if there is another clue that allows us to trim the list further.
    
    4. Ultimately, once satisifed we convert this notebook into the python file with specified CodeRX dag formatting.

## Approach in Downloading:

    1. Using dask.dataframe.read_csv: Instead of using pd.read_csv, which loads the data into memory immediately, we use Dask’s dd.read_csv() which returns a Dask DataFrame. These DataFrames do not load the data fully into memory until explicitly computed. This helps with large files.

    2. Parallel processing: Dask can handle multiple files at once, and it will operate on data in parallel (depending on your system's capabilities). This will speed up processing, especially when there are many files.

    3. Dask DataFrame: The dataframes dictionary now stores Dask DataFrames instead of pandas DataFrames. You can compute the results by calling .compute() when needed.

In [None]:
# !{sys.executable} -m pip install dask

In [41]:
import zipfile
import dask.dataframe as dd
from io import BytesIO

def unpack_and_read_zip_from_url_with_dask(zip_url, encoding='ISO-8859-1', delimiter='|'):

    """
    about 5 min to find and download ~20 zipfiles contained in the single URL given
    """
    # Send a GET request to the URL and retrieve the zip file content in streaming mode
    response = requests.get(zip_url, stream=True)
    
    if response.status_code == 200:
        # Write the zip file to disk temporarily
        zip_path = 'temp_downloaded.zip'
        with open(zip_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        
        # Open the zip file
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            # Extract all files to the current working directory
            target_dir = os.getcwd()
            zip_ref.extractall(target_dir)

            # Get list of zip files WITHIN the zip archive
            z_files = [f for f in zip_ref.namelist() if f.endswith('.zip')]
            
            dataframes = {}
            for z in z_files:
                try:
                    # Construct the file path
                    file_path = os.path.join(target_dir, z)
                    
                    # Use Dask to read each zip file with specified encoding and delimiter
                    dask_df = dd.read_csv(file_path, encoding=encoding, sep=delimiter)
                    dataframes[z] = dask_df
                except Exception as e:
                    print(f"Failed to read {z}: {e}")

        # Optionally, remove the temporary downloaded zip file
        os.remove(zip_path)

        return dataframes
    else:
        print(f"Failed to download zip file. HTTP Status Code: {response.status_code}")
        return {}

# Example usage:
zip_url = 'https://data.cms.gov/sites/default/files/2024-12/6258bcc2-9d95-4477-8c66-b43a7bc57c23/2025_20241212.zip'

# The dfs dictionary will contain dask dataframes with zip files' names as keys
dfs = unpack_and_read_zip_from_url_with_dask(zip_url, encoding='ISO-8859-1', delimiter='|')

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(
Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(
Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(
Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(
Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(
Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(
Please ensure that each individual file can fit in memory and
use the 

Failed to read basic drugs formulary file  20241231.zip: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: pyarrow>=10.0.1 is required for PyArrow backed StringArray.
Failed to read beneficiary cost file  20241231.zip: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: pyarrow>=10.0.1 is required for PyArrow backed StringArray.
Failed to read excluded drugs formulary file  20241231.zip: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: pyarrow>=10.0.1 is required for PyArrow backed StringArray.
Failed to read geographic locator file  20241231.zip: An error occurred while calling the read_csv method registered to the pandas backend.
Original Message: pyarrow>=10.0.1 is required for PyArrow backed StringArray.
Failed to read indication based coverage formulary file  20241231.zip: An error occurred while calling the read_csv metho

In [39]:
# Example of converting Dask DataFrame to pandas
# pyarrow package conflicts but false alarm. only showing up due to docker jupyter notebook image pull
# should resolve if venv
#df_pandas = dfs['basic drugs formulary file 20241231.zip'].compute()

In [46]:
tmp = pd.read_csv('basic drugs formulary file  20241231.zip', encoding='ISO-8859-1', delimiter='|')
tmp

Unnamed: 0,FORMULARY_ID,FORMULARY_VERSION,CONTRACT_YEAR,RXCUI,NDC,TIER_LEVEL_VALUE,QUANTITY_LIMIT_YN,QUANTITY_LIMIT_AMOUNT,QUANTITY_LIMIT_DAYS,PRIOR_AUTHORIZATION_YN,STEP_THERAPY_YN
0,25000,13,2025,1551300,2143380,3,Y,2,28,Y,N
1,25000,13,2025,1551306,2143480,3,Y,2,28,Y,N
2,25000,13,2025,2058877,2143611,3,Y,2,28,Y,N
3,25000,13,2025,2601758,2145780,3,Y,2,28,Y,N
4,25000,13,2025,2601776,2146080,3,Y,2,28,Y,N
...,...,...,...,...,...,...,...,...,...,...,...
1295907,25521,8,2025,2690862,82950000124,5,Y,24,28,Y,N
1295908,25521,8,2025,2682445,82950001201,5,Y,96,28,Y,N
1295909,25521,8,2025,2048025,83257000541,5,N,,,N,N
1295910,25521,8,2025,2684032,83296010012,5,Y,120,30,Y,N
