Skip to content

Commit

Permalink
[SPARK-2024] Adding 2 saveAsHadoopDataset tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kanzhang committed Jul 28, 2014
1 parent 0c134f3 commit 9f39ff4
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions python/pyspark/tests.py
Expand Up @@ -535,6 +535,15 @@ def test_oldhadoop(self):
"org.apache.hadoop.io.MapWritable").collect())
self.assertEqual(result, dict_data)

conf = {
"mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat",
"mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class" : "org.apache.hadoop.io.MapWritable",
"mapred.output.dir" : basepath + "/olddataset/"}
self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
old_dataset = sorted(self.sc.sequenceFile(basepath + "/olddataset/").collect())
self.assertEqual(old_dataset, dict_data)

def test_newhadoop(self):
basepath = self.tempdir.name
# use custom ArrayWritable types and converters to handle arrays
Expand All @@ -555,6 +564,19 @@ def test_newhadoop(self):
valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
self.assertEqual(result, array_data)

conf = {"mapreduce.outputformat.class" :
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"mapred.output.key.class" : "org.apache.hadoop.io.IntWritable",
"mapred.output.value.class" : "org.apache.spark.api.python.DoubleArrayWritable",
"mapred.output.dir" : basepath + "/newdataset/"}
self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
new_dataset = sorted(self.sc.sequenceFile(
basepath + "/newdataset/",
valueClass="org.apache.spark.api.python.DoubleArrayWritable",
valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
self.assertEqual(new_dataset, array_data)

def test_newolderror(self):
basepath = self.tempdir.name
rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
Expand Down

0 comments on commit 9f39ff4

Please sign in to comment.