# API do Pandas, o antigo Koalas

Comumente usado por cientistas de dados, o **Pandas** fornece uma estrutura de dados facil, além de ferramentas para a análise de dados na linguagem Python. No entanto, o pandas não é dimensionado para Big Data.<br>
O Koalas preenche essa lacuna, fornecendo API equivalentes ao pandas que funcionam no Apache Spark. 
Koalas é útil não apenas para usuários de pandas, mas também para usuários do PySpark, porque ela oferece suporte a muitas tarefas difíceis de fazer com o PySpark, por exemplo, plotar dados diretamente de um PySpark DataFrame.

O **Koalas** está incluído em clusters que executam o Databricks Runtime 7.3 a 9.1. Para clusters que executam o Databricks Runtime 10.0 ou superior, use a **API Pandas no Spark** .

Para importar o **Koalas** podemos utilizar `import databricks.koalas as ks` <br>
Já para importar a **API do Pandas** no Spark, utilizamos `import pyspark.pandas as ps`

Como estamos utilizando um runtime de 10.4, vamos utilizar a versão da API, pois caso tentássemos rodar a versão do Koalas receberíamos um erro: `WARNING:root:Found pyspark version "3.2.1" caso tentássemos rodar a versão do Koalas receberíamos um erro:  The pyspark version 3.2 and above has a built-in "pandas APIs on Spark" module ported from Koalas. Try `import pyspark.pandas as ps` instead. `

## Principais funcionalidades

### Criação de Series e DataFrames

In [0]:
import numpy as np
import pandas as pd

import pyspark.pandas as ps



In [0]:
# Criando um Series do pandas
pandas = pd.Series([1, 3, 5, np.nan, 6, 8]) 

# Criando um Series com a API ( Kolas)
koalas = ps.Series([1, 3, 5, np.nan, 6, 8])

print ("pandas\n", pandas)
print('=============')
print ("Koalas\n", koalas)

pandas
 0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64
Koalas
 0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64


In [0]:
# Criando um Series no Koalas a partir de um Series no pandas
pandas_to_koalas = ps.from_pandas(pandas)
pandas_to_koalas

Out[3]: 0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [0]:
#OBS: como são números randomicos os valores do DF não serão o mesmos

# pandas DataFrame
pandas_df = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})


# koalas DataFrame
koalas_df = ps.DataFrame({'A': np.random.rand(5),
                     'B': np.random.rand(5)})

In [0]:
pandas_df

Unnamed: 0,A,B
0,0.816444,0.144277
1,0.022771,0.682784
2,0.382152,0.688586
3,0.710868,0.590294
4,0.599905,0.102739


In [0]:
koalas_df

Unnamed: 0,A,B
0,0.506934,0.424444
1,0.638653,0.042442
2,0.808824,0.412062
3,0.472404,0.266552
4,0.085098,0.547356


In [0]:
type(pandas_df)

Out[7]: pandas.core.frame.DataFrame

In [0]:
type(koalas_df)
# caso estivessemos utilizando a versão Koalas, 
# o nosso retorno seria : databricks.koalas.frame.DataFrame

Out[8]: pyspark.pandas.frame.DataFrame

In [0]:
# Criando um DataFrame koalas, a partir de um pandas DataFrame
pandas_to_koalas_df = ps.from_pandas(pandas_df)
pandas_to_koalas_df

Unnamed: 0,A,B
0,0.816444,0.144277
1,0.022771,0.682784
2,0.382152,0.688586
3,0.710868,0.590294
4,0.599905,0.102739


### Opções e configurações 
A API do Pandas no Spark possui um sistema de opções que permite personalizar alguns aspectos de seu comportamento, sendo as opções relacionadas à exibição dos dataframes.

As opções mais relevantes são:
- get_option() / set_option() - obter/definir o valor de uma única opção.
- reset_option() - redefinir as opções para o padrão

In [0]:
ps.get_option("display.max_rows")

Out[10]: 1000

In [0]:
ps.set_option("display.max_rows", 999)
ps.get_option("display.max_rows")

Out[11]: 999

In [0]:
ps.reset_option("display.max_rows")
ps.get_option("display.max_rows")

Out[12]: 1000

### Análisando os dados

In [0]:
koalas_df.head(2)

Unnamed: 0,A,B
0,0.506934,0.424444
1,0.638653,0.042442


In [0]:
koalas_df.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.502383,0.338571
std,0.268105,0.193162
min,0.085098,0.042442
25%,0.472404,0.266552
50%,0.506934,0.412062
75%,0.638653,0.424444
max,0.808824,0.547356


