#06_SparkDataAnal.ipynb
TLC Trip Record Data
출처: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [2]:
from pyspark.sql import SparkSession

# Local-mode SparkSession used by every cell below; getOrCreate() reuses an
# existing session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .master("local")
    .appName("spark-sql")
    .getOrCreate()
)

In [None]:
# Dataset: 2015-summary.json (destination/origin country pairs with a count column)

In [3]:
# Read the JSON file; DataFrameReader.json() is shorthand for format('json').load().
df = spark.read.json("learning_spark_data/2015-summary.json")

In [4]:
# Number of rows; count() is an action, so it triggers a Spark job.
df.count()

256

In [5]:
# List of (column name, type string) pairs; the JSON count field was inferred as bigint.
df.dtypes

[('DEST_COUNTRY_NAME', 'string'),
 ('ORIGIN_COUNTRY_NAME', 'string'),
 ('count', 'bigint')]

In [6]:
# Tree view of the schema, including each column's nullability.
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [7]:
# collect() is an action that returns every row to the driver as a list of Row
# objects. That is fine for this small file, but prefer take()/show() on large data.
df.collect()

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sint Maarten', count=325),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Marshall Islands', count=39),
 

In [8]:
# First 3 rows as a list of Row objects (head(n) is equivalent to take(n)).
df.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [9]:
# Bracket access is required here: df.count is the DataFrame.count() method, not the column.
df.select(df['count']).show(n=5)

+-----+
|count|
+-----+
|   15|
|    1|
|  344|
|   15|
|   62|
+-----+
only showing top 5 rows



In [11]:
# Attribute-style column reference; equivalent to passing the name as a string.
df.select(df.DEST_COUNTRY_NAME).show(n=5)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
|    United States|
|            Egypt|
|    United States|
+-----------------+
only showing top 5 rows



In [14]:
# Distinct destination country names (dropDuplicates() with no arguments == distinct()).
df.select('DEST_COUNTRY_NAME').dropDuplicates().show()

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|            Anguilla|
|              Russia|
|            Paraguay|
|             Senegal|
|              Sweden|
|            Kiribati|
|              Guyana|
|         Philippines|
|            Djibouti|
|            Malaysia|
|           Singapore|
|                Fiji|
|              Turkey|
|                Iraq|
|             Germany|
|              Jordan|
|               Palau|
|Turks and Caicos ...|
|              France|
|              Greece|
+--------------------+
only showing top 20 rows



In [15]:
# cache() is an important Spark optimization: it keeps the computed result in
# memory so later actions on df1 can reuse it. cache() itself is lazy; the
# count() action below is what actually computes and stores the data.
df1 = df.select('DEST_COUNTRY_NAME').distinct()
df1 = df1.cache()
df1.count()

132

In [16]:
# Create a single record with the Row class (positional values, no field names).
from pyspark.sql import Row

row_values = ('hello', None, 1, False)
myRow = Row(*row_values)
myRow

<Row('hello', None, 1, False)>

In [18]:
# Add a new column: expr() parses a SQL expression string into a Column,
# here a boolean "origin equals destination" flag.
from pyspark.sql.functions import expr

same_country = expr('ORIGIN_COUNTRY_NAME==DEST_COUNTRY_NAME')
df3 = df.withColumn('withinCountry', same_country)
df3

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, withinCountry: boolean]

In [19]:
# Preview the new withinCountry column.
df3.show(n=3)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
+-----------------+-------------------+-----+-------------+
only showing top 3 rows



In [29]:
# Rows where origin == destination; where() is an alias of filter().
df3.where("withinCountry == True").show()

+-----------------+-------------------+------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|withinCountry|
+-----------------+-------------------+------+-------------+
|    United States|      United States|370002|         true|
+-----------------+-------------------+------+-------------+



In [30]:
# Same predicate, passed as an expr() Column instead of a plain SQL string.
df3.where(expr("withinCountry == True")).show()

+-----------------+-------------------+------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|withinCountry|
+-----------------+-------------------+------+-------------+
|    United States|      United States|370002|         true|
+-----------------+-------------------+------+-------------+



In [None]:
# CASE WHEN: add a `category` column -> 'upper' when count >= 10, otherwise 'under'

