#### S3 Upload
- 정제된 파일 읽기 및 확인
- Personalize가 사용할 S3 Bucket 생성, Policy 적용
- record가 너무 많아 학습 속도에 문제가 있으므로 10만 건으로 샘플링. 단, 성능을 위해 event_cnt가 최소 3개가 되도록 설정
- 샘플 데이터에서 event_cnt drop 후 S3에 upload

In [1]:
import pandas as pd
import datetime
import numpy as np
# Widen pandas' display width to 1000 characters for printed output.
pd.set_option('display.width', 1000)

In [2]:
# Load the cleaned interaction log from the local data directory.
interactions = pd.read_csv('./data/cleaned_user_interactions.csv')
# Print (rows, columns), then show the first rows as the cell result.
print(interactions.shape)
interactions.head()

(10505424, 4)


Unnamed: 0,USER_ID,ITEM_ID,TIMESTAMP,EVENT_TYPE
0,533326659,1005014,1667260814,cart
1,565865924,1005115,1667261019,cart
2,549256216,1002542,1667261154,cart
3,549256216,1002542,1667261242,cart
4,522355747,4804056,1667261445,cart


In [3]:
def filter_df(df, colname='USER_ID', multi_cnt=2):
    """Keep only the rows whose ``colname`` value occurs often enough.

    Args:
        df: DataFrame to filter.
        colname: Column whose value frequencies are counted.
        multi_cnt: Minimum number of occurrences a value needs in order to
            be kept (inclusive).

    Returns:
        A ``(filtered, multi_index)`` tuple: ``filtered`` holds the rows of
        ``df`` whose ``colname`` value occurs at least ``multi_cnt`` times,
        and ``multi_index`` is the pandas Index of those qualifying values.
    """
    # value_counts() is indexed by the distinct values of ``colname``, so
    # boolean-masking it leaves exactly the values that are frequent enough.
    counts = df[colname].value_counts()
    multi_index = counts[counts >= multi_cnt].index
    filtered = df[df[colname].isin(multi_index)]
    return filtered, multi_index

In [4]:
# Draw 150,000 rows at random. No random_state is passed, so each run
# produces a different sample.
insteractins_sample = interactions.sample(n=150000)
# Keep only the rows of users that appear more than once in the sample
# (filter_df's default threshold); the second return value is discarded.
# NOTE(review): the notebook header describes a 100k-row sample with at
# least 3 events per user, which differs from the values used here - confirm.
filterd_sample, _ = filter_df(insteractins_sample)

In [5]:
# Show the number of sampled events per user in the filtered sample.
filterd_sample['USER_ID'].value_counts()

568782581    30
512409624    19
582826305    18
572994775    17
549109608    17
             ..
523330463     2
557987857     2
512372678     2
560800107     2
549495088     2
Name: USER_ID, Length: 13236, dtype: int64

In [6]:
# Count missing values per column of the (unfiltered) sample.
insteractins_sample.isnull().sum()

USER_ID       0
ITEM_ID       0
TIMESTAMP     0
EVENT_TYPE    0
dtype: int64

In [7]:
# Save the sampled interactions to a CSV file, without the row index.
# NOTE(review): this writes the unfiltered `insteractins_sample`, not
# `filterd_sample` - confirm that is intended.
insteractins_sample.to_csv('./data/interactions-sample.csv', index=False)

In [8]:
# Read the sampled interactions file back and print its column/dtype summary
# to verify what was written.
sampled_intreactions = pd.read_csv('./data/interactions-sample.csv')
sampled_intreactions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150000 entries, 0 to 149999
Data columns (total 4 columns):
 #   Column      Non-Null Count   Dtype 
---  ------      --------------   ----- 
 0   USER_ID     150000 non-null  int64 
 1   ITEM_ID     150000 non-null  int64 
 2   TIMESTAMP   150000 non-null  int64 
 3   EVENT_TYPE  150000 non-null  object
dtypes: int64(3), object(1)
memory usage: 4.6+ MB


#### Create Bucket and policy

In [9]:
import boto3
from botocore.exceptions import ClientError
import numpy as np
import string
import random
import json
import time

