In [52]:
import numpy as np
import pandas as pd
import pyspark.pandas as ps

### Set Spark Cluster

In [53]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("v3_sparkSession").master("spark://spark-master:7077") \
        .getOrCreate()

24/10/31 14:07:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [54]:
# Create pandas series...
psdf = ps.DataFrame({
    "year" : [1990,1997,2003,2009,2014],
    "rabbit" : [20,18,489,675,1776],
    "horse" : [4,25,281,600,1900]
})

pdf = pd.DataFrame({
    "year" : [1990,1997,2003,2009,2014],
    "sheep" : [22,50,121,445,791],
    "chicken": [250,326,589,1241,2118]
    })

In [55]:
print("psdf : ",type(psdf))

psdf :  <class 'pyspark.pandas.frame.DataFrame'>


### Apply SQL on Pandas on SparkAPI

In [56]:
temp_df = ps.sql(" SELECT * FROM {psdf} WHERE rabbit > 100", psdf=psdf)
temp_df

Unnamed: 0,year,rabbit,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [57]:
temp_df.describe()

Unnamed: 0,year,rabbit,horse
count,3.0,3.0,3.0
mean,2008.666667,980.0,927.0
std,5.507571,695.601179,857.605387
min,2003.0,489.0,281.0
25%,2003.0,489.0,281.0
50%,2009.0,675.0,600.0
75%,2014.0,1776.0,1900.0
max,2014.0,1776.0,1900.0


### Spark API and PandasOnSpark API Converstion

In [58]:
spark_df = temp_df.to_spark()
print("spark_df : ",type(spark_df))
spark_df.describe()

spark_df :  <class 'pyspark.sql.dataframe.DataFrame'>




DataFrame[summary: string, year: string, rabbit: string, horse: string]

In [59]:
print(spark_df)
spark_df.show()

DataFrame[year: bigint, rabbit: bigint, horse: bigint]
+----+------+-----+
|year|rabbit|horse|
+----+------+-----+
|2003|   489|  281|
|2009|   675|  600|
|2014|  1776| 1900|
+----+------+-----+



In [60]:
spark_df.head()

Row(year=2003, rabbit=489, horse=281)

In [61]:
pandas_on_spark =spark_df.to_pandas_on_spark()
print("pandas_on_spark : ",type(pandas_on_spark))

pandas_on_spark :  <class 'pyspark.pandas.frame.DataFrame'>




### Tracking Spark execution in detail

In [62]:
# check SQL filtered dataframe
temp_df.spark.explain()

== Physical Plan ==
*(2) Project [distributed_sequence_id#2394L AS __index_level_0__#2393L, year#2371L, rabbit#2372L, horse#2373L]
+- AttachDistributedSequence[distributed_sequence_id#2394L, year#2371L, rabbit#2372L, horse#2373L] Index: distributed_sequence_id#2394L
   +- *(1) Project [year#2371L, rabbit#2372L, horse#2373L]
      +- *(1) Filter (rabbit#2372L > 100)
         +- *(1) Scan ExistingRDD[__index_level_0__#2370L,year#2371L,rabbit#2372L,horse#2373L]




In [63]:
from pyspark.pandas import option_context

with option_context(
    "compute.ops_on_diff_frames",False,
    "compute.default_index_type","distributed"
):
    df = ps.range(10)
    df = df + df
    df.spark.explain()

== Physical Plan ==
*(1) Project [__index_level_0__#2965L, (id#2963L + id#2963L) AS id#2977L]
+- *(1) Project [distributed_index() AS __index_level_0__#2965L, id#2963L]
   +- *(1) Range (0, 10, step=1, splits=4)




In [64]:
with option_context(
    "compute.ops_on_diff_frames",False,
    "compute.default_index_type","distributed"
):
    df = ps.range(10)
    df = (df + df).spark.cache()
    df.spark.explain()
    print(df)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [__index_level_0__#2982L, id#2994L]
      +- InMemoryRelation [__index_level_0__#2982L, id#2994L, __natural_order__#2985L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [__index_level_0__#1721L, (id#1719L + id#1719L) AS id#1733L, __natural_order__#1724L]
               +- *(1) Project [__index_level_0__#1721L, id#1719L, monotonically_increasing_id() AS __natural_order__#1724L]
                  +- *(1) Project [distributed_index() AS __index_level_0__#1721L, id#1719L]
                     +- *(1) Range (0, 10, step=1, splits=4)


             id
0             0
1             2
8589934592    4
8589934593    6
8589934594    8
17179869184  10
17179869185  12
25769803776  14
25769803777  16
25769803778  18


24/10/31 14:07:38 WARN CacheManager: Asked to cache already cached data.


In [65]:
with (df + df ).spark.cache() as df : 
    new_df = df+df
    df.spark.explain()
    print(new_df)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [__index_level_0__#2982L, id#3119L]
      +- InMemoryRelation [__index_level_0__#2982L, id#3119L, __natural_order__#2985L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- AdaptiveSparkPlan isFinalPlan=false
               +- Project [__index_level_0__#2982L, (id#2994L + id#2994L) AS id#3119L, __natural_order__#2985L]
                  +- InMemoryTableScan [__index_level_0__#2982L, __natural_order__#2985L, id#2994L]
                        +- InMemoryRelation [__index_level_0__#2982L, id#2994L, __natural_order__#2985L], StorageLevel(disk, memory, deserialized, 1 replicas)
                              +- *(1) Project [__index_level_0__#1721L, (id#1719L + id#1719L) AS id#1733L, __natural_order__#1724L]
                                 +- *(1) Project [__index_level_0__#1721L, id#1719L, monotonically_increasing_id() AS __natural_order__#1724L]
                                    +- *(1) Proje