In [1]:
import pyspark.pandas as ps
import pandas as pd



In [3]:
# pandas-on-Spark frame holding yearly rabbit and horse counts.
psdf = ps.DataFrame(
    {
        "year": [1990, 1997, 2003, 2009, 2014],
        "rabbit": [20, 18, 489, 675, 1776],
        "horse": [4, 25, 281, 600, 1900],
    }
)

# Plain pandas frame holding yearly sheep and chicken counts for the same years.
pdf = pd.DataFrame(
    {
        "year": [1990, 1997, 2003, 2009, 2014],
        "sheep": [22, 50, 121, 445, 791],
        "chicken": [250, 326, 589, 1241, 2118],
    }
)

In [6]:
# Bare last expression: renders psdf as the cell's output.
psdf

Unnamed: 0,year,rabbit,horse
0,1990,20,4
1,1997,18,25
2,2003,489,281
3,2009,675,600
4,2014,1776,1900


In [5]:
# Query the pandas-on-Spark frame with SQL; {psdf} in the query text is bound
# to the frame passed as the keyword argument of the same name.
ps.sql(
    " SELECT * FROM {psdf} WHERE rabbit > 100",
    psdf=psdf,
)

Unnamed: 0,year,rabbit,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [7]:
# Inner-join the pandas-on-Spark frame (psdf) with the plain pandas frame (pdf)
# on year. Inside the query text, `ps` and `pd` are only SQL table aliases;
# they are unrelated to the Python modules imported under the same names.
ps.sql(
    """
SELECT ps.rabbit, pd.chicken
       FROM {psdf} ps INNER JOIN {pdf} pd
       ON ps.year = pd.year
       ORDER BY ps.rabbit, pd.chicken

""",
    psdf=psdf,
    pdf=pdf,
)

Unnamed: 0,rabbit,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118


### Pandas API on Spark

In [14]:
# Rebinds psdf to a small two-column pandas-on-Spark frame; the conversion
# cells that follow start from this frame.
psdf = ps.DataFrame(
    {
        "A": [1, 2, 3, 4, 5],
        "B": [10, 20, 30, 40, 50],
    }
)
print(type(psdf))
psdf.head()

<class 'pyspark.pandas.frame.DataFrame'>


Unnamed: 0,A,B
0,1,10
1,2,20
2,3,30
3,4,40
4,5,50


In [15]:
# Convert the pandas-on-Spark DataFrame to a native Spark DataFrame.
# The printed type confirms the conversion; show() prints only columns A and B
# (the pandas-style index does not appear in the Spark frame).
sdf = psdf.to_spark()
print(type(sdf))
sdf.show()



<class 'pyspark.sql.dataframe.DataFrame'>
+---+---+
|  A|  B|
+---+---+
|  1| 10|
|  2| 20|
|  3| 30|
|  4| 40|
|  5| 50|
+---+---+



In [16]:
# Convert the Spark DataFrame back to a pandas-on-Spark DataFrame.
# DataFrame.to_pandas_on_spark() has been deprecated since Spark 3.3 in favour
# of DataFrame.pandas_api() and is no longer available in Spark 4.0, so the
# supported spelling is used here; the resulting frame is the same.
psdf_2 = sdf.pandas_api()
print(type(psdf_2))
psdf_2.head()



<class 'pyspark.pandas.frame.DataFrame'>


Unnamed: 0,A,B
0,1,10
1,2,20
2,3,30
3,4,40
4,5,50


In [20]:
# Spark DataFrame -> pandas-on-Spark DataFrame via pandas_api(); the printed
# type confirms the result is a pyspark.pandas DataFrame.
psdf_3 = sdf.pandas_api()
print(type(psdf_3))
psdf_3

<class 'pyspark.pandas.frame.DataFrame'>


Unnamed: 0,A,B
0,1,10
1,2,20
2,3,30
3,4,40
4,5,50


### Checking Spark Execution Plans

In [21]:
from pyspark.pandas import option_context

