-
Notifications
You must be signed in to change notification settings - Fork 14
/
FusekiStorage.java
151 lines (130 loc) · 4.33 KB
/
FusekiStorage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package cz.cvut.kbss.ontodriver.jena.connector;
import cz.cvut.kbss.ontodriver.config.DriverConfiguration;
import cz.cvut.kbss.ontodriver.jena.config.JenaConfigParam;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryExecution;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Statement;
import org.apache.jena.rdf.model.StmtIterator;
import org.apache.jena.rdfconnection.RDFConnection;
import org.apache.jena.rdfconnection.RDFConnectionFuseki;
import org.apache.jena.sparql.core.Transactional;
import java.util.List;
/**
 * Represents a connection to a Jena Fuseki server.
 * <p>
 * The underlying {@link RDFConnection} is created lazily on first use and is closed (and discarded) whenever a
 * transaction is committed or rolled back, or when {@link #close()} is called. A subsequent operation transparently
 * opens a new connection.
 */
class FusekiStorage implements Storage {

    /**
     * SPARQL query listing the names of the named graphs in the dataset.
     */
    private static final String NAMED_GRAPHS_QUERY = "SELECT ?g WHERE { GRAPH ?g {} }";

    /**
     * Whether the default graph should be treated as the union of the default graph and all named graphs.
     */
    private final boolean defaultAsUnion;

    /**
     * URL of the Fuseki server, taken from the physical URI of the storage properties.
     */
    private final String serverUrl;

    /**
     * Currently open connection, or {@code null} when no connection is open.
     */
    private RDFConnection connection;

    /**
     * Creates a storage accessor for the Fuseki server described by the specified configuration.
     * <p>
     * No connection is opened here; it is established on first use.
     *
     * @param configuration Driver configuration providing the server URL and the default-graph-as-union setting
     */
    FusekiStorage(DriverConfiguration configuration) {
        this.defaultAsUnion = configuration.is(JenaConfigParam.TREAT_DEFAULT_GRAPH_AS_UNION);
        this.serverUrl = configuration.getStorageProperties().getPhysicalURI().toString();
    }

    /**
     * Returns the current connection, opening a new one to {@link #serverUrl} if none is open.
     *
     * @return Open connection, never {@code null}
     */
    private RDFConnection connect() {
        if (connection == null) {
            this.connection = RDFConnectionFuseki.create().destination(serverUrl).build();
        }
        return connection;
    }

    /**
     * Gets the transactional object of this storage, which is the (lazily opened) server connection.
     *
     * @return The connection as a {@link Transactional}
     */
    @Override
    public Transactional getTransactional() {
        return connect();
    }

    /**
     * Fetches the dataset from the server.
     *
     * @return Dataset retrieved through the connection
     */
    @Override
    public Dataset getDataset() {
        return connect().fetchDataset();
    }

    /**
     * Gets the default graph.
     * <p>
     * When the default graph is configured to be treated as union, the result is a union of the fetched default graph
     * and the union model of the fetched dataset. Otherwise, only the default graph is fetched.
     *
     * @return Model representing the default graph
     */
    @Override
    public Model getDefaultGraph() {
        if (defaultAsUnion) {
            return ModelFactory.createUnion(connect().fetch(), getDataset().getUnionModel());
        } else {
            return connect().fetch();
        }
    }

    /**
     * Fetches the named graph with the specified identifier.
     *
     * @param ctx Name of the graph to fetch
     * @return Model representing the named graph
     */
    @Override
    public Model getNamedGraph(String ctx) {
        return connect().fetch(ctx);
    }

    /**
     * Begins a transaction of the specified type, opening a connection if necessary.
     *
     * @param readWrite Type of the transaction
     */
    @Override
    public void begin(ReadWrite readWrite) {
        connect().begin(readWrite);
    }

