In [183]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("spark_sql_basic2")
sc   = SparkContext(conf=conf)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=spark-sql, master=local) created by getOrCreate at /tmp/ipykernel_3777/4175860230.py:2 

In [None]:
# RDD만을 이용한 데이터 추출

In [3]:

movies_rdd = sc.parallelize([
    (1, ("어벤져스", "마블")),
    (2, ("슈퍼맨", "DC")),
    (3, ("배트맨", "DC")),
    (4, ("겨울왕국", "디즈니")),
    (5, ("아이언맨", "마블"))
])


attendances_rdd = sc.parallelize([
    (1, (13934592, "KR")),
    (2, (2182227,"KR")),
    (3, (4226242, "KR")),
    (4, (10303058, "KR")),
    (5, (4300365, "KR"))
])

In [None]:
# 마블 영화 중 관객 수가 500만 이상인 영화를 가져오기

In [4]:
# CASE1. join 먼저, filter 나중에
movie_att = movies_rdd.join(attendances_rdd)
movie_att.take(5)

[(2, (('슈퍼맨', 'DC'), (2182227, 'KR'))),
 (4, (('겨울왕국', '디즈니'), (10303058, 'KR'))),
 (1, (('어벤져스', '마블'), (13934592, 'KR'))),
 (3, (('배트맨', 'DC'), (4226242, 'KR'))),
 (5, (('아이언맨', '마블'), (4300365, 'KR')))]

In [None]:

movie_att.filter(
    lambda x : x[1][0][1] == "마블" and x[1][1][0] > 5000000
).collect()

In [None]:
# CASE 2. filter 먼저, join 나중에
filtered_movies = movies_rdd.filter(lambda x : x[1][1] == '마블')
filtered_att = attendances_rdd.filter(lambda x : x[1][0] > 5000000)

filtered_movies.join(filtered_att).collect()

In [5]:
sc.stop()

# Spark SQL 사용해 보기

In [187]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()

In [188]:
# 컬럼 추가
movies = [
    (1, "어벤져스", "마블", 2012, 4, 26),
    (2, "슈퍼맨", "DC", 2013, 6, 13),
    (3, "배트맨", "DC", 2008, 8, 6),
    (4, "겨울왕국", "디즈니", 2014, 1, 16),
    (5, "아이언맨", "마블", 2008, 4, 30)
]

In [189]:
#스키마를 알아야 한다.
movie_schema = ["id", "name", "company", "year", "month", "day"]

In [None]:
# 2. 데이터 프레임 만들기

In [190]:
df = spark.createDataFrame(data=movies, schema=movie_schema)

In [9]:
df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('company', 'string'),
 ('year', 'bigint'),
 ('month', 'bigint'),
 ('day', 'bigint')]

In [10]:
print(df.show())

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+

None


In [11]:
df.select("company").show()

+-------+
|company|
+-------+
|   마블|
|     DC|
|     DC|
| 디즈니|
|   마블|
+-------+



In [12]:
df.select("name").show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



In [18]:
df.filter(df['day'] >= 15).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [19]:
df.filter(df.year >= 2000).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  4|겨울왕국| 디즈니|2014|    1| 16|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [21]:
#2013년 이후 영화만 꺼내기
df.filter(df.year >= 2013).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



In [22]:
# 마블영화만 꺼내기 
df.filter(df.company == '마블').show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [193]:
# df를 "movies"라는 이름의 임시 SQL 뷰로 등록함

df.createOrReplaceTempView("movies")

In [192]:
# 영화 이름만 가져오기

query = """

SELECT name
  FROM movies

"""
spark.sql(query).show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+



In [19]:
#영화 이름만 가져오기 
df.filter(df.company.isin('마블','DC')).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  3|  배트맨|     DC|2008|    8|  6|
|  5|아이언맨|   마블|2008|    4| 30|
+---+--------+-------+----+-----+---+



In [194]:
# 2010년 이후에 개봉한 영화를 조회

query = """
select *
from movies
where year >= 2010
"""
spark.sql(query).show()

+---+--------+-------+----+-----+---+
| id|    name|company|year|month|day|
+---+--------+-------+----+-----+---+
|  1|어벤져스|   마블|2012|    4| 26|
|  2|  슈퍼맨|     DC|2013|    6| 13|
|  4|겨울왕국| 디즈니|2014|    1| 16|
+---+--------+-------+----+-----+---+



In [None]:
# 2012년도 이전에 개봉한 영화의 이름과 회사를 출력
query = """

"""
spark.sql(query).show()

In [None]:
# like 문자열 데이터에서 특정 단어나 문장을 포함한 데이터를 찾을 때
# % 기호를 사용해서 문장이 매칭되는지 확인 가능!
# 제목이 ~~맨으로 끝나는 데이터의 모든 정보를 조회
query = """

"""
spark.sql(query).show()

In [None]:

# BETWEEN 특정 데이터와 데이터 사이를 조회

# 개봉 월이 4 ~ 8월 사이. 4 <= 개봉월 <= 8


In [None]:
# Join 구현하기

In [None]:

