In [2]:
import ibis
# NEXMark schemas https://datalab.cs.pdx.edu/niagara/NEXMark/
person_schema = ibis.schema(
    {
        "id": "int64",
        "name": "string",
        "emailaddress": "string",
        "creditcard": "string",
        "city": "string",
        "state": "string"
    }
)

auction_schema = ibis.schema(
    {
        "id": "int64",
        "itemname": "string",
        "description": "string",
        "initialbid": "float64",
        "reserve": "float64",
        "expires": "timestamp(3)",
        "seller": "int64",
    }
)

bid_schema = ibis.schema(
    {
        "auction": "int64",
        "bidder": "int64",
        "price": "float64",
        "datetime": "timestamp(3)"
    }
)

In [9]:
import ibis
from ibis import _

# Create source tables
person_table = ibis.table(name="Person", schema=person_schema)
auction_table = ibis.table(name="Auction", schema=auction_schema)
bid_table = ibis.table(name="Bid", schema=bid_schema)

# Create SubstraitCompiler object
from ibis_substrait.compiler.core import SubstraitCompiler
compiler = SubstraitCompiler()

compiler.compile(bid_table)

relations {
  root {
    input {
      read {
        common {
          direct {
          }
        }
        base_schema {
          names: "auction"
          names: "bidder"
          names: "price"
          names: "datetime"
          struct {
            types {
              i64 {
                nullability: NULLABILITY_NULLABLE
              }
            }
            types {
              i64 {
                nullability: NULLABILITY_NULLABLE
              }
            }
            types {
              fp64 {
                nullability: NULLABILITY_NULLABLE
              }
            }
            types {
              timestamp {
                nullability: NULLABILITY_NULLABLE
              }
            }
            nullability: NULLABILITY_REQUIRED
          }
        }
        named_table {
          names: "Bid"
        }
      }
    }
    names: "auction"
    names: "bidder"
    names: "price"
    names: "datetime"
  }
}
version {
  minor_number: 54
  produc

In [3]:
# Simulating queries from Arroyo documentation: https://doc.arroyo.dev/tutorial/first-pipeline
# First Query
# SELECT bid FROM nexmark WHERE bid IS NOT NULL;

first_query = bid_table.filter(_ is not None)
plan = compiler.compile(first_query)

with open("first_query.proto", "wb") as f:
    f.write(plan.SerializeToString())
plan

relations {
  root {
    input {
      filter {
        input {
          read {
            common {
              direct {
              }
            }
            base_schema {
              names: "auction"
              names: "bidder"
              names: "price"
              names: "datetime"
              struct {
                types {
                  i64 {
                    nullability: NULLABILITY_NULLABLE
                  }
                }
                types {
                  i64 {
                    nullability: NULLABILITY_NULLABLE
                  }
                }
                types {
                  fp64 {
                    nullability: NULLABILITY_NULLABLE
                  }
                }
                types {
                  timestamp {
                    nullability: NULLABILITY_NULLABLE
                  }
                }
                nullability: NULLABILITY_REQUIRED
              }
            }
            named_table {
 

In [8]:
# Second query: Window 
# SELECT avg(bid.price) as avg_price
# FROM nexmark
# WHERE bid IS NOT NULL
# GROUP BY hop(interval '2 seconds', interval '10 seconds')

# over window
# over_window_streaming = bid_table.filter(_ is not None)[_.price.mean().over(range=(-ibis.interval(seconds=10), 0), order_by=_.datetime).name("avg_price")]
over_window_batch = bid_table.filter(_ is not None).mutate(avg_price=_.price.mean().over(rows=(-2,0), order_by=_.datetime))
plan = compiler.compile(over_window_batch)
with open("over_window_batch.proto", "wb") as f:
    f.write(plan.SerializeToString())

# hop window
# TODO: notice how the watermark declaration is not associated with the query, but instead tied with the backend (not shown here) - would that be a problem?
hop_window = bid_table.filter(_ is not None).window_by(time_col=_.datetime).hop(size=ibis.interval(seconds=10), slide=ibis.interval(seconds=2)).agg(_.price.mean().name("avg_price"))

plan = compiler.compile(hop_window)
with open("hop_window_streaming.proto", "wb") as f:
    f.write(plan.SerializeToString())


  with open(".\substrait-plans\over_window_batch.proto", "wb") as f:
  with open(".\substrait-plans\hop_window_streaming.proto", "wb") as f:
  with open(".\substrait-plans\over_window_batch.proto", "wb") as f:
  with open(".\substrait-plans\hop_window_streaming.proto", "wb") as f:


NotImplementedError: <ibis.expr.operations.temporal_windows.WindowAggregate object at 0x10c561f40>