# PySpark 4.0 â†’ DataHub (acryl-spark)

[![PySpark](https://img.shields.io/badge/PySpark-4.0-262A38?style=flat-square&logo=apachespark&logoColor=E36B22&labelColor=262A38)](https://spark.apache.org/docs/4.0.2/api/python/user_guide/index.html)
[![Scala](https://img.shields.io/badge/Scala-2.13-262A38?style=flat-square&logo=scala&logoColor=E03E3C&labelColor=262A38)](https://sdkman.io/)
[![JDK](https://img.shields.io/badge/JDK-17-35667C?style=flat&logo=openjdk&logoColor=FFFFFF&labelColor=1D213B)](https://sdkman.io/)
[![Acryl-Spark](https://img.shields.io/badge/acryl--spark--lineage-262A38?style=flat-square&logo=lineageos&logoColor=73A4BC&labelColor=262A38)](https://docs.datahub.com/docs/metadata-integration/java/acryl-spark-lineage)

Read from the staging HackerNews RSS tables (`stg_hot_articles`, `stg_newest_articles`) and write
the combined result into the `hackernews_rss` dataset on BigQuery.

**Notes:** 
This notebook requires **Spark 4.0** as it uses dependencies bound to the Spark version and/or the Scala version (2.13) that is bundled with

**Known limitation:** 
- The **Spark 4.x** variant (`acryl-spark-lineage_2.13:0.2.19-rc4`) only emits **input** lineage to DataHub (Datasets read) - the **output** dataset (`articles_spark`) is missing

### SparkSession Setup

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark_jars = ",".join([
    "com.google.cloud.spark:spark-4.0-bigquery:0.44.0",
    "0.2.19-rc4:0.2.19-rc4",
])

In [3]:
spark = (
    SparkSession.builder
    .appName("jupyter-acryl-datahub")
    .master("local[*]")
    .config("spark.jars.packages", spark_jars)
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://localhost:9090")
    .config("spark.datahub.metadata.dataset.materialize", "true")
    .config("spark.datahub.capture_spark_plan", "true")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/13 07:57:04 WARN Utils: Your hostname, Brunos-M1-Max-MBP-16.local, resolves to a loopback address: 127.0.0.1; using 192.168.15.5 instead (on interface en0)
26/02/13 07:57:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/iobruno/Vault/data-catalog-labs/spark/.venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Provided Maven Coordinates must be in the form 'groupId:artifactId:version'. The coordinate provided is: 0.2.19-rc4:0.2.19-rc4
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.util.MavenUtils$.$anonfun$extractMavenCoordinates$1(MavenUtils.scala:102)
	at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
	at org.apache.spark.util.MavenUtils$.extractMavenCoordinates(Ma

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

### Read Staging Tables

In [None]:
GCP_PROJECT = "iobruno-gcp-labs"
STG_DATASET = "stg_hackernews_rss"

df_hot = (
    spark.read
    .format("bigquery")
    .option("table", f"{GCP_PROJECT}.{STG_DATASET}.stg_hot_articles")
    .option("viewsEnabled", "true")
    .option("materializationDataset", STG_DATASET)
    .load()
)

df_hot.printSchema()
df_hot.show(5, truncate=False)

In [None]:
df_newest = (
    spark.read
    .format("bigquery")
    .option("table", f"{GCP_PROJECT}.{STG_DATASET}.stg_newest_articles")
    .option("viewsEnabled", "true")
    .option("materializationDataset", STG_DATASET)
    .load()
)

df_newest.printSchema()
df_newest.show(5, truncate=False)

### Combine Articles

In [None]:
from pyspark.sql.functions import lit

df_hot_tagged = df_hot.withColumn("source_feed", lit("hot"))
df_newest_tagged = df_newest.withColumn("source_feed", lit("newest"))

df_articles = df_hot_tagged.unionByName(df_newest_tagged)

print(f"Total articles: {df_articles.count()}")
df_articles.show(10, truncate=False)

### Write to BigQuery

In [None]:
TARGET_DATASET = "hackernews_rss"
TARGET_TABLE = f"{GCP_PROJECT}.{TARGET_DATASET}.articles_spark"

(
    df_articles.write
    .format("bigquery")
    .option("table", TARGET_TABLE)
    .option("writeMethod", "direct")
    .mode("overwrite")
    .save()
)

print(f"Written to {TARGET_TABLE}")