Cannot create pipeline with Spark job #19

Open
buithuhasvtech opened this issue May 21, 2024 · 0 comments
Hi there, I am trying to create a pipeline with a Spark job (my code is based on this tutorial: https://docs.open-metadata.org/v1.3.x/connectors/ingestion/lineage/spark-lineage#spark-lineage-ingestion). The tables used in the Spark job all exist in OM, the Spark job finished successfully, and a pipeline service named My_pipeline_service has been created.
(Screenshot attached: 2024-05-21 141945)

But no pipeline was created. What do I need to do to get the pipeline created from the Spark job? I have only added metadata ingestion in the DB service; do I need to add any other type of ingestion to create the pipeline?

This is my code:
```python
from pyspark.sql import SparkSession

ss_conf = {
    'database': 'test_db',
    'password': 'my_pwd',
    'port': '3306',
    'host': 'my_host',
    'ssl': 'false',
    'username': 'my_username',
}

OM_JWT = "my_om_jwt"

spark = (
    SparkSession.builder.master("local")
    .appName("localTestApp")
    .config(
        "spark.jars",
        "/thuvien/driver/singlestore-spark-connector_2.12-4.1.3-spark-3.3.0.jar,/thuvien/driver/mariadb-java-client-3.1.4.jar,/thuvien/driver/singlestore-jdbc-client-1.2.0.jar,/thuvien/driver/commons-dbcp2-2.9.0.jar,/thuvien/driver/commons-pool2-2.11.1.jar,/thuvien/driver/spray-json_2.10-1.2.5.jar,/thuvien/driver/openmetadata-spark-agent-1.0-beta.jar",
    )
    .config(
        "spark.extraListeners",
        "org.openmetadata.spark.agent.OpenMetadataSparkListener",
    )
    .config("spark.openmetadata.transport.hostPort", "my_hostPort")
    .config("spark.openmetadata.transport.type", "openmetadata")
    .config("spark.openmetadata.transport.jwtToken", OM_JWT)
    .config("spark.openmetadata.transport.pipelineServiceName", "My_pipeline_service")
    .config("spark.openmetadata.transport.pipelineName", "My_pipeline_name")
    .config("spark.openmetadata.transport.pipelineDescription", "My ETL Pipeline")
    .config("spark.openmetadata.transport.databaseServiceNames", "analytic")
    .config("spark.openmetadata.transport.timeout", "30")
    .config("spark.openlineage.facets.disabled", "[spark_unknown;spark.logicalPlan]")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("INFO")

# SingleStore connection settings
spark.conf.set("spark.datasource.singlestore.clientEndpoint", ss_conf['host'])
spark.conf.set("spark.datasource.singlestore.user", ss_conf['username'])
spark.conf.set("spark.datasource.singlestore.password", ss_conf['password'])

# Read the source table
df1 = (
    spark.read.format("singlestore")
    .option("spark.datasource.singlestore.disablePushdown", "true")
    .option("enableParallelRead", "automatic")
    .option("parallelRead.Features", "readFromAggregatorsMaterialized,readFromAggregators")
    .option("parallelRead.repartition", "true")
    .option("parallelRead.maxNumPartitions", 20)
    .option("parallelRead.repartition.columns", "isdn_key")
    .load("test_db.uc16_02_tkc_20240229")
)

print(df1.count())

# Write the cleaned data to a new table
(
    df1.na.fill(0)
    .na.fill("")
    .write
    .mode("overwrite")
    .format("singlestore")
    .save("test_db.uc16_02_tkc_20240229_new")
)

spark.stop()
```
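For what it's worth, this is a minimal sketch of how I check whether the agent created any pipeline entities under My_pipeline_service, assuming the standard OpenMetadata REST listing endpoint (`GET <hostPort>/api/v1/pipelines?service=...`) and the same JWT token passed to the Spark agent; the host value here is a placeholder:

```python
import requests

# Assumption: same hostPort as spark.openmetadata.transport.hostPort (placeholder value)
OM_HOST = "http://my_hostPort"
# Same token passed to the Spark agent
OM_JWT = "my_om_jwt"

# List pipeline entities registered under the pipeline service
resp = requests.get(
    f"{OM_HOST}/api/v1/pipelines",
    params={"service": "My_pipeline_service"},
    headers={"Authorization": f"Bearer {OM_JWT}"},
    timeout=30,
)
resp.raise_for_status()

pipelines = resp.json().get("data", [])
if not pipelines:
    print("No pipeline entities found under My_pipeline_service")
for p in pipelines:
    print(p.get("fullyQualifiedName"), p.get("id"))
```

This returns an empty list for me, which matches what I see in the UI.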
