Small changes to settings
Bohlski committed Apr 1, 2020
1 parent 1360e40 commit a85a3ad
Showing 5 changed files with 56 additions and 34 deletions.
CommonSettings.java
@@ -550,7 +550,7 @@ public class CommonSettings {
* Specifies the name of the default file system for Hadoop to use (URI consisting of scheme and authority).
* If not set, Hadoop defaults to 'file:///'.
*/
-    public static String HADOOP_HDFS = "settings.common.hadoop.hdfs";
+    public static String HADOOP_DEFAULT_FS = "settings.common.hadoop.defaultFS";

/**
* Specifies the address on where to locate the ResourceManager (e.g. YARN).
@@ -563,7 +563,7 @@ public class CommonSettings {
public static String HADOOP_MAPRED_FRAMEWORK = "settings.common.hadoop.mapred.framework";

/**
-     * Specifies what strategy/framework to use to resolve mass processing jobs.
+     * Specifies whether Hadoop is used for mass processing jobs.
*/
-    public static String MASS_PROCESSOR = "settings.common.massprocessor";
+    public static String USING_HADOOP = "settings.common.useHadoopAsMassProcessor";
}
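
A brief usage sketch, not part of the commit: the renamed keys are read through the existing Settings accessors. SettingsReadExample is a hypothetical class name, and the values in the comments assume the settings XML shown further down.

    import dk.netarkivet.common.CommonSettings;
    import dk.netarkivet.common.utils.Settings;

    public class SettingsReadExample {
        public static void main(String[] args) {
            // Reads the renamed settings from the loaded settings XML.
            String defaultFs = Settings.get(CommonSettings.HADOOP_DEFAULT_FS);    // e.g. "hdfs://node1"
            boolean useHadoop = Settings.getBoolean(CommonSettings.USING_HADOOP); // e.g. true
            System.out.println(defaultFs + " / " + useHadoop);
        }
    }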
HadoopUtils.java
@@ -1,20 +1,23 @@
package dk.netarkivet.common.utils;

import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;

public class HadoopUtils {
/** The class logger. */
private static Logger log = LoggerFactory.getLogger(HadoopUtils.class);
+    public static final String DEFAULT_FILESYSTEM = "fs.defaultFS";
+    public static final String MAPREDUCE_FRAMEWORK = "mapreduce.framework.name";
+    public static final String YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address";

-    public Configuration getConf() {
+    /** Utility class, do not initialise. */
+    private HadoopUtils() {
+    }
+
+    public static Configuration getConfFromSettings() {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", Settings.get(CommonSettings.HADOOP_HDFS));
//conf.set("mapreduce.framework.name", Settings.get(CommonSettings.HADOOP_MAPRED_FRAMEWORK));
//conf.set("yarn.resourcemanager.address", Settings.get(CommonSettings.HADOOP_RESOURCEMANAGER_ADDRESS));
conf.set(DEFAULT_FILESYSTEM, Settings.get(CommonSettings.HADOOP_DEFAULT_FS));
conf.set(MAPREDUCE_FRAMEWORK, Settings.get(CommonSettings.HADOOP_MAPRED_FRAMEWORK));
conf.set(YARN_RESOURCEMANAGER_ADDRESS, Settings.get(CommonSettings.HADOOP_RESOURCEMANAGER_ADDRESS));
return conf;
}
}
settings XML file
@@ -201,5 +201,15 @@ National Library.
<thisPhysicalLocation>physLocationOne</thisPhysicalLocation>
<applicationName>NA</applicationName>
<applicationInstanceId></applicationInstanceId>
+        <hadoop>
+            <defaultFS>hdfs://node1</defaultFS>
+            <resourcemanager>
+                <address>node1:8032</address>
+            </resourcemanager>
+            <mapred>
+                <framework>yarn</framework>
+            </mapred>
+        </hadoop>
+        <useHadoopAsMassProcessor>true</useHadoopAsMassProcessor>
</common>
</settings>
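
For reference, each new element corresponds one-to-one to a setting key, which getConfFromSettings() then copies onto the matching Hadoop Configuration key:

    settings.common.hadoop.defaultFS               -> fs.defaultFS
    settings.common.hadoop.resourcemanager.address -> yarn.resourcemanager.address
    settings.common.hadoop.mapred.framework        -> mapreduce.framework.name

The remaining key, settings.common.useHadoopAsMassProcessor, has no Hadoop counterpart; it only drives the new branch in FileNameHarvester below.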
HadoopUtilsTester.java
@@ -3,11 +3,12 @@
import org.apache.hadoop.conf.Configuration;

import dk.netarkivet.common.utils.HadoopUtils;
+import dk.netarkivet.common.CommonSettings;
+import dk.netarkivet.common.utils.Settings;

public class HadoopUtilsTester {
public static void main(String[] args) {
-        HadoopUtils test = new HadoopUtils();
-        Configuration testConf = test.getConf();
+        Configuration testConf = HadoopUtils.getConfFromSettings();
System.out.println(testConf.get("fs.defaultFS"));
+        System.out.println(Settings.getBoolean(CommonSettings.USING_HADOOP));
}
}
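
With the settings XML above on the classpath, this tester would be expected to print hdfs://node1 followed by true.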
FileNameHarvester.java
@@ -32,10 +32,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.RemoteFile;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClient;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
import dk.netarkivet.common.distribute.arcrepository.PreservationArcRepositoryClient;
+import dk.netarkivet.common.distribute.arcrepository.bitrepository.BitmagArcRepositoryClient;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.batch.DatedFileListJob;
@@ -47,33 +50,38 @@ public class FileNameHarvester {
/** Logger for this class. */
private static final Logger log = LoggerFactory.getLogger(FileNameHarvester.class);


/**
* This method harvests a list of all the files currently in the arcrepository and appends any new ones found to the
* ArchiveFile object store.
*/
public static synchronized void harvestAllFilenames() {
-        ArchiveFileDAO dao = new ArchiveFileDAO();
-        PreservationArcRepositoryClient client = ArcRepositoryClientFactory.getPreservationInstance();
-        BatchStatus status = client.batch(new FileListJob(), Settings.get(WaybackSettings.WAYBACK_REPLICA));
-        RemoteFile results = status.getResultFile();
-        InputStream is = results.getInputStream();
-        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-        String line;
-        try {
-            while ((line = reader.readLine()) != null) {
-                if (!dao.exists(line.trim())) {
-                    ArchiveFile file = new ArchiveFile();
-                    file.setFilename(line.trim());
-                    file.setIndexed(false);
-                    log.info("Creating object store entry for '{}'", file.getFilename());
-                    dao.create(file);
-                } // If the file is already known in the persistent store, no
-                // action needs to be taken.
+        if (Settings.getBoolean(CommonSettings.USING_HADOOP)) {
+            BitmagArcRepositoryClient client;
+        } else {
+            ArchiveFileDAO dao = new ArchiveFileDAO();
+            PreservationArcRepositoryClient client = ArcRepositoryClientFactory.getPreservationInstance();
+            BatchStatus status = client.batch(new FileListJob(), Settings.get(WaybackSettings.WAYBACK_REPLICA));
+            RemoteFile results = status.getResultFile();
+            InputStream is = results.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+            String line;
+            try {
+                while ((line = reader.readLine()) != null) {
+                    if (!dao.exists(line.trim())) {
+                        ArchiveFile file = new ArchiveFile();
+                        file.setFilename(line.trim());
+                        file.setIndexed(false);
+                        log.info("Creating object store entry for '{}'", file.getFilename());
+                        dao.create(file);
+                    } // If the file is already known in the persistent store, no
+                    // action needs to be taken.
+                }
+            } catch (IOException e) {
+                throw new IOFailure("Error reading remote file", e);
+            } finally {
+                IOUtils.closeQuietly(reader);
+            }
+        }
-        } catch (IOException e) {
-            throw new IOFailure("Error reading remote file", e);
-        } finally {
-            IOUtils.closeQuietly(reader);
-        }
}

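The Hadoop branch itself is only stubbed out in this commit (note the dangling BitmagArcRepositoryClient declaration). Purely as a sketch of where it appears to be headed, and assuming both a hypothetical getAllFilenames() helper and a hypothetical getInstance() lookup on BitmagArcRepositoryClient, neither of which this commit confirms, the branch might eventually take a shape like:

    // Hypothetical sketch; getInstance() and getAllFilenames() are assumed, not confirmed API.
    if (Settings.getBoolean(CommonSettings.USING_HADOOP)) {
        BitmagArcRepositoryClient client = BitmagArcRepositoryClient.getInstance(); // assumed lookup
        ArchiveFileDAO dao = new ArchiveFileDAO();
        for (String filename : client.getAllFilenames()) { // assumed helper returning all stored filenames
            String trimmed = filename.trim();
            if (!dao.exists(trimmed)) {
                ArchiveFile file = new ArchiveFile();
                file.setFilename(trimmed);
                file.setIndexed(false);
                log.info("Creating object store entry for '{}'", trimmed);
                dao.create(file);
            }
        }
    }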
