
Commit

Code style
xavierguihot committed Feb 16, 2018
1 parent b2b4cdc commit 4ed9394
Showing 4 changed files with 60 additions and 107 deletions.
9 changes: 3 additions & 6 deletions src/main/scala/com/spark_helper/DateHelper.scala
@@ -77,9 +77,8 @@ object DateHelper extends Serializable {
jodaLastDate: DateTime
): List[DateTime] = {

val nbrOfDaysWithinRange = Days
.daysBetween(jodaFirstDate, jodaLastDate)
.getDays()
val nbrOfDaysWithinRange =
Days.daysBetween(jodaFirstDate, jodaLastDate).getDays()

(0 to nbrOfDaysWithinRange).toList.map(jodaFirstDate.plusDays)
}
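
A minimal standalone sketch of the joda-time idiom this hunk reformats; the two dates below are only illustrative values, not anything from the library:

import org.joda.time.{DateTime, Days}

val jodaFirstDate = new DateTime(2018, 2, 1, 0, 0)
val jodaLastDate = new DateTime(2018, 2, 16, 0, 0)

// Days.daysBetween(...).getDays() yields the whole-day gap between the two instants: 15 here.
val nbrOfDaysWithinRange = Days.daysBetween(jodaFirstDate, jodaLastDate).getDays()

// One DateTime per day, from jodaFirstDate to jodaLastDate inclusive (16 elements).
val range = (0 to nbrOfDaysWithinRange).toList.map(jodaFirstDate.plusDays)
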
@@ -240,12 +239,10 @@ object DateHelper extends Serializable {
date: String,
inputFormat: String,
outputFormat: String
): String = {

): String =
DateTimeFormat
.forPattern(outputFormat)
.print(DateTimeFormat.forPattern(inputFormat).parseDateTime(date))
}
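
A hypothetical round trip through joda's DateTimeFormat matching the call above; the patterns and the input date are just examples:

import org.joda.time.format.DateTimeFormat

// Parse "20180216" with the input pattern, then print it with the output pattern.
val reformatted =
  DateTimeFormat
    .forPattern("yyyy-MM-dd")
    .print(DateTimeFormat.forPattern("yyyyMMdd").parseDateTime("20180216"))
// reformatted == "2018-02-16"

This hunk also shows the commit's recurring pattern: a method whose body is a single expression drops its surrounding braces.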

/** Returns the current local timestamp.
*
67 changes: 19 additions & 48 deletions src/main/scala/com/spark_helper/HdfsHelper.scala
@@ -271,13 +271,8 @@ object HdfsHelper extends Serializable {
*
* @param filePath the path of the empty file to create
*/
def createEmptyHdfsFile(filePath: String): Unit = {

val emptyFile =
FileSystem.get(new Configuration()).create(new Path(filePath))

emptyFile.close()
}
def createEmptyHdfsFile(filePath: String): Unit =
FileSystem.get(new Configuration()).create(new Path(filePath)).close()
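
A standalone sketch of the chained Hadoop calls above, with a purely illustrative path; create() returns an FSDataOutputStream, and closing it immediately leaves an empty file behind:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Creates an empty marker file at the given (placeholder) path.
FileSystem.get(new Configuration()).create(new Path("/tmp/processing_done_marker")).close()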

/** Saves text in a file when content is too small to really require an RDD.
*
@@ -357,8 +352,7 @@ object HdfsHelper extends Serializable {
listFileNamesInFolder(
hdfsPath + "/" + status.getPath.getName,
true,
onlyName
)
onlyName)
// If it's a dir and the recursive option isn't set:
else
Nil
@@ -376,31 +370,27 @@
* @param hdfsPath the path of the folder for which to list folder names
* @return the list of folder names in the specified folder
*/
def listFolderNamesInFolder(hdfsPath: String): List[String] = {

def listFolderNamesInFolder(hdfsPath: String): List[String] =
FileSystem
.get(new Configuration())
.listStatus(new Path(hdfsPath))
.filter(!_.isFile)
.map(_.getPath.getName)
.toList
.sorted
}

/** Returns the joda DateTime of the last modification of the given file.
*
* @param hdfsPath the path of the file for which to get the last
* modification date.
* @return the joda DateTime of the last modification of the given file
*/
def fileModificationDateTime(hdfsPath: String): DateTime = {

def fileModificationDateTime(hdfsPath: String): DateTime =
new DateTime(
FileSystem
.get(new Configuration())
.getFileStatus(new Path(hdfsPath))
.getModificationTime())
}
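
A hypothetical call site for the helper documented above; the path is a placeholder:

import com.spark_helper.HdfsHelper
import org.joda.time.DateTime

// Under the hood, Hadoop's getModificationTime() returns epoch milliseconds,
// which the DateTime(Long) constructor accepts directly.
val modifiedAt: DateTime = HdfsHelper.fileModificationDateTime("hdfs:///some/folder/file.txt")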

/** Returns the stringified date of the last modification of the given file.
*
@@ -459,15 +449,10 @@ object HdfsHelper extends Serializable {
* since the last modification.
* @return the nbr of days since the given file has been last modified
*/
def nbrOfDaysSinceFileWasLastModified(hdfsPath: String): Int = {

def nbrOfDaysSinceFileWasLastModified(hdfsPath: String): Int =
Days
.daysBetween(
fileModificationDateTime(hdfsPath),
new DateTime()
)
.daysBetween(fileModificationDateTime(hdfsPath), new DateTime())
.getDays()
}

/** Appends a header and a footer to a file.
*
@@ -491,14 +476,12 @@
header: String,
footer: String,
workingFolderPath: String = ""
): Unit = {

): Unit =
appendHeaderAndFooterInternal(
filePath,
Some(header),
Some(footer),
workingFolderPath)
}

/** Appends a header to a file.
*
@@ -519,14 +502,12 @@
filePath: String,
header: String,
workingFolderPath: String = ""
): Unit = {

): Unit =
appendHeaderAndFooterInternal(
filePath,
Some(header),
None,
workingFolderPath)
}

/** Appends a footer to a file.
*
@@ -544,14 +525,12 @@
filePath: String,
footer: String,
workingFolderPath: String = ""
): Unit = {

): Unit =
appendHeaderAndFooterInternal(
filePath,
None,
Some(footer),
workingFolderPath)
}

/** Validates an XML file on hdfs against the given XSD.
*
@@ -737,21 +716,18 @@ object HdfsHelper extends Serializable {
.filter(path => {

val fileAgeInDays = Days
.daysBetween(
new DateTime(path.getModificationTime()),
new DateTime()
)
.daysBetween(new DateTime(path.getModificationTime()), new DateTime())
.getDays()

fileAgeInDays >= purgeAge

})
.foreach(path => {
if (path.isFile)
.foreach {
case path if path.isFile =>
deleteFile(folderPath + "/" + path.getPath.getName)
else
case path =>
deleteFolder(folderPath + "/" + path.getPath.getName)
})
}
}
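
A small sketch of the pattern-matching foreach introduced above, kept outside any real HDFS deletion logic; the statuses parameter is hypothetical:

import org.apache.hadoop.fs.FileStatus

// A guard on the first case lets files and folders share one partial function.
def describeEach(statuses: Seq[FileStatus]): Unit =
  statuses.foreach {
    case status if status.isFile => println("file: " + status.getPath.getName)
    case status => println("folder: " + status.getPath.getName)
  }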

/** Internal implementation of adding a header and a footer to a file.
@@ -780,22 +756,17 @@ object HdfsHelper extends Serializable {
val inputFile = fileSystem.open(new Path(filePath))
val tmpOutputFile = fileSystem.create(new Path(tmpOutputPath))

header match {
case Some(header) =>
tmpOutputFile.write((header + "\n").getBytes("UTF-8"))
case None => ()
}
// If there is a header, we add it to the file:
header.foreach(h => tmpOutputFile.write((h + "\n").getBytes("UTF-8")))

try {
IOUtils.copyBytes(inputFile, tmpOutputFile, new Configuration(), false)
} finally {
inputFile.close()
}

footer match {
case Some(footer) => tmpOutputFile.write(footer.getBytes("UTF-8"))
case None => ()
}
// If there is a footer, we append it to the file:
footer.foreach(f => tmpOutputFile.write((f + "\n").getBytes("UTF-8")))

deleteFile(filePath)
moveFile(tmpOutputPath, filePath)
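
A minimal illustration of the Option.foreach idiom that replaces the two match blocks; the values below are made up:

val header: Option[String] = Some("id,name,age")
val footer: Option[String] = None

val out = new StringBuilder

// foreach runs only when the Option is defined, so no explicit `case None => ()` is needed.
header.foreach(h => out.append(h + "\n"))
out.append("1,Alice,42\n")
footer.foreach(f => out.append(f + "\n"))
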
18 changes: 4 additions & 14 deletions src/main/scala/com/spark_helper/SparkHelper.scala
@@ -101,14 +101,12 @@ object SparkHelper extends Serializable {
outputRDD: RDD[String],
outputFile: String,
workingFolder: String
): Unit = {

): Unit =
saveAsSingleTextFileWithWorkingFolderInternal(
outputRDD,
outputFile,
workingFolder,
None)
}

/** Saves an RDD in exactly one file.
*
@@ -140,14 +138,12 @@ object SparkHelper extends Serializable {
outputFile: String,
workingFolder: String,
compressionCodec: Class[_ <: CompressionCodec]
): Unit = {

): Unit =
saveAsSingleTextFileWithWorkingFolderInternal(
outputRDD,
outputFile,
workingFolder,
Some(compressionCodec))
}
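
A hypothetical value for the compressionCodec parameter above, assuming Hadoop's built-in gzip codec is the one wanted:

import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}

// classOf[GzipCodec] satisfies the Class[_ <: CompressionCodec] parameter type.
val codec: Class[_ <: CompressionCodec] = classOf[GzipCodec]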

/** Equivalent to sparkContext.textFile(), but for a specific record delimiter.
*
@@ -347,15 +343,13 @@ object SparkHelper extends Serializable {
lowerCoalescenceLevelFolder: String,
finalCoalescenceLevel: Int,
sparkContext: SparkContext
): Unit = {

): Unit =
decreaseCoalescenceInternal(
highCoalescenceLevelFolder,
lowerCoalescenceLevelFolder,
finalCoalescenceLevel,
sparkContext,
None)
}

/** Decreases the nbr of partitions of a folder.
*
@@ -393,15 +387,13 @@ object SparkHelper extends Serializable {
finalCoalescenceLevel: Int,
sparkContext: SparkContext,
compressionCodec: Class[_ <: CompressionCodec]
): Unit = {

): Unit =
decreaseCoalescenceInternal(
highCoalescenceLevelFolder,
lowerCoalescenceLevelFolder,
finalCoalescenceLevel,
sparkContext,
Some(compressionCodec))
}

/** Saves as text file, but by decreasing the nbr of partitions of the output.
*
@@ -556,9 +548,7 @@ object SparkHelper extends Serializable {
}
}

//////
// Internal core:
//////

private def saveAsSingleTextFileWithWorkingFolderInternal(
outputRDD: RDD[String],
