# Transforming a DataSet to another schema

We often come across the following pattern:

In [2]:
from pyspark.sql.types import IntegerType
from typedspark import Column, Schema, DataSet

# Example input schema: five integer columns, `a` through `e`.
class A(Schema):
    a: Column[IntegerType]
    b: Column[IntegerType]
    c: Column[IntegerType]
    d: Column[IntegerType]
    e: Column[IntegerType]

# Second example input schema: five integer columns, `f` through `j`.
class B(Schema):
    f: Column[IntegerType]
    g: Column[IntegerType]
    h: Column[IntegerType]
    i: Column[IntegerType]
    j: Column[IntegerType]

# Output schema: columns `a` and `b` (names also declared in A) plus
# `i` and `j` (names also declared in B).
class AB(Schema):
    a: Column[IntegerType]
    b: Column[IntegerType]
    i: Column[IntegerType]
    j: Column[IntegerType]

def foo(df_a: DataSet[A], df_b: DataSet[B]) -> DataSet[AB]:
    """Join ``df_a`` with ``df_b`` on ``A.a == B.f`` and reshape to ``AB``.

    Every column of ``AB`` is overwritten with its derived value, after which
    only the columns declared in ``AB`` are selected.
    """
    result = df_a.join(df_b, A.a == B.f)

    # (target column name, expression) pairs, applied in this order.
    derived_columns = (
        (AB.a.str, A.a + 3),
        (AB.b.str, A.b + 7),
        (AB.i.str, B.i - 5),
        (AB.j.str, B.j + 1),
    )
    for column_name, expression in derived_columns:
        result = result.withColumn(column_name, expression)

    return DataSet[AB](result.select(*AB.all_column_names()))

We can make that quite a bit more condensed:

In [3]:
from typedspark import transform_to_schema

def foo(df_a: DataSet[A], df_b: DataSet[B]) -> DataSet[AB]:
    """Join ``df_a`` with ``df_b`` on ``A.a == B.f`` and transform to ``AB``.

    The mapping passed to ``transform_to_schema`` gives, for each column of
    ``AB``, the expression that produces it.
    """
    joined = df_a.join(df_b, A.a == B.f)
    transformations = {
        AB.a: A.a + 3,
        AB.b: A.b + 7,
        AB.i: B.i - 5,
        AB.j: B.j + 1,
    }
    return transform_to_schema(joined, AB, transformations)

This function can also be used to select just the subset of columns defined in the schema: simply omit the third argument.

In [4]:
# Source schema: five integer columns, `a` through `e`.
class A(Schema):
    a: Column[IntegerType]
    b: Column[IntegerType]
    c: Column[IntegerType]
    d: Column[IntegerType]
    e: Column[IntegerType]

# Target schema: only columns `a` and `b`, both of which A also declares.
class B(Schema):
    a: Column[IntegerType]
    b: Column[IntegerType]
def foo(df: DataSet[A]) -> DataSet[B]:
    """Reduce ``df`` to the columns declared in ``B``.

    No transformations are supplied, so ``transform_to_schema`` is called
    with just the dataset and the target schema.
    """
    subset = transform_to_schema(df, B)
    return subset

The `transformations` dictionary in `transform_to_schema(..., transformations)` requires columns with unique names as keys, so each target column may appear only once. The following pattern will raise an exception.

In [5]:
from pyspark.sql import SparkSession
# Get an existing SparkSession or create a new one for the examples below.
spark = SparkSession.Builder().getOrCreate()

In [12]:
from typedspark import create_partially_filled_dataset

# Dataset with schema A in which only column `a` is given values.
df = create_partially_filled_dataset(spark, A, {A.a: [1, 2, 3]})

# Deliberately faulty example: the key B.a appears twice in the dictionary.
# NOTE(review): judging by the printed message, the ValueError appears to be
# raised while the duplicate keys are compared during construction of the
# dict literal, i.e. before transform_to_schema runs -- confirm.
try:
    transform_to_schema(
        df,
        B,
        {
            B.a: A.a + 3,
            B.a: B.a * 2,
        }
    )
except ValueError as e:
    # Print the message rather than letting the error stop the notebook.
    print(e)

23/03/23 10:51:45 WARN Column: Constructing trivially true equals predicate, ''a = 'a'. Perhaps you need to use aliases.
Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.


Instead, combine the transformations into a single entry per column:

In [11]:
# Both steps (+ 3, then * 2) are folded into the single entry for B.a.
transform_to_schema(df, B, {B.a: (A.a + 3) * 2}).show()

+---+----+
|  a|   b|
+---+----+
|  8|null|
| 10|null|
| 12|null|
+---+----+

