-sandbox
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/databricks/koalas/master/Koalas-logo.png" width="220"/>
</div>

## Koalas vs. Pandas vs. Spark vs. PySpark

The Koalas project makes data scientists more productive when interacting with big data, by implementing the pandas DataFrame API on top of Apache Spark. By unifying the two ecosystems with a familiar API, Koalas offers a seamless transition between small and large data.

**Goals of this notebook:**
* Demonstrate the similarities of the Koalas API with the pandas API
* Understand the differences in syntax for the same DataFrame operations in Koalas vs PySpark


[Koalas Github](https://github.com/databricks/koalas)

**Requirements:**
* `DBR 6.0 ML`
* `koalas==0.20.0` 

**Data:**
* *[Moro et al., 2014] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014*

We will be using the [UCI Machine Learning Repository 
Bank Marketing Data Set](https://archive.ics.uci.edu/ml/datasets/bank+marketing) throughout this demo.

In [3]:
%sh wget https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip -O /tmp/bank.zip

In [4]:
%sh unzip -o /tmp/bank.zip -d /tmp/bank

In [5]:
file_path = "/bank-full.csv"
dbutils.fs.cp("file:/tmp/bank/bank-full.csv", file_path)
dbutils.fs.head(file_path)

In [6]:
%fs ls bank-full.csv

path,name,size
dbfs:/bank-full.csv,bank-full.csv,4610348


### Loading the dataset as a Spark Dataframe

In [8]:
dbutils.library.installPyPI("koalas")
dbutils.library.restartPython()

In [9]:
df = (spark.read
           .option("inferSchema", "true")
           .option("header", "true")
           .option("delimiter", ";")
           .option("quote", '"')
           .csv("/bank-full.csv"))

display(df)

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no
35,management,married,tertiary,no,231,yes,no,unknown,5,may,139,1,-1,0,unknown,no
28,management,single,tertiary,no,447,yes,yes,unknown,5,may,217,1,-1,0,unknown,no
42,entrepreneur,divorced,tertiary,yes,2,yes,no,unknown,5,may,380,1,-1,0,unknown,no
58,retired,married,primary,no,121,yes,no,unknown,5,may,50,1,-1,0,unknown,no
43,technician,single,secondary,no,593,yes,no,unknown,5,may,55,1,-1,0,unknown,no


### Loading the dataset as a pandas Dataframe

In [11]:
import pandas as pd

csv_path = "/dbfs/bank-full.csv"

# Read in using pandas read_csv
pdf = pd.read_csv(csv_path, header=0, sep=";", quotechar='"')
display(pdf.head())

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


### Loading the dataset as a Koalas Dataframe

In [13]:
# Import Koalas
import databricks.koalas as ks
import warnings
warnings.filterwarnings("ignore")

# Read in using Koalas read_csv
kdf = ks.read_csv("/bank-full.csv", header=0, sep=";", quotechar='"')

display(kdf.head())

In [14]:
# Converting to Koalas Dataframe from Spark DataFrame

# Creating a Koalas DataFrame from PySpark DataFrame
# kdf = ks.DataFrame(df)

# # Alternative way of creating a Koalas DataFrame from PySpark DataFrame
# kdf = df.to_koalas()

In [15]:
%SQL

SELECT * FROM 

Note that calling `.head()` in Koalas may not return return the same results as pandas here. Unlike pandas, the data in a Spark dataframe is not ordered - it has no intrinsic notion of index. When asked for the head of a DataFrame, Spark will just take the requested number of rows from a partition. Do not rely on it to return specific rows, instead use `.loc` or `iloc`.

### Indexing Rows

In [18]:
pdf.iloc[3]

In [19]:
# kdf.iloc[3]

Using a scalar integer for row selection is not allowed in Koalas, instead we must supply either a *slice* object or boolean condition.

In [21]:
kdf.iloc[:4]

In [22]:
display(df.limit(4))

In [23]:
# Koalas Dataframe -> PySpark DataFrame
display(kdf.to_spark())

In [24]:
# Getting the number of rows and columns in PySpark
print((df.count(), len(df.columns)))

In [25]:
# Getting the number of rows and columns in Koalas
print(kdf.shape)

### Column Manipulation

Let's have a look at some column operations. Suppose we want to create a new column, where each last contact duration (the `duration` column) is 100 greater than the original duration entry. We will call this new column `duration_new`.

In [28]:
# Creating a column with PySpark
from pyspark.sql.functions import col

df = df.withColumn("duration_new", col("duration") + 100)
display(df)

In [29]:
# Creating a column with Koalas
kdf["duration_new"] = kdf["duration"] + 100
display(kdf.head())

###Filtering

Let's now count the number of instances where `duration_new` is greater than or equal to 300.

In [32]:
# Filtering with PySpark
df_filtered  = df.filter(col("duration_new") >= 300)
print(df_filtered.count())

In [33]:
# Filtering with Koalas
kdf_filtered = kdf[kdf.duration_new >= 300]
print(kdf_filtered.shape[0])

### Value Counts

Suppose we want to have a look at the number of clients for each unique job type.

In [36]:
# To get value counts of the different job types with PySpark
display(df.groupby("job").count().orderBy("count", ascending=False))

In [37]:
# Value counts in Koalas
kdf["job"].value_counts()

###GroupBy

Let's compare group by operations in PySpark versus Koalas. We will create two DataFrames grouped by education, to get the average `age` and maximum `balance` for each education group.

In [40]:
# Get average age per education group using PySpark
df_grouped_1 = (df.groupby("education")
                .agg({"age": "mean"})
                .select("education", col("avg(age)").alias("avg_age")))

display(df_grouped_1)

In [41]:
# Get the maximum balance for each education group using PySpark
df_grouped_2 = (df.groupby("education")
                .agg({"balance": "max"})
                .select("education", col("max(balance)").alias("max_balance")))

display(df_grouped_2)

In [42]:
# Get the average age per education group in Koalas
kdf_grouped_1 = kdf.groupby("education", as_index=False).agg({"age": "mean"})

# Rename our columns
kdf_grouped_1.columns = ["education", "avg_age"]
display(kdf_grouped_1)

In [43]:
# Get the maximum balance for each education group in Koalas
kdf_grouped_2 = kdf.groupby("education", as_index=False).agg({"balance": "max"})
kdf_grouped_2.columns = ["education", "max_balance"]
display(kdf_grouped_2)

### Joins

Let's now look at doing an inner join between our grouped DataFrames, on the `education` attribute.

In [46]:
# Joining the grouped DataFrames on education using PySpark
df_edu_joined = df_grouped_1.join(df_grouped_2, on="education", how="inner")
display(df_edu_joined)

In [47]:
# Joining the grouped DataFrames on education using Koalas
kdf_edu_joined = kdf_grouped_1.merge(kdf_grouped_2, on="education", how="inner")
display(kdf_edu_joined)

### Writing Data

Finally, let's save our joined DataFrames as Parquet files. [Parquet](https://parquet.apache.org/) is an efficient and compact file format to read and write faster.

In [50]:
# Saving the Spark DataFrame as a Parquet file.
spark_out_path = "/dbfs/bank_grouped_pyspark.parquet"

df_edu_joined.write.mode("overwrite").parquet(spark_out_path)

In [51]:
# Saving the Koalas DataFrame as a Parquet file.
koalas_out_path = "/dbfs/bank_grouped_koalas.parquet"

kdf.to_parquet(koalas_out_path, mode="overwrite")