









# Apache Spark Koalas

Este notebook contiene las funciones principales de Koalas, obtenidas de la documentación oficial de https://koalas.readthedocs.io/

In [30]:
!pip install plotly



In [4]:
import sys
sys.executable

'c:\\users\\user\\appdata\\local\\programs\\python\\python39\\python.exe'

In [9]:
import findspark
findspark.init()

import pandas as pd
import pyspark.pandas as ps

import pandas as pd
import numpy as np
import databricks.koalas as ks
from pyspark.sql import SparkSession

### 1. Creación de objetos



Creando una serie Koalas pasando una lista de valores, permitiendo que Koalas cree un índice entero predeterminado:

In [11]:
s = ks.Series([1, 3, 5, np.nan, 6, 8])

  for name, col in reset_index.iteritems():
  [


In [12]:
s

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

Creando un Koalas DataFrame pasando un dict de objetos que se pueden convertir a series.

In [9]:
kdf = ks.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])

In [10]:
kdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


Creando un DataFrame de pandas pasando una matriz numpy, con un índice de fecha y hora y columnas etiquetadas:

In [13]:
dates = pd.date_range('20130101', periods=6)

In [14]:
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

In [15]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

In [16]:
pdf

Unnamed: 0,A,B,C,D
2013-01-01,-1.318658,1.965193,0.483052,-0.906627
2013-01-02,-1.34724,-1.588059,0.693082,1.858337
2013-01-03,0.754954,-1.161812,0.336966,-0.052354
2013-01-04,-2.054697,-2.375747,-2.773655,0.264325
2013-01-05,-1.824353,1.253567,-0.451625,2.657719
2013-01-06,1.78004,1.413047,-0.647377,-1.162996


Ahora, este DataFrame de pandas se puede convertir en un DataFrame de Koalas

