This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

WIP: hamilton + quokka / pyspark #269

Draft: wants to merge 5 commits into main

Conversation

@skrawcz (Collaborator) commented Jan 5, 2023

Working on how to make Hamilton handle Quokka better.

So nothing special. It does make things testable,
but not an ideal UX.
@skrawcz changed the title from "Adds example showing hamilton + quokka today" to "WIP: hamilton + quokka" on Jan 5, 2023
@skrawcz (Collaborator, Author) commented Jan 5, 2023

Things to think about:

  • quokka, like spark, relies on a central object being manipulated so that it can perform query optimizations.
  • adding support for hamilton means that, under the hood, we need to manipulate this object correctly.
  • I believe a default hamilton function gives us enough information to know what to do; it's just a matter of figuring out how to traverse the DAG to construct the right order of operations (see the sketch below).
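
For illustration (not code from this PR), the tension in the first two bullets sketched concretely -- the with_column call follows pyquokka's per-batch function style, but treat the exact signature and import path as assumptions:

# The "central object" style quokka/spark expect: operations are chained on one
# lazy object so the optimizer can see the whole plan before execution.
from pyquokka.df import QuokkaContext  # import path assumed


def central_object_style(qc: QuokkaContext, path: str):
    ds = qc.read_csv(path, sep="|", has_header=True)
    ds = ds.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
    # with_column takes a per-batch function plus the columns it needs
    ds = ds.with_column(
        "disc_price",
        lambda batch: batch["l_extendedprice"] * (1 - batch["l_discount"]),
        required_columns={"l_extendedprice", "l_discount"},
    )
    return ds

# Hamilton instead spreads this logic across standalone functions (disc_price,
# charge, ...), so an adapter has to replay their dependency graph against that
# one object under the hood.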

@skrawcz (Collaborator, Author) commented Jan 5, 2023

Sketch of some code converting the hello world example

# Sketch code -- imports added for context (pyquokka import paths assumed);
# the @groupby decorator further down is proposed, not an existing Hamilton decorator.
import polars as pl
from pyquokka.datastream import DataStream, GroupedDataStream
from pyquokka.df import QuokkaContext

from hamilton.function_modifiers import extract_columns


# Data Loading
# Filtering is part of data loading -- do we also expose columns like this?
@extract_columns("l_quantity", "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus")
def lineitem(qc: QuokkaContext, path: str,
             filter: str = "l_shipdate <= date '1998-12-01' - interval '90' day") -> DataStream:
    """Loads and filters data from the lineitem table"""
    ds: DataStream = qc.read_csv(path, sep="|", has_header=True)
    if filter:
        ds = ds.filter(filter)
    return ds


# transforms we want people to write
def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)


@groupby("l_returnflag", "l_linestatus", order_by=[...])
def grouped_lineitem(l_quantity: pl.Series, l_extendedprice: pl.Series,
                        disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                        l_returnflag: pl.Series, l_linestatus: pl.Series) -> GroupedDataStream:
    pass

# maybe more subtly
def grouped_lineitem(l_returnflag: pl.Series, l_linestatus: pl.Series, *, l_quantity: pl.Series, l_extendedprice: pl.Series,
                        disc_price: pl.Series, charge: pl.Series, l_discount: pl.Series,
                        ) -> GroupedDataStream:
    pass

This is highly experimental. Just getting a feel for:

1. the hamilton API one would need to write.
2. what we'd need to adjust within Hamilton to make
things work as expected.

There is a lot of hackiness. But basically it seems that
with a graph adapter, assuming correct node traversal,
we can correctly derive things, at least for the case of
a single datastream object.
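
As a toy illustration of the "correct node traversal" piece (not the adapter code in this PR), the ordering itself is just a topological sort over the column dependency graph; the helper name below is made up:

# Sketch only: derive the order in which column operations would be applied
# to the single datastream object. Uses only the standard library.
from graphlib import TopologicalSorter


def column_application_order(dependencies: dict) -> list:
    """dependencies maps each derived column to the columns it reads from."""
    return list(TopologicalSorter(dependencies).static_order())


order = column_application_order({
    "disc_price": {"l_extendedprice", "l_discount"},
    "charge": {"l_extendedprice", "l_discount", "l_tax"},
})
# Source columns come out first, then disc_price / charge -- the order in
# which the adapter would issue the corresponding operations on the datastream.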

Edge cases to handle:

- multiple input datastreams
- joins -- what's the syntax for that?
- schema validation -- we'd use that to understand what should or shouldn't be used.

API thoughts:

 - groupby doesn't seem terrible, but really the question is: where do you house the groupby logic -- in the adapter or in the function? (one adapter-side shape is sketched after this list)
 - need to figure out some join syntax
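
To make the "house it in the adapter" option concrete, a rough sketch (class and parameter names are made up, and the groupby(...).agg(...) call shape is assumed from quokka's API); user functions then stay plain column-level transforms:

# Sketch only -- not this PR's code. The grouping spec lives in the adapter
# and is applied once all column nodes have been appended to the datastream.
class GroupingQuokkaAdapter:
    def __init__(self, group_keys: list, order_by: list, aggregations: dict):
        self.group_keys = group_keys
        self.order_by = order_by
        self.aggregations = aggregations

    def build_result(self, datastream):
        """Called with the datastream after all column nodes have been applied."""
        return datastream.groupby(self.group_keys, orderby=self.order_by).agg(self.aggregations)
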
@skrawcz (Collaborator, Author) commented Jan 7, 2023

Parking a thought -- what about just enabling Hamilton-style functions instead of with_column?

Basically, given a datastream as the input, the output is another datastream.

# Sketch -- QuokkaGraphAdapter_V2 is the WIP adapter from this PR;
# the other imports are standard Hamilton modules.
import polars as pl

from hamilton import ad_hoc_utils, base, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)

def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2(base.DictResult()) 

    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
    
    dr = driver.Driver({}, temp_module, adapter=adapter)
    d = dr.execute(["disc_price", "charge"])
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()

Hello world continues to work.

Basically the next steps would be to:

1. create a join syntax.
2. implement some more examples that have a join.

Other thoughts:
 - every decorator should tag the nodes it produces. Not terrible.

Still need to design the join abstraction -- but validating
that the approach taken for quokka transfers to pyspark.
This does not work yet.

Trying to replicate the tpc-h-3 benchmark.
@skrawcz changed the title from "WIP: hamilton + quokka" to "WIP: hamilton + quokka / pyspark" on Feb 7, 2023
@skrawcz (Collaborator, Author) commented Feb 21, 2023

Tweaking the above slightly:

# Sketch -- as above, QuokkaGraphAdapter_V2 is the WIP adapter from this PR.
import polars as pl

from hamilton import ad_hoc_utils, driver


def disc_price(l_extendedprice: pl.Series, l_discount: pl.Series) -> pl.Series:
    """Computes the discounted price"""
    return l_extendedprice * (1 - l_discount)


def charge(l_extendedprice: pl.Series, l_discount: pl.Series, l_tax: pl.Series) -> pl.Series:
    """Computes the charge"""
    return l_extendedprice * (1 - l_discount) * (1 + l_tax)

def main(qc, path):
    temp_module = ad_hoc_utils.create_temporary_module(disc_price, charge)
    adapter = QuokkaGraphAdapter_V2() 

    lineitem = qc.read_csv(path, sep="|", has_header=True)
    d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
    
    dr = driver.Driver({}, temp_module, adapter=adapter)
    d = dr.execute(["disc_price", "charge"], inputs={c: d for c in d.schema})  # default is to append columns to the passed-in datastream
    f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag", "l_linestatus"]).agg(
        {
            "l_quantity": ["sum", "avg"],
            "l_extendedprice": ["sum", "avg"],
            "disc_price": "sum",
            "charge": "sum",
            "l_discount": "avg",
            "*": "count",
        }
    )
    return f.collect()

The QuokkaGraphAdapter_V2 adapter then intercepts and massages the internals appropriately to return a datastream with the extra columns.
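
Not the actual adapter in this PR, but one way that interception could look, shaped like Hamilton's graph adapter hooks (execute_node / build_result); how the single datastream gets threaded through, and with_column's per-batch semantics, are assumptions:

# Sketch only: every value in inputs={c: d for c in d.schema} is the same
# datastream, so the adapter grabs it once and appends each requested node as
# a new column instead of computing values eagerly. Names are illustrative.
class SketchQuokkaAdapterV2:
    def __init__(self):
        self._ds = None  # the single datastream being threaded through

    def execute_node(self, node, kwargs):
        if self._ds is None:
            self._ds = next(iter(kwargs.values()))

        def batch_fn(batch, _node=node):
            # call the user's column-level function on the columns of this batch
            return _node.callable(**{name: batch[name] for name in _node.input_types})

        self._ds = self._ds.with_column(
            node.name, batch_fn, required_columns=set(node.input_types)
        )
        return self._ds

    def build_result(self, **outputs):
        # every output is the same datastream, now with the extra columns appended
        return self._ds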
