Support for Delta Loads and Table Partitioning #126

Open

NicholasHoernleQC opened this issue Dec 4, 2023 · 2 comments

Labels: enhancement (New feature or request)

@NicholasHoernleQC

This issue describes some end goals for delta loading and table partitioning support. For a delta load, we want to run a query that is based on the previous state of the delta load; this is described in "Row-wise delta". There is also the possibility of using partitioned tables so that transformations only touch the relevant partition of the data; this is described in "Partitioned Tables". "Use Cases" describes a number of examples that combine row-wise and partitioned loads in various ways.

Row-wise delta

Here, a conditional where clause is added to a query depending on an input delta_load_date. If it is None, the query loads the full source table; if it is set, the query loads only the data newer than that date. The task needs to return this value so that the state of the delta load can be stored. Moreover, in a delta load the output table is not recreated but appended to.

import datetime as dt
import sqlalchemy as sa
# materialize, Delta, Dtype, and pdt.Table refer to the proposed pipedag API sketched in this issue

def get_col_range(year: int):
    return dt.datetime(year, 1, 1), dt.datetime(year + 1, 1, 1)

@materialize(version="1.2.3", ignore_input_invalid=True, input_type=sa.Table, delta_mode=Delta.rowwise, delta_state=dict(load_date=Dtype.Datetime))
def src(src_tbl, delta_load_date: dt.datetime | None):
    out = sa.select(sa.text("*")).select_from(sa.text(src_tbl))
    new_delta_load_date = sa.select(sa.text("max(load_date)")).select_from(sa.text(src_tbl))
    if delta_load_date is not None:
        # incremental run: restrict both the loaded rows and the new state to unseen data
        out = out.where(sa.text(f"load_date > '{delta_load_date}'"))
        new_delta_load_date = new_delta_load_date.where(sa.text(f"load_date > '{delta_load_date}'"))
    # return the new delta state alongside the output table (appended to on delta runs)
    return dict(load_date=new_delta_load_date), pdt.Table(out, name="src")
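To make the intended state round trip concrete, here is a minimal plain-Python sketch (illustrative only, not pipedag API) of how a stored watermark would drive a first full load and subsequent incremental loads:

import datetime as dt

# hypothetical in-memory stand-in for pipedag's persisted delta state
_state: dict[str, dt.datetime | None] = {"load_date": None}

def run_once(rows: list[dt.datetime]) -> list[dt.datetime]:
    last = _state["load_date"]
    # first run: full load; later runs: only rows newer than the stored watermark
    new_rows = rows if last is None else [r for r in rows if r > last]
    if new_rows:
        _state["load_date"] = max(new_rows)  # corresponds to the returned delta state
    return new_rows  # appended to, not replacing, the output table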

Partitioned Tables

Given a specification of the partitioning (e.g., below we partition a table by year), this task will not only create a partitioned table but will also manage cache invalidation per partition. For example, if only the 2023 data is updated, then only the 2023 partition needs to be recomputed. (input_valid, a cache-probe function, is defined in the use cases below.)

@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.partition, delta_partitions=dict(year=range(2019, 2024)), cache=input_valid)
def src(year: int):
    out = sa.select(sa.text("*")).select_from(sa.text("global_table")).where(sa.text(f"year = {year}"))
    # partition gives the value range of each partition; partition_name is a template that pipedag fills in
    return pdt.Table(out, partition=dict(load_date_x=get_col_range(year)), partition_name="{table_name}_yr{year}", name="src")
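The invalidation logic itself could be as simple as comparing a per-partition fingerprint (e.g., max(load_date) or a row count) between runs; the following is a plain-Python sketch of that idea, not pipedag internals:

def stale_partitions(old: dict[int, str], new: dict[int, str]) -> list[int]:
    # old/new map partition key (year) -> fingerprint of the source data;
    # only partitions whose fingerprint changed need to be recomputed
    return [year for year, fp in new.items() if old.get(year) != fp]

# e.g. only the 2023 data changed -> only the 2023 partition is rebuilt
assert stale_partitions({2022: "a", 2023: "b"}, {2022: "a", 2023: "c"}) == [2023]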

Use Cases

Use case 1 - Row-wise delta load with a manual update of inputs

When the cache is valid, only "new" rows from the source table should be loaded, and these should be appended to the output table.

## rowwise with manual update

def get_col_range(year: int):
    return dt.datetime(year,1,1), dt.datetime(year+1,1,1)


@materialize(version="1.2.3", ignore_input_invalid=True, input_type=sa.Table, delta_mode=Delta.rowwise, delta_state=dict(load_date=Dtype.Datetime))
def src(src_tbl, delta_load_date: dt.datetime | None):
    out = sa.select(sa.text("*")).select_from(sa.text(src_tbl))
    new_delta_load_date = sa.select(sa.text("max(load_date)")).select_from(sa.text(src_tbl))
    if delta_load_date is not None:
        out = out.where(sa.text(f"load_date > '{delta_load_date}'"))
        new_delta_load_date = new_delta_load_date.where(sa.text(f"load_date > '{delta_load_date}'"))
    return dict(load_date=new_delta_load_date), pdt.Table(out, name="src")


def get_years(A: sa.Table):
    # determine the set of partition values from the input table
    query = sa.select(A.c.year.distinct())
    engine = context.get_engine()
    with engine.connect() as conn:
        return [row[0] for row in conn.execute(query).fetchall()]
    

@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.partition, delta_partitions=dict(year=get_years))
def f(A: sa.Table, year: int):
    out = sa.select(A, A.c.load_date.label("load_date_x")).where(sa.extract("YEAR", A.c.load_date) == year)
    # pipedag will render the name via partition_name.format(table_name=..., year=year), e.g. "out_yr2023"
    return pdt.Table(out, partition=dict(load_date_x=get_col_range(year)), partition_name="{table_name}_yr{year}", name="out")


def get_pipeline():
    with Flow("flow"):
        with Stage("stage"):
            src_tbl = "global_table"
            data = src(src_tbl)
            transformed = f(data)

Use case 2 - Row-wise delta load to partitioned output

# rowwise -> partitioned

def input_valid():
    # cache probe: the task is only re-executed when max(load_date) changes
    query = sa.select(sa.text("max(load_date)")).select_from(sa.text("global_table"))
    engine = context.get_engine()
    with engine.connect() as conn:
        return conn.execute(query).scalar()

def get_col_range(year: int):
    return dt.datetime(year, 1, 1), dt.datetime(year + 1, 1, 1)


@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.rowwise, delta_state=dict(load_date=Dtype.Datetime), cache=input_valid)
def src(delta_load_date: dt.datetime | None):
    out = sa.select(sa.text("*")).select_from(sa.text("global_table"))
    new_delta_load_date = sa.select(sa.text("max(load_date)")).select_from(sa.text("global_table"))
    if delta_load_date is not None:
        out = out.where(sa.text(f"load_date > '{delta_load_date}'"))
        new_delta_load_date = new_delta_load_date.where(sa.text(f"load_date > '{delta_load_date}'"))
    # one partition per year: partition name template -> column range and partition key value
    return dict(load_date=new_delta_load_date), pdt.Table(
        out,
        partition={
            f"{{table_name}}_yr{year}": dict(load_date_x=get_col_range(year), year=year)
            for year in range(2019, 2024)
        },
        name="src",
    )


def get_years(A: sa.Table):
    # determine the set of partition values from the input table
    query = sa.select(A.c.year.distinct())
    engine = context.get_engine()
    with engine.connect() as conn:
        return [row[0] for row in conn.execute(query).fetchall()]
    

@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.partition, delta_partitions=dict(year=get_years))
def f(A: sa.Table, year: int):
    out = sa.select(A, A.c.load_date.label("load_date_x")).where(sa.extract("YEAR", A.c.load_date) == year)
    # pipedag will render the name via partition_name.format(table_name=..., year=year)
    return pdt.Table(out, partition=dict(load_date_x=get_col_range(year)), partition_name="{table_name}_yr{year}", name="out")


def get_pipeline():
    with Flow("flow"):
        with Stage("stage"):
            data = src()
            transformed = f(data)

Use case 3 - Table to table with partitions managed separately

# partitioned -> partitioned (multiple tables)

def input_valid(year):
    # cache probe per partition: only the (still changing) 2023 partition is probed
    if year == 2023:
        query = sa.select(sa.text("count(*)")).select_from(sa.text("global_table")).where(sa.text(f"year = {year}"))
        engine = context.get_engine()
        with engine.connect() as conn:
            return conn.execute(query).scalar()
    # closed years are assumed immutable -> a constant probe value keeps the cache valid
    return 1


@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.partition_tables, delta_partitions=dict(year=range(2019, 2024)), cache=input_valid)
def src(year: int):
    out = sa.select(sa.text("*")).select_from(sa.text("global_table")).where(sa.text(f"year = {year}"))
    # partition_tables mode: one separate table per partition value
    return pdt.Table(out, name=f"src{year}")


@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.partition_tables, delta_partitions=dict(year=None))  # None: copy the partition values from the input
def f(A: sa.Table, year: int):
    out = sa.select(A).where(A.c.year == year)
    return pdt.Table(out, name=f"out{year}")


@materialize(lazy=True, input_type=sa.Table)
def count_all(X: sa.Table | sa.Alias):  # an Alias wraps a subquery
    return sa.select(sa.text("count(*)")).select_from(X)


def get_pipeline():
    with Flow():
        with Stage():
            data = src()  # data is a partitioned table object
            transformed = f(data)  # transformed is a partitioned table object as well
            cnt = count_all(transformed)
 
  
@materialize(lazy=True, input_type=sa.Table, inline_if_possible=True)
def union(tbls: list[sa.Table]):
    # combine all per-year tables into a single relation
    selects = [sa.select(tbl) for tbl in tbls]
    return sa.union_all(*selects)
    

def get_pipeline():
    with Flow("flow"):
        with Stage("stage"):
            transformed = []
            for year in range(2019,2024):
                data = src(year)
                transformed.append(f(data, year))
            cnt = count_all(union(transformed))
            

def get_pipeline():
    with Flow("flow"):
        with Stage("stage"):
            # partition/for_each are proposed helpers; a sketch follows below
            data = partition(dict(year=range(2019, 2024)), src, args=[])
            transformed = for_each(data, f)
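partition and for_each do not exist yet; as a rough sketch of the semantics these hypothetical helpers would need:

def partition(spec: dict, task, args: list):
    # hypothetical helper: expand a task into one invocation per partition value
    (key, values), = spec.items()
    return {value: task(*args, **{key: value}) for value in values}

def for_each(parts: dict, task):
    # hypothetical helper: apply a task to every partition, keeping the partition keys
    return {key: task(tbl, key) for key, tbl in parts.items()}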

Use case 4 - Partitioned table to partitioned table

# partitioned -> partitioned (one table)
# Attention: for caching, delta loading is also needed when transferring tables between caches

def input_valid(year):
    # cache probe per partition: only the (still changing) 2023 partition is probed
    if year == 2023:
        query = sa.select(sa.text("count(*)")).select_from(sa.text("global_table")).where(sa.text(f"year = {year}"))
        engine = context.get_engine()
        with engine.connect() as conn:
            return conn.execute(query).scalar()
    # closed years are assumed immutable -> a constant probe value keeps the cache valid
    return 1


def get_col_range(year: int):
    return dt.datetime(year,1,1), dt.datetime(year+1,1,1)


@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.partition, delta_partitions=dict(year=range(2019, 2024)), cache=input_valid)
def src(year: int):
    out = sa.select(sa.text("*")).select_from(sa.text("global_table")).where(sa.text(f"year = {year}"))
    return pdt.Table(out, partition=dict(load_date_x=get_col_range(year)), partition_name="{table_name}_yr{year}", name="src")


@materialize(lazy=True, input_type=sa.Table, delta_mode=Delta.partition, delta_partitions=dict(year=None))  # None means: copy the range from the first source table that has this partitioning key
def f(A: sa.Table, year: int):
    out = sa.select(A, A.c.load_date.label("load_date_x")).where(sa.extract("YEAR", A.c.load_date) == year)
    # pipedag will render the name via partition_name.format(table_name=..., year=year)
    return pdt.Table(out, partition=dict(load_date_x=get_col_range(year)), partition_name="{table_name}_yr{year}", name="out")
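One way to read delta_partitions=dict(year=None) is as a resolution step that inherits the value range from the inputs; a small sketch of that assumed behavior:

def resolve_partitions(task_spec: dict, input_specs: list[dict]) -> dict:
    # hypothetical resolution: None means "copy the range from the first
    # input table that carries the same partitioning key"
    resolved = {}
    for key, values in task_spec.items():
        if values is None:
            values = next(spec[key] for spec in input_specs if key in spec)
        resolved[key] = values
    return resolved

# f inherits range(2019, 2024) from src:
assert resolve_partitions({"year": None}, [{"year": range(2019, 2024)}]) == {"year": range(2019, 2024)}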


def get_pipeline():
    with Flow("flow"):
        with Stage("stage"):
            data = src()
            transformed = f(data)
@NicholasHoernleQC (Author)

FYI @nicolasmueller @windiana42. Would you like me to add more details and/or explanation here?

@windiana42 (Member)

Hamilton has a feature that is similar to partition-based delta loading: a source node returns an iterator, subsequent tasks are called separately for each element of the iterator, multiple iterator inputs are zipped, and aggregation tasks can consume an iterator and return non-iterated outputs: https://hamilton.dagworks.io/en/latest/concepts/parallel-task/
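Roughly, the pattern in Hamilton looks like this (a minimal sketch based on the linked docs; names like row_count are just for illustration):

from hamilton.htypes import Collect, Parallelizable

def year(years: list[int]) -> Parallelizable[int]:
    # expander node: each yielded element fans out into its own copy of the downstream nodes
    yield from years

def row_count(year: int) -> int:
    # runs once per yielded year; stand-in for a real per-partition computation
    return 1

def total_rows(row_count: Collect[int]) -> int:
    # collector node: gathers the per-year results back into a single output
    return sum(row_count)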

windiana42 added the enhancement (New feature or request) label on Mar 20, 2024