## Check Notebook configuration and Neptune cluster Status

In [None]:
%graph_notebook_version

In [None]:
%%graph_notebook_config
{
  "host": "neptune-cluster-poc-identity-graph.cluster-c4k0tumhelmt.us-east-2.neptune.amazonaws.com",
  "port": 8182,
  "auth_mode": "IAM",
  "load_from_s3_arn": "",
  "ssl": true,
  "aws_region": "us-east-2",
  "sparql": {
    "path": "sparql"
  },
  "gremlin": {
    "traversal_source": "g"
  }
}

In [None]:
%graph_notebook_config

In [None]:
%status

## Creating the Identity Graph

The steps below simulate seeding the identity graph with data exported from source data stores.

They show how data from relational sources (e.g. first-party customer data and transactional databases) and first-party behavioral data (i.e. cookies, device IDs and clickstream sessions) is made available as CSV files uploaded to an Amazon S3 bucket.

Then, an AWS Glue crawler discovers the schema and stores it as metadata in a centralized Data Catalog. An AWS Glue ETL job pulls the data from Amazon S3 and prepares it for the initial load. Finally, the initial data load uses the Neptune bulk loader.

### 1. Download the PoC's artifacts from S3 bucket

In [None]:
%%bash

AWS_ACCOUNT_ID="733157031621"
BUCKET_NAME="poc-identity-graph-${AWS_ACCOUNT_ID}"
cd /home/ec2-user/SageMaker
aws s3 sync s3://$BUCKET_NAME/scripts/ utils/
aws s3 sync s3://$BUCKET_NAME/images/ images/

### 2. Generate raw data for source datasets
In this step we simulate the replication tasks scheduled through AWS Database Migration Service (DMS) to pull relevant data from relational and non-relational source systems into an S3 bucket. See the architecture diagram below.

<img src="../images/data-ingestion-bash-load-01.png" alt="data-ingestion-bash-load-01" width="1024"/>

Mock data will be generated for the following source datasets:

* First-party database (e.g. CRM)
* Transactional database (e.g. purchases)
* Cookie database
* Click Stream database

Resulting CSV files with raw data, one per dataset:

* first-party: `first_party_data_full.csv`
* cookie: `cookie_data_full.csv`
* clickstream: `clickstream_data_full.csv`
* transactional: `transactional_data_full.csv`

Execution time for this process depends on the compute resources available in your environment. Below are some reference timings captured by running the script on different EC2 instance types and sizes:

* A volume of 10K first-party records could take 3-4 minutes when using a t3.large EC2 instance
* A volume of 20K first-party records could take 10-11 minutes when using a t3.large EC2 instance
* A volume of 100K first-party records could take about 85 minutes when using a m6i.4xlarge EC2 instance

Additional parameters that can optionally be used when generating raw data for source datasets:

```sh
usage: create-source-datasets.py [-h] [--records RECORDS]
                                 [--uniqueness UNIQUENESS]
                                 [--incremental {0,1}] [--debug {0,1}]

optional arguments:
  -h, --help            show this help message and exit
  --records RECORDS     Amount of mock data records to generate. Default value
                        is 10000
  --uniqueness UNIQUENESS
                        Uniqueness percentage of mock data generated for IPv4
                        addresses and Device IDs. Default value is 30
  --incremental {0,1}   Switch filenames suffix from "_data_full" to
                        "_data_inc". Suffix is used for output CSV files
                        indicating mock data generated will be used for
                        initial load or incremental loads into the graph.
                        Default value is 0
  --debug {0,1}         Turn On/Off debugging (detailed output with muck data
                        generated). Default value is 0
```
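
The usage text above maps naturally onto an `argparse` parser. The sketch below is reconstructed from the help text only and is not the actual source of `create-source-datasets.py`; the function names `build_parser` and `output_suffix` and the `int` argument types are assumptions made for illustration.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser matching the documented usage (assumed, not the real script)."""
    parser = argparse.ArgumentParser(prog="create-source-datasets.py")
    parser.add_argument("--records", type=int, default=10000,
                        help="Amount of mock data records to generate")
    parser.add_argument("--uniqueness", type=int, default=30,
                        help="Uniqueness percentage for IPv4 addresses and Device IDs")
    parser.add_argument("--incremental", type=int, choices=[0, 1], default=0,
                        help='Switch filename suffix from "_data_full" to "_data_inc"')
    parser.add_argument("--debug", type=int, choices=[0, 1], default=0,
                        help="Turn debugging on or off")
    return parser


def output_suffix(incremental: int) -> str:
    """Return the CSV filename suffix implied by the --incremental switch."""
    return "_data_inc" if incremental else "_data_full"


# Parse the same flags that the incremental example later in this notebook uses.
args = build_parser().parse_args(
    ["--records", "100", "--uniqueness", "50", "--incremental", "1"])
print(args.records, args.uniqueness, output_suffix(args.incremental))
```

Omitting a flag falls back to the defaults listed in the help text (10000 records, 30 percent uniqueness, full load, debugging off).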

In [None]:
%%bash

pip install Faker >/dev/null 2>&1
cd /home/ec2-user/SageMaker
python3 utils/create-source-datasets.py --records 10000 --uniqueness 50

### 3. Visualizing raw data from source datasets

In [None]:
%%bash

cd /home/ec2-user/SageMaker
python3 utils/show-source-dataset-record.py \
    --csv-file /home/ec2-user/SageMaker/clickstream_data_full.csv \
    --dataset clickstream

### 4. Upload source datasets files to S3 bucket
In this step we simulate the replication tasks scheduled through AWS Database Migration Service (DMS) to store data pulled from relational and non-relational source systems in an S3 bucket. See the architecture diagram below.

<img src="../images/data-ingestion-bash-load-02.png" alt="data-ingestion-bash-load-02" width="1024"/>

CSV files with raw data for the initial load into the graph are stored under the `datasets/sources/initial` S3 prefix.

In [None]:
%%bash

AWS_ACCOUNT_ID="733157031621"
BUCKET_NAME="poc-identity-graph-${AWS_ACCOUNT_ID}"
PREFIX="datasets/sources/initial"
cd /home/ec2-user/SageMaker
aws s3 cp first_party_data_full.csv s3://$BUCKET_NAME/$PREFIX/first_party/
aws s3 cp cookie_data_full.csv s3://$BUCKET_NAME/$PREFIX/cookie/
aws s3 cp clickstream_data_full.csv s3://$BUCKET_NAME/$PREFIX/clickstream/
aws s3 cp transactional_data_full.csv s3://$BUCKET_NAME/$PREFIX/transactional/
echo ""
aws s3 ls s3://$BUCKET_NAME/$PREFIX/ --recursive

#### 4.1 Optionally, use the pre-generated datasets available on the S3 bucket (100K first-party records)

In [None]:
%%bash

AWS_ACCOUNT_ID="733157031621"
BUCKET_NAME="poc-identity-graph-${AWS_ACCOUNT_ID}"
PREFIX_1="datasets/100K_sample"
PREFIX_2="datasets/sources/initial"
aws s3 sync s3://$BUCKET_NAME/$PREFIX_1/ s3://$BUCKET_NAME/$PREFIX_2/
echo ""
aws s3 ls s3://$BUCKET_NAME/$PREFIX_1/ --recursive
echo ""
aws s3 ls s3://$BUCKET_NAME/$PREFIX_2/ --recursive

### 5. Manually run the Glue Crawler `source-datasets-crawler-poc-identity-graph`

The crawler `source-datasets-crawler-poc-identity-graph` scans the raw CSV files stored in the S3 path `s3://poc-identity-graph-733157031621/datasets/sources/initial/`, discovers the schema of these source datasets and stores it in the centralized Glue Data Catalog. One Glue table definition is created per source CSV file in the Glue database `database_poc_identity_graph`.

In [None]:
%%bash

REGION="us-east-2"
aws glue start-crawler \
--name source-datasets-crawler-poc-identity-graph \
--region $REGION
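
`aws glue start-crawler` returns as soon as the run is requested, so the Glue jobs in the next steps should only be started once the crawler has finished. The sketch below shows one way to poll for completion. `wait_for_state` and `get_state` are hypothetical names; in this notebook `get_state` would need to wrap a real status lookup (for example reading the crawler's `State` field, which is `READY` when idle), and that wiring is an assumption left out here.

```python
import time
from typing import Callable


def wait_for_state(get_state: Callable[[], str], target: str = "READY",
                   poll_seconds: float = 15.0, max_polls: int = 120) -> bool:
    """Poll get_state() until it returns target; return False if polls run out."""
    for _ in range(max_polls):
        if get_state() == target:
            return True
        time.sleep(poll_seconds)
    return False


# Stand-in for a real status lookup (hypothetical): a crawler that reports
# RUNNING twice, STOPPING once, and then READY.
states = iter(["RUNNING", "RUNNING", "STOPPING", "READY"])
print(wait_for_state(lambda: next(states), poll_seconds=0))
```

Passing the status lookup in as a function keeps the waiting logic independent of any AWS client, so the same helper could also wait on a Glue job run with a different `target`.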

### 6. Manually run the Glue Job `nodes-initial-load-s3-to-s3-poc-identity-graph`

The Glue ETL job `nodes-initial-load-s3-to-s3-poc-identity-graph` creates Gremlin-formatted CSV files for the property graph vertices (a.k.a. entities). It extracts data from the Glue tables in the Glue database `database_poc_identity_graph` (a.k.a. source datasets), applies simple transformations to schema and data, such as de-duplication and null removal, and generates Gremlin-formatted CSV files to ingest the initial data into the identity graph using the Neptune bulk loader.

In [None]:
%%bash

REGION="us-east-2"
aws glue start-job-run \
--job-name nodes-initial-load-s3-to-s3-poc-identity-graph \
--region $REGION
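
To make the output of this job more concrete, the sketch below renders a few rows as a Gremlin-format vertex CSV, which uses the `~id` and `~label` system columns followed by `name:type` property columns. It is a plain-Python illustration of the de-duplication and null removal described above, not the Glue job itself; the function name `to_vertex_csv` and the source column name `session_id` are assumptions.

```python
import csv
import io


def to_vertex_csv(rows, id_column, label, properties):
    """Render rows as a Gremlin-format vertex CSV, dropping null IDs and duplicates.

    properties maps a source column name to a Neptune data type,
    for example {"events": "Int"}.
    """
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["~id", "~label"] +
                    [f"{name}:{dtype}" for name, dtype in properties.items()])
    seen = set()
    for row in rows:
        vertex_id = row.get(id_column)
        if not vertex_id or vertex_id in seen:
            continue  # skip null/empty IDs and vertices already written
        seen.add(vertex_id)
        writer.writerow([vertex_id, label] + [row.get(name, "") for name in properties])
    return out.getvalue()


# Hypothetical source rows: one duplicate session and one row without an ID.
rows = [
    {"session_id": "s1", "client_platform": "web", "events": 17},
    {"session_id": "s1", "client_platform": "web", "events": 17},
    {"session_id": None, "client_platform": "web", "events": 3},
]
print(to_vertex_csv(rows, "session_id", "SessionID",
                    {"client_platform": "String", "events": "Int"}))
```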

### 7. Manually run the Glue Job `edges-initial-load-s3-to-s3-poc-identity-graph`

The Glue ETL job `edges-initial-load-s3-to-s3-poc-identity-graph` creates Gremlin-formatted CSV files for the property graph edges (a.k.a. relationships). It extracts data from the Glue tables in the Glue database `database_poc_identity_graph` (a.k.a. source datasets), applies simple transformations to schema and data, such as de-duplication and null removal, and generates Gremlin-formatted CSV files to ingest the initial data into the identity graph using the Neptune bulk loader.

In [None]:
%%bash

REGION="us-east-2"
aws glue start-job-run \
--job-name edges-initial-load-s3-to-s3-poc-identity-graph \
--region $REGION
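
Edge files for the bulk loader use the system columns `~id`, `~from`, `~to` and `~label`. The sketch below shows, in plain Python rather than Glue, how pairs of vertex IDs could be turned into such a file while skipping nulls and duplicates. The function name `to_edge_csv` and the `label:from:to` edge ID scheme are assumptions for illustration; the actual job may build IDs differently.

```python
import csv
import io


def to_edge_csv(pairs, label):
    """Render (from_id, to_id) pairs as a Gremlin-format edge CSV.

    Pairs with a missing endpoint and repeated pairs are skipped.
    """
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["~id", "~from", "~to", "~label"])
    seen = set()
    for from_id, to_id in pairs:
        if not from_id or not to_id or (from_id, to_id) in seen:
            continue
        seen.add((from_id, to_id))
        # Deterministic edge ID (an assumed scheme) so re-exports yield the same ID.
        writer.writerow([f"{label}:{from_id}:{to_id}", from_id, to_id, label])
    return out.getvalue()


# Hypothetical session-to-IP pairs: one duplicate and one with a null endpoint.
pairs = [("s1", "198.43.127.104"), ("s1", "198.43.127.104"), ("s2", None)]
print(to_edge_csv(pairs, "lastSeenAtIP"))
```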

### 8. If necessary, remove existing data from the identity graph

In [None]:
%db_reset

### 9. Load initial data into the Identity Graph

In steps 5, 6 and 7 we executed the AWS Glue steps needed to discover the schema of the source datasets and to process and prepare these datasets for the initial data load. See the architecture diagram below.

<img src="../images/data-ingestion-bash-load-03.png" alt="data-ingestion-bash-load-03" width="1024"/>

Now let's initiate a bulk load using the Neptune bulk loader.

In [None]:
%load

#### Get IDs of bulk load jobs

In [None]:
%load_ids

#### Get the status of a provided bulk load job ID

In [None]:
%load_status 319ff0de-27be-4ec5-9688-2ccba18f3ecd
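
The status of a load job is returned as a JSON document. The sketch below shows how such a response could be reduced to a short summary. It assumes the response nests the job state under `payload.overallStatus` with `status` and `totalRecords` fields, as in the Neptune loader Get-Status response; the sample values are made up for illustration and are not output from this PoC.

```python
# States that mean the job is still waiting or running; anything else is
# treated as terminal in this sketch.
IN_PROGRESS = {"LOAD_NOT_STARTED", "LOAD_IN_QUEUE", "LOAD_IN_PROGRESS"}


def summarize_load(response: dict) -> dict:
    """Reduce a loader status response to the fields needed to decide next steps."""
    overall = response["payload"]["overallStatus"]
    status = overall["status"]
    return {
        "status": status,
        "finished": status not in IN_PROGRESS,
        "succeeded": status == "LOAD_COMPLETED",
        "records": overall.get("totalRecords", 0),
    }


# Illustrative response only (abridged, invented values).
sample = {
    "status": "200 OK",
    "payload": {"overallStatus": {"status": "LOAD_IN_PROGRESS", "totalRecords": 1200}},
}
print(summarize_load(sample))
```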

## Exploring the Identity Graph
To better understand the data model and schema of the graph database, we can run the graph queries below to identify common entities such as nodes/vertices and relationships/edges.

### Amount of nodes/vertices grouped by label

In [None]:
%%gremlin

g.V().groupCount().by(label).unfold()

### Amount of relationships/edges grouped by label

In [None]:
%%gremlin

g.E().groupCount().by(label).unfold()
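
For readers new to Gremlin, `groupCount().by(label)` builds a map from each label to the number of elements carrying it, and `unfold()` emits that map one entry at a time. A rough in-memory equivalent is sketched below; the element IDs are invented and `group_count_by_label` is a hypothetical helper name.

```python
from collections import Counter


def group_count_by_label(elements):
    """Count (element_id, label) pairs per label, like groupCount().by(label)."""
    return dict(Counter(label for _, label in elements))


# Toy vertex list (invented IDs) using labels from this data model.
vertices = [
    ("s1", "SessionID"), ("s2", "SessionID"),
    ("10.0.0.1", "ClientIP"), ("u1", "User"),
]
for label, count in group_count_by_label(vertices).items():  # like unfold()
    print(label, count)
```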

### Top 10 Device IDs by amount of known users linked

In [None]:
%%gremlin

g.V().hasLabel('DeviceID').
    project('device_id','known_users_linked').
        by(id).
        by(in('usedDevice').out('loggedAs').count()).
    order().
        by('known_users_linked',desc).
    limit(10)
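
The ranking above can be read as: for each device, walk back to the sessions that used it, follow each session's `loggedAs` edge, and count what is reached. The sketch below replays that logic over a toy edge list (invented IDs; `known_users_linked` and `top_n` are hypothetical helpers). One detail worth noticing is that `count()` counts traversers, so a user reached through two sessions is counted twice.

```python
from collections import defaultdict


def known_users_linked(edges, via_label):
    """Per target of via_label edges, count loggedAs hops from its sessions."""
    logged_as = defaultdict(list)
    for src, label, dst in edges:
        if label == "loggedAs":
            logged_as[src].append(dst)
    counts = defaultdict(int)
    for src, label, dst in edges:
        if label == via_label:
            counts[dst] += len(logged_as[src])
    return dict(counts)


def top_n(counts, n=10):
    """Order by count, highest first, and keep the first n entries."""
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:n]


# Toy graph: (from, label, to) triples.
edges = [
    ("s1", "usedDevice", "d1"), ("s2", "usedDevice", "d1"), ("s3", "usedDevice", "d2"),
    ("s1", "loggedAs", "alice"), ("s2", "loggedAs", "alice"), ("s3", "loggedAs", "bob"),
]
print(top_n(known_users_linked(edges, "usedDevice")))
```

Here device `d1` scores 2 even though both sessions belong to the same user. If distinct users are wanted instead, adding `dedup()` before `count()` in the Gremlin query would give that. The same helper covers the Client IP ranking by passing `"lastSeenAtIP"` as `via_label`.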

### Top 10 Client IPs by amount of known users linked

In [None]:
%%gremlin

g.V().hasLabel('ClientIP').
    project('client_ip','known_users_linked').
        by(id).
        by(in('lastSeenAtIP').out('loggedAs').count()).
    order().
        by('known_users_linked',desc).
    limit(10)

### Top 10 Client IPs by amount of anonymous sessions linked

In [None]:
%%gremlin

g.V().hasLabel('ClientIP').
    project('client_ip','anonymous_sessions_linked','known_users_linked').
        by(id).
        by(in('lastSeenAtIP').not(out('loggedAs')).count()).
        by(in('lastSeenAtIP').out('loggedAs').count()).
    order().
        by('anonymous_sessions_linked',desc).
    limit(10)

### Anonymous sessions related to a given Client IP

In [None]:
%%gremlin -p inv,outv

g.V('198.43.127.104').
    in('lastSeenAtIP').
    not(out('loggedAs')).
    path().
    by()
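
The `not(out('loggedAs'))` step is what defines "anonymous" here: a session qualifies only if it has no outgoing `loggedAs` edge at all. A small in-memory sketch of the same filter follows, using invented IDs and a hypothetical helper name `anonymous_sessions`.

```python
def anonymous_sessions(edges, client_ip):
    """Return sessions seen at client_ip that have no outgoing loggedAs edge."""
    logged_in = {src for src, label, _ in edges if label == "loggedAs"}
    return sorted({
        src for src, label, dst in edges
        if label == "lastSeenAtIP" and dst == client_ip and src not in logged_in
    })


# Toy graph: (from, label, to) triples; only s1 is logged in.
edges = [
    ("s1", "lastSeenAtIP", "10.0.0.1"), ("s2", "lastSeenAtIP", "10.0.0.1"),
    ("s3", "lastSeenAtIP", "10.0.0.2"), ("s1", "loggedAs", "alice"),
]
print(anonymous_sessions(edges, "10.0.0.1"))
```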

### Top 10 client external IDs by amount of product purchases

In [None]:
%%gremlin

g.V().hasLabel('CustomerID').
    project('external_id','amount_of_purchases').
        by(id).
        by(in('hasExternalId').out().out('hasPurchased').count()).
    order().
        by('amount_of_purchases',desc).
    limit(10)

## Identity Resolution Use Cases

These use cases are part of the online data consumption processes, deployed as a set of HTTP APIs that query the identity graph. The logic that issues the graph queries below is implemented in different Lambda functions, while publishing and access to each API is managed by Amazon API Gateway. See the architecture diagram below.

<img src="../images/data-consumption-online-query-02.png" alt="data-consumption-online-query-02" width="1024"/>

### Find out information about user interests based on the activity of the user across all devices
Suppose you are hosting a web platform and collecting clickstream data as users browse your site or use your mobile app. In most situations, the users of your platform will be anonymous (non-registered or not logged in). However, these anonymous users may be linked to known users who have used the platform before. We can join (or resolve) the identity of the anonymous user with attributes we know about existing users and, based on known user behavior and heuristics, make some assumptions to learn more about this anonymous user. We can then use this information to target the user with advertising, special offers, discounts, etc.

Let's use an example where we have an anonymous user interaction through the AnyCompany Marketplace website (i.e. a web session). The user's interaction is registered as session id **'bf4f32b7-41a5-49f9-898d-a494d376364a'**. We want to know more about this user and whether it is linked to other users on our platform. This anonymous user is considered a **"transient ID"** in our graph data model. Assuming this user does not have a link to a known user, or **"persistent ID"**, how might we find connections from this transient ID to other known user IDs?

Looking at the data model, you can see that **"SessionID"** (a.k.a. transient ID) vertices in our graph are connected to **"ClientIP"** vertices by an outgoing edge. We can traverse across "ClientIP" vertices to reach other "SessionID" vertices that might be linked to a known user (a.k.a. persistent ID). Let's do that in the following graph query.

In [None]:
%%gremlin -p outv,inv,outv,inv

g.V('bf4f32b7-41a5-49f9-898d-a494d376364a').
    out('lastSeenAtIP').
    in('lastSeenAtIP').
    out('loggedAs').
    dedup().
    path()
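
The traversal above makes three hops: from the session to its IP, back to every session seen at that IP, and on to the users those sessions logged in as. The sketch below mirrors those hops over a toy edge list; the IDs are invented and `known_users_via_shared_ip` is a hypothetical helper, so it illustrates the idea rather than the query engine's behavior.

```python
def known_users_via_shared_ip(edges, session_id):
    """Find users whose sessions share a Client IP with the given session."""
    # Hop 1: out('lastSeenAtIP') from the anonymous session.
    ips = {dst for src, label, dst in edges
           if src == session_id and label == "lastSeenAtIP"}
    # Hop 2: in('lastSeenAtIP') back to every session seen at those IPs.
    sessions = {src for src, label, dst in edges
                if label == "lastSeenAtIP" and dst in ips}
    # Hop 3: out('loggedAs') to known users; the set plays the role of dedup().
    return sorted({dst for src, label, dst in edges
                   if label == "loggedAs" and src in sessions})


# Toy graph: "anon" shares an IP with two logged-in sessions of the same user.
edges = [
    ("anon", "lastSeenAtIP", "10.0.0.1"),
    ("s1", "lastSeenAtIP", "10.0.0.1"), ("s1", "loggedAs", "alice"),
    ("s2", "lastSeenAtIP", "10.0.0.1"), ("s2", "loggedAs", "alice"),
    ("s3", "lastSeenAtIP", "10.0.0.2"), ("s3", "loggedAs", "bob"),
]
print(known_users_via_shared_ip(edges, "anon"))
```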

Now we can dive deeper into the subgraph that describes the online interactions of one of the identified known users.

In [None]:
%%gremlin -p inv,outv

g.V('rivasanthony').
    both('loggedAs','hasEmail','hasExternalId').
    out().
    simplePath().
    path()

Let's use another example where we have an anonymous user interaction through AnyCompany's Mobile Application. The user's interaction is coming from a device with id **'EB7D73F94443EDC8'**. We want to know more about this user and if it is linked to other users on our platform. This anonymous user is considered a "transient ID" in our graph data model. Assuming this user does not have a link to a known user, or "persistent ID", how might we find connections from this transient ID to other known user IDs?

Looking at the data model, you can see that **"DeviceID"** vertices in our graph are connected to **"SessionID"** (a.k.a. transient ID) vertices by an incoming edge. We can traverse across "SessionID" vertices that might be linked to a known user (a.k.a. persistent ID) and are connected to the same "DeviceID" (EB7D73F94443EDC8). Let's do that in the following graph query.

In [None]:
%%gremlin -p inv,outv,inv,inv,inv

g.V('EB7D73F94443EDC8').
    in('usedDevice').
    out('loggedAs').
    order().
        by(id).
    dedup().
    out('hasEmail','hasExternalId').
    out('hasPurchased').
    path()

Let's use another example where we have a user interaction at the register of an AnyCompany physical store. The user's interaction is registered with the customer external ID **'814-57-5109'** provided by the user at the register. We want to know more about this user and their purchase history, even if the user is linked to other known users on our platform.

Looking at the data model, you can see that **"ExternalID"** vertices in our graph are connected to **"User"** (a.k.a. persistent ID) vertices by an incoming edge. We can traverse across "User" vertices to get other linked purchase identifiers (e.g. Email) that might have been used by the same user when purchasing other products. Let's do that in the following graph query.

In [None]:
%%gremlin -p inv,outv,inv

g.V('814-57-5109').
    in('hasExternalId').
    out().
    out('hasPurchased').
    path()

## Adding new data into the Identity Graph

The steps below simulate ingesting updates from source data stores into the graph incrementally. They show how new data from relational sources (first-party customer data and transactional databases) and first-party behavioral data (i.e. cookies, device IDs and clickstream sessions) can be added to the identity graph.

This example showcases how new data can be added either by the batch data load process (using an AWS Glue ETL job) or by the online data load process (using AWS Lambda).

### 1. Generate raw data simulating incremental updates from source datasets

Mock data will be generated for the following source datasets:

* First-party database (e.g. CRM)
* Transactional database (e.g. purchases)
* Cookie database
* Click Stream database

Resulting CSV files with raw data, one per dataset:

* first-party: `first_party_data_inc.csv`
* cookie: `cookie_data_inc.csv`
* clickstream: `clickstream_data_inc.csv`
* transactional: `transactional_data_inc.csv`

In [None]:
%%bash

pip install Faker >/dev/null 2>&1
cd /home/ec2-user/SageMaker
python3 utils/create-source-datasets.py --records 100 --uniqueness 50 --incremental 1

### 2. Visualizing raw data from clickstream dataset

In [None]:
%%bash

cd /home/ec2-user/SageMaker
python3 utils/show-graph-entities-sample.py \
    --dataset clickstream \
    --csv-file clickstream_data_inc.csv

### 3. Create property graph vertices and edges

In this step we simulate the online data load process, deployed as a set of HTTP APIs that add new data (incremental updates) to the identity graph. The logic that issues the graph queries below is implemented in different Lambda functions, while publishing and access to each API is managed by Amazon API Gateway. See the architecture diagram below.

<img src="../images/data-ingestion-online-load-02.png" alt="data-ingestion-online-load-02" width="1024"/>

The graph query below adds new vertices and edges in a single conditional operation. The query uses input data from a single clickstream session (a.k.a. transient ID) to create the following entities used in our data model:

* Vertex: _SessionID_
* Vertex: _DomainName_
* Vertex: _ClientIP_
* Vertex: _DeviceID_ (if any)
* Vertex: _User_ (if any)
* Edge: _lastSeenAtDomain_ (from _SessionID_ to _DomainName_)
* Edge: _lastSeenAtIP_ (from _SessionID_ to _ClientIP_)
* Edge: _usedDevice_ (from _SessionID_ to _DeviceID_)
* Edge: _loggedAs_ (from _SessionID_ to _User_)

New vertices and edges are created only if they do not already exist in the graph.

In [None]:
%%gremlin

g.V('202.211.203.86').
    fold().
    coalesce(unfold(), addV('ClientIP').property(id, '202.211.203.86')).
    in('lastSeenAtIP').
    fold().
    coalesce(
        unfold().hasId('b5e3f34b-7f42-4d8b-a14d-a54e209363ad'),
        V('b5e3f34b-7f42-4d8b-a14d-a54e209363ad').
        fold().
        coalesce(
            unfold(),
            addV('SessionID').
                property(id, 'b5e3f34b-7f42-4d8b-a14d-a54e209363ad').
                property('client_platform','web').
                property('canonical_url','http://mitchell.com/').
                property('app_id','travel').
                property('events',17).
                property('start_timestamp','2021-02-12 09:48:36').
                property('start_event','Purchase').
                property('end_timestamp','2021-02-12 10:08:20').
                property('end_event','Purchase').
                property('session_duration_sec',1184)).
            sideEffect(addE('lastSeenAtIP').to(V('202.211.203.86')))).
    V('stewart.com').
    fold().
    coalesce(unfold(), addV('DomainName').property(id, 'stewart.com')).
    in('lastSeenAtDomain').
    fold().
    coalesce(
        unfold().hasId('b5e3f34b-7f42-4d8b-a14d-a54e209363ad'),
        V('b5e3f34b-7f42-4d8b-a14d-a54e209363ad').
            addE('lastSeenAtDomain').to(V('stewart.com'))).
    V('EB7D73F94443EDC8').
    fold().
    coalesce(unfold(), addV('DeviceID').property(id, 'EB7D73F94443EDC8')).
    in('usedDevice').
    fold().
    coalesce(
        unfold().hasId('b5e3f34b-7f42-4d8b-a14d-a54e209363ad'),
        V('b5e3f34b-7f42-4d8b-a14d-a54e209363ad').
            addE('usedDevice').to(V('EB7D73F94443EDC8'))).
    V('rivasanthony').
    fold().
    coalesce(unfold(), addV('User').property(id, 'rivasanthony')).
    in('loggedAs').
    fold().
    coalesce(
        unfold().hasId('b5e3f34b-7f42-4d8b-a14d-a54e209363ad'),
        V('b5e3f34b-7f42-4d8b-a14d-a54e209363ad').
            addE('loggedAs').to(V('rivasanthony'))).
    iterate()
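
The repeated `fold().coalesce(unfold(), addV(...))` pattern in the query is a get-or-create: return the element if it exists, otherwise add it. The sketch below shows the same semantics on a tiny in-memory structure. `TinyGraph` and its methods are hypothetical and only illustrate why running the operation twice leaves the graph unchanged.

```python
class TinyGraph:
    """Minimal in-memory graph used only to illustrate upsert semantics."""

    def __init__(self):
        self.vertices = {}
        self.edges = set()

    def upsert_vertex(self, vertex_id, label, **properties):
        """Return the existing vertex, or create it if it is missing."""
        if vertex_id not in self.vertices:
            self.vertices[vertex_id] = {"label": label, **properties}
        return self.vertices[vertex_id]

    def upsert_edge(self, from_id, label, to_id):
        """Add the edge only if it is missing; return True when it was created."""
        key = (from_id, label, to_id)
        if key in self.edges:
            return False
        self.edges.add(key)
        return True


def ingest_session(graph, session_id, client_ip, domain):
    """Upsert one session with its Client IP and domain, as the query does."""
    graph.upsert_vertex(client_ip, "ClientIP")
    graph.upsert_vertex(domain, "DomainName")
    graph.upsert_vertex(session_id, "SessionID", client_platform="web")
    graph.upsert_edge(session_id, "lastSeenAtIP", client_ip)
    graph.upsert_edge(session_id, "lastSeenAtDomain", domain)


graph = TinyGraph()
for _ in range(2):  # the second pass must not create anything new
    ingest_session(graph, "b5e3f34b", "202.211.203.86", "stewart.com")
print(len(graph.vertices), len(graph.edges))
```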

Once the new clickstream session (a.k.a. transient ID) is stored in the graph, let's use the query below to identify entities/vertices shared between the new session and existing sessions in the graph. Depending on the results, we could investigate the resulting sessions further in order to resolve the identities of related known users (a.k.a. persistent IDs).

In [None]:
%%gremlin -p outv,inv,outv,inv

g.V('b5e3f34b-7f42-4d8b-a14d-a54e209363ad').
    out().
    in('lastSeenAtIP','lastSeenAtDomain','usedDevice','loggedAs').
    dedup().
    path()

### 4. Upload source dataset files simulating incremental updates to S3 bucket (batch load)

CSV files with raw data for incremental loads into the graph are stored under the `datasets/sources/incremental/<DATASET_NAME>/${TIMESTAMP}` S3 prefix. The timestamp is calculated in UTC using the format `YYYYMMDD_HHmmss`.

In [None]:
%%bash

AWS_ACCOUNT_ID="733157031621"
BUCKET_NAME="poc-identity-graph-${AWS_ACCOUNT_ID}"
TIMESTAMP=$(date -u +%Y%m%d_%H%M%S)
PREFIX="datasets/sources/incremental"
cd /home/ec2-user/SageMaker
aws s3 cp first_party_data_inc.csv s3://$BUCKET_NAME/$PREFIX/first_party/${TIMESTAMP}/
aws s3 cp cookie_data_inc.csv s3://$BUCKET_NAME/$PREFIX/cookie/${TIMESTAMP}/
aws s3 cp clickstream_data_inc.csv s3://$BUCKET_NAME/$PREFIX/clickstream/${TIMESTAMP}/
aws s3 cp transactional_data_inc.csv s3://$BUCKET_NAME/$PREFIX/transactional/${TIMESTAMP}/
echo ""
aws s3 ls s3://$BUCKET_NAME/$PREFIX/ --recursive
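
If the upload were scripted in Python instead of bash, the timestamped key layout described above could be built as in the sketch below. The function name `incremental_key` is hypothetical; the prefix and filenames come from this section. Converting to UTC explicitly keeps the keys consistent regardless of the time zone of the machine that runs the upload.

```python
from datetime import datetime, timedelta, timezone


def incremental_key(dataset, filename, now=None,
                    prefix="datasets/sources/incremental"):
    """Build the S3 key <prefix>/<dataset>/<YYYYMMDD_HHmmss in UTC>/<filename>."""
    now = now or datetime.now(timezone.utc)
    timestamp = now.astimezone(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"{prefix}/{dataset}/{timestamp}/{filename}"


# A local time five hours behind UTC is normalized to UTC in the key.
local = datetime(2021, 2, 12, 4, 48, 36, tzinfo=timezone(timedelta(hours=-5)))
print(incremental_key("clickstream", "clickstream_data_inc.csv", now=local))
```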