In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import requests
import urllib.request
import json
import os, sys, re
import logging
from datetime import date, datetime

In [None]:
# pip freeze > "requirements.txt"

# Assign Notebook Variables

In [None]:
# Run timestamp used to give every file written by this run a unique, sortable name,
# e.g. "2025-04-22_003308".
datetime_today = datetime.now().strftime("%Y-%m-%d_%H%M%S")

## API parameters
cms_url = "https://data.cms.gov/provider-data/api/1/metastore/schemas/dataset/items"
# NOTE(review): the two values below are not referenced by any cell shown in this notebook.
hospitals_relative_url = "?show-reference-ids=false"
params = "application/json"

## Paths parameters
# Project root = parent directory of the notebook's working directory.
relative_path = os.path.dirname(os.getcwd())
source_csv_dir = os.path.join(relative_path, "01_Source", "csv")
temp_file_path = os.path.join(relative_path, "01_Source", "temp", f"cms_{datetime_today}.json")
csv_file_path = os.path.join(source_csv_dir, f"hospitals_{datetime_today}.csv")

# CSV written by the most recent earlier run, or None when there is none yet.
# os.listdir() returns entries in arbitrary order, so sort explicitly: the embedded
# "%Y-%m-%d_%H%M%S" timestamp makes lexicographic order chronological.
# The directory may not exist on a first run (it is only created in the next cell).
previous_csv_files = []
if os.path.isdir(source_csv_dir):
    previous_csv_files = sorted(
        name for name in os.listdir(source_csv_dir)
        if name.startswith("hospitals_") and name.endswith(".csv")
    )
previous_csv_file = previous_csv_files[-1] if previous_csv_files else None
previous_csv_file_path = (
    os.path.join(source_csv_dir, previous_csv_file) if previous_csv_file else None
)
output_relative_path = os.path.join(relative_path, "02_Output")
output_csv_relative_path = os.path.join(output_relative_path, "csv")
log_file_path = os.path.join(relative_path, "logs", f"cms_logs_{datetime_today}.log")

print(f"url                         = {cms_url}")
print("     ")
print(f"relative_path               = {relative_path}")
print(f"temp_file_path              = {temp_file_path}")
print(f"csv_file_path               = {csv_file_path}")
print(f"previous_csv_file_path      = {previous_csv_file_path}")
print(f"output_relative_path        = {output_relative_path}")
print(f"output_csv_relative_path   = {output_csv_relative_path}")
print(f"log_file_path               = {log_file_path}")

In [None]:
# Create every directory this notebook writes to. exist_ok=True replaces the
# check-then-create pattern (no race between the check and the mkdir) and keeps
# the cell safe to re-run.
for required_dir in (
    os.path.join(relative_path, "01_Source", "csv"),
    os.path.join(relative_path, "01_Source", "temp"),
    os.path.join(relative_path, "logs"),
    output_csv_relative_path,
):
    os.makedirs(required_dir, exist_ok=True)

# Start Spark session

In [None]:
# Single SparkSession for the whole notebook; getOrCreate() hands back the
# existing session when the cell is re-run.
spark = (
    SparkSession.builder
    .appName("Hospitals")
    .getOrCreate()
)

# Define logger