In [17]:
kdf = ks.from_pandas(pdf)

  for name, col in reset_index.iteritems():
  [


In [18]:
type(kdf)

databricks.koalas.frame.DataFrame

Sin embargo, se ve y se comporta igual que un DataFrame de pandas

In [19]:
kdf

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D
2013-01-01,-1.318658,1.965193,0.483052,-0.906627
2013-01-02,-1.34724,-1.588059,0.693082,1.858337
2013-01-03,0.754954,-1.161812,0.336966,-0.052354
2013-01-04,-2.054697,-2.375747,-2.773655,0.264325
2013-01-05,-1.824353,1.253567,-0.451625,2.657719
2013-01-06,1.78004,1.413047,-0.647377,-1.162996


Además, es posible crear un **Koalas DataFrame desde Spark DataFrame**.

Creando un Spark DataFrame a partir de pandas DataFrame

In [23]:
spark = SparkSession.builder.getOrCreate()

In [24]:
sdf = spark.createDataFrame(pdf)

In [25]:
sdf.show() #mas dificiles de manipular

+-------------------+--------------------+-------------------+--------------------+
|                  A|                   B|                  C|                   D|
+-------------------+--------------------+-------------------+--------------------+
| 0.7821812376306791|  1.5610120866499508|0.41084488738945946|  0.7236737771474189|
|-1.0691913868131395|  2.0639362451398324| 0.9959679441175036|   1.340934944903627|
| -2.143607781674943|  1.3639139859258136|-0.8821316142367005|  0.8372436718492526|
|  1.314554006084135|-0.08445918861182933|0.25954690193030433| -1.2457420715961245|
|-1.3651083501150043|   1.045862077220967| 0.7071005819221372|-0.15400317814980163|
| -1.819903879710413|  0.5928358226785815| 0.1514616541958605| -1.1552653517201044|
+-------------------+--------------------+-------------------+--------------------+



Creando Koalas DataFrame desde Spark DataFrame.
`to_koalas ()` se adjunta automáticamente a Spark DataFrame y está disponible como una API cuando se importa Koalas.

In [26]:
kdf = sdf.to_koalas()

In [27]:
kdf

Unnamed: 0,A,B,C,D
0,0.782181,1.561012,0.410845,0.723674
1,-1.069191,2.063936,0.995968,1.340935
2,-2.143608,1.363914,-0.882132,0.837244
3,1.314554,-0.084459,0.259547,-1.245742
4,-1.365108,1.045862,0.707101,-0.154003
5,-1.819904,0.592836,0.151462,-1.155265


Tiene [dtypes] específicos. Actualmente se admiten los tipos que son comunes a Spark y pandas.

In [28]:
kdf.dtypes

A    float64
B    float64
C    float64
D    float64
dtype: object

### 2. Manipulación de datos


A diferencia de los pandas, los datos en un dataframe de datos de Spark no están _ordenados_, no tienen una noción intrínseca de índice. Cuando se le solicite el encabezado, Spark solo tomará el número solicitado de filas de una partición. **No hay que utilizar el df de Koalas para devolver filas específicas**, use `.loc` o` iloc` en su lugar.

In [29]:
kdf.head()

Unnamed: 0,A,B,C,D
0,0.782181,1.561012,0.410845,0.723674
1,-1.069191,2.063936,0.995968,1.340935
2,-2.143608,1.363914,-0.882132,0.837244
3,1.314554,-0.084459,0.259547,-1.245742
4,-1.365108,1.045862,0.707101,-0.154003


Muestre el índice, las columnas y los datos numéricos subyacentes.

También puede recuperar el índice; la columna de índice se puede atribuir a un DataFrame, ver más adelante

In [30]:
kdf.index

Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')

In [31]:
kdf.columns

Index(['A', 'B', 'C', 'D'], dtype='object')

In [32]:
kdf.to_numpy() #pasamos a un array

array([[ 0.78218124,  1.56101209,  0.41084489,  0.72367378],
       [-1.06919139,  2.06393625,  0.99596794,  1.34093494],
       [-2.14360778,  1.36391399, -0.88213161,  0.83724367],
       [ 1.31455401, -0.08445919,  0.2595469 , -1.24574207],
       [-1.36510835,  1.04586208,  0.70710058, -0.15400318],
       [-1.81990388,  0.59283582,  0.15146165, -1.15526535]])

**Describe** muestra un resumen estadístico rápido de sus datos

In [33]:
kdf.describe()

Unnamed: 0,A,B,C,D
count,6.0,6.0,6.0,6.0
mean,-0.716846,1.090517,0.273798,0.057807
std,1.426215,0.758143,0.644888,1.087471
min,-2.143608,-0.084459,-0.882132,-1.245742
25%,-1.819904,0.592836,0.151462,-1.155265
50%,-1.365108,1.045862,0.259547,-0.154003
75%,0.782181,1.561012,0.707101,0.837244
max,1.314554,2.063936,0.995968,1.340935


Transposición de sus datos

In [34]:
kdf.T #transponemos los datos

Unnamed: 0,0,1,2,3,4,5
A,0.782181,-1.069191,-2.143608,1.314554,-1.365108,-1.819904
B,1.561012,2.063936,1.363914,-0.084459,1.045862,0.592836
C,0.410845,0.995968,-0.882132,0.259547,0.707101,0.151462
D,0.723674,1.340935,0.837244,-1.245742,-0.154003,-1.155265


Ordenando por su índice

In [35]:
kdf.sort_index(ascending=False)

Unnamed: 0,A,B,C,D
5,-1.819904,0.592836,0.151462,-1.155265
4,-1.365108,1.045862,0.707101,-0.154003
3,1.314554,-0.084459,0.259547,-1.245742
2,-2.143608,1.363914,-0.882132,0.837244
1,-1.069191,2.063936,0.995968,1.340935
0,0.782181,1.561012,0.410845,0.723674


Ordenar por valor

In [36]:
kdf.sort_values(by='B') #orden ascendente por defecto

Unnamed: 0,A,B,C,D
3,1.314554,-0.084459,0.259547,-1.245742
5,-1.819904,0.592836,0.151462,-1.155265
4,-1.365108,1.045862,0.707101,-0.154003
2,-2.143608,1.363914,-0.882132,0.837244
0,0.782181,1.561012,0.410845,0.723674
1,-1.069191,2.063936,0.995968,1.340935


### 3. Datos faltantes
Koalas utiliza principalmente el valor `np.nan` para representar los datos faltantes. Por defecto, no se incluye en los cálculos.


In [20]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])