In [35]:
# count >= 10 -> 'upper'; anything else (including NULL count) -> 'under'
category_expr = expr("CASE WHEN count >= 10 THEN 'upper' ELSE 'under' END")
df4 = df3.withColumn('category', category_expr)

df4.show(10)

+-----------------+-------------------+-----+-------------+--------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|category|
+-----------------+-------------------+-----+-------------+--------+
|    United States|            Romania|   15|        false|   upper|
|    United States|            Croatia|    1|        false|   under|
|    United States|            Ireland|  344|        false|   upper|
|            Egypt|      United States|   15|        false|   upper|
|    United States|              India|   62|        false|   upper|
|    United States|          Singapore|    1|        false|   under|
|    United States|            Grenada|   62|        false|   upper|
|       Costa Rica|      United States|  588|        false|   upper|
|          Senegal|      United States|   40|        false|   upper|
|          Moldova|      United States|    1|        false|   under|
+-----------------+-------------------+-----+-------------+--------+
only showing top 10 rows



In [None]:
# DataFrame select(), filter(), withColumn(): transformations (lazy); when() only builds a Column expression
# show(), count(): actions (they trigger execution)

In [None]:
# Aggregate functions

In [43]:
# What emp.csv looks like when read with no options: there is no header
# handling and no type inference, so the columns are named _c0.._c7 and the
# real header line appears as the first data row.
# The DataFrame is loaded here (instead of relying on emp_df defined in a later
# cell) so the notebook also works under Restart Kernel -> Run All.
emp_raw_df = spark.read.format('csv').load("learning_spark_data/emp.csv")
emp_raw_df.show(5)

+-----+-----+--------+----+----------+----+----+------+
|  _c0|  _c1|     _c2| _c3|       _c4| _c5| _c6|   _c7|
+-----+-----+--------+----+----------+----+----+------+
|empno|ename|     job| mgr|  hiredate| sal|comm|deptno|
| 7369|SMITH|   CLERK|7902|1980-12-17| 800|NULL|    20|
| 7499|ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521| WARD|SALESMAN|7698|1981-02-22|1250| 500|    30|
| 7566|JONES| MANAGER|7839|1981-04-02|2975|NULL|    20|
+-----+-----+--------+----+----------+----+----+------+
only showing top 5 rows



In [44]:
# Same for dept.csv read with no options: columns are named _c0.._c2 and
# the header line shows up as a data row.
# Loaded here so the cell does not depend on dept_df from a later cell.
dept_raw_df = spark.read.format('csv').load("learning_spark_data/dept.csv")
dept_raw_df.show(5)

+------+----------+--------+
|   _c0|       _c1|     _c2|
+------+----------+--------+
|deptno|     dname|     loc|
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+



In [45]:
# Load emp_df and dept_df with:
#   header=true      -> use the first line as column names
#   inferSchema=true -> let Spark infer column types (costs an extra pass over the data)
csv_reader = spark.read.format('csv').options(header='true', inferSchema='true')

emp_df = csv_reader.load("learning_spark_data/emp.csv")
dept_df = csv_reader.load("learning_spark_data/dept.csv")

In [None]:
# Equivalent shorthand: DataFrameReader.csv() takes the same options as keyword arguments.
csv_opts = dict(header=True, inferSchema=True)
emp_df = spark.read.csv("learning_spark_data/emp.csv", **csv_opts)
dept_df = spark.read.csv("learning_spark_data/dept.csv", **csv_opts)

In [49]:
# With header/inferSchema the real column names are used and the header row is gone.
emp_df.show(n=5)

+-----+------+--------+----+----------+----+----+------+
|empno| ename|     job| mgr|  hiredate| sal|comm|deptno|
+-----+------+--------+----+----------+----+----+------+
| 7369| SMITH|   CLERK|7902|1980-12-17| 800|NULL|    20|
| 7499| ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD|SALESMAN|7698|1981-02-22|1250| 500|    30|
| 7566| JONES| MANAGER|7839|1981-04-02|2975|NULL|    20|
| 7654|MARTIN|SALESMAN|7698|1981-09-28|1250|1400|    30|
+-----+------+--------+----+----------+----+----+------+
only showing top 5 rows



In [50]:
# Department table loaded with a header row.
dept_df.show(n=5)

+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+



In [51]:
# Number of employee rows (the header line is not counted because header=true).
emp_df.count()

15

In [52]:
# Number of department rows.
dept_df.count()

