# Test processor

In [16]:
from sagemaker.session import Session, get_execution_role
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.network import NetworkConfig
import os

In [17]:
def get_execution_role_in_local(
    sagemaker_session,
    role_name="AmazonSageMaker-ExecutionRole-20230105T181131",
):
    """Return the ARN of a SageMaker execution role by looking it up in IAM.

    Used as a fallback when ``get_execution_role()`` cannot resolve a role
    (e.g. when the notebook is not running with a SageMaker role attached).

    Args:
        sagemaker_session: object exposing ``boto_session.client("iam")``.
        role_name: name of the IAM role to resolve. Defaults to the role the
            notebook was originally written against, so existing callers keep
            working; pass another name to reuse the helper in other accounts.

    Returns:
        The role ARN (``response["Role"]["Arn"]``) as returned by IAM.
    """
    iam_client = sagemaker_session.boto_session.client("iam")
    return iam_client.get_role(RoleName=role_name)["Role"]["Arn"]

In [18]:
def load_env_variables(*env_files):
    """Parse ``KEY=VALUE`` files into a single dictionary.

    Files are read in the order given; a key defined in a later file
    overrides the same key from an earlier one.

    Parsing rules:
      * blank lines and lines starting with ``#`` are ignored;
      * lines without ``=`` are ignored;
      * only the FIRST ``=`` separates key from value, so values that
        themselves contain ``=`` (URLs with query strings, base64, ...) are
        kept intact;
      * surrounding whitespace is stripped from both key and value.

    Args:
        *env_files: paths of the env files to read.

    Returns:
        dict mapping variable names to their (string) values.

    Raises:
        OSError: if one of the files cannot be opened.
    """
    env_vars = {}
    for env_file in env_files:
        with open(env_file, "r") as file:
            for line in file:
                entry = line.strip()
                if not entry or entry.startswith("#") or "=" not in entry:
                    continue
                # maxsplit=1: everything after the first "=" is the value.
                key, value = entry.split("=", 1)
                env_vars[key.strip()] = value.strip()
    return env_vars

In [19]:
def create_container_registry(ecr_client, repository_name, account_id):
    """Create an ECR repository, or describe it when it already exists.

    Args:
        ecr_client: client exposing ``create_repository``,
            ``describe_repositories`` and
            ``exceptions.RepositoryAlreadyExistsException``.
        repository_name: name of the repository to create / look up.
        account_id: registry id used for the lookup when the repository
            already exists.

    Returns:
        The repository description dict: the ``"repository"`` entry of the
        create response, or the first ``"repositories"`` entry of the
        describe response.
    """
    creation_request = {
        "repositoryName": repository_name,
        "tags": [{"Key": "Test", "Value": "True"}],
        "encryptionConfiguration": {"encryptionType": "AES256"},
    }
    try:
        created = ecr_client.create_repository(**creation_request)["repository"]
    except ecr_client.exceptions.RepositoryAlreadyExistsException:
        print("repository already exists!")
        existing = ecr_client.describe_repositories(
            registryId=account_id,
            repositoryNames=[repository_name],
        )
        return existing["repositories"][0]
    return created

In [20]:
sagemaker_session = Session()
try:
    # Works when the notebook runs with a SageMaker execution role attached.
    sagemaker_role = get_execution_role()
except Exception:
    # Fall back to an explicit IAM lookup when the role cannot be resolved
    # automatically. `Exception` (rather than a bare `except:`) keeps the
    # best-effort fallback while letting KeyboardInterrupt / SystemExit
    # propagate.
    sagemaker_role = get_execution_role_in_local(sagemaker_session)

INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole
INFO:botocore.credentials:Found credentials from IAM Role: BaseNotebookInstanceEc2InstanceRole


## Configurations

In [21]:
def get_configurations(stage="staging"):
    """Load the environment variables for the given deployment stage.

    Reads the shared ``../vars.env`` file followed by the stage-specific
    ``../vars.<stage>.env`` file (both relative to the working directory)
    and returns whatever ``load_env_variables`` produces for that pair.
    """
    shared_env_file = "../vars.env"
    stage_env_file = f"../vars.{stage}.env"
    return load_env_variables(shared_env_file, stage_env_file)

