In [1]:
import os
import posixpath
import re
from datetime import timedelta

import awswrangler as wr
import boto3
import pandas as pd

In [2]:
def arguments(*segments, defaults=None, **kwargs):
    """Build an S3 URI and the keyword options for an awswrangler call.

    Parameters
    ----------
    *segments : str
        Path components, joined with ``/``. ``s3://`` is prepended when the
        joined path does not already start with it.
    defaults : dict, optional
        Baseline options. The dict is copied, never modified.
    **kwargs
        Overrides applied on top of ``defaults``. ``profile_name`` is consumed
        here and converted into a ``boto3_session`` entry; when the caller
        also passes ``boto3_session`` explicitly, that session wins and
        ``profile_name`` is dropped.

    Returns
    -------
    tuple
        ``(path, options)`` where ``options`` is a new dict.
    """
    # S3 keys always use forward slashes; os.path.join would follow the
    # host's convention and insert backslashes on Windows.
    path = posixpath.join(*segments)
    if not path.startswith('s3://'):
        path = f's3://{path}'

    # Copy so the caller's dict is not changed as a side effect.
    options = dict(defaults or {})
    # Always consume profile_name so it is never forwarded downstream.
    if 'profile_name' in kwargs:
        profile_name = kwargs.pop('profile_name')
        if 'boto3_session' not in kwargs:
            options['boto3_session'] = boto3.Session(profile_name=profile_name)
    options.update(kwargs)

    return path, options

In [3]:
def read(path, **kwargs):
    """Load parquet data from S3 into a DataFrame.

    The optional ``parse_dates`` keyword (an iterable of column names) is
    removed from ``kwargs``; each named column is converted with
    ``pd.to_datetime`` after loading. The remaining keywords override the
    defaults (``ignore_index=False``, ``dataset=True``) and are forwarded to
    ``wr.s3.read_parquet``.
    """
    date_columns = kwargs.pop('parse_dates', [])
    location, options = arguments(
        path,
        defaults={'ignore_index': False, 'dataset': True},
        **kwargs,
    )
    frame = wr.s3.read_parquet(location, **options)
    for name in date_columns:
        frame[name] = pd.to_datetime(frame[name])
    return frame

In [4]:
def read_partition(path, date, **kwargs):
    """Read the single ``date=...`` partition for ``date`` under ``path``.

    ``date`` is parsed with ``pd.to_datetime`` and formatted as
    ``date=YYYY-MM-DD HH:MM:SS`` to form the partition directory name.
    Remaining keywords are passed through to ``read``.
    """
    timestamp = pd.to_datetime(date)
    partition_dir = timestamp.strftime('date=%Y-%m-%d %H:%M:%S')
    partition_path, options = arguments(path, partition_dir, **kwargs)
    return read(partition_path, **options)

In [5]:
def get_latest_partition(prefix, partition_key='date', **kwargs):
    """Return the directory under ``prefix`` with the latest partition value.

    Directories are listed with ``wr.s3.list_directories``; the value after
    ``<partition_key>=`` in each one is parsed with ``pd.to_datetime`` and
    used for ordering.

    Parameters
    ----------
    prefix : str
        S3 prefix to list (``s3://`` is added when missing).
    partition_key : str
        Name of the partition key, matched literally.
    **kwargs
        Forwarded to ``arguments`` and then to ``wr.s3.list_directories``.

    Returns
    -------
    str or None
        The latest partition directory, or ``None`` when no listed directory
        carries the partition key.
    """
    path, kwargs = arguments(prefix, **kwargs)
    directories = wr.s3.list_directories(path, **kwargs)

    # Escape the key so regex metacharacters in it are matched literally.
    pattern = re.compile(f'{re.escape(partition_key)}=([^/]+)')

    dated = []
    for directory in directories:
        match = pattern.search(directory)
        if match is None:
            # Not a partition directory; skip it instead of failing on
            # ``None.group``.
            continue
        dated.append((pd.to_datetime(match.group(1)), directory))

    if not dated:
        return None
    # Stable sort on the timestamp only, so ties resolve to the last listed.
    dated.sort(key=lambda pair: pair[0])
    return dated[-1][1]

In [6]:
def write(data, path, **kwargs):
    """Write ``data`` to S3 as a parquet dataset via ``wr.s3.to_parquet``.

    Defaults (``index=True``, ``compression='gzip'``, ``dataset=True``,
    ``use_threads=True``) can be overridden through ``kwargs``. When the
    caller gives no ``partition_cols`` and ``data`` has a ``date`` column,
    the dataset is partitioned by that column.
    """
    path, options = arguments(
        path,
        defaults={
            'index': True,
            'compression': 'gzip',
            'dataset': True,
            'use_threads': True,
        },
        **kwargs,
    )
    if 'partition_cols' not in options:
        # Only 'date' is active; the wider list noted by the author was
        # ['date', 'datetime', 'time'].
        candidates = ('date',)
        first = next((c for c in candidates if c in data.columns), None)
        if first is not None:
            options['partition_cols'] = [first]
    return wr.s3.to_parquet(data, path, **options)

#### Demonstration

In [None]:
# Locate the most recent partition and load it.

path = get_latest_partition('bucketname/directory/', profile_name='profile')
# get_latest_partition returns None when nothing is listed; fail with a clear
# message instead of an opaque error from inside read().
if path is None:
    raise FileNotFoundError("no partitions found under 'bucketname/directory/'")
latest = read(path, profile_name='profile', parse_dates=['date'])

In [None]:
# Write a new day: reuse the latest partition's rows with the date advanced
# by one day.

# Build a new frame rather than mutating `latest`, so re-running this cell
# does not keep adding days. `.iloc[0]` takes the first row by position,
# whatever the index labels are.
next_day = latest.assign(date=latest['date'].iloc[0] + timedelta(days=1))

write(next_day, 'bucketname/directory/', profile_name='profile')