4

In [60]:
# Column names resolve case-insensitively by default ('Ename' matches ename);
# the output header keeps the spelling used in the query.
emp_df.select(['Ename', 'deptno']).show()

+------+------+
| Ename|deptno|
+------+------+
| SMITH|    20|
| ALLEN|    30|
|  WARD|    30|
| JONES|    20|
|MARTIN|    30|
| BLAKE|    30|
| CLARK|    10|
| SCOTT|    20|
|  KING|    10|
|TURNER|    30|
| ADAMS|    20|
| JAMES|    30|
|  FORD|    20|
|MILLER|    10|
|  JACK|    70|
+------+------+



In [59]:
# where() is an alias of filter(); the condition here is a SQL expression string.
emp_df.select('*').where('deptno = 20').show() 

+-----+-----+-------+----+----------+----+----+------+
|empno|ename|    job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+-------+----+----------+----+----+------+
| 7369|SMITH|  CLERK|7902|1980-12-17| 800|NULL|    20|
| 7566|JONES|MANAGER|7839|1981-04-02|2975|NULL|    20|
| 7788|SCOTT|ANALYST|7566|1987-04-19|3000|NULL|    20|
| 7876|ADAMS|  CLERK|7788|1987-05-23|1100|NULL|    20|
| 7902| FORD|ANALYST|7566|1981-12-03|3000|NULL|    20|
+-----+-----+-------+----+----------+----+----+------+



In [64]:
# Row count written as a SQL expression; Spark displays count(*) as count(1).
emp_df.selectExpr('count(*)').show()

+--------+
|count(1)|
+--------+
|      15|
+--------+



In [67]:
from pyspark.sql.functions import countDistinct

# Exact number of distinct job titles; agg(...) is a global aggregation like select(agg_fn).
emp_df.agg(countDistinct('job')).show()

+-------------------+
|count(DISTINCT job)|
+-------------------+
|                  5|
+-------------------+



In [69]:
# Approximate distinct count (HyperLogLog++); rsd is the maximum allowed relative
# standard deviation. It is cheaper than an exact distinct count on large data.
from pyspark.sql.functions import approx_count_distinct as acd

emp_df.agg(acd('job', rsd=0.1)).show()

+--------------------------+
|approx_count_distinct(job)|
+--------------------------+
|                         5|
+--------------------------+



In [None]:
# first, last, min, max, sum, avg: use the function API (pyspark.sql.functions) instead of SQL expr strings
# e.g. df.select( count('sal') ) 

In [116]:
# Aggregate functions from the function API.
# NOTE: importing min, max, sum and round this way shadows the Python builtins
# of the same name for the rest of the notebook; `import pyspark.sql.functions as F`
# (then F.sum, F.max, ...) avoids that.
from pyspark.sql.functions import first, last, min, max, sum, avg, mean, count, round, stddev
# first() returns the first value in the current row order, which Spark does
# not guarantee to be stable after shuffles.
emp_df.select(first('ename')).show()

+------------+
|first(ename)|
+------------+
|       SMITH|
+------------+



In [72]:
# Last ename in the current row order (order-dependent, like first()).
emp_df.agg(last('ename')).show()

+-----------+
|last(ename)|
+-----------+
|       JACK|
+-----------+



In [74]:
# Smallest department number (pyspark min, not the Python builtin).
emp_df.agg(min('deptno')).show()

+-----------+
|min(deptno)|
+-----------+
|         10|
+-----------+



In [75]:
# Largest department number.
emp_df.agg(max('deptno')).show()

+-----------+
|max(deptno)|
+-----------+
|         70|
+-----------+



In [76]:
# Sum of deptno (only an API demo; adding department numbers has no business meaning).
emp_df.agg(sum('deptno')).show()

+-----------+
|sum(deptno)|
+-----------+
|        380|
+-----------+



In [77]:
# Mean of deptno (API demo).
emp_df.agg(avg('deptno')).show()

+------------------+
|       avg(deptno)|
+------------------+
|25.333333333333332|
+------------------+



In [83]:
# max() on a string column compares lexicographically.
emp_df.agg(max('ename')).show()

+----------+
|max(ename)|
+----------+
|      WARD|
+----------+



In [85]:
# Lexicographically smallest ename.
emp_df.agg(min('ename')).show()