    /**
     * Commits the current transaction and closes the connection.
     * <p>
     * The connection is closed even if the commit itself fails, so that a broken connection is not reused.
     *
     * @throws IllegalStateException If no connection is open, i.e., there is nothing to commit
     */
    @Override
    public void commit() {
        if (connection == null) {
            throw new IllegalStateException("Cannot commit, no connection to the Fuseki server is open.");
        }
        try {
            connection.commit();
        } finally {
            closeConnection();
        }
    }

    /**
     * Aborts the current transaction and closes the connection.
     * <p>
     * Does nothing when no connection is open. The connection is closed even if the abort itself fails.
     */
    @Override
    public void rollback() {
        if (connection == null) {
            // Nothing to roll back
            return;
        }
        try {
            connection.abort();
        } finally {
            closeConnection();
        }
    }

    /**
     * Closes the connection to the server, if one is open.
     */
    @Override
    public void close() {
        closeConnection();
    }

    /**
     * Closes and discards the current connection. Does nothing when no connection is open.
     */
    private void closeConnection() {
        if (connection == null) {
            return;
        }
        // Drop the reference first so that a failing close() cannot leave a stale connection behind
        final RDFConnection toClose = connection;
        this.connection = null;
        toClose.close();
    }

    /**
     * Adds the specified statements into the specified context, or into the default graph when no context is given.
     * <p>
     * Expects to be called within a transaction (checked by an assertion only).
     *
     * @param statements Statements to add, an empty list is a no-op
     * @param context    Target named graph, {@code null} for the default graph
     */
    @Override
    public void add(List<Statement> statements, String context) {
        assert connection != null && connection.isInTransaction();
        if (statements.isEmpty()) {
            return;
        }
        final Model model = ModelFactory.createDefaultModel().add(statements);
        if (context != null) {
            connection.load(context, model);
        } else {
            connection.load(model);
        }
    }

    /**
     * Removes the specified statements from the specified context, or from the default graph when no context is
     * given.
     * <p>
     * When no context is given and the default graph is treated as union, the statements are removed from every
     * named graph as well.
     * <p>
     * Expects to be called within a transaction (checked by an assertion only).
     *
     * @param statements Statements to remove, an empty list is a no-op
     * @param context    Named graph to remove from, {@code null} for the default graph
     */
    @Override
    public void remove(List<Statement> statements, String context) {
        assert connection != null && connection.isInTransaction();
        if (statements.isEmpty()) {
            return;
        }
        // Note that given the way Fuseki connection works, this can be quite inefficient (fetch model, update it, upload it again)
        // So translation to a SPARQL update may be more appropriate
        if (context != null) {
            final Model m = connection.fetch(context);
            m.remove(statements);
            connection.put(context, m);
        } else {
            final Model def = connection.fetch();
            def.remove(statements);
            connection.put(def);
            if (defaultAsUnion) {
                // Use a single connection instance for the query as well as for the per-graph fetch/put
                final RDFConnection conn = connection;
                conn.querySelect(NAMED_GRAPHS_QUERY, qs -> {
                    final String ctx = qs.getResource("g").getURI();
                    final Model m = conn.fetch(ctx);
                    m.remove(statements);
                    conn.put(ctx, m);
                });
            }
        }
    }

    /**
     * Removes the statements provided by the specified iterator, see {@link #remove(List, String)}.
     * <p>
     * The iterator is fully consumed into a list first.
     *
     * @param iterator Iterator over the statements to remove
     * @param context  Named graph to remove from, {@code null} for the default graph
     */
    @Override
    public void remove(StmtIterator iterator, String context) {
        assert connection != null && connection.isInTransaction();
        final List<Statement> statements = iterator.toList();
        remove(statements, context);
    }

    /**
     * Prepares execution of the specified query on the server connection.
     *
     * @param query Query to execute
     * @return Query execution created by the connection
     */
    @Override
    public QueryExecution prepareQuery(Query query) {
        return connect().query(query);
    }

    /**
     * Executes the specified SPARQL update on the server connection.
     *
     * @param update SPARQL update string
     */
    @Override
    public void executeUpdate(String update) {
        connect().update(update);
    }
}