## Kaskada: Materializing Results to a Pulsar Topic
Kaskada lets you create a materialization, a resource similar to a query that runs automatically whenever new data is loaded into any table it references. Materializations can populate feature vectors in a variety of feature stores for low-latency inference in production. This example shows results being incrementally materialized to a Pulsar topic.

For more information, see https://kaskada-ai.github.io/docs-site/kaskada/main/reference/working-with-materializations.html. 

In [1]:
import kaskada.api.release as release
import os
from getpass import getpass
os.environ[release.ReleaseClient.GITHUB_ACCESS_TOKEN_ENV] = getpass(prompt='Github Access Token:')

Github Access Token:········


In [2]:
from kaskada.api.session import LocalBuilder
session = LocalBuilder().build()

INFO:numexpr.utils:NumExpr defaulting to 6 threads.


<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)


INFO:kaskada.api.release:Using latest release version: engine@v0.4.1


INFO:kaskada.api.session:Initializing manager process
INFO:kaskada.api.session:Initializing engine process
INFO:kaskada.api.session:Successfully connected to session.


### Create the table and load data

In [3]:
import kaskada.table

kaskada.table.create_table('transactions', 'transaction_time', 'id')

table
table_name,transactions
entity_key_column_name,id
time_column_name,transaction_time
version,0
create_time,2023-03-16T22:30:17.812073
update_time,2023-03-16T22:30:17.812111

request_details
request_id,0a39a6207784fbe77bfc13fa229998e9


In [4]:
kaskada.table.load('transactions', '../datasets/transactions_part1.parquet')

data_token_id,7bcb67a7-d150-4602-93c3-79de53a6142b

request_details
request_id,17d91ff5be5f1a9e93ec2495f37cbb28


### Create a referenceable query

In [5]:
%load_ext fenlmagic

In [6]:
%%fenl --result-behavior final-results --var test_query 

transactions

Unnamed: 0,_time,_subsort,_key_hash,_key,id,price,quantity,purchaser,purchaser_id,credit_provider,email,transaction_time,idx
0,2013-01-23 00:20:17.000000001,18446744073709551615,139170054356138125,a0d86e83-a235-4d41-af1d-221222a4c824,a0d86e83-a235-4d41-af1d-221222a4c824,115.18,3,Dylan Aguilar,97e575f6bf434306fc5afe71b7f8ea9f,VISA 19 digit,Dylan.Aguilar@example.com,2002-08-09 03:42:38,0
1,2013-01-23 00:20:17.000000001,18446744073709551615,15966548824441284666,5a90a5a5-d32b-4b28-a5d0-b2b84862c0dd,5a90a5a5-d32b-4b28-a5d0-b2b84862c0dd,36.1,8,Wendy Francis,a18cd9c625d9d6c56a183471e8cf9248,VISA 19 digit,Wendy.Francis@example.com,2013-01-23 00:20:17,1

state,SUCCESS
query_id,131c012c-3639-4355-a9ee-c260f8b0f698
expression,transactions

metrics
time_preparing,0.177s
time_computing,0.215s
output_files,1

analysis
can_execute,True

request_details
request_id,8c8108bd099ae8d9ab76863eb43d1284

schema
Unnamed: 0,column_name,column_type
0,id,string
1,price,f64
2,quantity,i64
3,purchaser,string
4,purchaser_id,string
5,credit_provider,string
6,email,string
7,transaction_time,string
8,idx,i64


### Create a materialization

In [7]:
from kaskada import materialization as materialize
from kaskada.materialization import PulsarDestination

# A Pulsar topic is identified by a "tenant", a "namespace", and a "name".
# Together they form the `topic_url`: `persistent://<tenant>/<namespace>/<name>`.
#
# If you change the "tenant" or "namespace", the new values must already exist on the broker.
# The "public" tenant and "default" namespace are created by default.
#
# Each "name" maps to a single topic. If the query changes, the original topic
# must be manually deleted before the name can be reused.
#
# The "broker_service_url" is the address the client uses to connect to the broker.
# The Pulsar container is exposed under the hostname "pulsar".
destination = PulsarDestination(
    tenant="public",
    namespace="default",
    topic_name="my_topic",
    broker_service_url="pulsar://pulsar:6650",
)

# Creating a materialization runs the query and materializes results to your Pulsar topic. 
materialize.create_materialization(
    name = "test_materialization",
    expression = test_query.expression,
    destination = destination,
    views = [] 
)

materialization
materialization_name,test_materialization
destination,
slice,None (full dataset used for query)
create_time,2023-03-16T22:31:13.807262

analysis
can_execute,True

request_details
request_id,ca5d34d23c5815c0d8ccbd55f522e76e

schema
Unnamed: 0,column_name,column_type
0,id,string
1,price,f64
2,quantity,i64
3,purchaser,string
4,purchaser_id,string
5,credit_provider,string
6,email,string
7,transaction_time,string
8,idx,i64


### Consume events from your Pulsar topic 


In [9]:
import pulsar

client = pulsar.Client('pulsar://pulsar:6650')
myTopic = "persistent://public/default/topic-my_topic"
consumer = client.subscribe(myTopic, subscription_name='my-subscription', initial_position=pulsar.InitialPosition.Earliest)

for _ in range(2):
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)

2023-03-16 22:31:25.305 INFO  [274910443392] Client:87 | Subscribing on Topic :persistent://public/default/topic-my_topic
2023-03-16 22:31:25.309 INFO  [274910443392] ClientConnection:189 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-03-16 22:31:25.309 INFO  [274910443392] ConnectionPool:97 | Created connection for pulsar://pulsar:6650
2023-03-16 22:31:25.314 INFO  [281182012992] ClientConnection:379 | [172.25.0.3:41518 -> 172.25.0.2:6650] Connected to broker
2023-03-16 22:31:25.385 INFO  [281182012992] HandlerBase:72 | [persistent://public/default/topic-my_topic, my-subscription, 0] Getting connection from pool
2023-03-16 22:31:25.399 INFO  [281182012992] ClientConnection:189 | [<none> -> pulsar://pulsar:6650] Create ClientConnection, timeout=10000
2023-03-16 22:31:25.399 INFO  [281182012992] ConnectionPool:97 | Created connection for pulsar://localhost:6650
2023-03-16 22:31:25.400 INFO  [281182012992] ClientConnection:381 | [172.25.0.3:41524 -> 172.25
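
The loop above calls `consumer.receive()` with no timeout, so the cell blocks indefinitely if fewer messages arrive than expected. One possible variant, sketched here under assumptions, passes `timeout_millis` to `receive` and stops at the first timeout. The helper name `drain` is hypothetical. It catches `Exception` broadly because the exact timeout exception type differs between Pulsar Python client versions, which also means it will stop on other receive errors.

```python
def drain(consumer, max_messages, timeout_millis=5000):
    """Receive up to `max_messages`, stopping early if a receive fails or times out.

    `drain` is a hypothetical helper. `consumer` is expected to behave like a
    Pulsar consumer: `receive(timeout_millis=...)` and `acknowledge(msg)`.
    """
    received = []
    for _ in range(max_messages):
        try:
            msg = consumer.receive(timeout_millis=timeout_millis)
        except Exception as exc:
            # The client raises when no message arrives within the timeout.
            print(f"Stopped after {len(received)} message(s): {exc}")
            break
        received.append(msg.data())
        # Acknowledge only after the payload has been recorded.
        consumer.acknowledge(msg)
    return received
```

With the consumer created above, `drain(consumer, 2)` would return the payloads of up to two messages, or fewer if the topic runs dry within five seconds.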

### Load new data 

In [10]:
# Loading data into a table referenced by an existing materialization will cause the query 
# to materialize incremental results to your destination. 
#
# In this example, we expect all 3 events in `transactions_part2.parquet` to be materialized 
# to our topic.
kaskada.table.load('transactions', '../datasets/transactions_part2.parquet')

Exception: An error occurred in your request.
	Error Code: UNAVAILABLE
	Error Message: failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:50051: Failed to connect to remote host: Connection refused
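
The `UNAVAILABLE` error above reports that nothing accepted the connection at `127.0.0.1:50051`, which suggests that one of the local processes started by the session was no longer reachable when the load was issued. As a diagnostic sketch using only the standard library, the check below tests whether that endpoint accepts TCP connections before you retry the load. The helper name `is_port_open` is hypothetical; the host and port are taken from the error message.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False


# Host and port copied from the error message above.
print(is_port_open("127.0.0.1", 50051))
```

If this prints `False`, rebuilding the session (as in cell 2) before loading again is a reasonable next step. A `True` result only shows that something is listening on the port, not that it is healthy.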



In [None]:
for _ in range(3):
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)

In [None]:
# Similarly, load 10 new events from `transactions_part3.parquet`.
kaskada.table.load('transactions', '../datasets/transactions_part3.parquet')

In [None]:
for _ in range(10):
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)

client.close()