In [0]:
# Load the raw posts table; every later cell derives from this frame.
raw_posts_df = spark.table('data_platform.project.raw_posts')

In [0]:
# Peek at a few raw rows to sanity-check columns before transforming.
display(raw_posts_df.limit(num=3))

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType 
from pyspark.sql import DataFrame 

def split_tags_to_array(df: DataFrame) -> DataFrame:
    """Replace the pipe-delimited ``Tags`` string column with a ``TagArray`` array column.

    ``Tags`` is split on the literal ``|`` character. Empty fragments (which
    ``split`` yields for leading, trailing, or doubled delimiters) are removed.
    The original ``Tags`` column is dropped from the result.
    """
    # The regex is escaped because a bare '|' would mean alternation.
    tag_fragments = F.split(F.col('Tags'), r'\|')
    non_empty_tags = F.filter(tag_fragments, lambda tag: tag != '')
    with_tag_array = df.withColumn('TagArray', non_empty_tags)
    return with_tag_array.drop(F.col('Tags'))

def rename_column(df: DataFrame) -> DataFrame:
    """Rename the raw ``Id`` column to ``PostId``.

    ``withColumnRenamed`` leaves the frame unchanged if ``Id`` is not present.
    """
    source_name, target_name = 'Id', 'PostId'
    return df.withColumnRenamed(source_name, target_name)

def map_posts_to_id(df: DataFrame) -> DataFrame:
    """Attach a human-readable ``PostType`` label to each row via ``PostTypeId``.

    Despite the name, this maps post-type *ids* to label strings. The labels
    come from a small static lookup that is broadcast to every executor. The
    join is a left join, so rows whose ``PostTypeId`` is not in the lookup
    (e.g. 11 or 16, which are absent) keep a null ``PostType``.

    Relies on the notebook-global ``spark`` session to build the lookup frame.
    """
    # Insertion order is preserved, so the lookup rows match the original list.
    post_type_labels = {
        1: "Question",
        2: "Answer",
        3: "Orphaned tag wiki",
        4: "Tag wiki excerpt",
        5: "Tag wiki",
        6: "Moderator nomination",
        7: "Wiki placeholder",
        8: "Privilege wiki",
        9: "Article",
        10: "HelpArticle",
        12: "Collection",
        13: "ModeratorQuestionnaireResponse",
        14: "Announcement",
        15: "CollectiveDiscussion",
        17: "CollectiveCollection",
    }
    lookup_schema = StructType([
        StructField('PostTypeId', IntegerType(), False),
        StructField('PostType', StringType(), False),
    ])
    lookup_df = spark.createDataFrame(list(post_type_labels.items()), schema=lookup_schema)

    return df.join(F.broadcast(lookup_df), on='PostTypeId', how='left')

# Staging pipeline, one named stage per transform:
# Tags -> TagArray, Id -> PostId, then attach PostType labels.
posts_tagged_df = raw_posts_df.transform(split_tags_to_array)
posts_renamed_df = posts_tagged_df.transform(rename_column)
stg_posts_df = posts_renamed_df.transform(map_posts_to_id)


In [0]:
from pyspark.sql import functions as F 
from pyspark.sql import DataFrame
from delta.tables import DeltaTable

def incremental_upsert(dest_table: str, df: DataFrame, unique_key: str, updated_at: str, full_refresh = False):
    """Load ``df`` into the Delta table ``dest_table`` by full overwrite or incremental MERGE.

    If the table does not exist yet, or ``full_refresh`` is true, ``df`` is
    written with mode ``overwrite`` (schema overwritten too). Otherwise only
    rows whose ``updated_at`` is strictly greater than the current maximum in
    the destination are merged. Rows matched on ``unique_key`` have all
    columns updated; unmatched rows are inserted.

    Args:
        dest_table: Fully qualified name of the destination Delta table.
        df: Source rows to load.
        unique_key: Column used as the MERGE join key. NOTE(review): should be
            unique within each incremental batch. Delta MERGE fails when
            several source rows match the same target row.
        updated_at: Watermark column compared against the destination's max.
            Source rows with a null value here are never picked up
            incrementally.
        full_refresh: Force a full overwrite even if the table exists.

    Returns:
        None. The effect is the write to ``dest_table``.
    """
    if full_refresh or not spark.catalog.tableExists(dest_table):
        (
            df.write
                .format('delta')
                .mode('overwrite')
                .option('overwriteSchema', 'true')
                .saveAsTable(dest_table)
        )
        return

    latest_max = (
        spark.read.table(dest_table)
            .agg(F.max(updated_at).alias('max_ts'))
            .first()['max_ts']
    )

    # An existing but empty table (or one whose watermark column is all null)
    # yields max = None. Comparing against NULL would filter out every row and
    # the table would never be populated, so treat everything as new instead.
    if latest_max is None:
        incremental_df = df
    else:
        incremental_df = df.filter(F.col(updated_at) > latest_max)

    # head(1) stays on the DataFrame API (the RDD API is unavailable on
    # Spark Connect / shared-access clusters) and stops at the first row.
    if not incremental_df.head(1):
        return

    delta_table = DeltaTable.forName(spark, dest_table)
    (
        delta_table.alias('d').merge(
            source = incremental_df.alias('i'),
            condition = f'd.{unique_key} == i.{unique_key}'
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Incrementally load the staged posts, keyed on PostId.
# NOTE(review): CreationDate is the watermark, so edits to posts that are
# already loaded are never re-merged. If the raw table carries a
# last-modified column, that is probably the better watermark (confirm schema).
dest_table = 'data_platform.project.stg_posts'
incremental_upsert(
    dest_table,
    stg_posts_df,
    unique_key='PostId',
    updated_at='CreationDate',
)
        

In [0]:
# Diagnostic: number of partitions Spark uses when reading the staging table.
# NOTE(review): `.rdd` may be unavailable on Spark Connect / shared-access clusters.
spark.read.table(dest_table).rdd.getNumPartitions()