In [1]:
!python --version
!cat /etc/os-release
!cat /usr/local/spark/python/pyspark/version.py

Python 3.8.5
NAME="Ubuntu"
VERSION="20.04 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04 LTS"
VERSION_ID="20.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=focal
UBUNTU_CODENAME=focal
__version__='3.0.0'


In [2]:
from pyspark.sql import SparkSession 
import pandas as pd

In [3]:
# Obtain a local-mode Spark session for this notebook and report the Spark
# version it is running, as a sanity check against the installed pyspark.
spark = (
    SparkSession.builder
    .appName("version_check")
    .master("local")
    .getOrCreate()
)
print(spark.sparkContext.version)

3.0.0


In [22]:
import os

# Root directory of the "Spark: The Definitive Guide" repository checkout.
# The default is the original hard-coded location; set the SPARK_GUIDE_DIR
# environment variable to run the notebook against a checkout elsewhere.
# Kept as a plain str because later cells build paths with `+`.
spark_file_dir = os.environ.get(
    "SPARK_GUIDE_DIR", "/home/jovyan/work/Spark-The-Definitive-Guide"
)

# Ch2. 스파크 간단히 살펴보기

In [32]:
# !cat ~/work/Spark-The-Definitive-Guide/code/A_Gentle_Introduction_to_Spark-Chapter_2_A_Gentle_Introduction_to_Spark.py

In [4]:
# Chapter 2 entry point: getOrCreate() hands back the session that already
# exists for this kernel (if any) rather than starting a second one.
spark = (
    SparkSession.builder
    .appName("version_check")
    .master("local")
    .getOrCreate()
)

In [5]:
spark

In [6]:
# Lazy evaluation: the operation is declared, but Spark only builds an
# execution plan here and does not run anything yet.
myRange = spark.range(1000).toDF("number")

In [7]:
myRange

DataFrame[number: bigint]

In [8]:
# Lazy evaluation: the filter is only added to the execution plan;
# no rows are computed until an action is called.
divisBy2 = myRange.where("number % 2 = 0")

In [9]:
divisBy2

DataFrame[number: bigint]

In [10]:
# Action: count() actually executes the computation described by the execution plan.
divisBy2.columns, myRange.count(), divisBy2.count()

(['number'], 1000, 500)

In [57]:
# !cat ~/work/Spark-The-Definitive-Guide/code/A_Gentle_Introduction_to_Spark-Chapter_2_A_Gentle_Introduction_to_Spark.py

In [23]:
# Load the 2015 flight summary CSV. The first line is treated as the header
# and column types are inferred from the data instead of defaulting to string.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(spark_file_dir + "/data/flight-data/csv/2015-summary.csv")
)

In [24]:
flightData2015

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [30]:
# Action: return the first three rows as a list of Row objects (like `head 3`).
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [31]:
!head -n 4 ~/work/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344


In [35]:
# Sort by the `count` column. This is a lazy transformation, so only a new
# DataFrame (plan) is returned and no sorting happens yet.
flightData2015.sort("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

In [36]:
# Print the physical execution plan Spark would use for the sort.
flightData2015.sort("count").explain()

== Physical Plan ==
*(1) Sort [count#56 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#56 ASC NULLS FIRST, 200), true, [id=#115]
   +- FileScan csv [DEST_COUNTRY_NAME#54,ORIGIN_COUNTRY_NAME#55,count#56] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Spark-The-Definitive-Guide/data/flight-data/csv/2015-sum..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [39]:
# Reduce the number of shuffle partitions from 200 (the default) to 5.
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [41]:
# Action: run the sort and return the five rows with the smallest `count`.
flightData2015.sort("count").take(5)

[Row(DEST_COUNTRY_NAME='Malta', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Saint Vincent and the Grenadines', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Gibraltar', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

In [42]:
# Register the DataFrame as a temporary view named `flight_data_2015`
# so that it can be queried with spark.sql().
flightData2015.createOrReplaceTempView("flight_data_2015")

In [43]:
# SQL version of the aggregation: number of rows per destination country,
# queried from the `flight_data_2015` temporary view. spark.sql() returns a DataFrame.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [44]:
sqlWay

DataFrame[DEST_COUNTRY_NAME: string, count(1): bigint]

In [45]:
# DataFrame-API version of the same aggregation as `sqlWay`:
# number of rows per destination country.
dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

In [46]:
dataFrameWay

DataFrame[DEST_COUNTRY_NAME: string, count: bigint]

In [47]:
sqlWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#54], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#54, 5), true, [id=#150]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#54], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#54] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Spark-The-Definitive-Guide/data/flight-data/csv/2015-sum..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [48]:
# Prints the same execution plan as sqlWay.explain().
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#54], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#54, 5), true, [id=#169]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#54], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#54] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Spark-The-Definitive-Guide/data/flight-data/csv/2015-sum..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [49]:
# Import the functions module under an alias rather than
# `from pyspark.sql.functions import max`, which would rebind the name `max`
# and hide Python's built-in max() for every cell executed afterwards.
from pyspark.sql import functions as F

# Action: largest value in the `count` column; take(1) returns a
# one-element list holding a Row whose field is named `max(count)`.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [52]:
# SQL query for the five destination countries with the largest total of
# `count`, summed per destination and ordered from largest to smallest.
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

In [53]:
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [54]:
from pyspark.sql.functions import desc

In [55]:
# DataFrame-API version of the `maxSql` query: sum `count` per destination,
# rename the aggregate column, sort descending, keep five rows and print them.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [56]:
# Same top-five-destinations pipeline as the previous cell, but printing the
# physical plan instead of the rows.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#163L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#54,destination_total#163L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#54], functions=[sum(cast(count#56 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#54, 5), true, [id=#330]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#54], functions=[partial_sum(cast(count#56 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#54,count#56] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Spark-The-Definitive-Guide/data/flight-data/csv/2015-sum..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [None]:



# COMMAND ----------

from pyspark.sql.functions import desc

# Top five destination countries by summed `count`. The pipeline is built
# once and reused by both commands below.
top_destinations = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
)
top_destinations.show()


# COMMAND ----------

# Print the physical plan for the same pipeline.
top_destinations.explain()