+----------+
|min(ename)|
+----------+
|     ADAMS|
+----------+



In [86]:
from pyspark.sql.functions import col

# col('sal') builds a column reference by name; same result as passing 'sal'.
emp_df.agg(sum(col('sal'))).show()

+--------+
|sum(sal)|
+--------+
|   32225|
+--------+



In [87]:
# SUM(DISTINCT sal) adds each distinct salary only once (the duplicated 3000 and
# 1250 count once), so the result is smaller than sum(sal).
emp_df.selectExpr('sum(distinct sal)').show()

+-----------------+
|sum(DISTINCT sal)|
+-----------------+
|            27975|
+-----------------+



In [None]:
# Average salary three ways: total_salary / total_transaction, avg_salary, mean_salary

In [89]:
# Total salary (total_salary).
emp_df.agg(sum('sal')).show()

+--------+
|sum(sal)|
+--------+
|   32225|
+--------+



In [99]:
# Manual average: total salary / number of non-NULL salaries.
# count("sal") is computed in the same aggregation pass (emp_df.count() would
# launch a separate Spark job), and it skips NULLs exactly like avg() does,
# so this stays consistent with avg('sal') even if some salaries are NULL.
emp_df.select((sum("sal") / count("sal")).alias("avg_sal")).show()

+------------------+
|           avg_sal|
+------------------+
|2148.3333333333335|
+------------------+



In [109]:
# Built-in average (NULL salaries are ignored).
emp_df.agg(avg('sal').alias('avg_salary')).show()

+------------------+
|        avg_salary|
+------------------+
|2148.3333333333335|
+------------------+



In [110]:
# mean() is an alias of avg().
emp_df.agg(mean('sal').alias('mean_salary')).show()

+------------------+
|       mean_salary|
+------------------+
|2148.3333333333335|
+------------------+



In [104]:
# Compute the pieces in one aggregation, then derive the manual average with a
# SQL expression and show it next to avg() and mean().
salary_stats = emp_df.select(
    count('sal').alias('total_transaction'),
    sum('sal').alias('total_salary'),
    avg('sal').alias('avg_salary'),
    mean('sal').alias('mean_salary'),
)
salary_stats.selectExpr('total_salary/total_transaction', 'avg_salary', 'mean_salary').show()

+----------------------------------+------------------+------------------+
|(total_salary / total_transaction)|        avg_salary|       mean_salary|
+----------------------------------+------------------+------------------+
|                2148.3333333333335|2148.3333333333335|2148.3333333333335|
+----------------------------------+------------------+------------------+



In [105]:
# Grouping: number of employees per job.
emp_df.groupBy(emp_df.job).count().show()

+---------+-----+
|      job|count|
+---------+-----+
|  ANALYST|    2|
| SALESMAN|    4|
|    CLERK|    5|
|  MANAGER|    3|
|PRESIDENT|    1|
+---------+-----+



In [111]:
# SQL equivalent:
#   SELECT job, count(job) AS qty, count(job), sum(sal) FROM emp GROUP BY job
# The two count(job) columns show the function API and expr() forms side by side.
job_aggs = [count('job').alias('qty'), expr('count(job)'), sum('sal')]
emp_df.groupBy('job').agg(*job_aggs).show()

+---------+---+----------+--------+
|      job|qty|count(job)|sum(sal)|
+---------+---+----------+--------+
|  ANALYST|  2|         2|    6000|
| SALESMAN|  4|         4|    5600|
|    CLERK|  5|         5|    7350|
|  MANAGER|  3|         3|    8275|
|PRESIDENT|  1|         1|    5000|
+---------+---+----------+--------+



In [112]:
# Same aggregation as above, kept in group_df for reuse.
group_df = (
    emp_df
    .groupBy('job')
    .agg(count('job').alias('qty'), expr('count(job)'), sum('sal'))
)
group_df.show()

+---------+---+----------+--------+
|      job|qty|count(job)|sum(sal)|
+---------+---+----------+--------+
|  ANALYST|  2|         2|    6000|
| SALESMAN|  4|         4|    5600|
|    CLERK|  5|         5|    7350|
|  MANAGER|  3|         3|    8275|
|PRESIDENT|  1|         1|    5000|
+---------+---+----------+--------+



