In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
.appName("chapter8") \
.getOrCreate()

24/10/04 14:01:11 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.89 instead (on interface en0)
24/10/04 14:01:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/04 14:01:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
person = spark.createDataFrame([
    (0, "Ning Ning", 0, [100]),
    (1, "Kang hae rin", 1, [500, 250, 100]),
    (2, "Karina", 1, [250, 100])
]) \
.toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "Harvard"),
    (1, "Ph D", "EECS", "MIT"),
]) \
.toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor"),
]) \
.toDF("id", "status")

In [5]:
person.createOrReplaceTempView("persion")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

In [7]:
person.show()

                                                                                

+---+------------+----------------+---------------+
| id|        name|graduate_program|   spark_status|
+---+------------+----------------+---------------+
|  0|   Ning Ning|               0|          [100]|
|  1|Kang hae rin|               1|[500, 250, 100]|
|  2|      Karina|               1|     [250, 100]|
+---+------------+----------------+---------------+



## Inner Join

내부 조인은 DataFrame이나 테이블에 존재하는 키를 평가한다. 그리고 TRUE으로 평가되는 로우만 결합한다. 다음은 graduateProgram DataFrame과 person DataFrame을 조인해 새로운 DataFrame을 만드는 예제이다.

In [9]:
joinExpression = person["graduate_program"] == graduateProgram["id"]

person.join(graduateProgram, joinExpression).show()



+---+------------+----------------+---------------+---+-------+--------------------+-----------+
| id|        name|graduate_program|   spark_status| id| degree|          department|     school|
+---+------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Ning Ning|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|Kang hae rin|               1|[500, 250, 100]|  1|   Ph D|                EECS|        MIT|
|  2|      Karina|               1|     [250, 100]|  1|   Ph D|                EECS|        MIT|
+---+------------+----------------+---------------+---+-------+--------------------+-----------+



                                                                                

## Outer Join

외부 조인은 DataFrame이나 테이블에 존재하는 키를 평가해 참/거짓으로 평가한 로우를 포함(그리고 조인)한다. 왼쪽이나 오른쪽 DataFrame에 일치하는 로우가 없다면 스파크는 해당 위치에 NULL을 삽입한다.

In [10]:
person.join(graduateProgram, joinExpression, "outer").show()

+----+------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|        name|graduate_program|   spark_status| id| degree|          department|     school|
+----+------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Ning Ning|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|Kang hae rin|               1|[500, 250, 100]|  1|   Ph D|                EECS|        MIT|
|   2|      Karina|               1|     [250, 100]|  1|   Ph D|                EECS|        MIT|
|NULL|        NULL|            NULL|           NULL|  2|Masters|                EECS|    Harvard|
+----+------------+----------------+---------------+---+-------+--------------------+-----------+



## Left Outer Join

왼쪽 외부 조인은 DataFrame이나 테이블에 존재하는 키를 평가한다. 그리고 왼쪽 DataFrame의 모든 로우와 왼쪽 DataFrame과 일치하는 오른쪽 DataFrame의 로우를 함께 포함한다.

오른쪽 DataFrame에 일치하는 로우가 없다면 스파크는 해당 위치에 NULL을 삽입한다.

In [11]:
graduateProgram.join(person, joinExpression, "left_outer").show()

+---+-------+--------------------+-----------+----+------------+----------------+---------------+
| id| degree|          department|     school|  id|        name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Ning Ning|               0|          [100]|
|  2|Masters|                EECS|    Harvard|NULL|        NULL|            NULL|           NULL|
|  1|   Ph D|                EECS|        MIT|   2|      Karina|               1|     [250, 100]|
|  1|   Ph D|                EECS|        MIT|   1|Kang hae rin|               1|[500, 250, 100]|
+---+-------+--------------------+-----------+----+------------+----------------+---------------+



## Right Outer Join

오른쪽 외부 조인은 DataFrame이나 테이블에 존재하는 키를 평가한다. 그리고 오른쪽 DataFrame의 모든 로우와 오른쪽 DataFrame과 일치하는 왼쪽 DataFrame의 로우를 함께 포함한다.

왼쪽 DataFrame에 일치하는 로우가 없다면 스파크는 해당 위치에 NULL을 삽입한다.

In [12]:
person.join(graduateProgram, joinExpression, "right_outer").show()

+----+------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|        name|graduate_program|   spark_status| id| degree|          department|     school|
+----+------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Ning Ning|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|NULL|        NULL|            NULL|           NULL|  2|Masters|                EECS|    Harvard|
|   2|      Karina|               1|     [250, 100]|  1|   Ph D|                EECS|        MIT|
|   1|Kang hae rin|               1|[500, 250, 100]|  1|   Ph D|                EECS|        MIT|
+----+------------+----------------+---------------+---+-------+--------------------+-----------+



