Skip to content

Commit

Permalink
Add ShedLock provider for Memcached (#893)
Browse files Browse the repository at this point in the history
* feat: add shedlock provider for memcached

* feat: shedlock provider for memcached

* feat: reactor unit test

* feat:  add Memcached Standard Protocol

* feat: remove code

* fix findbug

* feat: code style

* import error

* feat: nonnull

* feat:  reactor

* feat: importved

* Pre-merge changes

Co-authored-by: lee123lee123 <lee123lee123@163.com>
  • Loading branch information
lukas-krecan and pinkhello committed Jan 27, 2022
1 parent 5f0efda commit 04048ac
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 0 deletions.
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ executed repeatedly. Moreover, the locks are time-based and ShedLock assumes tha
- [Apache Ignite](#apache-ignite)
- [Multi-tenancy](#Multi-tenancy)
- [In-Memory](#In-Memory)
- [Memcached](#Memcached)
+ [Duration specification](#duration-specification)
+ [Extending the lock](#extending-the-lock)
+ [Micronaut integration](#micronaut-integration)
Expand Down Expand Up @@ -749,6 +750,40 @@ public LockProvider lockProvider() {
}
```

#### Memcached (using spymemcached)
Please, be aware that memcached is not a database but a cache. It means that if the cache is full,
[the lock may be released prematurely](https://stackoverflow.com/questions/6868256/memcached-eviction-prior-to-key-expiry/10456364#10456364)
**Use only if you know what you are doing.**.

Import
```xml
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-memcached-spy</artifactId>
<version>4.32.0</version>
</dependency>
```

and configure

```java
import net.javacrumbs.shedlock.provider.memcached.spy.MemcachedLockProvider;

...

@Bean
public LockProvider lockProvider(net.spy.memcached.MemcachedClient client) {
return new MemcachedLockProvider(client, ENV);
}
```

P.S.:

Memcached Standard Protocol:
- A key (arbitrary string up to 250 bytes in length. No space or newlines for ASCII mode)
- An expiration time, in `seconds`. '0' means never expire. Can be up to 30 days. After 30 days, is treated as a unix timestamp of an exact date. (support `seconds``minutes``days`, and less than `30` days)


## Duration specification
All the annotations where you need to specify a duration support the following formats

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<module>providers/arangodb/shedlock-provider-arangodb</module>
<module>providers/ignite/shedlock-provider-ignite</module>
<module>providers/inmemory/shedlock-provider-inmemory</module>
<module>providers/memcached/shedlock-provider-memcached-spy</module>
</modules>

<properties>
Expand Down
77 changes: 77 additions & 0 deletions providers/memcached/shedlock-provider-memcached-spy/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>shedlock-parent</artifactId>
<groupId>net.javacrumbs.shedlock</groupId>
<version>4.32.1-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>shedlock-provider-memcached-spy</artifactId>
<version>4.32.1-SNAPSHOT</version>

<properties>
<spymemcached.version>2.12.3</spymemcached.version>
</properties>

<dependencies>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-test-support</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
<version>${spymemcached.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test-containers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test-containers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>
net.javacrumbs.shedlock.provider.memcached.spy
</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package net.javacrumbs.shedlock.provider.memcached.spy;

import net.javacrumbs.shedlock.core.AbstractSimpleLock;
import net.javacrumbs.shedlock.core.ClockProvider;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import net.javacrumbs.shedlock.support.LockException;
import net.javacrumbs.shedlock.support.annotation.NonNull;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.util.StringUtils;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

import static net.javacrumbs.shedlock.support.Utils.getHostname;
import static net.javacrumbs.shedlock.support.Utils.toIsoString;

/**
* Lock Provider for Memcached
*
* @see <a href="https://memcached.org/">memcached</a>
*/
public class MemcachedLockProvider implements LockProvider {

/**
* KEY PREFIX
*/
private static final String KEY_PREFIX = "shedlock";

/**
* ENV DEFAULT
*/
private static final String ENV_DEFAULT = "default";

private final MemcachedClient client;

private final String env;

/**
* Create MemcachedLockProvider
* @param client Spy.memcached.MemcachedClient
*/
public MemcachedLockProvider(@NonNull MemcachedClient client){
this(client, ENV_DEFAULT);
}

/**
* Create MemcachedLockProvider
* @param client Spy.memcached.MemcachedClient
* @param env is part of the key and thus makes sure there is not key conflict between multiple ShedLock instances
* running on the same memcached
*/
public MemcachedLockProvider(@NonNull MemcachedClient client, @NonNull String env){
this.client = client;
this.env = env;
}

@Override
@NonNull
public Optional<SimpleLock> lock(@NonNull LockConfiguration lockConfiguration){
long expireTime = getSecondUntil(lockConfiguration.getLockAtMostUntil());
String key = buildKey(lockConfiguration.getName(), this.env);
OperationStatus status = client.add(key, (int) expireTime, buildValue()).getStatus();
if (status.isSuccess()) {
return Optional.of(new MemcachedLock(key, client, lockConfiguration));
}
return Optional.empty();
}


private static long getSecondUntil(Instant instant) {
long millis = Duration.between(ClockProvider.now(), instant).toMillis();
return millis / 1000;
}

static String buildKey(String lockName, String env) {
String k = String.format("%s:%s:%s", KEY_PREFIX, env, lockName);
StringUtils.validateKey(k, false);
return k;
}

private static String buildValue() {
return String.format("ADDED:%s@%s", toIsoString(ClockProvider.now()), getHostname());
}


private static final class MemcachedLock extends AbstractSimpleLock {

private final String key;

private final MemcachedClient client;

private MemcachedLock(@NonNull String key,
@NonNull MemcachedClient client,
@NonNull LockConfiguration lockConfiguration) {
super(lockConfiguration);
this.key = key;
this.client = client;
}

@Override
protected void doUnlock() {
long keepLockFor = getSecondUntil(lockConfiguration.getLockAtLeastUntil());
if (keepLockFor <= 0) {
OperationStatus status = client.delete(key).getStatus();
if (!status.isSuccess()) {
throw new LockException("Can not remove node. " + status.getMessage());
}
} else {
OperationStatus status = client.replace(key, (int) keepLockFor, buildValue()).getStatus();
if (!status.isSuccess()) {
throw new LockException("Can not replace node. " + status.getMessage());
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package net.javacrumbs.shedlock.provider.memcached.spy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

public class MemcachedContainer extends GenericContainer<MemcachedContainer> {

private static final Logger LOGGER = LoggerFactory.getLogger(MemcachedContainer.class);

public static final DockerImageName MEMCACHED_IMAGE = DockerImageName.parse("memcached:1.6-alpine");

public MemcachedContainer() {
super(MEMCACHED_IMAGE.asCanonicalNameString());
this.withExposedPorts(11211)
.withLogConsumer(frame -> LOGGER.info(frame.getUtf8String()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package net.javacrumbs.shedlock.provider.memcached.spy;

import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import net.javacrumbs.shedlock.test.support.AbstractLockProviderIntegrationTest;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

import static java.lang.Thread.sleep;
import static org.assertj.core.api.Assertions.assertThat;


@Testcontainers
public class MemcachedLockProviderIntegrationTest extends AbstractLockProviderIntegrationTest {

@Container
public static final MemcachedContainer container = new MemcachedContainer();

static final String ENV = "test";

private LockProvider lockProvider;

private MemcachedClient memcachedClient;

@BeforeEach
public void createLockProvider() throws IOException {
memcachedClient = new MemcachedClient(
AddrUtil.getAddresses(container.getContainerIpAddress() + ":" + container.getFirstMappedPort())
);

lockProvider = new MemcachedLockProvider(memcachedClient, ENV);
}


@Override
protected void assertUnlocked(String lockName) {
assertThat(getLock(lockName)).isNull();
}

@Override
protected void assertLocked(String lockName) {
assertThat(getLock(lockName)).isNotNull();
}


@Test
public void shouldTimeout() throws InterruptedException {
this.doTestTimeout(Duration.ofSeconds(1));
}

/**
* memcached smallest unit is second.
*/
@Override
protected void doTestTimeout(Duration lockAtMostFor) throws InterruptedException {
LockConfiguration configWithShortTimeout = lockConfig(LOCK_NAME1, lockAtMostFor, Duration.ZERO);
Optional<SimpleLock> lock1 = getLockProvider().lock(configWithShortTimeout);
assertThat(lock1).isNotEmpty();

sleep(lockAtMostFor.toMillis() * 2);
assertUnlocked(LOCK_NAME1);

Optional<SimpleLock> lock2 = getLockProvider().lock(lockConfig(LOCK_NAME1, Duration.ofSeconds(1), Duration.ZERO));
assertThat(lock2).isNotEmpty();
lock2.get().unlock();
}


@Override
protected LockProvider getLockProvider() {
return lockProvider;
}

private String getLock(String lockName) {
return (String) memcachedClient.get(MemcachedLockProvider.buildKey(lockName, ENV));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<!--
Copyright 2009 the original author or authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

0 comments on commit 04048ac

Please sign in to comment.