# Pandas API

źródło: https://www.sicara.fr/blog-technique/run-pandas-code-on-spark, </br> 
https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html </br>
https://sparkbyexamples.com/pyspark/pandas-api-on-apache-spark-pyspark/ </br>

Pandas to bardzo potężna biblioteka, którą znają wszyscy analitycy danych, ale kod Pandas może działać tylko na jednej jednostce. W związku z tym jeżeli przetwarzamy duży zestaw danych za pomocą pandas będzie się to działo bardzo wolno i najprawdopodobniej pojawi się OOM error.

Zwykle wtedy do gry wchodzi Spark. PySpark co prawda zawiera  moduł o nazwie Spark SQL, który zapewnia obiekty typu DataFrame podobny do ramek pandas, ale mają one wady:
- napisany wcześniej kod pandas nie może zostać poniewnie użyty, ponieważ Pandas nie jest kompatybilny z PySpark DataFrames.
- składnia PySpark bardzo różni się od składni Pandas (patrz poniżej), co utrudnia PySparka osobom pracującyjm wcześniej w pandas

Poniżej przykład różnic:


In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Może chwilę potrwać
spark = SparkSession.builder.appName("UczymySięSparka").getOrCreate()
spark

'''
#PySpark:

df = spark.read.option("inferSchema", True).cvs("data.csv")
df = df.toDF("x","y", "z")
df = df.withColumn("x2", df.x * df.x)

#Pandas: 
df = pd.read_csv("data.csv")
df.columns = ["x","y","z"]
df["x2"] = df.x * df.x

'''


'\n#PySpark:\n\ndf = spark.read.option("inferSchema", True).cvs("data.csv")\ndf = df.toDF("x","y", "z")\ndf = df.withColumn("x2", df.x * df.x)\n\n#Pandas: \ndf = pd.read_csv("data.csv")\ndf.columns = ["x","y","z"]\ndf["x2"] = df.x * df.x\n\n'

24 kwietnia 2019 r. firma Databricks ogłosiła nowy projektopen source o nazwie Koalas, którego celem było udostępnienie interfejsu API Pandas na platformie Spark. Biblioteka nabrała rozpędu i została oficjalnie połączona z PySpark w Spark 3.2 (październik 2021 r.) i nazwana API Pandas on Spark.

Interfejs API Pandas ma taką samą składnię jak Pandas, ale "pod spodem" używa ramek PySparkowych. Oznacza to, że kod napisany za pomocą Pandas API może być uruchamiany w systemie master - slave, w których Spark jest skonfigurowany (w przeciwieństwie do Pandas), co pozwala na obsługę dużych zbiorów danych. Korzystająz z Pandas API można robić prawie wszystko, co z ramkami Pandowymi (~83% funkcji dostępnych w pyspark.pandas). Pełna lista funkcji: 

https://spark.apache.org/docs/latest/api/python//reference/pyspark.pandas/general_functions.html

### Co zrobić, jeśli nie mogę znaleźć funkcji Pandas w Pandas API?

Ponieważ ramka danych Pandas-on-Spark wykorzystuje ramkę danych PySpark "pod spodem", można ją przekonwertować z/do ramki danych PySpark. Dlatego jeśli nie możesz znaleźć potrzebnej funkcji, nadal możesz wykonać następujące czynności:

* przekształcić ramkę danych Pandas-on-Spark w ramkę danych PySpark
* wykonać tyle transformacji ile potrzeba za pomocą PySpark
* przekonwertować ramkę danych PySpark z powrotem na ramkę danych Pandas-on-Spark

In [2]:
import pyspark.pandas as ps
psdf = ps.range(10)
sdf = psdf.to_spark().filter("id > 5")
sdf.show()



+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+



In [3]:
sdf.to_pandas_on_spark()

Unnamed: 0,id
0,6
1,7
2,8
3,9


Poniżej więcej przykładów jak wymiennie korzystać z PySparka i pandas:

In [17]:
import pyspark.pandas as ps # przekstzałcenie na ramkę pandasową
psdf = ps.range(10)
pdf = psdf.to_pandas()
pdf.values

array([[0],
       [1],
       [2],
       [3],
       [4],
       [5],
       [6],
       [7],
       [8],
       [9]], dtype=int64)

In [18]:
ps.from_pandas(pdf) # pandas on spark 


Unnamed: 0,id
0,0
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9


In [19]:
import pyspark.pandas as ps

psdf = ps.range(10)
sdf = psdf.to_spark().filter("id > 5") ## ramki sprakowe i pandasowe są do siebie bardzo podobne, tu przekształcenie z ramki pandasowej na sparkową
sdf.show()

+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+



In [22]:
sdf.to_pandas_on_spark() ## ze sparka do pandas on spark

Unnamed: 0,id
0,6
1,7
2,8
3,9