In [21]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1

In [22]:
kdf1 = ks.from_pandas(pdf1)

  for name, col in reset_index.iteritems():
  [


In [23]:
kdf1

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D,E
2013-01-01,-1.318658,1.965193,0.483052,-0.906627,1.0
2013-01-02,-1.34724,-1.588059,0.693082,1.858337,1.0
2013-01-03,0.754954,-1.161812,0.336966,-0.052354,
2013-01-04,-2.054697,-2.375747,-2.773655,0.264325,


Para eliminar las filas que tienen datos faltantes.

In [24]:
kdf1.dropna(how='any') #elimina todas las filas que en alguno de sus valores tenga un valor faltante.

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D,E
2013-01-01,-1.318658,1.965193,0.483052,-0.906627,1.0
2013-01-02,-1.34724,-1.588059,0.693082,1.858337,1.0


Llenando los datos faltantes.

In [42]:
kdf1.fillna(value=5) #rellenamos los valores faltantes con los valores pertinentes.

Unnamed: 0,A,B,C,D,E
2013-01-01,0.782181,1.561012,0.410845,0.723674,1.0
2013-01-02,-1.069191,2.063936,0.995968,1.340935,1.0
2013-01-03,-2.143608,1.363914,-0.882132,0.837244,5.0
2013-01-04,1.314554,-0.084459,0.259547,-1.245742,5.0


### 4. Operaciones

#### Estadísticas
Las operaciones en general excluyen los datos faltantes.

Realización de una estadística descriptiva:

In [43]:
kdf.mean()

A   -0.716846
B    1.090517
C    0.273798
D    0.057807
dtype: float64

#### Configuraciones de Spark

Varias configuraciones en PySpark se pueden aplicar internamente en Koalas.
Por ejemplo, puede habilitar la optimización de Arrow para acelerar enormemente la conversión de pandas internos.

In [44]:
prev = spark.conf.get("spark.sql.execution.arrow.enabled")  # Keep its default value.
ks.set_option("compute.default_index_type", "distributed")  # Use default index prevent overhead.

import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

In [46]:
spark.conf.set("spark.sql.execution.arrow.enabled", True)
%timeit ks.range(300000).to_pandas()

In [47]:
spark.conf.set("spark.sql.execution.arrow.enabled", False)
%timeit ks.range(300000).to_pandas()

1.45 s ± 94.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [48]:
ks.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.enabled", prev)  # Set its default value back.

#### Agrupación
Por "agrupar por" nos referimos a un proceso que involucra uno o más de los siguientes pasos:

- Dividir los datos en grupos según algunos criterios.
- Aplicar una función a cada grupo de forma independiente
- Combinar los resultados en una estructura de datos

In [49]:
kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})

In [50]:
kdf

Unnamed: 0,A,B,C,D
0,foo,one,-0.877903,1.390881
1,bar,one,-0.50192,0.294084
2,foo,two,-0.867403,-0.476039
3,bar,three,0.187626,0.404471
4,foo,two,-1.809876,-1.437326
5,bar,two,-0.832636,0.591997
6,foo,one,-0.572067,0.212416
7,foo,three,-0.358219,-1.02122


Agrupar y luego aplicar el **sum** a los grupos resultantes.

In [51]:
kdf.groupby('A').sum()

Unnamed: 0_level_0,C,D
A,Unnamed: 1_level_1,Unnamed: 2_level_1
bar,-1.146929,1.290552
foo,-4.485468,-1.331288


In [52]:
kdf.groupby(['A', 'B']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,C,D
A,B,Unnamed: 2_level_1,Unnamed: 3_level_1
foo,one,-1.449971,1.603297
foo,two,-2.677279,-1.913365
bar,three,0.187626,0.404471
foo,three,-0.358219,-1.02122
bar,two,-0.832636,0.591997
bar,one,-0.50192,0.294084


### 5. Visualización de datos: Generar gráficos


In [25]:
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))

