package dk.netarkivet.viewerproxy.webinterface.hadoop;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveReaderFactory;
import org.archive.io.ArchiveRecord;
import org.jwat.common.ByteCountingPushBackInputStream;
import org.jwat.common.ContentType;
import org.jwat.common.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.archive.ArchiveHeaderBase;
import dk.netarkivet.common.utils.archive.ArchiveRecordBase;
import dk.netarkivet.common.utils.batch.ArchiveBatchFilter;

/**
 * Hadoop Mapper for creating CDX indexes for metadata files through the GUI application's QA pages.
 *
 * The input is a key (ignored) and a Text line, which should be the path to the archive file.
 * The output key is a NullWritable (ignored), and the output values are the generated CDX lines.
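 *
 * <p>A driver might wire this mapper into a map-only job roughly as in the following sketch; the job name and the
 * input/output paths here are illustrative assumptions, not taken from this codebase:</p>
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf, "metadata-cdx");   // hypothetical job name
 * job.setJarByClass(MetadataCDXMapper.class);
 * job.setMapperClass(MetadataCDXMapper.class);
 * job.setNumReduceTasks(0);                          // map-only: CDX lines go straight to the output files
 * job.setOutputKeyClass(NullWritable.class);
 * job.setOutputValueClass(Text.class);
 * FileInputFormat.addInputPath(job, new Path("/user/netarkivet/archive-paths.txt"));
 * FileOutputFormat.setOutputPath(job, new Path("/user/netarkivet/cdx-out"));
 * job.waitForCompletion(true);
 * }</pre>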
*/
public class MetadataCDXMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private static final Logger log = LoggerFactory.getLogger(MetadataCDXMapper.class);

    /**
     * Mapping method.
     *
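     * <p>Each input value is expected to hold a single archive file path, e.g. (an illustrative path, not taken
     * from this codebase): {@code hdfs://namenode/netarkivet/1-metadata-1.warc.gz}</p>
     *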
     * @param linenumber The line number of the input line. Ignored.
     * @param archiveFilePath The path to the archive file to index.
     * @param context The Hadoop context, used for writing output.
     * @throws IOException If the CDX indexes cannot be generated.
     * @throws InterruptedException If writing to the context is interrupted.
     */
    @Override
    protected void map(LongWritable linenumber, Text archiveFilePath, Context context) throws IOException,
            InterruptedException {
        // Reject null or empty archive file paths.
        if (archiveFilePath == null || archiveFilePath.toString().trim().isEmpty()) {
            log.warn("Encountered empty path in job {}", context.getJobID().toString());
            return;
        }
        Path path = new Path(archiveFilePath.toString());
        List<String> cdxIndexes;
        try (InputStream in = new BufferedInputStream(path.getFileSystem(context.getConfiguration()).open(path))) {
            log.info("CDX-indexing archive file '{}'", path);
            cdxIndexes = index(in, archiveFilePath.toString());
        }
        // Emit each CDX line as a map output value; the key is deliberately a NullWritable.
        for (String cdxIndex : cdxIndexes) {
            context.write(NullWritable.get(), new Text(cdxIndex));
        }
    }

    /**
     * Extracts CDX lines from an input stream representing a metadata archive file.
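     *
     * <p>A standalone caller might use this method roughly as in the following sketch; the file name is an
     * illustrative assumption, not taken from this codebase:</p>
     * <pre>{@code
     * String name = "1-metadata-1.warc.gz";
     * try (InputStream in = new BufferedInputStream(new FileInputStream(name))) {
     *     List<String> cdxLines = new MetadataCDXMapper().index(in, name);
     *     cdxLines.forEach(System.out::println);
     * }
     * }</pre>
     *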
     * @param archiveInputStream The input stream the archive file is read from.
     * @param archiveName The name of the archive file, used both to choose the reader type and to decide whether
     *        the file matches the metadata file naming pattern.
     * @return A list of the CDX lines for the records in the archive file.
     * @throws IOException If the archive reader cannot be created or closed.
     */
    public List<String> index(InputStream archiveInputStream, String archiveName) throws IOException {
        try (ArchiveReader archiveReader = ArchiveReaderFactory.get(archiveName, archiveInputStream, false)) {
            boolean isMetadataFile = archiveName
                    .matches("(.*)" + Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX));
            if (isMetadataFile) {
                final int HTTP_HEADER_BUFFER_SIZE = 1024 * 1024;
                String[] fields = {"A", "e", "b", "m", "n", "g", "v"};
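                // CDX field letters used here: A=URL, e=IP, b=date, m=MIME type, n=length, g=archive file name,
                // v=offset. A resulting line might look like (illustrative values only):
                //   http://example.org/ 192.0.2.1 20200101120000 text/html 1234 1-metadata-1.warc.gz 5678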
                List<String> cdxIndexes = new ArrayList<>();
                for (ArchiveRecord archiveRecord : archiveReader) {
                    ArchiveRecordBase record = ArchiveRecordBase.wrapArchiveRecord(archiveRecord);
                    // Skip records rejected by the batch filter.
                    boolean isAcceptedRecord = ArchiveBatchFilter.EXCLUDE_NON_WARCINFO_RECORDS.accept(record);
                    if (!isAcceptedRecord) {
                        continue;
                    }
                    ArchiveHeaderBase header = record.getHeader();
                    log.trace("Processing archive record in '{}' with offset: {}", archiveName, header.getOffset());
                    Map<String, String> fieldsRead = new HashMap<>();
                    fieldsRead.put("A", header.getUrl());
                    fieldsRead.put("e", header.getIp());
                    fieldsRead.put("b", header.getArcDateStr());
                    fieldsRead.put("n", Long.toString(header.getLength()));
                    fieldsRead.put("g", header.getArchiveFile().getName());
                    fieldsRead.put("v", Long.toString(header.getOffset()));
                    String mimeType = header.getMimetype();
                    ContentType contentType = ContentType.parseContentType(mimeType);
                    boolean bResponse = false;
                    if (contentType != null) {
                        // "application/http; msgtype=response" records carry a full HTTP response, so the real
                        // payload MIME type must be read from the embedded HTTP header below.
                        if ("application".equals(contentType.contentType) && "http".equals(contentType.mediaType)) {
                            // getParameter may return null if msgtype is absent, so compare constant-first.
                            bResponse = "response".equals(contentType.getParameter("msgtype"));
                        }
                        mimeType = contentType.toStringShort();
                    }
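                    // Wrap the record stream in a pushback buffer large enough to re-read the HTTP header bytes.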
                    ByteCountingPushBackInputStream pbin = new ByteCountingPushBackInputStream(record.getInputStream(),
                            HTTP_HEADER_BUFFER_SIZE);
                    HttpHeader httpResponse = null;
                    if (bResponse) {
                        try {
                            httpResponse = HttpHeader.processPayload(HttpHeader.HT_RESPONSE, pbin, header.getLength(),
                                    null);
                            if (httpResponse.contentType != null) {
                                contentType = ContentType.parseContentType(httpResponse.contentType);
                                if (contentType != null) {
                                    mimeType = contentType.toStringShort();
                                }
                            }
                        } catch (IOException e) {
                            throw new IOFailure("Error reading HTTP response header", e);
                        }
                    }
                    fieldsRead.put("m", mimeType);
                    if (httpResponse != null) {
                        try {
                            httpResponse.close();
                        } catch (IOException e) {
                            throw new IOFailure("Error closing HTTP response header", e);
                        }
                    }
                    // Build the CDX line: the fields in order, space-separated, with '-' for missing values.
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < fields.length; i++) {
                        String value = fieldsRead.get(fields[i]);
                        sb.append((i > 0) ? " " : "");
                        sb.append((value == null) ? "-" : value);
                    }
                    cdxIndexes.add(sb.toString());
                }
                return cdxIndexes;
            } else {
                return new ArrayList<>();
            }
        }
    }
}