In [42]:
# Setup: imports, environment variables, and a jupysql connection to DuckDB for SQL cell magics
import duckdb
import pandas as pd
import requests
import chardet
from tqdm import tqdm
from dotenv import load_dotenv
import boto3
import os
from IPython.display import clear_output
import numpy as np
import joblib
import contextlib
import multiprocessing
import pyarrow as pa
import pyarrow.parquet as pq
import json

# Load environment variables
load_dotenv(override=True)

# Import jupysql Jupyter extension to create SQL cells
%load_ext sql
%reload_ext sql

# Set configrations on jupysql to directly output data to Pandas and to simplify the output that is printed to the notebook.
%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False
%config SqlMagic.autolimit = False

# Connect jupysql to DuckDB using a SQLAlchemy-style connection string. Either connect to an in memory DuckDB, or a file backed db.
%sql duckdb:///:memory:

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [27]:

# S3 location of the hospital price transparency files.
BUCKET = 'payless.health'
REGION = 'us-east-1'
PREFIX = "hospital_price_transparency"

# Credentials are read from the environment (load_dotenv fills it from .env).
# If a variable is unset, os.getenv returns None and boto3 falls back to its
# default credential chain.
access_key = os.getenv('AWS_ACCESS_KEY_ID')
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# One session pinned to REGION backs both the low-level client and the resource API,
# so neither depends on whatever default region the local AWS config happens to set.
session = boto3.Session(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    region_name=REGION,
)
s3 = session.client('s3')
s3_resource = session.resource('s3')
bucket = s3_resource.Bucket(BUCKET)


