# PySpark DataFrame Basic Handling
- Compared with an RDD, a PySpark DataFrame is more structured, so it supports more functions, such as SQL-like functions.
- Its execution is automatically optimized by an internal optimizer called the "Catalyst Optimizer".
- To see the execution plan, call pysparkDF.explain(), where pysparkDF is any PySpark DataFrame.

In [74]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [75]:
spark = SparkSession.builder.appName("spark-dataframe-sql").getOrCreate()

In [76]:
spark

In [77]:
type(spark)

pyspark.sql.session.SparkSession

### SparkContext, SparkSession
- To create a PySpark DataFrame, a SparkSession object must be created first.
- A SparkContext object can create RDDs.
- The SparkSession object contains a SparkContext object, so in general you only need to create the SparkSession first.

In [78]:
spark.sparkContext

In [79]:
type(spark.sparkContext)

pyspark.context.SparkContext

# 1. PySpark DataFrame API

In [None]:
trip_files = "/Users/krafton/project/personal/spark-airflow-hands-on/data/trips/*"
zone_file = "/Users/krafton/project/personal/spark-airflow-hands-on/data/taxi+_zone_lookup.csv"

In [None]:
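# Both paths are absolute and already start with "/", so "file://" + path yields a well-formed file:/// URI
trips_df = spark.read.csv(f"file://{trip_files}", inferSchema=True, header=True)
zone_df = spark.read.csv(f"file://{zone_file}", inferSchema=True, header=True)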

In [None]:
zone_df.show(5, False)

+----------+-------------+-----------------------+------------+
|LocationID|Borough      |Zone                   |service_zone|
+----------+-------------+-----------------------+------------+
|1         |EWR          |Newark Airport         |EWR         |
|2         |Queens       |Jamaica Bay            |Boro Zone   |
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |
|4         |Manhattan    |Alphabet City          |Yellow Zone |
|5         |Staten Island|Arden Heights          |Boro Zone   |
+----------+-------------+-----------------------+------------+
only showing top 5 rows



In [None]:
trips_df.show(1, False)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|2       |2021-03-01 00:22:02 |2021-03-01 00:23:22  |1              |0.0          |1         |N                 |264         |264         |2           |3.0        |0.5  |0.5    |0.0       |0.0         |0.3                  

In [54]:
trips_df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [56]:
# Look up the pickup and dropoff zone names by joining the zone table twice
pu_zones = zone_df.select(F.col("LocationID").alias("PULocationID"), F.col("Zone").alias("pu_zone"))
do_zones = zone_df.select(F.col("LocationID").alias("DOLocationID"), F.col("Zone").alias("do_zone"))
res = (
    trips_df.join(pu_zones, "PULocationID", "left_outer")
    .join(do_zones, "DOLocationID", "left_outer")
)

In [57]:
res.show(1, False)

+------------+------------+--------------------+---------------------+------------+-------+-------+
|DOLocationID|PULocationID|tpep_pickup_datetime|tpep_dropoff_datetime|total_amount|pu_zone|do_zone|
+------------+------------+--------------------+---------------------+------------+-------+-------+
|264         |264         |2021-03-01 00:22:02 |2021-03-01 00:23:22  |4.3         |NV     |NV     |
+------------+------------+--------------------+---------------------+------------+-------+-------+
only showing top 1 row



In [58]:
res.where("pu_zone != do_zone")\
    .select("tpep_pickup_datetime", "tpep_dropoff_datetime", "pu_zone", "do_zone", "total_amount")\
    .show(3, False)

+--------------------+---------------------+---------------------+--------------+------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|pu_zone              |do_zone       |total_amount|
+--------------------+---------------------+---------------------+--------------+------------+
|2021-03-01 00:07:40 |2021-03-01 00:31:23  |LaGuardia Airport    |NA            |70.07       |
|2021-03-01 00:02:13 |2021-03-01 00:06:01  |East Chelsea         |NV            |11.16       |
|2021-03-01 00:40:16 |2021-03-01 00:50:23  |Upper West Side South|Yorkville East|18.59       |
+--------------------+---------------------+---------------------+--------------+------------+
only showing top 3 rows



In [59]:
res.where("pu_zone != do_zone")\
    .select("tpep_pickup_datetime", "tpep_dropoff_datetime", "pu_zone", "do_zone", "total_amount")\
    .printSchema()

root
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- pu_zone: string (nullable = true)
 |-- do_zone: string (nullable = true)
 |-- total_amount: double (nullable = true)



# 2. SQL API

In [48]:
res.cache()
res.createOrReplaceTempView("res")


In [49]:
spark.sql("""
select 
    tpep_pickup_datetime,
    tpep_dropoff_datetime,
    pu_zone,
    do_zone,
    total_amount
from res
where pu_zone != do_zone
""").show(3, False)

+--------------------+---------------------+---------------------+--------------+------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|pu_zone              |do_zone       |total_amount|
+--------------------+---------------------+---------------------+--------------+------------+
|2021-03-01 00:07:40 |2021-03-01 00:31:23  |LaGuardia Airport    |NA            |70.07       |
|2021-03-01 00:02:13 |2021-03-01 00:06:01  |East Chelsea         |NV            |11.16       |
|2021-03-01 00:40:16 |2021-03-01 00:50:23  |Upper West Side South|Yorkville East|18.59       |
+--------------------+---------------------+---------------------+--------------+------------+
only showing top 3 rows



In [50]:
res.unpersist()

DataFrame[DOLocationID: int, PULocationID: int, VendorID: int, tpep_pickup_datetime: string, tpep_dropoff_datetime: string, passenger_count: int, trip_distance: double, RatecodeID: int, store_and_fwd_flag: string, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, pu_zone: string, do_zone: string]

# 3. Execution plan of a PySpark DataFrame
- Parsed Logical Plan -> Analyzed Logical Plan -> Optimized Logical Plan -> Physical Plan
- The Analyzed Logical Plan joins first and then filters; in the Optimized Logical Plan this is changed to filter first and then join.
- In the Physical Plan, the join is shown concretely as a BroadcastHashJoin.
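
The filter-before-join rewrite noted above can be illustrated in plain Python. This is only a sketch of why the two orders return the same rows while the second one feeds fewer rows into the join. It is not how Catalyst is implemented, and the sample rows and the `hash_join` helper are made up for illustration. The dictionary built from the small zone table loosely mirrors the idea behind a hash join whose small side is turned into a lookup table.

```python
# Plain-Python illustration only; Spark optimizes query plans, not Python lists.
# All rows below are made-up sample data.
trips = [
    {"PULocationID": 1, "payment_type": 1},
    {"PULocationID": 1, "payment_type": 2},
    {"PULocationID": 2, "payment_type": 1},
    {"PULocationID": 3, "payment_type": 1},
]
zones = [
    {"LocationID": 1, "Zone": "Newark Airport"},
    {"LocationID": 2, "Zone": "Jamaica Bay"},
]


def hash_join(trip_rows, zone_rows):
    """Inner join on PULocationID = LocationID via a lookup built from the small table."""
    zone_by_id = {z["LocationID"]: z["Zone"] for z in zone_rows}
    return [
        {**t, "Zone": zone_by_id[t["PULocationID"]]}
        for t in trip_rows
        if t["PULocationID"] in zone_by_id
    ]


# Order in the analyzed plan: join every trip, then filter on payment_type.
joined_first = [r for r in hash_join(trips, zones) if r["payment_type"] == 1]

# Order in the optimized plan: filter the trips first, then join the smaller input.
filtered_trips = [t for t in trips if t["payment_type"] == 1]
filtered_first = hash_join(filtered_trips, zones)

# Both orders return the same rows; only the amount of work differs.
assert joined_first == filtered_first
print(len(trips), "rows enter the join without pushdown;", len(filtered_trips), "with pushdown")
```

On the real data the same effect is much larger, since the `payment_type` filter drops millions of trip rows before the join runs.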

In [69]:
trips_df.groupBy("payment_type").count().show()



+------------+--------+
|payment_type|   count|
+------------+--------+
|           1|10716903|
|           3|   81434|
|           4|   59664|
|           2| 3308670|
|        null|  834028|
|           5|       1|
+------------+--------+



In [71]:
trips_df.createOrReplaceTempView("trip_data")
zone_df.createOrReplaceTempView("zone_data")

spark.sql("""
SELECT zone_data.Zone, count(*) AS trips
FROM trip_data JOIN zone_data 
    ON trip_data.PULocationID = zone_data.LocationID
WHERE trip_data.payment_type=1
GROUP BY zone_data.Zone
ORDER BY trips DESC
""").explain(True)

== Parsed Logical Plan ==
'Sort ['trips DESC NULLS LAST], true
+- 'Aggregate ['zone_data.Zone], ['zone_data.Zone, 'count(1) AS trips#2563]
   +- 'Filter ('trip_data.payment_type = 1)
      +- 'Join Inner, ('trip_data.PULocationID = 'zone_data.LocationID)
         :- 'UnresolvedRelation [trip_data], [], false
         +- 'UnresolvedRelation [zone_data], [], false

== Analyzed Logical Plan ==
Zone: string, trips: bigint
Sort [trips#2563L DESC NULLS LAST], true
+- Aggregate [Zone#2517], [Zone#2517, count(1) AS trips#2563L]
   +- Filter (payment_type#2472 = 1)
      +- Join Inner, (PULocationID#2470 = LocationID#2515)
         :- SubqueryAlias trip_data
         :  +- View (`trip_data`, [VendorID#2463,tpep_pickup_datetime#2464,tpep_dropoff_datetime#2465,passenger_count#2466,trip_distance#2467,RatecodeID#2468,store_and_fwd_flag#2469,PULocationID#2470,DOLocationID#2471,payment_type#2472,fare_amount#2473,extra#2474,mta_tax#2475,tip_amount#2476,tolls_amount#2477,improvement_surcharge#2478,tota