# Stream S3 Content

References:
- [Overview](https://aws.amazon.com/blogs/storage/querying-data-without-servers-or-databases-using-amazon-s3-select)
- [User Guide](https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html)
- [Client.select_object_content](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.select_object_content)

To Do:
- Consider [Smart Open](https://github.com/RaRe-Technologies/smart_open)

In [None]:
from contextlib import closing
import csv
from datetime import datetime
import gzip as gz
import logging
from os import environ
from os.path import basename, splitext

import boto3
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

## Config

In [None]:
# Root logging: "<time> <LEVEL>: <message>" with a 12-hour clock, INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%I:%M:%S %p',
)

# Notebook-wide logger shared by the cells that follow.
logger = logging.getLogger(__name__)

# Inspect Bucket via Filter
- The `Marker` parameter seems to be designed to implement watermarks
- **TODO**: Each S3 object's `LastModified` attribute is persisted, so we may be able to use it for incremental loading instead of manually parsing dates (e.g., out of the object keys).

### References
- https://stackoverflow.com/a/58066657

In [None]:
%%time

# Build a session from explicit P3_* credentials in the environment.
# The session token is only required for temporary (STS) credentials, so it is
# optional here; boto3 accepts None for aws_session_token.
session = boto3.Session(
    aws_access_key_id=environ['P3_AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=environ['P3_AWS_SECRET_ACCESS_KEY'],
    aws_session_token=environ.get('P3_AWS_SESSION_TOKEN'),
)

s3 = session.resource('s3')
# s3 = boto3.resource('s3')  # equivalent, but uses the default credential chain
ale_bucket = s3.Bucket('my-lake')

# Key prefixes to summarize (object count plus oldest/newest LastModified).
pracs = [
    "unload/vendor1/parquet/",
]
for prac in pracs:
    object_count, min_date, max_date = 0, datetime.max, datetime.min
    for summary in ale_bucket.objects.filter(Prefix=prac):
        object_count += 1
        # LastModified is timezone-aware; drop tzinfo so it can be compared
        # with the naive datetime.max / datetime.min sentinels above.
        cur_date = summary.last_modified.replace(tzinfo=None)
        min_date = min(min_date, cur_date)
        max_date = max(max_date, cur_date)
    if not object_count:
        # Without this guard the untouched sentinels would be reported as a
        # bogus 9999-12-31 .. 0001-01-01 date range.
        logger.warning(f"{prac}: no objects found")
        continue
    logger.info(f"{prac}: {object_count} from {min_date.date()} to {max_date.date()}")

# TODO: Inspect sorting
# for summary in (
#     ale_bucket
#     .objects
#     .filter(
#         Prefix='unload/vendor1/parquet/',
#         Marker='unload/vendor1/parquet/data_summary_20211019'
#     )
#     .limit(count=7)
# ):
#     logger.info(f"{summary.key} at {summary.last_modified}")

# Inspect Bucket
- The `StartAfter` parameter seems to be designed to implement watermarks

In [None]:
%%time

# A single list_objects_v2 call returns at most 1000 keys; use the paginator so
# every object under the prefix is listed, not just the first page.
s3_hj_client = boto3.client('s3')
exports = s3_hj_client.get_paginator('list_objects_v2').paginate(
    Bucket='my-bucket',
    Prefix='unload/parquet/data_file_202112',
    # StartAfter='unload/parquet/summary_20211001.gz',  # watermark candidate
)

for page in exports:
    # A page for an empty prefix has no 'Contents' key at all.
    for part in page.get('Contents', []):
        logger.info(f"{basename(part['Key'])}, {part['LastModified']}, {part['Size']}")

# Stream S3
Use the `Body` attribute (a streaming body). `iter_lines()` would also be an option if the content were not gzip-compressed. The decompressed lines could then be written to SQL. If the existing streaming implementation proves flaky, this is plan B.

- Alternatively, write batches of CSV/JSON to a local temp file, then upload it to S3 or load it with PostgreSQL `COPY FROM` (a file or `STDIN`).
- Alternatively, do the same as above but use the `select_object_content` `EventStream` as the source.

References:
- https://kokes.github.io/blog/2018/07/26/s3-objects-streaming-python.html

In [None]:
%%time

s3_hj_client = boto3.client('s3')

# Column names assigned, in order, to the pipe-delimited fields of each row.
allergy_columns = ['client_id', 'patient_id', 'type', 'description', 'onset_date',
                   'resolved_date', 'severity', 'reaction_code', 'reaction', 'product_code']

allergy_obj = s3_hj_client.get_object(
    Bucket='my-bucket',
    Key='unload/parquet/data_file_202112'
)

lines = 0
body = allergy_obj['Body']

# closing() releases the HTTP stream even if decompression fails part-way;
# gzip.open wraps the streaming body and decompresses it as rows are iterated.
with closing(body):
    with gz.open(body, 'rb') as f:
        for row in f:  # alternative for uncompressed objects: body.iter_lines()
            # Rows keep their line terminator; strip it so it does not end up
            # glued to the last field (and so log lines are not double-spaced).
            line = row.decode('utf-8').rstrip('\r\n')
            kv = dict(zip(allergy_columns, line.split('|')))
            logger.info(f"Raw Row data: {line}")
            lines += 1

logger.info(f"Result is complete: {lines} rows processed")


# Stream S3 Object Content
Using `select_object_content` to stream content from a gzip-compressed S3 object

References
- [EventStream](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/eventstream.html)
- [Partial record handling](https://github.com/aws/aws-sdk-net/issues/1296#issuecomment-494998477)

# Data Analysis

In [None]:
%%time

s3_hj_client = boto3.client('s3')

# Bind these before the try: if select_object_content itself raises, the
# finally clause must not fail with a NameError that masks the real error.
event_stream = None
end_event_received = False
total = batches = 0

try:
    content = s3_hj_client.select_object_content(
        Bucket='my-bucket',
        Key="unload/parquet/data_file_202112",
        RequestProgress={
            'Enabled': False
        },
        ExpressionType="SQL",
        Expression="""
            SELECT  s3.*
            FROM    s3object AS s3
            --WHERE   SUBSTRING(s3._14, 1, 4) = '2017'
            LIMIT 21
        """,
        InputSerialization={
            "CSV": {
                "FileHeaderInfo": "NONE",
                "FieldDelimiter": "|",
                # NOTE(review): QuoteCharacter equals FieldDelimiter; presumably
                # intended to disable quote handling for this data — confirm.
                "QuoteCharacter": "|",
            },
            "CompressionType": "GZIP"
        },
        OutputSerialization={"CSV": {"QuoteFields": "ALWAYS"}},
    )

    event_stream = content['Payload']
    # Bytes of a record not yet terminated by '\n'. A Records payload can end
    # mid-record (or mid multi-byte UTF-8 character), so only complete lines
    # are decoded; '\n' never occurs inside a multi-byte UTF-8 sequence.
    pending = b''

    for event in event_stream:
        if 'Records' in event:
            pending += event['Records']['Payload']
            *complete, pending = pending.split(b'\n')
            for raw in complete:
                logger.info(raw.decode('utf-8'))
            total += len(complete)
            batches += 1
        elif 'End' in event:
            end_event_received = True

    if pending:
        # A final record without a trailing newline would otherwise be lost.
        logger.info(pending.decode('utf-8'))
        total += 1
finally:
    if event_stream is not None:
        event_stream.close()

if not end_event_received:
    logger.warning("End event not received, request incomplete.")
else:
    logger.info(f"Result is complete: {total} rows processed in {batches} batches")

## Remove NUL Characters
PostgreSQL does not allow NUL (`0x00`) characters in text values, so they must be stripped from the extract before loading.

References:
- https://stackoverflow.com/a/1348551
- https://stackoverflow.com/a/7760752

In [None]:
%%bash

# Character dump of the sample file's final line so stray NUL (\0) bytes are visible.
tail -n 1 data/medication_20210619_l5.csv | od -c

In [None]:
%%bash

# Dry run: delete every NUL byte, then dump the last line to confirm none remain.
tr -d '\000' < data/medication_20210619_l5.csv | tail -n 1 | od -c

In [None]:
%%time
%%bash

# Remove every NUL byte from the full extract in place, ahead of the PostgreSQL COPY load.
# NOTE: suffix-less `-i` and the `\x00` escape are GNU sed features; BSD/macOS sed differs.
sed -e 's/\x00//g' -i data/medication_20210619.csv

## Write As COPY FROM Batch
References
- [Parameterization](https://stackoverflow.com/a/1471178)
- [Parameterization/Psycopg2](https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries)
- [`copy_expert`](https://stackoverflow.com/a/34523707)
- [`copy_expert`/Psycopg2](https://www.psycopg.org/docs/usage.html#using-copy-to-and-copy-from)

In [None]:
%%time

maehc_db_url = 'postgresql://username@pgsql-jupyter-lib:5432/psycodb'
db_engine = create_engine(maehc_db_url, echo=False)
# Raw DBAPI connection (psycopg2 is the default driver for postgresql:// URLs);
# copy_expert is a driver-level API that SQLAlchemy does not wrap.
c = db_engine.raw_connection()
etl_date = '20210619'
s3_object = f"medication_{etl_date}.csv"

try:
    # TRUNCATE and COPY run in one transaction, so a failed load is rolled back
    # and the previous contents of staging.medication survive.
    with open(f'data/{s3_object}', 'rb') as f, closing(c.cursor()) as cursor:
        cursor.execute("TRUNCATE TABLE staging.medication")

        # Stream the local CSV into the staging table. The file is expected to
        # have had its NUL bytes stripped first (PostgreSQL rejects them).
        cursor.copy_expert("""
            COPY staging.medication (
                client_id, patient_id, id_ndc, id_rxnorm, id_other,
                type, name, dose, form, route, sig_code, sig, duration,
                frequency, start_date, stop_date, comment, quantity,
                refill, fill_status, sample_indicator, generic_indicator,
                brand_name_code, prescribed_npi, status, encounter_id,
                client_medication_id, row_hash
            )
            FROM STDIN
            WITH (FORMAT csv)
            --WITH (FORMAT csv, FORCE_NULL ( billable_ind ))
        """, f)

        logger.info(f"Bulk COPY FROM: {cursor.rowcount}")

#         cursor.execute("""
#             UPDATE staging.result
#                 SET source = %(source)s
#         """
#         , {"source": f"{splitext(s3_object)[0]}.gz",}
#         )

#         logger.info(f"Bulk UPDATE: {cursor.rowcount}")

    c.commit()
except Exception:
    c.rollback()
    raise  # bare raise keeps the original traceback
finally:
    # Return the pooled connection instead of leaking it.
    c.close()