In [0]:
koalas_df.info()

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Col_1   5 non-null      float64
 1   Col_2   5 non-null      float64
 2   Col_3   5 non-null      int64  
dtypes: float64(2), int64(1)

In [0]:
koalas_df.A.value_counts()

Out[15]: 0.506934    1
0.638653    1
0.808824    1
0.472404    1
0.085098    1
Name: A, dtype: int64

In [0]:
koalas_df.sort_values(by='B')

Unnamed: 0,A,B
1,0.638653,0.042442
3,0.472404,0.266552
2,0.808824,0.412062
0,0.506934,0.424444
4,0.085098,0.547356


In [0]:
koalas_df.transpose()

Unnamed: 0,0,1,2,3,4
A,0.506934,0.638653,0.808824,0.472404,0.085098
B,0.424444,0.042442,0.412062,0.266552,0.547356


### Seleção de dados

In [0]:
koalas_df['A']  

Out[18]: 0    0.506934
1    0.638653
2    0.808824
3    0.472404
4    0.085098
Name: A, dtype: float64

In [0]:
koalas_df.A

Out[19]: 0    0.506934
1    0.638653
2    0.808824
3    0.472404
4    0.085098
Name: A, dtype: float64

In [0]:
koalas_df[['A', 'B']]

Unnamed: 0,A,B
0,0.506934,0.424444
1,0.638653,0.042442
2,0.808824,0.412062
3,0.472404,0.266552
4,0.085098,0.547356


In [0]:
koalas_df.loc[1:2]

Unnamed: 0,A,B
1,0.638653,0.042442
2,0.808824,0.412062


In [0]:
koalas_df.iloc[:3, 1:2]

Unnamed: 0,B
0,0.424444
1,0.042442
2,0.412062


In [0]:
new_data = ps.Series([100, 200, 300, 400, 500], index=[0, 1, 2, 3, 4])