# Jak działa Pandas API?
Gdy użytkownik tworzy ramkę danych Pandas-on-Spark, powstaje wtedy też „ramka wewnętrzna” i ramka sparkowa.

„Ramka wewnętrzna” zapewnia konwersje między ramkami pandas-on-Spark i PySpark. Przechowuje metadane, takie jak mapowanie danych w kolumnach i indeksów.

Pozwala ramce Pandas-on-Spark na obsługę funkcji Pandas, które nie są obsługiwane przez ramki PySpark:

* mutable syntax: dzięki czemu nie trzeba tworzyć nowej ramki za każdym razem, gdy chcemy coś zmodyfikować
* indeks sekwencyjny: żeby można było manipulować ramką w oparciu o indeks (patrz poniżej)
* pandowe typy danych

Należy pamiętać, że dane są rozproszone po wielu workerach, podczas gdy w Pandzie dane pozostają na jednej maszynie.

### Indeksowanie

W przeciwieństwie do ramki PySpark, ramki Pandas-on-Spark replikują funkcjonalność indeksowania Pandas (dzięki wspomianej wyżej wewnętrznej ramce). Dla przypomnienia, indeksy służą do uzyskiwania dostępu do wierszy przez indeksatory loc/iloc lub do mapowania właściwych wierszy w przypadku operacji łączących dwie ramki danych lub serie (na przykład df1 + df2) i tak dalej.

Jeśli żadna z kolumn nie została określona jako indeks zostanie użyty indeks domyślny. Może to być jeden z 3:

**Sekwencja** </br>
Używany domyślnie. Implementuje sekwencję, która zwiększa się o jeden razem z każdym kolejnym rekordem. Najprawdopodobniej spowoduje to przeniesienie całej ramki do jednego klastra, co będzie bardzo powolne i najprawdopodoniej rzuci OOM error. Nie należy go używać, gdy zbiór danych jest duży.

In [4]:
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'sequence')
psdf = ps.range(3)
ps.reset_option('compute.default_index_type')
psdf.index


Int64Index([0, 1, 2], dtype='int64')

**Rozproszona sekwencja** </br>
Mechanizm jest ten sam, co wyżej, ale indeks rozproszony można wykorzystywać razem z partycjonowaniem. Powinien zostać użyty jeśli zbiór  danych jest duży i potrzebny jest indeks sekwencyjny. Należy zauważyć, że jeśli po utworzeniu tego indeksu do zbioru danych zostanie dodanych więcej rekordów nie ma gwarancji, że indeks pozostanie sekwencyjny.



In [5]:
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
psdf = ps.range(3)
ps.reset_option('compute.default_index_type')
psdf.index

Int64Index([0, 1, 2], dtype='int64')

**Rozlokowany (distributed)** </br>
Implementuje ciąg rosnący monotonicznie, ale nie nieprzerwany (np. 1, 8, 12), wartości są przypadkowe. Nie ma nic wspólnego z indeksowaniem Pandas. Pod względem wydajności jest najlepszy, ale nie można go używać do wykonywania operacji na dwóch ramkach. 

Poniżej przykład ustawiania typu indeksu, z któego chcemy skorzystać: 

In [6]:
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed')
psdf = ps.range(3)
ps.reset_option('compute.default_index_type')
psdf.index


Int64Index([42949672960, 85899345920, 128849018880], dtype='int64')

### Pozostałe opcje

Pandas API ma system opcji, który pozwala dostosować niektóre aspekty jego pracy. Najczęściej użytkownicy zmieniają opcję wyświetlania wyników. Najważniejsze w tym wypadku są dwa polecenia: 

* get_option() / set_option() - podejrzenie/ustawienie opcji
* reset_option() - zresteowanie danej opcji

In [7]:
import pyspark.pandas as ps
ps.get_option('compute.max_rows')

1000

In [8]:
ps.set_option('compute.max_rows', 2000)
ps.get_option('compute.max_rows')

2000

In [11]:
ps.reset_option("display.max_rows")


In [12]:
ps.get_option('compute.max_rows')

2000

### Operacje na różnych ramkach
Pandas API domyślnie blokuje operacje na różnych ramkach (lub seriach), aby zapobiec zbyt ciężkim obliczeniowo operacjom. 

Można to włączyć, ustawiając compute.ops_on_diff_frames na True. 

In [13]:
import pyspark.pandas as ps
ps.set_option('compute.ops_on_diff_frames', True)
psdf1 = ps.range(5)
psdf2 = ps.DataFrame({'id': [5, 4, 3]})
(psdf1 - psdf2).sort_index()

Unnamed: 0,id
0,-5.0
1,-3.0
2,-1.0
3,
4,


In [15]:
ps.reset_option('compute.ops_on_diff_frames')

