# 4. 구조적 API 기본연산

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Python Spark SQL basic example") \
  .config("spark.some.config.option", "some-value") \
  .getOrCreate()

- Row 만들기

In [4]:
spark.range(2).collect()

# 5. 구조적 API 기본연산
- DataFrame은 Row타입의 **레코드**와 각 레코드에 수행할 연산 표현식을 나타내는 여러 **컬럼**으로 구성됨
- **스키마**는 각 컬럼명과 데이터 타입을 정의
- **파티셔닝**은 DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의
- **파티셔닝 스키마**는 파이썬을 배치하는 방법을 정의

In [6]:
# 스키마 확인
df = spark.read.format("json").load("/FileStore/tables/data/flight-data/json/2015_summary-ebaee.json")
display(df.printSchema())

## 5.1 스키마
- 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체
- StructField는 이름, 데이터 타입, 컬럼이 값이 없거나 null일 수 있는지 지정하는 불리언 값을 갖음

In [8]:
df.schema

In [9]:
# 스키마를 직접 만들어서 적용해보기
from pyspark.sql.types import StructType, StructField, StringType, LongType

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata={"hello": "world"})
])

df = spark.read.format('json') \
  .schema(myManualSchema) \
  .load("/FileStore/tables/data/flight-data/json/2015_summary-ebaee.json")

In [10]:
display(df.limit(5))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62


## 5.2 컬럼과 표현식
- DataFrame을 통하지 않으면 외부에서 컬럼에 접근 불가
- 컬럼의 내용을 수정하려면 반드시 DataFrame의 스파크 트랜스포메이션을 사용

### 5.2.1 컬럼
- col, column 함수를 사용하는 것이 가장 간편함
- 컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 마확인 상태임

In [13]:
# 컬럼 생성
from pyspark.sql.functions import col, column

col("someColumnName")
column("someColumnName")

### 5.2.2 표현식
- 표현식은 DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미
- 여러 컬럼명을 입력받아 식별하고 단일값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수
- 표현식은 연 순서를 지정하는 논리적 트리로 컴파일됨
- 실행 시점에서 동일한 논리 트리로 컴파일되기 때문에 동일한 성능을 발휘함

In [15]:
# 논리적 트리로 컴파일되는 표현식
from pyspark.sql.functions import expr

expr("(((someCol + 5) * 200) - 6) < otherCol")

- 프로그래밍 방식으로 컬럼에 접근할 때는 DataFrame의 columns 속성을 사용함

In [17]:
# column 속성을 사용
spark.read.format("json").load("/FileStore/tables/data/flight-data/json/2015_summary-ebaee.json").columns

## 5.3 레코드와 로우
- 스파크는 레코드를 Row객체로 표현
- Row객체는 내부에 바이트 배열을 가지며 오직 컬럼 표현식만으로 다룰 수 있으므로 사용자에게 노출되지 않음
- DataFrame을 사용하여 드라이버에게 개별 로우를 반환하는 명령은 항상 하나 이상의 Row 타입을 반환함

In [19]:
df.first()

In [20]:
# 로우 생성하기
from pyspark.sql import Row

myRow = Row("Hello", None, 1, False)

In [21]:
# 로우 접근하기
print(myRow[0])
print(myRow[1])
print(myRow[0:4])

## 5.4 DataFrame의 트랜스포메이션
### 5.4.1 DataFrame 생성하기
- 원시 데이터소스에서 DataFrame을 생성하고 임시 뷰를 등록
- Row 객체를 가진 Seq타입을 직접 변환하여 DataFrame을 생성

In [23]:
# 원시 데이터소스 활용
df = spark.read.format("json").load("/FileStore/tables/data/flight-data/json/2015_summary-ebaee.json")
df.createOrReplaceTempView("dfTable")

display(spark.sql("""
  SELECT *
  FROM dfTable
"""))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [24]:
# Row 객체 활용
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, LongType

myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False),  
])

myRow = Row("Hello", None, 1)
myDF = spark.createDataFrame([myRow], myManualSchema)
display(myDF)

some,col,names
Hello,,1


### 5.4.2 select와 selectExpr

In [26]:
# 단일 혹은 다중 컬럼 설정
display(df.select("DEST_COUNTRY_NAME").limit(2))

DEST_COUNTRY_NAME
United States
United States