In [0]:
koalas_df['C'] = new_data

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
[0;32m<command-1801012934352295>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mkoalas_df[0m[0;34m[[0m[0;34m'C'[0m[0;34m][0m [0;34m=[0m [0mnew_data[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     49[0m                 logger.log_success(
[1;32m   

esse erro ocorreu porque a API do Pandas não permite a adição de colunas provenientes de DF ou de Series em outro DF, visto que essa ação utiliza processos de junções (joins/merge) os quais, em geral, são caras 

Para habilitar essa opção deve utilizar o comando *compute.ops_on_diff_frames = True*

In [0]:
from pyspark.pandas.config import set_option, reset_option

set_option("compute.ops_on_diff_frames", True)
koalas_df['C'] = new_data
koalas_df


Unnamed: 0,A,B,C
0,0.506934,0.424444,100
1,0.638653,0.042442,200
2,0.808824,0.412062,300
3,0.472404,0.266552,400
4,0.085098,0.547356,500


In [0]:
koalas_df[koalas_df.C == 200]


Unnamed: 0,A,B,C
1,0.638653,0.042442,200


In [0]:
# Para voltar as configurações originais, é só utilizar o comando
reset_option("compute.ops_on_diff_frames")

In [0]:
# Renomeando as colunas 

koalas_df = koalas_df.rename(columns={'A': 'Col_1', 
                         'B': 'Col_2',
                         'C': 'Col_3'})

koalas_df

Unnamed: 0,Col_1,Col_2,Col_3
0,0.506934,0.424444,100
1,0.638653,0.042442,200
2,0.808824,0.412062,300
3,0.472404,0.266552,400
4,0.085098,0.547356,500


### Apply

Utilizando o método apply para aplicar as funções no DataFrame

In [0]:
koalas_df.apply(np.sum)

Out[39]: Col_1       2.511913
Col_2       1.692856
Col_3    1500.000000
dtype: float64

In [0]:
koalas_df.apply(np.cumsum)

Unnamed: 0,Col_1,Col_2,Col_3
0,0.506934,0.424444,100
1,1.145587,0.466886,300
2,1.954411,0.878948,600
3,2.426815,1.1455,1000
4,2.511913,1.692856,1500


In [0]:
koalas_df.apply(np.cumsum, axis=1)

Unnamed: 0,Col_1,Col_2,Col_3
0,0.506934,0.931379,100.931379
1,0.638653,0.681095,200.681095
2,0.808824,1.220886,301.220886
3,0.472404,0.738956,400.738956
4,0.085098,0.632454,500.632454


In [0]:
koalas_df.apply(lambda x: x ** 3)

Unnamed: 0,Col_1,Col_2,Col_3
0,0.130273,0.076465,1000000
1,0.260492,7.6e-05,8000000
2,0.529129,0.069966,27000000
3,0.105424,0.018938,64000000
4,0.000616,0.163987,125000000


In [0]:
def square(x):
    return x ** 2
  
koalas_df.apply(square)  

Unnamed: 0,Col_1,Col_2,Col_3
0,0.256983,0.180153,10000
1,0.407878,0.001801,40000
2,0.654196,0.169795,90000
3,0.223165,0.07105,160000
4,0.007242,0.299599,250000


### GroupBy

Podemos utilizar agregações assim como no pandas

In [0]:
koalas_df.groupby('Col_1').sum()

Unnamed: 0_level_0,Col_2,Col_3
Col_1,Unnamed: 1_level_1,Unnamed: 2_level_1
0.506934,0.424444,100
0.638653,0.042442,200
0.808824,0.412062,300
0.472404,0.266552,400
0.085098,0.547356,500


In [0]:
koalas_df.groupby(['Col_1', 'Col_2']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,Col_3
Col_1,Col_2,Unnamed: 2_level_1
0.506934,0.424444,100
0.638653,0.042442,200
0.808824,0.412062,300
0.472404,0.266552,400
0.085098,0.547356,500


vamos criar um novo dataFrame  e aplicar o mesmo agrupamento para ficar mais claro esse processo

In [0]:
cursos   = ({
    'curso':["Spark","PySpark","Hadoop","Python","Pandas","Hadoop","Spark","Python","NA"],
    'num_aluno' :[22000,25000,23000,24000,26000,25000,25000,22000,1500],
    'duracao':[30,50,55,40,60,35,30,50,40]
          })


df = ps.DataFrame(cursos)
display(df)

curso,num_aluno,duracao
Spark,22000,30
PySpark,25000,50
Hadoop,23000,55
Python,24000,40
Pandas,26000,60
Hadoop,25000,35
Spark,25000,30
Python,22000,50
,1500,40


In [0]:
df2 = df.groupby(['curso']).sum()
display(df2)

num_aluno,duracao
47000,60
25000,50
48000,90
46000,90
26000,60
1500,40


In [0]:
df3 = df.groupby(['curso']).mean()
display(df3)

num_aluno,duracao
23500.0,30.0
25000.0,50.0
24000.0,45.0
23000.0,45.0
26000.0,60.0
1500.0,40.0


In [0]:
df4= df.groupby("curso").apply(lambda x: x * 2)
display(df4)

curso,num_aluno,duracao
SparkSpark,44000,60
PySparkPySpark,50000,100
HadoopHadoop,46000,110
PythonPython,48000,80
PandasPandas,52000,120
HadoopHadoop,50000,70
SparkSpark,50000,60
PythonPython,44000,100
NANA,3000,80


In [0]:
display(df)

curso,num_aluno,duracao
Spark,22000,30
PySpark,25000,50
Hadoop,23000,55
Python,24000,40
Pandas,26000,60
Hadoop,25000,35
Spark,25000,30
Python,22000,50
,1500,40


In [0]:
display(df)

curso,num_aluno,duracao
Spark,22000,30
PySpark,25000,50
Hadoop,23000,55
Python,24000,40
Pandas,26000,60
Hadoop,25000,35
Spark,25000,30
Python,22000,50
,1500,40


### SQL

Uma das funcionalidades que temos na API do Pandas é realizar querys assim como fazemos no PySpark

Para compreendermos como usar o SQL com o koalas, vamos criar dois dataFrames, depois vamos explorar cada um deles  e por fim unir os dois

In [0]:
df_1 = ps.DataFrame({'ano': [1990, 1997, 2003, 2009, 2014],
                     'porco': [20, 18, 489, 675, 1776],
                     'cavalo': [4, 25, 281, 600, 1900]})
df_1

Unnamed: 0,ano,porco,cavalo
0,1990,20,4
1,1997,18,25
2,2003,489,281
3,2009,675,600
4,2014,1776,1900


In [0]:
df_2 = ps.DataFrame({'ano': [1990, 1997, 2003, 2009, 2014],
                    'gato': [22, 50, 121, 445, 791],
                    'cachorro': [250, 326, 589, 1241, 2118]})
df_2

Unnamed: 0,ano,gato,cachorro
0,1990,22,250
1,1997,50,326
2,2003,121,589
3,2009,445,1241
4,2014,791,2118


In [0]:
# explorando o df_1

ps.sql("SELECT * FROM df_1 WHERE ano = '2014' ")

In [0]:
# explorando o df_1 da forma clássica

ps.sql("SELECT * FROM df_1 WHERE ano = '2014' ")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1902417128262575>[0m in [0;36m<cell line: 3>[0;34m()[0m
[1;32m      1[0m [0;31m# explorando o df_1[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;34m[0m[0m
[0;32m----> 3[0;31m [0mps[0m[0;34m.[0m[0msql[0m[0;34m([0m[0;34m"SELECT * FROM df_1 WHERE ano = '2014' "[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0

In [0]:
# explorando o df_1 da forma clássica

ps.sql("SELECT * FROM {df_1} WHERE ano = '2014' ", df_1=df_1)

Unnamed: 0,ano,porco,cavalo
0,2014,1776,1900


In [0]:
# explorando o df_2

ps.sql("SELECT * FROM {df_2} WHERE gato > 250 ", df_2=df_2)

Unnamed: 0,ano,gato,cachorro
0,2009,445,1241
1,2014,791,2118


In [0]:
# unindo os dois df

ps.sql ('''
SELECT 
a.porco,
a.cavalo,
b.gato,
b.cachorro

FROM {df_1} a
INNER JOIN {df_2} b
ON a.ano = b.ano
''', df_1 = df_1, df_2 = df_2 )

Unnamed: 0,porco,cavalo,gato,cachorro
0,20,4,22,250
1,18,25,50,326
2,489,281,121,589
3,675,600,445,1241
4,1776,1900,791,2118


In [0]:
# passando a união dos DFs para uma variável
df_3 = ps.sql ('''
SELECT 
a.porco,
a.cavalo,
b.gato,
b.cachorro

FROM {df_1} a
INNER JOIN {df_2} b
ON a.ano = b.ano
''', df_1 = df_1, df_2 = df_2 )

df_3

Unnamed: 0,porco,cavalo,gato,cachorro
0,20,4,22,250
1,18,25,50,326
2,489,281,121,589
3,675,600,445,1241
4,1776,1900,791,2118


In [0]:
df_3.describe()

Unnamed: 0,porco,cavalo,gato,cachorro
count,5.0,5.0,5.0,5.0
mean,595.6,562.0,285.8,904.8
std,720.407732,785.872445,328.984346,782.366091
min,18.0,4.0,22.0,250.0
25%,20.0,25.0,50.0,326.0
50%,489.0,281.0,121.0,589.0
75%,675.0,600.0,445.0,1241.0
max,1776.0,1900.0,791.0,2118.0


Podemos também  fazer as operações  como verificar o valor minimo, máximo, média, dentre outros

In [0]:
ps.sql ('''
SELECT 
min (porco),
avg(cavalo)
FROM {df_1} 
''', df_1 = df_1 )

Unnamed: 0,min(porco),avg(cavalo)
0,18,562.0


In [0]:
# com alias
ps.sql ('''
SELECT 
min (porco) as min_pig,
avg(cavalo) as media_horse
FROM {df_1} 
''', df_1 = df_1 )

Unnamed: 0,min_pig,media_horse
0,18,562.0


In [0]:
df_4 = ps.DataFrame({"frutas": ["banana", "goiaba", "laranja", "banana", "uva", "laranja", "banana"]})
df_4

Unnamed: 0,frutas
0,banana
1,goiaba
2,laranja
3,banana
4,uva
5,laranja
6,banana


In [0]:
ps.sql ('''
SELECT 
distinct frutas
FROM {df_4} 
''', df_4 = df_4 )

Unnamed: 0,frutas
0,banana
1,goiaba
2,laranja
3,uva


In [0]:
ps.sql ('''
SELECT 
count (distinct frutas) as qtd
FROM {df_4} 
''', df_4 = df_4 )


Unnamed: 0,qtd
0,4


### Plotting

In [0]:
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]
index = ['caracol', 'porco', 'elefante',
         'coelho', 'girafa', 'coiote', 'cavalo']
df = ps.DataFrame({'velocidade': speed,
                     'tempo_de_vida': lifespan}, index=index)
df.plot.bar()

In [0]:
df.plot.barh()

In [0]:
df_2 = ps.DataFrame({'massa': [0.330, 4.87, 5.97],
                     'raio': [2439.7, 6051.8, 6378.1]},
                    index=['Mercurio', 'Venus', 'Terra'])
df_2.plot.pie(y='massa')

In [0]:
df_3 = ps.DataFrame({
    'vendas': [3, 2, 3, 9, 10, 6, 3],
    'inscricoes': [5, 5, 6, 12, 14, 13, 9],
    'visitas': [20, 42, 28, 62, 81, 50, 90],
}, index=pd.date_range(start='2019/08/15', end='2020/03/09',
                       freq='M'))
df_3.plot.area()

In [0]:
# mesmo fd_1 que criamos para testar SQL

df_1 = ps.DataFrame({'ano': [1990, 1997, 2003, 2009, 2014],
                     'porco': [20, 18, 489, 675, 1776],
                     'cavalo': [4, 25, 281, 600, 1900]})
df_1.plot.line()

In [0]:
# plotando a partir de um df em pandas

df_pandas = pd.DataFrame(
    np.random.randint(1, 7, 6000),
    columns=['one'])
df_pandas['two'] = df_pandas['one'] + np.random.randint(1, 7, 6000)
df_koalas = ps.from_pandas(df_pandas)
df_koalas.plot.hist(bins=12, alpha=0.5)

In [0]:
df_5 = ps.DataFrame([[5.1, 3.5, 0], [4.9, 3.0, 0], [7.0, 3.2, 1],
                    [6.4, 3.2, 1], [5.9, 3.0, 2]],
                   columns=['length', 'width', 'species'])


df_5 .plot.scatter(x='length',
                  y='width',
                  c='species')

### Convertendo o DataFrame entre Koalas e PySpark

- para converte de koalas para PySpark : *df.to_spark()* <br>
- para converte de PySpark para koalas : *df.pandas_api()*

In [0]:
df_koalas = ps.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
type(df_koalas)

Out[4]: pyspark.pandas.frame.DataFrame

In [0]:
# Koalas para PySpark
df_spark = df_koalas.to_spark()
type(df_spark)

Out[5]: pyspark.sql.dataframe.DataFrame

In [0]:
df_spark.show()

+---+---+
|  A|  B|
+---+---+
|  1| 10|
|  2| 20|
|  3| 30|
|  4| 40|
|  5| 50|
+---+---+



In [0]:
# PySpark para Koalas
# método mais visto na internet, porém está depreciado

df_koalas = df_spark.to_pandas_on_spark()
type(df_koalas)


Out[11]: pyspark.pandas.frame.DataFrame

In [0]:
# PySpark para Koalas, método 2 (novo)
df_koalas= df_spark.pandas_api()
type(df_koalas)

Out[8]: pyspark.pandas.frame.DataFrame

In [0]:
df_koalas.head()

Unnamed: 0,A,B
0,1,10
1,2,20
2,3,30
3,4,40
4,5,50


Ao converter spark para koalas é possivel definir uma coluna como index

In [0]:
df = df_spark.pandas_api(index_col='A')
df

Unnamed: 0_level_0,B
A,Unnamed: 1_level_1
1,10
2,20
3,30
4,40
5,50


Já na conversão de kolas para spark não é possivel

In [0]:
df = df_koalas.to_spark(index_col='A')
df

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
[0;32m<command-3387341362531963>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdf[0m [0;34m=[0m [0mdf_koalas[0m[0;34m.[0m[0mto_spark[0m[0;34m([0m[0mindex_col[0m[0;34m=[0m[0;34m'A'[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0mdf[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m

Esse erro ocorre devido as configurações de indexação do spark. 

Quando é criado um DataFrame com a API do Pandas no Spark (Koalas), são criados um “frame interno” e um DataFrame PySpark também, como mostrado na imagem <br>
![](https://miro.medium.com/max/1400/0*DydLJYVJhaufUYtL.webp)

Fonte: https://medium.com/@courgeonpierre/how-to-run-pandas-code-on-spark-4a136f4f3e37

O frame interno é que fornecerá as conversões entre os DF Koalas e PySpark. Além disso, manterá os metadados como o mapeamento de coluna entre os dois dataframes ou o índice. E assim, permite que possamos usar as funcionalidades do Pandas que não são suportadas pelos dataframes do PySpark, como:
 - Sintaxe mutável: para que você não precise criar um novo dataframe toda vez que quiser modificá-lo;
 - Índice sequencial: para que você possa usar as funcionalidades do Pandas com base em um índice
 - Pandas dtypes



Para saber mais: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type