Add archiving job for Airflow
* This job was previously included in Oozie.
* Its goal is to archive a directory to a specific location on HDFS.
* The archiving code is now shared with MediawikiHistoryDumper

Bug: T300039
Change-Id: I9636c44f860fa0a7211551a3484cf3eb79430438
aquwikimedia committed Apr 6, 2022
1 parent fa0ea90 commit fce97f8
Expand Up @@ -7,6 +7,7 @@ import profig._
import cats.syntax.either._
import io.circe.CursorOp.DownField
import io.circe.{Decoder, DecodingFailure}
import org.apache.hadoop.fs.Path

import scala.language.experimental.macros
import scala.reflect.macros.blackbox
Expand Down Expand Up @@ -152,6 +153,9 @@ trait ConfigHelper {

// Implicit conversion letting a plain String be used wherever a hadoop.fs.Path is expected.
implicit def stringToHadoopFsPath(s: String): Path = {
  new Path(s)
}

// Support implicit DateTime conversion from string to DateTime
// The opt can either be given in integer hours ago, or
// as an ISO-8601 formatted date time.
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.ListMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}

* Job to archive a file on HDFS.
* The source file:
* - ends with a specific string,
* - is unique, but may sit beside an empty flag file,
* - and is not empty
* The target is set with specific permissions.
object HDFSArchiver extends LogHelper with ConfigHelper {

* Config class for use with config files and args.
// Job parameters. The first two fields have no default and must be supplied.
case class Config(
source_directory: String, // directory where the source is located
archive_file: String, // path of the archive file, where the source file is put
archive_parent_umask: String = "022", // umask for the archive directory permission
archive_perms: String = "644", // permissions given to the archive file
expected_filename_ending: String = ".gz", // ending of the source file name
check_done: Boolean = false, // whether to check for a done-file flag
done_file: String = "_SUCCESS" // name of the done-file flag

// Loads the job Config from the command-line args.
// A ConfigHelperException raised while loading is logged at fatal level,
// with ". Aborting." appended to its message.
def loadConfig(args: Array[String]): Config = {
val config = try {
} catch {
case e: ConfigHelperException =>
log.fatal(e.getMessage + ". Aborting.")
}"Loaded configuration:\n" + prettyPrint(config))

// Companion holding the help material: default values, per-property docs and the usage text.
object Config {
// This is just used to ease generating help message with default values.
// Required configs are set to dummy values.
val default = Config("", "")

// Help text per property, in declaration order (ListMap preserves insertion order).
// Default values are read from `default` so the docs cannot drift from the case class.
val propertiesDoc: ListMap[String, String] = ListMap(
"source_directory" -> "Path of the directory where the source is located.",
"archive_file" -> "Path of the archive file, where to put the source file.",
"archive_parent_umask" ->
s"""Umask for the archive directory permission.
| default: ${default.archive_parent_umask}""",
"archive_perms" ->
s"""Permissions given to the archive file.
| default: ${default.archive_perms}""",
"expected_filename_ending" ->
s"""The ending of the source file name.
|default: ${default.expected_filename_ending}""",
"check_done" -> s"Check for a done file flag. default: ${default.check_done}",
"done_file" -> s"Name of the done file flag. default: ${default.done_file}"

// Usage text; main() passes it to help() together with propertiesDoc when "--help" is given.
val usage: String =
|Job to archive a file on HDFS.
| java -cp refinery-job.jar:$(/usr/bin/hadoop classpath) \
| --source_directory=/tmp/bob/source \
| --archive_file=/tmp/bob/public_archive\.gz

* Entry point to run this job.
* @param args
// Command-line entry point.
// Prints the help text when "--help" is among the args, loads the Config,
// and derives an integer status code from the Boolean returned by apply().
def main(args: Array[String]): Unit = {
if (args.contains("--help")) {
println(help(Config.usage, Config.propertiesDoc))

val config = loadConfig(args)

// Make sure to log in the console when launched from Airflow

// NOTE(review): apply() is documented as returning true on success, so success maps to 1
// and failure to 0 here. If statusCode is used as the process exit code this looks
// inverted (0 conventionally means success) — confirm.
// NOTE(review): Path.mergePaths appears to concatenate the two path strings without
// inserting a "/" — confirm the done-file path resolves to <source_directory>/<done_file>.
val statusCode: Int = if (apply(
Path.mergePaths(config.source_directory, config.done_file),
)) 1 else 0

* The heart of the HDFS Archiver job
* @param sourceDirectory
* @param expectedFilenameEnding
* @param checkDone
* @param doneFilePath
* @param archiveFile
* @param archiveParentUmask
* @param archivePerms
* @return boolean true in case of success
// Archives the source file found in sourceDirectory to archiveFile.
// Returns false when identifySourceFile yields no file; otherwise returns whether both
// the parent-folder creation and the archiving step succeeded.
def apply(
sourceDirectory: Path,
expectedFilenameEnding: String,
checkDone: Boolean,
doneFilePath: Path,
archiveFile: Path,
archiveParentUmask: String,
archivePerms: String
): Boolean = {
val conf: Configuration = new Configuration
// The umask is put on the Configuration before the FileSystem is obtained from it.
// NOTE(review): Hadoop can hand back a cached FileSystem built from an earlier
// Configuration — confirm the umask set here is actually honoured.
conf.set("fs.permissions.umask-mode", archiveParentUmask)
// The file system is resolved from the source directory and reused for the archive path.
val fs: FileSystem = sourceDirectory.getFileSystem(conf)
identifySourceFile(fs, sourceDirectory, expectedFilenameEnding, checkDone, doneFilePath) match {
case None => false
// && short-circuits: archiveSource only runs when createParentFolder returned true.
case Some(file) => createParentFolder(fs, archiveFile) &&
archiveSource(fs, file, archiveFile, archivePerms)

* identifySourceFile validates that the source file meets all conditions,
* and if it does, will return the Path to the source file.
* It checks:
* - if the source directory exists
* - if the check file exists (ex: _SUCCESS)
* - if there is exactly 1 non-empty file in the directory besides the optional check file
// Runs the pre-checks (directory exists, optional done file present) and, when they pass,
// delegates to getSourceFileFromDirectory to locate the source file.
def identifySourceFile(
fs: FileSystem,
sourceDirectory: Path,
expectedFilenameEnding: String,
checkDone: Boolean,
doneFilePath: Path
): Option[Path] = {
if (directoryExists(fs, sourceDirectory)) {
// The done file is only required when checkDone is true.
if (checkDone && !fs.exists(doneFilePath)) {
log.error(s"Done file ${doneFilePath.toString} is not present.")
} else {
getSourceFileFromDirectory(fs, sourceDirectory, expectedFilenameEnding, checkDone)
} else {

* Checks if a specific path exists in the file system, and checks that it is a directory.
// Checks the path in two steps: it must exist, and it must be a directory.
// Each failed check logs its own error message.
def directoryExists(fs: FileSystem, path: Path): Boolean = {
if (!fs.exists(path)) {
log.error(s"Dir ${path.toString} does not exist.")
} else if (!fs.isDirectory(path)) {
log.error(s"Dir ${path.toString} is not a directory.")
} else {

* Tries to get the Path to the source file and checks that there is a single non-empty file in the source directory
* (except the success file).
// Locates the source file among the files listed for sourceDirectory.
// Logs an error when the file count is wrong, when no source file is found,
// or when the source file has zero length.
def getSourceFileFromDirectory(
fs: FileSystem,
sourceDirectory: Path,
expectedFilenameEnding: String,
checkDone: Boolean
): Option[Path] = {
val files = listFilesInDir(fs, sourceDirectory)
// Exactly 2 files are expected when a done file is checked (presumably source + done file),
// exactly 1 otherwise.
if ((checkDone && files.length != 2) || (!checkDone && files.length != 1)) {
log.error(s"Wrong file count in ${sourceDirectory.toString}")
} else {
val sourceFile: Option[LocatedFileStatus] = files
.find {
if (sourceFile.isEmpty) {
log.error(s"Missing source in ${sourceDirectory.toString} (ending in: $expectedFilenameEnding)")
// .get is only reached after the isEmpty branch above has been ruled out.
} else if (sourceFile.get.getLen == 0) {
log.error(s"Empty source in ${sourceDirectory.toString} (ending in: $expectedFilenameEnding)")
} else {

* Lists the files in a directory, and converts the iterator into a list for convenience.
// Drains the Hadoop remote iterator for `dir` into a ListBuffer.
def listFilesInDir(fs: FileSystem, dir: Path): List[LocatedFileStatus] = {
val result = ListBuffer[LocatedFileStatus]()
// The second argument is Hadoop's `recursive` flag: files in sub-directories are listed too,
// so they count towards the file-count check done by the caller.
val iterator = fs.listFiles(dir, true)
while (iterator.hasNext) result +=

// Creates the parent directory of `file` on `fs`.
// The call asks for mode 777 and relies on the configured umask to narrow it.
def createParentFolder(fs: FileSystem, file: Path): Boolean = {
// This import has to happen after setting the umask mode
import org.apache.hadoop.fs.permission.FsPermission
fs.mkdirs(file.getParent, new FsPermission("777")) // Only restrict through umask.

// Moves sourceFile to archiveFile, removes the directory the source lived in,
// then applies archivePerms to the archive.
// `result` is true only when every step of the && chain reports success.
def archiveSource(fs: FileSystem, sourceFile: Path, archiveFile: Path, archivePerms: String): Boolean = {
import org.apache.hadoop.fs.permission.FsPermission
// Remove any existing archive first (non-recursive); the result of this delete is not checked.
fs.delete(archiveFile, false)
// The steps are chained with &&, so a failed rename skips both the cleanup and the chmod.
val result = fs.rename(sourceFile, archiveFile) &&
// Recursive delete of the source file's parent directory.
fs.delete(sourceFile.getParent, true) &&
fs.setPermission(archiveFile, new FsPermission(archivePerms)).equals() // "equals" is here to get a bool return.
if (result) {"Archive created: ${archiveFile.toString}")
import java.util.{TimeZone, Calendar}

import java.util.{Calendar, TimeZone}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.SparkConf
import{MediawikiEvent, MediawikiEventPageDetails, MediawikiEventUserDetails}
import scopt.OptionParser

Expand All @@ -25,18 +21,17 @@ import scopt.OptionParser
* Size of the time bucket varies depending on the size of the wiki.
* For example, big wikis might be split in 1-month buckets, while
* medium wikis might be split in 1-year buckets, and small wikis
* might be outputed as a single file.
* might be output as a single file.
* Parameters:
* snapshot Mediawiki snapshot to dump (usually YYYY-MM).
* inputBasePath HDFS base path where to read data from.
* tempDirectory HDFS temporary directory for intermediate files.
* tempPartitions Number of partitions to rehash data with (internal).
* outputBasePath HDFS base path where to write the dump.
* - snapshot Mediawiki snapshot to dump (usually YYYY-MM).
* - inputBasePath HDFS base path where to read data from.
* - tempDirectory HDFS temporary directory for intermediate files.
* - tempPartitions Number of partitions to rehash data with (internal).
* - outputBasePath HDFS base path where to write the dump.
* Example of usage:
* sudo -u analytics spark2-submit \
* {{{ sudo -u analytics spark2-submit \
* --master yarn \
* --deploy-mode cluster \
* --executor-memory 32G \
Expand All @@ -49,7 +44,7 @@ import scopt.OptionParser
* --input-base-path /wmf/data/wmf/mediawiki/history \
* --temp-directory /tmp/mforns/mediawiki_history_dumps_12345 \
* --temp-partitions 256 \
* --output-base-path /wmf/data/archive/mediawiki/history
* --output-base-path /wmf/data/archive/mediawiki/history}}}

Expand Down Expand Up @@ -186,11 +181,11 @@ object MediawikiHistoryDumper {
flatMap(r => {
val event = MediawikiEvent.fromRow(r)
// We can get the value of event_timestamp as None are filtered out in eventTimeBucket function
.map(timeBucket => Seq(Row.fromTuple((event.wikiDb, timeBucket, event.eventTimestamp.get.getTime, event.toTSVLine))))
// The first 3 dimensions are used for proper partitioning
// and ordering of the data, only the tsv line will be output.
// We can get the value of event_timestamp as None are filtered out in eventTimeBucket function
.map(timeBucket => Seq(Row.fromTuple((event.wikiDb, timeBucket, event.eventTimestamp.get.getTime, event.toTSVLine))))
// The first 3 dimensions are used for proper partitioning
// and ordering of the data, only the tsv line will be output.
// The following line applies the repartitioning. It redistributes
// the data among tempPartitions partitions. And makes sure that
Expand Down Expand Up @@ -269,20 +264,18 @@ object MediawikiHistoryDumper {
val wiki = wikiDirectory.getPath.getName.substring(5)
// The substring removes Hive partition prefix (time_bucket=).
val timeBucket = timeDirectory.getPath.getName.substring(12)

val dataFiles = fs.listStatus(timeDirectory.getPath)
if (dataFiles.length > 1) {
// This should not happen.
// Just making sure that we do not leave out any file.
throw new RuntimeException("More than one file per folder generated.")
val sourcePath = dataFiles(0).getPath

val destinationDirectory = s"$outputBasePath/$snapshot/$wiki"
fs.mkdirs(new Path(destinationDirectory))
val destinationFile = s"$snapshot.$wiki.$timeBucket.tsv.bz2"
val destinationPath = new Path(s"$destinationDirectory/$destinationFile")
fs.rename(sourcePath, destinationPath)
sourceDirectory = timeDirectory.getPath,
expectedFilenameEnding = "",
checkDone = false,
doneFilePath = new Path(""),
archiveFile = destinationPath,
archiveParentUmask = "022",
archivePerms = "644"
