[BUG] java.lang.RuntimeException: Failed to create schema for topic #26

Closed

daileizhi opened this issue Dec 20, 2019 · 3 comments

daileizhi commented Dec 20, 2019

When I call dataset.write() to write data out to Pulsar, the job throws an error saying it failed to create the schema.

Environment:
Pulsar-2.4.1
Spark-2.4.4

java.lang.RuntimeException: Failed to create schema for persistent://public/default/als_statistics_tocheck
	at org.apache.spark.sql.pulsar.SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:104)
	at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer$lzycompute(PulsarWriteTask.scala:140)
	at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer(PulsarWriteTask.scala:138)
	at org.apache.spark.sql.pulsar.PulsarRowWriter.producerFlush(PulsarWriteTask.scala:210)

import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

// Build a Pulsar record schema that mirrors the Dataset's columns.
RecordSchemaBuilder schemaBuilder = SchemaBuilder.record("topLevelRecord");
schemaBuilder.field("ip").type(SchemaType.STRING);
schemaBuilder.field("port").type(SchemaType.INT32);
schemaBuilder.field("url_id").type(SchemaType.STRING);
schemaBuilder.field("response_rate").type(SchemaType.DOUBLE);
schemaBuilder.field("success_rate").type(SchemaType.DOUBLE);
schemaBuilder.field("average_response_time").type(SchemaType.DOUBLE);
schemaBuilder.field("average_network_time").type(SchemaType.DOUBLE);
schemaBuilder.field("start_time").type(SchemaType.TIMESTAMP);
schemaBuilder.field("end_time").type(SchemaType.TIMESTAMP);

SchemaInfo statistics2checkSchemaInfo = schemaBuilder.build(SchemaType.AVRO);

waitToCheckDataSet.write()
        //.mode("append")
        .format("pulsar")
        .option("service.url", serviceUrl)
        .option("admin.url", adminUrl)
        .option("topic", statistics2CheckTopic)
        .option("pulsar.producer.sendTimeoutMs", "60000")
        //.option("avroSchema", statistics2checkSchemaInfo.getSchemaDefinition())
        //.option("recordName", "CheckDataSet")
        //.option("recordNamespace", "com.some.domain")
        .save();

// Create the schema through the admin API if it does not exist yet.
try {
    admin.schemas().getSchemaInfo(statistics2CheckTopic);
} catch (PulsarAdminException e) {
    if (404 == e.getStatusCode()) {
        admin.schemas().createSchema(statistics2CheckTopic, statistics2checkSchemaInfo);
    }
}

spark.readStream()
        .format("pulsar")
        .option("service.url", serviceUrl)
        .option("admin.url", adminUrl)
        .option("topic", statistics2CheckTopic)
        .option("startingOffsets", "earliest")
        .load()
        .withWatermark("__eventTime", "1 minute")
        .writeStream().queryName("WaitToCheckDataSet")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime("1 minute"))
        .foreachBatch((dataset, batchId) -> {
            System.out.println("------WaitToCheckDataSet-------");
            dataset.show(false);
        }).start();

In addition, how can the options "avroSchema", "recordName", and "recordNamespace" be supported? The record name of the generated schema is always "topLevelRecord"; I expect something like com.some.domain instead (see the sketch after the generated schema below). These options already exist in the built-in Spark Avro data source:

http://spark.apache.org/docs/latest/sql-data-sources-avro.html

The schema that gets created looks like this:
{
    "type": "record",
    "name": "topLevelRecord",
    "fields": [
      {
        "name": "ip",
        "type": [
          "string",
          "null"
        ]
      },
      {
        "name": "port",
        "type": [
          "int",
          "null"
        ]
      },
      {
        "name": "url_id",
        "type": "string"
      },
      {
        "name": "response_rate",
        "type": [
          "double",
          "null"
        ]
      },
      {
        "name": "success_rate",
        "type": [
          "double",
          "null"
        ]
      },
      {
        "name": "average_response_time",
        "type": "double"
      },
      {
        "name": "average_network_time",
        "type": "double"
      },
      {
        "name": "start_time",
        "type": [
          {
            "type": "long",
            "logicalType": "timestamp-micros"
          },
          "null"
        ]
      },
      {
        "name": "end_time",
        "type": [
          {
            "type": "long",
            "logicalType": "timestamp-micros"
          },
          "null"
        ]
      },
      {
        "name": "type",
        "type": "int"
      }
    ]
  }
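
For comparison, a minimal sketch of the naming behaviour I have in mind, using the built-in Spark Avro data source (the record name and output path below are placeholders):

// Writing the same Dataset with the built-in Avro source lets you override
// the generated record name/namespace instead of the default "topLevelRecord".
waitToCheckDataSet.write()
        .format("avro")
        .option("recordName", "CheckDataSet")           // placeholder record name
        .option("recordNamespace", "com.some.domain")   // desired namespace
        .save("/tmp/als_statistics_tocheck_avro");      // placeholder output path

It would be helpful if the Pulsar sink recognised the same options when it uploads the topic schema.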
swapnil-chougule commented

Any resolution for this issue?

syhily (Contributor) commented Dec 5, 2022

@swapnil-chougule Can you paste your exception trace here?

syhily changed the title from "[BUG]java.lang.RuntimeException: Failed to create schema for topic" to "[BUG] java.lang.RuntimeException: Failed to create schema for topic" on Dec 6, 2022
syhily (Contributor) commented Dec 18, 2022

SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:104) indicates that you need admin privileges on the topic. That is most likely the root cause of this exception, so I don't think this is a bug.
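
A minimal sketch of granting that access with the Pulsar admin API (the admin URL, namespace, and role name below are placeholders; depending on the broker's authorization settings, uploading a schema through the admin REST API may additionally require tenant-admin rights for the role):

import java.util.EnumSet;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.AuthAction;

public class GrantConnectorRole {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")    // the admin.url the connector uses
                .build()) {
            // Grant produce/consume on the namespace that owns the target topic
            // to the role the Spark job authenticates as.
            admin.namespaces().grantPermissionOnNamespace(
                    "public/default",
                    "spark-job",                            // placeholder role name
                    EnumSet.of(AuthAction.produce, AuthAction.consume));
        }
    }
}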

syhily closed this as completed Dec 18, 2022