# Add two independently created ranges. Combining different frames needs
# compute.ops_on_diff_frames enabled; explain() prints the physical plan
# Spark builds for the addition.
with option_context(
    "compute.ops_on_diff_frames", True,
    "compute.default_index_type", "distributed",
):
    df = ps.range(10) + ps.range(10)
    df.spark.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [CASE WHEN isnotnull(__this___index_level_0__#425L) THEN __this___index_level_0__#425L ELSE __that___index_level_0__#433L END AS __index_level_0__#438L, (__this_id#426L + __that_id#434L) AS id#468L]
   +- SortMergeJoin [__this___index_level_0__#425L], [__that___index_level_0__#433L], FullOuter
      :- Sort [__this___index_level_0__#425L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(__this___index_level_0__#425L, 200), ENSURE_REQUIREMENTS, [plan_id=419]
      :     +- Project [__index_level_0__#402L AS __this___index_level_0__#425L, id#400L AS __this_id#426L]
      :        +- Project [distributed_index() AS __index_level_0__#402L, id#400L]
      :           +- Range (0, 10, step=1, splits=12)
      +- Sort [__that___index_level_0__#433L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__that___index_level_0__#433L, 200), ENSURE_REQUIREMENTS, [plan_id=420]
            +- Project [__ind

In [22]:
# Same arithmetic, but both operands are the same frame, so
# compute.ops_on_diff_frames can be off. Compare the printed plan with the
# one produced by adding two separately created ranges.
with option_context(
    "compute.ops_on_diff_frames", False,
    "compute.default_index_type", "distributed",
):
    df = ps.range(10)
    df = df + df
    df.spark.explain()

== Physical Plan ==
*(1) Project [__index_level_0__#476L, (id#474L + id#474L) AS id#488L]
+- *(1) Project [distributed_index() AS __index_level_0__#476L, id#474L]
   +- *(1) Range (0, 10, step=1, splits=12)




### Caching DataFrames

In [24]:
# Cache the doubled range and inspect it. new_df stays cached after this
# cell; a later cell releases it with new_df.spark.unpersist().
with option_context("compute.default_index_type", "distributed"):
    df = ps.range(10)
    new_df = (df + df).spark.cache()
    new_df.spark.explain()
    # print() is used because a bare expression inside the with-block
    # would not be rendered as cell output.
    print(new_df)

24/10/01 15:04:39 WARN CacheManager: Asked to cache already cached data.


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [__index_level_0__#576L, id#588L]
      +- InMemoryRelation [__index_level_0__#576L, id#588L, __natural_order__#579L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#496L, (id#494L + id#494L) AS id#508L, __natural_order__#499L]
               +- *(1) Project [__index_level_0__#496L, id#494L, monotonically_increasing_id() AS __natural_order__#499L]
                  +- *(1) Project [distributed_index() AS __index_level_0__#496L, id#494L]
                     +- *(1) Range (0, 10, step=1, splits=12)


             id
8589934592    0
17179869184   2
25769803776   4
34359738368   6
42949672960   8
60129542144  10
68719476736  12
77309411328  14
85899345920  16
94489280512  18


In [27]:
# Release the cached data held by new_df (the counterpart of the earlier
# .spark.cache() call).
new_df.spark.unpersist()

In [30]:
# Use the cached frame as a context manager so it is unpersisted automatically
# when the block exits.
# The cached frame is bound to its own name rather than to `df`: rebinding `df`
# would overwrite the base frame with an already-unpersisted cached frame, and
# every re-run of this cell would then double the values again.
with (df + df).spark.cache() as cached_df:
    cached_df.spark.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [__index_level_0__#576L, id#716L]
      +- InMemoryRelation [__index_level_0__#576L, id#716L, __natural_order__#579L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#576L, (id#574L + id#574L) AS id#716L, __natural_order__#579L]
               +- *(1) Project [__index_level_0__#576L, id#574L, monotonically_increasing_id() AS __natural_order__#579L]
                  +- *(1) Project [distributed_index() AS __index_level_0__#576L, id#574L]
                     +- *(1) Range (0, 10, step=1, splits=12)


