In [1]:
import kaskada as kd
import pandas as pd
import ray

ray.init(ignore_reinit_error=True)

2023-10-27 16:28:10,178	INFO worker.py:1633 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.11.4
Ray version:,2.7.1
Dashboard:,http://127.0.0.1:8265


In [2]:
@ray.remote
class RemoteUsers:
    async def __init__(self):
        kd.init_session()

        users_df = pd.read_json("slack-generation.users.json")
        columns_to_keep = ["id", "team_id", "name", "deleted", "real_name", "is_bot", "updated"]
        users_df.drop(columns=users_df.columns.difference(columns_to_keep), inplace=True)

        self.users = await kd.sources.Pandas.create(
            users_df,
            time_column = "updated",
            key_column = "id",
            time_unit = "s"
        )

    def get_user(self, user_id):
        data = self.users.filter(self.users.col("id").eq(user_id)).last().preview().to_dict(orient='index')
        return data[0] if len(data) == 1 else None

In [8]:
import re, asyncio

@ray.remote
class RemoteMessages:
    async def __init__(self):
        kd.init_session()

        self.users = RemoteUsers.remote()

        self.source = await kd.sources.Parquet.create(
            "slack-generation.1.parquet",
            time_column = "ts",
            key_column = "user",
            time_unit = "s",
            grouping_name="user",
        )

        async def format_user(user_id):
            user = await self.users.get_user.remote(user_id)
            return f"{user['name']} ({user_id})" if user else f"({user_id})"

        @kd.udf("f<N: any>(x: N) -> string")
        def format_users(batch: pd.Series):
            # Apply to each row in the batch
            return batch.map(format_user)

        @kd.udf("f<N: any>(x: N) -> string")
        async def format_users_async(batch: pd.Series):
            results = await asyncio.gather(*[format_user(user) for user in batch])
            return pd.Series(results)

        def strip_code_blocks(line):
            return re.sub(r"```.*?```", '', line)

        def user_repl(match_obj):
            user_id = match_obj.group(1)
            return format_user(user_id)

        def update_users(line):
            return re.sub(r"<@(.*?)>", user_repl, line)

        def clean_message(text):
            text = strip_code_blocks(update_users(text)).strip()
            return None if text == "" else text

        @kd.udf("f<N: any>(x: N) -> string")
        def clean_text(batch: pd.Series):
            # Apply to each row in the batch
            return batch.map(clean_message)

        @kd.udf("f<N: any>(x: N) -> string")
        def format_message(batch: pd.Series):
            def formatter(raw):
                return f"{raw['user']} --> {raw['text']}" # --> {raw['reactions']}"
            return batch.map(formatter)

        with_user = self.source.extend({
            "text": self.source.col("text"), # .pipe(clean_text),
            "user": self.source.col("user").pipe(format_users_async)
        })

        with_msgs = with_user.extend({
            "text": with_user.select("user", "text").pipe(format_message)
        })

        thread_ts = with_msgs.col("thread_ts")

        self.non_threads_iter = with_msgs.filter(thread_ts.is_null()).run_iter(kind="row", mode="live")
        self.threads_iter = with_msgs.filter(thread_ts.is_not_null()).run_iter(kind="row", mode="live")

    async def add_file(self, path):
        await self.source.add_file(path)

    def stop(self):
        self.non_threads_iter.stop()
        self.threads_iter.stop()

    async def _take_batch(self, execution, batch_size):
        rows = []
        while len(rows) < batch_size:
            try:
                async with asyncio.timeout(1):
                    row = await execution.__anext__()
                    rows.append(row)
            except StopAsyncIteration:
                print("stop async iteration")
                break
            except StopIteration:
                print("stop iteration")
                break
            except TimeoutError:
                print("timeout")
                break
            except Exception as exp:
                print(f"other exception: {exp}")
                break
        return rows

    async def non_threads_take_batch(self, batch_size: int = 20):
        return await self._take_batch(self.non_threads_iter, batch_size=batch_size)

    async def threads_take_batch(self, batch_size: int = 20):
        return await self._take_batch(self.threads_iter, batch_size=batch_size)

In [9]:
rm = RemoteMessages.remote()

In [10]:
ray.get(rm.non_threads_take_batch.remote())

[]

