In [1]:
import logging
import boto3
import os
import io
from botocore.config import Config
import pandas as pd
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
# pip install python-dotenv==0.21.0

In [3]:
ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
MLFLOW_S3_ENDPOINT_URL = os.getenv('MLFLOW_S3_ENDPOINT_URL')

In [4]:
print(MLFLOW_S3_ENDPOINT_URL)

http://localhost:9000


In [5]:
def get_s3_client():
    s3 = boto3.client('s3',
                      endpoint_url=MLFLOW_S3_ENDPOINT_URL,
                      aws_access_key_id=ACCESS_KEY,
                      aws_secret_access_key=SECRET_KEY,
                      config=Config(signature_version='s3v4'))
    return s3


def get_s3_resource():
    s3_res = boto3.resource('s3',
                            endpoint_url=MLFLOW_S3_ENDPOINT_URL,
                            aws_access_key_id=ACCESS_KEY,
                            aws_secret_access_key=SECRET_KEY,
                            config=Config(signature_version='s3v4'))
    return s3_res


In [6]:
def save_df_to_s3(df, bucket, key, index=False):
    ''' Store df as a buffer, then save buffer to s3'''
    s3_res = get_s3_resource()
    try:
        csv_buffer = io.StringIO()
        df.to_csv(csv_buffer, index=False)
        s3_res.Object(bucket, key).put(Body=csv_buffer.getvalue())
        logging.info(f'{key} saved to s3 bucket {bucket}')
    except Exception as e:
        raise logging.exception(e)

In [7]:
def load_df_from_s3(bucket, key, index_col=None, usecols=None, sep=","):
    ''' Read a csv from a s3 bucket & load into pandas dataframe'''
    s3 = get_s3_client()
    try:
        logging.info(f"Loading {bucket, key}")
        obj = s3.get_object(Bucket=bucket, Key=key)
        return pd.read_csv(obj['Body'], index_col=index_col, usecols=usecols, low_memory=False, sep=sep)
    except Exception as e:
        raise logging.exception(e)


In [8]:
def create_bucket(bucket_name: str):
    s3_res = get_s3_resource()
    try:
        s3_res.create_bucket(Bucket=bucket_name)
        logging.info(f"{bucket_name} created.")
    except Exception as e:
        print(e)

In [9]:
! mkdir  ~/datasets

mkdir: cannot create directory ‘/home/train/datasets’: File exists


In [10]:
! curl -o ~/datasets/Advertising.csv https://raw.githubusercontent.com/erkansirin78/datasets/master/Advertising.csv



  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  4556  100  4556    0     0  11590      0 --:--:-- --:--:-- --:--:-- 11622


In [11]:
! ls -l /home/train/datasets/

total 8
-rw-rw-r--. 1 train train 4556 Dec 11 12:35 Advertising.csv


In [12]:
df = pd.read_csv("/home/train/datasets/Advertising.csv")

In [13]:
df.head()

Unnamed: 0,ID,TV,Radio,Newspaper,Sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9


In [14]:
create_bucket("mlops-test")

In [15]:
save_df_to_s3(df=df, bucket='mlops-test', key='Advertising_kdkjdfkfd.csv')

In [16]:
df_from_s3 = load_df_from_s3( bucket='mlops-test', key='Advertising_kdkjdfkfd.csv')

In [17]:
df_from_s3.head()

Unnamed: 0,ID,TV,Radio,Newspaper,Sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9