#### Configure S3 bucket and an IAM Role

In [10]:
# A fixed seed makes the 5-letter suffix deterministic, so re-running the
# notebook reuses the same resource names instead of creating new ones.
random.seed(2023)
suffix = ''.join(random.choice(string.ascii_lowercase) for _ in range(5))
# Bucket that will hold the training data for Personalize.
bucket_name = "osungmart-personalize-" + suffix
region = 'ap-northeast-2'
# Pin the client to the bucket's region: S3 rejects a LocationConstraint that
# does not match the region of the endpoint the request is sent to, so
# relying on the ambient default region is fragile.
s3 = boto3.client("s3", region_name=region)
try:
    create_bucket_resp = s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region},
    )
except ClientError as err:
    # An existing bucket owned by this account is fine on a re-run; anything
    # else is a real failure and is propagated.
    if err.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
        print(f"Buket {bucket_name} alread exist")
    else:
        raise

Buket osungmart-personalize-mwomk alread exist


Upload data to S3

In [11]:
from boto3.exceptions import S3UploadFailedError

data_dir = 'data'
interactions_filename = data_dir + '/interactions-sample.csv'
try:
    # The relative local path doubles as the S3 object key.
    boto3.Session().resource('s3').Bucket(bucket_name).Object(interactions_filename).upload_file(interactions_filename)
except (ClientError, S3UploadFailedError) as e:
    # upload_file reports failed transfers as S3UploadFailedError rather than
    # ClientError, so both are handled; the failure is printed and the
    # notebook continues.
    print(e)

#### Dataset Group and Dataset

In [12]:
# Amazon Personalize client, created with the default session configuration.
personalize = boto3.client('personalize')

In [16]:
# Create the dataset group. If a group with this name already exists, only
# report it; dataset_group_arn is then not assigned by this cell.
dataset_group_name = "osungmart-dataset-group" + suffix

try:
    create_dataset_group_resp = personalize.create_dataset_group(name=dataset_group_name)
except ClientError as err:
    # Any error other than "already exists" is unexpected: propagate it.
    if err.response["Error"]["Code"] != "ResourceAlreadyExistsException":
        raise
    print(f"dataset group: {dataset_group_name} arleady exist")
else:
    dataset_group_arn = create_dataset_group_resp['datasetGroupArn']
    print(json.dumps(create_dataset_group_resp, indent=2))




dataset group: osungmart-dataset-groupmwomk arleady exist


In [27]:
# Look up the ARN of a dataset group that already exists.
# Only the first 10 dataset groups returned by the service are inspected.
list_dataset_group_resp = personalize.list_dataset_groups(maxResults=10)

for dataset_grp in list_dataset_group_resp['datasetGroups']:
    if dataset_grp['name'] != dataset_group_name:
        continue
    dataset_group_arn = dataset_grp['datasetGroupArn']
    print("dataset_group_arn: {}".format(dataset_group_arn))

datasetGroupArn: arn:aws:personalize:ap-northeast-2:532805286864:dataset-group/osungmart-dataset-groupmwomk


In [28]:
# Poll the dataset group until it reaches a terminal status or the 3-hour
# budget runs out.
status = None
max_time = time.time() + 3 * 60 * 60  # deadline = now + 3 h
while time.time() < max_time:
    describe_dataset_group_resp = personalize.describe_dataset_group(
        datasetGroupArn=dataset_group_arn
    )
    status = describe_dataset_group_resp['datasetGroup']['status']
    print("DatasetGroup: {}".format(status))

    # Stop polling on either terminal status.
    if status in ("ACTIVE", "CREATE FAILED"):
        break

    time.sleep(10)

DatasetGroup: ACTIVE


#### dataset 생성
interactions dataset

In [29]:
# Name used for the interactions schema.
schema_name = "osungmart-interactions-schema"

In [28]:
# delete schema
# personalize.delete_schema(
#   schemaArn = "arn:aws:personalize:ap-northeast-2:532805286864:schema/osungmart-interactions-schema"
# )

