# Cloud Workshop Azure Databricks
## 15. Databricks Koalas

<img src="https://raw.githubusercontent.com/retkowsky/images/master/AzureDatabricksLogo.jpg"><br>
V1.4 06/07/2020

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/databricks/koalas/master/Koalas-logo.png" width="220"/>
</div>

The Koalas project makes data scientists more productive when interacting with big data, by implementing the pandas DataFrame API on top of Apache Spark. By unifying the two ecosystems with a familiar API, Koalas offers a seamless transition between small and large data.

**Goals of this notebook:**
* Demonstrate the similarities of the Koalas API with the pandas API
* Understand the differences in syntax for the same DataFrame operations in Koalas vs PySpark

[Koalas Docs](https://koalas.readthedocs.io/en/latest/index.html)

[Koalas Github](https://github.com/databricks/koalas)

## Koalas: pandas API on Apache Spark
pandas is the de facto standard (single-node) DataFrame implementation in Python, while Spark is the de facto standard for big data processing. With Koalas, you can:

* Be immediately productive with Spark, with no learning curve, if you are already familiar with pandas.
* Have a single codebase that works both with pandas (tests, smaller datasets) and with Spark (distributed datasets).
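
The single-codebase idea can be sketched with a function that uses only pandas-style calls. The helper name `add_duration_offset` and the tiny sample frame are hypothetical and used purely for illustration. The function is exercised here with pandas; because it relies only on `copy` and column assignment, it is expected to accept a Koalas DataFrame as well, though that is not shown.

```python
import pandas as pd


def add_duration_offset(frame, offset=100):
    """Hypothetical helper: return a copy of `frame` with a `duration_new` column.

    Only pandas-style calls are used (`copy` and column assignment), so the
    same function is expected to work on a Koalas DataFrame too.
    """
    out = frame.copy()
    out["duration_new"] = out["duration"] + offset
    return out


# Small in-memory pandas DataFrame, e.g. for a unit test
sample_pdf = pd.DataFrame({"age": [58, 44, 33], "duration": [261, 151, 76]})
result = add_duration_offset(sample_pdf)
print(result)
```

Working on a copy leaves the input untouched, which keeps the helper safe to call from tests.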

In [4]:
import sys
sys.version

In [5]:
import datetime
now = datetime.datetime.now()
print(now)

In [6]:
# Install Koalas
dbutils.library.installPyPI("koalas")
dbutils.library.restartPython()

The dataset used for this example is the Bank Marketing data set. Given a set of features about a customer, can we predict whether the person will open a term deposit account?

Original source: [UCI Machine Learning Repository, Bank Marketing Data Set](https://archive.ics.uci.edu/ml/datasets/bank+marketing)

[Moro et al., 2014] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014.

In [8]:
file_path = "dbfs:/BankMarketing/bank/bank-full.csv"
dbutils.fs.head(file_path)

### Loading the dataset as a Spark DataFrame

In [10]:
df = (spark.read
           .option("inferSchema", "true")
           .option("header", "true")
           .option("delimiter", ";")
           .option("quote", '"')
           .csv(file_path))

display(df)

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no
35,management,married,tertiary,no,231,yes,no,unknown,5,may,139,1,-1,0,unknown,no
28,management,single,tertiary,no,447,yes,yes,unknown,5,may,217,1,-1,0,unknown,no
42,entrepreneur,divorced,tertiary,yes,2,yes,no,unknown,5,may,380,1,-1,0,unknown,no
58,retired,married,primary,no,121,yes,no,unknown,5,may,50,1,-1,0,unknown,no
43,technician,single,secondary,no,593,yes,no,unknown,5,may,55,1,-1,0,unknown,no


### Loading the dataset as a pandas DataFrame

In [12]:
import pandas as pd

csv_path = "/dbfs/BankMarketing/bank/bank-full.csv"

# Read in using pandas read_csv
pdf = pd.read_csv(csv_path, header=0, sep=";", quotechar='"')
display(pdf.head())

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


### Loading the dataset as a Koalas DataFrame

In [14]:
# Import Koalas
import databricks.koalas as ks
import warnings
warnings.filterwarnings("ignore")

# Read in using Koalas read_csv
kdf = ks.read_csv(file_path, header=0, sep=";", quotechar='"')

display(kdf.head())

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
4,33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


In [15]:
display(kdf.describe())

Unnamed: 0,age,balance,day,duration,campaign,pdays,previous
count,45211.0,45211.0,45211.0,45211.0,45211.0,45211.0,45211.0
mean,40.93621,1362.272058,15.806419,258.16308,2.763841,40.197828,0.580323
std,10.618762,3044.765829,8.322476,257.527812,3.098021,100.128746,2.303441
min,18.0,-8019.0,1.0,0.0,1.0,-1.0,0.0
25%,33.0,72.0,8.0,103.0,1.0,-1.0,0.0
50%,39.0,448.0,16.0,180.0,2.0,-1.0,0.0
75%,48.0,1427.0,21.0,319.0,3.0,-1.0,0.0
max,95.0,102127.0,31.0,4918.0,63.0,871.0,275.0


In [16]:
# Converting a PySpark DataFrame to a Koalas DataFrame

# Option 1: pass the PySpark DataFrame to the Koalas constructor
# kdf = ks.DataFrame(df)

# Option 2: call to_koalas(), available on PySpark DataFrames once Koalas is imported
kdf = df.to_koalas()

Note that calling `.head()` in Koalas may not return the same results as pandas here. Unlike pandas, the data in a Spark DataFrame is not ordered; it has no intrinsic notion of an index. When asked for the head of a DataFrame, Spark will just take the requested number of rows from a partition. Do not rely on it to return specific rows; use `.loc` or `.iloc` instead.
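
Another way to make "the first rows" well defined is to sort on explicit columns before calling `.head()`. The sketch below uses pandas with a small hypothetical sample built from the rows displayed earlier. `sort_values` is also part of the Koalas API, so the same pattern should carry over, but treat this as an alternative to `.loc`/`.iloc` rather than as what this notebook does.

```python
import pandas as pd

# Hypothetical sample: age and balance of the first five displayed rows
sample_pdf = pd.DataFrame({
    "age": [58, 44, 33, 47, 33],
    "balance": [2143, 29, 2, 1506, 1],
})

# Sort on explicit keys first, so the result does not depend on row order
youngest = sample_pdf.sort_values(["age", "balance"]).head(3)
print(youngest)
```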

### Indexing Rows

In [19]:
pdf.iloc[:3]

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no


In [20]:
kdf.iloc[3]

In [21]:
kdf.iloc[:4]

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no


In [22]:
display(df.limit(4))

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no


In [23]:
# Koalas DataFrame -> PySpark DataFrame
display(kdf.to_spark())

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no
35,management,married,tertiary,no,231,yes,no,unknown,5,may,139,1,-1,0,unknown,no
28,management,single,tertiary,no,447,yes,yes,unknown,5,may,217,1,-1,0,unknown,no
42,entrepreneur,divorced,tertiary,yes,2,yes,no,unknown,5,may,380,1,-1,0,unknown,no
58,retired,married,primary,no,121,yes,no,unknown,5,may,50,1,-1,0,unknown,no
43,technician,single,secondary,no,593,yes,no,unknown,5,may,55,1,-1,0,unknown,no


In [24]:
# Getting the number of rows and columns in PySpark
print((df.count(), len(df.columns)))

In [25]:
# Getting the number of rows and columns in Koalas
print(kdf.shape)

### Column Manipulation

Let's have a look at some column operations. Suppose we want to create a new column that holds the last contact duration (the `duration` column) plus 100. We will call this new column `duration_new`.

In [28]:
# Creating a column with PySpark
from pyspark.sql.functions import col

df = df.withColumn("duration_new", col("duration") + 100)
display(df)

age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y,duration_new
58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no,361
44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no,251
33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no,176
47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no,192
33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no,298
35,management,married,tertiary,no,231,yes,no,unknown,5,may,139,1,-1,0,unknown,no,239
28,management,single,tertiary,no,447,yes,yes,unknown,5,may,217,1,-1,0,unknown,no,317
42,entrepreneur,divorced,tertiary,yes,2,yes,no,unknown,5,may,380,1,-1,0,unknown,no,480
58,retired,married,primary,no,121,yes,no,unknown,5,may,50,1,-1,0,unknown,no,150
43,technician,single,secondary,no,593,yes,no,unknown,5,may,55,1,-1,0,unknown,no,155


In [29]:
# Creating a column with Koalas
kdf["duration_new"] = kdf["duration"] + 100
display(kdf.head())

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y,duration_new
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no,361
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no,251
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no,176
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no,192
4,33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no,298


### Filtering

Let's now count the number of instances where `duration_new` is greater than or equal to 300.

In [32]:
# Filtering with PySpark
df_filtered = df.filter(col("duration_new") >= 300)
print(df_filtered.count())

In [33]:
# Filtering with Koalas
kdf_filtered = kdf[kdf.duration_new >= 300]
print(kdf_filtered.shape[0])
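
For comparison, the Koalas filter has the same shape as a plain pandas boolean-mask filter. A minimal sketch on a hypothetical five-row sample, using the `duration` values of the first rows displayed earlier:

```python
import pandas as pd

# Hypothetical sample: durations of the first five displayed rows
sample_pdf = pd.DataFrame({"duration": [261, 151, 76, 92, 198]})
sample_pdf["duration_new"] = sample_pdf["duration"] + 100

# A comparison yields a boolean Series; indexing with it keeps the True rows
mask = sample_pdf["duration_new"] >= 300
filtered = sample_pdf[mask]

print(filtered.shape[0])
```

On this sample only the first row (261 + 100 = 361) passes the filter.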

### Value Counts

Suppose we want to have a look at the number of clients for each unique job type.

In [36]:
# To get value counts of the different job types with PySpark
display(df.groupby("job").count().orderBy("count", ascending=False))

job,count
blue-collar,9732
management,9458
technician,7597
admin.,5171
services,4154
retired,2264
self-employed,1579
entrepreneur,1487
unemployed,1303
housemaid,1240


In [37]:
# Value counts in Koalas
kdf["job"].value_counts()
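
To see how `value_counts()` relates to the explicit group-by used in the PySpark cell, here is a small pandas sketch. The sample Series is hypothetical and holds the `job` values of the ten rows displayed earlier.

```python
import pandas as pd

# Hypothetical sample: job values of the first ten displayed rows
jobs = pd.Series(
    ["management", "technician", "entrepreneur", "blue-collar", "unknown",
     "management", "management", "entrepreneur", "retired", "technician"],
    name="job",
)

# value_counts() counts each distinct value and sorts by count, descending
counts = jobs.value_counts()

# The same counts via an explicit group-by, mirroring the PySpark version
grouped = jobs.groupby(jobs).size().sort_values(ascending=False)

print(counts)
```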

### GroupBy

Let's compare group by operations in PySpark versus Koalas. We will create two DataFrames grouped by education, to get the average `age` and maximum `balance` for each education group.

In [40]:
# Get average age per education group using PySpark
df_grouped_1 = (df.groupby("education")
                .agg({"age": "mean"})
                .select("education", col("avg(age)").alias("avg_age")))

display(df_grouped_1)

education,avg_age
unknown,44.51050080775444
tertiary,39.59363957597173
secondary,39.964270321524005
primary,45.86556707050066


In [41]:
# Get the maximum balance for each education group using PySpark
df_grouped_2 = (df.groupby("education")
                .agg({"balance": "max"})
                .select("education", col("max(balance)").alias("max_balance")))

display(df_grouped_2)

education,max_balance
unknown,64343
tertiary,102127
secondary,81204
primary,71188


In [42]:
# Get the average age per education group in Koalas
kdf_grouped_1 = kdf.groupby("education", as_index=False).agg({"age": "mean"})

# Rename our columns
kdf_grouped_1.columns = ["education", "avg_age"]
display(kdf_grouped_1)

Unnamed: 0,education,avg_age
0,unknown,44.510501
1,tertiary,39.59364
2,secondary,39.96427
3,primary,45.865567


In [43]:
# Get the maximum balance for each education group in Koalas
kdf_grouped_2 = kdf.groupby("education", as_index=False).agg({"balance": "max"})
kdf_grouped_2.columns = ["education", "max_balance"]
display(kdf_grouped_2)

Unnamed: 0,education,max_balance
0,unknown,64343
1,tertiary,102127
2,secondary,81204
3,primary,71188
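
As an aside, pandas can compute both aggregates in one pass with named aggregation, which also names the output columns directly. The sketch below runs on a hypothetical ten-row sample taken from the rows displayed earlier. Whether the installed Koalas version accepts the same keyword form is an assumption that should be checked against its documentation.

```python
import pandas as pd

# Hypothetical sample: education, age and balance of the first ten displayed rows
sample_pdf = pd.DataFrame({
    "education": ["tertiary", "secondary", "secondary", "unknown", "unknown",
                  "tertiary", "tertiary", "tertiary", "primary", "secondary"],
    "age": [58, 44, 33, 47, 33, 35, 28, 42, 58, 43],
    "balance": [2143, 29, 2, 1506, 1, 231, 447, 2, 121, 593],
})

# Named aggregation: output_column=(input_column, aggregation)
summary = (sample_pdf
           .groupby("education", as_index=False)
           .agg(avg_age=("age", "mean"), max_balance=("balance", "max")))
print(summary)
```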


### Joins

Let's now look at doing an inner join between our grouped DataFrames, on the `education` attribute.

In [46]:
# Joining the grouped DataFrames on education using PySpark
df_edu_joined = df_grouped_1.join(df_grouped_2, on="education", how="inner")
display(df_edu_joined)

education,avg_age,max_balance
unknown,44.51050080775444,64343
tertiary,39.59363957597173,102127
secondary,39.964270321524005,81204
primary,45.86556707050066,71188


In [47]:
# Joining the grouped DataFrames on education using Koalas
kdf_edu_joined = kdf_grouped_1.merge(kdf_grouped_2, on="education", how="inner")
display(kdf_edu_joined)

Unnamed: 0,education,avg_age,max_balance
0,unknown,44.510501,64343
1,tertiary,39.59364,102127
2,secondary,39.96427,81204
3,primary,45.865567,71188
