### forecast


In [27]:
import logging
import boto3
from botocore.exceptions import ClientError


def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """

    # Create bucket
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True

In [30]:
create_bucket('mybucket-01')

ERROR:root:An error occurred (IllegalLocationConstraintException) when calling the CreateBucket operation: The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.


False

In [28]:
# Retrieve the list of existing buckets
s3 = boto3.client('s3')
response = s3.list_buckets()

# Output the bucket names
print('Existing buckets:')
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')

Existing buckets:
  mybucket-0721


In [None]:
# 模擬數據生成
def generate_time_series_data(start_date, end_date, freq="D"):
    dates = pd.date_range(start_date, end_date, freq=freq)
    data = np.random.randint(1, 100, len(dates))
    item_ids = ["item_1"] * len(dates)  # 使用相同的 item_id
    return pd.DataFrame({"timestamp": dates, "value": data, "item_id": item_ids})


start_date = "2023-01-01"
end_date = "2023-12-31"
time_series_data = generate_time_series_data(start_date, end_date)

# 將數據保存到本地文件
local_file_path = "time_series_data.csv"
time_series_data.to_csv(local_file_path, index=False)

# 上傳數據到 S3
s3_key = "time_series_data.csv"
s3.upload_file(local_file_path, bucket_name, s3_key)

# 初始化 Amazon Forecast 客戶端
forecast = boto3.client("forecast")
forecastquery = boto3.client("forecastquery")

# 建立數據集
dataset_name = "my_dataset"
dataset_group_name = "my_dataset_group"
schema = {
    "Attributes": [
        {"AttributeName": "timestamp", "AttributeType": "timestamp"},
        {"AttributeName": "value", "AttributeType": "integer"},
        {"AttributeName": "item_id", "AttributeType": "string"},
    ]
}

create_dataset_response = forecast.create_dataset(
    DatasetName=dataset_name,
    Domain="CUSTOM",
    DatasetType="TARGET_TIME_SERIES",
    DataFrequency="D",
    Schema=schema,
)
dataset_arn = create_dataset_response["DatasetArn"]

# 建立數據集群組
create_dataset_group_response = forecast.create_dataset_group(
    DatasetGroupName=dataset_group_name, Domain="CUSTOM", DatasetArns=[dataset_arn]
)
dataset_group_arn = create_dataset_group_response["DatasetGroupArn"]

# 將數據導入到數據集中
s3_data_path = f"s3://{bucket_name}/{s3_key}"
role_arn = "arn:aws:iam::your-account-id:role/ForecastRole"  # 使用你的 IAM 角色 ARN

create_dataset_import_job_response = forecast.create_dataset_import_job(
    DatasetImportJobName="my_dataset_import_job",
    DatasetArn=dataset_arn,
    DataSource={"S3Config": {"Path": s3_data_path, "RoleArn": role_arn}},
    TimestampFormat="yyyy-MM-dd",
)
dataset_import_job_arn = create_dataset_import_job_response["DatasetImportJobArn"]

# 等待數據導入完成
while True:
    status = forecast.describe_dataset_import_job(
        DatasetImportJobArn=dataset_import_job_arn
    )["Status"]
    if status in ("ACTIVE", "CREATE_FAILED"):
        break
    print(f"Dataset import job status: {status}")
    time.sleep(30)

if status == "CREATE_FAILED":
    raise Exception("Dataset import job creation failed")

# 建立預測器
predictor_name = "my_predictor"
algorithm_arn = "arn:aws:forecast:::algorithm/Prophet"

create_predictor_response = forecast.create_predictor(
    PredictorName=predictor_name,
    ForecastHorizon=30,
    PerformAutoML=False,
    PerformHPO=False,
    AlgorithmArn=algorithm_arn,
    InputDataConfig={"DatasetGroupArn": dataset_group_arn},
    FeaturizationConfig={
        "ForecastFrequency": "D",
        "Featurizations": [
            {
                "AttributeName": "value",
                "FeaturizationPipeline": [
                    {
                        "FeaturizationMethodName": "filling",
                        "FeaturizationMethodParameters": {
                            "aggregation": "sum",
                            "backfill": "nan",
                            "frontfill": "none",
                        },
                    }
                ],
            }
        ],
    },
)
predictor_arn = create_predictor_response["PredictorArn"]

# 等待預測器訓練完成
while True:
    status = forecast.describe_predictor(PredictorArn=predictor_arn)["Status"]
    if status in ("ACTIVE", "CREATE_FAILED"):
        break
    print(f"Predictor training status: {status}")
    time.sleep(30)

if status == "CREATE_FAILED":
    raise Exception("Predictor creation failed")

# 建立預測
forecast_name = "my_forecast"

create_forecast_response = forecast.create_forecast(
    ForecastName=forecast_name, PredictorArn=predictor_arn
)
forecast_arn = create_forecast_response["ForecastArn"]

# 等待預測生成完成
while True:
    status = forecast.describe_forecast(ForecastArn=forecast_arn)["Status"]
    if status in ("ACTIVE", "CREATE_FAILED"):
        break
    print(f"Forecast status: {status}")
    time.sleep(30)

if status == "CREATE_FAILED":
    raise Exception("Forecast creation failed")

# 查詢預測結果
forecast_response = forecastquery.query_forecast(
    ForecastArn=forecast_arn, Filters={"item_id": "item_1"}
)

print(forecast_response)