{'ResponseMetadata': {'RequestId': '888198d6-a6de-4a83-85d4-f98de391122e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 02 Feb 2023 07:59:30 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'connection': 'keep-alive',
   'x-amzn-requestid': '888198d6-a6de-4a83-85d4-f98de391122e'},
  'RetryAttempts': 0}}

In [31]:
# Create the schema for the interactions dataset.
# CSV columns: USER_ID,ITEM_ID,TIMESTAMP,EVENT_TYPE

schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    # One {"name", "type"} entry per field, in this order.
    "fields": [
        {"name": field_name, "type": field_type}
        for field_name, field_type in (
            ("ITEM_ID", "string"),
            ("USER_ID", "string"),
            ("TIMESTAMP", "long"),
            ("EVENT_TYPE", "string"),
        )
    ],
}
try:
    create_schema_resp = personalize.create_schema(
        name=schema_name,
        schema=json.dumps(schema),
    )
except ClientError as err:
    # A schema that already exists is only reported; other errors propagate.
    if err.response["Error"]["Code"] != "ResourceAlreadyExistsException":
        raise
    print(f"dataset schema: {schema_name} arleady exist")
else:
    schema_arn = create_schema_resp['schemaArn']
    print(json.dumps(create_schema_resp, indent=2))

dataset schema: osungmart-interactions-schema arleady exist


In [33]:
# Look up the ARN of a schema that already exists.
# Only the first 10 schemas returned by the service are inspected.
list_schemas_resp = personalize.list_schemas(
    maxResults=10
)

# The loop variable is deliberately not called `schema`, so it does not
# overwrite the notebook-level schema definition dict.
for schema_summary in list_schemas_resp['schemas']:
    if schema_summary['name'] == schema_name:
        schema_arn = schema_summary['schemaArn']
        print("schema_arn: {}".format(schema_arn))

schema_arn: arn:aws:personalize:ap-northeast-2:532805286864:schema/osungmart-interactions-schema


In [40]:
# Create the interactions dataset, or recover its ARN when a dataset with
# this name already exists in the dataset group.
dataset_type = "INTERACTIONS"
dataset_name = "osugnmart-dataset-interactions"
try:
    create_dataset_resp = personalize.create_dataset(
        name=dataset_name,
        datasetType=dataset_type,
        datasetGroupArn=dataset_group_arn,
        schemaArn=schema_arn,
    )
except ClientError as err:
    # Only "already exists" is recoverable; everything else propagates.
    if err.response["Error"]["Code"] != "ResourceAlreadyExistsException":
        raise
    print(f"dataset : {dataset_name} arleady exist")
    # Find the existing dataset by name among the group's first 10 datasets.
    list_datasets_resp = personalize.list_datasets(
        datasetGroupArn=dataset_group_arn,
        maxResults=10,
    )
    for dataset in list_datasets_resp['datasets']:
        if dataset['name'] != dataset_name:
            continue
        interactions_dataset_arn = dataset['datasetArn']
        print(f"dataet_arn: {interactions_dataset_arn}")
else:
    interactions_dataset_arn = create_dataset_resp['datasetArn']
    print(json.dumps(create_dataset_resp, indent=2))

dataset : osugnmart-dataset-interactions arleady exist
dataet_arn: arn:aws:personalize:ap-northeast-2:532805286864:dataset/osungmart-dataset-groupmwomk/INTERACTIONS


#### S3 Bucket policy 설정
Personalize service가 S3 bucket에 접근할 수 있도록 bucket permission 설정

In [37]:
# Bucket policy that allows the Amazon Personalize service principal to list
# the bucket and read the objects in it.
policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {"Service": "personalize.amazonaws.com"},
            "Action": ["s3:GetObject", "s3:ListBucket"],
            # The bucket itself plus every object inside it.
            "Resource": [
                f"arn:aws:s3:::{bucket_name}",
                f"arn:aws:s3:::{bucket_name}/*",
            ],
        },
    ],
}

s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))

