In [None]:
import gzip
import io
import pickle

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.sagemaker_endpoint import (
    SageMakerEndpointOperator,
)
from airflow.providers.amazon.aws.operators.sagemaker_training import (
    SageMakerTrainingOperator,
)
from sagemaker.amazon.common import write_numpy_to_dense_tensor

# Manually triggered DAG (schedule_interval=None) for the SageMaker MNIST example.
dag = DAG(
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
    dag_id="chapter9_aws_handwritten_digit_classifier",
)

# Copy the public MNIST sample pickle into our own bucket so the next task can read it.
download_mnist_data = S3CopyObjectOperator(
    dag=dag,
    task_id="download_mnist_data",
    dest_bucket_key="mnist.pkl.gz",
    dest_bucket_name="your-bucket",
    source_bucket_key="algorithms/kmeans/mnist/mnist.pkl.gz",
    source_bucket_name="sagemaker-sample-data-eu-west-1",
)


def _extract_mnist_data(
    bucket_name="your-bucket", source_key="mnist.pkl.gz", dest_key="mnist_data"
):
    """Convert the gzipped MNIST pickle in S3 into a dense-tensor training object.

    Downloads ``source_key`` from ``bucket_name`` into memory, un-gzips and
    unpickles it, converts the training split with
    ``write_numpy_to_dense_tensor`` and uploads the result to ``dest_key`` in the
    same bucket, replacing any existing object.

    Args:
        bucket_name: S3 bucket holding both the source pickle and the output.
        source_key: Key of the gzipped pickle (written by ``download_mnist_data``).
        dest_key: Key the converted training set is written to (the training
            task reads ``s3://your-bucket/mnist_data``).
    """
    s3hook = S3Hook()

    # Download the gzipped pickle into memory.
    mnist_buffer = io.BytesIO()
    mnist_obj = s3hook.get_key(bucket_name=bucket_name, key=source_key)
    mnist_obj.download_fileobj(mnist_buffer)
    mnist_buffer.seek(0)

    # SECURITY: unpickling can execute arbitrary code. Only load pickles from a
    # source you trust (here: the copy of the public SageMaker sample dataset).
    # encoding="latin1" presumably handles a pickle written by Python 2 — confirm.
    with gzip.GzipFile(fileobj=mnist_buffer, mode="rb") as f:
        # The pickle holds three splits; only the first (training) one is used.
        train_set, _, _ = pickle.load(f, encoding="latin1")

    # train_set[0] holds the feature array and train_set[1] the labels.
    output_buffer = io.BytesIO()
    write_numpy_to_dense_tensor(
        file=output_buffer, array=train_set[0], labels=train_set[1]
    )
    output_buffer.seek(0)
    s3hook.load_file_obj(
        output_buffer, key=dest_key, bucket_name=bucket_name, replace=True
    )


# Turn the copied pickle into the training object consumed by SageMaker.
extract_mnist_data = PythonOperator(
    dag=dag,
    python_callable=_extract_mnist_data,
    task_id="extract_mnist_data",
)

# Train the kmeans image on s3://your-bucket/mnist_data. The job name carries the
# execution timestamp, so each DAG run starts its own training job; the deploy
# task relies on that name to locate the model artifact.
sagemaker_train_model = SageMakerTrainingOperator(
    task_id="sagemaker_train_model",
    dag=dag,
    wait_for_completion=True,
    print_log=True,
    check_interval=10,
    config=dict(
        TrainingJobName="mnistclassifier-{{ execution_date.strftime('%Y-%m-%d-%H-%M-%S') }}",
        AlgorithmSpecification=dict(
            TrainingImage="438346466558.dkr.ecr.eu-west-1.amazonaws.com/kmeans:1",
            TrainingInputMode="File",
        ),
        HyperParameters=dict(k="10", feature_dim="784"),
        InputDataConfig=[
            dict(
                ChannelName="train",
                DataSource=dict(
                    S3DataSource=dict(
                        S3DataType="S3Prefix",
                        S3Uri="s3://your-bucket/mnist_data",
                        S3DataDistributionType="FullyReplicated",
                    )
                ),
            )
        ],
        OutputDataConfig=dict(S3OutputPath="s3://your-bucket/mnistclassifier-output"),
        ResourceConfig=dict(
            InstanceType="ml.c4.xlarge",
            InstanceCount=1,
            VolumeSizeInGB=10,
        ),
        RoleArn=(
            "arn:aws:iam::297623009465:role/service-role/"
            "AmazonSageMaker-ExecutionRole-20180905T153196"
        ),
        # 86400 seconds = 24 hours.
        StoppingCondition=dict(MaxRuntimeInSeconds=86400),
    ),
)

