In [48]:
%load_ext lab_black
import os
import pandas as pd
from dotenv import load_dotenv
import boto3
import io

The lab_black extension is already loaded. To reload it, use:
  %reload_ext lab_black


In [49]:

class S3FileManager(object):

    def __init__(self, bucket: str = 'meiyume-datawarehouse-prod'):
        """__init__ initializes S3FileManager instance with given data bucket.

        Args:
            bucket (str, optional): The S3 bucket from/to which files will be read/downloaded/uploaded.
                                    Defaults to 'meiyume-datawarehouse-prod'.

        """
        self.bucket = bucket

    def get_matching_s3_objects(self, prefix: str = "", suffix: str = ""):
        s3 = boto3.client("s3")
        paginator = s3.get_paginator("list_objects_v2")

        kwargs = {'Bucket': self.bucket}

        # We can pass the prefix directly to the S3 API.  If the user has passed
        # a tuple or list of prefixes, we go through them one by one.
        if isinstance(prefix, str):
            prefixes = (prefix, )
        else:
            prefixes = prefix

        for key_prefix in prefixes:
            kwargs["Prefix"] = key_prefix

            for page in paginator.paginate(**kwargs):
                try:
                    contents = page["Contents"]
                except KeyError:
                    break

                for obj in contents:
                    key = obj["Key"]
                    if key.endswith(suffix):
                        yield obj

    def get_matching_s3_keys(self, prefix: str = "", suffix: str = ""):
        for obj in self.get_matching_s3_objects(prefix, suffix):
            yield obj  # obj["Key"]

    def get_last_modified_s3(self, key: str) -> dict:
        s3 = boto3.resource('s3')
        k = s3.Bucket(self.bucket).Object(key)  # pylint: disable=no-member
        return {'key_name': k.key, 'key_last_modified': str(k.last_modified)}

    def get_prefix_s3(self, job_name: str) -> str:
        upload_jobs = {
            'source_meta': 'Feeds/BeautyTrendEngine/Source_Meta/Staging/',
            'meta_detail': 'Feeds/BeautyTrendEngine/Meta_Detail/Staging/',
            'item': 'Feeds/BeautyTrendEngine/Item/Staging/',
            'ingredient': 'Feeds/BeautyTrendEngine/Ingredient/Staging/',
            'review': 'Feeds/BeautyTrendEngine/Review/Staging/',
            'review_summary': 'Feeds/BeautyTrendEngine/Review_Summary/Staging/',
            'image': 'Feeds/BeautyTrendEngine/Image/Staging/',
            'cleaned_pre_algorithm': 'Feeds/BeautyTrendEngine/CleanedData/PreAlgorithm/',
            'webapp': 'Feeds/BeautyTrendEngine/WebAppData/',
            'webapp_test': 'Feeds/BeautyTrendEngine/WebAppDevelopmentData/Test/'
        }

        try:
            return upload_jobs[job_name]
        except Exception as ex:
            raise MeiyumeException(
                'Unrecognizable job. Please input correct job_name.')

    def push_file_s3(self, file_path: Union[str, Path], job_name: str) -> None:
        # cls.make_manager()
        file_name = str(file_path).split("\\")[-1]

        prefix = self.get_prefix_s3(job_name)
        object_name = prefix+file_name
        # try:
        s3_client = boto3.client('s3')
        try:
            s3_client.upload_file(str(file_path), self.bucket, object_name)
            print('file pushed successfully.')
        except Exception:
            print('file pushing task failed.')

    def pull_file_s3(self, key: str, file_path: Path = Path.cwd()) -> None:
        s3 = boto3.resource('s3')
        file_name = str(key).split('/')[-1]
        s3.Bucket(self.bucket).download_file(  # pylint: disable=no-member
            key, f'{file_path}/{file_name}')

    def read_data_to_dataframe_s3(self, key: str, file_type: str) -> pd.DataFrame:
        s3 = boto3.client('s3')
        obj = s3.get_object(Bucket=self.bucket, Key=key)

        try:
            if file_type == 'csv':
                return pd.read_csv(io.BytesIO(obj['Body'].read()), sep='~')
            elif file_type == 'feather':
                return pd.read_feather(io.BytesIO(obj['Body'].read()))
            elif file_type == 'pickle':
                return pd.read_pickle(io.BytesIO(obj['Body'].read()))
        except Exception as ex:
            raise MeiyumeException('Provide correct file key and file type.')

    def read_feather_s3(self, key: str) -> pd.DataFrame:
        s3 = boto3.client('s3')
        obj = s3.get_object(Bucket=self.bucket, Key=key)
        df = pd.read_feather(io.BytesIO(obj['Body'].read()))
        return df

    def read_csv_s3(self, key: str) -> pd.DataFrame:
        s3 = boto3.client('s3')
        obj = s3.get_object(Bucket=self.bucket, Key=key)
        df = pd.read_csv(io.BytesIO(obj['Body'].read()), sep='~')
        return df

    def delete_file_s3(self, key: str) -> None:
        s3 = boto3.resource('s3')
        try:
            s3.Object(self.bucket, key).delete()  # pylint: disable=no-member
            print('file deleted.')
        except Exception:
            print('delete operation failed')


