In [4]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Join

## 목표
* join type
* join 사용법
* 스파크가 클러스터에서 조인을 실행하는 원리

## 8.1 Join 표현식(Join expression)
* 조인표현식에 따라 왼쪽과 오른쪽 데이터셋을 결합한다.
* 두 로우의 조인 여부를 결정한다.

### 동등 조인(equi join)
* 왼쪽과 오른쪽 데이터셋에 지정된 키가 동일하지 비교하는 조인
* 일치하는 키가 없는 로우의 경우 조인에 포함하지 않음.

## 8.2 Join Type
결과 데이터셋에 어떤 데이터가 있어야 하는지를 결정

* 내부 조인(inner join) - 왼쪽과 오른쪽에 키가 있는 로우를 유지
* 외부 조인(outer join) - 왼쪽 혹은 오른쪽에 키가 있는 로우를 유지
  - 왼쪽 혹은 오른쪽에 키가 존재하지 않는 로우일 경우 null을 채워 넣는다.
* 왼쪽 외부 조인(left outer join) - 왼쪽 데이터셋에 키가 있는 로우를 유지
* 오른쪽 외부 조인(right outer join) - 오른쪽 데이터셋에 키가 있는 로우를 유지
* 왼쪽 세미 조인(left semi join)
  - 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에 키가 일치하는 왼쪽 데이터셋만 유지
  - 다른 조인과 달리 필터 정도로 볼 수 있다.
* 왼쪽 안티 조인(left anti join) - 왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에 왼쪽 데이터셋만 유지
* 자연 조인(natural join) 
  - 두 데이터 셋에 동일한 이름을 가진 컬럼을 암식적으로 결합하는 조인
  - "암시"적인 것은 언제나 조심해서 사용해야 한다.
* 교차 조인(cross join) - 왼쪽 데이터셋의 모든 로우와 오른쪽 데이터셋의 모든 경우를 조합

## 8.11 조인 사용 시 문제점

### 복합 데이터 타입의 조인
  * 불리언을 반환하는 모든 표현식은 조인 표현식으로 간주
### 중복 컬럼명 처리
DataFrame의 각 column은 스파크 SQL 엔진인 카탈리스트내의 고유 ID가 존재. \
고유 ID는 카탈리스트 엔진 내에서만 사용이 가능할 뿐, 외부에서 참조할 수는 없다.
 
* 문제 상황 2가지
  - 조인에 사용할 특정 키가 동일한 이름을 가지며, 키가 제거되지 않도록 조인 표현식에 명시하는 경우.
  - 조인 대상이 아닌 두 개의 컬럼이 동일한 이름을 가지는 경우.
  
## 8.12 스파크의 조인 수행 방식
### 네트워크 통신 전략
* 셔플 조인 
  - 전제 노드 간 통신이 발생한다. (네트워크 부하가 크다)
* 브로드캐스트 조인
  - 큰 DF와 작은 DF 사이에 발생하는 조인으로, 작은 DF(하나의 파티션에 올라갈 정도로 작은 크기)를 큰DF의 파티션들이 잇는 노드에 복사하여 조인을 수행.
  - broadcast() 함수를 이용해, 옵티마이저에 힌트를 줄 수 있다.

In [2]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])
]).toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D", "EECS", "UC Berkeley")
]).toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")
]).toDF("id", "status")

In [15]:
# 내부 조인

# join expression
joinExpression = person["graduate_program"] == graduateProgram["id"]
wrongJoinExpression = person["name"] == graduateProgram["school"]

# join type
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()

# 외부 조인
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()

# 왼쪽 외부 조인
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()

# 오른쪽 외부 조인
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()

# 왼쪽 세미 조인
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()

gradProgram2 = graduateProgram.union(
    spark.createDataFrame([
        (0, "Masters", "Duplicated Row", "Duplicated School")]))

gradProgram2.join(person, joinExpression, joinType).show()

# 왼쪽 안티 조인
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()

# 교차 조인
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()
graduateProgram.crossJoin(person).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------

In [19]:
# 복합 데이터 타입의 조인

person.printSchema()
sparkStatus.printSchema()

person.withColumnRenamed("id", "personId") \
  .join(sparkStatus, F.expr("array_contains(spark_status, id)")).show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- graduate_program: long (nullable = true)
 |-- spark_status: array (nullable = true)
 |    |-- element: long (containsNull = true)

root
 |-- id: long (nullable = true)
 |-- status: string (nullable = true)

+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contribu

In [36]:
# 중복 컬럼명 처리
# 동일한 이름을 가진 키로 조인 한 후, 동일한 이름의 키를 select 하면 error가 발생한다.

gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
gradProgramDupe.printSchema()
person.printSchema()

joinExpr = gradProgramDupe["graduate_program"] == person["graduate_program"]
#joinedDF = person.join(gradProgramDupe, joinExpr)
#joinedDF.select("graduate_program").show()

# Solution1 - 다른 표현식 사용 (조인 시 중복 컬럼 자동으로 제거)
person.join(gradProgramDupe, "graduate_program").select("graduate_program").show()

# Solution2 - 조인 후 컬럼 제거 (원래 DF 이름 명시 필요)
person.join(gradProgramDupe, joinExpr).drop(person["graduate_program"]).select("graduate_program").show()
person.join(gradProgramDupe, joinExpr).drop(person.graduate_program).select("graduate_program").show()

# Solution3 - 조인 전 컬럼명 변경
gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
joinExpr = person.id == gradProgram3.grad_id
person.join(gradProgram3, joinExpr).show()


root
 |-- graduate_program: long (nullable = true)
 |-- degree: string (nullable = true)
 |-- department: string (nullable = true)
 |-- school: string (nullable = true)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- graduate_program: long (nullable = true)
 |-- spark_status: array (nullable = true)
 |    |-- element: long (containsNull = true)

+----------------+
|graduate_program|
+----------------+
|               0|
|               1|
|               1|
+----------------+

+----------------+
|graduate_program|
+----------------+
|               0|
|               1|
|               1|
+----------------+

+----------------+
|graduate_program|
+----------------+
|               0|
|               1|
|               1|
+----------------+

+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|grad_id| degree|          department|     school|
+---+----