-
Notifications
You must be signed in to change notification settings - Fork 0
/
RecordStorageServer.java
177 lines (142 loc) · 6.54 KB
/
RecordStorageServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright 2021 Piotr Morgwai Kotarbinski, Licensed under the Apache License, Version 2.0
package pl.morgwai.samples.grpc.scopes.grpc;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.ConsoleHandler;
import java.util.logging.Logger;
import javax.persistence.*;
import com.google.inject.Module;
import com.google.inject.*;
import com.google.inject.name.Names;
import io.grpc.*;
import pl.morgwai.base.grpc.scopes.GrpcContextTrackingExecutor;
import pl.morgwai.base.grpc.scopes.GrpcModule;
import pl.morgwai.base.grpc.utils.GrpcAwaitable;
import pl.morgwai.base.jul.JulManualResetLogManager;
import pl.morgwai.base.utils.concurrent.Awaitable;
import pl.morgwai.samples.grpc.scopes.data_access.JpaRecordDao;
import pl.morgwai.samples.grpc.scopes.domain.RecordDao;
import static java.util.concurrent.TimeUnit.SECONDS;
import static pl.morgwai.base.jul.JulConfigurator.*;
import static pl.morgwai.samples.grpc.scopes.grpc.RecordStorageService.CONCURRENCY_LEVEL;
import static pl.morgwai.samples.grpc.scopes.grpc.RecordStorageService.JPA_EXECUTOR_NAME;
public class RecordStorageServer {
/** TCP port to listen on. */
static final String PORT_ENVVAR = "PORT";
public static final int DEFAULT_PORT = 6666;
/** Value for {@link ServerBuilder#maxConnectionIdle(long, TimeUnit)}. */
static final String MAX_CONNECTION_IDLE_ENVVAR = "MAX_CONNECTION_IDLE_SECONDS";
static final int DEFAULT_MAX_CONNECTION_IDLE = 60;
/** Value for {@link ServerBuilder#maxConnectionAge(long, TimeUnit)}. */
static final String MAX_CONNECTION_AGE_ENVVAR = "MAX_CONNECTION_AGE_SECONDS";
static final int DEFAULT_MAX_CONNECTION_AGE = 60;
/** Value for {@link ServerBuilder#maxConnectionAgeGrace(long, TimeUnit)}. */
static final String MAX_CONNECTION_AGE_GRACE_ENVVAR = "MAX_CONNECTION_AGE_GRACE_SECONDS";
static final int DEFAULT_MAX_CONNECTION_AGE_GRACE = 180;
/** Threadpool size for {@link #jpaExecutor}. */
static final String JPA_EXECUTOR_THREADPOOL_SIZE_ENVVAR = "JPA_EXECUTOR_THREADPOOL_SIZE";
static final int DEFAULT_JPA_EXECUTOR_THREADPOOL_SIZE = 10;
final Server recordStorageServer;
/** Name of the persistence unit defined in {@code persistence.xml} file. */
static final String PERSISTENCE_UNIT_NAME = "RecordDb";
final EntityManagerFactory entityManagerFactory;
/**
* JPA operations will be executed on this executor.
* Its thread pool size is obtained from {@value #JPA_EXECUTOR_THREADPOOL_SIZE_ENVVAR} env var
* and should be at least the size of DB-connection pool size defined in {@code persistence.xml}
* ({@code hibernate.c3p0.maxPoolSize} in this case). Note that most JPA implementations are
* able to perform many operation in cache and deffer or even completely bypass actual
* connection acquisition, so usually the thread pool should be significantly greater than the
* connection pool. The exact number should be determined either by load tests or even better by
* real usage data.
*/
final GrpcContextTrackingExecutor jpaExecutor;
RecordStorageServer(
int port,
int maxConnectionIdleSeconds,
int maxConnectionAgeSeconds,
int maxConnectionAgeGraceSeconds,
int jpaExecutorThreadpoolSize
) throws Exception {
final var grpcModule = new GrpcModule();
entityManagerFactory = Persistence.createEntityManagerFactory(PERSISTENCE_UNIT_NAME);
jpaExecutor = grpcModule.newContextTrackingExecutor(
PERSISTENCE_UNIT_NAME + JPA_EXECUTOR_NAME,
jpaExecutorThreadpoolSize
);
log.info("entity manager factory " + PERSISTENCE_UNIT_NAME + " and its associated "
+ jpaExecutor.getName() + " created successfully");
final Module jpaModule = (binder) -> {
binder.bind(EntityManager.class)
.toProvider(entityManagerFactory::createEntityManager)
.in(grpcModule.listenerEventScope);
binder.bind(EntityManagerFactory.class)
.toInstance(entityManagerFactory);
binder.bind(GrpcContextTrackingExecutor.class)
.annotatedWith(Names.named(JPA_EXECUTOR_NAME))
.toInstance(jpaExecutor);
binder.bind(RecordDao.class)
.to(JpaRecordDao.class)
.in(Scopes.SINGLETON);
binder.bind(Integer.class)
.annotatedWith(Names.named(CONCURRENCY_LEVEL))
.toInstance(jpaExecutorThreadpoolSize + 1);
// +1 is to account for request message delivery delay
};
final var injector = Guice.createInjector(grpcModule, jpaModule);
final var service = injector.getInstance(RecordStorageService.class);
recordStorageServer = ServerBuilder.forPort(port)
.directExecutor()
// this is just to demonstrate that all slow JPA operations are
// dispatched to jpaExecutor, so no gRPC thread is ever blocked
.maxConnectionIdle(maxConnectionIdleSeconds, SECONDS)
.maxConnectionAge(maxConnectionAgeSeconds, SECONDS)
.maxConnectionAgeGrace(maxConnectionAgeGraceSeconds, SECONDS)
.addService(ServerInterceptors.intercept(service, grpcModule.serverInterceptor))
.build();
recordStorageServer.start();
log.info("server started on port " + port);
Runtime.getRuntime().addShutdownHook(new Thread(
() -> {
try {
log.info("shutting down");
Awaitable.awaitMultiple(
5000L,
GrpcAwaitable.ofEnforcedTermination(recordStorageServer),
grpcModule.toAwaitableOfEnforcedTerminationOfAllExecutors()
);
} catch (InterruptedException ignored) {}
entityManagerFactory.close();
log.info("exiting, bye!");
((JulManualResetLogManager) JulManualResetLogManager.getLogManager()).manualReset();
}
));
}
public static void main(String[] args) throws Exception {
addOrReplaceLoggingConfigProperties(Map.of(
ConsoleHandler.class.getName() + LEVEL_SUFFIX, "FINEST"));
overrideLogLevelsWithSystemProperties("pl.morgwai", "org.hibernate", "com.mchange");
new RecordStorageServer(
getIntFromEnv(PORT_ENVVAR, DEFAULT_PORT),
getIntFromEnv(MAX_CONNECTION_IDLE_ENVVAR, DEFAULT_MAX_CONNECTION_IDLE),
getIntFromEnv(MAX_CONNECTION_AGE_ENVVAR, DEFAULT_MAX_CONNECTION_AGE),
getIntFromEnv(MAX_CONNECTION_AGE_GRACE_ENVVAR, DEFAULT_MAX_CONNECTION_AGE_GRACE),
getIntFromEnv(JPA_EXECUTOR_THREADPOOL_SIZE_ENVVAR, DEFAULT_JPA_EXECUTOR_THREADPOOL_SIZE)
).recordStorageServer.awaitTermination();
}
static int getIntFromEnv(String envVarName, int defaultValue) {
final var value = System.getenv(envVarName);
if (value == null) {
log.info(envVarName + " unset, using default " + defaultValue);
return defaultValue;
}
return Integer.parseInt(value);
}
static {
System.setProperty(
JulManualResetLogManager.JUL_LOG_MANAGER_PROPERTY,
JulManualResetLogManager.class.getName()
);
}
static final Logger log = Logger.getLogger(RecordStorageServer.class.getName());
}