# OpenMetadata Spark agent

The OpenMetadata Spark agent is a Spark listener that captures metadata from Spark operations. When Spark writes a DataFrame to a database table, the agent can register that table in OpenMetadata and record the lineage between the source and target tables.

https://github.com/open-metadata/openmetadata-spark-agent

In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from creds import dbUser, dbPasswd, dbHost, dbPort, om_admin_token

In [2]:
# Directory holding the extra JARs. The default is a machine-specific path;
# set the SPARK_JAR_PATH environment variable to use another location without
# editing the notebook.
jar_path = os.environ.get("SPARK_JAR_PATH", "/home/pengfei/git/OpenMetaIngestion/jars")

# When True, the session is started with the OpenMetadata Spark listener and
# the transport settings below; when False, a plain local session is created.
enable_meta = False

# The MySQL JDBC driver is needed in both modes; the agent JAR is prepended
# only when metadata capture is enabled (same "agent,mysql" order as before).
spark_jars = [f"{jar_path}/mysql-connector-java-8.0.30.jar"]

# OpenMetadata listener/transport configuration, applied only if enable_meta.
openmetadata_conf = {
    "spark.extraListeners": "org.openmetadata.spark.agent.OpenMetadataSparkListener",
    "spark.openmetadata.transport.hostPort": "http://datacatalog.casd.local",
    "spark.openmetadata.transport.type": "openmetadata",
    "spark.openmetadata.transport.jwtToken": om_admin_token,
    "spark.openmetadata.transport.pipelineServiceName": "test_pipeline_service1",
    "spark.openmetadata.transport.pipelineName": "my_pipeline_name",
    "spark.openmetadata.transport.pipelineSourceUrl": "http://casd.local/path/to/pipeline",
    "spark.openmetadata.transport.pipelineDescription": "My_ETL_Pipeline",
    "spark.openmetadata.transport.databaseServiceNames": "constance",
    "spark.openmetadata.transport.timeout": "30",
}

# Settings shared by both modes are declared once.
builder = SparkSession.builder.master("local").appName("localTestApp")
if enable_meta:
    spark_jars.insert(0, f"{jar_path}/openmetadata-spark-agent-1.0.jar")
    for conf_key, conf_value in openmetadata_conf.items():
        builder = builder.config(conf_key, conf_value)

spark = builder.config("spark.jars", ",".join(spark_jars)).getOrCreate()


24/07/01 13:06:06 WARN Utils: Your hostname, pengfei-Virtual-Machine resolves to a loopback address: 127.0.1.1; using 10.50.2.80 instead (on interface eth0)
24/07/01 13:06:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/07/01 13:06:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read a MySQL table into a Spark DataFrame through the JDBC data source.
# NOTE(review): despite its name, employee_df holds the contents of
# `table_name` below, not an employee table; the name is kept because later
# cells refer to it.
db_name = "constance1"
table_name = "ct_E2021_F2010_DCIR_BIO"
employee_df = (
    spark.read.format("jdbc")
    .option("url", f"jdbc:mysql://{dbHost}:{dbPort}/{db_name}")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", table_name)
    # Credentials come from the local `creds` module and are passed as-is;
    # wrapping them in f-strings added nothing.
    .option("user", dbUser)
    .option("password", dbPasswd)
    .load()
)

# Preview at most the first 5 rows.
employee_df.show(5)

+---+----+-----------+
|UID|Name|FLX_DIS_DTD|
+---+----+-----------+
+---+----+-----------+


                                                                                

In [4]:
# Print the column names, types and nullability of the DataFrame read over JDBC above.
employee_df.printSchema()

root
 |-- UID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- FLX_DIS_DTD: date (nullable = true)


In [13]:

# Create a new table from employee_df in the same MySQL database it was read
# from (`db_name`). The value is passed as `dbtable`, so it is a *table* name,
# not a database name.
target_table_name = "pengfei_test"

(
    employee_df.write.format("jdbc")
    .option("url", f"jdbc:mysql://{dbHost}:{dbPort}/{db_name}")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", target_table_name)
    .option("user", dbUser)
    .option("password", dbPasswd)
    # Destructive: "overwrite" replaces any existing table with this name
    # (by default Spark's JDBC writer drops and recreates it; see the JDBC
    # `truncate` option to keep the existing table definition).
    .mode("overwrite")
    .save()
)



## Create tables using temporary views and SQL


In [5]:
# A small in-memory users table with columns (id, name, age).
data_user = [
    (1, "Alice", 29),
    (2, "Bob", 31),
    (3, "Cathy", 23),
]
# Explicit schema so column types and nullability are fixed rather than inferred.
schema_user = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df_user = spark.createDataFrame(data_user, schema_user)

In [6]:
# Display the users DataFrame (Spark's default: up to 20 rows, long values truncated).
df_user.show(n=20, truncate=True)

                                                                                

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 31|
|  3|Cathy| 23|
+---+-----+---+


In [7]:
# A small in-memory orders table; `uid` refers to df_user.id.
data_order = [
    (1, 23, 1),
    (2, 24, 2),
    (3, 25, 3),
]
# Explicit schema so column types and nullability are fixed rather than inferred.
schema_order = StructType([
    StructField("id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("uid", IntegerType(), True),
])
df_order = spark.createDataFrame(data_order, schema_order)

In [8]:
# Display the orders DataFrame (Spark's default: up to 20 rows, long values truncated).
df_order.show(n=20, truncate=True)

+---+----------+---+
| id|product_id|uid|
+---+----------+---+
|  1|        23|  1|
|  2|        24|  2|
|  3|        25|  3|
+---+----------+---+


In [9]:
# Expose both DataFrames as temporary views so the SQL cells below can query
# them by name; re-running this cell simply replaces the views.
for view_name, frame in (("test_user", df_user), ("test_order", df_order)):
    frame.createOrReplaceTempView(view_name)

In [10]:
# Sanity check: read the users view back through Spark SQL.
# spark.sql takes a single statement, so no trailing ';' is needed (older
# Spark parsers reject it).
test_df = spark.sql("SELECT id, name, age FROM test_user")
test_df.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 31|
|  3|Cathy| 23|
+---+-----+---+


In [11]:
# Join each user to their orders (test_order.uid -> test_user.id).
# Output columns: uid, oid, product_id, age. No trailing ';' — spark.sql runs a
# single statement and older Spark parsers reject the terminator.
user_order_df = spark.sql(
    """
    SELECT test_user.id  AS uid,
           test_order.id AS oid,
           test_order.product_id,
           test_user.age
    FROM test_user
    INNER JOIN test_order
        ON test_user.id = test_order.uid
    """
)

In [12]:
# Display the joined result (Spark's default: up to 20 rows, long values truncated).
user_order_df.show(n=20, truncate=True)

[Stage 8:>                                                          (0 + 1) / 1]

+---+---+----------+---+
|uid|oid|product_id|age|
+---+---+----------+---+
|  1|  1|        23| 29|
|  2|  2|        24| 31|
|  3|  3|        25| 23|
+---+---+----------+---+


                                                                                

In [None]:
# Stop the Spark session (and its underlying SparkContext). Re-run the session
# setup cell before executing any further Spark cells.
spark.stop()