In [26]:
kser = ks.Series(pser)

  for name, col in reset_index.iteritems():
  [


In [27]:
kser = kser.cummax()
kser

  series = series.astype(t, copy=False)


2000-01-01    0.164652
2000-01-02    1.375363
2000-01-03    1.375363
2000-01-04    1.375363
2000-01-05    1.375363
2000-01-06    1.375363
2000-01-07    1.375363
2000-01-08    1.375363
2000-01-09    1.375363
2000-01-10    1.375363
2000-01-11    1.375363
2000-01-12    1.375363
2000-01-13    1.482500
2000-01-14    1.482500
2000-01-15    1.482500
2000-01-16    1.482500
2000-01-17    1.482500
2000-01-18    1.482500
2000-01-19    1.482500
2000-01-20    1.482500
2000-01-21    1.482500
2000-01-22    1.482500
2000-01-23    1.482500
2000-01-24    1.796483
2000-01-25    1.796483
2000-01-26    1.796483
2000-01-27    1.796483
2000-01-28    1.796483
2000-01-29    1.796483
2000-01-30    1.796483
2000-01-31    1.796483
2000-02-01    1.796483
2000-02-02    1.796483
2000-02-03    1.796483
2000-02-04    1.796483
2000-02-05    1.796483
2000-02-06    1.796483
2000-02-07    1.796483
2000-02-08    1.796483
2000-02-09    1.796483
2000-02-10    1.796483
2000-02-11    1.796483
2000-02-12    1.796483
2000-02-13 

**Gráficos de matplotlib**

In [38]:
pd.set_option('plotting.backend', 'matplotlib')

In [51]:
ks.options.plotting.backend = "matplotlib"

kser.plot()

KeyError: 'line'

**Gráficos de Pandas Bokeh**

In [52]:
ks.options.plotting.backend = "pandas_bokeh"

kdf.plot(backend="pandas_bokeh", title="Example Figure")
kser.plot()


  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


**Gráfico por defecto con plotly**

In [53]:
kser.plot()

  series = series.astype(t, copy=False)


En un DataFrame, el plot() es una conveniencia para trazar todas las columnas con etiquetas:

In [44]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])

In [45]:
kdf = ks.from_pandas(pdf)

  for name, col in reset_index.iteritems():
  [


In [46]:
kdf = kdf.cummax()
kdf

  series = series.astype(t, copy=False)


Unnamed: 0,A,B,C,D
2000-01-01,0.822452,0.960337,0.885595,1.083787
2000-01-02,1.092326,1.771944,0.885595,1.083787
2000-01-03,1.092326,1.771944,0.885595,1.70448
2000-01-04,1.092326,2.157039,1.418594,1.70448
2000-01-05,1.299056,2.157039,1.630613,1.70448
2000-01-06,1.299056,2.157039,1.630613,1.70448
2000-01-07,1.299056,2.157039,1.630613,1.70448
2000-01-08,1.299056,2.157039,1.630613,1.70448
2000-01-09,1.299056,2.157039,2.59589,1.70448
2000-01-10,1.299056,2.157039,2.59589,1.70448


In [49]:
kdf.plot()

  series = series.astype(t, copy=False)


### 6. Entrada / salida de datos


### CSV

CSV es sencillo y fácil de usar

In [None]:
kdf.to_csv('foo.csv')
ks.read_csv('foo.csv').head(10)

### Parquet

Parquet es un formato de archivo eficiente y compacto para leer y escribir más rápido.

In [None]:
kdf.to_parquet('bar.parquet')
ks.read_parquet('bar.parquet').head(10)

### Spark IO

Además, Koalas es totalmente compatible con las diversas fuentes de datos de Spark, como ORC y una fuente de datos externa.

In [None]:
kdf.to_spark_io('zoo.orc', format="orc")
ks.read_spark_io('zoo.orc', format="orc").head(10)