NameError: name 'Union' is not defined

In [13]:
file_manager = S3FileManager()
s3_keys = file_manager.get_matching_s3_keys(
    prefix="Feeds/BeautyTrendEngine/WebAppData/"
)
s3_keys = list(s3_keys)
len(s3_keys)

27

In [26]:
s3_keys[0]

{'Key': 'Feeds/BeautyTrendEngine/WebAppData/category_page_distinct_brands_products',
 'LastModified': datetime.datetime(2021, 1, 19, 8, 47, 8, tzinfo=tzutc()),
 'ETag': '"3126e3b9d6db656b32d7b1d8b230aff7"',
 'Size': 14410,
 'StorageClass': 'STANDARD'}

In [25]:
s3_keys_name_list = [x["Key"].split("/")[-1] for x in s3_keys]
s3_keys_name_list

['category_page_distinct_brands_products',
 'category_page_item_package_oz',
 'category_page_item_variations_price',
 'category_page_new_ingredients',
 'category_page_new_products_count',
 'category_page_new_products_details',
 'category_page_pricing_data',
 'category_page_reviews_by_user_attributes',
 'category_page_top_products',
 'ing_page_ing_data',
 'landing_page_data',
 'meta_product_launch_intensity_category_month',
 'meta_product_launch_trend_category_month',
 'meta_product_launch_trend_product_type_month',
 'new_ingredient_trend_category_month',
 'new_ingredient_trend_product_type_month',
 'prod_page_ing_data',
 'prod_page_item_data',
 'prod_page_product_review_summary',
 'prod_page_review_sentiment_influence',
 'prod_page_review_talking_points',
 'prod_page_reviews_attribute',
 'product_page_metadetail_data',
 'review_trend_by_marketing_category_month',
 'review_trend_by_marketing_product_type_month',
 'review_trend_category_month',
 'review_trend_product_type_month']

In [21]:
load_dotenv()
S3_BUCKET = "meiyume-datawarehouse-prod"
S3_PREFIX = "Feeds/BeautyTrendEngine"
S3_REGION = "ap-southeast-1"
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")


def get_s3_client(region: str, access_key_id: str, secret_access_key: str):
    if region == "":  # or access_key_id == '' or secret_access_key == '':
        print("*ERROR: S3 client information not set*")
        return sys.exit(1)
    else:
        try:
            client = boto3.client(
                "s3",
                region,
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key,
            )
        except Exception as ex:
            client = boto3.client("s3")
    return client


def read_file_s3(
    filename: str,
    # f"{S3_PREFIX}/WebAppData",
    # "Feeds/BeautyTrendEngine/WebAppDevelopmentData/Test"
    prefix: str = f"{S3_PREFIX}/WebAppData",
    bucket: str = S3_BUCKET,
    file_type: str = "feather",
) -> pd.DataFrame:
    key = prefix + "/" + filename
    s3 = get_s3_client(S3_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    obj = s3.get_object(Bucket=bucket, Key=key)
    if file_type == "feather":
        df = pd.read_feather(io.BytesIO(obj["Body"].read()))
    elif file_type == "pickle":
        df = pd.read_pickle(io.BytesIO(obj["Body"].read()))
    return df

In [36]:
category_page_distinct_brands_products_df = read_file_s3(
    filename="category_page_distinct_brands_products", file_type="feather"
)
json_response = category_page_distinct_brands_products_df.to_json(orient="records")

In [31]:
# category_page_distinct_brands_products_df.to_json(orient="records")
# category_page_distinct_brands_products_df.to_json(
#     "temp.json",
#     orient="records",
# )

In [37]:
# @app.get("/data/{s3_key}")
def get_data_by_s3_bucket_key(s3_key):
    df = read_file_s3(filename=s3_key, file_type="feather")
    json_response = df.to_json(orient="records")
    json_response = json.loads(json_response)
    return json_response

In [47]:
for k in s3_keys_name_list:
    print(f"@app.get('/data/{k}')")
    print(f"def get_data_by_s3_key_{k}():")
    print(f"    df = read_file_s3(filename={k}, file_type='feather')")
    print(f"    json_response = df.to_json(orient='records')")
    print(f"    json_response = json.loads(json_response)")
    print(f"    return json_response")
    print()

@app.get('/data/category_page_distinct_brands_products')
def get_data_by_s3_key_category_page_distinct_brands_products():
    df = read_file_s3(filename=category_page_distinct_brands_products, file_type='feather')
    json_response = df.to_json(orient='records')
    json_response = json.loads(json_response)
    return json_response

@app.get('/data/category_page_item_package_oz')
def get_data_by_s3_key_category_page_item_package_oz():
    df = read_file_s3(filename=category_page_item_package_oz, file_type='feather')
    json_response = df.to_json(orient='records')
    json_response = json.loads(json_response)
    return json_response

@app.get('/data/category_page_item_variations_price')
def get_data_by_s3_key_category_page_item_variations_price():
    df = read_file_s3(filename=category_page_item_variations_price, file_type='feather')
    json_response = df.to_json(orient='records')
    json_response = json.loads(json_response)
    return json_response

@app.get('/data/category_page_n