[2m[36m(RemoteMessages pid=48526)[0m execution: no items left in local iterator
[2m[36m(RemoteMessages pid=48526)[0m execution: starting to wait for next pyarrow batch
[2m[36m(RemoteMessages pid=48526)[0m other exception: [1merror in kaskada Rust code[22m
[2m[36m(RemoteMessages pid=48526)[0m ├╴at [3msrc/error.rs:54:21[23m
[2m[36m(RemoteMessages pid=48526)[0m │
[2m[36m(RemoteMessages pid=48526)[0m ├─▶ [1mexecute query[22m
[2m[36m(RemoteMessages pid=48526)[0m │   ╰╴at [3m/Users/runner/work/kaskada/kaskada/crates/sparrow-session/src/session.rs:527:28[23m
[2m[36m(RemoteMessages pid=48526)[0m │
[2m[36m(RemoteMessages pid=48526)[0m ├─▶ [1minternal compute error: failed to join compute threads[22m
[2m[36m(RemoteMessages pid=48526)[0m │   ╰╴at [3m/Users/runner/work/kaskada/kaskada/crates/sparrow-runtime/src/execute/compute_executor.rs:192:22[23m
[2m[36m(RemoteMessages pid=48526)[0m │
[2m[36m(RemoteMessages pid=48526)[0m ├─▶ [1minternal compute e



In [None]:

await rm.add_file.remote("slack-generation.2.parquet")


In [None]:

count = 0
while True:
    try:
        batch = ray.get(rm.non_threads_take_batch.remote())

        for row in batch:
            count += 1
        if len(batch) == 0:
            break
    except Exception as exp:
        print(f"failed: {exp}")
        break


print(f"Rows consumed: {count}")

In [None]:
kd.init_session()

In [None]:
import asyncio

source = await kd.sources.Parquet.create(
    "slack-generation.1.parquet",
    time_column = "ts",
    key_column = "channel",
    time_unit = "s"
)


my_iter = source.run_iter(kind="pyarrow", mode="live")

count = 0
while True:
    try:
        item = await my_iter.__anext__()
        count += 1
    except StopAsyncIteration:
        print(f"Iterator is exhausted after {count} rows")
        count = 0
        await asyncio.sleep(2)

        await source.add_file("slack-generation.2.parquet")
        async_iterator = source.run_iter(kind="pyarrow", mode="live") # Create a new iterator





In [None]:
import kaskada as kd
import asyncio

kd.init_session()

data1 = "\n".join(
    [
        "time,key,m,n",
        "1996-12-19T16:39:57,A,5,10",
        "1996-12-19T16:39:58,B,24,3",
        "1996-12-19T16:39:59,A,17,6",
        "1996-12-19T16:40:00,A,,9",
        "1996-12-19T16:40:01,A,12,",
        "1996-12-19T16:40:02,A,,",
    ]
)

data2 = "\n".join(
    [
        "time,key,m,n",
        "1996-12-20T16:39:57,A,5,10",
        "1996-12-20T16:39:58,B,24,3",
        "1996-12-20T16:39:59,A,17,6",
        "1996-12-20T16:40:00,C,,9",
        "1996-12-20T16:40:01,A,12,",
        "1996-12-20T16:40:02,A,,",
    ]
)

In [None]:
source = await kd.sources.CsvString.create(data1, time_column="time", key_column="key")

execution = source.run_iter(mode="live")

async def get_next():
    try:
        print(await execution.__anext__())
    except StopAsyncIteration:
        print("stop async iteration")

# Await the first batch.
await get_next()

# Add more data
await source.add_string(data2)

# await the second batch.
await get_next()

execution.stop()
await get_next()

In [None]:
source = await kd.sources.CsvString.create(data1, time_column="time", key_column="key")

execution = source.run_iter(mode="live")

async def add_more_data():
    print("waiting to send more data")
    await asyncio.sleep(0.5)
    await source.add_string(data2)
    print("sent more data")

async def stop_execution():
    print("waiting to stop execution")
    await asyncio.sleep(1.5)
    print("stopping execution", flush=True)
    await asyncio.sleep(0.001)
    execution.stop()
    print("stopped execution", flush=True)

async def output_batches():
    while True:
        try:
            print("waiting for next batch")
            async with asyncio.timeout(2):
                next_batch = await execution.__anext__()
                print(next_batch)
        except StopAsyncIteration:
            print("stop async iteration")
            break
        except StopIteration:
            print("stop iteration")
            break
        except TimeoutError:
            print("timeout")
            break
        except Exception as exp:
            print(f"other exception: {exp}")


stop_task = asyncio.create_task(stop_execution())
add_task = asyncio.create_task(add_more_data())
output_task = asyncio.create_task(output_batches())


# wait for the tasks to finish
await stop_task
print("stop task complete")
await add_task
print("add task complete")
await output_task
print("output task complete")

In [None]:
import kaskada as kd
import pandas as pd

kd.init_session()


In [None]:

users_df = pd.read_json("slack-generation.users.json")
columns_to_keep = ["id", "team_id", "name", "deleted", "real_name", "is_bot", "updated"]
users_df.drop(columns=users_df.columns.difference(columns_to_keep), inplace=True)

users = await kd.sources.Pandas.create(
    users_df,
    time_column = "updated",
    key_column = "id",
    time_unit = "s",
    grouping_name="user",
)

messages = await kd.sources.Parquet.create(
    "slack-generation.1.parquet",
    time_column = "ts",
    key_column = "user",
    time_unit = "s",
    grouping_name="user",
)

with_username = messages.extend({
    "username": users.col("name").last().lookup(messages.col("user"))
})

with_username.preview()


In [None]:



@kd.udf("f<N: any>(x: N) -> string")
def format_users(batch: pd.Series):
    def format_user(raw):
        id = raw["user"]
        name = raw["username"]
        return f"{name} ({id})" if name else f"({id})"
    return batch.map(format_user)

def strip_code_blocks(line):
    return re.sub(r"```.*?```", '', line)


# this won't work with basic lookups
# I'll need to extract any @users in a message
# into a new column via an UDF
# and then convert them 1 by 1
# and then somehow re-insert them back into the message texts
def update_users(raw):
    msg = raw["text"]
    name = raw["username"]
    def user_repl(match_obj):
        id = match_obj.group(1)
        return f"{name} ({id})" if name else f"({id})"
    return re.sub(r"<@(.*?)>", user_repl, msg)

def clean_message(text):
    text = strip_code_blocks(update_users(text)).strip()
    return None if text == "" else text

@kd.udf("f<N: any>(x: N) -> string")
def clean_text(batch: pd.Series):
    # Apply to each row in the batch
    return batch.map(clean_message)

@kd.udf("f<N: any>(x: N) -> string")
def format_message(batch: pd.Series):
    def formatter(raw):
        return f"{raw['user']} --> {raw['text']}" # --> {raw['reactions']}"
    return batch.map(formatter)

with_user = self.source.extend({
    "text": self.source.col("text").pipe(clean_text),
    "user": self.source.col("user").pipe(format_users)
})

msgs = with_user.extend({
    "text": with_user.select("user", "text").pipe(format_message)
})