# 구조적 API 실습

In [1]:
sc

In [2]:
ls

2010-12-01.csv  2015-summary.json
2010-12-02.csv  20201010_structured_api.ipynb


In [3]:
pwd

'/home/ubuntu/spark_study/1010practice'

In [4]:
# 파일 읽기

df = spark.read.format("json").load("file:///home/ubuntu/spark_study/1010practice/2015-summary.json")

In [5]:
# 스키마 확인

df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [6]:
# pandas의 header와 동일
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

# 데이터 프레임 조작


In [7]:
# 스키마의 형태 확인

df.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

In [10]:
# Schema를 만들기 위해 StructField, StructType 과 사용하고자하는 데이터 타입 StringType 과 LongType을 가져옵니다.
from pyspark.sql.types import StructField, StructType, StringType, LongType

# 순서대로 이름, 데이터타입, Null 여부
# Null이 False인 경우 metadata 지정이 가능)
mySchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False, metadata = {"hello" : "world"})
])


df = spark.read.format("json").schema(mySchema).load("file:///home/ubuntu/spark_study/1010practice/2015-summary.json")

In [11]:
# DataFrame에 어떤 칼럼이 있을까?

df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [12]:
from pyspark.sql.functions import col, column, exp

# 컬럼은 다음과 같이 col 또는 column이라는 함수로 column 객체를 정의할 수도 있습니다.

col("someColumnName")

Column<b'someColumnName'>

In [13]:
#컬럼은 expression이다. 이정도만 알아두시면 될거 같아여
exp("someColumnName")

Column<b'EXP(someColumnName)'>

In [14]:
#다음은 로우를 확인하게 됩니다

df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [16]:
from pyspark.sql import Row

#다음과 같이 Row 객체를 이용해 정의도 가능합니다.
myRow = Row("세현", None, 1, False)

In [17]:
# Indexing도 가능합니다
myRow[0], myRow[2]

('세현', 1)

### DataFrame 조작 및 생성

In [18]:
# 실습용 스키마를 만들어주도록 하겠습니다.
# 다시 한 번 말씀드리자면 이름 - 데이터타입 - Nan값 여부 입니다.

practiceSchema = StructType([
    StructField("some", StringType(), True),
    StructField("col", StringType(), True),
    StructField("names", LongType(), False)
])

In [19]:
# Row를 생성하고
practiceRow = Row("passion", None, 16)

# 그 row와 schema 구조를 따라서 df 생성
practicedf = spark.createDataFrame([practiceRow], practiceSchema)

# 굳
practicedf.show()

+-------+----+-----+
|   some| col|names|
+-------+----+-----+
|passion|null|   16|
+-------+----+-----+



### select와 selectExpr
SQL 구문과 비슷합니다

In [20]:
# 두개의 컬럼만 2줄 보여줍니다
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [21]:
from pyspark.sql.functions import expr

# expr로 column을 조작하는거죠. 아까 나왔습니당
df.select(expr("DEST_COUNTRY_NAME AS passionyang")).show(2)

+-------------+
|  passionyang|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [22]:
# 요런 방식도 가능합니다

df.selectExpr("DEST_COUNTRY_NAME as passionyang").show(2)

+-------------+
|  passionyang|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [23]:
# 앞선 두 컬럼이 같은 지를 확인하는 withinCountry 칼럼을 만들었습니다
# *는 모든 컬럼을 포함하라는 말입니다

df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry" ).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [24]:
# count의 평균과, DEST_COUNTRY_NAME의 유니크한 수를 count 해주었습니다.

df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME)) as unique_count").show(2)

+-----------+------------+
| avg(count)|unique_count|
+-----------+------------+
|1770.765625|         132|
+-----------+------------+



### 명시적인 값 넣기 lit

In [25]:
# 명시적인 값을 넣기 위해선 lit 이라는 함수를 사용합니다 (literal)

from pyspark.sql.functions import lit

df.select(expr("*"),lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



### 칼럼 추가 withcolumn

In [26]:
# 아까랑 같긴 한데 좀 형식적으로 하기 위해선 Withcolumn을 사용합니다

df.withColumn("One",lit(1)).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [27]:
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").show(2)

+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows



In [28]:
# cast를 이용하여 column의 데이터타입을 변경하는 것도 가능합니다.

df.withColumn("count2" ,col("count").cast("string")).schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true),StructField(count2,StringType,true)))