In [117]:
# Average (SAL_AVG) and standard deviation (SAL_STDEV) of salary per job.
# stddev() is the sample standard deviation, so a single-row group
# (PRESIDENT) yields NULL.
from pyspark.sql.functions import stddev, round

job_sal_stats = [
    count('job').alias('qty'),
    round(avg('sal'), 2).alias('SAL_AVG'),
    round(stddev('sal'), 2).alias('SAL_STDEV'),
]
emp_df.groupBy('job').agg(*job_sal_stats).show()

+---------+---+-------+---------+
|      job|qty|SAL_AVG|SAL_STDEV|
+---------+---+-------+---------+
|  ANALYST|  2| 3000.0|      0.0|
| SALESMAN|  4| 1400.0|   177.95|
|    CLERK|  5| 1470.0|   984.63|
|  MANAGER|  3|2758.33|   274.24|
|PRESIDENT|  1| 5000.0|     NULL|
+---------+---+-------+---------+



In [130]:
# Top 10 salaries: sort descending (NULLs last), then keep 10 rows.
emp_df.orderBy('sal', ascending=False).limit(10).show()

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7839|  KING|PRESIDENT|NULL|1981-11-17|5000|NULL|    10|
| 9292|  JACK|    CLERK|7782|1982-01-23|3200|NULL|    70|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|NULL|    20|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000|NULL|    20|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975|NULL|    20|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|NULL|    30|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450|NULL|    10|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500|   0|    30|
| 7934|MILLER|    CLERK|7782|1982-01-23|1300|NULL|    10|
+-----+------+---------+----+----------+----+----+------+



In [140]:
# Top 10 salaries, window-function version.
# This window has no partitionBy(), so Spark moves every row into a single
# partition to rank them: fine for 15 rows, costly on large data.
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

windowspec = Window.orderBy(desc('sal'))
salAllRank = rank().over(windowspec)

salAllRank

Column<'RANK() OVER (ORDER BY sal DESC NULLS LAST unspecifiedframe$())'>

In [138]:
# rank() leaves gaps after ties (two 3rd places, then 5th).
emp_df.withColumn('salary_rank', salAllRank).show(n=10)

+-----+------+---------+----+----------+----+----+------+-----------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|salary_rank|
+-----+------+---------+----+----------+----+----+------+-----------+
| 7839|  KING|PRESIDENT|NULL|1981-11-17|5000|NULL|    10|          1|
| 9292|  JACK|    CLERK|7782|1982-01-23|3200|NULL|    70|          2|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|NULL|    20|          3|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000|NULL|    20|          3|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975|NULL|    20|          5|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|NULL|    30|          6|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450|NULL|    10|          7|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|          8|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500|   0|    30|          9|
| 7934|MILLER|    CLERK|7782|1982-01-23|1300|NULL|    10|         10|
+-----+------+---------+----+----------+----+----+------+-----------+
only showing top 10 

In [150]:
# Rank salaries within each job: Window.partitionBy('job') restarts the ranking
# for every job, and orderBy puts the highest salary first.
windowspec2 = (
    Window
    .partitionBy('job')
    .orderBy(emp_df['sal'].desc())
)
jobSalRank = rank().over(windowspec2)
jobSalRank

Column<'RANK() OVER (PARTITION BY job ORDER BY sal DESC NULLS LAST unspecifiedframe$())'>

In [155]:
# All employee columns plus the per-job salary rank.
job_rank_df = emp_df.select('*', jobSalRank.alias('job_salary_rank'))
job_rank_df.show()

+-----+------+---------+----+----------+----+----+------+---------------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|job_salary_rank|
+-----+------+---------+----+----------+----+----+------+---------------+
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|NULL|    20|              1|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000|NULL|    20|              1|
| 9292|  JACK|    CLERK|7782|1982-01-23|3200|NULL|    70|              1|
| 7934|MILLER|    CLERK|7782|1982-01-23|1300|NULL|    10|              2|
| 7876| ADAMS|    CLERK|7788|1987-05-23|1100|NULL|    20|              3|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950|NULL|    30|              4|
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|NULL|    20|              5|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975|NULL|    20|              1|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|NULL|    30|              2|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450|NULL|    10|              3|
| 7839|  KING|PRESIDENT|NULL|1981-11-1

In [156]:
# Only the SALESMAN partition of the per-job ranking.
job_rank_df.where(job_rank_df['job'] == 'SALESMAN').show()

