🍱 Add flink core module
zhisheng17 committed Sep 25, 2019
1 parent 2efa67b commit 2139a4c
Showing 12 changed files with 831 additions and 0 deletions.
15 changes: 15 additions & 0 deletions flink-learning-core/pom.xml
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-learning</artifactId>
<groupId>com.zhisheng.flink</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-learning-core</artifactId>


</project>
40 changes: 40 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/exception/FlinkRuntimeException.java
@@ -0,0 +1,40 @@
package com.zhisheng.core.exception;

/**
* Desc: Base class of all Flink-specific unchecked exceptions.
* Created by zhisheng on 2019-09-25
* blog:http://www.54tianzhisheng.cn/
* WeChat official account: zhisheng
*/
public class FlinkRuntimeException extends RuntimeException {

private static final long serialVersionUID = 193141189399279147L;

/**
* Creates a new Exception with the given message and null as the cause.
*
* @param message The exception message
*/
public FlinkRuntimeException(String message) {
super(message);
}

/**
* Creates a new exception with a null message and the given cause.
*
* @param cause The exception that caused this exception
*/
public FlinkRuntimeException(Throwable cause) {
super(cause);
}

/**
* Creates a new exception with the given message and cause.
*
* @param message The exception message
* @param cause The exception that caused this exception
*/
public FlinkRuntimeException(String message, Throwable cause) {
super(message, cause);
}
}
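
For context, a minimal usage sketch (not part of this commit) of how FlinkRuntimeException is typically used to wrap a checked cause into an unchecked exception; the ConfigLoader class and the file-reading scenario are hypothetical:

import com.zhisheng.core.exception.FlinkRuntimeException;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ConfigLoader {

    // Hypothetical helper: wraps a checked IOException in the unchecked
    // FlinkRuntimeException so callers do not have to declare or catch it.
    public static String loadOrFail(Path path) {
        try {
            return new String(Files.readAllBytes(path));
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not read config file: " + path, e);
        }
    }
}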
9 changes: 9 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/factory/DeserializerFactory.java
@@ -0,0 +1,9 @@
package com.zhisheng.core.factory;

/**
* Desc: Factory for creating deserializers.
* blog:http://www.54tianzhisheng.cn/
* WeChat official account: zhisheng
*/
public class DeserializerFactory {
}
9 changes: 9 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/factory/SerializerFactory.java
@@ -0,0 +1,9 @@
package com.zhisheng.core.factory;

/**
* Desc: Factory for creating serializers.
* blog:http://www.54tianzhisheng.cn/
* WeChat official account: zhisheng
*/
public class SerializerFactory {
}
9 changes: 9 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/factory/SinkFactory.java
@@ -0,0 +1,9 @@
package com.zhisheng.core.factory;

/**
* Desc: Factory for creating sinks.
* blog:http://www.54tianzhisheng.cn/
* WeChat official account: zhisheng
*/
public class SinkFactory {
}
9 changes: 9 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/factory/SourceFactory.java
@@ -0,0 +1,9 @@
package com.zhisheng.core.factory;

/**
* Desc: Factory for creating sources.
* blog:http://www.54tianzhisheng.cn/
* WeChat official account: zhisheng
*/
public class SourceFactory {
}
39 changes: 39 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/utils/ArrayUtils.java
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.zhisheng.core.utils;


/**
* Utility class for Java arrays.
*/
public final class ArrayUtils {

/**
* Concatenates the two given String arrays; if either array is empty, the other one is returned as-is.
*/
public static String[] concat(String[] array1, String[] array2) {
if (array1.length == 0) {
return array2;
}
if (array2.length == 0) {
return array1;
}
String[] resultArray = new String[array1.length + array2.length];
System.arraycopy(array1, 0, resultArray, 0, array1.length);
System.arraycopy(array2, 0, resultArray, array1.length, array2.length);
return resultArray;
}
}
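
A minimal usage sketch for ArrayUtils.concat (not part of this commit; the argument values are made up):

import com.zhisheng.core.utils.ArrayUtils;

import java.util.Arrays;

public class ArrayUtilsExample {
    public static void main(String[] args) {
        String[] defaults = {"--parallelism", "1"};
        String[] overrides = {"--checkpoint-interval", "60000"};

        // If either input is empty, concat returns the other array unchanged;
        // otherwise both are copied into a freshly allocated array.
        String[] merged = ArrayUtils.concat(defaults, overrides);

        System.out.println(Arrays.toString(merged));
        // prints: [--parallelism, 1, --checkpoint-interval, 60000]
    }
}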
70 changes: 70 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/utils/CollectionUtil.java
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.zhisheng.core.utils;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Stream;

/**
* Simple utility to work with Java collections.
*/
public final class CollectionUtil {

/**
* A safe maximum size for arrays in the JVM.
*/
public static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

private CollectionUtil() {
throw new AssertionError();
}

public static boolean isNullOrEmpty(Collection<?> collection) {
return collection == null || collection.isEmpty();
}

public static boolean isNullOrEmpty(Map<?, ?> map) {
return map == null || map.isEmpty();
}

/**
* Applies the given mapper to each element together with its zero-based position in iteration order.
*/
public static <T, R> Stream<R> mapWithIndex(Collection<T> input, final BiFunction<T, Integer, R> mapper) {
final AtomicInteger count = new AtomicInteger(0);

return input.stream().map(element -> mapper.apply(element, count.getAndIncrement()));
}

/**
* Partition a collection into approximately n buckets.
*/
public static <T> Collection<List<T>> partition(Collection<T> elements, int numBuckets) {
Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);

int initialCapacity = elements.size() / numBuckets;

int index = 0;
for (T element : elements) {
int bucket = index % numBuckets;
buckets.computeIfAbsent(bucket, key -> new ArrayList<>(initialCapacity)).add(element);
// advance the running index so elements are distributed round-robin across the buckets
index++;
}

return buckets.values();
}
}
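
A minimal usage sketch for CollectionUtil (not part of this commit; the topic names are made up):

import com.zhisheng.core.utils.CollectionUtil;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CollectionUtilExample {
    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders", "payments", "clicks");

        // mapWithIndex pairs each element with its zero-based position in iteration order.
        List<String> indexed = CollectionUtil.mapWithIndex(topics, (topic, i) -> i + ":" + topic)
                .collect(Collectors.toList());
        System.out.println(indexed); // [0:orders, 1:payments, 2:clicks]

        // partition spreads the elements round-robin across two buckets.
        System.out.println(CollectionUtil.partition(topics, 2));
    }
}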
98 changes: 98 additions & 0 deletions flink-learning-core/src/main/java/com/zhisheng/core/utils/ExecutorUtils.java
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.zhisheng.core.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Utilities for {@link java.util.concurrent.Executor Executors}.
*/
public class ExecutorUtils {

private static final Logger LOG = LoggerFactory.getLogger(ExecutorUtils.class);

/**
* Gracefully shuts down the given {@link ExecutorService ExecutorServices}. The call waits up to the
* given timeout for all ExecutorServices to terminate. ExecutorServices that do not terminate within
* that time are shut down hard via {@link ExecutorService#shutdownNow()}.
*
* @param timeout to wait for the termination of all ExecutorServices
* @param unit of the timeout
* @param executorServices to shut down
*/
public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
for (ExecutorService executorService: executorServices) {
executorService.shutdown();
}

boolean wasInterrupted = false;
final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
long timeLeft = unit.toMillis(timeout);
boolean hasTimeLeft = timeLeft > 0L;

for (ExecutorService executorService: executorServices) {
if (wasInterrupted || !hasTimeLeft) {
executorService.shutdownNow();
} else {
try {
if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
executorService.shutdownNow();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while shutting down executor services. Shutting all " +
"remaining ExecutorServices down now.", e);
executorService.shutdownNow();

wasInterrupted = true;

Thread.currentThread().interrupt();
}

timeLeft = endTime - System.currentTimeMillis();
hasTimeLeft = timeLeft > 0L;
}
}
}

/**
* Shuts the given {@link ExecutorService ExecutorServices} down in a non-blocking fashion. The shutdown
* is executed by a thread from the common fork-join pool.
*
* <p>The executor services will be shut down gracefully for the given timeout period. Afterwards
* {@link ExecutorService#shutdownNow()} will be called.
*
* @param timeout before {@link ExecutorService#shutdownNow()} is called
* @param unit time unit of the timeout
* @param executorServices to shut down
* @return Future which is completed once all given {@link ExecutorService ExecutorServices} are shut down
*/
public static CompletableFuture<Void> nonBlockingShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
return CompletableFuture.supplyAsync(
() -> {
gracefulShutdown(timeout, unit, executorServices);
return null;
});
}
}
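
A minimal usage sketch for ExecutorUtils.gracefulShutdown (not part of this commit; the pools and the 10-second timeout are made-up examples):

import com.zhisheng.core.utils.ExecutorUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorUtilsExample {
    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        ExecutorService cpuPool = Executors.newWorkStealingPool();

        ioPool.submit(() -> System.out.println("doing some work"));

        // Stop accepting new tasks, wait up to 10 seconds for running tasks to
        // finish, then force-shutdown whatever is still running.
        ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, ioPool, cpuPool);
    }
}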