### 칼럼 제거 drop

In [29]:
# 드랍하고 남은 칼럼 세어봅니다

df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

### Row 필터링 filter 와 where

In [30]:
# 이번엔 row를 조작하는데 filter, where 둘다 같습니다

df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [31]:
#where 여러개 동시에도 가능합니다
df.where(col("count") <2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [32]:
#unique한 값을 얻으려면 distinct 함수를 사용합니다.
df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()

256

In [33]:
# 랜덤 샘플 생성
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement,fraction,seed).count()

126

In [34]:
# 랜덤하게 split 후 확인
dataFrames = df.randomSplit([0.25,0.75],seed)
dataFrames[0].count() 

60

In [35]:
dataFrames[1].count()

196

### Row 합치기와 추가하기

In [36]:
# DataFrame은 immutable 하기 때문에 사실 새로운 row를 추가해준다는 개념이 없습니다.
# 따라서 두 개의 DataFrame을 만들어 두 개를 통합 (union) 해주어야합니다.

from pyspark.sql import Row

# 기존 df의 스키마를 따와, newRows 라는 데이터를 형성합니다.
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New New Country", "Other Country", 1)
]

#newDF로 데이터프레임화시킵니다.
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [37]:
# df와 newDF를 통합(union) 하고 count 가1, ORIGIN_COUNTRY_NAME 이 'Untied States' 가 아닌 친구들을 보여줍니다.

df.union(newDF).where("count=1").where(col("ORIGIN_COUNTRY_NAME") != "United States").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|  New New Country|      Other Country|    1|
+-----------------+-------------------+-----+



### 데이터 정렬하기 sort 와 orderBy

In [39]:
# 데이터를 정렬하고 싶을 때 sort와 orderBy를 사용합니다. 두 개는 동일합니다.

df.sort("count").show(5)
df.orderBy("count").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [40]:
from pyspark.sql.functions import desc, asc

In [41]:
# 이런 식으로 오름차순, 내림차순의 설정도 가능하네요 :) 

df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [42]:
# 추출할 row의 수를 제한하고 싶으면 다음과 같이 limit 함수를 사용해주시면 됩니다 :) 

df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [43]:
#파티션 나누기도 가능합니다
df.repartition(5,col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

### 데이터 수집하기 take() collect() show()

In [44]:
# 우리는 분산처리에 대해 공부하고 있기 때문에 지금 보고 있는 DataFrame 또한 여러 클러스터에 나눠져있습니다.
# 따라서 로컬환경에서 다루고자 한다면, 데이터를 "수집"해야합니다.

df.take(5) # take는 상위 N개의 row를 반환합니다. 안 이쁘네요.

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [45]:
df.collect() # DataFrame의 모든 데이터를 수집합니다. 이쁘진 않네요

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sint Maarten', count=325),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Marshall Islands', count=39),
 

In [46]:
df.show(5) #이렇게 해야 예쁘다.

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



## 집계함수

In [48]:
#다른 데이터도 불러옵니다
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("file:///home/ubuntu/spark_study/1010practice/2010-12-01.csv")\
  .coalesce(5)

### count

In [49]:
# 레코드의 전체 수를 확인하기 위해 count 함수를 사용합니다.
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|            3108|
+----------------+



### countDistinct

In [50]:
# 고유 레코드 수를 알고자 한다면 countDistinct를 사용해줍니다.
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     1351|
+-------------------------+



### approx_count_Distinct

In [51]:
# 어느 정도 수준의 정확도를 가지는 적당한 근사치만 알고 싶을 수 있습니다. 이건 위에보다 빠릅니다
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            1382|
+--------------------------------+



### first, last

In [52]:
# 가장 첫 열과 마지막 열을 보기 위해 위에서 사용했던 것처럼 first와 last를 이용할 수 있습니다.
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          20755|
+----------------+---------------+



### min, max, sum, sumDistinct, avg

In [53]:
# 예상하셨겠지만 min, max, sum, avg 또한 가능합니다. sumDistinct는 고유값들의 합입니다.
from pyspark.sql.functions import min, max, sum, avg, sumDistinct

df.select(min("Quantity"), max("Quantity"), sum("Quantity"), avg("Quantity"), sumDistinct("Quantity")).show()

