# S3-redshift-operations

Some data operations to read the contents of files on S3 whether they are parquet or csv

Also how to query data from redshift with sqlalchemy

In [2]:
import pandas as pd
from os import environ
import io
from operator import itemgetter
import logging
import numpy as np

import s3fs
import io
import boto3
import fastparquet as fp
import awswrangler as wr
import redshift_connector
from s3fs import S3FileSystem
from fastparquet import ParquetFile
from sqlalchemy.engine import create_engine
from pandas.io.sql import SQLTable

from helpers import (
    save_dataframe_csv,
    get_training_data,
    parquet_file,
    parquet_dataframe,
    make_dataframe_db_schema,
    download_from_s3,
    save_manifest_s3,
    save_data_qa,
    save_json_s3
)

dsn = create_engine(environ["ANALYTICS"])

pd.options.display.max_columns = 500
pd.options.display.max_rows = 500

log = logging.getLogger(__name__)

## Reading csv files from from S3

Simple as pie

In [3]:
df_raw = pd.read_csv(
    's3://qh-clinicaldata-phi/raw_feed/pre_ingest/healthy_blue/y=2022/m=03/d=10/'
    'ts=134943/QUARTET_GBDFACETS_PATIENT_20220310.txt',
    delimiter="|"
)

## Reading parquet files from from S3

In [4]:
s3 = s3fs.S3FileSystem(anon=False)

Listing directories

In [5]:
s3.glob('qh-clinicaldata-phi/raw_feed/pre_ingest/healthy_blue/')

['qh-clinicaldata-phi/raw_feed/pre_ingest/healthy_blue/_new_key_records',
 'qh-clinicaldata-phi/raw_feed/pre_ingest/healthy_blue/y=2021',
 'qh-clinicaldata-phi/raw_feed/pre_ingest/healthy_blue/y=2022']

Specifying buckets to pass to `parquet_file`

In [6]:
horiz_mc_bucket = 's3://qh-clinicaldata-phi/processed-feed/horizon/20220214_1947/parquet/medical_claim/'
horiz_mh_need_train_bucket = (
    's3://qh-clinicaldata-phi/processed-feed/horizon/20220214_1947/parquet/mh_need_pred_train/'
)

In [7]:
df_mhnt = parquet_file(horiz_mh_need_train_bucket).to_pandas()


In [8]:
df_mhnt.columns

Index(['relationship_change', 'source', 'age', 'is_male', 'household_size',
       'source_state_horizon', 'yr_cnt_claims_bh_conds_cnt_0',
       'yr_cnt_claims_chronic_conds_cnt_0', 'yr_cnt_claims_is_er_0',
       'yr_cnt_claims_is_inpatient_0',
       ...
       'mh_rx_category_Antidepressant_0', 'mh_rx_category_Antipsychotic_0',
       'mh_rx_category_Anxiolytic_0', 'mh_rx_category_MoodStabilizer_0',
       'mh_rx_category_OpioidDependence_0', 'mh_rx_category_Stimulant_0',
       'yr_polypharm_rx_0', 'has_mh', 'test_iper', 'test_mh_need'],
      dtype='object', length=160)

In [9]:
df_mhnt.shape

(59430, 160)

Taking only certain columns from parquet

In [10]:
engagement_check = 's3://qh-datascience/engagement-model/bcbs_az/2022_03/surfaced_claim_features/'
pf_ec = parquet_file(engagement_check, columns=['source', 'patient_quid', 'ip_er_visit_count',
                                                'recent_ip_er_visit_date', 'ip_er_bh_visit_count',
                                                'days_since_er_ip'])

Beware of memory usage!

In [None]:
# omitted so I don't explode
df_hmc = parquet_file(horiz_mc_bucket, columns=['member_id', 'source', 'month', 'svc_cat']).to_pandas()
df_hmc = df_hmc[(df_hmc['month'] >= '2019-12-01') & (df_hmc['month'] <= '2022-02-01')]

## Redshift operations

Query data in redshift and put it into a pandas dataframe

In [11]:
SAMPLE_QUERY = """
SELECT *
FROM atacama_atacama.address_type
"""

In [12]:
df_address_type = pd.read_sql(SAMPLE_QUERY, dsn)

  return insp.has_table(name, schema or self.meta.schema)


In [13]:
df_address_type

Unnamed: 0,value
0,HOME
1,OTHER
2,WORK


Put data into a table in redshift (connector for awswrangler is different than sqlalchemy)

In [14]:
schema_name = 'ahmiel'
table_name = 'address_type'

connector = redshift_connector.connect(
    host=environ['PGHOST'],
    database=environ['PGDATABASE'],
    user=environ['PGUSER'],
    password=environ['PGPASSWORD']
    )

In [15]:
wr.redshift.to_sql(
        df=df_address_type,
        table=table_name,
        schema=schema_name,
        con=connector,
        mode='overwrite',
        dtype={'value': 'VARCHAR(5)'},
        overwrite_method='drop',
        index=False,
        chunksize=1000
    )