## Left Semi Join

세미 조인은 오른쪽 DataFrame의 어떤 값도 포함하지 않기 때문에 다른 조인 타입과는 약간 다르다. 단지 두 번째 DataFrame은 값이 존재하는지 확인하기 위해 값만 비교하는 용도로 사용한다. 

만약 값이 존재한다면 왼쪽 DataFrame에 중복 키가 존재하더라도 해당 로우는 결과에 포함된다. 왼쪽 세미 조인은 기존 조인 기능과는 달리 DataFrame의 필터 정도로 볼 수 있다.

In [14]:
gradProgram2 = graduateProgram.union(spark.createDataFrame([
    (0, "Masters", "Duplicated Row", "Duplicated School"),
]))

gradProgram2.createOrReplaceTempView("gradProgram2")

gradProgram2.join(person, joinExpression, "left_semi").show()

+---+-------+--------------------+-----------------+
| id| degree|          department|           school|
+---+-------+--------------------+-----------------+
|  0|Masters|School of Informa...|      UC Berkeley|
|  1|   Ph D|                EECS|              MIT|
|  0|Masters|      Duplicated Row|Duplicated School|
+---+-------+--------------------+-----------------+



## Left Anti Join

왼쪽 안티 조인은 왼쪽 세미 조인의 반대 개념이다. 왼쪽 세미 조인처럼 오른쪽 DataFrame의 어떤 값도 포함하지 않는다. 단지 두 번째 DataFrame은 값이 존재하는지 확인하기 위해 값만 비교하는 용도로 사용한다.

하지만 두 번째 DataFrame에 존재하는 값을 유지하는 대신 두 번째 DataFrame에서 관련된 키를 찾을 수 없는 로우만 결과에 포함한다.

안티 조인은 SQL의 NOT IN과 같은 스타일의 필터로 볼 수 있다.

In [16]:
graduateProgram.join(person, joinExpression, "left_anti").show()

+---+-------+----------+-------+
| id| degree|department| school|
+---+-------+----------+-------+
|  2|Masters|      EECS|Harvard|
+---+-------+----------+-------+



## Natural Join

자연조인은 조인하려는 컬럼을 암시적으로 추정한다. 즉, 일치하는 컬럼을 찾고 그 결과를 반환한다. 왼쪽과 오른쪽 그리고 외부 자연 조인을 사용할 수 있다.

## Cross Join | Cartesian Join

교차 조인은 조건절을 기술하지 않은 내부 조인을 의미한다. 교차 조인은 왼쪽 DataFrame의 모든 로우를 오른쪽 DataFrame의 모든 로우와 결합한다. 교차 조인을 거치면 정말 엄청난 수의 로우를 가진 DataFrame이 생성될 수 있다.

1000개의 로우가 존재하는 두 개의 DataFrame에 교차 조인을 수행하면 1000000개의 결과 로우가 생성될 수 있다.

따라서 반드시 키워드를 사용해 교차 조인을 수행한다는 것을 명시적으로 선언해야 한다.

In [17]:
graduateProgram.join(person, joinExpression, "cross").show()

+---+-------+--------------------+-----------+---+------------+----------------+---------------+
| id| degree|          department|     school| id|        name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+---+------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Ning Ning|               0|          [100]|
|  1|   Ph D|                EECS|        MIT|  1|Kang hae rin|               1|[500, 250, 100]|
|  1|   Ph D|                EECS|        MIT|  2|      Karina|               1|     [250, 100]|
+---+-------+--------------------+-----------+---+------------+----------------+---------------+



## 복합 데이터 타입의 조인

복합 데이터 타입의 조인이 어려워 보이지만 사실 그렇지 않다. 불리언을 반환하는 모든 표현식은 조인 표현식으로 간주할 수 있다.

In [18]:
person.withColumnRenamed("id", "personId") \
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()

                                                                                

+--------+------------+----------------+---------------+---+--------------+
|personId|        name|graduate_program|   spark_status| id|        status|
+--------+------------+----------------+---------------+---+--------------+
|       0|   Ning Ning|               0|          [100]|100|   Contributor|
|       1|Kang hae rin|               1|[500, 250, 100]|500|Vice President|
|       1|Kang hae rin|               1|[500, 250, 100]|250|    PMC Member|
|       1|Kang hae rin|               1|[500, 250, 100]|100|   Contributor|
|       2|      Karina|               1|     [250, 100]|250|    PMC Member|
|       2|      Karina|               1|     [250, 100]|100|   Contributor|
+--------+------------+----------------+---------------+---+--------------+