In [22]:
def set_environ_temporal_variables(**variables):
    """Set each keyword argument as a variable in ``os.environ``.

    The variables live only in this process' environment (and are inherited
    by child processes started afterwards). Values must be strings;
    ``os.environ`` raises ``TypeError`` otherwise.
    """
    os.environ.update(variables)

In [23]:
# A VPC configuration that is able to connect to the Koombea DB has already
# been created in the account; this helper only looks it up.
def _is_private_sm_subnet(subnet):
    """Return True when the subnet's FIRST tag value marks it as selected.

    A subnet is selected when its first tag value starts with ``sm`` (text
    before the first ``-``) and contains the letter ``p``. Subnets with no
    ``Tags`` entry, or with an empty tag list, are never selected.
    """
    tags = subnet.get("Tags")
    if not tags:
        return False
    # NOTE(review): only the first tag is inspected and its key is not
    # checked -- presumably it is the "Name" tag; confirm against the account.
    first_tag_value = tags[0]["Value"]
    return first_tag_value.split("-")[0] == "sm" and "p" in first_tag_value


def get_koombea_db_vpc_conf(ec2_client, account_id, security_group_name):
    """Collect the subnet and security-group ids used to reach the Koombea DB.

    Args:
        ec2_client: client exposing ``describe_subnets`` and
            ``describe_security_groups``.
        account_id: AWS account id used as the ``owner-id`` filter.
        security_group_name: value of the ``group-name`` filter.

    Returns:
        dict with two keys: ``"Subnets"`` (list of subnet ids) and
        ``"SecurityGroupIds"`` (list of security-group ids).
    """
    # Get the subnets owned by the account.
    subnets = ec2_client.describe_subnets(
        Filters=[
            {
                "Name": "owner-id",
                "Values": [account_id]
            }
        ]
    )
    # Choose just the private subnets routing to the NAT gateway (identified
    # by the tag naming convention checked in `_is_private_sm_subnet`).
    subnets_ids = [
        subnet["SubnetId"]
        for subnet in subnets["Subnets"]
        if _is_private_sm_subnet(subnet)
    ]
    # Get the security groups matching the requested name.
    security_groups = ec2_client.describe_security_groups(
        Filters=[
            {
                "Name": "owner-id",
                "Values": [account_id]
            },
            {
                "Name": "group-name",
                "Values": [security_group_name]
            }
        ]
    )
    sec_groups_ids = [group["GroupId"] for group in security_groups["SecurityGroups"]]
    return {"Subnets": subnets_ids,
            "SecurityGroupIds": sec_groups_ids}

In [24]:
# Static names used throughout the notebook.
stage = "staging"
security_group_name = "launch-wizard-1"
repository_name = "koombea-blogs-extraction-component"
docker_compose_service_name = "koombea_blogs_extraction_component"
# NOTE(review): looks like the "<compose project>_<service>" image name that
# docker-compose produces -- confirm against the compose project name.
docker_image_name = f"koombea_blogs_extraction_{docker_compose_service_name}"

# AWS clients and account context, all derived from the SageMaker session.
ec2_client = sagemaker_session.boto_session.client("ec2")
ecr_client = sagemaker_session.boto_session.client("ecr")
account_id = sagemaker_session.account_id()
aws_region = sagemaker_session.boto_region_name

# Environment variables loaded from the shared and stage-specific env files.
environment = get_configurations(stage)

# Create the ECR repository, or fetch its description if it already exists.
repository_info = create_container_registry(ecr_client, repository_name, account_id)
repository_uri = repository_info["repositoryUri"]

# Subnets and security groups the processing job will run in.
vpc_config = get_koombea_db_vpc_conf(ec2_client, account_id, security_group_name)

repository already exists!


