[SPARK-40015][PYTHON] Add sc.listArchives and sc.listFiles to PySpark
### What changes were proposed in this pull request?
Add `sc.listArchives` and `sc.listFiles` to PySpark

### Why are the changes needed?
For function parity: the JVM/Scala `SparkContext` already exposes `listFiles` and `listArchives`, and this adds the PySpark equivalents.

### Does this PR introduce _any_ user-facing change?
Yes. Two new APIs are added: `SparkContext.listFiles` and `SparkContext.listArchives`.
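
A minimal usage sketch of the two new properties (the local master setting and the file/archive names are placeholders for illustration, not part of this PR):

```python
import os
import tempfile
import zipfile

from pyspark.sql import SparkSession

# Assumes a local session purely for illustration.
spark = SparkSession.builder.master("local[1]").getOrCreate()
sc = spark.sparkContext

# Register a file and an archive as job resources.
tmp_dir = tempfile.mkdtemp()
file_path = os.path.join(tmp_dir, "data.txt")
with open(file_path, "w") as f:
    _ = f.write("100")
sc.addFile(file_path)

zip_path = os.path.join(tmp_dir, "deps.zip")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
    z.write(file_path, os.path.basename(file_path))
sc.addArchive(zip_path)

# The new read-only properties list the registered resource URIs,
# e.g. ['file:/.../data.txt'] and ['file:/.../deps.zip'].
print(sc.listFiles)
print(sc.listArchives)
```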

### How was this patch tested?
Added doctests for the new properties.

Closes apache#37445 from zhengruifeng/py_add_files.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
zhengruifeng authored and HyukjinKwon committed Aug 10, 2022
1 parent a08f2e5 commit 934b74d
Showing 2 changed files with 52 additions and 0 deletions.
2 changes: 2 additions & 0 deletions python/docs/source/reference/pyspark.rst
@@ -72,6 +72,8 @@ Spark Context APIs
SparkContext.getOrCreate
SparkContext.hadoopFile
SparkContext.hadoopRDD
SparkContext.listArchives
SparkContext.listFiles
SparkContext.newAPIHadoopFile
SparkContext.newAPIHadoopRDD
SparkContext.parallelize
50 changes: 50 additions & 0 deletions python/pyspark/context.py
@@ -1249,9 +1249,33 @@ def addFile(self, path: str, recursive: bool = False) -> None:
        ...         return [x * fileVal for x in iterator]
        >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
        [100, 200, 300, 400]
        >>> sc.listFiles
        ['file:/.../test.txt']
        >>> path2 = os.path.join(tempdir, "test2.txt")
        >>> with open(path2, "w") as testFile:
        ...     _ = testFile.write("100")
        >>> sc.addFile(path2)
        >>> sorted(sc.listFiles)
        ['file:/.../test.txt', 'file:/.../test2.txt']
        """
        self._jsc.sc().addFile(path, recursive)

    @property
    def listFiles(self) -> List[str]:
        """Returns a list of file paths that are added to resources.

        .. versionadded:: 3.4.0

        See Also
        --------
        SparkContext.addFile
        """
        return list(
            self._jvm.scala.collection.JavaConverters.seqAsJavaList(  # type: ignore[union-attr]
                self._jsc.sc().listFiles()
            )
        )

    def addPyFile(self, path: str) -> None:
        """
        Add a .py or .zip dependency for all tasks to be executed on this
@@ -1304,6 +1328,16 @@ def addArchive(self, path: str) -> None:
        ...         _ = f.write("100")
        ...     zipped.write(path, os.path.basename(path))
        >>> sc.addArchive(zip_path)
        >>> sc.listArchives
        ['file:/.../test.zip']
        >>> zip_path2 = os.path.join(tempdir, "test2.zip")
        >>> with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as zipped:
        ...     with open(path, "w") as f:
        ...         _ = f.write("100")
        ...     zipped.write(path, os.path.basename(path))
        >>> sc.addArchive(zip_path2)
        >>> sorted(sc.listArchives)
        ['file:/.../test.zip', 'file:/.../test2.zip']

        Reads the '100' as an integer in the zipped file, and processes
        it with the data in the RDD.
@@ -1317,6 +1351,22 @@ def addArchive(self, path: str) -> None:
        """
        self._jsc.sc().addArchive(path)

    @property
    def listArchives(self) -> List[str]:
        """Returns a list of archive paths that are added to resources.

        .. versionadded:: 3.4.0

        See Also
        --------
        SparkContext.addArchive
        """
        return list(
            self._jvm.scala.collection.JavaConverters.seqAsJavaList(  # type: ignore[union-attr]
                self._jsc.sc().listArchives()
            )
        )

    def setCheckpointDir(self, dirName: str) -> None:
        """
        Set the directory under which RDDs are going to be checkpointed. The
