In [29]:
import boto3, sagemaker, joblib, os, tarfile, subprocess, time
from datetime import datetime
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
from sagemaker.model import Model
from sagemaker.async_inference import AsyncInferenceConfig
import json

# SageMaker 세션 및 역할 설정
sagemaker_session = sagemaker.Session()
#model_data = "s3://greenenergy-ai-app-d-an2-s3-gem/sagemaker-models/async-20250729100023/model.tar.gz"

In [30]:
role = sagemaker.get_execution_role()

In [31]:
region = boto3.Session().region_name

In [33]:
account_id = sagemaker_session.account_id()

In [34]:
print("role :" + role)
print("region :" + region)
print("id :" + account_id)

role :arn:aws:iam::154126116352:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole
region :ap-northeast-2
id :154126116352


In [35]:
# 모델 생성
X, y = make_regression(n_samples=100, n_features=2, random_state=42)
model = LinearRegression().fit(X, y)
os.makedirs('model', exist_ok=True)
joblib.dump(model, 'model/model.joblib')
print(f"✅ 모델 생성 완료 - R²: {model.score(X, y):.4f}")

✅ 모델 생성 완료 - R²: 1.0000


In [36]:
# 컨테이너 파일 생성 (Async용)
os.makedirs('container', exist_ok=True)

# predictor.py (Async 처리 최적화)
predictor = '''import joblib, numpy as np, json, os
from flask import Flask, request, jsonify

app = Flask(__name__)
model = joblib.load('/opt/ml/model/model.joblib')

@app.route('/ping')
def ping():
    return jsonify({'status': 'healthy'})

@app.route('/invocations', methods=['POST'])
def predict():
    try:
        # Content-Type 확인
        content_type = request.content_type
        
        if content_type == 'application/json':
            data = request.get_json()
        else:
            # 텍스트 데이터 처리 (CSV 등)
            data_str = request.data.decode('utf-8')
            data = json.loads(data_str)
        
        instances = data.get('instances', data)
        
        # 대용량 데이터 처리를 위한 배치 처리
        if isinstance(instances, list) and len(instances) > 1000:
            print(f"Processing large batch: {len(instances)} instances")
        
        predictions = model.predict(np.array(instances))
        
        return jsonify({
            'predictions': predictions.tolist(),
            'batch_size': len(instances),
            'model_type': 'LinearRegression'
        })
        
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
'''

# Dockerfile
dockerfile = '''FROM python:3.8-slim
RUN pip install flask scikit-learn joblib numpy
COPY predictor.py /opt/program/predictor.py
WORKDIR /opt/program
EXPOSE 8080
ENTRYPOINT ["python", "predictor.py"]
'''

with open('container/predictor.py', 'w') as f: f.write(predictor)
with open('container/Dockerfile', 'w') as f: f.write(dockerfile)

print("✅ 컨테이너 파일 생성 완료 (Async 최적화)")

✅ 컨테이너 파일 생성 완료 (Async 최적화)


In [37]:
# ECR 설정
ecr = boto3.client('ecr')
repo_name = 'ns-gem/sagemaker'
try:
    ecr.create_repository(repositoryName=repo_name)
    print(f"✅ ECR 리포지토리 생성: {repo_name}")
except ecr.exceptions.RepositoryAlreadyExistsException:
    print(f"ℹ️  ECR 리포지토리 이미 존재: {repo_name}")
except Exception as e:
    print(f"ℹ️  ECR 설정: {str(e)}")

repo_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repo_name}"
print(f"📦 ECR URI: {repo_uri}")

ℹ️  ECR 리포지토리 이미 존재: ns-gem/sagemaker
📦 ECR URI: 154126116352.dkr.ecr.ap-northeast-2.amazonaws.com/ns-gem/sagemaker


In [38]:
# Docker 빌드 & 푸시
build_script = f'''#!/bin/bash
set -e
echo "🔐 ECR 로그인..."
aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com

echo "🔨 Docker 빌드..."
cd container
docker build --platform linux/amd64 -t ns-gem-async .

echo "🏷️  태그 설정..."
docker tag ns-gem-async:latest {repo_uri}:async

echo "📤 ECR 푸시..."
docker push {repo_uri}:async

echo "✅ 완료!"
'''

with open('build_async.sh', 'w') as f: f.write(build_script)
os.chmod('build_async.sh', 0o755)