# Register a model from this run's training artifact, create a matching endpoint
# config, and point the "mnistclassifier" endpoint at it. Every per-run name uses
# the same execution-timestamp suffix as TrainingJobName in sagemaker_train_model.
# NOTE(review): operation="update" presumably expects the endpoint to exist
# already — confirm how the very first deployment is created.
sagemaker_deploy_model = SageMakerEndpointOperator(
    task_id="sagemaker_deploy_model",
    operation="update",
    wait_for_completion=True,
    config={
        "Model": {
            "ModelName": "mnistclassifier-{{ execution_date.strftime('%Y-%m-%d-%H-%M-%S') }}",
            "PrimaryContainer": {
                "Image": "438346466558.dkr.ecr.eu-west-1.amazonaws.com/kmeans:1",
                "ModelDataUrl": (
                    "s3://your-bucket/mnistclassifier-output/mnistclassifier"
                    "-{{ execution_date.strftime('%Y-%m-%d-%H-%M-%S') }}/"
                    "output/model.tar.gz"
                ),  # this will link the model and the training job
            },
            "ExecutionRoleArn": (
                "arn:aws:iam::297623009465:role/service-role/"
                "AmazonSageMaker-ExecutionRole-20180905T153196"
            ),
        },
        "EndpointConfig": {
            "EndpointConfigName": "mnistclassifier-{{ execution_date.strftime('%Y-%m-%d-%H-%M-%S') }}",
            "ProductionVariants": [
                {
                    "InitialInstanceCount": 1,
                    "InstanceType": "ml.t2.medium",
                    # Must be the ModelName created under "Model" above; a fixed
                    # name such as "mnistclassifier" would not refer to the model
                    # this run just trained.
                    "ModelName": "mnistclassifier-{{ execution_date.strftime('%Y-%m-%d-%H-%M-%S') }}",
                    "VariantName": "AllTraffic",
                }
            ],
        },
        "Endpoint": {
            "EndpointConfigName": "mnistclassifier-{{ execution_date.strftime('%Y-%m-%d-%H-%M-%S') }}",
            "EndpointName": "mnistclassifier",
        },
    },
    dag=dag,
)

# Linear pipeline: copy -> convert -> train -> deploy.
download_mnist_data.set_downstream(extract_mnist_data)
extract_mnist_data.set_downstream(sagemaker_train_model)
sagemaker_train_model.set_downstream(sagemaker_deploy_model)

In [None]:
import csv
import io
import os

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults

# Custom operator: export the result of a Postgres query to S3 as a CSV file.
class PostgresToS3Operator(BaseOperator):
    """Export the result of a Postgres query to S3 as a CSV file.

    The CSV starts with a header row built from the cursor's column names, and
    every field is quoted. An existing object at the target key is replaced.

    :param postgres_conn_id: Airflow connection id passed to ``PostgresHook``.
    :param query: SQL query to run (templated).
    :param s3_conn_id: Airflow connection id passed to ``S3Hook`` as ``aws_conn_id``.
    :param s3_bucket: Name of the destination bucket.
    :param s3_key: Destination object key (templated).
    """

    # Attributes rendered with Jinja by Airflow before execute() runs.
    template_fields = ("_query", "_s3_key")

    @apply_defaults
    def __init__(
        self, postgres_conn_id, query, s3_conn_id, s3_bucket, s3_key, **kwargs
    ):
        super().__init__(**kwargs)
        self._postgres_conn_id = postgres_conn_id
        self._query = query
        self._s3_conn_id = s3_conn_id
        self._s3_bucket = s3_bucket
        self._s3_key = s3_key

    def execute(self, context):
        """Run the query, serialise the rows to CSV in memory and upload them."""
        postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
        s3_hook = S3Hook(aws_conn_id=self._s3_conn_id)

        # DbApiHook.get_cursor() is get_conn().cursor(): leaving the cursor's
        # `with` closes only the cursor. Own the connection so it is always closed.
        conn = postgres_hook.get_conn()
        try:
            with conn.cursor() as cursor:
                cursor.execute(self._query)
                results = cursor.fetchall()
                headers = [column[0] for column in cursor.description]
        finally:
            conn.close()

        data_buffer = io.StringIO()
        csv_writer = csv.writer(
            data_buffer, quoting=csv.QUOTE_ALL, lineterminator=os.linesep
        )
        csv_writer.writerow(headers)
        csv_writer.writerows(results)
        # S3Hook.load_file_obj takes a binary file object.
        data_buffer_binary = io.BytesIO(data_buffer.getvalue().encode())

        s3_hook.load_file_obj(
            file_obj=data_buffer_binary,
            bucket_name=self._s3_bucket,
            key=self._s3_key,
            replace=True,
        )

In [None]:
import io
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator
from custom.postgres_to_s3_operator import PostgresToS3Operator
from minio import Minio

# Monthly DAG, bounded by the start_date and end_date below.
dag = DAG(
    schedule_interval="@monthly",
    dag_id="chapter7_insideairbnb",
    start_date=datetime(2015, 4, 5),
    end_date=datetime(2019, 12, 7),
)

# Export the listings of this run's window to s3://inside-airbnb/listing-<ds>.csv.
# The window is half-open [prev_ds, ds): with a closed BETWEEN, rows whose
# download_date falls exactly on a run boundary would be exported by two
# consecutive runs and counted twice downstream.
download_from_postgres = PostgresToS3Operator(
    task_id="download_from_postgres",
    postgres_conn_id="inside_airbnb",
    query=(
        "SELECT * FROM listings "
        "WHERE download_date >= '{{ prev_ds }}' AND download_date < '{{ ds }}'"
    ),
    s3_conn_id="locals3",
    s3_bucket="inside-airbnb",
    s3_key="listing-{{ ds }}.csv",
    dag=dag,
)