In [27]:
display(df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").limit(2))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME
United States,Romania
United States,Croatia


In [28]:
# 컬럼을 참조하는 다양한 방법
display(df.select(
  expr("DEST_COUNTRY_NAME"),
  col("DEST_COUNTRY_NAME"),
  column("DEST_COUNTRY_NAME")
).limit(2))

DEST_COUNTRY_NAME,DEST_COUNTRY_NAME.1,DEST_COUNTRY_NAME.2
United States,United States,United States
United States,United States,United States


In [29]:
# expr을 이용한 컬럼 참조
display(df.select(expr("DEST_COUNTRY_NAME AS destination")).limit(2))

destination
United States
United States


In [30]:
display(df.select(expr("DEST_COUNTRY_NAME AS destination").alias("DEST_COUNTRY_NAME")).limit(2))

DEST_COUNTRY_NAME
United States
United States


- select메서드에 expr함수를 사용하는 패턴을 자주 사용함
- 스파크는 이를 위해 selectExpr 메서드를 제공

In [32]:
# selectExpr 활용 예시
display(df.selectExpr("DEST_COUNTRY_NAME AS newColumnName", "DEST_COUNTRY_NAME").limit(2))

newColumnName,DEST_COUNTRY_NAME
United States,United States
United States,United States


In [33]:
display(df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) AS withinCountry").limit(2))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,withinCountry
United States,Romania,15,False
United States,Croatia,1,False


In [34]:
# 집계함수 지정
display(df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))"))

avg(count),count(DISTINCT DEST_COUNTRY_NAME)
1770.765625,132


### 5.4.3 스파크 데이터 타입으로 변환하기

In [36]:
# 리터럴(literal)을 사용한 컬럼 추가
from pyspark.sql.functions import lit

display(df.select(expr("*"), lit(1).alias("one")).limit(2))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,one
United States,Romania,15,1
United States,Croatia,1,1


### 5.4.4 컬럼 추가하기

In [38]:
# withColumn으로 컬럼 추가
display(df.withColumn("numberOne", lit(1)).limit(2))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,numberOne
United States,Romania,15,1
United States,Croatia,1,1


In [39]:
# 컬럼 비교 : (변수이름, 연산)
display(df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).limit(2))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,withinCountry
United States,Romania,15,False
United States,Croatia,1,False


### 5.4.5 컬럼명 바꾸기

In [41]:
# withColumnRenamed
df.withColumnRenamed("DEST_COUNTRY_NAME", "DEST").columns