In [16]:
import pyspark.pandas as ps
ps.set_option('compute.ops_on_diff_frames', True)
psdf = ps.range(5)
psser_a = ps.Series([1, 2, 3, 4])
psdf['new_col'] = psser_a
psdf

Unnamed: 0,id,new_col
0,0,1.0
1,1,2.0
2,2,3.0
3,3,4.0
4,4,


Wszystkie dostępne opcje można sprawdzić pod tym linkiem: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#available-options

### Transform & apply

Poniżej przedstawimy takie funkcje jak *DataFrame.transform()*, *DataFrame.apply()*, *DataFrame.pandas_on_spark.transform_batch()*, *DataFrame.pandas_on_spark.apply_batch()*, *Series.pandas_on_spark.transform_batch()*. Każda ma odrębny cel i działa inaczej. Poniżej przedstawimy te różnice, które najczęściej powoduję dezorientację. 

**transform & apply** </br>
Główna różnica między *DataFrame.transform()* a *DataFrame.apply()* polega na tym, że ta pierwsza wymaga zwrócenia tej samej długości danych wejściowych, a druga tego nie. Przykład poniżej:

In [23]:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
   return pser + 1 

psdf.transform(pandas_plus)

Unnamed: 0,a,b
0,2,5
1,3,6
2,4,7


In [24]:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
def pandas_plus(pser):
     return pser[pser % 2 == 1]  # allows an arbitrary length

psdf.apply(pandas_plus)

Unnamed: 0,a,b
0,1,5
2,3,7


Każda funkcja przyjmuje serię pandas, a interfejs pandas przelicza funkcje w sposób rozproszony, jak poniżej: 

W przypadku osi „kolumnowej” funkcja przyjmuje każdy wiersz jako serię:

In [25]:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
     return sum(pser)  # allows an arbitrary length

psdf.apply(pandas_plus, axis='columns')

0    5
1    7
2    9
dtype: int64


### pandas_on_spark.transform_batch i pandas_on_spark.apply_batch
W *DataFrame.pandas_on_spark.transform_batch()*, *DataFrame.pandas_on_spark.apply_batch()*, *Series.pandas_on_spark.transform_batch()* itp. przedrostek funkcji oznacza czy będziemy pracować z serią czy z ramką. Interfejsy dzielą Pandas-on-Spark DataFrame lub serię na partycje, a następnie stosują daną funkcję z ramką albo serią pandową w charakterze danych wejściowych i wyjściowych.

In [26]:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
     return pdf + 1  # should always return the same length as input.

psdf.pandas_on_spark.transform_batch(pandas_plus)

Unnamed: 0,a,b
0,2,5
1,3,6
2,4,7


In [27]:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
    return pdf[pdf.a > 1]  # allow arbitrary length

psdf.pandas_on_spark.apply_batch(pandas_plus)

Unnamed: 0,a,b
1,2,5
2,3,6


Funkcje w obu przykładach pobierają pandas DataFrame jako fragment pandas-on-Spark DataFrame i zwracają pandas DataFrame. 
Panuje ta sama zasada do poprzednio - transform() wymaga tej samej długości danych wyjściowych i wejściowych. Apply nie. 

W przypadku Series.pandas_on_spark.transform batch(), jest podobnie jak z DataFrame.pandas_on_spark.transform batch() - przyjmuje fragment serii pandas jako fragmentu serii pandas-on-Spark i zwraca serię pandas

In [28]:
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
     return pser + 1  # should always return the same length as input.

psdf.a.pandas_on_spark.transform_batch(pandas_plus)

0    2
1    3
2    4
Name: a, dtype: int64

### Typy danych pandas on spark vs pyspark
Podczas konwertowania Pandas-on-Spark DataFrame z/do PySpark DataFrame typy danych są automatycznie mapowane na odpowiedni typ. 

Poniższy przykład pokazuje, jak typy danych są mapowane z PySpark DataFrame na Pandas-on-Spark DataFrame. Podczas konwertowania pandas-on-Spark DataFrame na pandas DataFrame typy danych są w zasadzie takie same jak w pandas.

Poniżej więcej informacji o mapowaniu między ramkami: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html#type-support-in-pandas-api-on-spark

**Przykłady wykorzystania funkcji pandas w Spark** 

In [30]:
# Import pyspark.pandas
import pyspark.pandas as ps

# Create pandas DataFrame
technologies   = ({
    'Courses':["Spark","PySpark","Hadoop","Python","Pandas","Hadoop","Spark","Python","NA"],
    'Fee' :[22000,25000,23000,24000,26000,25000,25000,22000,1500],
    'Duration':['30days','50days','55days','40days','60days','35days','30days','50days','40days'],
    'Discount':[1000,2300,1000,1200,2500,None,1400,1600,0]
          })
df = ps.DataFrame(technologies)
print(df)

