In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FirstSparkSessionApp").getOrCreate()

In [8]:
df = spark.read.format('csv').load('data/2015-summary.csv', inferSchema=True, header=True)

In [9]:
df.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,IntegerType,true)))

In [10]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [11]:
df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [12]:
# SPARK DATA TABLE
df.select("DEST_COUNTRY_NAME").show(5)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
|    United States|
|            Egypt|
|    United States|
+-----------------+
only showing top 5 rows



In [13]:
df.createOrReplaceTempView("mobility_data")

In [16]:
df.count()

256

In [25]:
df_dup = df.select('DEST_COUNTRY_NAME').dropDuplicates()
df_dup.show()

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|            Anguilla|
|              Russia|
|            Paraguay|
|             Senegal|
|              Sweden|
|            Kiribati|
|              Guyana|
|         Philippines|
|            Djibouti|
|            Malaysia|
|           Singapore|
|                Fiji|
|              Turkey|
|                Iraq|
|             Germany|
|              Jordan|
|               Palau|
|Turks and Caicos ...|
|              France|
|              Greece|
+--------------------+
only showing top 20 rows



In [22]:
df_dup.distinct().count()

                                                                                

132

In [26]:
df_dup = df.select('DEST_COUNTRY_NAME').dropDuplicates().cache() #z캐시를 할때랑 하지 않을때의 차이는 4040포트 사이트에서 확인
# 캐시를 쓰면 본래 사용되는 메모리가 줄어든다. 
df_dup.show()

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|            Anguilla|
|              Russia|
|            Paraguay|
|             Senegal|
|              Sweden|
|            Kiribati|
|              Guyana|
|         Philippines|
|            Djibouti|
|            Malaysia|
|           Singapore|
|                Fiji|
|              Turkey|
|                Iraq|
|             Germany|
|              Jordan|
|               Palau|
|Turks and Caicos ...|
|              France|
|              Greece|
+--------------------+
only showing top 20 rows



24/12/06 16:13:20 WARN CacheManager: Asked to cache already cached data.