In [25]:
# Put the values computed above into os.environ so that the shell script
# written and executed in the next cells can read them as ${account_id},
# ${aws_region}, ${docker_compose_service_name}, ${docker_image_name} and
# ${repository_uri}.
set_environ_temporal_variables(
    account_id=account_id,
    aws_region=aws_region,
    docker_compose_service_name=docker_compose_service_name,
    docker_image_name=docker_image_name,
    repository_uri=repository_uri
)

# Push container to ECR

In [26]:
%%writefile ../scripts/build_and_push_ecr.sh
#!/usr/bin/env bash
# Build the component's Docker image, tag it and push it to ECR.
#
# Required environment variables:
#   aws_region, account_id, docker_compose_service_name,
#   docker_image_name, repository_uri

# Stop at the first failing command, on any unset variable, and when any
# stage of a pipeline fails -- otherwise a failed login or build would still
# be followed by a push and a cache prune.
set -euo pipefail

echo "loging to aws ecr"
aws ecr get-login-password --region "${aws_region}" \
    | docker login --username AWS --password-stdin "${account_id}.dkr.ecr.${aws_region}.amazonaws.com"

echo "building and tagging docker container"
# Move to the project root (the parent of this script's directory) so the
# build does not depend on the directory the script is invoked from.
cd "$(dirname "${BASH_SOURCE[0]}")/.."
docker-compose build "${docker_compose_service_name}"
docker tag "${docker_image_name}:latest" \
    "${repository_uri}:latest"

echo "pushing container"
docker push "${repository_uri}:latest"

echo "cleaning dockers cache"
# --force skips the confirmation prompt (replaces piping "y" into the command).
docker system prune --force

Overwriting ../scripts/build_and_push_ecr.sh


In [27]:
# Run the script written by the previous cell; it inherits the variables
# placed in os.environ by set_environ_temporal_variables.
!bash ../scripts/build_and_push_ecr.sh

loging to aws ecr
https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded
building and tagging docker container
Building koombea_blogs_extraction_component
Sending build context to Docker daemon  91.14kB
Step 1/13 : FROM python:3.8.0-slim-buster
 ---> 577b86e4ee11
Step 2/13 : RUN apt-get update     && apt-get -y install netcat gcc     && apt-get clean
 ---> Using cache
 ---> 1fe3b1d4f2e3
Step 3/13 : WORKDIR /opt/ml
 ---> Using cache
 ---> 441f25f9c36c
Step 4/13 : RUN pip install -U pip
 ---> Using cache
 ---> 4630ed6c9e52
Step 5/13 : COPY requirements.txt .
 ---> Using cache
 ---> 59b3b8320226
Step 6/13 : RUN pip install -r requirements.txt
 ---> Using cache
 ---> 2e46986004ad
Step 7/13 : RUN python -m spacy download en_core_web_sm
 ---> Using cache
 ---> 3a4f088e1a31
Step 8/13 : RUN python -m spacy download es_core_news_sm
 ---> Using cache
 ---> c6271cfbf9b7
Step 9/13 : COPY run.py .
 ---> Using cache
 ---> 5722c807436d
Step 10/13 : COPY dataBas

## Test processor

In [13]:
# Processing-job settings.
base_job_name = "etl-koombea-blogs-job"
instance_type = "ml.t3.large"
entry_point = ["python", "run.py"]
# Directory inside the container whose contents are uploaded as job output.
source_output = "/opt/ml/processing/processed_data"

# S3 destination: s3://<default bucket>/<project folder>/<data folder>/
bucket_name = sagemaker_session.default_bucket()
folder_project_name = "koombea_website_ml"
folder_data_name = "koombea_blogs_information"
key_prefix = f"{folder_project_name}/{folder_data_name}/"
# Despite its name this is the full S3 URI of the destination prefix.
s3_bucket_name = f"s3://{bucket_name}/{key_prefix}"

# Run the job inside the subnets / security groups looked up earlier.
network_config = NetworkConfig(
    subnets=vpc_config["Subnets"],
    security_group_ids=vpc_config["SecurityGroupIds"],
)

