Minor refactoring and add partitionBy to save, saveAsTable, and parquet.
yhuai committed Jun 22, 2015
1 parent 7fbc24b commit 889eb25
Showing 1 changed file with 14 additions and 29 deletions.
43 changes: 14 additions & 29 deletions python/pyspark/sql/readwriter.py
@@ -218,7 +218,10 @@ def mode(self, saveMode):
         >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        self._jwrite = self._jwrite.mode(saveMode)
+        # At the JVM side, the default value of mode is already set to "error".
+        # So, if the given saveMode is None, we will not call JVM-side's mode method.
+        if saveMode is not None:
+            self._jwrite = self._jwrite.mode(saveMode)
         return self
 
     @since(1.4)
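
For illustration, a minimal sketch of the new behavior (assuming a DataFrame ``df`` and the ``os``/``tempfile`` imports used elsewhere in this module's doctests): ``mode(None)`` is now a no-op on the Python side, so writer methods that default ``mode`` to ``None`` no longer clobber the JVM-side default or a previously set mode.

>>> df.write.mode(None).parquet(os.path.join(tempfile.mkdtemp(), 'data'))  # no-op; JVM default 'error' applies
>>> df.write.mode('overwrite').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
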
@@ -253,11 +256,12 @@ def partitionBy(self, *cols):
         """
         if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
             cols = cols[0]
-        self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
+        if len(cols) > 0:
+            self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
         return self
 
     @since(1.4)
-    def save(self, path=None, format=None, mode=None, **options):
+    def save(self, path=None, format=None, mode=None, partitionBy=(), **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
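
A brief usage sketch of ``partitionBy`` after this change (assuming ``df`` has a ``year`` column); an empty column list is now a no-op instead of a JVM call with an empty sequence.

>>> df.write.partitionBy().parquet(os.path.join(tempfile.mkdtemp(), 'data'))        # no partitioning
>>> df.write.partitionBy('year').parquet(os.path.join(tempfile.mkdtemp(), 'data'))  # one directory per year value
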
@@ -276,11 +280,7 @@ def save(self, path=None, format=None, mode=None, **options):
         >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        if mode is not None:
-            # At the JVM side, the default value of mode is already set to "error".
-            # We will only call mode method if the provided mode is not None.
-            self.mode(mode)
-        self.options(**options)
+        self.partitionBy(partitionBy).mode(mode).options(**options)
         if format is not None:
             self.format(format)
         if path is None:
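
A minimal sketch of the new ``partitionBy`` keyword on ``save`` (again assuming a ``year`` column in ``df``):

>>> df.write.save(os.path.join(tempfile.mkdtemp(), 'data'),
...               format='parquet', mode='overwrite', partitionBy=['year'])
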
@@ -300,7 +300,7 @@ def insertInto(self, tableName, overwrite=False):
         self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName)
 
     @since(1.4)
-    def saveAsTable(self, name, format=None, mode=None, **options):
+    def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options):
         """Saves the content of the :class:`DataFrame` as the specified table.
 
         In the case the table already exists, behavior of this function depends on the
@@ -318,11 +318,7 @@ def saveAsTable(self, name, format=None, mode=None, **options):
         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
         :param options: all other string options
         """
-        if mode is not None:
-            # At the JVM side, the default value of mode is already set to "error".
-            # We will only call mode method if the provided mode is not None.
-            self.mode(mode)
-        self.options(**options)
+        self.partitionBy(partitionBy).mode(mode).options(**options)
         if format is not None:
             self.format(format)
         self._jwrite.saveAsTable(name)
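
A hedged sketch of ``saveAsTable`` with the new keyword; the table name ``people_by_year`` is illustrative, and a Hive-enabled SQLContext is assumed for persistent tables.

>>> df.write.saveAsTable('people_by_year', format='parquet',
...                      mode='overwrite', partitionBy=['year'])
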
@@ -341,14 +337,10 @@ def json(self, path, mode=None):
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        if mode is not None:
-            # At the JVM side, the default value of mode is already set to "error".
-            # We will only call mode method if the provided mode is not None.
-            self.mode(mode)
-        self._jwrite.json(path)
+        self.mode(mode)._jwrite.json(path)
 
     @since(1.4)
-    def parquet(self, path, mode=None):
+    def parquet(self, path, mode=None, partitionBy=()):
         """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
 
         :param path: the path in any Hadoop supported file system
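
The ``json`` change is a pure refactor of the mode handling; usage is unchanged, e.g.:

>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'), mode='overwrite')
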
@@ -361,10 +353,7 @@ def parquet(self, path, mode=None):
         >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        if mode is not None:
-            # At the JVM side, the default value of mode is already set to "error".
-            # We will only call mode method if the provided mode is not None.
-            self.mode(mode)
+        self.partitionBy(partitionBy).mode(mode)
         self._jwrite.parquet(path)
 
     @since(1.4)
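
And a sketch of the new ``partitionBy`` argument on ``parquet`` itself (assuming a ``year`` column in ``df``):

>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'),
...                  mode='overwrite', partitionBy=['year'])
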
@@ -386,14 +375,10 @@ def jdbc(self, url, table, mode=None, properties={}):
                            arbitrary string tag/value. Normally at least a
                            "user" and "password" property should be included.
         """
-        if mode is not None:
-            # At the JVM side, the default value of mode is already set to "error".
-            # We will only call mode method if the provided mode is not None.
-            self.mode(mode)
         jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
         for k in properties:
             jprop.setProperty(k, properties[k])
-        self._jwrite.jdbc(url, table, jprop)
+        self.mode(mode)._jwrite.jdbc(url, table, jprop)
 
 
 def _test():
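
For completeness, an illustrative ``jdbc`` call; the URL, table name, and credentials below are placeholders rather than values from this commit, and a reachable database with its JDBC driver on the classpath is assumed.

>>> df.write.jdbc('jdbc:postgresql://localhost/test', 'people',
...               mode='append', properties={'user': 'user', 'password': 'secret'})
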
