## Examples: Importing Parquet Files Using Apache PyArrow
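- Dependency note: originally pinned as `azure-storage==0.37.0`; the code below uses `BlobServiceClient`, which is provided by the `azure-storage-blob` package (v12 or later).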

In [47]:
from io import BytesIO
import os

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient
import pandas as pd
import pyarrow.parquet as pq

In [2]:
%%bash

# List every entry in the current working directory (dotfiles included) in long format.
ls -al

total 16
drwxrwxrwx 3            1000            1000 4096 Dec  1 16:43 .
drwxr-xr-x 1 parquet-service parquet-service 4096 Dec  1 16:46 ..
drwxr-xr-x 2 parquet-service parquet-service 4096 Dec  1 16:40 .ipynb_checkpoints
-rw-r--r-- 1 parquet-service parquet-service 2210 Dec  1 16:43 parquet_import_ex.ipynb


In [6]:
# Storage connection settings are taken from the process environment so that no
# secret is written into the notebook; a missing variable raises KeyError.
account_name, account_key, container_name = [
    os.environ[variable]
    for variable in (
        'STORE_ACCOUNT_NAME',
        'STORE_ACCOUNT_KEY',
        'STORE_CONTAINER_NAME',
    )
]

## Read Single Parquet File

In [24]:
# Blob path of the single parquet part to load.
# Alternative sample: 'patient_score_ltc400/0a094741-5a7d-4b42-9443-3802ebb0f582/part-00000.parquet'
parquet_file = 'patient_score_kinnser/2c3491d7-5d7f-44df-80cb-654035b4652e/part-00000.parquet'

# Account-level client, authenticated with the storage account key.
blob_service = BlobServiceClient(
    credential=account_key,
    account_url=f'https://{account_name}.blob.core.windows.net/',
)

# Narrow down step by step: account -> container -> individual blob.
container_client = blob_service.get_container_client(container_name)
blob_client = container_client.get_blob_client(blob=parquet_file)

### To pandas

In [25]:
# Download the blob into an in-memory buffer and decode it into a pandas DataFrame.
try:
    with BytesIO() as byte_stream:
        storage_stream = blob_client.download_blob()
        # readinto() replaces the deprecated download_to_stream().
        storage_stream.readinto(byte_stream)
        # The table is fully materialised here, so closing the buffer afterwards is safe.
        parquet_df = pq.read_table(source=byte_stream).to_pandas()
except ResourceNotFoundError:
    print("No blob found.")
    # Stop the cell here: without a freshly loaded frame, the `parquet_df.head()`
    # below would fail with NameError on a fresh kernel, or silently show a stale
    # frame left over from an earlier run.
    raise

# Alternate 1 (round-trip through a local file instead of memory):
#     with open('data/temp.parquet', "wb") as my_blob:
#         storage_stream = blob_client.download_blob()
#         my_blob.write(storage_stream.readall())
#     with open('data/temp.parquet', "rb") as my_blob:
#         parquet_df = pq.read_table(source=my_blob).to_pandas()

parquet_df.head()

Unnamed: 0,index,PatientKey,hcc_ce,hcc_ins,hcc_ne,hcc_cc_ce,hcupModelScore,lr_cc,rf_cc,num_cc,cc_readmission,patient_risk_score,patient_risk_score_scale,RiskStratify
0,0,8110480,0.549,1.221,0.818,0.549,330.71,0.50329,0.0,7.0,0.81,3.88,0.97,Ultra-High
1,1,7394368,0.947,0.954,1.622,0.947,215.59,0.505559,0.0,6.0,0.81,3.79,0.95,Ultra-High
2,2,7257571,0.876,0.725,1.28,0.876,107.38,0.0,0.50718,6.0,0.81,3.67,0.92,Ultra-High
3,3,9078361,0.635,1.194,1.056,0.635,168.59,0.505559,0.0,7.0,0.81,3.52,0.88,Ultra-High
4,4,8922281,0.818,1.072,1.446,0.818,122.33,0.517756,0.0,6.0,0.81,3.45,0.86,Ultra-High


### To a `ParquetFile` object

In [28]:
# Open the downloaded blob as a pyarrow ParquetFile so its schema and metadata can
# be inspected without converting the data to pandas.
try:
    # Deliberately not a `with` block: ParquetFile reads from its source on demand,
    # so the buffer has to stay open for as long as the reader is in use.
    parquet_stream = BytesIO()
    # readinto() replaces the deprecated download_to_stream().
    blob_client.download_blob().readinto(parquet_stream)
except ResourceNotFoundError:
    print("No blob found.")
    # Stop the cell here; there is nothing to inspect without the blob.
    raise

# A separate name is used on purpose: `parquet_file` holds the blob path string that
# `blob_client` was built from, and rebinding it to a reader object would break a
# re-run of that earlier cell.
parquet_reader = pq.ParquetFile(parquet_stream)

# parquet_reader.schema
parquet_reader.metadata

<pyarrow._parquet.FileMetaData object at 0x7f2ed45854d0>
  created_by: Parquet.Net version 3.7.4 (build '2dd1d58fbd439b9f5f1457823400f36dd37b8aac')
  num_columns: 14
  num_rows: 15763
  num_row_groups: 4
  format_version: 1.0
  serialized_size: 5373

## Read Partitioned Parquet File
- [Reference](http://arrow.apache.org/docs/python/parquet.html#reading-from-partitioned-datasets)
- [Stack Overflow](https://stackoverflow.com/questions/58626126/partition-parquet-files-on-azure-blob-pyarrow)

In [62]:
# Download every parquet part stored under one blob prefix into a local directory,
# then read that directory back as a single dataset and convert it to pandas.
blob_service = BlobServiceClient(
    account_url=f'https://{account_name}.blob.core.windows.net/',
    credential=account_key)
container_client = blob_service.get_container_client(container_name)

blob_prefix = 'cc_crosswalk_kinnser/7d1fb957-c9e2-4500-bd5b-be57ae339c83'
parquet_blobs = []

# Collect the names of all blobs under the prefix that are parquet parts.
for blob in container_client.list_blobs(name_starts_with=blob_prefix):
    if blob.name.endswith('.parquet'):
        print(f"Found {blob.name}")
        parquet_blobs.append(blob.name)

target_directory = 'data/cc_crosswalk_kinnser'
os.makedirs(target_directory, exist_ok=True)

for blob_name in parquet_blobs:

    file_name = os.path.basename(blob_name)
    target_path = os.path.join(target_directory, file_name)

    try:
        print(f'Downloading {file_name} to {target_path}')
        # A loop-local client name keeps the `blob_client` used by the single-file
        # cells from being silently rebound to the last part downloaded here.
        part_client = container_client.get_blob_client(blob_name)

        # Start the download before creating the local file, so a missing blob does
        # not leave an empty .parquet file that would break the dataset read below.
        storage_stream = part_client.download_blob()
        with open(target_path, "wb") as f:
            # readinto() replaces the deprecated download_to_stream().
            storage_stream.readinto(f)

    except ResourceNotFoundError:
        # Best effort: report the missing part and carry on with the remaining ones.
        print("No blob found.")

# Read all downloaded parts in the directory as one logical table.
dataset = pq.ParquetDataset(target_directory)
table = dataset.read()

cc_crosswalk_kinnser_df = table.to_pandas()

Found cc_crosswalk_kinnser/7d1fb957-c9e2-4500-bd5b-be57ae339c83/part-00000.parquet
Found cc_crosswalk_kinnser/7d1fb957-c9e2-4500-bd5b-be57ae339c83/part-00001.parquet
Found cc_crosswalk_kinnser/7d1fb957-c9e2-4500-bd5b-be57ae339c83/part-00002.parquet
Downloading part-00000.parquet to data/cc_crosswalk_kinnser/part-00000.parquet
Downloading part-00001.parquet to data/cc_crosswalk_kinnser/part-00001.parquet
Downloading part-00002.parquet to data/cc_crosswalk_kinnser/part-00002.parquet


In [63]:
# Preview the first five rows of the crosswalk frame.
cc_crosswalk_kinnser_df.head(5)

Unnamed: 0,PatientKey,icd10_level,ICD10CODE,ICD10CODE_LEN,ICD_3Char,ICD_4Char,ICD_5Char,row_id,cc_category
0,4031480,PrimaryDx,N179,4,N17,N179,N179,300,Acute kidney failure and chronic kidney disease
1,5671025,PrimaryDx,N179,4,N17,N179,N179,2210,Acute kidney failure and chronic kidney disease
2,7039528,PrimaryDx,N179,4,N17,N179,N179,6207,Acute kidney failure and chronic kidney disease
3,7501105,PrimaryDx,N179,4,N17,N179,N179,8353,Acute kidney failure and chronic kidney disease
4,7561960,PrimaryDx,N179,4,N17,N179,N179,9607,Acute kidney failure and chronic kidney disease
