<a href="https://colab.research.google.com/github/pathwaycom/pathway-examples/blob/main/documentation/from_jupyter_to_deploy/part3_kafka_and_alerts.ipynb" target="_parent"><img src="https://pathway.com/assets/colab-badge.svg" alt="Run In Colab" class="inline"/></a>

# Installing Pathway with Python 3.10+

In the cell below, we install Pathway into a Python 3.10+ Linux runtime.

> **If you are running in Google Colab, please run the colab notebook (Ctrl+F9)**, disregarding the 'not authored by Google' warning.
> 
> **The installation and loading time is less than 1 minute**.


In [None]:
%%capture --no-display
!pip install pathway

# Part 3: Kafka integration and alerts forwarding (Consumer)
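This notebook accompanies Part 3 of the tutorial [From interactive data exploration to deployment](https://pathway.com/developers/user-guide/exploring-pathway/from-jupyter-to-deploy/#part-3-kafka-integration-and-alerts-forwarding). It plays the role of the Kafka consumer: it reads ticker messages from Kafka, computes the alerts, and forwards them to Slack.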

## Reading messages from Kafka

In [1]:
import datetime

import pathway as pw

# TODO Please set appropriate values for KAFKA_ENDPOINT, KAFKA_USERNAME, and KAFKA_PASSWORD
# Where the broker lives and how to authenticate against it (placeholders).
_kafka_connection = {
    "bootstrap.servers": "KAFKA_ENDPOINT:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": "KAFKA_USERNAME",
    "sasl.password": "KAFKA_PASSWORD",
}
# Consumer-group identity and offset-reset policy.
_kafka_consumer = {
    "group.id": "kafka-group-0",
    "auto.offset.reset": "earliest",
}
# Full settings dictionary handed to the Kafka reader.
rdkafka_consumer_settings = {**_kafka_connection, **_kafka_consumer}


# The schema definition is autogenerated
# It lists the columns (name and type) expected in every JSON message read
# from the "ticker" Kafka topic.
class DataSchema(pw.Schema):
    ticker: str  # used downstream as the per-instrument window instance and join key
    open: float
    high: float
    low: float
    close: float
    volume: float  # summed per window and used as the weight for `vwap`
    vwap: float  # averaged downstream with `volume` as the weight
    t: int  # divided by 1000.0 and passed to datetime.fromtimestamp downstream — presumably epoch milliseconds
    transactions: int
    otc: str


# Read the "ticker" topic as JSON messages whose fields map onto the columns
# declared in DataSchema.
kafka_reader_options = dict(topic="ticker", format="json", schema=DataSchema)
data = pw.io.kafka.read(rdkafka_consumer_settings, **kafka_reader_options)

In [2]:
# `t` arrives as an integer; scale it by 1/1000 so that it can be fed to
# datetime.fromtimestamp, which expects seconds.
timestamp_seconds = data.t / 1000.0

# Replace the integer column `t` with a naive datetime. Note that
# datetime.fromtimestamp without a tz argument yields the platform's local time.
data = data.with_columns(
    t=pw.apply_with_type(
        datetime.datetime.fromtimestamp,
        pw.DateTimeNaive,
        timestamp_seconds,
    )
)

In [3]:
# Volume-weighted variance of the price inside a window: E[p^2] - E[p]^2.
# This is a deferred `pw.this` expression; it is resolved against the table
# it is used on in the pipeline below.
_vw_variance_20m = pw.this.transact_total2 / pw.this.volume - pw.this.vwap**2

# Per-ticker statistics over 20-minute sliding windows that advance every
# minute: total volume, volume-weighted average price (vwap), its
# volume-weighted standard deviation (vwstd) and Bollinger bands at
# vwap +/- 2 * vwstd.
minute_20_stats = (
    data.windowby(
        pw.this.t,
        window=pw.temporal.sliding(
            hop=datetime.timedelta(minutes=1), duration=datetime.timedelta(minutes=20)
        ),
        behavior=pw.temporal.common_behavior(delay=datetime.timedelta(minutes=20)),
        instance=pw.this.ticker,
    )
    .reduce(
        ticker=pw.this._pw_instance,
        t=pw.this._pw_window_end,
        volume=pw.reducers.sum(pw.this.volume),
        # Sums of volume * price and volume * price^2, i.e. the weighted
        # first and second moments before normalisation by the total volume.
        transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),
        transact_total2=pw.reducers.sum(pw.this.volume * pw.this.vwap**2),
    )
    .with_columns(vwap=pw.this.transact_total / pw.this.volume)
    .with_columns(
        # Floating-point cancellation can make the variance slightly negative
        # (for instance when every price in the window is identical). A
        # negative number raised to the power 0.5 has no real value, which
        # would poison the bands, so the deviation is clamped to 0.0 then.
        vwstd=pw.if_else(_vw_variance_20m > 0.0, _vw_variance_20m**0.5, 0.0)
    )
    .with_columns(
        bollinger_upper=pw.this.vwap + 2 * pw.this.vwstd,
        bollinger_lower=pw.this.vwap - 2 * pw.this.vwstd,
    )
)

In [4]:
# Group the stream into consecutive one-minute windows, separately per ticker.
one_minute = datetime.timedelta(minutes=1)
per_minute_windows = data.windowby(
    pw.this.t,
    window=pw.temporal.tumbling(one_minute),
    instance=pw.this.ticker,
)

# Aggregate each window: total volume and the sum of volume * price.
per_minute_totals = per_minute_windows.reduce(
    ticker=pw.this._pw_instance,
    t=pw.this._pw_window_end,
    volume=pw.reducers.sum(pw.this.volume),
    transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),
)

# Volume-weighted average price of every one-minute window.
minute_1_stats = per_minute_totals.with_columns(
    vwap=pw.this.transact_total / pw.this.volume
)

In [5]:
# Pair every one-minute row with the 20-minute row that has the same window
# end and the same ticker, keeping all one-minute columns plus both bands.
stats_with_bands = minute_1_stats.join(
    minute_20_stats,
    pw.left.t == pw.right.t,
    pw.left.ticker == pw.right.ticker,
).select(
    *pw.left,
    bollinger_lower=pw.right.bollinger_lower,
    bollinger_upper=pw.right.bollinger_upper,
)

# Deferred predicates, resolved against the table they are applied to.
enough_volume = pw.this.volume > 10000
above_upper_band = pw.this.vwap > pw.this.bollinger_upper
below_lower_band = pw.this.vwap < pw.this.bollinger_lower

# An alert requires enough volume AND a price outside the bands; the action
# is "sell" above the upper band, "buy" below the lower one, "hodl" otherwise.
joint_stats = stats_with_bands.with_columns(
    is_alert=(enough_volume & (above_upper_band | below_lower_band))
).with_columns(
    action=pw.if_else(
        pw.this.is_alert,
        pw.if_else(above_upper_band, "sell", "buy"),
        "hodl",
    )
)

In [6]:
# Keep only the rows flagged as alerts ...
triggered_stats = joint_stats.filter(pw.this.is_alert)
# ... and expose just the columns needed to report them.
alerts = triggered_stats.select(
    pw.this.ticker,
    pw.this.t,
    pw.this.vwap,
    pw.this.action,
)

In [7]:
import bokeh.models

# `bokeh.plotting` is a separate submodule: import it explicitly instead of
# relying on another library having loaded it as a side effect.
import bokeh.plotting


def stats_plotter(src):
    """Build the Bollinger-band chart for the joined statistics.

    Args:
        src: Bokeh data source providing the fields "t", "vwap",
            "bollinger_lower", "bollinger_upper" and "action".

    Returns:
        A Bokeh figure with the vwap line, both band lines, a shaded area
        between the bands, and one marker per row coloured by its action.
    """
    actions = ["buy", "sell", "hodl"]
    # buy -> green, sell -> red, hodl -> fully transparent (alpha 00), so only
    # rows that call for an action show a visible marker.
    color_map = bokeh.models.CategoricalColorMapper(
        factors=actions, palette=("#00ff00", "#ff0000", "#00000000")
    )

    fig = bokeh.plotting.figure(
        height=400,
        width=600,
        title="20 minutes Bollinger bands with last 1 minute average",
        x_axis_type="datetime",
    )

    # One-minute volume-weighted average price.
    fig.line("t", "vwap", source=src)

    # Band edges plus a translucent fill between them.
    fig.line("t", "bollinger_lower", source=src, line_alpha=0.3)
    fig.line("t", "bollinger_upper", source=src, line_alpha=0.3)
    fig.varea(
        x="t",
        y1="bollinger_lower",
        y2="bollinger_upper",
        fill_alpha=0.3,
        fill_color="gray",
        source=src,
    )

    # Markers coloured through the categorical mapper defined above.
    fig.scatter(
        "t",
        "vwap",
        size=10,
        marker="circle",
        color={"field": "action", "transform": color_map},
        source=src,
    )

    return fig

In [8]:
import panel as pn

# Live chart of the joined statistics, drawn by stats_plotter and ordered by `t`.
stats_chart = joint_stats.plot(stats_plotter, sorting_col="t")

# Live table of alerts, newest first, without the row-id column.
newest_first = [{"field": "t", "dir": "desc"}]
alerts_table = alerts.show(include_id=False, sorters=newest_first)

# Lay both widgets out side by side; the bare name renders the row in the cell output.
viz = pn.Row(stats_chart, alerts_table)
viz

## Alerts forwarding to Slack

In [9]:
import requests

# TODO Please replace both placeholders below with your own SLACK_CHANNEL_ID and SLACK_TOKEN
slack_alert_channel_id: str = "SLACK_CHANNEL_ID"  # channel the alerts are posted to
slack_alert_token: str = "SLACK_TOKEN"  # sent as the Bearer token of each request


def send_slack_alert(key, row, time, is_addition):
    """Post a "Please <action> <ticker>" message to Slack for a new alert row.

    The signature matches the callback registered with ``pw.io.subscribe``.

    Args:
        key: Identifier of the changed row (unused).
        row: Mapping that provides at least the "action" and "ticker" entries.
        time: Time of the change (unused).
        is_addition: True when the row was added; any other change is ignored.

    Raises:
        requests.HTTPError: If Slack answers with an HTTP error status.
        requests.Timeout: If Slack does not answer within the timeout.
    """
    if not is_addition:
        return
    alert_message = f'Please {row["action"]} {row["ticker"]}'
    print(f'Sending alert "{alert_message}"')
    response = requests.post(
        "https://slack.com/api/chat.postMessage",
        # Passing a dict lets requests URL-encode the form fields, so spaces,
        # "&", "=" and non-ASCII characters in the message cannot corrupt
        # the request body.
        data={"text": alert_message, "channel": slack_alert_channel_id},
        headers={
            "Authorization": "Bearer {}".format(slack_alert_token),
            "Content-Type": "application/x-www-form-urlencoded",
        },
        # Without a timeout a stalled connection would block the pipeline forever.
        timeout=10,
    )
    # NOTE: Slack reports API-level failures (e.g. an invalid token) with
    # HTTP 200 and `"ok": false` in the JSON body; those are not detected here.
    response.raise_for_status()


# Register send_slack_alert as the change callback of the alerts table.
pw.io.subscribe(table=alerts, on_change=send_slack_alert)

In [10]:
# Start executing the dataflow defined in the cells above.
pw.run()