In [14]:
# Describe the processing job: which image to run, how, and where.
process_job = Processor(
    # What to run: the image pushed to ECR and its entrypoint command.
    image_uri=repository_uri,
    entrypoint=entry_point,
    env=environment,
    # Where to run it.
    instance_type=instance_type,
    instance_count=1,
    network_config=network_config,
    # Identity and bookkeeping.
    role=sagemaker_role,
    sagemaker_session=sagemaker_session,
    base_job_name=base_job_name,
)

In [15]:
# Launch the processing job. The container is told (via --output-path) to
# write into `source_output`, and everything found there is uploaded to
# `s3_bucket_name`.
# NOTE(review): the container's log lines are streamed into this cell's
# output, and the recorded run shows a database connection URL with
# credentials among them -- clear or redact the output before sharing.
process_job.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source=source_output,
            destination=s3_bucket_name
        )
    ],
    arguments=["--output-path", source_output]
)

INFO:sagemaker:Creating processing-job with name etl-koombea-blogs-job-2023-02-08-16-10-45-655



Job Name:  etl-koombea-blogs-job-2023-02-08-16-10-45-655
Inputs:  []
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-west-2-256305374409/koombea_website_ml/koombea_blogs_information/', 'LocalPath': '/opt/ml/processing/processed_data', 'S3UploadMode': 'EndOfJob'}}]
................................[34m2023-02-08 16:15:57,745 - INFO - generated new fontManager[0m
[34m2023-02-08 16:15:58,372 - INFO - /opt/ml/koombea_blogs/connection/dataBaseKey.pem[0m
[34m2023-02-08 16:15:58,536 - INFO - Connected (version 2.0, client Go)[0m
[34m2023-02-08 16:15:58,983 - INFO - Authentication (publickey) successful![0m
[34m2023-02-08 16:15:58,984 - INFO - Connect to the following sqlalchemy url: mysql+pymysql://koombea20stg:<REDACTED>@127.0.0.1:36407/wp_koombea20stg[0m
[34m2023-02-08 16:15:58,984 - INFO - Initializing connection with db[0m
[34m2023-02-08 16:16:09,640 - INFO - Preprocessing job begins[0m
[34m2023-02-08 16:16:16,998 -

## Show data files in S3

We expect to see three kinds of files for each source database: `blogs_df_*.csv`, `en_data_*.json`, and `es_data_*.json`.

In [17]:
# List the objects stored under the destination prefix.
# `.get("Contents", [])`: the "Contents" key is absent from the response
# when no object matches the prefix.
# NOTE: a single list_objects_v2 call returns at most 1000 keys, which is
# enough for the handful of files expected here.
contents = sagemaker_session.boto_session.client("s3").list_objects_v2(
    Bucket=bucket_name,
    Prefix=key_prefix
).get("Contents", [])
for content in contents:
    # Skip the "folder" placeholder object (the key equal to the prefix
    # itself) explicitly, instead of dropping whatever happens to be listed
    # first -- which would hide a real file when no placeholder exists.
    if content["Key"] == key_prefix:
        continue
    print("s3://" + bucket_name + "/" + content["Key"])

s3://sagemaker-us-west-2-256305374409/koombea_website_ml/koombea_blogs_information/blogs_df_wp_koombea20.csv
s3://sagemaker-us-west-2-256305374409/koombea_website_ml/koombea_blogs_information/blogs_df_wp_koombea20stg.csv
s3://sagemaker-us-west-2-256305374409/koombea_website_ml/koombea_blogs_information/en_data_wp_koombea20.json
s3://sagemaker-us-west-2-256305374409/koombea_website_ml/koombea_blogs_information/en_data_wp_koombea20stg.json
s3://sagemaker-us-west-2-256305374409/koombea_website_ml/koombea_blogs_information/es_data_wp_koombea20.json
s3://sagemaker-us-west-2-256305374409/koombea_website_ml/koombea_blogs_information/es_data_wp_koombea20stg.json
