package com.jetprobe.sample

import com.jetprobe.core.TestPipeline
import com.jetprobe.core.annotation.PipelineMeta
import com.jetprobe.core.generator.TemplateDataGen
import com.jetprobe.core.structure.{ExecutablePipeline, PipelineBuilder}
import com.jetprobe.hadoop.storage.{HDFSConfig, HDFSStorage}
import org.apache.hadoop.fs.Path
/**
  * Sample jetprobe pipeline that generates test data, copies it to HDFS,
  * and validates file ownership and file counts.
  *
  * @author Shad.
  */
@PipelineMeta(name = "HDFS Testing")
class HDFSSuite extends TestPipeline {
val clusterHDFS = "hdfs://xxx.xx.xx"
val hdfsConf = new HDFSConfig(clusterHDFS,"shad")
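
  // Generate a sample dataset from the template file; the trailing 1000 is
  // presumably the number of records to produce (both paths are placeholders)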
  val datagen = new TemplateDataGen(
    """/path/to/template.in""",
    """path/to/dataset.out""", 1000)

  override def tasks: PipelineBuilder = {
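
    // Create the target directory on HDFS and write the generated data into it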
task("Copy sample data",datagen,hdfsConf){ (data,hdfs) =>
hdfs.mkdir("/user/shad/data")
hdfs.write(data,"/user/shad/data/sample.in")
}
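
    // Copy files between the local filesystem and HDFS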
task("Cp sample directories",hdfsConf){ hadoop =>
//Copy the file
hadoop.copyFromLocal(localSrc= """/local/source/path""", destination = "/destination/path.out")
hadoop.copyToLocal("/path/to/hdfs","""/path/to/local""")
}
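
    // Run a shell command ("ls -al") on the local machine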
    runCmd("local cmd", "ls -al")
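
    // Assert that /user/name/data.in on HDFS is owned by the "hdfs" user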
validate("HDFS validate",hdfsConf){ hdfs =>
given(hdfs.usingFS(fs => fs.getFileStatus(new Path("/user/name/data.in")))) { status =>
assertEquals("hdfs",status.getOwner)
}
}
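
    // Assert that the directory /user/name/path contains exactly 24 files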
validate("check file output",hdfsConf) { hdfs =>
val fsStatus = hdfs.usingFS(fs => fs.getContentSummary(new Path("/user/name/path")))
assertEquals(24L,fsStatus.get.getFileCount)
}
  }
}