print("🔨 Docker 빌드 & 푸시 시작...")
result = subprocess.run(['bash', 'build_async.sh'], capture_output=True, text=True)

if result.returncode == 0:
    print("✅ Docker 빌드 & 푸시 완료")
    # 마지막 몇 줄만 출력
    output_lines = result.stdout.split('\n')
    for line in output_lines[-5:]:
        if line.strip():
            print(f"  {line}")
else:
    print(f"❌ 빌드 실패")
    print(f"STDOUT: {result.stdout}")
    print(f"STDERR: {result.stderr}")

🔨 Docker 빌드 & 푸시 시작...
✅ Docker 빌드 & 푸시 완료
  5f70bf18a086: Pushed
  87c4e28e31d5: Pushed
  async: digest: sha256:086f2981fd0f2f850f1a73fbf448e950299cdfe9f859373b5004044b06d78edc size: 1785
  ✅ 완료!


In [39]:
# 모델 S3 업로드
print("📦 모델 S3 업로드 중...")
bucket_name = 'greenenergy-ai-app-d-an2-s3-gem'
with tarfile.open('model.tar.gz', 'w:gz') as tar:
    tar.add('model/model.joblib', arcname='model.joblib')

# 지정된 S3 버킷 사용
s3 = boto3.client('s3')
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
s3_key = f'sagemaker-models/async-{timestamp}-test/model.tar.gz'

s3.upload_file('model.tar.gz', bucket_name, s3_key)
model_uri = f's3://{bucket_name}/{s3_key}'

print(f"✅ 모델 업로드 완료: {model_uri}")

📦 모델 S3 업로드 중...
✅ 모델 업로드 완료: s3://greenenergy-ai-app-d-an2-s3-gem/sagemaker-models/async-20250903053305-test/model.tar.gz


In [17]:
image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch", 
    region=region,
    version="1.12",
    py_version="py38",
    instance_type="ml.m5.large",
    image_scope="inference" 
)

In [40]:
# 모델 생성
model_data=model_uri
model = Model(
    image_uri=image_uri,
    model_data=model_data,
    role=role,
    sagemaker_session=sagemaker_session
)

In [41]:
# Async Inference 설정
from sagemaker.async_inference import AsyncInferenceConfig

async_config = AsyncInferenceConfig(
    output_path="s3://greenenergy-ai-app-d-an2-s3-gem/async-inference/output/",
    max_concurrent_invocations_per_instance=4,
    failure_path="s3://greenenergy-ai-app-d-an2-s3-gem/async-inference/error/"
)

In [42]:
# Async Endpoint 배포
try:
    predictor = model.deploy(
        initial_instance_count=1,
        instance_type="ml.m5.large",
        async_inference_config=async_config,
        endpoint_name="test-async-endpoint2"  # 선택사항
    )

except Exception as e:
    print(f"❌ 배포 중 오류: {str(e)}")



------!

In [55]:
predictor = Predictor(
    endpoint_name="test-async-endpoint3",
    sagemaker_session=sagemaker_session,
    serializer=sagemaker.serializers.JSONSerializer(),
    deserializer=sagemaker.deserializers.JSONDeserializer()
)

print("✅ Predictor 객체 수동 생성 완료")

✅ Predictor 객체 수동 생성 완료


In [48]:
input_s3_uri = "s3://greenenergy-ai-app-d-an2-s3-gem/async-inference-input/test-20250729100023.json"

In [27]:
### 에러남
response = predictor.predict_async(
    data=input_data,
    input_path=input_data
)

print(f"추론 작업 ID: {response.output_path}")