def _minio_client_from_connection(conn_id):
    """Build a Minio client from the ``host`` extra, login and password of a connection.

    An ``http://``/``https://`` prefix is stripped from the host (the client is
    given a bare ``host[:port]``); ``https://`` switches TLS on.
    """
    s3_conn = BaseHook.get_connection(conn_id)
    host = s3_conn.extra_dejson["host"]
    secure = host.startswith("https://")
    endpoint = host.split("://", 1)[-1].rstrip("/")
    return Minio(
        endpoint,
        access_key=s3_conn.login,
        secret_key=s3_conn.password,
        secure=secure,
    )


def _load_listings(client, bucket_name, prefix):
    """Read id/price/download_date from every CSV under ``prefix`` into one frame.

    Exact duplicate rows are dropped, so a snapshot stored in more than one file
    is counted once.

    Raises:
        ValueError: if no object in ``bucket_name`` matches ``prefix``.
    """
    object_names = [
        obj.object_name
        for obj in client.list_objects(bucket_name=bucket_name, prefix=prefix)
    ]
    if not object_names:
        raise ValueError(
            f"No objects with prefix {prefix!r} found in bucket {bucket_name!r}"
        )

    frames = []
    for object_name in object_names:
        response = client.get_object(bucket_name=bucket_name, object_name=object_name)
        try:
            payload = response.read()
        finally:
            # minio-py requires closing the response and releasing its connection.
            response.close()
            response.release_conn()
        frames.append(
            pd.read_csv(
                io.BytesIO(payload),
                usecols=["id", "price", "download_date"],
                parse_dates=["download_date"],
            )
        )

    # DataFrame.append was removed in pandas 2.0 (and was quadratic in a loop).
    return pd.concat(frames, ignore_index=True).drop_duplicates()


def _price_change_per_day(listings):
    """Compare each listing's price on its oldest and latest download date.

    Returns columns id, download_date_min, download_date_max, oldest_price,
    latest_price and price_diff_per_day. Listings seen on a single download
    date are excluded, since no change can be computed for them.
    """
    date_range = (
        listings.groupby(["id"])
        .agg(
            download_date_min=("download_date", "min"),
            download_date_max=("download_date", "max"),
        )
        .reset_index()
    )
    with_oldest = (
        date_range.merge(
            listings,
            how="left",
            left_on=["id", "download_date_min"],
            right_on=["id", "download_date"],
        )
        .rename(columns={"price": "oldest_price"})
        .drop(columns="download_date")
    )
    with_latest = (
        with_oldest.merge(
            listings,
            how="left",
            left_on=["id", "download_date_max"],
            right_on=["id", "download_date"],
        )
        .rename(columns={"price": "latest_price"})
        .drop(columns="download_date")
    )

    # .copy() so the new column is set on an independent frame, not a view.
    changed = with_latest.loc[
        with_latest["download_date_max"] != with_latest["download_date_min"]
    ].copy()
    elapsed_days = (changed["download_date_max"] - changed["download_date_min"]).dt.days
    changed["price_diff_per_day"] = pd.to_numeric(
        (changed["latest_price"] - changed["oldest_price"]) / elapsed_days
    )
    return changed


def _upload_json(client, bucket_name, object_name, frame, label):
    """Print ``frame`` as JSON records prefixed by ``label`` and upload it."""
    payload = frame.to_json(orient="records")
    print(f"{label}: {payload}")
    payload_bytes = payload.encode("utf-8")
    client.put_object(
        bucket_name=bucket_name,
        object_name=object_name,
        data=io.BytesIO(payload_bytes),
        length=len(payload_bytes),
    )


def _crunch_numbers():
    """Find the five largest per-day price increases and decreases.

    Reads all ``listing*`` CSVs from the ``inside-airbnb`` bucket (via the
    ``locals3`` connection). It writes the results as JSON records to
    ``results/biggest_increase.json`` and ``results/biggest_decrease.json``.

    Raises:
        ValueError: if the bucket holds no listing files.
    """
    client = _minio_client_from_connection("locals3")
    listings = _load_listings(client, bucket_name="inside-airbnb", prefix="listing")
    price_changes = _price_change_per_day(listings)

    biggest_increase = price_changes.nlargest(5, "price_diff_per_day")
    biggest_decrease = price_changes.nsmallest(5, "price_diff_per_day")

    _upload_json(
        client,
        "inside-airbnb",
        "results/biggest_increase.json",
        biggest_increase,
        "Biggest increases",
    )
    _upload_json(
        client,
        "inside-airbnb",
        "results/biggest_decrease.json",
        biggest_decrease,
        "Biggest decreases",
    )


# Analyse all exported listing files once this run's export has been written.
crunch_numbers = PythonOperator(
    dag=dag,
    python_callable=_crunch_numbers,
    task_id="crunch_numbers",
)


download_from_postgres.set_downstream(crunch_numbers)