+-------------+-------------+-------------+-----------------+----------------------+
|min(Quantity)|max(Quantity)|sum(Quantity)|    avg(Quantity)|sum(DISTINCT Quantity)|
+-------------+-------------+-------------+-----------------+----------------------+
|          -24|          600|        26814|8.627413127413128|                  4690|
+-------------+-------------+-------------+-----------------+----------------------+



### 분산과 표준편차

In [54]:
# 분산과 표준편차도 제공합니다
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|695.2492099104054| 695.4729785650273|  26.367578764657278|   26.371821677029203|
+-----------------+------------------+--------------------+---------------------+



### 공분산과 상관관계

In [55]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     -0.12225395743668731|            -235.56327681311157|            -235.4868448608685|
+-------------------------+-------------------------------+------------------------------+



### Groupby

In [56]:
from pyspark.sql.functions import count, expr

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan")).show()

+---------+----+
|InvoiceNo|quan|
+---------+----+
|   536596|   6|
|   536597|  28|
|   536414|   1|
|   536550|   1|
|   536460|  14|
|   536398|  17|
|   536523|  12|
|   536374|   1|
|   536386|   3|
|   536577|   4|
|   536477|  14|
|   536583|   1|
|   536528|  57|
|   536585|   1|
|   536366|   2|
|   536592| 592|
|   536541|   1|
|   536387|   5|
|   536385|   7|
|   536375|  16|
+---------+----+
only showing top 20 rows



### MAP

In [57]:
# 이런식으로 매핑도 가능합니다
from pyspark.sql.functions import create_map, col

df.select(create_map(col("InvoiceNo"), col("Quantity"))).show()

+------------------------+
|map(InvoiceNo, Quantity)|
+------------------------+
|           [536365 -> 6]|
|           [536365 -> 6]|
|           [536365 -> 8]|
|           [536365 -> 6]|
|           [536365 -> 6]|
|           [536365 -> 2]|
|           [536365 -> 6]|
|           [536366 -> 6]|
|           [536366 -> 6]|
|          [536367 -> 32]|
|           [536367 -> 6]|
|           [536367 -> 6]|
|           [536367 -> 8]|
|           [536367 -> 6]|
|           [536367 -> 6]|
|           [536367 -> 3]|
|           [536367 -> 2]|
|           [536367 -> 3]|
|           [536367 -> 3]|
|           [536367 -> 4]|
+------------------------+
only showing top 20 rows



### Rollup

In [59]:
#groupby에서 Multi-dimensional을 사용할때 쓰게됩니다
dfNoNull = dfWithDate.drop() 
dfNoNull.createOrReplaceTempView("dfNoNull")


rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
  .orderBy("Date")
rolledUpDF.show()

#걍 이렇게 하는거라고 알고 넘기자. 이거 다른 df가져와서 한거임.

NameError: name 'dfWithDate' is not defined

## Join 함수

In [60]:
#서로다른 데이터 프레임을 만들어줍니다
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")


In [61]:
#type과 어떤 기준을 세울지 정해야합니다
joinType = "inner"
joinExpression = person['graduate_program'] == graduateProgram['id']
person.join(graduateProgram,joinExpression,joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [62]:
#type과 어떤 기준을 세울지 정해야합니다
joinType = "outer"
joinExpression = person['graduate_program'] == graduateProgram['id']
person.join(graduateProgram,joinExpression,joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [63]:
#type과 어떤 기준을 세울지 정해야합니다
joinType = "left_outer"
joinExpression = person['graduate_program'] == graduateProgram['id']
graduateProgram.join(person,joinExpression,joinType).show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



In [64]:
#type과 어떤 기준을 세울지 정해야합니다
joinType = "left_semi"
joinExpression = person['graduate_program'] == graduateProgram['id']
graduateProgram.join(person,joinExpression,joinType).show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



이 외에도 right, right semi, natural join, cross join 등이 있습니다.

# ----------끝!-----------

## HOMEWORK

# 2010-12-01.csv 와 2010-12-02.csv 두 파일이 있습니다.
# 두 파일을 불러와 준다음에, 우선 하나로 합칩니다.
# 다음은 앞서 실습 때 진행했던 코드를 최소한 7개를 사용해서 하나의 정리된 데이터 프레임을 만들어 주세요. (where, count, groupby, descending, map 뭐든 상관 없습니다)
# 그리고 스샷찍어서 제출해주시면됩니다.