### 事前定義
実行する環境やデータの配置場所に応じた設定を行う

In [None]:
# コマンド発行先のNeptuneインスタンスを定義する
neptune_ep = 'neptunecluster-xxxxxxxx.neptune.amazonaws.com'

# Neptuneにロードするデータの格納場所（S3）
s3_data_prefix = 's3://xxxxxxxxx/data/graph_test/'

# Neptuneに付与したLoad用のS3アクセスRole
iamRoleArn = 'arn:aws:iam::xxxxxxxxxx'

# Endpoint URLを生成
neptune_port = '8182'
neptune_ep_url = 'https://{}:{}'.format(neptune_ep, neptune_port)

region = neptune_ep.split('.')[-4]

### curlコマンドの実行用関数

In [None]:
import ast

def parse_curl_output(output):
    response_dict = ast.literal_eval(output[-1])
    print('Request ID:', response_dict['requestId'])
    print('http response code:', response_dict['status']['code'])
    print('Result:', response_dict['result']['data']['@value'])
    
def exec_gremlin_query(query, ep=neptune_ep_url):
    curl_cmd = 'curl -X POST -d \'{}\' {}/gremlin'.format(query, ep)
    curl_output = !$curl_cmd
    print(curl_cmd, '\n')
    parse_curl_output(curl_output)

### 頂点の件数カウント

In [None]:
# gremlinによる照会を定義する
query = '{"gremlin":"g.V().count()"}'

# 照会を実行
exec_gremlin_query(query)

### データが入っていたらいったんクリアする

In [None]:
# gremlinによる照会を定義する
query = '{"gremlin":"g.V().drop()"}'

# 照会を実行
exec_gremlin_query(query)

### テストデータのロード処理を発行
ここでは処理の指示を発行するだけで、実際の完了を待たずに制御が戻る

In [None]:
import json

def exec_load(load_params, ep=neptune_ep_url):
    
    # loadコマンドの文字列を組み立てて実行
    params_json = json.dumps(load_params)
    curl_cmd = "curl -X POST -H 'Content-Type: application/json' {}/loader -d '{}'".format(ep, params_json)
    print(curl_cmd, '\n')
    response=!$curl_cmd
    
    # 返却された文字列からloadIdを取り出してreturn。エラーが発生していてloadIdがなければresponse全体を出力
    tmp = [x for x in response if 'loadId' in x]
    if len(tmp) == 0:
        print(response)
        return None
    else:
        loadid = tmp[0].split()[-1].replace("\"", "")
        print('Load issued')
        print('Load ID :{}'.format(loadid))
        return loadid

In [None]:
# Loadを実行する
load_params = {
      "source" : s3_data_prefix,
      "format" : "csv",
      "iamRoleArn" : iamRoleArn,
      "region" : region,
      "failOnError" : "FALSE",
      "parallelism" : "MEDIUM",
      "updateSingleCardinalityProperties" : "FALSE"
}

loadid = exec_load(load_params)

### ロード処理の状況を確認

In [None]:
curl_cmd = "curl -X GET 'https://{}:8182/loader/{}?details=true&errors=true'".format(neptune_ep, loadid)
print(curl_cmd, '\n')
!$curl_cmd