-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
InstalledProtocolsProcedure.java
110 lines (97 loc) · 4.88 KB
/
InstalledProtocolsProcedure.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.discovery.procedures;
import java.util.Comparator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.neo4j.causalclustering.protocol.Protocol;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.collection.RawIterator;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.api.ResourceTracker;
import org.neo4j.kernel.api.exceptions.ProcedureException;
import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.Context;
import org.neo4j.kernel.api.proc.Neo4jTypes;
import org.neo4j.kernel.api.proc.ProcedureSignature;
import org.neo4j.kernel.api.proc.QualifiedName;
/**
 * Procedure registered under the {@code dbms.cluster} namespace with the name {@link #PROCEDURE_NAME}.
 * <p>
 * Every result row describes one installed protocol stack and carries five columns:
 * {@code orientation}, {@code remoteAddress}, {@code applicationProtocol},
 * {@code applicationProtocolVersion} and {@code modifierProtocols}.
 * Rows for client-side (outbound) stacks are emitted before rows for server-side (inbound) stacks,
 * and each of the two groups is ordered by hostname and then by port.
 */
public class InstalledProtocolsProcedure extends CallableProcedure.BasicProcedure
{
    private static final String[] PROCEDURE_NAMESPACE = {"dbms", "cluster"};
    public static final String PROCEDURE_NAME = "protocols";

    private final Supplier<Stream<Pair<AdvertisedSocketAddress,ProtocolStack>>> clientInstalledProtocols;
    private final Supplier<Stream<Pair<SocketAddress,ProtocolStack>>> serverInstalledProtocols;

    /**
     * @param clientInstalledProtocols supplies the (address, protocol stack) pairs reported as outbound rows
     * @param serverInstalledProtocols supplies the (address, protocol stack) pairs reported as inbound rows
     */
    public InstalledProtocolsProcedure( Supplier<Stream<Pair<AdvertisedSocketAddress,ProtocolStack>>> clientInstalledProtocols,
            Supplier<Stream<Pair<SocketAddress,ProtocolStack>>> serverInstalledProtocols )
    {
        super( signature() );
        this.clientInstalledProtocols = clientInstalledProtocols;
        this.serverInstalledProtocols = serverInstalledProtocols;
    }

    /**
     * Assembles the procedure signature: qualified name, the five output columns and the description.
     */
    private static ProcedureSignature signature()
    {
        QualifiedName qualifiedName = new QualifiedName( PROCEDURE_NAMESPACE, PROCEDURE_NAME );
        return ProcedureSignature.procedureSignature( qualifiedName )
                .out( "orientation", Neo4jTypes.NTString )
                .out( "remoteAddress", Neo4jTypes.NTString )
                .out( "applicationProtocol", Neo4jTypes.NTString )
                .out( "applicationProtocolVersion", Neo4jTypes.NTInteger )
                .out( "modifierProtocols", Neo4jTypes.NTString )
                .description( "Overview of installed protocols" )
                .build();
    }

    /**
     * Produces the result rows: all outbound rows first, followed by all inbound rows.
     * None of the three arguments is read by this implementation.
     */
    @Override
    public RawIterator<Object[],ProcedureException> apply(
            Context ctx, Object[] input, ResourceTracker resourceTracker )
    {
        Stream<Object[]> clientRows = rowsFor( clientInstalledProtocols, ProtocolInstaller.Orientation.Client.OUTBOUND );
        Stream<Object[]> serverRows = rowsFor( serverInstalledProtocols, ProtocolInstaller.Orientation.Server.INBOUND );
        Stream<Object[]> allRows = Stream.concat( clientRows, serverRows );
        return Iterators.asRawIterator( allRows );
    }

    /**
     * Fetches the current pairs from the supplier, orders them by hostname and then by port,
     * and converts each pair into an output row tagged with the given orientation.
     */
    private <T extends SocketAddress> Stream<Object[]> rowsFor( Supplier<Stream<Pair<T,ProtocolStack>>> source, String orientation )
    {
        Comparator<Pair<T,ProtocolStack>> byHostname = Comparator.comparing( pair -> pair.first().getHostname() );
        Comparator<Pair<T,ProtocolStack>> byHostnameThenPort = byHostname.thenComparing( pair -> pair.first().getPort() );

        Stream<Pair<T,ProtocolStack>> ordered = source.get().sorted( byHostnameThenPort );
        return ordered.map( pair -> asRow( pair, orientation ) );
    }

    /**
     * Lays out a single row in the column order declared by the signature.
     */
    private <T extends SocketAddress> Object[] asRow( Pair<T,ProtocolStack> pair, String orientation )
    {
        T address = pair.first();
        ProtocolStack stack = pair.other();

        Object[] row = new Object[5];
        row[0] = orientation;
        row[1] = address.toString();
        row[2] = stack.applicationProtocol().identifier();
        // Cast to long to match the integer column declared in the signature.
        row[3] = (long) stack.applicationProtocol().version();
        row[4] = formatModifiers( stack );
        return row;
    }

    /**
     * Renders the friendly names of the stack's modifier protocols as a comma separated list
     * enclosed in square brackets, e.g. {@code [a,b]}; an empty list renders as {@code []}.
     */
    private String formatModifiers( ProtocolStack stack )
    {
        return stack.modifierProtocols()
                .stream()
                .map( modifier -> modifier.friendlyName() )
                .collect( Collectors.joining( ",", "[", "]" ) );
    }
}