# Native Kafka Client to PSC Migration

If you wish to migrate your native Kafka client application to use PSC, the process is relatively straightforward.

## API parity

All APIs present in the corresponding version of the native Kafka client are also available on the PSC client. Most APIs share the same method name and signature, so in the common case migration is a matter of simply dropping in a PSC client to replace the Kafka client. Please take a look at PscConsumer.java for the full list of available APIs.
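
For illustration, here is a minimal consumer poll loop rewritten against PscConsumer. This is a sketch, not an exact transcription of the PSC API: the configuration key names, topic URI format, and poll/iteration details below are assumptions, and (de)serializer configuration is omitted; consult PscConsumer.java and the quickstart for the authoritative usage.

```java
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.consumer.PscConsumer;
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.consumer.PscConsumerPollMessageIterator;

import java.util.Collections;

public class MigratedConsumerSketch {
    public static void main(String[] args) throws Exception {
        // Roughly equivalent to building java.util.Properties for KafkaConsumer;
        // the key names below are assumptions (see the Configurations section).
        PscConfiguration configuration = new PscConfiguration();
        configuration.setProperty("psc.consumer.group.id", "my-group");   // was: group.id
        configuration.setProperty("psc.consumer.client.id", "my-client"); // was: client.id

        PscConsumer<String, String> consumer = new PscConsumer<>(configuration);

        // subscribe() takes topic URIs rather than bare topic names; the URI below is a placeholder.
        consumer.subscribe(Collections.singleton("plaintext:/rn:kafka:env:cloud_region::cluster:my_topic"));

        while (true) {
            // poll() plays the same role as KafkaConsumer.poll(); messages arrive as
            // PscConsumerMessage instead of ConsumerRecord.
            PscConsumerPollMessageIterator<String, String> messages = consumer.poll();
            while (messages.hasNext()) {
                PscConsumerMessage<String, String> message = messages.next();
                System.out.printf("%s -> %s%n", message.getKey(), message.getValue());
            }
        }
    }
}
```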

## Configurations

PSC configurations largely mirror those offered by the native Kafka client, with minor differences in key names. Under the hood, PSC translates these configs and passes them to the backend Kafka client. The details of the config translation mapping can be found in PscConsumerToKafkaConsumerConfigConverter.java.
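
The sketch below illustrates the general shape of that translation: a prefixed PSC key mapping to the corresponding Kafka key. The key names and the prefix-stripping rule are assumptions for illustration only; the authoritative mapping, including any special cases, lives in PscConsumerToKafkaConsumerConfigConverter.java.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of config translation: PSC-style keys (assumed here to be
// prefixed with "psc.consumer.") are mapped to the Kafka consumer keys they correspond to.
public class ConfigTranslationSketch {
    static Properties toKafkaConsumerProperties(Map<String, String> pscConfig) {
        Properties kafkaProperties = new Properties();
        pscConfig.forEach((key, value) -> {
            if (key.startsWith("psc.consumer.")) {
                // e.g. "psc.consumer.group.id" -> "group.id" (assumed example)
                kafkaProperties.put(key.substring("psc.consumer.".length()), value);
            }
        });
        return kafkaProperties;
    }
}
```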

## Example

Check out the consumer and producer migration examples. Each Psc.java file is identical to the corresponding Kafka.java file in terms of logic and functionality. Note that in the PSC examples for both consumer and producer, an additional JVM system property, -DtempServersetDir, is required for automated service discovery to find the correct broker endpoints (similar to what's provided in the quickstart script); otherwise the client will not be able to connect to the local Kafka broker.

# Flink-Kafka to Flink-PSC Migration

If you wish to migrate a Flink-Kafka application to use Flink-PSC, the new PSC job needs to be able to recover from a checkpoint generated by the original Flink-Kafka application.

For jobs that only had Flink-Kafka source operators (consumers), the new PSC job should be able to recover from the old checkpoint without issues. However, if the job topology includes Kafka sinks (producers), a minor code change in the Flink-Kafka connector is required in order for PSC to recover state from a Flink-Kafka checkpoint.

Here is how that can be done:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.util.function.Supplier;

/**
 * This class overrides the default <code>SimpleTypeSerializerSnapshot</code> and was added to allow
 * <code>FlinkPscProducer</code> to resume the (transactional) snapshots taken by <code>FlinkKafkaProducer</code>. It
 * defines PSC serializer classes compatible with their corresponding Kafka serializer classes; e.g. enabling the
 * migration from <code>FlinkKafkaProducer.TransactionStateSerializer</code> to
 * <code>FlinkPscProducer.TransactionStateSerializer</code>.
 * @param <T>
 */
public class KafkaSimpleTypeSerializerSnapshot<T> extends SimpleTypeSerializerSnapshot<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSimpleTypeSerializerSnapshot.class);

    /**
     * Constructor to create snapshot from serializer (writing the snapshot).
     *
     * @param serializerSupplier
     */
    public KafkaSimpleTypeSerializerSnapshot(@Nonnull Supplier<? extends TypeSerializer<T>> serializerSupplier) {
        super(serializerSupplier);
    }

    @Override
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
        // default case
        if (super.resolveSchemaCompatibility(newSerializer).isCompatibleAsIs()) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }

        // customized compatibility check
        String newSerializerClassName = newSerializer.getClass().getName();
        if (isPotentiallyCompatible(newSerializerClassName)) {
            try {
                Class<?> clazz = Class.forName(newSerializerClassName);
                Object serializer = clazz.newInstance();
                LOG.info("Matched PSC's type serializer schema '{}' with Kafka's '{}'", newSerializerClassName, restoreSerializer().getClass().getName());
                return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer((TypeSerializer<T>) serializer);
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                LOG.warn("Failed to verify schema compatibility with compatible packages.", e);
            }
        }

        return TypeSerializerSchemaCompatibility.incompatible();
    }

    protected boolean isPotentiallyCompatible(String newSerializerClassName) {
        LOG.info("newSerializerClassName: {}", newSerializerClassName);
        String restoreSerializerClassName = restoreSerializer().getClass().getName();
        LOG.info("restoreSerializerClassName: {}", restoreSerializerClassName);
        return newSerializerClassName.startsWith("com.pinterest.") &&
            newSerializerClassName.contains("Psc") &&
            restoreSerializerClassName.contains("Kafka") &&
            newSerializerClassName.contains("$") &&
            newSerializerClassName.replaceAll("Psc", "Kafka")
                .replaceAll("psc", "kafka")
                .replaceFirst("com.pinterest", "org.apache")
                .equals(restoreSerializerClassName);
    }
}
```
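
The customized check above is essentially a class-name mapping: a serializer under com.pinterest.* is considered potentially compatible if, after swapping Psc/psc for Kafka/kafka and com.pinterest for org.apache, its name matches the serializer class restored from the checkpoint. The snippet below walks through that mapping with placeholder class names (the PSC-side name is hypothetical; the Kafka-side name follows the shape of FlinkKafkaProducer's nested serializers).

```java
public class SerializerNameMappingSketch {
    public static void main(String[] args) {
        // Placeholder class names illustrating the substitutions isPotentiallyCompatible() applies.
        String newSerializerClassName =
                "com.pinterest.flink.streaming.connectors.psc.FlinkPscProducer$TransactionStateSerializer";
        String restoreSerializerClassName =
                "org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$TransactionStateSerializer";

        String mapped = newSerializerClassName
                .replaceAll("Psc", "Kafka")            // FlinkPscProducer -> FlinkKafkaProducer
                .replaceAll("psc", "kafka")            // ...connectors.psc... -> ...connectors.kafka...
                .replaceFirst("com.pinterest", "org.apache");

        // Prints "true": the PSC serializer is treated as potentially compatible with the
        // Kafka serializer recorded in the checkpoint.
        System.out.println(mapped.equals(restoreSerializerClassName));
    }
}
```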
1. We need to create a new subclass of SimpleTypeSerializerSnapshot (shown above) that overrides resolveSchemaCompatibility() and adds a helper method, isPotentiallyCompatible(). The purpose of this change is to guarantee that:

    - isPotentiallyCompatible() returns true if newSerializerClassName is the PSC "equivalent" of restoreSerializerClassName, determined by comparing their fully-qualified class names
    - resolveSchemaCompatibility() reports the new PSC serializer as compatible with the restored Kafka serializer
2. Replace all mentions of SimpleTypeSerializerSnapshot in FlinkKafkaProducer with the newly created KafkaSimpleTypeSerializerSnapshot (see the sketch at the end of this section).

At runtime, if the active FlinkKafkaProducer code has these two changes implemented, FlinkPscProducer should be able to recover a checkpoint that was generated by FlinkKafkaProducer.
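
As a rough illustration of step 2, here is what the swap might look like for one of FlinkKafkaProducer's serializer snapshot inner classes, using TransactionStateSerializer as the example. This is a sketch against the class layout used by recent FlinkKafkaProducer versions, not a verbatim patch; the exact inner classes to touch depend on the Flink version in use.

```java
// Inside org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer (sketch; exact
// structure varies by Flink version). The snapshot inner class switches its parent from
// SimpleTypeSerializerSnapshot to the KafkaSimpleTypeSerializerSnapshot defined above, so the
// customized compatibility check runs when a Flink-PSC job restores this state.
public static class TransactionStateSerializerSnapshot
        // before: extends SimpleTypeSerializerSnapshot<KafkaTransactionState>
        extends KafkaSimpleTypeSerializerSnapshot<KafkaTransactionState> {

    public TransactionStateSerializerSnapshot() {
        super(TransactionStateSerializer::new);
    }
}
```

Any other snapshot inner classes in FlinkKafkaProducer that extend SimpleTypeSerializerSnapshot get the same one-line change to their parent class.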