# Use groupby() to compute the sum
df2 = df.groupby(['Courses']).sum()
print(df2)

   Courses    Fee Duration  Discount
0    Spark  22000   30days    1000.0
1  PySpark  25000   50days    2300.0
2   Hadoop  23000   55days    1000.0
3   Python  24000   40days    1200.0
4   Pandas  26000   60days    2500.0
5   Hadoop  25000   35days       NaN
6    Spark  25000   30days    1400.0
7   Python  22000   50days    1600.0
8       NA   1500   40days       0.0
           Fee  Discount
Courses                 
Spark    47000    2400.0
PySpark  25000    2300.0
Hadoop   48000    1000.0
Python   46000    2800.0
Pandas   26000    2500.0
NA        1500       0.0


In [41]:
sdf = df.to_spark()


Wybieranie kolumn:

In [39]:
# Pandas API on Spark
df[["Courses","Fee"]]



Unnamed: 0,Courses,Fee
0,Spark,22000
1,PySpark,25000
2,Hadoop,23000
3,Python,24000
4,Pandas,26000
5,Hadoop,25000
6,Spark,25000
7,Python,22000
8,,1500


In [42]:
# PySpark
sdf.select("Courses","Fee").show()

+-------+-----+
|Courses|  Fee|
+-------+-----+
|  Spark|22000|
|PySpark|25000|
| Hadoop|23000|
| Python|24000|
| Pandas|26000|
| Hadoop|25000|
|  Spark|25000|
| Python|22000|
|     NA| 1500|
+-------+-----+



Wybieranie lub filtrowanie wierszy:

In [44]:
# Pandas API on Spark
df2 = df.loc[ (df.Courses == "Python")]
df2

Unnamed: 0,Courses,Fee,Duration,Discount
3,Python,24000,40days,1200.0
7,Python,22000,50days,1600.0


In [45]:
# PySpark
sdf2 = sdf.filter(sdf.Courses == "Python")
sdf2.show()

+-------+-----+--------+--------+
|Courses|  Fee|Duration|Discount|
+-------+-----+--------+--------+
| Python|24000|  40days|  1200.0|
| Python|22000|  50days|  1600.0|
+-------+-----+--------+--------+



In [46]:
# Pandas API on Spark
df.count()

Courses     9
Fee         9
Duration    9
Discount    8
dtype: int64

In [47]:
# PySpark
sdf.count()

9

Sortowanie wierszy:

In [48]:
# Pandas API on Spark
df2 = df.sort_values(["Courses", "Fee"])

In [49]:
# PySpark
sdf2 = sdf.sort("Courses", "Fee")
sdf2.show()

+-------+-----+--------+--------+
|Courses|  Fee|Duration|Discount|
+-------+-----+--------+--------+
| Hadoop|23000|  55days|  1000.0|
| Hadoop|25000|  35days|    null|
|     NA| 1500|  40days|     0.0|
| Pandas|26000|  60days|  2500.0|
|PySpark|25000|  50days|  2300.0|
| Python|22000|  50days|  1600.0|
| Python|24000|  40days|  1200.0|
|  Spark|22000|  30days|  1000.0|
|  Spark|25000|  30days|  1400.0|
+-------+-----+--------+--------+



Zmiana nazwy kolumny:

In [52]:
# Pandas API on Spark
df2 = df.rename(columns={'Fee': 'Courses_Fee'})

In [53]:
# PySpark
sdf2 = sdf.withColumnRenamed("Fee", "Courses_Fee")
sdf2.show()

+-------+-----------+--------+--------+
|Courses|Courses_Fee|Duration|Discount|
+-------+-----------+--------+--------+
|  Spark|      22000|  30days|  1000.0|
|PySpark|      25000|  50days|  2300.0|
| Hadoop|      23000|  55days|  1000.0|
| Python|      24000|  40days|  1200.0|
| Pandas|      26000|  60days|  2500.0|
| Hadoop|      25000|  35days|    null|
|  Spark|      25000|  30days|  1400.0|
| Python|      22000|  50days|  1600.0|
|     NA|       1500|  40days|     0.0|
+-------+-----------+--------+--------+



GroupBy:

In [54]:
# Pandas API on Spark
df.groupby(['Courses']).count()

Unnamed: 0_level_0,Fee,Duration,Discount
Courses,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Spark,2,2,2
PySpark,1,1,1
Hadoop,2,2,1
Python,2,2,2
Pandas,1,1,1
,1,1,1


In [56]:
# PySpark
sdf.groupBy("Courses").count().show()

+-------+-----+
|Courses|count|
+-------+-----+
|  Spark|    2|
|PySpark|    1|
| Hadoop|    2|
| Python|    2|
| Pandas|    1|
|     NA|    1|
+-------+-----+



### Lista pandas API wspieranych przez PySpark:

https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/supported_pandas_api.html#supported-pandas-api