In [15]:
# SPARK SQL
spark.sql("select * from mobility_data").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [None]:
# GROUP BY 집계
'''
select * from 


In [31]:
from pyspark.sql.functions import expr

In [34]:
# 컬럼을 추가하기
df3 = df.withColumn('withInCountry', expr('ORIGIN_COUNTRY_NAME==DEST_COUNTRY_NAME'))

In [35]:
df3.show(2) 

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withInCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [39]:
# 컬럼을 추가하기(컬럼값의 조건식은 다음과 같음)
df4 = df.withColumn('category', expr('CASE WHEN count<10 THEN "under" WHEN count>=10 THEN "upper" END '))  #조건식은 SQL문으로
df4.show(2)

+-----------------+-------------------+-----+--------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|category|
+-----------------+-------------------+-----+--------+
|    United States|            Romania|   15|   upper|
|    United States|            Croatia|    1|   under|
+-----------------+-------------------+-----+--------+
only showing top 2 rows



In [40]:
# count값의 2배를 계산하여 새로운 컬럼 추가
df5 = df.withColumn('count_double', df['count'] * 2)
df5.show(2)

+-----------------+-------------------+-----+------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count_double|
+-----------------+-------------------+-----+------------+
|    United States|            Romania|   15|          30|
|    United States|            Croatia|    1|           2|
+-----------------+-------------------+-----+------------+
only showing top 2 rows



In [46]:
df6 = df5.where('count<5' )

In [47]:
df6.count()

46

In [49]:
df5.where('count < 5').where('ORIGIN_COUNTRY_NAME != "United States" ').show()

+-----------------+-------------------+-----+------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count_double|
+-----------------+-------------------+-----+------------+
|    United States|            Croatia|    1|           2|
|    United States|          Singapore|    1|           2|
|    United States|          Gibraltar|    1|           2|
|    United States|             Cyprus|    1|           2|
|    United States|           Malaysia|    3|           6|
|    United States|            Vietnam|    2|           4|
|    United States|            Estonia|    1|           2|
|    United States|            Hungary|    3|           6|
|    United States|           Thailand|    4|           8|
|    United States|            Liberia|    2|           4|
|    United States|              Malta|    2|           4|
|    United States|          Lithuania|    1|           2|
|    United States|           Bulgaria|    1|           2|
|    United States|            Georgia|    1|           

In [51]:
# count가 200 이상인 ORIGIN_COUNTRY_NAME 필터링
count_above_200 = df5.filter(df5['count'] >= 200).select('ORIGIN_COUNTRY_NAME').distinct()

count_above_200.show()

+--------------------+
| ORIGIN_COUNTRY_NAME|
+--------------------+
|             Germany|
|Turks and Caicos ...|
|              France|
|              Taiwan|
|             Belgium|
|             Ecuador|
|           Nicaragua|
|                Peru|
|       United States|
|               China|
|      Cayman Islands|
|               Italy|
|               Spain|
|                Cuba|
|             Ireland|
|              Panama|
|           Hong Kong|
|           Venezuela|
|             Iceland|
|         South Korea|
+--------------------+
only showing top 20 rows



In [53]:
# 국내 여행이 아닌 데이터를 필터링하고, 횟수가 많은 ORIGIN_COUNTRY_NAME Top 10 추출
non_domestic_top10 = df5.filter(df5['DEST_COUNTRY_NAME'] != df5['ORIGIN_COUNTRY_NAME']).groupBy('ORIGIN_COUNTRY_NAME') \
                        .agg(sum('count').alias('total_count')).orderBy('total_count', ascending=False)

# 결과 출력
non_domestic_top10.show(10)

TypeError: unsupported operand type(s) for +: 'int' and 'str'

In [54]:
spark.stop()

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SecondSparkSessionApp").getOrCreate()

24/12/09 10:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("data/emp.csv")

df.printSchema()

                                                                                

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: integer (nullable = true)
 |-- comm: integer (nullable = true)
 |-- deptno: integer (nullable = true)



In [3]:
df.select('ename', 'deptno').where('deptno=20').show() #조건을 추가

+-----+------+
|ename|deptno|
+-----+------+
|SMITH|    20|
|JONES|    20|
|SCOTT|    20|
|ADAMS|    20|
| FORD|    20|
+-----+------+



In [38]:
from pyspark.sql.functions import count, countDistinct, approx_count_distinct, min, max, expr

In [7]:
# 카운트 집계
df.select(count('job')).show() #count함수는 따로 정의해주어야함. 

+----------+
|count(job)|
+----------+
|        15|
+----------+



In [8]:
df.selectExpr('count(*)').show() #이렇게 하면 null이 제외되며 위의 값과 비교해 null의 존재를 확인

+--------+
|count(1)|
+--------+
|      15|
+--------+



In [10]:
df.select('job').distinct().count()

                                                                                

5

In [16]:
df.select(approx_count_distinct('job', 0.1)).show() #한번에 쓰는법, 성능이 조금 더 빠른 편, 옆에 0.1은 10%정도의 오차를 감수한다는 뜻

+--------------------------+
|approx_count_distinct(job)|
+--------------------------+
|                         5|
+--------------------------+



In [21]:
df.select(max('deptno')).show()  # deptno 컬럼의 최대값, 위에서 max를 쓰지 않으면 파이썬에 내장된 max를 쓰게 된다!

+-----------+
|max(deptno)|
+-----------+
|         70|
+-----------+



In [22]:
df.select(min('deptno')).show()  # deptno 컬럼의 최대값

+-----------+
|min(deptno)|
+-----------+
|         10|
+-----------+



In [28]:
# df.select().distinct().sum()
df.select('sal').distinct().agg({'sal': 'sum'}).show()



+--------+
|sum(sal)|
+--------+
|   27975|
+--------+



                                                                                

In [30]:
dfs = df.select(
    count('sal').alias('total_tx'),
    sum('sal').alias('total_salary'),
    avg('sal').alias('avg_salary'),
    espr('mean(sal)')
)

dfs.show()

TypeError: unsupported operand type(s) for +: 'int' and 'str'

In [31]:
df.groupBy('job').count().show()

+---------+-----+
|      job|count|
+---------+-----+
|  ANALYST|    2|
| SALESMAN|    4|
|    CLERK|    5|
|  MANAGER|    3|
|PRESIDENT|    1|
+---------+-----+



In [36]:
dfs = df.groupBy('job').agg(expr('avg(sal) as SAL_AVG'))
dfs.show()

+---------+------------------+
|      job|           SAL_AVG|
+---------+------------------+
|  ANALYST|            3000.0|
| SALESMAN|            1400.0|
|    CLERK|            1470.0|
|  MANAGER|2758.3333333333335|
|PRESIDENT|            5000.0|
+---------+------------------+



In [40]:
df.groupBy('job').agg(expr('stddev_pop(sal)')).show()

+---------+------------------+
|      job|   stddev_pop(sal)|
+---------+------------------+
|  ANALYST|               0.0|
| SALESMAN|154.11035007422439|
|    CLERK| 880.6815542521599|
|  MANAGER|223.91714737574006|
|PRESIDENT|               0.0|
+---------+------------------+



In [None]:
# 윈도우 함수 
# - 순위,정렬(rank, row_number, dense_rank)
# - 누계(sum, avg, max, min, over())
# - 이동평균(over+rowsBetween, rangeBetween)
# - 시차, 선행(lag, lead)
# ex) 세션 구간내 분석, 특정 시간동안 일어난 활동 그룹화

In [43]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rank

# 윈도우 명세 설정
windowspec = Window.orderBy(desc('sal'))

In [45]:
salAllRank = rank().over(windowspec)
salAllRank

Column<'RANK() OVER (ORDER BY sal DESC NULLS LAST unspecifiedframe$())'>

In [46]:
df.withColumn("salary_rank", salAllRank).show()

24/12/09 13:39:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----+------+---------+----+----------+----+----+------+-----------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|salary_rank|
+-----+------+---------+----+----------+----+----+------+-----------+
| 7839|  KING|PRESIDENT|null|1981-11-17|5000|null|    10|          1|
| 9292|  JACK|    CLERK|7782|1982-01-23|3200|null|    70|          2|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|null|    20|          3|
| 7902|  FORD|  ANALYST|7566|1981-12-03|3000|null|    20|          3|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975|null|    20|          5|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|          6|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450|null|    10|          7|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|          8|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500|   0|    30|          9|
| 7934|MILLER|    CLERK|7782|1982-01-23|1300|null|    10|         10|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|         11|
| 7654|MARTIN| SALES

In [49]:
from pyspark.sql.functions import col
df.select('empno', col('sal').alias('salary_rank')).show()

+-----+-----------+
|empno|salary_rank|
+-----+-----------+
| 7369|        800|
| 7499|       1600|
| 7521|       1250|
| 7566|       2975|
| 7654|       1250|
| 7698|       2850|
| 7782|       2450|
| 7788|       3000|
| 7839|       5000|
| 7844|       1500|
| 7876|       1100|
| 7900|        950|
| 7902|       3000|
| 7934|       1300|
| 9292|       3200|
+-----+-----------+



In [62]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank



df = df.withColumn('salary_rank', rank().over(windowSpec))

df.select('empno', 'salary_rank').show()

+-----+-----------+
|empno|salary_rank|
+-----+-----------+
| 7839|          1|
| 9292|          2|
| 7788|          3|
| 7902|          3|
| 7566|          5|
| 7698|          6|
| 7782|          7|
| 7499|          8|
| 7844|          9|
| 7934|         10|
| 7521|         11|
| 7654|         11|
| 7876|         13|
| 7900|         14|
| 7369|         15|
+-----+-----------+



24/12/09 13:52:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [63]:
windowSpec = Window.orderBy(col('sal').desc())
windowspec1 = windowSpec.partitionBy('job').orderBy(desc('sal'))
salJobRank = rank().over(windowspec1)

In [64]:
df.select(
    'job', 'ename', 'sal',
    windowspec.alias('salJobRank')
).show()

AttributeError: 'WindowSpec' object has no attribute 'alias'

In [67]:
windowSpec = Window.partitionBy('job').orderBy(col('sal').desc())
df = df.withColumn('salJobRank', rank().over(windowSpec))
df.select('job', 'ename', 'sal', 'salJobRank').show()

+---------+------+----+----------+
|      job| ename| sal|salJobRank|
+---------+------+----+----------+
|  ANALYST| SCOTT|3000|         1|
|  ANALYST|  FORD|3000|         1|
| SALESMAN| ALLEN|1600|         1|
| SALESMAN|TURNER|1500|         2|
| SALESMAN|  WARD|1250|         3|
| SALESMAN|MARTIN|1250|         3|
|    CLERK|  JACK|3200|         1|
|    CLERK|MILLER|1300|         2|
|    CLERK| ADAMS|1100|         3|
|    CLERK| JAMES| 950|         4|
|    CLERK| SMITH| 800|         5|
|  MANAGER| JONES|2975|         1|
|  MANAGER| BLAKE|2850|         2|
|  MANAGER| CLARK|2450|         3|
|PRESIDENT|  KING|5000|         1|
+---------+------+----+----------+



new_df = df.withColumn('dept_salary_rank', dense_rank().over(dept_window_spec))
new_df.select('ename', 'deptno', 'sal', 'dept_salary_rank').show()

In [72]:
# 연습 : 부서별로 평균 급여를 구해보기
from pyspark.sql.functions import avg

dept_avg_salary = df.groupBy('deptno').agg(avg('sal').alias('avg_salary'))
dept_avg_salary.show()

+------+------------------+
|deptno|        avg_salary|
+------+------------------+
|    20|            2175.0|
|    10|2916.6666666666665|
|    70|            3200.0|
|    30|1566.6666666666667|
+------+------------------+



In [74]:
from pyspark.sql.functions import lag, lead

# 부서별로 empno 기준으로 정렬된 윈도우 스펙 정의
row_window_spec = Window.partitionBy('deptno').orderBy('empno')

# 이전 급여와 다음 급여 컬럼 추가
lead_lag_sal_df = df.withColumn('prev_salary', lag('sal').over(row_window_spec)) \
    .withColumn('next_salary', lead('sal').over(row_window_spec))

# 필요한 열만 선택하여 출력
lead_lag_sal_df.select('ename', 'deptno', 'sal', 'prev_salary', 'next_salary').show()

+------+------+----+-----------+-----------+
| ename|deptno| sal|prev_salary|next_salary|
+------+------+----+-----------+-----------+
| SMITH|    20| 800|       null|       2975|
| JONES|    20|2975|        800|       3000|
| SCOTT|    20|3000|       2975|       1100|
| ADAMS|    20|1100|       3000|       3000|
|  FORD|    20|3000|       1100|       null|
| CLARK|    10|2450|       null|       5000|
|  KING|    10|5000|       2450|       1300|
|MILLER|    10|1300|       5000|       null|
|  JACK|    70|3200|       null|       null|
| ALLEN|    30|1600|       null|       1250|
|  WARD|    30|1250|       1600|       1250|
|MARTIN|    30|1250|       1250|       2850|
| BLAKE|    30|2850|       1250|       1500|
|TURNER|    30|1500|       2850|        950|
| JAMES|    30| 950|       1500|       null|
+------+------+----+-----------+-----------+



=
SELECT 
    ename, 
    deptno, 
    sal, 
    LAG(sal) OVER(PARTITION BY deptno ORDER BY empno) AS prev_salary, 
    LEAD(sal) OVER(PARTITION BY deptno ORDER BY empno) AS next_salary
FROM emp;

# rollup과 cube : group by랑 유사한 기능을 하나 
rollup : 계층적집계, 부분합, 총합
cube : 모든 값으로 부분합, 결함 가능한 모든 값의 부분합을 구한다. 

In [75]:
df

DataFrame[empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: int, comm: int, deptno: int, salary_rank: int, salJobRank: int]

In [79]:
# 그룹화를 진행

from pyspark.sql.functions import count, sum

# 그룹화를 진행하고 집계 연산 수행
df.groupBy('deptno', 'job').agg(
    count('*').alias('count'),
    sum('sal').alias('total_salary')
).show()

+------+---------+-----+------------+
|deptno|      job|count|total_salary|
+------+---------+-----+------------+
|    20|  ANALYST|    2|        6000|
|    20|  MANAGER|    1|        2975|
|    30|  MANAGER|    1|        2850|
|    70|    CLERK|    1|        3200|
|    30| SALESMAN|    4|        5600|
|    30|    CLERK|    1|         950|
|    20|    CLERK|    2|        1900|
|    10|PRESIDENT|    1|        5000|
|    10|    CLERK|    1|        1300|
|    10|  MANAGER|    1|        2450|
+------+---------+-----+------------+



In [85]:
df.rollup('deptno', 'job').agg(count('*'), max('sal')).orderBy('deptno','job').show()

+------+---------+--------+--------+
|deptno|      job|count(1)|max(sal)|
+------+---------+--------+--------+
|  null|     null|      15|    5000|
|    10|     null|       3|    5000|
|    10|    CLERK|       1|    1300|
|    10|  MANAGER|       1|    2450|
|    10|PRESIDENT|       1|    5000|
|    20|     null|       5|    3000|
|    20|  ANALYST|       2|    3000|
|    20|    CLERK|       2|    1100|
|    20|  MANAGER|       1|    2975|
|    30|     null|       6|    2850|
|    30|    CLERK|       1|     950|
|    30|  MANAGER|       1|    2850|
|    30| SALESMAN|       4|    1600|
|    70|     null|       1|    3200|
|    70|    CLERK|       1|    3200|
+------+---------+--------+--------+



                                                                                

df.cube('deptno', 'job').agg(count('*'), sum('sal')).orderBy('deptno','job').show()

In [87]:
df.rollup('deptno', 'job').agg(
    count('*').alias('count'),
    max('sal').alias('max_sal'),
    min('sal').alias('min_sal')
).orderBy('deptno', 'job').show()

+------+---------+-----+-------+-------+
|deptno|      job|count|max_sal|min_sal|
+------+---------+-----+-------+-------+
|  null|     null|   15|   5000|    800|
|    10|     null|    3|   5000|   1300|
|    10|    CLERK|    1|   1300|   1300|
|    10|  MANAGER|    1|   2450|   2450|
|    10|PRESIDENT|    1|   5000|   5000|
|    20|     null|    5|   3000|    800|
|    20|  ANALYST|    2|   3000|   3000|
|    20|    CLERK|    2|   1100|    800|
|    20|  MANAGER|    1|   2975|   2975|
|    30|     null|    6|   2850|    950|
|    30|    CLERK|    1|    950|    950|
|    30|  MANAGER|    1|   2850|   2850|
|    30| SALESMAN|    4|   1600|   1250|
|    70|     null|    1|   3200|   3200|
|    70|    CLERK|    1|   3200|   3200|
+------+---------+-----+-------+-------+



                                                                                

```sql
SELECT 
    deptno, 
    job, 
    MAX(sal) AS max_sal, 
    MIN(sal) AS min_sal
FROM emp
GROUP BY ROLLUP(deptno, job);
ORER BY deptno, job;

In [91]:
df.groupBy('job').agg(avg('sal').alias('avg_salary')).show()

+---------+------------------+
|      job|        avg_salary|
+---------+------------------+
|  ANALYST|            3000.0|
| SALESMAN|            1400.0|
|    CLERK|            1470.0|
|  MANAGER|2758.3333333333335|
|PRESIDENT|            5000.0|
+---------+------------------+



In [95]:
df.cube('deptno', 'job').agg(avg('sal').alias('avg_salary'),max('sal').alias('max_salary')).orderBy('deptno', 'job').show()

+------+---------+------------------+----------+
|deptno|      job|        avg_salary|max_salary|
+------+---------+------------------+----------+
|  null|     null|2148.3333333333335|      5000|
|  null|  ANALYST|            3000.0|      3000|
|  null|    CLERK|            1470.0|      3200|
|  null|  MANAGER|2758.3333333333335|      2975|
|  null|PRESIDENT|            5000.0|      5000|
|  null| SALESMAN|            1400.0|      1600|
|    10|     null|2916.6666666666665|      5000|
|    10|    CLERK|            1300.0|      1300|
|    10|  MANAGER|            2450.0|      2450|
|    10|PRESIDENT|            5000.0|      5000|
|    20|     null|            2175.0|      3000|
|    20|  ANALYST|            3000.0|      3000|
|    20|    CLERK|             950.0|      1100|
|    20|  MANAGER|            2975.0|      2975|
|    30|     null|1566.6666666666667|      2850|
|    30|    CLERK|             950.0|       950|
|    30|  MANAGER|            2850.0|      2850|
|    30| SALESMAN|  

                                                                                

In [96]:
spark.