{'ResponseMetadata': {'RequestId': 'AJBC8RMHWNQCDR8X',
  'HostId': '3LsgbugFjjJtI7xKLavkVtbe8plVk0bWCv2PULTEds0Mznea4PB2Kv5s/cq3jKGOMLk5KMXBQF4=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': '3LsgbugFjjJtI7xKLavkVtbe8plVk0bWCv2PULTEds0Mznea4PB2Kv5s/cq3jKGOMLk5KMXBQF4=',
   'x-amz-request-id': 'AJBC8RMHWNQCDR8X',
   'date': 'Fri, 03 Feb 2023 01:11:56 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

#### IAM Role 생성

In [38]:
iam = boto3.client("iam")

role_name = "PersonalizeS3-Role" + suffix
# Trust policy: lets the Personalize service principal assume this role.
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "personalize.amazonaws.com"},
            "Action": "sts:AssumeRole",
        },
    ],
}

try:
    create_role_resp = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
    )
    # Attach the AWS-managed S3 read-only policy to the new role.
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
    )
    role_arn = create_role_resp['Role']['Arn']
except ClientError as err:
    # An existing role is reused as-is; any other error propagates.
    if err.response['Error']['Code'] != 'EntityAlreadyExists':
        raise
    print("this Role already exist")
    role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']

# Fixed 30 s pause before continuing - presumably to let the IAM change
# propagate; confirm.
time.sleep(30)
print(role_arn)

this Role already exist
arn:aws:iam::532805286864:role/PersonalizeS3-Rolemwomk


In [30]:
# dataset_group_arn = "arn:aws:personalize:ap-northeast-2:532805286864:dataset-group/osungmart-dataset-groupmwomk"
# bucket_name = "osungmart-personalize-mwomk"
# interactions_filename ="data/interactions-sample.csv"
# role_arn = "arn:aws:iam::532805286864:role/PersonalizeS3-Rolemwomk"
# print(dataset_group_arn)

#### Dataset import jobs 생성
upload s3 data to dataset

In [42]:
# Name used for the interactions dataset import job.
job_name = "osungmart-dataset-import-job-interactions"

In [46]:
# Import the interactions data from S3 into the dataset.
# (Original author's note: an import job cannot be deleted on its own.)
try:
    create_dataset_import_job_resp = personalize.create_dataset_import_job(
        jobName=job_name,
        datasetArn=interactions_dataset_arn,
        dataSource={
            "dataLocation": "s3://{}/{}".format(bucket_name, interactions_filename)
        },
        roleArn=role_arn,
    )

    dataset_import_job_arn = create_dataset_import_job_resp['datasetImportJobArn']
    print(json.dumps(create_dataset_import_job_resp, indent=2))
except ClientError as e:
    if e.response["Error"]["Code"] == "ResourceAlreadyExistsException":
        print(f"this import Job - {job_name} - arleady exists")
        # Recover the ARN of the existing job by name (first 10 jobs only).
        list_dataset_import_jobs_resp = personalize.list_dataset_import_jobs(
            datasetArn=interactions_dataset_arn,
            maxResults=10,
        )

        for jobs in list_dataset_import_jobs_resp['datasetImportJobs']:
            if jobs['jobName'] == job_name:
                dataset_import_job_arn = jobs['datasetImportJobArn']
                print(f"ImportJobs Arn: {dataset_import_job_arn}")
    else:
        # Any other API error is a real failure: do not swallow it, or
        # dataset_import_job_arn would silently stay undefined.
        raise

this import Job - osungmart-dataset-import-job-interactions - arleady exists
ImportJobs Arn: arn:aws:personalize:ap-northeast-2:532805286864:dataset-import-job/osungmart-dataset-import-job-interactions


In [47]:
# Poll the dataset import job until it reaches a terminal status or the
# 3-hour budget runs out.
status = None
max_time = time.time() + 3 * 60 * 60  # deadline = now + 3 h
while time.time() < max_time:
    describe_dataset_import_resp = personalize.describe_dataset_import_job(
        datasetImportJobArn=dataset_import_job_arn
    )
    status = describe_dataset_import_resp['datasetImportJob']['status']
    print("ImportJobs: {}".format(status))

    # Stop polling on either terminal status.
    if status in ("ACTIVE", "CREATE FAILED"):
        break

    time.sleep(10)

ImportJobs: ACTIVE