attendances = [
    (1, 13934592., "KR"),
    (2, 2182227.,"KR"),
    (3, 4226242., "KR"),
    (4, 10303058., "KR"),
    (5, 4300365., "KR")
]

In [None]:
# 직접 스키마 지정해 보기
from pyspark.sql.types import StringType, FloatType\
    , IntegerType\
    , StructType, StructField

In [None]:
att_schema = StructType([ # 모든 컬럼의 타입을 통칭 - 컬럼 데이터의 집합
    StructField("id", IntegerType(), True), # StructField : 컬럼
    StructField("att", FloatType(), True),
    StructField("theater_country", StringType(), True)
])

In [None]:

att_df = spark.createDataFrame(
    data=attendances,
    schema=att_schema
)

att_df.dtypes

In [None]:
att_df.createOrReplaceTempView("att")

# join 구현하기

In [195]:

attendances = [
    (1, 13934592., "KR"),
    (2, 2182227.,"KR"),
    (3, 4226242., "KR"),
    (4, 10303058., "KR"),
    (5, 4300365., "KR")
]

In [196]:

# 직접 스키마 지정해 보기
from pyspark.sql.types import StringType, FloatType\
    , IntegerType\
    , StructType, StructField

In [197]:

att_schema = StructType([ # 모든 컬럼의 타입을 통칭 - 컬럼 데이터의 집합
    StructField("id", IntegerType(), True), # StructField : 컬럼
    StructField("att", FloatType(), True),
    StructField("theater_country", StringType(), True)
])

In [198]:

att_df = spark.createDataFrame(
    data=attendances,
    schema=att_schema
)

att_df.dtypes

[('id', 'int'), ('att', 'float'), ('theater_country', 'string')]

In [199]:

att_df.createOrReplaceTempView("att")

In [200]:
att_df.select('*').show()

+---+-----------+---------------+
| id|        att|theater_country|
+---+-----------+---------------+
|  1|1.3934592E7|             KR|
|  2|  2182227.0|             KR|
|  3|  4226242.0|             KR|
|  4|1.0303058E7|             KR|
|  5|  4300365.0|             KR|
+---+-----------+---------------+



In [201]:
# df와 join

query = '''
select movies.id, movies.name , movies.company, att.att
from movies
join att ON movies.id = att.id
'''

spark.sql(query).show()

+---+--------+-------+-----------+
| id|    name|company|        att|
+---+--------+-------+-----------+
|  1|어벤져스|   마블|1.3934592E7|
|  2|  슈퍼맨|     DC|  2182227.0|
|  3|  배트맨|     DC|  4226242.0|
|  4|겨울왕국| 디즈니|1.0303058E7|
|  5|아이언맨|   마블|  4300365.0|
+---+--------+-------+-----------+



In [None]:
# 데이터 프레임 API

In [None]:
# select
df.select("*").collect()

In [None]:
df.select("name", "company").collect()

In [None]:
df.select(df.name, (df.year-2000).alias("year")).show()

In [None]:
# agg : Aggreagte의 약자로써, 그룹핑 후 데이터를 하나로 합쳐주는 역할
df.agg({"id": "count"}).collect()

In [None]:
from pyspark.sql import functions as F
df.agg(F.min(df.year)).collect()

In [None]:
df.groupBy().avg().collect()

In [None]:
# 회사별 개봉월의 평균
df.groupBy('company').agg({"month": "mean"}).collect()

In [None]:
# 회사 별 월 별 영화 개수 정보


In [None]:
# join : 다른 데이터 프레임과 사용자가 지정한 컬럼을 기준으로 합치는 작업
df.join(att_df, 'id').select(df.name, att_df.att).show()

In [None]:
# select, where, orderBy 절 사용
marvel_df = df.select("name", "company", "year").where("company=='마블'").orderBy("id")
marvel_df.collect()

In [203]:
spark.stop()
# sc.stop()

In [None]:
# SQL 최적화

In [204]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

In [216]:
trip_file = "learning_spark_data/fhvhv_tripdata_2020-03.csv"

In [217]:
# inferSchema : 자동으로 스키마 예측하게 하기
data = spark.read.csv(trip_file, inferSchema=True, header=True)

In [218]:
data.createOrReplaceTempView("mobility_data")

In [219]:
query = """
select *
from mobility_data

"""
spark.sql(query).show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   NULL|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   NULL|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   NULL|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   NULL|
|           HV0003|              B02682|2020-03-01 00:17:23|2020-03-01 00:39:35|

In [None]:
# 스파크 SQL을 사용하는 이유

In [223]:
query = """

select split(pickup_datetime, ' ')[0] as pickup_date, count(*) as trips
from mobility_data

group by pickup_date
"""

spark.sql(query).show()

+-----------+------+
|pickup_date| trips|
+-----------+------+
| 2020-03-03|697880|
| 2020-03-02|648986|
| 2020-03-01|784246|
| 2020-03-06|739715|
| 2020-03-05|731165|
| 2020-03-04|707879|
+-----------+------+