In [3]:
# Context manager that mirrors joblib's batch progress into a tqdm progress bar.
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Patch joblib so that every completed batch advances ``tqdm_object``.

    While the context is active, ``joblib.parallel.BatchCompletionCallBack`` is replaced
    by a subclass that first advances the bar by the callback's ``batch_size`` and then
    delegates to the original callback. On exit the original class is restored and the
    bar is closed, even if the body raised.

    Args:
        tqdm_object: progress bar exposing ``update(n=...)`` and ``close()``.

    Yields:
        The same ``tqdm_object``.
    """
    original_callback = joblib.parallel.BatchCompletionCallBack

    class _ProgressCallback(original_callback):
        def __call__(self, *args, **kwargs):
            # Advance by the size of the batch this callback belongs to.
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    joblib.parallel.BatchCompletionCallBack = _ProgressCallback
    try:
        yield tqdm_object
    finally:
        # Always undo the monkey-patch so later joblib calls are unaffected.
        joblib.parallel.BatchCompletionCallBack = original_callback
        tqdm_object.close()

In [None]:
# Create the local scratch folder used for downloaded CSVs. exist_ok=True makes this a no-op
# on re-runs and avoids the check-then-create race of os.path.exists + os.makedirs.
os.makedirs('../data', exist_ok=True)

In [4]:
# Process the raw CSVs: detect each file's encoding, re-encode it as UTF-8, and upload the
# result to S3 under f"{PREFIX}/processed/". Files already present there are skipped.

# (connect, read) timeouts in seconds for each download, so a stalled server cannot hang a worker.
REQUEST_TIMEOUT = (10, 300)

urls = []
# Paginate: a single list call returns at most 1000 keys. Prefix lets S3 filter server-side.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
    # 'Contents' is absent from a page that has no matching keys.
    for obj in page.get('Contents', []):
        key = obj['Key']
        if key.endswith('.csv') and "processed" not in key:
            urls.append(f"https://s3.amazonaws.com/{BUCKET}/" + key)

print(f"Found {len(urls)} raw files to download from S3")


def process_file(url):
    """Download one raw CSV, re-encode it as UTF-8, and upload it to the processed prefix.

    Does nothing if an object with prefix ``{PREFIX}/processed/<file name>`` already exists.
    The local copy under ../data is always removed afterwards, whether or not processing
    succeeded.

    Args:
        url: public HTTPS URL of a raw CSV in ``BUCKET``.

    Returns:
        None on success or skip; ``url`` if processing failed, so the caller can report
        failures instead of losing them silently.
    """
    file = url.split("/")[-1]
    key = f"{PREFIX}/processed/{file}"
    local_path = f"../data/{file}"

    # if already processed, skip (any() stops after the first listed object)
    if any(True for _ in bucket.objects.filter(Prefix=key)):
        return None

    try:
        r = requests.get(url, allow_redirects=True, timeout=REQUEST_TIMEOUT)
        # Fail on 4xx/5xx instead of re-encoding and uploading an error page as if it were a CSV.
        r.raise_for_status()
        encoding = chardet.detect(r.content)['encoding']
        with open(local_path, 'wb') as f:
            f.write(r.content)
        df = pd.read_csv(local_path, encoding=encoding, engine='python')
        df.to_csv(local_path, index=False, encoding='utf-8')
        bucket.upload_file(local_path, key)
    except Exception as exc:
        # Best-effort batch: keep going, but surface the failure and its cause.
        tqdm.write(f"Failed to process {url}: {exc!r}")
        return url
    finally:
        # delete the local copy whether or not the upload succeeded
        if os.path.exists(local_path):
            os.remove(local_path)
    return None


# joblib rejects n_jobs == 0, so keep at least one worker on single-core machines.
n_jobs = max(1, multiprocessing.cpu_count() - 1)
with tqdm_joblib(tqdm(desc="Processing Raw csvs", total=len(urls))):
    results = joblib.Parallel(n_jobs=n_jobs, backend="threading")(
        joblib.delayed(process_file)(url) for url in urls
    )

failed_urls = [u for u in results if u is not None]
print(f"Failed to process {len(failed_urls)} files")

Found 369 raw files to download from S3


Processing Raw csvs: 100%|██████████| 369/369 [4:44:12<00:00, 46.21s/it]   


In [33]:
# Collect the processed (UTF-8) CSVs and guess each file's header row from its first rows.
# A separate name is used instead of reassigning PREFIX, so the shared constant keeps its
# value if other cells are re-run after this one.
PROCESSED_PREFIX = f"{PREFIX}/processed"
N_PREVIEW_ROWS = 10  # rows inspected per file when guessing the header

# Prefix filtering is done by S3; the trailing "/" excludes sibling prefixes such as ".../processed_old".
processed_urls = [
    f"https://s3.amazonaws.com/{BUCKET}/" + obj.key
    for obj in bucket.objects.filter(Prefix=PROCESSED_PREFIX + "/")
    if obj.key.endswith('.csv')
]

print(f"Found {len(processed_urls)} processed files to download from S3")


def _guess_header_row(preview):
    """Return the positional index of the most likely header row in ``preview``.

    Heuristic: pick the earliest row with the most non-empty cells. This assumes preamble
    lines (titles, dates, ...) fill fewer cells than the header. Ties between the header and
    fully populated data rows resolve to the earliest row, which is the header when it comes
    first. Ranking by joined string length instead tends to pick data rows with long
    descriptions, and it counts NaN cells as the text "nan".

    Raises:
        ValueError: if ``preview`` has no rows.
    """
    if len(preview) == 0:
        raise ValueError("file has no rows")
    filled = preview.notna().sum(axis=1).to_numpy()
    return int(np.argmax(filled))  # argmax returns the first maximum


headers = []
bad_urls = []

for url in tqdm(processed_urls):
    try:
        preview = pd.read_csv(url, header=None, nrows=N_PREVIEW_ROWS)
        # Assumes file names start with the 6-character CCN.
        ccn = url.split("/")[-1][:6]
        header_row = _guess_header_row(preview)
        headers.append([ccn, preview.iloc[header_row, :].tolist()])
    except Exception as exc:
        # Keep going, but record the failure and its cause instead of hiding it.
        tqdm.write(f"Could not read header from {url}: {exc!r}")
        bad_urls.append(url)

print(f"Found {len(headers)} files with headers")
print(f"Found {len(bad_urls)} files with errors")

Found 333 processed files to download from S3


100%|██████████| 333/333 [08:31<00:00,  1.54s/it] 

Found 323 files with headers
Found 10 files without errors





In [40]:
# Build a long-format table with one row per (CCN, source column name) pair and write it to parquet.
# Each entry of `headers` is [CCN, list_of_column_names].
ccn_values = [entry[0] for entry in headers for _ in entry[1]]
column_names = [str(name) for entry in headers for name in entry[1]]

# Kept as module-level pyarrow arrays: the Label Studio export reads them by these names.
CCN_column = pa.array(ccn_values)
header_column = pa.array(column_names)

table = pa.table({'CCN': CCN_column, 'source_column_name': header_column})
pq.write_table(table, '../hospital_price_transparency_ccn_column_names.parquet')

In [44]:
# Export one Label Studio task per (CCN, column name) pair; the task id is the pair's position.
label_studio_json = [
    {
        "data": {
            "CCN": str(ccn),
            "text": str(column_name),
        },
        "id": task_id,
    }
    for task_id, (ccn, column_name) in enumerate(
        tqdm(zip(CCN_column, header_column), total=len(header_column))
    )
]

with open('../hospital_price_transparency_ccn_column_names.json', 'w') as f:
    json.dump(label_studio_json, f)

100%|██████████| 11295/11295 [00:00<00:00, 288091.70it/s]


In [None]:
for url in urls:
    file = url.split("%2F")[1]
    full_path = os.path.abspath(f"../data/{file}")
    query = f"SELECT * FROM read_csv_auto('{full_path}', header=False) LIMIT 10;"
    res = %sql {{query}}
    for i in range(10):
        vals = list(res.iloc[i, :10])
        print(i, ", ".join([str(val) for val in vals]))
    print("Enter row index to use as column names:")
    row_indexes.append(input())
    clear_output(wait=True)