# Submit Spark App

PythonコードをIBM Analytics Engine SparkにSubmitするコードです

ibm_cloud_sdk_coreが導入されてない場合は以下を実行して導入してください

In [None]:
!pip install --upgrade ibm-cloud-sdk-core

## 1. 以下必要な情報をセットします

- API_KEY: IBM Analytics Engine にアクセス可能なAPIKEY
- APP_BUCKET: Spark実行ソースを置いたオブジェクトストレージのバケット名
- APP_BUCKET_ENDPOINT: Spark実行ソースを置いたオブジェクトストレージのENDPOINT
- APP_BUCKET_ACCESS_KEY: Spark実行ソースを置いたオブジェクトストレージのACCESS KEY
- APP_BUCKET_SECRET_KEY: Spark実行ソースを置いたオブジェクトストレージのSECRET KEY
- IAE_REGION: 使用するIBM Analytics Engine のリージョン名
- IAE_INCETANCE_GUID=: 使用するIBM Analytics Engine のInstance ID
- PY_FILE: Spark実行ソースのファイル名(ルート以外においた場合はパスもつける)

In [None]:
# 自分の環境に合わせて以下を変更する
API_KEY="xxxxxxxxxxxxx"
APP_BUCKET="spark-source"
APP_BUCKET_ENDPOINT="s3.direct.us-south.cloud-object-storage.appdomain.cloud"
APP_BUCKET_ACCESS_KEY = "xxxxxxxxxx"
APP_BUCKET_SECRET_KEY = "xxxxxxxxxxxxxxxxx"
IAE_REGION="us-south"
IAE_INCETANCE_GUID="xxxxxxxxxxxxxxxxxx"
PY_FILE="lakehouse-spark-sample.py"

## 2. IAM トークンを生成

In [None]:
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator

authenticator = IAMAuthenticator(API_KEY)
access_token=authenticator.token_manager.get_token()
print(access_token)

## 3. PythonコードをIBM Analytics Engine SparkにSubmit

In [None]:
# Submit the Spark application:

import requests
import json
import datetime

# REST API のURL 作成
url=f"https://api.{IAE_REGION}.ae.cloud.ibm.com/v3/analytics_engines/{IAE_INCETANCE_GUID}/spark_applications"

# request header作成
headers = {}
headers ['Authorization'] =  'Bearer ' + access_token #アクセストークンをHeaderにセット

conf={}
conf[f'spark.hadoop.fs.s3a.bucket.{APP_BUCKET}.endpoint']=APP_BUCKET_ENDPOINT
conf[f'spark.hadoop.fs.s3a.bucket.{APP_BUCKET}.access.key']=APP_BUCKET_ACCESS_KEY
conf[f'spark.hadoop.fs.s3a.bucket.{APP_BUCKET}.secret.key']=APP_BUCKET_SECRET_KEY 

application_details={}
application_details['application']=f"s3a://{APP_BUCKET}/{PY_FILE}"
application_details['conf']=conf

params = {}
params['application_details'] = application_details

try:
    r = requests.post(url, headers=headers, json=params)
    print( r.status_code)
except Exception as err:
    print("RESTful call failed. Detailed information follows.")
    print(err)


# Check for Invalid credentials
if (r.status_code == 401): # There was an error with the authentication
    print("RESTful called failed.")
    message = r.json()['errors']
    print(message)

print(datetime.datetime.now())    
print(json.dumps(r.json(), indent=4))