Fix scala style
Andrew Or committed Apr 28, 2015
1 parent 4310271 commit 7ef957c
Showing 2 changed files with 18 additions and 10 deletions.
24 changes: 15 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -667,7 +667,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * This method is identical to `parallelize`.
    */
-  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withRDDScope {
+  def makeRDD[T: ClassTag](
+      seq: Seq[T],
+      numSlices: Int = defaultParallelism): RDD[T] = withRDDScope {
     parallelize(seq, numSlices)
   }

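For reference, a minimal usage sketch (not part of this commit) showing that makeRDD is just an alias for parallelize, as the doc comment above states; the app name and local master URL are arbitrary choices:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver snippet, not from the repository.
val sc = new SparkContext(new SparkConf().setAppName("makeRDD-example").setMaster("local[2]"))
val data = Seq(1, 2, 3, 4, 5)
val viaParallelize = sc.parallelize(data, 2)  // explicit number of slices
val viaMakeRDD = sc.makeRDD(data, 2)          // identical to parallelize
assert(viaParallelize.collect().toSeq == viaMakeRDD.collect().toSeq)
sc.stop()
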
@@ -684,7 +686,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
-  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withRDDScope {
+  def textFile(
+      path: String,
+      minPartitions: Int = defaultMinPartitions): RDD[String] = withRDDScope {
     assertNotStopped()
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
       minPartitions).map(pair => pair._2.toString)
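
For reference, a hedged usage sketch of textFile (not part of this commit); the input path, app name, and master URL are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver snippet, not from the repository.
val sc = new SparkContext(new SparkConf().setAppName("textFile-example").setMaster("local[2]"))
val lines = sc.textFile("hdfs:///tmp/input.txt", 4)  // ask for at least 4 partitions
println(s"line count: ${lines.count()}")
sc.stop()
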
@@ -1040,15 +1044,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def sequenceFile[K, V]
       (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
-          kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = withRDDScope {
-    assertNotStopped()
-    val kc = kcf()
-    val vc = vcf()
-    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
-    val writables = hadoopFile(path, format,
+          kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
+    withRDDScope {
+      assertNotStopped()
+      val kc = kcf()
+      val vc = vcf()
+      val format = classOf[SequenceFileInputFormat[Writable, Writable]]
+      val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
         vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
-    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
+      writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
+    }
   }
 
   /**
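
For reference, a sketch of calling the sequenceFile overload changed above (not part of this commit). It assumes a SequenceFile of IntWritable keys and Text values at a placeholder path, which the implicit WritableConverters turn into Int and String:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // Writable converter implicits; needed on older Spark versions

// Hypothetical driver snippet, not from the repository.
val sc = new SparkContext(new SparkConf().setAppName("sequenceFile-example").setMaster("local[2]"))
val pairs = sc.sequenceFile[Int, String]("hdfs:///tmp/pairs.seq", 4)
pairs.take(5).foreach { case (k, v) => println(s"$k -> $v") }
sc.stop()
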
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -851,7 +851,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = self.withScope {
+  def subtractByKey[W: ClassTag](
+      other: RDD[(K, W)],
+      numPartitions: Int): RDD[(K, V)] = self.withScope {
     subtractByKey(other, new HashPartitioner(numPartitions))
   }
 
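For reference, a usage sketch of subtractByKey (not part of this commit), with placeholder data and a local master:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits; needed on older Spark versions

// Hypothetical driver snippet: keeps only the pairs from `left` whose keys are absent from `right`.
val sc = new SparkContext(new SparkConf().setAppName("subtractByKey-example").setMaster("local[2]"))
val left = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
val right = sc.parallelize(Seq("b" -> 99))
val remaining = left.subtractByKey(right, 2)  // result uses 2 partitions
remaining.collect().foreach(println)          // prints (a,1) and (c,3), in some order
sc.stop()
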
