# DataFrame 기본 연산 - 2
다룰 내용
- 리터럴
- 5.4.3 스파크 데이터 타입으로 변환하기

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Spark Test") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark

In [3]:
df = spark.read.format("json").load("../data/2015-summary.json")
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



                                                                                

#### 5.4.3 스파크 데이터 타입으로 변환하기
- 리터럴

In [13]:
from pyspark.sql.functions import lit, expr, col

df.select(expr("*"), lit(1).alias("One")).show(3)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  344|  1|
+-----------------+-------------------+-----+---+
only showing top 3 rows



In [7]:
# Using selectExpr
df.selectExpr("*", "1 as One").show(3)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
|    United States|            Ireland|  344|  1|
+-----------------+-------------------+-----+---+
only showing top 3 rows



#### 5.4.4 컬럼 추가하기, 컬럼명 변경하기

In [8]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [10]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [11]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

#### 5.4.7 대소문자 구분
- spark는 대소문자를 가리지 않음
- 아래 옵션을 통해, 대소문자 구분 가능
  - set spark.sql.caseSensitive true


#### 5.4.8 컬럼명 변경 / 컬럼 데이터 형 변경하기

In [12]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [14]:
df.withColumn("count", col("count").cast("string"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: string]

#### 5.3.10 where 조건 / Distinct 

In [15]:
df.where("count < 2").show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 3 rows



In [16]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

                                                                                

256

#### 5.3.12 Random Sampling / Random Split

In [17]:
seed = 5
with_replacement = False
fraction = 0.5
df.sample(withReplacement=with_replacement, fraction=fraction, seed=seed).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [23]:
splited_df = df.randomSplit([0.25, 0.75], seed)
print(splited_df[0].count(), splited_df[1].count())

71 185


#### 5.4.14 로우 합치기와 추가하기
- DataFrame은 불변성을 가짐 -> 레고드 추가 불가능
- 원본 DataFrame과 새로운 DataFrame을 통합해야 함
- 반드시 **동일한 스키마와 컬럼 수**

In [26]:

from pyspark.sql import Row

schema = df.schema
new_rows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)
]
parallelized_rows = spark.sparkContext.parallelize(new_rows)
new_df = spark.createDataFrame(parallelized_rows, schema=schema)

df.union(new_df).where("count=1").where(col("ORIGIN_COUNTRY_NAME") != "United States").show()


[Stage 31:>                                                         (0 + 1) / 1]

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



                                                                                

In [29]:
df.sort("count").show(5)
df.orderBy("count", expr("DEST_COUNTRY_NAME desc")).show(5) # expr 필수

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



#### 컬럼 sortWithinPartitions
- 많이 사용하는 컬럼을 기준으로 파티셔닝

In [31]:
"""
# 트랜스포메이션 처리 전, 성능 최적화를 위한 파티션별 정렬 수행
# sortWithinPartition( col 명 )
""" 
spark.read.format("json").load("../data/2015-summary.json").sortWithinPartitions("count")


DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

#### 5.4.17 repartition 과 coalesce
- 자주 필터링하는 컬럼을 기준으로 데이터 분할
- `repartition` : 무조건 전체 데이터 셔플, 컬럼 기준 파티션 생성 및 향후 사용할 파티션 수가 많아질 경우 사용
- `coalesce` : 전체 데이터를 셔플 없이 병합하는 메소드 

In [33]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [36]:
# 5개의 파티션으로 분할 후, 셔플없이 2개로 병합
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)


DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

#### 5.4.18 드라이버로 로우 데이터 수집하기
- 스파크는 드라이버에서 클러스터 상태 정보를 유지
- `collect` : All data
- `take` : N-rows
- `show` : prettier

## **주의**
  > 드라이버로 모든 데이터 컬렉션을 수집하는 것은 매우 큰 비용 발생. collect, toLocalIterator 모두 마찬가지.

In [38]:
collect_df = df.limit(10)
collect_df.take(5)
collect_df.show()
collect_df.collect()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

23/04/25 18:05:39 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1077260 ms exceeds timeout 120000 ms
23/04/25 18:05:40 WARN SparkContext: Killing executors is not supported by current scheduler.