In [221]:
# 실행 계획 살펴보기
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Aggregate ['pickup_date], ['split('pickup_datetime,  )[0] AS pickup_date#3576, 'count(1) AS trips#3577]
+- 'UnresolvedRelation [mobility_data], [], false

== Analyzed Logical Plan ==
pickup_date: string, trips: bigint
Aggregate [split(cast(pickup_datetime#3503 as string),  , -1)[0]], [split(cast(pickup_datetime#3503 as string),  , -1)[0] AS pickup_date#3576, count(1) AS trips#3577L]
+- SubqueryAlias mobility_data
   +- View (`mobility_data`, [hvfhs_license_num#3501,dispatching_base_num#3502,pickup_datetime#3503,dropoff_datetime#3504,PULocationID#3505,DOLocationID#3506,SR_Flag#3507])
      +- Relation [hvfhs_license_num#3501,dispatching_base_num#3502,pickup_datetime#3503,dropoff_datetime#3504,PULocationID#3505,DOLocationID#3506,SR_Flag#3507] csv

== Optimized Logical Plan ==
Aggregate [_groupingexpression#3581], [_groupingexpression#3581 AS pickup_date#3576, count(1) AS trips#3577L]
+- Project [split(cast(pickup_datetime#3503 as string),  , -1)[0] AS _grouping

In [None]:
# 두번째 쿼리
spark.sql("""select 
                pickup_date, 
                count(*) as trips
             from ( select
                          split(pickup_datetime, ' ')[0] as pickup_date
                          from mobility_data )
             group by pickup_date""").explain(True)

In [None]:
spark.stop()

In [224]:
trip_file = "fhvhv_tripdata_2020-03.csv"
zone_file = "taxi+_zone_lookup.csv"

In [225]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

In [226]:
#운행 데이터 프레임 생성, Zone 데이터프레임 생성
trip_data = spark.read.format("csv")\
    .option("header", 'true')\
    .option('inferSchema', 'true')\
    .load('learning_spark_data/fhvhv_tripdata_2020-03.csv')
zone_data = spark.read.format("csv")\
    .option("header", 'true')\
    .option('inferSchema', 'true')\
    .load('learning_spark_data/taxi+_zone_lookup.csv')

In [235]:
trip_data.createOrReplaceTempView("trip")

In [234]:

zone_data.createOrReplaceTempView("zone")

In [227]:

trip_data.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)



In [228]:

zone_data.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [None]:

## 승차 Location(PULocationID)별 개수 세기
# 하차 Location(DOLocationID)별 개수 세기
#HV0003 운송사업자의 승차 지역별 트립 건수를 집계하고, 
#가장 많은 운송사업자순으로 정렬하는 분석 쿼리  hvfhs_license_num
#운송사별 운행 건수 비교
#승차 위치 Borough별 운행 건수
#서비스 존별 승차/하차 건수

In [230]:
trip_data.show(3)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   NULL|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   NULL|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
only showing top 3 rows



In [231]:
zone_data.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [242]:
 # 승차 Location(PULocationID)별 개수 세기

query = """

select PULocationID, count(*)
from trip
group by PULocationID
"""

spark.sql(query).show(2)

+------------+--------+
|PULocationID|count(1)|
+------------+--------+
|         148|   41395|
|         243|   25701|
+------------+--------+
only showing top 2 rows



In [241]:
# 하차 Location(DOLocationID)별 개수 세기
query = """

select DOLocationID, count(*)
from trip
group by DOLocationID
"""

spark.sql(query).show(2)

+------------+--------+
|DOLocationID|count(1)|
+------------+--------+
|         148|   31962|
|         243|   25076|
+------------+--------+
only showing top 2 rows



In [247]:
#HV0003 운송사업자의 승차 지역별 트립 건수를 집계
query = """
select PULocationID, count(*)
from trip
where hvfhs_license_num = 'HV0003'
group by PULocationID

"""

spark.sql(query).show(2)

+------------+--------+
|PULocationID|count(1)|
+------------+--------+
|         148|   26737|
|         243|   20195|
+------------+--------+
only showing top 2 rows



In [259]:
#가장 많은 운송사업자순으로 정렬하는 분석 쿼리  hvfhs_license_num
query = """
select hvfhs_license_num, count(*)
from trip
group by hvfhs_license_num
order by count(*) desc

"""

spark.sql(query).show()

+-----------------+--------+
|hvfhs_license_num|count(1)|
+-----------------+--------+
|           HV0003| 3143107|
|           HV0005| 1030267|
|           HV0004|  136497|
+-----------------+--------+



In [260]:
#운송사별 운행 건수 비교
query = """
select hvfhs_license_num, count(*)
from trip
group by hvfhs_license_num

"""

spark.sql(query).show()

+-----------------+--------+
|hvfhs_license_num|count(1)|
+-----------------+--------+
|           HV0004|  136497|
|           HV0005| 1030267|
|           HV0003| 3143107|
+-----------------+--------+



In [257]:
#승차 위치 Borough별 운행 건수
query = """
select Borough, count(*)
from trip
inner join zone
on trip.PULocationID = zone.LocationID
group by Borough

"""

spark.sql(query).show()

+-------------+--------+
|      Borough|count(1)|
+-------------+--------+
|       Queens|  763719|
|          EWR|     132|
|      Unknown|     231|
|     Brooklyn| 1123416|
|Staten Island|   55488|
|    Manhattan| 1802744|
|        Bronx|  564140|
+-------------+--------+



In [252]:
trip_data.show(5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   NULL|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   NULL|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   NULL|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   NULL|
+-----------------+--------------------+-------------------+-------------------+

In [256]:
zone_data.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [258]:
#서비스 존별 승차/하차 건수

query = """
select service_zone, count(PULocationID), count(DOLocationID)
from trip
inner join zone
on trip.PULocationID = zone.LocationID
group by service_zone

"""

spark.sql(query).show()

+------------+-------------------+-------------------+
|service_zone|count(PULocationID)|count(DOLocationID)|
+------------+-------------------+-------------------+
|         EWR|                132|                132|
|         N/A|                231|                231|
| Yellow Zone|            1535246|            1535246|
|    Airports|             134404|             134404|
|   Boro Zone|            2639857|            2639857|
+------------+-------------------+-------------------+



# 택시

In [24]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()

In [29]:
df=spark.read.format('json')\
    .load("learning_spark_data/2015-summary.json")

In [40]:
df.dtypes

[('DEST_COUNTRY_NAME', 'string'),
 ('ORIGIN_COUNTRY_NAME', 'string'),
 ('count', 'bigint')]

In [30]:
df.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [32]:
df.select('count').show(5)

+-----+
|count|
+-----+
|   15|
|    1|
|  344|
|   15|
|   62|
+-----+
only showing top 5 rows



In [33]:
df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [41]:
# 도착국가명 중복제거 
#cache - 이 결과를 저장해서 다음부터 이걸 재사용해 
df1 = df.select('DEST_COUNTRY_NAME').distinct().cache()
df1.count()

132

In [42]:
# Row class를 이용한 단일 레코드 생성 

from pyspark.sql import Row
myrow = Row('hello',None, 1, False)
myrow

<Row('hello', None, 1, False)>

In [43]:
# 새로운 컬럼 추가하기

from pyspark.sql.functions import expr

df3 = df.withColumn('withinCountry',expr('ORIGIN_COUNTRY_NAME==DEST_COUNTRY_NAME')) #expr sql표현식을 받아 생성
df3

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, withinCountry: boolean]

In [45]:
df3.show(3)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
+-----------------+-------------------+-----+-------------+
only showing top 3 rows



In [49]:
df3.filter(df3[3] == 'True').show()

+-----------------+-------------------+------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|withinCountry|
+-----------------+-------------------+------+-------------+
|    United States|      United States|370002|         true|
+-----------------+-------------------+------+-------------+



In [53]:
# df3.filter(expr('withinCountry' == 'True')).show()

In [None]:
#case when 카운트 10 이하 under, 이상 upper로 변환 > category 컬럼 추가 

In [57]:
df4 = df3.withColumn('10_upperOrunder',expr("CASE WHEN 'count' >= 10 THEN 'upper' ELSE 'under'END"))
df4.show(5)

+-----------------+-------------------+-----+-------------+---------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|10_upperOrunder|
+-----------------+-------------------+-----+-------------+---------------+
|    United States|            Romania|   15|        false|          under|
|    United States|            Croatia|    1|        false|          under|
|    United States|            Ireland|  344|        false|          under|
|            Egypt|      United States|   15|        false|          under|
|    United States|              India|   62|        false|          under|
+-----------------+-------------------+-----+-------------+---------------+
only showing top 5 rows



In [58]:
# DataFrame의 select(), where(), filter() 트랜스포메이션
# show(), count() 액션

In [59]:
spark.stop()

# empt, dept

In [61]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-sql").getOrCreate()

In [68]:
df_dept=spark.read.format('csv')\
    .option('header','true')\
    .option('inferSchema', 'true')\
    .load("learning_spark_data/dept.csv")
    

In [70]:
df_empt=spark.read.format('csv')\
    .option('header','true')\
    .option('inferSchema', 'true')\
    .load("learning_spark_data/emp.csv")

In [69]:
df_dept.show(5)

+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+



In [67]:
df1 = df1[column = df_dept.show(1) 

+------+-----+---+
|   _c0|  _c1|_c2|
+------+-----+---+
|deptno|dname|loc|
+------+-----+---+
only showing top 1 row



In [71]:
df_empt.show(5)

+-----+------+--------+----+----------+----+----+------+
|empno| ename|     job| mgr|  hiredate| sal|comm|deptno|
+-----+------+--------+----+----------+----+----+------+
| 7369| SMITH|   CLERK|7902|1980-12-17| 800|NULL|    20|
| 7499| ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD|SALESMAN|7698|1981-02-22|1250| 500|    30|
| 7566| JONES| MANAGER|7839|1981-04-02|2975|NULL|    20|
| 7654|MARTIN|SALESMAN|7698|1981-09-28|1250|1400|    30|
+-----+------+--------+----+----------+----+----+------+
only showing top 5 rows



In [73]:
#컬럼명을 대소문자 구분하지 않음 
df_empt.select('ENAME').show(5)

+------+
| ENAME|
+------+
| SMITH|
| ALLEN|
|  WARD|
| JONES|
|MARTIN|
+------+
only showing top 5 rows



In [75]:
# filter랑 동일함 
df_empt.select('*').where('deptno=20').show()

+-----+-----+-------+----+----------+----+----+------+
|empno|ename|    job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+-------+----+----------+----+----+------+
| 7369|SMITH|  CLERK|7902|1980-12-17| 800|NULL|    20|
| 7566|JONES|MANAGER|7839|1981-04-02|2975|NULL|    20|
| 7788|SCOTT|ANALYST|7566|1987-04-19|3000|NULL|    20|
| 7876|ADAMS|  CLERK|7788|1987-05-23|1100|NULL|    20|
| 7902| FORD|ANALYST|7566|1981-12-03|3000|NULL|    20|
+-----+-----+-------+----+----------+----+----+------+



# 함수와 표현식(expr)의 차이점
<details>
<summary># 함수와 표현식(expr)의 차이점</summary>

✅ 결론 먼저:

```first('sal')```   
는 함수(function) 사용이고,


```expr("first(sal)")```   
는 표현식(expression) 사용이야.

→ 둘은 결과는 비슷할 수 있지만 작동 방식은 다르다.   

✅ 두 개의 차이점 비교   
| 항목     | `first('sal')`       | `expr("first(sal)")`  |
| ------ | -------------------- | --------------------- |
| 타입     | PySpark 함수 호출        | SQL 표현식 문자열           |
| 추천 사용처 | DataFrame API        | SQL 함수, 동적 표현         |
| 장점     | 안정적, 오타 적음           | 복잡한 조건, 문자열로 동적 생성 가능 |
| 내부 동작  | Python에서 Spark 함수 실행 | SQL 파서가 전체 문자열 해석 

✅ 예제로 비교    🔹 함수 기반   
```from pyspark.sql.functions import first```   
```
df.select(first('sal')).show```    
- first()는 PySpark의 내장 집계 함수
- 직접 Python에서 함수처럼 호출함

🔹 expr 기반   

```from pyspark.sql.functions import expr```   
```
df.select(expr("first(sal)")).show```   
- SQL 표현식 전체를 문자열로 작성
- when, case, math, date 함수들을 조합할 때 유리

✅ 언제 expr이 유리할까?   
- 복잡한 계산식을 문자열로 다룰 때   
- 동적 SQL을 만들 때   
- 여러 함수를 조합하거나 문자열 조건식을 써야 할 때

✅ 요약   
너가 쓴 first('sal')는 PySpark의 함수 사용이고   
expr("first(sal)")는 SQL 표현식을 문자열로 작성한 것   
두 방식 다 가능하지만,   
정확성 / 리팩토링엔 함수 방식   
유연성 / 조건 표현엔 expr() 방식이 유리해.   ()()   |

</details>

In [77]:
# 조건식을 쓸거면 expr을 적어야함 

df_empt.selectExpr('count(*)').show()

+--------+
|count(1)|
+--------+
|      15|
+--------+



In [79]:
from pyspark.sql.functions import countDistinct

df_empt.select(countDistinct('job')).show()

+-------------------+
|count(DISTINCT job)|
+-------------------+
|                  5|
+-------------------+



In [81]:
from pyspark.sql.functions import approx_count_distinct
df_empt.select(approx_count_distinct('job',0.1)).show()

+--------------------------+
|approx_count_distinct(job)|
+--------------------------+
|                         5|
+--------------------------+



In [82]:
# first (expr : sql문장 x), funtion으로 처리 
from pyspark.sql.functions import first
df_empt.select(first('sal')).show()

+----------+
|first(sal)|
+----------+
|       800|
+----------+



In [83]:
# last
from pyspark.sql.functions import last
df_empt.select(last('sal')).show()

+---------+
|last(sal)|
+---------+
|     3200|
+---------+



In [84]:
# min
from pyspark.sql.functions import min
df_empt.select(min('sal')).show()

+--------+
|min(sal)|
+--------+
|     800|
+--------+



In [85]:
# max
from pyspark.sql.functions import max
df_empt.select(max('sal')).show()

+--------+
|max(sal)|
+--------+
|    5000|
+--------+



In [86]:
# sum
from pyspark.sql.functions import sum
df_empt.select(sum('sal')).show()

+--------+
|sum(sal)|
+--------+
|   32225|
+--------+



In [90]:
import pyspark.sql.functions 
df_empt.select(sum('sal'),min('sal')).show()

+--------+--------+
|sum(sal)|min(sal)|
+--------+--------+
|   32225|     800|
+--------+--------+



In [93]:
df_empt.show(4)

+-----+-----+--------+----+----------+----+----+------+
|empno|ename|     job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+--------+----+----------+----+----+------+
| 7369|SMITH|   CLERK|7902|1980-12-17| 800|NULL|    20|
| 7499|ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521| WARD|SALESMAN|7698|1981-02-22|1250| 500|    30|
| 7566|JONES| MANAGER|7839|1981-04-02|2975|NULL|    20|
+-----+-----+--------+----+----------+----+----+------+
only showing top 4 rows



In [97]:
#total_salary / total_transaction(샐러리의 카운트), avg_salary, mean_salary

import pyspark.sql.functions 
df_empt1 = df_empt.selectExpr(
    "sum(sal) as total_salary",
    "count(sal) as total_transaction",
    "sum(sal)/count(sal) as avg_salary"
)

df_empt1.show()


+------------+-----------------+------------------+
|total_salary|total_transaction|        avg_salary|
+------------+-----------------+------------------+
|       32225|               15|2148.3333333333335|
+------------+-----------------+------------------+



In [106]:
import pyspark.sql.functions
from pyspark.sql.functions import count,sum,avg,mean
df_empt.select(
    count("sal").alias("total_transaction"),
    sum("sal").alias("total_salary"),
    avg("sal").alias("avg_salary"),
    mean("sal").alias("mean_salary")
).show()

+-----------------+------------+------------------+------------------+
|total_transaction|total_salary|        avg_salary|       mean_salary|
+-----------------+------------+------------------+------------------+
|               15|       32225|2148.3333333333335|2148.3333333333335|
+-----------------+------------+------------------+------------------+



# 그룹화 

In [108]:
df_empt.groupBy('job').count().show()

+---------+-----+
|      job|count|
+---------+-----+
|  ANALYST|    2|
| SALESMAN|    4|
|    CLERK|    5|
|  MANAGER|    3|
|PRESIDENT|    1|
+---------+-----+



In [110]:
# select job,
#     count(job)
#     sum(sal)
# groupBy job

froup_df = df_empt.groupBy('job').agg(
    count('job').alias('qty'),
    expr('count(job)'),
    sum('sal')
).show()

+---------+---+----------+--------+
|      job|qty|count(job)|sum(sal)|
+---------+---+----------+--------+
|  ANALYST|  2|         2|    6000|
| SALESMAN|  4|         4|    5600|
|    CLERK|  5|         5|    7350|
|  MANAGER|  3|         3|    8275|
|PRESIDENT|  1|         1|    5000|
+---------+---+----------+--------+



In [123]:
# sal의 평균 as SAL_AVG, 표준편차 as SAL_STDEV를 job별로 계산해서 출력 
from pyspark.sql.functions import avg, stddev,round


group1 = df_empt.groupby('job').agg(
    round(avg('sal'),2).alias('SAL_AVG'),
    round(stddev('sal'),2).alias('SAL_STDEV')
)

# 급여 평균 상위 10개 job 정렬 + 제한
group1.orderBy('SAL_AVG', ascending=False).limit(10).show()

+---------+-------+---------+
|      job|SAL_AVG|SAL_STDEV|
+---------+-------+---------+
|PRESIDENT| 5000.0|     NULL|
|  ANALYST| 3000.0|      0.0|
|  MANAGER|2758.33|   274.24|
|    CLERK| 1470.0|   984.63|
| SALESMAN| 1400.0|   177.95|
+---------+-------+---------+



In [131]:
# 윈도우함수
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

windowspec = Window.orderBy(desc('sal'))
salAllRank = rank().over(windowspec)
salAllRank

Column<'RANK() OVER (ORDER BY sal DESC NULLS LAST unspecifiedframe$())'>

In [135]:
df1 = df_empt.withColumn('salary_rank', salAllRank)
df1.show(2)

+-----+-----+---------+----+----------+----+----+------+-----------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|salary_rank|
+-----+-----+---------+----+----------+----+----+------+-----------+
| 7839| KING|PRESIDENT|NULL|1981-11-17|5000|NULL|    10|          1|
| 9292| JACK|    CLERK|7782|1982-01-23|3200|NULL|    70|          2|
+-----+-----+---------+----+----------+----+----+------+-----------+
only showing top 2 rows



In [136]:
df_empt.dtypes

[('empno', 'int'),
 ('ename', 'string'),
 ('job', 'string'),
 ('mgr', 'int'),
 ('hiredate', 'date'),
 ('sal', 'int'),
 ('comm', 'int'),
 ('deptno', 'int')]

In [142]:
# 직무별로 rank 작성
#window.partitionBy()
#job_rank_df 작성

from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

# 1. 윈도우 스펙: 직무별 그룹 + 급여 내림차순 정렬
windowspec = Window.partitionBy('job').orderBy(desc('sal'))

# 2. rank()를 사용하여 새 컬럼 생성
job_rank_df = df_empt.withColumn("job_rank_df", rank().over(windowspec))

# 3. 결과 확인
job_rank_df.select("ename", "job", "sal","job_rank_df").show(6)


+------+-------+----+-----------+
| ename|    job| sal|job_rank_df|
+------+-------+----+-----------+
| SCOTT|ANALYST|3000|          1|
|  FORD|ANALYST|3000|          1|
|  JACK|  CLERK|3200|          1|
|MILLER|  CLERK|1300|          2|
| ADAMS|  CLERK|1100|          3|
| JAMES|  CLERK| 950|          4|
+------+-------+----+-----------+
only showing top 6 rows



In [147]:
df_empt.show(2)

+-----+-----+--------+----+----------+----+----+------+
|empno|ename|     job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+--------+----+----------+----+----+------+
| 7369|SMITH|   CLERK|7902|1980-12-17| 800|NULL|    20|
| 7499|ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|
+-----+-----+--------+----+----------+----+----+------+
only showing top 2 rows



In [148]:
df_empt.show(5)

+-----+------+--------+----+----------+----+----+------+
|empno| ename|     job| mgr|  hiredate| sal|comm|deptno|
+-----+------+--------+----+----------+----+----+------+
| 7369| SMITH|   CLERK|7902|1980-12-17| 800|NULL|    20|
| 7499| ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD|SALESMAN|7698|1981-02-22|1250| 500|    30|
| 7566| JONES| MANAGER|7839|1981-04-02|2975|NULL|    20|
| 7654|MARTIN|SALESMAN|7698|1981-09-28|1250|1400|    30|
+-----+------+--------+----+----------+----+----+------+
only showing top 5 rows



In [159]:
# 부서별 순위

from pyspark.sql.functions import avg, sum

df_rank = df_empt.groupBy('job').agg(
    sum('sal').alias('sum_sal')
)

df_rank.orderBy('sum_sal', ascending=False).limit(10).show()


+---------+-------+
|      job|sum_sal|
+---------+-------+
|  MANAGER|   8275|
|    CLERK|   7350|
|  ANALYST|   6000|
| SALESMAN|   5600|
|PRESIDENT|   5000|
+---------+-------+



In [161]:
# 윈도우함수로 랭크 넣기
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

windowspec = Window.partitionBy('job').sum('sal')

job_sum_df = df_empt.withColumn('job_sum', sum('sal').over(windowspec))

job_sum_df.show(10)

+-----+------+-------+----+----------+----+----+------+-------+
|empno| ename|    job| mgr|  hiredate| sal|comm|deptno|job_sum|
+-----+------+-------+----+----------+----+----+------+-------+
| 7788| SCOTT|ANALYST|7566|1987-04-19|3000|NULL|    20|   6000|
| 7902|  FORD|ANALYST|7566|1981-12-03|3000|NULL|    20|   6000|
| 7369| SMITH|  CLERK|7902|1980-12-17| 800|NULL|    20|   7350|
| 7876| ADAMS|  CLERK|7788|1987-05-23|1100|NULL|    20|   7350|
| 7900| JAMES|  CLERK|7698|1981-12-03| 950|NULL|    30|   7350|
| 7934|MILLER|  CLERK|7782|1982-01-23|1300|NULL|    10|   7350|
| 9292|  JACK|  CLERK|7782|1982-01-23|3200|NULL|    70|   7350|
| 7566| JONES|MANAGER|7839|1981-04-02|2975|NULL|    20|   8275|
| 7698| BLAKE|MANAGER|7839|1981-05-01|2850|NULL|    30|   8275|
| 7782| CLARK|MANAGER|7839|1981-06-09|2450|NULL|    10|   8275|
+-----+------+-------+----+----------+----+----+------+-------+
only showing top 10 rows



In [149]:
# 부서별 누적급여

df_empt.groupBy('job').agg(
        sum('sal')
).show(10)

+---------+--------+
|      job|sum(sal)|
+---------+--------+
|  ANALYST|    6000|
| SALESMAN|    5600|
|    CLERK|    7350|
|  MANAGER|    8275|
|PRESIDENT|    5000|
+---------+--------+



In [154]:
# 부서별 평균 급여와 직원 개별 급여 비교 

from pyspark.sql.functions import avg
from pyspark.sql.window import Window

windowspec= Window.partitionBy('job')

job_avg_df = df_empt.withColumn('job_avg', avg('sal').over(windowspec))

job_avg_df.show(10)

+-----+------+-------+----+----------+----+----+------+------------------+
|empno| ename|    job| mgr|  hiredate| sal|comm|deptno|           job_avg|
+-----+------+-------+----+----------+----+----+------+------------------+
| 7788| SCOTT|ANALYST|7566|1987-04-19|3000|NULL|    20|            3000.0|
| 7902|  FORD|ANALYST|7566|1981-12-03|3000|NULL|    20|            3000.0|
| 7369| SMITH|  CLERK|7902|1980-12-17| 800|NULL|    20|            1470.0|
| 7876| ADAMS|  CLERK|7788|1987-05-23|1100|NULL|    20|            1470.0|
| 7900| JAMES|  CLERK|7698|1981-12-03| 950|NULL|    30|            1470.0|
| 7934|MILLER|  CLERK|7782|1982-01-23|1300|NULL|    10|            1470.0|
| 9292|  JACK|  CLERK|7782|1982-01-23|3200|NULL|    70|            1470.0|
| 7566| JONES|MANAGER|7839|1981-04-02|2975|NULL|    20|2758.3333333333335|
| 7698| BLAKE|MANAGER|7839|1981-05-01|2850|NULL|    30|2758.3333333333335|
| 7782| CLARK|MANAGER|7839|1981-06-09|2450|NULL|    10|2758.3333333333335|
+-----+------+-------+---

In [None]:
# 1. 윈도우 스펙: 직무별 그룹 + 급여 내림차순 정렬
windowspec = Window.partitionBy('job').orderBy(desc('sal'))

# 2. rank()를 사용하여 새 컬럼 생성
job_rank_df = df_empt.withColumn("job_rank_df", rank().over(windowspec))

# 3. 결과 확인
job_rank_df.select("ename", "job", "sal","job_rank_df").show(6)


In [None]:
# 부서별 직업별 소계

In [166]:
df_empt.groupBy('deptno', 'job').agg(count('*'), sum('sal'))\
        .orderBy('deptno', 'job').show() #평균급, 최대급, 최소급 

+------+---------+--------+--------+
|deptno|      job|count(1)|sum(sal)|
+------+---------+--------+--------+
|    10|    CLERK|       1|    1300|
|    10|  MANAGER|       1|    2450|
|    10|PRESIDENT|       1|    5000|
|    20|  ANALYST|       2|    6000|
|    20|    CLERK|       2|    1900|
|    20|  MANAGER|       1|    2975|
|    30|    CLERK|       1|     950|
|    30|  MANAGER|       1|    2850|
|    30| SALESMAN|       4|    5600|
|    70|    CLERK|       1|    3200|
+------+---------+--------+--------+



In [162]:
df_empt.cube('deptno', 'job').agg(count('*'), sum('sal'))\
    .orderBy('deptno', 'job').show()

+------+---------+--------+--------+
|deptno|      job|count(1)|sum(sal)|
+------+---------+--------+--------+
|  NULL|     NULL|      15|   32225|
|  NULL|  ANALYST|       2|    6000|
|  NULL|    CLERK|       5|    7350|
|  NULL|  MANAGER|       3|    8275|
|  NULL|PRESIDENT|       1|    5000|
|  NULL| SALESMAN|       4|    5600|
|    10|     NULL|       3|    8750|
|    10|    CLERK|       1|    1300|
|    10|  MANAGER|       1|    2450|
|    10|PRESIDENT|       1|    5000|
|    20|     NULL|       5|   10875|
|    20|  ANALYST|       2|    6000|
|    20|    CLERK|       2|    1900|
|    20|  MANAGER|       1|    2975|
|    30|     NULL|       6|    9400|
|    30|    CLERK|       1|     950|
|    30|  MANAGER|       1|    2850|
|    30| SALESMAN|       4|    5600|
|    70|     NULL|       1|    3200|
|    70|    CLERK|       1|    3200|
+------+---------+--------+--------+



# JOIN

In [167]:
df_empt.show()

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|NULL|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975|NULL|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|NULL|    30|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450|NULL|    10|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|NULL|    20|
| 7839|  KING|PRESIDENT|NULL|1981-11-17|5000|NULL|    10|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788|1987-05-23|1100|NULL|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950|NULL|    30|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000|NULL|    20|
| 7934|MILLER|    CLERK|7782|1982-01-23|1300|NULL|    10|
| 9292|  JACK|

In [168]:
df_dept.show()

+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+



In [172]:
emp_dept_df = df_empt.join(df_dept, df_empt['deptno'] == df_dept['deptno'])
emp_dept_df.show()

+-----+------+---------+----+----------+----+----+------+------+----------+--------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|deptno|     dname|     loc|
+-----+------+---------+----+----------+----+----+------+------+----------+--------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|NULL|    20|    20|  RESEARCH|  DALLAS|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|    30|     SALES| CHICAGO|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|    30|     SALES| CHICAGO|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975|NULL|    20|    20|  RESEARCH|  DALLAS|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250|1400|    30|    30|     SALES| CHICAGO|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|NULL|    30|    30|     SALES| CHICAGO|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450|NULL|    10|    10|ACCOUNTING|NEW YORK|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|NULL|    20|    20|  RESEARCH|  DALLAS|
| 7839|  KING|PRESIDENT|NULL|1981-11-17|5000|NULL|    10|    10|A

In [175]:
join_df = df_empt.join(df_dept, on='deptno', how='inner')
join_df.select('ename', 'deptno', 'dname').show()

+------+------+----------+
| ename|deptno|     dname|
+------+------+----------+
| SMITH|    20|  RESEARCH|
| ALLEN|    30|     SALES|
|  WARD|    30|     SALES|
| JONES|    20|  RESEARCH|
|MARTIN|    30|     SALES|
| BLAKE|    30|     SALES|
| CLARK|    10|ACCOUNTING|
| SCOTT|    20|  RESEARCH|
|  KING|    10|ACCOUNTING|
|TURNER|    30|     SALES|
| ADAMS|    20|  RESEARCH|
| JAMES|    30|     SALES|
|  FORD|    20|  RESEARCH|
|MILLER|    10|ACCOUNTING|
+------+------+----------+



In [None]:
spark.stop()
