
# Koalas: 10 minutes from pandas to Koalas on Apache Spark
## *Notebook available here:* `github.com/niall-turbitt/koalas_demos`
<div style="text-align: center; line-height: 10; padding-top: 20px;">
  <img src="https://raw.githubusercontent.com/databricks/koalas/master/Koalas-logo.png" width="220"/>
</div>

The following tutorial has been adapted from the Databricks blog post ["10 minutes from pandas to Koalas on Apache Spark"](https://databricks.com/blog/2020/03/31/10-minutes-from-pandas-to-koalas-on-apache-spark.html).

**Outline**



**Libraries:**
* `koalas==1.0.0`

**Tested on:**
* `DBR 7.0 ML`

**Resources:**
* [Koalas docs](https://koalas.readthedocs.io/en/latest/)

To use `%pip` or `%conda`, set `spark.databricks.conda.condaMagic.enabled true` in the Spark cluster config. See [notebook-scoped libraries](https://docs.databricks.com/notebooks/notebooks-python-libraries.html#notebook-scoped-python-libraries) for more information.

In [3]:
%pip install koalas==1.0.0

## Object Creation

The packages below are customarily imported in order to use Koalas. NumPy and pandas are not strictly required, but importing them lets you use Koalas more flexibly.

In [6]:
import numpy as np
import pandas as pd
import databricks.koalas as ks

A Koalas Series can be created by passing a list of values, the same way as a pandas Series. A Koalas Series can also be created by passing a pandas Series.

In [8]:
# Create a pandas Series
pser = pd.Series([1, 3, 5, np.nan, 6, 8]) 
# Create a Koalas Series
kser = ks.Series([1, 3, 5, np.nan, 6, 8])
# Create a Koalas Series by passing a pandas Series
kser = ks.Series(pser)
kser = ks.from_pandas(pser)

**Best Practice:** Unlike pandas, Koalas does not guarantee the order of indices, because almost all operations in Koalas run in a distributed manner. Use `Series.sort_index()` if you need ordered indices.

In [10]:
pser

In [11]:
kser

In [12]:
kser.sort_index() 

A Koalas DataFrame can also be created by passing a NumPy array, the same way as a pandas DataFrame. Unlike a PySpark DataFrame, a Koalas DataFrame has an Index, so the Index of a pandas DataFrame is preserved when a Koalas DataFrame is created from it.

In [14]:
# Create a pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a Koalas DataFrame
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})
# Create a Koalas DataFrame by passing a pandas DataFrame
kdf = ks.DataFrame(pdf)
kdf = ks.from_pandas(pdf)

Likewise, the order of indices can be sorted by `DataFrame.sort_index()`.

In [16]:
pdf

Unnamed: 0,A,B
0,0.480072,0.974401
1,0.661875,0.305487
2,0.091322,0.302075
3,0.797951,0.340726
4,0.601441,0.783855


In [17]:
kdf.sort_index()

Unnamed: 0,A,B
0,0.480072,0.974401
1,0.661875,0.305487
2,0.091322,0.302075
3,0.797951,0.340726
4,0.601441,0.783855


## Viewing Data

As with a pandas DataFrame, the top rows of a Koalas DataFrame can be displayed using `DataFrame.head()`. Because `head()` behaves differently in pandas and PySpark, converting code from pandas to PySpark can cause confusion. Koalas follows the pandas behavior and uses PySpark's `limit()` under the hood.

In [20]:
kdf.head(2)

Unnamed: 0,A,B
0,0.480072,0.974401
2,0.091322,0.302075


A quick statistical summary of a Koalas DataFrame can be displayed using `DataFrame.describe()`.

In [22]:
kdf.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.526532,0.541309
std,0.26887,0.31602
min,0.091322,0.302075
25%,0.480072,0.305487
50%,0.601441,0.340726
75%,0.661875,0.783855
max,0.797951,0.974401


Sorting a Koalas DataFrame can be done using `DataFrame.sort_values()`.

In [24]:
kdf.sort_values(by='B')

Unnamed: 0,A,B
2,0.091322,0.302075
1,0.661875,0.305487
3,0.797951,0.340726
4,0.601441,0.783855
0,0.480072,0.974401


Transposing a Koalas DataFrame can be done using `DataFrame.transpose()`.

In [26]:
kdf.transpose()

Unnamed: 0,1,3,0,2,4
B,0.305487,0.340726,0.974401,0.302075,0.783855
A,0.661875,0.797951,0.480072,0.091322,0.601441


**Best Practice:** `DataFrame.transpose()` will fail when the number of rows exceeds the value of `compute.max_rows`, which is set to 1000 by default. This prevents users from unknowingly executing expensive operations. In Koalas, you can easily change `compute.max_rows` from its default. See the official docs for [`DataFrame.transpose()`](https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.transpose.html#databricks.koalas.DataFrame.transpose) for more details.

In [28]:
from databricks.koalas.config import set_option, get_option

ks.get_option('compute.max_rows')

In [29]:
ks.set_option('compute.max_rows', 2000)
ks.get_option('compute.max_rows')

## Selecting or Accessing Data

As with a pandas DataFrame, selecting a single column from a Koalas DataFrame returns a Series.

In [32]:
kdf['A']

Selecting multiple columns from a Koalas DataFrame returns a Koalas DataFrame.

In [34]:
kdf[['A', 'B']]

Unnamed: 0,A,B
1,0.661875,0.305487
3,0.797951,0.340726
0,0.480072,0.974401
2,0.091322,0.302075
4,0.601441,0.783855


Slicing is available for selecting rows from a Koalas DataFrame.

In [36]:
kdf.loc[1:2]

Unnamed: 0,A,B
1,0.661875,0.305487
2,0.091322,0.302075


Slicing rows and columns is also available.

In [38]:
kdf.iloc[:3, 1:2]

Unnamed: 0,B
0,0.974401
1,0.305487
2,0.302075


**Best Practice:** By default, Koalas disallows adding columns that come from a different DataFrame or Series to a Koalas DataFrame, because adding such columns requires join operations, which are generally expensive. This operation can be enabled by setting `compute.ops_on_diff_frames` to `True`. See [Available options](https://koalas.readthedocs.io/en/latest/user_guide/options.html#available-options) in the docs for more detail.

In [40]:
# kser = ks.Series([100, 200, 300, 400, 500], index=[0, 1, 2, 3, 4])
# kdf['C'] = kser

In [41]:
from databricks.koalas.config import set_option, reset_option

set_option("compute.ops_on_diff_frames", True)
kdf['C'] = kser

In [42]:
# Reset to default to avoid potential expensive operation in the future
reset_option("compute.ops_on_diff_frames")
kdf

Unnamed: 0,A,B,C
0,0.480072,0.974401,1.0
3,0.797951,0.340726,
4,0.601441,0.783855,6.0
2,0.091322,0.302075,5.0
1,0.661875,0.305487,3.0
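
The join is needed because assigning a Series as a column matches rows by index label rather than by position. A minimal sketch of that alignment in plain pandas, whose semantics Koalas mirrors (the frame and series below are made-up illustration data, not the tutorial's `kdf` and `kser`):

```python
import pandas as pd

# Illustration data (hypothetical): the Series index is in a different
# order than the DataFrame index and has no entry for label 1.
pdf = pd.DataFrame({"A": [10, 20, 30]}, index=[0, 1, 2])
ser = pd.Series([300, 100], index=[2, 0])

# Assignment matches rows by index label, which is effectively a left join
# of the DataFrame with the Series on the index.
pdf["C"] = ser

print(pdf)
```

Label 1 has no partner in `ser`, so it receives `NaN`. On a distributed engine, finding the matching labels across two separate datasets is the step that requires a join.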


## Applying a Python Function to Koalas DataFrame

`DataFrame.apply()` is a very powerful function favored by many pandas users. Koalas DataFrames also support this function.

In [45]:
kdf.apply(np.cumsum)

Unnamed: 0,A,B,C
3,1.278023,1.315127,
4,1.970787,2.401057,12.0
1,2.632661,2.706544,15.0
0,0.480072,0.974401,1.0
2,1.369345,1.617202,6.0


`DataFrame.apply()` also works with `axis=1` or `'columns'` (`0` or `'index'` is the default).

In [47]:
kdf.apply(np.cumsum, axis=1)

Unnamed: 0,A,B,C
0,0.480072,1.454473,2.454473
1,0.661875,0.967361,3.967361
2,0.091322,0.393397,5.393397
3,0.797951,1.138678,
4,0.601441,1.385296,7.385296


A native Python function can also be applied to a Koalas DataFrame.

In [49]:
kdf.apply(lambda x: x ** 2)

Unnamed: 0,A,B,C
1,0.438078,0.093322,9.0
3,0.636727,0.116094,
0,0.230469,0.949457,1.0
4,0.361731,0.614429,36.0
2,0.00834,0.091249,25.0


**Best Practice:** Although the code above works as is, it is recommended to add a return type hint to user-defined functions applied to a Koalas DataFrame, so that Koalas can determine Spark's return type from it. If the return type hint is not specified, Koalas runs the function once on a small sample to infer the Spark return type, which can be fairly expensive.

In [51]:
def square(x) -> ks.Series[np.float64]:
  return x ** 2

kdf.apply(square)

Unnamed: 0,A,B,C
0,0.230469,0.949457,1.0
1,0.438078,0.093322,9.0
2,0.636727,0.116094,
3,0.00834,0.091249,25.0
4,0.361731,0.614429,36.0
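
The trade-off between a declared and an inferred return type can be sketched as a "hint or infer" step. This is only a conceptual illustration in plain pandas: `resolve_return_dtype` is a hypothetical helper, not a Koalas function, and a bare `np.float64` annotation stands in for `ks.Series[np.float64]` so the sketch runs without Koalas.

```python
import inspect

import numpy as np
import pandas as pd


def resolve_return_dtype(func, sample):
    """Hypothetical helper: use the return annotation if present,
    otherwise run `func` on a small sample and inspect the result."""
    hint = inspect.signature(func).return_annotation
    if hint is not inspect.Signature.empty:
        # Cheap path: the type is declared, so the function is not executed.
        return np.dtype(hint), "hint"
    # Expensive path: the function has to be executed once to learn the type.
    return func(sample).dtype, "inferred"


def square_hinted(x) -> np.float64:
    return x ** 2


def square_plain(x):
    return x ** 2


sample = pd.Series([1.0, 2.0, 3.0])
print(resolve_return_dtype(square_hinted, sample))
print(resolve_return_dtype(square_plain, sample))
```

Both calls arrive at `float64`, but only the unannotated function has to be executed to find that out.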


Note that `DataFrame.apply()` in Koalas does not support global aggregations by design. However, if the size of the data is at or below `compute.shortcut_limit`, a global aggregation might still work, because Koalas then executes the function with pandas as a shortcut.

In [53]:
# Working properly since size of data <= compute.shortcut_limit (1000)
ks.DataFrame({'A': range(1000)}).apply(lambda col: col.max())

In [54]:
# Not working properly since size of data > compute.shortcut_limit (1000)
ks.DataFrame({'A': range(1001)}).apply(lambda col: col.max())
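
Why a global aggregation does not fit a distributed `apply()` can be illustrated in plain pandas. In this sketch, consecutive row chunks stand in for Spark partitions (an assumption made for illustration). It does not reproduce Koalas's actual implementation or its exact output.

```python
import pandas as pd

pdf = pd.DataFrame({"A": range(1001)})

# Whole-frame apply in pandas: one aggregated value per column.
whole = pdf.apply(lambda col: col.max())

# Hypothetical stand-in for partitions: consecutive chunks of 500 rows.
chunk_size = 500
chunks = [pdf.iloc[i:i + chunk_size] for i in range(0, len(pdf), chunk_size)]

# Applying the same function chunk by chunk gives one maximum per chunk,
# not a single global maximum.
per_chunk = [int(chunk.apply(lambda col: col.max())["A"]) for chunk in chunks]

print(whole["A"])   # global maximum
print(per_chunk)    # one maximum per chunk
```

When every chunk sees only its own rows, an extra combining step would be needed to obtain the global result, which is why the pandas shortcut matters for small inputs.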

**Best Practice:** In Koalas, `compute.shortcut_limit` (default = 1000) computes a specified number of rows in pandas as a shortcut when operating on a small dataset. Koalas uses the pandas API directly in some cases when the size of input data is below this threshold. Therefore, setting this limit too high could slow down the execution or even lead to out-of-memory errors. The following code example sets a higher `compute.shortcut_limit`, which then allows the previous code to work properly. See the [Available options](https://koalas.readthedocs.io/en/latest/user_guide/options.html#available-options) for more details.

In [56]:
ks.set_option('compute.shortcut_limit', 1001)
ks.DataFrame({'A': range(1001)}).apply(lambda col: col.max())

## Grouping Data

Grouping data by columns is one of the common APIs in pandas. `DataFrame.groupby()` is available in Koalas as well.

In [59]:
kdf.groupby('A').sum()

Unnamed: 0_level_0,B,C
A,Unnamed: 1_level_1,Unnamed: 2_level_1
0.091322,0.302075,5.0
0.797951,0.340726,
0.480072,0.974401,1.0
0.601441,0.783855,6.0
0.661875,0.305487,3.0


Grouping by multiple columns works the same way, as shown below.

In [61]:
kdf.groupby(['A', 'B']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,C
A,B,Unnamed: 2_level_1
0.601441,0.783855,6.0
0.091322,0.302075,5.0
0.797951,0.340726,
0.480072,0.974401,1.0
0.661875,0.305487,3.0


## Plotting and Visualizing Data

In pandas, `DataFrame.plot` is a good solution for visualizing data. It can be used in the same way in Koalas.

Note that Koalas uses approximation for faster rendering. Therefore, the results could be slightly different when the number of rows exceeds `plotting.max_rows`.

See the example below that plots a Koalas DataFrame as a bar chart with `DataFrame.plot.bar()`.

In [64]:
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['snail', 'pig', 'elephant','rabbit', 'giraffe', 'coyote', 'horse']
kdf = ks.DataFrame({'speed': speed, 'lifespan': lifespan}, index=index)
kdf.plot.bar(figsize=(12,8))           

Horizontal bar plots are also supported with `DataFrame.plot.barh()`.

In [66]:
kdf.plot.barh(figsize=(12,8))

Make a pie plot using `DataFrame.plot.pie()`.

In [68]:
kdf = ks.DataFrame({'mass': [0.330, 4.87, 5.97], 
                    'radius': [2439.7, 6051.8, 6378.1]},
                   index=['Mercury', 'Venus', 'Earth'])
kdf.plot.pie(y='mass', figsize=(12,8))

**Best Practice:** For bar and pie plots, only the top n rows are displayed so that rendering is more efficient. The value of n can be set with the `plotting.max_rows` option.

Make a stacked area plot using `DataFrame.plot.area()`.

In [71]:
kdf = ks.DataFrame({'sales': [3, 2, 3, 9, 10, 6, 3],
                    'signups': [5, 5, 6, 12, 14, 13, 9],
                    'visits': [20, 42, 28, 62, 81, 50, 90],}, 
                   index=pd.date_range(start='2019/08/15', end='2020/03/09', freq='M'))
kdf.plot.area(figsize=(12,8))

Make line charts using `DataFrame.plot.line()`.

In [73]:
kdf = ks.DataFrame({'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]},
                   index=[1990, 1997, 2003, 2009, 2014])
kdf.plot.line(figsize=(12,8))

**Best Practice:** For area and line plots, the proportion of data that will be plotted can be set by `plotting.sample_ratio`. If it is not set, it defaults to the `plotting.max_rows` option, which is 1000 by default. See [Available options](https://koalas.readthedocs.io/en/latest/user_guide/options.html#available-options) for details.
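
The difference between the two limiting strategies (top n rows for bar and pie plots, sampling for area and line plots) can be sketched in plain pandas. The variable `max_rows` stands in for `plotting.max_rows`. Deriving the sampling fraction as `max_rows / number of rows` is an assumption made for this illustration, not a statement about Koalas internals.

```python
import numpy as np
import pandas as pd

max_rows = 1000  # stands in for the `plotting.max_rows` option
pdf = pd.DataFrame({"value": np.arange(5000)})

# Top-n style (bar, pie): keep only the first `max_rows` rows.
top_n = pdf.head(max_rows)

# Sample style (area, line): keep a random fraction of all rows.
# Assumption: with no explicit ratio, aim for roughly `max_rows` rows.
sample_ratio = min(1.0, max_rows / len(pdf))
sampled = pdf.sample(frac=sample_ratio, random_state=0)

print(len(top_n), len(sampled))
```

The top-n subset covers only the start of the data, while the sample is spread across the whole range, which suits plots that show an overall trend.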

Make a histogram using `DataFrame.plot.hist()`.

In [76]:
kdf = pd.DataFrame(np.random.randint(1, 7, 6000), columns=['one'])
kdf['two'] = kdf['one'] + np.random.randint(1, 7, 6000)
kdf = ks.from_pandas(kdf)
kdf.plot.hist(bins=12, alpha=0.5, figsize=(12,8))

Make a scatter plot using `DataFrame.plot.scatter()`.

In [78]:
kdf = ks.DataFrame([[5.1, 3.5, 0], 
                    [4.9, 3.0, 0], 
                    [7.0, 3.2, 1], 
                    [6.4, 3.2, 1], 
                    [5.9, 3.0, 2]],
                   columns=['length', 'width', 'species'])

kdf.plot.scatter(x='length', y='width', c='species', colormap='viridis', figsize=(12,8))

## Missing Functionalities and Workarounds in Koalas

When working with Koalas, there are a few things to look out for. First, not all pandas APIs are available in Koalas: currently, about 70% are. In addition, there are subtle behavioral differences between Koalas and pandas, even when the same APIs are applied. Because of these differences, it would not make sense to implement certain pandas APIs in Koalas. Here we discuss some common workarounds.

**Using pandas APIs via Conversion**

When dealing with missing pandas APIs in Koalas, a common workaround is to convert Koalas DataFrames to pandas or PySpark DataFrames, and then apply either pandas or PySpark APIs. Converting between Koalas DataFrames and pandas/PySpark DataFrames is pretty straightforward: `DataFrame.to_pandas()` and `koalas.from_pandas()` for conversion to/from pandas; `DataFrame.to_spark()` and `DataFrame.to_koalas()` for conversion to/from PySpark. However, if the Koalas DataFrame is too large to fit in one single machine, converting to pandas can cause an out-of-memory error.

The following code snippet shows a simple use of `DataFrame.to_pandas()`.

In [81]:
kidx = kdf.index
# kidx.to_list()

**Best Practice:** `Index.to_list()` raises `PandasNotImplementedError`. Koalas does not support this because it requires collecting all data into the client (driver node) side. A simple workaround is to convert to pandas using `to_pandas()`.

In [83]:
kidx.to_pandas().to_list()

**Native Support for pandas Objects**

Koalas also natively supports pandas objects and can use them directly, as shown below.

In [85]:
kdf = ks.DataFrame({'A': 1.,
                    'B': pd.Timestamp('20200630'),
                    'C': pd.Series(1, index=list(range(4)), dtype='float32'),
                    'D': np.array([3] * 4, dtype='int32'),
                    'F': 'foo'})

kdf

Unnamed: 0,A,B,C,D,F
0,1.0,2020-06-30,1.0,3,foo
2,1.0,2020-06-30,1.0,3,foo
1,1.0,2020-06-30,1.0,3,foo
3,1.0,2020-06-30,1.0,3,foo


`ks.Timestamp()` is not implemented yet, and `ks.Series()` cannot be used when creating a Koalas DataFrame. In these cases, the pandas native objects `pd.Timestamp()` and `pd.Series()` can be used instead.

**Distributing a pandas Function in Koalas**

In addition, Koalas offers Koalas-specific APIs such as `DataFrame.map_in_pandas()`, which natively support distributing a given pandas function in Koalas.

In [88]:
i = pd.date_range('2020-06-30', periods=2000, freq='1D1min')
ts = ks.DataFrame({'A': ['timestamp']}, index=i)
# ts.between_time('0:15', '0:16')

`DataFrame.between_time()` is not yet implemented in Koalas. As shown below, a simple workaround is to convert to a pandas DataFrame using `to_pandas()` and then apply the function.

In [90]:
ts.to_pandas().between_time('0:15', '0:16')

Unnamed: 0,A
2020-07-15 00:15:00,timestamp
2020-07-16 00:16:00,timestamp
2024-06-25 00:15:00,timestamp
2024-06-26 00:16:00,timestamp


However, `DataFrame.map_in_pandas()` is a better workaround because it does not require moving all the data to a single client node, which could cause out-of-memory errors.

In [92]:
ts.map_in_pandas(func=lambda pdf: pdf.between_time('0:15', '0:16'))

Unnamed: 0,A
2024-06-25 00:15:00,timestamp
2024-06-26 00:16:00,timestamp
2020-07-15 00:15:00,timestamp
2020-07-16 00:16:00,timestamp


**Best Practice:** In this way, `DataFrame.between_time()`, which is a pandas function, can be performed on a distributed Koalas DataFrame because `DataFrame.map_in_pandas()` executes the given function across multiple nodes. See [`DataFrame.map_in_pandas()`](https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.map_in_pandas.html#databricks.koalas.DataFrame.map_in_pandas).
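
The idea of running a pandas function piece by piece can be sketched in plain pandas. Here `map_in_chunks` is a hypothetical helper that splits a DataFrame into consecutive row chunks (standing in for partitions), applies the function to each chunk, and concatenates the results. It is a conceptual illustration, not how Koalas implements `map_in_pandas()`. The index is built from 1441-minute steps, which is equivalent to the `'1D1min'` frequency used above.

```python
import numpy as np
import pandas as pd

# 2000 timestamps, each 1 day + 1 minute (1441 minutes) after the previous one.
steps = pd.to_timedelta(np.arange(2000) * 1441, unit="min")
index = pd.DatetimeIndex(pd.Timestamp("2020-06-30") + steps)
pdf = pd.DataFrame({"A": ["timestamp"] * 2000}, index=index)


def map_in_chunks(frame, func, chunk_size=500):
    """Hypothetical helper: apply `func` to each row chunk, then concatenate."""
    parts = [
        func(frame.iloc[start:start + chunk_size])
        for start in range(0, len(frame), chunk_size)
    ]
    return pd.concat(parts)


# Filtering the whole frame at once...
whole = pdf.between_time("0:15", "0:16")
# ...selects the same rows as filtering chunk by chunk.
chunked = map_in_chunks(pdf, lambda part: part.between_time("0:15", "0:16"))

print(whole)
```

Because `between_time()` decides row by row, each chunk can be filtered independently, which is what makes this kind of function a good fit for distributed execution.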

## Using SQL in Koalas
Koalas supports standard SQL syntax with `ks.sql()`, which executes a Spark SQL query and returns the result as a Koalas DataFrame.

In [95]:
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})

ks.sql("SELECT * FROM {kdf} WHERE pig > 100")

Unnamed: 0,year,pig,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


Koalas DataFrames and pandas DataFrames can also be mixed in a join operation.

In [97]:
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

ks.sql("""
          SELECT ks.pig, pd.chicken
          FROM {kdf} ks INNER JOIN {pdf} pd
          ON ks.year = pd.year
          ORDER BY ks.pig, pd.chicken
          """)

Unnamed: 0,pig,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118


## Working with PySpark

You can also apply several PySpark APIs to Koalas DataFrames. A PySpark background can make you more productive when working in Koalas. If you know PySpark, you can use PySpark APIs as workarounds when the pandas-equivalent APIs are not available in Koalas, and you can use rich features such as the Spark UI and the history server.

**Conversion to and from PySpark DataFrame**

A Koalas DataFrame can be easily converted to a PySpark DataFrame using `DataFrame.to_spark()`, similar to `DataFrame.to_pandas()`. On the other hand, a PySpark DataFrame can be easily converted to a Koalas DataFrame using `DataFrame.to_koalas()`, which extends the Spark DataFrame class.

In [100]:
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
sdf = kdf.to_spark()
type(sdf)

In [101]:
sdf.show()

Note that converting from PySpark to Koalas can cause an out-of-memory error when the default index type is `sequence`. The default index type can be set with `compute.default_index_type` (default = `sequence`). If a large dataset needs a sequential default index, use `distributed-sequence` instead.

In [103]:
from databricks.koalas import option_context

with option_context(
  "compute.default_index_type", "distributed-sequence"):
  kdf = sdf.to_koalas()
  
type(kdf)

In [104]:
kdf

Unnamed: 0,A,B
4,5,50
0,1,10
1,2,20
2,3,30
3,4,40


**Best Practice:** Converting from a PySpark DataFrame to Koalas DataFrame can have some overhead because it requires creating a new default index internally – PySpark DataFrames do not have indices. You can avoid this overhead by specifying the column that can be used as an index column. See the [Default Index type](https://koalas.readthedocs.io/en/latest/user_guide/options.html#default-index-type) for more detail.
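
One way to assign a globally consecutive index without funnelling every row through a single node is to number rows per partition and shift each partition by an offset. The pure-Python sketch below illustrates that idea for the `distributed-sequence` index type mentioned above. The partition layout is hypothetical, and this is a conceptual illustration rather than Koalas's actual implementation.

```python
from itertools import accumulate

# Hypothetical partitions of rows, as they might sit on different workers.
partitions = [["a", "b", "c"], ["d"], ["e", "f"]]

# Step 1: count the rows in each partition (can be done in parallel).
sizes = [len(part) for part in partitions]

# Step 2: prefix sums give each partition its starting index.
offsets = [0] + list(accumulate(sizes))[:-1]

# Step 3: each partition numbers its own rows, starting at its offset.
indexed = [
    [(offset + i, row) for i, row in enumerate(part)]
    for part, offset in zip(partitions, offsets)
]

print(offsets)
print(indexed)
```

Only the small list of partition sizes has to be gathered in one place. The rows themselves stay where they are.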

## Checking Spark’s Execution Plans

`DataFrame.explain()` is a useful PySpark API and is also available in Koalas. It shows the Spark execution plans before the actual execution, which helps you understand and predict the actual execution and avoid critical performance degradation.

In [107]:
from databricks.koalas import option_context

with option_context(
        "compute.ops_on_diff_frames", True,
        "compute.default_index_type", 'distributed'):
    df = ks.range(10) + ks.range(10)
    df.explain()

As shown in the physical plan, the execution will be fairly expensive because it performs a sort-merge join to combine the DataFrames. To improve the execution performance, you can reuse the same DataFrame to avoid the merge. See [Physical Plans in Spark SQL](https://databricks.com/session_eu19/physical-plans-in-spark-sql) to learn more.

In [109]:
with option_context(
        "compute.ops_on_diff_frames", False,
        "compute.default_index_type", 'distributed'):
    df = ks.range(10)
    df = df + df
    df.explain()

Now the operation uses the same DataFrame on both sides, so it avoids combining different DataFrames and triggering the sort-merge join that `compute.ops_on_diff_frames` would otherwise allow.

This operation is much cheaper than the previous one while producing the same output. Examine `DataFrame.explain()` to help improve your code efficiency.
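
To make the cost difference concrete, here is a minimal pure-Python sketch of a sort-merge join on index keys. It illustrates the general technique, not Spark's implementation, and it assumes that keys are unique within each input.

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) pairs on key.

    Assumes keys are unique within each input (a simplification).
    """
    left = sorted(left)    # sort phase: both inputs are ordered by key
    right = sorted(right)
    i = j = 0
    joined = []
    while i < len(left) and j < len(right):  # merge phase: one linear pass
        left_key, left_value = left[i]
        right_key, right_value = right[j]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            joined.append((left_key, left_value, right_value))
            i += 1
            j += 1
    return joined


# Two separate datasets with the same keys in different orders.
a = [(k, k) for k in (3, 0, 2, 1)]
b = [(k, k) for k in (1, 3, 0, 2)]

# Adding different datasets: rows must first be matched through the join.
summed_via_join = [(k, x + y) for k, x, y in sort_merge_join(a, b)]

# Adding a dataset to itself: each row already has its partner, so no join.
summed_same = [(k, v + v) for k, v in sorted(a)]

print(summed_via_join)
print(summed_same)
```

Both routes produce the same sums, but the first one pays for sorting and merging two inputs before any addition can happen.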

## Caching DataFrame

`DataFrame.cache()` is a useful PySpark API and is available in Koalas as well. It caches the output of a Koalas operation so that it does not need to be computed again in subsequent executions. This can significantly improve execution speed when the output needs to be accessed repeatedly.

In [112]:
with option_context("compute.default_index_type", 'distributed'):
    df = ks.range(10)
    new_df = (df + df).cache()  # the result of `df + df` is cached as `new_df`
    new_df.explain()

As the physical plan above shows, `new_df` will be cached once it is executed.

`InMemoryTableScan` and `InMemoryRelation` indicate that `new_df` will be cached, so the same `(df + df)` operation does not need to be performed the next time it is executed.

A cached DataFrame can be uncached by `DataFrame.unpersist()`.

**Best Practice:** A cached DataFrame can be used as a context manager to limit the scope of the cache: the DataFrame is cached on entering the `with` block and uncached on leaving it.
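
The scoping behavior can be illustrated with a small pure-Python sketch. `CachedResult` is a hypothetical stand-in class, not part of Koalas. It only mimics the pattern of computing once, reusing the result inside the `with` block, and releasing it on exit.

```python
class CachedResult:
    """Hypothetical stand-in for a cached DataFrame (not a Koalas class)."""

    def __init__(self, compute):
        self._compute = compute
        self._value = None
        self.is_cached = False
        self.compute_calls = 0

    def get(self):
        # Compute on first access only; later accesses reuse the stored value.
        if not self.is_cached:
            self._value = self._compute()
            self.compute_calls += 1
            self.is_cached = True
        return self._value

    def unpersist(self):
        # Drop the stored value, like uncaching a DataFrame.
        self._value = None
        self.is_cached = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.unpersist()  # the cache never outlives the `with` block
        return False


# Following the text above, the Koalas counterpart would presumably be:
#   with (df + df).cache() as cached_df: ...
with CachedResult(lambda: [x + x for x in range(10)]) as cached:
    first = cached.get()
    second = cached.get()
    cached_inside = cached.is_cached

print(cached.compute_calls, cached_inside, cached.is_cached)
```

Tying the uncache step to the end of the block means the cached data is released even if an exception is raised inside it.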