+-----+------+--------+----+----------+----+----+------+---------------+
|empno| ename|     job| mgr|  hiredate| sal|comm|deptno|job_salary_rank|
+-----+------+--------+----+----------+----+----+------+---------------+
| 7499| ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|              1|
| 7844|TURNER|SALESMAN|7698|1981-09-08|1500|   0|    30|              2|
| 7521|  WARD|SALESMAN|7698|1981-02-22|1250| 500|    30|              3|
| 7654|MARTIN|SALESMAN|7698|1981-09-28|1250|1400|    30|              3|
+-----+------+--------+----+----------+----+----+------+---------------+



In [182]:
# Salary rank within each job (not department).
job_rank_df.select(['ename', 'job', 'sal', 'job_salary_rank']).show()

+------+---------+----+---------------+
| ename|      job| sal|job_salary_rank|
+------+---------+----+---------------+
| SCOTT|  ANALYST|3000|              1|
|  FORD|  ANALYST|3000|              1|
|  JACK|    CLERK|3200|              1|
|MILLER|    CLERK|1300|              2|
| ADAMS|    CLERK|1100|              3|
| JAMES|    CLERK| 950|              4|
| SMITH|    CLERK| 800|              5|
| JONES|  MANAGER|2975|              1|
| BLAKE|  MANAGER|2850|              2|
| CLARK|  MANAGER|2450|              3|
|  KING|PRESIDENT|5000|              1|
| ALLEN| SALESMAN|1600|              1|
|TURNER| SALESMAN|1500|              2|
|  WARD| SALESMAN|1250|              3|
|MARTIN| SALESMAN|1250|              3|
+------+---------+----+---------------+



In [193]:
# Salary rank within each department (deptno).
from pyspark.sql.functions import rank, dense_rank, row_number, col, desc

# row_number() gives every row a unique position, so tied salaries (e.g. SCOTT
# and FORD at 3000 in dept 20) would otherwise be numbered in an arbitrary,
# run-dependent order; empno breaks such ties deterministically.
# Use rank()/dense_rank() instead if tied salaries should share a rank.
dept_window_spec = Window.partitionBy('deptno').orderBy(desc('sal'), 'empno')
df_with_dept_rank = emp_df.withColumn('dept_salary_rank',
                                      row_number().over(dept_window_spec))
df_with_dept_rank.select('ename', 'deptno', 'sal', 'dept_salary_rank').show()

+------+------+----+----------------+
| ename|deptno| sal|dept_salary_rank|
+------+------+----+----------------+
|  KING|    10|5000|               1|
| CLARK|    10|2450|               2|
|MILLER|    10|1300|               3|
| SCOTT|    20|3000|               1|
|  FORD|    20|3000|               2|
| JONES|    20|2975|               3|
| ADAMS|    20|1100|               4|
| SMITH|    20| 800|               5|
| BLAKE|    30|2850|               1|
| ALLEN|    30|1600|               2|
|TURNER|    30|1500|               3|
|  WARD|    30|1250|               4|
|MARTIN|    30|1250|               5|
| JAMES|    30| 950|               6|
|  JACK|    70|3200|               1|
+------+------+----+----------------+



In [163]:
# Grand total of all salaries (a single value, not a running/cumulative sum).
job_rank_df.agg(sum('sal')).show()

+--------+
|sum(sal)|
+--------+
|   32225|
+--------+



