diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 50535d2711708..57754776faaa2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -310,8 +310,11 @@ def distinct(self, numPartitions=None): def sample(self, withReplacement, fraction, seed=None): """ - Return a sampled subset of this RDD (relies on numpy and falls back - on default random generator if numpy is unavailable). + Return a sampled subset of this RDD. + + >>> rdd = sc.parallelize(range(100), 4) + >>> rdd.sample(False, 0.1, 81).count() + 10 """ assert fraction >= 0.0, "Negative fraction value: %s" % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) @@ -343,8 +346,7 @@ def randomSplit(self, weights, seed=None): # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed=None): """ - Return a fixed-size sampled subset of this RDD (currently requires - numpy). + Return a fixed-size sampled subset of this RDD. >>> rdd = sc.parallelize(range(0, 10)) >>> len(rdd.takeSample(True, 20, 1)) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 558dcfd12d46f..459e1427803cb 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -17,81 +17,48 @@ import sys import random +import math class RDDSamplerBase(object): def __init__(self, withReplacement, seed=None): - try: - import numpy - self._use_numpy = True - except ImportError: - print >> sys.stderr, ( - "NumPy does not appear to be installed. " - "Falling back to default random generator for sampling.") - self._use_numpy = False - - self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1) + self._seed = seed if seed is not None else random.randint(0, sys.maxint) self._withReplacement = withReplacement self._random = None - self._split = None - self._rand_initialized = False def initRandomGenerator(self, split): - if self._use_numpy: - import numpy - self._random = numpy.random.RandomState(self._seed ^ split) - else: - self._random = random.Random(self._seed ^ split) + self._random = random.Random(self._seed ^ split) # mixing because the initial seeds are close to each other for _ in xrange(10): self._random.randint(0, 1) - self._split = split - self._rand_initialized = True - - def getUniformSample(self, split): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) - - if self._use_numpy: - return self._random.random_sample() + def getUniformSample(self): + return self._random.random() + + def getPoissonSample(self, mean): + # Using Knuth's algorithm described in + # http://en.wikipedia.org/wiki/Poisson_distribution + if mean < 20.0: + # one exp and k+1 random calls + l = math.exp(-mean) + p = self._random.random() + k = 0 + while p > l: + k += 1 + p *= self._random.random() else: - return self._random.uniform(0.0, 1.0) - - def getPoissonSample(self, split, mean): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) - - if self._use_numpy: - return self._random.poisson(mean) - else: - # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by - # drawing a sequence of numbers delta_j ~ Exp(mean) - num_arrivals = 1 - cur_time = 0.0 - - cur_time += self._random.expovariate(mean) + # switch to the log domain, k+1 expovariate (random + log) calls + p = self._random.expovariate(mean) + k = 0 + while p < 1.0: + k += 1 + p += self._random.expovariate(mean) + return k - if cur_time > 1.0: - return 0 - - while(cur_time <= 1.0): - cur_time += self._random.expovariate(mean) - num_arrivals += 1 - - return (num_arrivals - 1) - - def shuffle(self, vals): - if self._random is None: - self.initRandomGenerator(0) # this should only ever called on the master so - # the split does not matter - - if self._use_numpy: - self._random.shuffle(vals) - else: - self._random.shuffle(vals, self._random.random) + def func(self, split, iterator): + raise NotImplementedError class RDDSampler(RDDSamplerBase): @@ -101,17 +68,18 @@ def __init__(self, withReplacement, fraction, seed=None): self._fraction = fraction def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for obj in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. - count = self.getPoissonSample(split, mean=self._fraction) + count = self.getPoissonSample(self._fraction) for _ in range(0, count): yield obj else: for obj in iterator: - if self.getUniformSample(split) <= self._fraction: + if self.getUniformSample() < self._fraction: yield obj @@ -119,13 +87,13 @@ class RDDRangeSampler(RDDSamplerBase): def __init__(self, lowerBound, upperBound, seed=None): RDDSamplerBase.__init__(self, False, seed) - self._use_numpy = False # no performance gain from numpy self._lowerBound = lowerBound self._upperBound = upperBound def func(self, split, iterator): + self.initRandomGenerator(split) for obj in iterator: - if self._lowerBound <= self.getUniformSample(split) < self._upperBound: + if self._lowerBound <= self.getUniformSample() < self._upperBound: yield obj @@ -136,15 +104,16 @@ def __init__(self, withReplacement, fractions, seed=None): self._fractions = fractions def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for key, val in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. - count = self.getPoissonSample(split, mean=self._fractions[key]) + count = self.getPoissonSample(self._fractions[key]) for _ in range(0, count): yield key, val else: for key, val in iterator: - if self.getUniformSample(split) <= self._fractions[key]: + if self.getUniformSample() < self._fractions[key]: yield key, val