<a href="https://colab.research.google.com/github/pathwaycom/pathway/blob/main/examples/projects/from_jupyter_to_deploy/part3_kafka_data_streamer.ipynb" target="_parent"><img src="https://pathway.com/assets/colab-badge.svg" alt="Run In Colab" class="inline"/></a>

# Installing Pathway with Python 3.10+

In the cell below, we install Pathway into a Python 3.10+ Linux runtime.

> **If you are running in Google Colab, run all cells (Ctrl+F9)** and disregard the 'not authored by Google' warning.
> 
> **Installation and loading take less than 1 minute**.


In [None]:
%%capture --no-display
!pip install --prefer-binary pathway

# Part 3: Kafka integration and alerts forwarding (Producer)

This notebook is a helper notebook for the third part of the tutorial [From interactive data exploration to deployment](/developers/user-guide/exploring-pathway/from-jupyter-to-deploy#part-3-kafka-integration-and-alerts-forwarding).

In [None]:
# Download CSV file
!wget -nc https://gist.githubusercontent.com/janchorowski/e351af72ecd8d206a34763a428826ab7/raw/ticker.csv

--2024-06-12 08:21:04--  https://gist.githubusercontent.com/janchorowski/e351af72ecd8d206a34763a428826ab7/raw/ticker.csv
Resolving gist.githubusercontent.com (gist.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to gist.githubusercontent.com (gist.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1253370 (1.2M) [text/plain]
Saving to: ‘ticker.csv’

2024-06-12 08:21:04 (52.2 MB/s) - ‘ticker.csv’ saved [1253370/1253370]

## Writing messages to Kafka

In [None]:
import pathway as pw

# To use advanced features with Pathway Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Community, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")

fname = "ticker.csv"
schema = pw.schema_from_csv(fname)

In [None]:
print(schema.generate_class(class_name="DataSchema"))

class DataSchema(pw.Schema):
    ticker: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    vwap: float
    t: int
    transactions: int
    otc: str


In [None]:
# The schema definition is autogenerated
class DataSchema(pw.Schema):
    ticker: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    vwap: float
    t: int
    transactions: int
    otc: str


data = pw.demo.replay_csv(fname, schema=DataSchema, input_rate=1000)
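
For intuition about `input_rate=1000`: assuming the value is interpreted as rows per second, the replay above emits roughly one row every millisecond. The following is a minimal plain-Python sketch of that pacing idea, not Pathway's implementation. `replay_rows` and its `sleep` parameter are hypothetical names introduced for illustration, and the sample values are made up.

```python
import csv
import io
import time


def replay_rows(lines, input_rate, sleep=time.sleep):
    """Yield CSV rows as dicts, pausing 1/input_rate seconds before each one."""
    delay = 1.0 / input_rate  # assumed meaning: rows per second
    for row in csv.DictReader(lines):
        sleep(delay)
        yield row


# Tiny in-memory sample with made-up values; the notebook itself reads ticker.csv.
sample = io.StringIO("ticker,close\nAAA,1.0\nBBB,2.0\n")
rows = list(replay_rows(sample, input_rate=1000))
```

Passing `sleep` as a parameter keeps the pacing logic easy to check without actually waiting.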

In [None]:
# TODO: set appropriate values for KAFKA_ENDPOINT, KAFKA_USERNAME, and KAFKA_PASSWORD
rdkafka_producer_settings = {
    "bootstrap.servers": "KAFKA_ENDPOINT:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": "KAFKA_USERNAME",
    "sasl.password": "KAFKA_PASSWORD",
}

pw.io.kafka.write(data, rdkafka_producer_settings, topic_name="ticker")
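
Hard-coding credentials in a notebook makes them easy to leak. One option is to read them from environment variables. The sketch below assumes variables named `KAFKA_ENDPOINT`, `KAFKA_USERNAME`, and `KAFKA_PASSWORD` (names chosen here to match the placeholders above, not a Pathway convention), and `kafka_settings_from_env` is a hypothetical helper.

```python
import os


def kafka_settings_from_env(env=os.environ):
    """Build the rdkafka settings dict from environment variables."""
    # Variable names are assumptions that mirror the placeholders in the notebook.
    required = ("KAFKA_ENDPOINT", "KAFKA_USERNAME", "KAFKA_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        "bootstrap.servers": f"{env['KAFKA_ENDPOINT']}:9092",
        "security.protocol": "sasl_ssl",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": env["KAFKA_USERNAME"],
        "sasl.password": env["KAFKA_PASSWORD"],
    }
```

The returned dictionary could be passed to `pw.io.kafka.write` in place of `rdkafka_producer_settings`.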

In [None]:
pw.run()