diff --git a/python/docs/source/reference/pyspark.rst b/python/docs/source/reference/pyspark.rst
index f0997255bb911..c3afae10ddb61 100644
--- a/python/docs/source/reference/pyspark.rst
+++ b/python/docs/source/reference/pyspark.rst
@@ -72,6 +72,8 @@ Spark Context APIs
     SparkContext.getOrCreate
     SparkContext.hadoopFile
     SparkContext.hadoopRDD
+    SparkContext.listArchives
+    SparkContext.listFiles
     SparkContext.newAPIHadoopFile
     SparkContext.newAPIHadoopRDD
     SparkContext.parallelize
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index adfdea51c9ca5..032efaef49288 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1249,9 +1249,33 @@ def addFile(self, path: str, recursive: bool = False) -> None:
         ...        return [x * fileVal for x in iterator]
         >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
         [100, 200, 300, 400]
+        >>> sc.listFiles
+        ['file:/.../test.txt']
+        >>> path2 = os.path.join(tempdir, "test2.txt")
+        >>> with open(path2, "w") as testFile:
+        ...    _ = testFile.write("100")
+        >>> sc.addFile(path2)
+        >>> sorted(sc.listFiles)
+        ['file:/.../test.txt', 'file:/.../test2.txt']
         """
         self._jsc.sc().addFile(path, recursive)
 
+    @property
+    def listFiles(self) -> List[str]:
+        """Returns a list of file paths that are added to resources.
+
+        .. versionadded:: 3.4.0
+
+        See Also
+        --------
+        SparkContext.addFile
+        """
+        return list(
+            self._jvm.scala.collection.JavaConverters.seqAsJavaList(  # type: ignore[union-attr]
+                self._jsc.sc().listFiles()
+            )
+        )
+
     def addPyFile(self, path: str) -> None:
         """
         Add a .py or .zip dependency for all tasks to be executed on this
@@ -1304,6 +1328,16 @@ def addArchive(self, path: str) -> None:
         ...         _ = f.write("100")
         ...     zipped.write(path, os.path.basename(path))
         >>> sc.addArchive(zip_path)
+        >>> sc.listArchives
+        ['file:/.../test.zip']
+        >>> zip_path2 = os.path.join(tempdir, "test2.zip")
+        >>> with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as zipped:
+        ...     with open(path, "w") as f:
+        ...         _ = f.write("100")
+        ...     zipped.write(path, os.path.basename(path))
+        >>> sc.addArchive(zip_path2)
+        >>> sorted(sc.listArchives)
+        ['file:/.../test.zip', 'file:/.../test2.zip']
 
         Reads the '100' as an integer in the zipped file, and processes
         it with the data in the RDD.
@@ -1317,6 +1351,22 @@ def addArchive(self, path: str) -> None:
         """
         self._jsc.sc().addArchive(path)
 
+    @property
+    def listArchives(self) -> List[str]:
+        """Returns a list of archive paths that are added to resources.
+
+        .. versionadded:: 3.4.0
+
+        See Also
+        --------
+        SparkContext.addArchive
+        """
+        return list(
+            self._jvm.scala.collection.JavaConverters.seqAsJavaList(  # type: ignore[union-attr]
+                self._jsc.sc().listArchives()
+            )
+        )
+
     def setCheckpointDir(self, dirName: str) -> None:
         """
         Set the directory under which RDDs are going to be checkpointed. The
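
A minimal usage sketch of the two properties this patch adds (not part of the patch itself). It assumes a PySpark build that already contains SparkContext.listFiles and SparkContext.listArchives; the application name and file names below are placeholders.

# Hypothetical end-to-end check of the new properties (not part of the patch).
import os
import tempfile

from pyspark import SparkContext

sc = SparkContext("local[1]", "list-resources-demo")

tempdir = tempfile.mkdtemp()
file_path = os.path.join(tempdir, "data.txt")
with open(file_path, "w") as f:
    _ = f.write("100")

sc.addFile(file_path)

# Each entry is the fully qualified URI of a file registered via addFile,
# as returned by the JVM-side SparkContext.listFiles.
print(sc.listFiles)      # e.g. ['file:/.../data.txt']
print(sc.listArchives)   # [] until sc.addArchive(...) has been called

sc.stop()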