### 5.4.6 예약 문자와 키워드
- 공백이나 하이픈(-)과 같은 예약 문자를 컬럼명에서 사용하려면 백틱(\`) 문자를 사용해야 함

In [43]:
# withColumn, selectExpr, select 차이점
dfWithLongColName = df.withColumn("This Long Column-Name", expr("ORIGIN_COUNTRY_NAME")) # 첫번째 인수에서 사용하지 않음
display(dfWithLongColName.limit(2))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,This Long Column-Name
United States,Romania,15,Romania
United States,Croatia,1,Croatia


In [44]:
display(dfWithLongColName.selectExpr("`This Long Column-Name`", "`This Long Column-Name` as `new col`").limit(2))

This Long Column-Name,new col
Romania,Romania
Croatia,Croatia


In [45]:

display(dfWithLongColName.select(expr("`This Long Column-Name`")).limit(2))

This Long Column-Name
Romania
Croatia


### 5.4.7 대소문자 구분
- 기본적으로 스파크는 대소문자를 가리지 않음
~~~
set spark.sql.caseSensitive true # 대소문자를 구분하기 위한 옵션
~~~

### 5.4.8 컬럼 제거하기

In [48]:
# drop함수
df.drop("ORIGIN_COUNTRY_NAME").columns
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns # 여러 컬럼을 지우기

### 5.4.9 컬럼의 데이터 타입 변경하기

In [50]:
# cast 함수
df.withColumn("count2", col("count").cast("string"))

### 5.4.10 로우 필터링하기

In [52]:
# filter, where 함수
display(df.filter(col("count") < 2).limit(2))

In [53]:
display(df.where(col("count") < 2).limit(2)) # 동일한 결과를 리턴

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Croatia,1
United States,Singapore,1


In [54]:
# 같은 표현식에 여러 필터를 적용
display(df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").limit(2))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Singapore,1
Moldova,United States,1


### 5.4.11 고유한 로우 얻기

In [56]:
# distinct 함수
print(df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count())
print(df.select("DEST_COUNTRY_NAME").distinct().count())
print(df.select("ORIGIN_COUNTRY_NAME").distinct().count())

### 5.4.12 무작위 샘플 만들기

In [58]:
# sample함수
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

### 5.4.13 임의 분할하기

In [60]:
# randomsplit 함수
dataFrame = df.randomSplit([0.25, 0.75], seed)
dataFrame[0].count() > dataFrame[1].count()

### 5.4.14 로우 합치기와 추가하기
- 동일한 스키마와 컬럼 수를 가져야 함

In [62]:
""" union 함수 """
from pyspark.sql import Row

schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)
]

# Parallelized Collections :
# Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program.
# The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. 

parallelizedRows = spark.sparkContext.parallelize(newRows) 
newDF = spark.createDataFrame(parallelizedRows, schema)

display(newDF)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
New Country,Other Country,5
New Country 2,Other Country 3,1


In [63]:
display(df.union(newDF)\
    .where("count = 1")\
    .where(col("ORIGIN_COUNTRY_NAME") != "United States"))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Croatia,1
United States,Singapore,1
United States,Gibraltar,1
United States,Cyprus,1
United States,Estonia,1
United States,Lithuania,1
United States,Bulgaria,1
United States,Georgia,1
United States,Bahrain,1
United States,Papua New Guinea,1


### 5.4.15 로우 정렬하기
- asc, desc 함수를 사용해서 정렬 순서를 지정
- asc_nulls_first, desc_nulls_first, asc_nulls_last, desc_nulls_last 메서드로 null의 정렬 순서를 지정
- sortWithinPartitions 함수는 파티션별 정렬을 지원

In [65]:
# sort, orderBy 함수
display(df.sort("count").limit(5))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Malta,United States,1
Saint Vincent and the Grenadines,United States,1
United States,Croatia,1
United States,Gibraltar,1
United States,Singapore,1


In [66]:
display(df.orderBy("count", "DEST_COUNTRY_NAME").limit(5))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Burkina Faso,United States,1
Cote d'Ivoire,United States,1
Cyprus,United States,1
Djibouti,United States,1
Indonesia,United States,1


In [67]:
display(df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).limit(5))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Burkina Faso,United States,1
Cote d'Ivoire,United States,1
Cyprus,United States,1
Djibouti,United States,1
Indonesia,United States,1


In [68]:
# 정렬순서 지정하기
from pyspark.sql.functions import desc, asc

display(df.orderBy(expr("count desc")).limit(5))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Malta,United States,1
Saint Vincent and the Grenadines,United States,1
United States,Croatia,1
United States,Gibraltar,1
United States,Singapore,1


In [69]:
display(df.orderBy(col("ORIGIN_COUNTRY_NAME").desc(), col("DEST_COUNTRY_NAME").asc()).limit(5))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Vietnam,2
United States,Venezuela,246
United States,Uruguay,13
Algeria,United States,4
Angola,United States,15


In [70]:
# 파티션별 정렬
# 최적화는 3부에서 자세히 소개할 예정
display(spark.read.format("json").load("/FileStore/tables/data/flight-data/json/*_summary-*.json").sortWithinPartitions("count"))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Croatia,1
United States,Singapore,1
Moldova,United States,1
Malta,United States,1
United States,Gibraltar,1
Saint Vincent and the Grenadines,United States,1
Suriname,United States,1
United States,Cyprus,1
Burkina Faso,United States,1
Djibouti,United States,1


### 5.4.16 로우 수 제한하기

In [72]:
display(df.limit(5))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62


In [73]:
display(df.orderBy(expr("count desc")).limit(6))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Malta,United States,1
Saint Vincent and the Grenadines,United States,1
United States,Croatia,1
United States,Gibraltar,1
United States,Singapore,1
Moldova,United States,1


### 5.4.17 repartition과 coalesce
- 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에 사용(repartition, 셔플이 필수로 발생)
- 자주 필터링되는 컬럼을 기준으로 파티션 재분배를 권장

In [75]:
display(df)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [76]:
# 파티션 나누기
df.rdd.getNumPartitions()

In [77]:
df.repartition(5)

In [78]:
df.repartition(col("DEST_COUNTRY_NAME"))

In [79]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

In [80]:
# 파티션 합치기
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
df.rdd.getNumPartitions()

### 5.4.18 드라이버로 로우 데이터 수집
- 대규모 데이터셋에 collect 명령을 수행하면 드라이버 비정상 종료 우려

In [82]:
collectDF = df.limit(5)
collectDF.take(5) # 정수를 인수값으로 사용
collectDF.show()  # 결과를 정돈된 형태로 출력
collectDF.show(5, False)
collectDF.collect() # 전체 모든 데이터를 수집