In [194]:
# Running (cumulative) salary per department, accumulated in empno order.
# With orderBy() and no explicit frame Spark uses a RANGE frame
# (UNBOUNDED PRECEDING .. CURRENT ROW), which lumps rows with equal order keys
# together. The explicit ROWS frame makes the total advance one row at a time
# even if the ordering column ever has ties.
window_spec_sum = (
    Window.partitionBy('deptno')
    .orderBy('empno')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df_cumulative = emp_df.withColumn('cumulative_salary',
                                  sum('sal').over(window_spec_sum))
df_cumulative.select('ename', 'deptno', 'sal', 'cumulative_salary').show()

+------+------+----+-----------------+
| ename|deptno| sal|cumulative_salary|
+------+------+----+-----------------+
| CLARK|    10|2450|             2450|
|  KING|    10|5000|             7450|
|MILLER|    10|1300|             8750|
| SMITH|    20| 800|              800|
| JONES|    20|2975|             3775|
| SCOTT|    20|3000|             6775|
| ADAMS|    20|1100|             7875|
|  FORD|    20|3000|            10875|
| ALLEN|    30|1600|             1600|
|  WARD|    30|1250|             2850|
|MARTIN|    30|1250|             4100|
| BLAKE|    30|2850|             6950|
|TURNER|    30|1500|             8450|
| JAMES|    30| 950|             9400|
|  JACK|    70|3200|             3200|
+------+------+----+-----------------+



In [175]:
# Total and average salary per job (plain groupBy aggregation, not cumulative).
job_sal_aggs = [sum('sal').alias('SUM'), round(avg('sal'), 2).alias('AVG')]
job_total_sal = job_rank_df.groupBy('job').agg(*job_sal_aggs)
job_total_sal.show()

+---------+----+-------+
|      job| SUM|    AVG|
+---------+----+-------+
|  ANALYST|6000| 3000.0|
| SALESMAN|5600| 1400.0|
|    CLERK|7350| 1470.0|
|  MANAGER|8275|2758.33|
|PRESIDENT|5000| 5000.0|
+---------+----+-------+



In [198]:
# Department average salary attached to every row through a partition-only
# window (no orderBy, so the frame is the whole department).
window_spec_avg = Window.partitionBy('deptno')
dept_avg_col = round(avg('sal').over(window_spec_avg), 2)
df_avg_compare = emp_df.withColumn('dept_avg_salary', dept_avg_col)
df_avg_compare.select('ename', 'deptno', 'sal', 'dept_avg_salary').show()

+------+------+----+---------------+
| ename|deptno| sal|dept_avg_salary|
+------+------+----+---------------+
| CLARK|    10|2450|        2916.67|
|  KING|    10|5000|        2916.67|
|MILLER|    10|1300|        2916.67|
| SMITH|    20| 800|         2175.0|
| JONES|    20|2975|         2175.0|
| SCOTT|    20|3000|         2175.0|
| ADAMS|    20|1100|         2175.0|
|  FORD|    20|3000|         2175.0|
| ALLEN|    30|1600|        1566.67|
|  WARD|    30|1250|        1566.67|
|MARTIN|    30|1250|        1566.67|
| BLAKE|    30|2850|        1566.67|
|TURNER|    30|1500|        1566.67|
| JAMES|    30| 950|        1566.67|
|  JACK|    70|3200|         3200.0|
+------+------+----+---------------+



In [186]:
# Compare each employee's salary with the average salary of their job.

# 1. Average salary per job as a separate DataFrame. It is kept unrounded here
#    so the difference below is computed from the exact mean, not a rounded one.
job_avg_sal_df = job_rank_df.groupBy('job').agg(
    avg('sal').alias('job_avg_sal')
)

# 2. Attach the job average to every employee row (left join keeps all employees).
new_df_with_join = job_rank_df.join(
    job_avg_sal_df,
    on='job',
    how='left'
)

# 3. Difference from the exact job average, rounded once; then round the
#    displayed average (replacing an existing column keeps its position).
new_df_with_join = (
    new_df_with_join
    .withColumn('sal_difference', round(col('sal') - col('job_avg_sal'), 2))
    .withColumn('job_avg_sal', round(col('job_avg_sal'), 2))
)

# 4. Show the result
new_df_with_join.select('*').show()

+---------+-----+------+----+----------+----+----+------+---------------+-----------+--------------+
|      job|empno| ename| mgr|  hiredate| sal|comm|deptno|job_salary_rank|job_avg_sal|sal_difference|
+---------+-----+------+----+----------+----+----+------+---------------+-----------+--------------+
|  ANALYST| 7788| SCOTT|7566|1987-04-19|3000|NULL|    20|              1|     3000.0|           0.0|
|  ANALYST| 7902|  FORD|7566|1981-12-03|3000|NULL|    20|              1|     3000.0|           0.0|
|    CLERK| 9292|  JACK|7782|1982-01-23|3200|NULL|    70|              1|     1470.0|        1730.0|
|    CLERK| 7934|MILLER|7782|1982-01-23|1300|NULL|    10|              2|     1470.0|        -170.0|
|    CLERK| 7876| ADAMS|7788|1987-05-23|1100|NULL|    20|              3|     1470.0|        -370.0|
|    CLERK| 7900| JAMES|7698|1981-12-03| 950|NULL|    30|              4|     1470.0|        -520.0|
|    CLERK| 7369| SMITH|7902|1980-12-17| 800|NULL|    20|              5|     1470.0|      

In [188]:
# TODO: compare each employee's salary with their department's average salary (cell left empty)

In [192]:
# cube() aggregates every combination of (deptno, job), including subtotals
# and the grand total. A NULL in a grouping column marks a subtotal row
# (grouping() can tell it apart from a genuine NULL value).
# TODO: also add average / max / min salary.
cube_agg = emp_df.cube('deptno', 'job').agg(count('*'), sum('sal'))
cube_agg.orderBy('deptno', 'job').show()

+------+---------+--------+--------+
|deptno|      job|count(1)|sum(sal)|
+------+---------+--------+--------+
|  NULL|     NULL|      15|   32225|
|  NULL|  ANALYST|       2|    6000|
|  NULL|    CLERK|       5|    7350|
|  NULL|  MANAGER|       3|    8275|
|  NULL|PRESIDENT|       1|    5000|
|  NULL| SALESMAN|       4|    5600|
|    10|     NULL|       3|    8750|
|    10|    CLERK|       1|    1300|
|    10|  MANAGER|       1|    2450|
|    10|PRESIDENT|       1|    5000|
|    20|     NULL|       5|   10875|
|    20|  ANALYST|       2|    6000|
|    20|    CLERK|       2|    1900|
|    20|  MANAGER|       1|    2975|
|    30|     NULL|       6|    9400|
|    30|    CLERK|       1|     950|
|    30|  MANAGER|       1|    2850|
|    30| SALESMAN|       4|    5600|
|    70|     NULL|       1|    3200|
|    70|    CLERK|       1|    3200|
+------+---------+--------+--------+



In [199]:
# deptno is typed integer because the CSV was loaded with inferSchema enabled.
dept_df.printSchema()

root
 |-- deptno: integer (nullable = true)
 |-- dname: string (nullable = true)
 |-- loc: string (nullable = true)



In [202]:
# Inner join (the default) on an explicit column equality. Both deptno columns
# are kept in the result, so referring to 'deptno' on emp_dept_df later would be
# ambiguous; joining with on='deptno' merges them into one column.
join_cond = emp_df['deptno'] == dept_df['deptno']
emp_dept_df = emp_df.join(dept_df, join_cond)
emp_dept_df.show()

+-----+------+---------+----+----------+----+----+------+------+----------+--------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|deptno|     dname|     loc|
+-----+------+---------+----+----------+----+----+------+------+----------+--------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|NULL|    20|    20|  RESEARCH|  DALLAS|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|    30|     SALES| CHICAGO|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|    30|     SALES| CHICAGO|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975|NULL|    20|    20|  RESEARCH|  DALLAS|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250|1400|    30|    30|     SALES| CHICAGO|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|NULL|    30|    30|     SALES| CHICAGO|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450|NULL|    10|    10|ACCOUNTING|NEW YORK|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|NULL|    20|    20|  RESEARCH|  DALLAS|
| 7839|  KING|PRESIDENT|NULL|1981-11-17|5000|NULL|    10|    10|A

In [204]:
# Join on the shared column name: a single deptno column in the result.
# Inner join drops employees without a matching department (JACK, deptno 70).
join_df = emp_df.join(dept_df, 'deptno', 'inner')
join_df.select(['ename', 'deptno', 'dname']).show()

+------+------+----------+
| ename|deptno|     dname|
+------+------+----------+
| SMITH|    20|  RESEARCH|
| ALLEN|    30|     SALES|
|  WARD|    30|     SALES|
| JONES|    20|  RESEARCH|
|MARTIN|    30|     SALES|
| BLAKE|    30|     SALES|
| CLARK|    10|ACCOUNTING|
| SCOTT|    20|  RESEARCH|
|  KING|    10|ACCOUNTING|
|TURNER|    30|     SALES|
| ADAMS|    20|  RESEARCH|
| JAMES|    30|     SALES|
|  FORD|    20|  RESEARCH|
|MILLER|    10|ACCOUNTING|
+------+------+----------+



In [205]:
# Shut down the SparkSession at the end of the notebook.
spark.stop()