In [None]:
def setup_logger(log_file_path):
    """Configure and return the notebook logger, writing to a file and to stdout.

    Safe to call repeatedly (e.g. when the cell is re-run). logging.getLogger
    returns the same logger object every time. Any handlers attached to it are
    removed and closed first, so each record is emitted once per destination
    rather than once per call.

    Args:
        log_file_path: path of the log file, opened with ``logging.FileHandler``.

    Returns:
        logging.Logger: the logger named ``__name__``, at INFO level.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    # Drop (and close) handlers from earlier calls so they do not accumulate.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler = logging.FileHandler(log_file_path)
    console_handler = logging.StreamHandler(sys.stdout)
    for handler in (file_handler, console_handler):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger

# Module-level logger; the helper functions below log through this global.
logger = setup_logger(log_file_path=log_file_path)

In [None]:
def read_json_from_url(url, temp_file_path, timeout=60):
    """Download a JSON document from ``url`` and load it into a Spark DataFrame.

    The response body is first saved to ``temp_file_path``, which is then read
    with ``spark.read.json``. The temporary file is kept on disk.

    Args:
        url: URL returning a JSON payload (decoded as UTF-8).
        temp_file_path: local file the payload is written to (UTF-8).
        timeout: seconds to wait on the connection before giving up (default 60).

    Returns:
        pyspark.sql.DataFrame read from ``temp_file_path``.

    Raises:
        Any exception from the download, the file write or the Spark read is
        logged and re-raised unchanged.
    """
    try:
        # 1. Download the JSON file
        logger.info(f"Downloading JSON file from: {url}")
        # Without an explicit timeout urlopen uses the global socket default (blocking
        # unless configured); `with` closes the connection deterministically.
        with urllib.request.urlopen(url, timeout=timeout) as response:
            content = response.read().decode('utf-8')

        # Explicit encoding: the platform default may not be able to encode all of the text.
        with open(temp_file_path, "w", encoding="utf-8") as f:
            f.write(content)
        logger.info(f"Successfully written JSON to: {temp_file_path}")

        # 2. Read the JSON file into a PySpark DataFrame
        logger.info("Reading JSON data into Spark DataFrame...")
        df = spark.read.json(temp_file_path)

        logger.info("Successfully read JSON data.")
        return df

    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise


In [None]:
# Fetch the CMS dataset metadata and keep it cached, since later cells reuse `data`.
data = read_json_from_url(url=cms_url, temp_file_path=temp_file_path)
data.persist()

In [None]:
def process_data_and_write_to_csv(df, output_path):
    """Keep only 'Hospitals' datasets, flatten to one row per distribution, and save as CSV.

    Args:
        df: Spark DataFrame of dataset metadata (e.g. from ``read_json_from_url``).
            ``theme`` and ``distribution`` must be array columns (they are exploded),
            and each ``distribution`` element must have a ``downloadURL`` field.
        output_path: destination CSV file, written via pandas without the index.

    Returns:
        The transformed Spark DataFrame. It is lazy, so ``current_timestamp`` may
        differ from the written CSV if the frame is recomputed by a later action.

    Raises:
        Any Spark/pandas/IO error is logged and re-raised.
    """
    try:
        logger.info("Transforming and filtering items data for 'Hospitals'..")
        df = (df.withColumn("theme", explode("theme"))                      # one row per theme
                .where(col("theme") == "Hospitals")                         # keep Hospitals datasets only
                .withColumn("distribution", explode("distribution"))        # one row per distribution
                .select("*", "distribution.downloadURL")                    # lift downloadURL to a column
                .withColumn("current_timestamp", current_timestamp())       # when this run processed it
                .withColumn("modified", col("modified").cast(DateType()))   # date type for date comparisons
                .select("identifier", "description", "issued", "landingPage", "modified",
                        "downloadURL", "released", "theme", "title", "current_timestamp")
        )

        # toPandas() already brings every row to the driver, so a repartition(1)
        # beforehand would only add a full shuffle.
        logger.info("Converting data to pandas and saving as csv")
        df.toPandas().to_csv(output_path, index=False, sep=",")
        logger.info(f"Data saved as csv to:{output_path}")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise
    return df

In [None]:
# Filter to Hospitals datasets and save this run's snapshot to csv_file_path.
data_csv = process_data_and_write_to_csv(df=data, output_path=csv_file_path)

# Get data that has been modified since the previous run

In [None]:
def get_all_modified_CSVs(df, previous_csv_path, modified=True):
    """Select the datasets (identifier + download URL) that should be downloaded.

    Note the flag's meaning:
    - ``modified=True`` applies no date filter and returns every row of ``df``.
    - ``modified=False`` keeps only rows whose ``modified`` date is after the
      run date stored in the previous run's CSV.

    Args:
        df: DataFrame with ``identifier``, ``downloadURL`` and a date-typed
            ``modified`` column (as produced by ``process_data_and_write_to_csv``).
        previous_csv_path: CSV written by an earlier run, with a ``current_timestamp``
            column; only read when ``modified`` is False.
        modified: see above.

    Returns:
        DataFrame with columns ``identifier`` and ``downloadURL``.

    Raises:
        ValueError: ``modified`` is False but there is no previous CSV, or it has
            no usable ``current_timestamp`` value.
        Any other Spark/IO error is logged and re-raised.
    """
    logger.info("Processing data to get list of recently modified data..")
    try:
        if modified:
            logger.info("modified set to True")
            df = df.select("identifier", "downloadURL")
            logger.info("Extracted all identifiers (no modified-date filter applied)")
        else:
            logger.info("modified set to False")
            if not previous_csv_path:
                raise ValueError("modified=False requires a previous csv file, but none was found")
            logger.info("Reading previous day csv file..")
            df_old = spark.read.csv(previous_csv_path, header=True, sep=",")
            # Assumes the stored timestamp text starts with its YYYY-MM-DD date part.
            run_date_col = (col("current_timestamp").cast(StringType()).substr(1, 10)
                            .cast(DateType()).alias("current_timestamp"))
            # Only the first row is used, so fetch just that one instead of collecting the file.
            first_row = df_old.select(run_date_col).first()
            previous_run_date = first_row["current_timestamp"] if first_row is not None else None
            if previous_run_date is None:
                raise ValueError(f"No usable current_timestamp found in previous csv: {previous_csv_path}")

            df = (df.where(col("modified") > previous_run_date)
                    .select("identifier", "downloadURL"))
            logger.info("Extracted list of recently modified identifiers since previous day")

    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise

    return df

In [None]:
# modified=True skips the date filter and keeps every Hospitals dataset;
# pass modified=False to keep only datasets changed since the previous run's CSV.
df_csv = get_all_modified_CSVs(
    df=data_csv,
    previous_csv_path=previous_csv_file_path,
    modified=True,
)

# Download CSVs

In [None]:
def download_csv_file(url, save_path, logger):
    """Download ``url`` to ``save_path`` without ever leaving a truncated file there.

    The data is fetched into ``<save_path>.part`` and moved onto ``save_path``
    with ``os.replace`` only after ``urlretrieve`` finishes. If the download fails
    or is interrupted, the ``.part`` file is deleted and any existing
    ``save_path`` is left untouched.

    Args:
        url: source URL (any scheme ``urllib.request.urlretrieve`` supports).
        save_path: destination file path; replaced if it already exists.
        logger: logger for progress and error messages.

    Raises:
        Re-raises any download error after logging it, so the caller
        (``download_csv_and_save``) can stop the batch.
    """
    part_path = f"{save_path}.part"
    try:
        logger.info(f"Downloading CSV file from {url} to {save_path}")
        urllib.request.urlretrieve(url, part_path)
        os.replace(part_path, save_path)
        logger.info(f"Successfully downloaded file to {save_path}")
    except Exception as e:
        logger.error(f"Error downloading file from {url}: {e}")
        raise  # Re-raise to be caught in download_csv_and_save
    finally:
        # Only still present when the download failed or was interrupted.
        if os.path.exists(part_path):
            os.remove(part_path)

In [None]:
def download_csv_and_save(df_csv, output_relative_path):
    """Download every dataset listed in ``df_csv`` into ``output_relative_path``.

    Each file is saved as ``<URL file stem>__<identifier>__<datetime_today>.csv``.

    Args:
        df_csv: DataFrame with ``identifier`` and ``downloadURL`` columns
            (e.g. from ``get_all_modified_CSVs``).
        output_relative_path: existing directory the CSV files are written to.

    Returns:
        list[str]: paths of the downloaded files, in download order.

    Raises:
        Re-raises the first download error after logging it; files downloaded
        before the failure stay on disk.
    """
    logger.info("Starting download of csv files..")
    # One collect() keeps each identifier paired with its own URL; zipping two
    # separately collected columns relies on both jobs returning rows in the same order.
    rows = df_csv.select("identifier", "downloadURL").collect()

    downloaded_files = []
    try:
        for row in rows:
            dataset_id, url = row["identifier"], row["downloadURL"]
            logger.info(f"Downloading csv for Hospital ID: {dataset_id} ")
            logger.info(f"url: {url} ")

            file_name = os.path.splitext(url.split("/")[-1])[0]
            output_path = os.path.join(output_relative_path, f"{file_name}__{dataset_id}__{datetime_today}.csv")

            ## Download the csv file
            download_csv_file(url, output_path, logger)
            downloaded_files.append(output_path)
            logger.info(f"Downloaded csv file for Hospital ID: {dataset_id} ")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise

    return downloaded_files

In [None]:
# Download each listed dataset into the output csv directory.
download_csv_and_save(df_csv=df_csv, output_relative_path=output_csv_relative_path)

## Get test csv data

In [None]:
# NOTE(review): hard-coded file from one specific run (2025-04-22_003308); it only
# exists if that run's downloads are present — update it when re-running.
test_csv_file_name = "ASCQR_OAS_CAHPS_STATE__x663-bwbj__2025-04-22_003308.csv"
df_test = spark.read.csv(
    os.path.join(output_csv_relative_path, test_csv_file_name),
    header=True,
    inferSchema=True,
)

# Transform Data and Save csv

In [None]:
def transform_data(df, output_path, logger):
    try:
        columns = df.columns
        logger.info("Begin transformations..")
        
        # Create a list of new column names by replacing spaces with underscores
        logger.info("Removing special characters and making column names snake case..")
        new_columns = [re.sub(r"[^0-9a-zA-Z_]+", "_", c.lower().replace(" ", "_")) for c in columns]
        
        # Use the `toDF` function to rename the columns
        df_renamed = df.toDF(*new_columns)

        logger.info("Saving csv file")
        data.repartition(1).toPandas().to_csv(output_path)
        logger.info(f"CSV file saved to {output_path}")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        raise
    return df_renamed

In [None]:
# Write the transformed data under a distinct name: reusing the source file name
# would overwrite the CSV that df_test is read from.
final_output_path = os.path.join(
    output_csv_relative_path,
    "ASCQR_OAS_CAHPS_STATE__x663-bwbj__2025-04-22_003308__transformed.csv",
)
data_final = transform_data(df_test, final_output_path, logger)