# Make user, item, and interaction datasets
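This notebook creates the Amazon Personalize resources for the car search
recommendations demo:
- the interactions, items, and users schemas;
- a dataset group and its three datasets;
- the import jobs that load `car_items.csv`, `users.csv`, and `interactions.csv` from S3.

The CSV files must already be uploaded under `s3://<your-bucket>/personalize/<schema_version>/`.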

In [76]:
import json
import boto3
import time

# --- Input files (expected under s3://{bucket}/{prefix}/) -------------------
cars_filename         = 'car_items.csv'
users_filename        = 'users.csv'
interactions_filename = 'interactions.csv'

dataset_group_name = 'car-dg10'

# Bumping schema_version gives fresh schema names and a fresh S3 prefix.
schema_version = 'v10'

bucket   = '<your-bucket>'   # TODO: set to your S3 bucket name
prefix   = 'personalize/' + schema_version

# Maximum time, in SECONDS, that each polling cell waits. It must be a
# duration, not a timestamp: the polling cells compute their own deadline as
# time.time() + MAX_WAIT_TIME.
MAX_WAIT_TIME = 60 * 60  # 1 hour

role_arn    = '<your-role>'     # TODO: IAM role ARN passed to the import jobs
account_num = '<your-account>'  # TODO: AWS account id used to build ARNs below
region      = 'us-east-1'       # NOTE(review): should match the boto3 client region

SCHEMA_ARN_PREFIX = 'arn:aws:personalize:{}:{}:schema/'.format(region, account_num)

CAR_INTERACTION_SCHEMA_NAME = 'car-interactions-schema-' + schema_version
CAR_INTERACTION_SCHEMA_ARN  = SCHEMA_ARN_PREFIX + CAR_INTERACTION_SCHEMA_NAME

CAR_ITEM_SCHEMA_NAME = 'car-items-schema-' + schema_version
CAR_ITEM_SCHEMA_ARN  = SCHEMA_ARN_PREFIX + CAR_ITEM_SCHEMA_NAME

CAR_USER_SCHEMA_NAME = 'car-users-schema-' + schema_version
CAR_USER_SCHEMA_ARN  = SCHEMA_ARN_PREFIX + CAR_USER_SCHEMA_NAME

In [77]:
# Create the boto3 clients in order: `personalize` for the control-plane calls
# made throughout this notebook, then `personalize_runtime` (not used in the
# cells here). Neither call passes a region, so boto3's default credential and
# region resolution applies.
personalize, personalize_runtime = (
    boto3.client(service_name) for service_name in ('personalize', 'personalize-runtime')
)

In [None]:
# Reuse the interactions schema if it already exists; otherwise create it.
# The lookup error is printed, so a failure that is not "not found" (for
# example an unset account_num or missing permissions) is visible and not
# silently treated as a missing schema.
arn = CAR_INTERACTION_SCHEMA_ARN
try:
    response = personalize.describe_schema(schemaArn=arn)
    interactions_schema_arn = response['schema']['schemaArn']
    print(interactions_schema_arn)
except Exception as e:
    print('Schema {} could not be described ({}), creating it...'.format(arn, e))
    schema = {
        "type": "record",
        "name": "Interactions",
        "namespace": "com.amazonaws.personalize.schema",
        "fields": [
            {
                "name": "USER_ID",
                "type": "string"
            },
            {
                "name": "ITEM_ID",
                "type": "string"
            },
            {
                "name": "TIMESTAMP",
                "type": "long"
            }
        ],
        "version": "1.0"
    }

    create_schema_response = personalize.create_schema(
        name   = CAR_INTERACTION_SCHEMA_NAME,
        schema = json.dumps(schema)
    )

    interactions_schema_arn = create_schema_response['schemaArn']
    print(json.dumps(create_schema_response, indent=2))

In [None]:
# Reuse the items schema if it already exists; otherwise create it.
# The lookup error is printed so an unexpected failure is not silently
# treated as a missing schema.
arn = CAR_ITEM_SCHEMA_ARN
try:
    response = personalize.describe_schema(schemaArn=arn)
    items_schema_arn = response['schema']['schemaArn']
    print(items_schema_arn)
except Exception as e:
    print('Schema {} could not be described ({}), creating it...'.format(arn, e))
    # COLOR and LOCATION (categorical strings) are intentionally left out.
    # The original author noted a Personalize limit of 5 item metadata columns
    # at the time; check current service quotas before adding them back.
    schema = {
        "type": "record",
        "name": "Items",
        "namespace": "com.amazonaws.personalize.schema",
        "fields": [
            {
                "name": "ITEM_ID",
                "type": "string"
            },
            {
                "name": "MAKE",
                "type": "string",
                "categorical": True
            },
            {
                "name": "MODEL",
                "type": "string",
                "categorical": True
            },
            {
                "name": "YEAR",
                "type": "int"
            },
            {
                "name": "MILEAGE",
                "type": "int"
            },
            {
                "name": "PRICE",
                "type": "int"
            }
        ],
        "version": "1.0"
    }

    create_schema_response = personalize.create_schema(
        name   = CAR_ITEM_SCHEMA_NAME,
        schema = json.dumps(schema)
    )

    items_schema_arn = create_schema_response['schemaArn']
    print(json.dumps(create_schema_response, indent=2))

In [None]:
# Reuse the users schema if it already exists; otherwise create it.
# Like the other schema cells, this reports why the lookup failed before
# falling back to creation.
arn = CAR_USER_SCHEMA_ARN
try:
    response = personalize.describe_schema(schemaArn=arn)
    users_schema_arn = response['schema']['schemaArn']
    print(users_schema_arn)
except Exception as e:
    print('Schema {} could not be described ({}), creating it...'.format(arn, e))
    schema = {
        "type": "record",
        "name": "Users",
        "namespace": "com.amazonaws.personalize.schema",
        "fields": [
            {
                "name": "USER_ID",
                "type": "string"
            },
            {
                "name": "AGE",
                "type": "int"
            },
            {
                "name": "GENDER",
                "type": "string",
                "categorical": True
            },
            {
                "name": "LOCATION",
                "type": "string",
                "categorical": True
            },
            {
                "name": "SALARY",
                "type": "int"
            }
        ],
        "version": "1.0"
    }

    create_schema_response = personalize.create_schema(
        name   = CAR_USER_SCHEMA_NAME,
        schema = json.dumps(schema)
    )

    users_schema_arn = create_schema_response['schemaArn']
    print(json.dumps(create_schema_response, indent=2))

In [None]:
# Start from a clean slate: delete the datasets and dataset group left by a
# previous run (if any), then create a fresh dataset group.
#
# Personalize deletes resources asynchronously, and a dataset group cannot be
# deleted while it still contains datasets. Each accepted delete is therefore
# followed by polling until the resource is really gone. A fixed sleep would
# race the dataset deletions.
DELETE_WAIT_SECONDS = 15 * 60
DELETE_POLL_SECONDS = 15


def _delete_and_wait(delete_fn, describe_fn, arn_param, arn):
    """Best-effort delete of one Personalize resource, then wait for it to vanish.

    delete_fn / describe_fn are Personalize client methods that take the ARN
    as the keyword argument named `arn_param` (e.g. 'datasetArn').

    Returns immediately if the resource does not exist. If the delete request
    fails for another reason, the error is printed and the function returns
    without waiting, preserving the previous best-effort behaviour. Otherwise
    it polls describe_fn until ResourceNotFoundException.

    Raises TimeoutError if the resource still exists after DELETE_WAIT_SECONDS.
    """
    request = {arn_param: arn}
    try:
        delete_fn(**request)
    except personalize.exceptions.ResourceNotFoundException:
        return  # nothing left over from a previous run
    except Exception as err:
        print('Could not delete {}: {}'.format(arn, err))
        return

    deadline = time.time() + DELETE_WAIT_SECONDS
    while time.time() < deadline:
        try:
            describe_fn(**request)
        except personalize.exceptions.ResourceNotFoundException:
            return
        time.sleep(DELETE_POLL_SECONDS)
    raise TimeoutError('{} was not deleted within {} seconds'.format(arn, DELETE_WAIT_SECONDS))


arn_prefix = 'arn:aws:personalize:us-east-1:{}'.format(account_num)

print('Waiting for any previous datasets and dataset group to be deleted...')
# Datasets must be gone before their dataset group can be deleted.
for leftover_type in ('INTERACTIONS', 'ITEMS', 'USERS'):
    _delete_and_wait(
        personalize.delete_dataset, personalize.describe_dataset, 'datasetArn',
        '{}:dataset/{}/{}'.format(arn_prefix, dataset_group_name, leftover_type),
    )

_delete_and_wait(
    personalize.delete_dataset_group, personalize.describe_dataset_group, 'datasetGroupArn',
    '{}:dataset-group/{}'.format(arn_prefix, dataset_group_name),
)

create_dataset_group_response = personalize.create_dataset_group(
    name = dataset_group_name
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))

In [None]:
# Poll the new dataset group once a minute until it is ACTIVE. MAX_WAIT_TIME is
# added to the current time, so it is treated as a duration in seconds.
# CREATE FAILED or running out of time raises, so Run All stops here instead
# of failing confusingly in the dataset cells below.
max_time = time.time() + MAX_WAIT_TIME
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    dataset_group = describe_dataset_group_response['datasetGroup']
    status = dataset_group['status']
    print('DatasetGroup: {}'.format(status))

    if status == 'ACTIVE':
        break
    if status == 'CREATE FAILED':
        raise RuntimeError('Dataset group {} failed: {}'.format(
            dataset_group_arn, dataset_group.get('failureReason', 'no failureReason given')))

    time.sleep(60)
else:
    raise TimeoutError('Dataset group {} was not ACTIVE within {} seconds'.format(
        dataset_group_arn, MAX_WAIT_TIME))

In [None]:
# Register the INTERACTIONS dataset in the dataset group, bound to the
# interactions schema resolved earlier.
dataset_type = 'INTERACTIONS'
interactions_dataset_request = dict(
    name='car-interactions',
    datasetType=dataset_type,
    datasetGroupArn=dataset_group_arn,
    schemaArn=interactions_schema_arn,
)
create_dataset_response = personalize.create_dataset(**interactions_dataset_request)

interactions_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

In [None]:
# Show the items schema ARN and the dataset group ARN, one per line, before
# creating the ITEMS dataset that binds them together.
print(items_schema_arn, dataset_group_arn, sep='\n')

In [None]:
# Register the ITEMS dataset in the dataset group, bound to the items schema.
dataset_type = 'ITEMS'
items_dataset_request = dict(
    name='car-items',
    datasetType=dataset_type,
    datasetGroupArn=dataset_group_arn,
    schemaArn=items_schema_arn,
)
create_dataset_response = personalize.create_dataset(**items_dataset_request)

items_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

In [None]:
# Register the USERS dataset in the dataset group, bound to the users schema.
dataset_type = 'USERS'
users_dataset_request = dict(
    name='car-users',
    datasetType=dataset_type,
    datasetGroupArn=dataset_group_arn,
    schemaArn=users_schema_arn,
)
create_dataset_response = personalize.create_dataset(**users_dataset_request)

users_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

In [None]:
# Create the import job that loads the interactions CSV from S3 into the
# INTERACTIONS dataset. Its completion is checked in a later polling cell.
# NOTE(review): role_arn presumably must let Personalize read the bucket.
interactions_s3_uri = 's3://{}/{}/{}'.format(bucket, prefix, interactions_filename)
interactions_job_name = '{}-car-interactions-import'.format(dataset_group_name)

create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName=interactions_job_name,
    datasetArn=interactions_dataset_arn,
    dataSource={'dataLocation': interactions_s3_uri},
    roleArn=role_arn,
)

interactions_dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

In [None]:
# Create the import job that loads the cars CSV from S3 into the ITEMS
# dataset. Its completion is checked in a later polling cell.
items_s3_uri = 's3://{}/{}/{}'.format(bucket, prefix, cars_filename)
items_job_name = '{}-car-items-import'.format(dataset_group_name)

create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName=items_job_name,
    datasetArn=items_dataset_arn,
    dataSource={'dataLocation': items_s3_uri},
    roleArn=role_arn,
)

items_dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

In [None]:
# Create the import job that loads the users CSV from S3 into the USERS
# dataset. Its completion is checked in a later polling cell.
users_s3_uri = 's3://{}/{}/{}'.format(bucket, prefix, users_filename)
users_job_name = '{}-car-users-import'.format(dataset_group_name)

create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName=users_job_name,
    datasetArn=users_dataset_arn,
    dataSource={'dataLocation': users_s3_uri},
    roleArn=role_arn,
)

users_dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

In [None]:
# Poll the items import job once a minute until it is ACTIVE. MAX_WAIT_TIME is
# treated as a duration in seconds. CREATE FAILED (with the service's
# failureReason) or running out of time raises instead of silently moving on.
max_time = time.time() + MAX_WAIT_TIME
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = items_dataset_import_job_arn
    )
    import_job = describe_dataset_import_job_response['datasetImportJob']
    status = import_job['status']
    print('DatasetImportJob: {}'.format(status))

    if status == 'ACTIVE':
        break
    if status == 'CREATE FAILED':
        raise RuntimeError('Dataset import job {} failed: {}'.format(
            items_dataset_import_job_arn, import_job.get('failureReason', 'no failureReason given')))

    time.sleep(60)
else:
    raise TimeoutError('Dataset import job {} did not finish within {} seconds'.format(
        items_dataset_import_job_arn, MAX_WAIT_TIME))

In [None]:
# Poll the users import job once a minute until it is ACTIVE. MAX_WAIT_TIME is
# treated as a duration in seconds. CREATE FAILED (with the service's
# failureReason) or running out of time raises instead of silently moving on.
max_time = time.time() + MAX_WAIT_TIME
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = users_dataset_import_job_arn
    )
    import_job = describe_dataset_import_job_response['datasetImportJob']
    status = import_job['status']
    print('DatasetImportJob: {}'.format(status))

    if status == 'ACTIVE':
        break
    if status == 'CREATE FAILED':
        raise RuntimeError('Dataset import job {} failed: {}'.format(
            users_dataset_import_job_arn, import_job.get('failureReason', 'no failureReason given')))

    time.sleep(60)
else:
    raise TimeoutError('Dataset import job {} did not finish within {} seconds'.format(
        users_dataset_import_job_arn, MAX_WAIT_TIME))

In [None]:
# Poll the interactions import job once a minute until it is ACTIVE.
# MAX_WAIT_TIME is treated as a duration in seconds. CREATE FAILED (with the
# service's failureReason) or running out of time raises instead of silently
# moving on.
max_time = time.time() + MAX_WAIT_TIME
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = interactions_dataset_import_job_arn
    )
    import_job = describe_dataset_import_job_response['datasetImportJob']
    status = import_job['status']
    print('DatasetImportJob: {}'.format(status))

    if status == 'ACTIVE':
        break
    if status == 'CREATE FAILED':
        raise RuntimeError('Dataset import job {} failed: {}'.format(
            interactions_dataset_import_job_arn, import_job.get('failureReason', 'no failureReason given')))

    time.sleep(60)
else:
    raise TimeoutError('Dataset import job {} did not finish within {} seconds'.format(
        interactions_dataset_import_job_arn, MAX_WAIT_TIME))