Skip to content

Commit

Permalink
Adding more tests on the Context class
Browse files Browse the repository at this point in the history
  • Loading branch information
htssouza committed Jun 24, 2016
1 parent 71f57b8 commit f2b1b44
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 4 deletions.
10 changes: 6 additions & 4 deletions dummy_spark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class SparkContext(object):

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

DUMMY_VERSION = 'dummy version'

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=None, conf=None, gateway=None, jsc=None, profiler_cls=None):
self._callsite = None
SparkContext._ensure_initialized(self, gateway=gateway)
Expand Down Expand Up @@ -112,7 +114,7 @@ def setSystemProperty(cls, key, value):

@property
def version(self):
return 'dummy version'
return self.DUMMY_VERSION

@property
def startTime(self):
Expand Down Expand Up @@ -143,6 +145,9 @@ def textFile(self, name, minPartitions=None, use_unicode=True):
rdd = RDD(data, self, None)
return rdd

def addPyFile(self, path):
sys.path.append(path)

def pickleFile(self, name, minPartitions=None):
raise NotImplementedError

Expand Down Expand Up @@ -191,9 +196,6 @@ def addFile(self, path):
def clearFiles(self):
raise NotImplementedError

def addPyFile(self, path):
sys.path.append(path)

def setCheckpointDir(self, dirName):
raise NotImplementedError

Expand Down
108 changes: 108 additions & 0 deletions tests/unit/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,111 @@ def test_text_file(self):
f.seek(0)
rdd = ctx.textFile(f.name)
self.assertEquals(l, rdd.collect())

def test_version(self):
ctx = SparkContext()
self.assertEquals(ctx.version, SparkContext.DUMMY_VERSION)

def test_with_block(self):
with SparkContext():
pass
self.assertTrue(True)

def test_add_py_file(self):
with SparkContext() as ctx:
ctx.addPyFile(__file__)
self.assertTrue(True)

def test_hadoop_config(self):
ctx = SparkContext()
jvm = ctx._jsc
hc = jvm.hadoopConfiguration()
hc.set('key', 'value')
self.assertEquals(hc.get('key'), 'value')

def test_not_implemented_methods(self):
ctx = SparkContext()

with self.assertRaises(NotImplementedError):
ctx._checkpointFile(None, None)

with self.assertRaises(NotImplementedError):
ctx._dictToJavaMap(None)

with self.assertRaises(NotImplementedError):
ctx._getJavaStorageLevel(None)

with self.assertRaises(NotImplementedError):
ctx.accumulator(None)

with self.assertRaises(NotImplementedError):
ctx.addFile(None)

with self.assertRaises(NotImplementedError):
ctx.binaryFiles(None)

with self.assertRaises(NotImplementedError):
ctx.binaryRecords(None, None)

with self.assertRaises(NotImplementedError):
ctx.broadcast(None)

with self.assertRaises(NotImplementedError):
ctx.cancelAllJobs()

with self.assertRaises(NotImplementedError):
ctx.cancelJobGroup(None)

with self.assertRaises(NotImplementedError):
ctx.clearFiles()

with self.assertRaises(NotImplementedError):
ctx.dump_profiles(None)

with self.assertRaises(NotImplementedError):
ctx.getLocalProperty(None)

with self.assertRaises(NotImplementedError):
ctx.hadoopFile(None, None, None, None)

with self.assertRaises(NotImplementedError):
ctx.hadoopRDD(None, None, None)

with self.assertRaises(NotImplementedError):
ctx.newAPIHadoopFile(None, None, None, None)

with self.assertRaises(NotImplementedError):
ctx.newAPIHadoopRDD(None, None, None)

with self.assertRaises(NotImplementedError):
ctx.pickleFile(None)

with self.assertRaises(NotImplementedError):
ctx.runJob(None, None)

with self.assertRaises(NotImplementedError):
ctx.sequenceFile(None)

with self.assertRaises(NotImplementedError):
ctx.setCheckpointDir(None)

with self.assertRaises(NotImplementedError):
ctx.setJobGroup(None, None)

with self.assertRaises(NotImplementedError):
ctx.setLocalProperty(None, None)

with self.assertRaises(NotImplementedError):
ctx.show_profiles()

with self.assertRaises(NotImplementedError):
ctx.sparkUser()

with self.assertRaises(NotImplementedError):
ctx.statusTracker()

with self.assertRaises(NotImplementedError):
ctx.union(None)

with self.assertRaises(NotImplementedError):
ctx.wholeTextFiles(None)

0 comments on commit f2b1b44

Please sign in to comment.