In [56]:
def quick_test_existing_data():
    endpoint_name = "test-async-endpoint2"
    input_data = "s3://greenenergy-ai-app-d-an2-s3-gem/async-inference-input/test-20250729100023.json"
    
    print("🚀 기존 데이터로 빠른 테스트 시작")
    print(f"입력 데이터: {input_data}")
    
    runtime = boto3.client('sagemaker-runtime')
    s3 = boto3.client('s3')
    
    try:
        # Async 추론 요청
        response = runtime.invoke_endpoint_async(
            EndpointName=endpoint_name,
            InputLocation=input_data,
            ContentType='application/json'
        )
        
        output_location = response['OutputLocation']
        print(response)
        print(f"출력 위치: {output_location}")
        
        # 결과 대기
        import time
        start_time = time.time()
        
        for i in range(300):  # 5분 대기
            try:
                bucket = output_location.split('/')[2]
                print(bucket)
                key = '/'.join(output_location.split('/')[3:])
                print(key)
                
                result_obj = s3.get_object(Bucket=bucket, Key=key)
                print(result_obj)
                result_data = json.loads(result_obj['Body'].read())
                print(result_data)
                
                elapsed_time = time.time() - start_time
                print(f"✅ 테스트 완료 ({elapsed_time:.2f}초)")
                print(f"예측 결과 수: {len(result_data.get('predictions', []))}")
                print(f"배치 크기: {result_data.get('batch_size', 'N/A')}")
                print(f"첫 번째 예측값: {result_data.get('predictions', [])[:3]}")
                
                return result_data
                
            except Exception as e:
                if i == 299:
                    print(f"❌ 시간 초과: {e}")
                    return None
                time.sleep(1)
                
    except Exception as e:
        print(f"❌ 테스트 실패: {e}")
        return None

# 기존 데이터로 빠른 테스트 실행
result = quick_test_existing_data()

🚀 기존 데이터로 빠른 테스트 시작
입력 데이터: s3://greenenergy-ai-app-d-an2-s3-gem/async-inference-input/test-20250729100023.json
출력 위치: s3://greenenergy-ai-app-d-an2-s3-gem/async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-ai-app-d-an2-s3-gem
async-inference/output/88c619d4-3f96-4aa8-883c-fe79e8e9ce42.out
greenenergy-

In [None]:
print(result)

In [49]:
# Async Inference 실행
print("🧪 Async Inference 테스트...")
print("=" * 50)

try:
    # 비동기 추론 시작
    response = predictor.predict_async(
        input_path=input_s3_uri,
        initial_args={'ContentType': 'application/json'}
    )
    
    output_location = response.output_path
    print(f"🚀 비동기 추론 시작됨")
    print(f"📥 입력: {input_s3_uri}")
    print(f"📤 출력 예상 위치: {output_location}")
    
    # 결과 대기 (폴링)
    print("⏳ 결과 대기 중...")
    
    max_wait_time = 300  # 5분
    wait_time = 0
    
    while wait_time < max_wait_time:
        try:
            # 출력 파일 확인
            result = response.get_result()
            
            if result:
                print(f"✅ 추론 완료! (대기시간: {wait_time}초)")
                print(f"📋 결과: {result}")
                
                # 로컬 모델과 비교
                predictions = result['predictions']
                print("\n🔍 결과 검증:")
                for i, (input_data, prediction) in enumerate(zip(test_data['instances'], predictions)):
                    local_pred = model.predict([input_data])[0]
                    diff = abs(local_pred - prediction)
                    print(f"  {i+1}. {input_data} → {prediction:.4f} (로컬: {local_pred:.4f}, 차이: {diff:.6f}) {'✅' if diff < 1e-6 else '⚠️'}")
                
                break
                
        except Exception as e:
            if "does not exist" in str(e) or "NoSuchKey" in str(e):
                # 아직 결과가 준비되지 않음
                time.sleep(10)
                wait_time += 10
                print(f"  대기 중... ({wait_time}초)")
            else:
                print(f"❌ 오류 발생: {str(e)}")
                break
    
    if wait_time >= max_wait_time:
        print(f"⏰ 타임아웃: {max_wait_time}초 대기 후 결과 없음")
        print(f"💡 수동으로 확인: {output_location}")
        
except Exception as e:
    print(f"❌ Async 추론 실패: {str(e)}")

🧪 Async Inference 테스트...
❌ Async 추론 실패: 'NoneType' object has no attribute 'predict_async'


In [50]:
# 활성 endpoint 목록 확인
import boto3
sagemaker_client = boto3.client('sagemaker')
endpoints = sagemaker_client.list_endpoints()['Endpoints']
for ep in endpoints:
    print(f"{ep['EndpointName']}: {ep['EndpointStatus']}")


test-async-endpoint2: InService
gem-serverless-20250807163808: InService
gem-serverless-20250807162410: InService
gem-serverless-20250807162107: InService
gem-serverless-20250807161125: InService
gem-serverless-20250807160116: InService
gem-serverless-20250807154129: InService
gem-serverless-20250807151850: InService
iris-endpoint-compatible-2025-07-21-09-19-24: InService


In [51]:
# endpoint 이름을 알고 있다면
result = test_with_boto3("test-async-endpoint2")
