In [1]:
from pyspark.sql.functions import udf # 사용자 정의 함수
from pyspark.sql.types import BooleanType, IntegerType, StringType # 데이터 타입

In [2]:
# us_carrier_df = spark.read.load(schema='us_carrier_df')
us_carrier_df = spark.read.csv("/us_carrier_df.csv", header = 'True', inferSchema='True')
display(us_carrier_df)

Year,Month,DayofMonth,DayOfWeek,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,Origin,Dest,Distance,Cancelled,Diverted
1988,1,9,6,PI,942,,70.0,64,SYR,BWI,273,False,False
1988,1,10,7,PI,942,,69.0,64,SYR,BWI,273,False,False
1988,1,11,1,PI,942,,67.0,64,SYR,BWI,273,False,False
1988,1,12,2,PI,942,,64.0,64,SYR,BWI,273,False,False
1988,1,13,3,PI,942,,82.0,64,SYR,BWI,273,False,False
1988,1,14,4,PI,942,,75.0,64,SYR,BWI,273,False,False
1988,1,15,5,PI,942,,63.0,64,SYR,BWI,273,False,False
1988,1,16,6,PI,942,,60.0,64,SYR,BWI,273,False,False
1988,1,17,7,PI,942,,69.0,64,SYR,BWI,273,False,False
1988,1,18,1,PI,942,,90.0,64,SYR,BWI,273,False,False


In [3]:
us_carrier_df.cache()
us_carrier_df.createGlobalTempView("us_carrier")

In [4]:
# 1. 얼마나 많은 항공사가 있는가?

carriers_only_df = us_carrier_df.select("UniqueCarrier") # UniqueCarrier 컬럼만 가져오기
carriers_only_df.show(50)

In [5]:
# 중복제거
carriers_only_distinct_df = carriers_only_df.distinct()
carriers_only_distinct_df.show()

In [6]:
# 같은 작업을 SQL문으로 처리하는 방법
# createGlobalTempView()를 이용해 DataFrame을 미리 전역 뷰로 저장해 놓았음.(us_carrier)
spark.sql("SELECT DISTINCT UniqueCarrier FROM global_temp.us_carrier").show()

In [7]:
# 2. DL항공사는 1990년에 얼마나 비행을 했는가?
from pyspark.sql.functions import col # 특정 컬럼값에 해당하는 인스턴스만 가져오고 싶을 때 사용.

us_carrier_df.filter(col('UniqueCarrier') == 'DL').show() # DL 항공사의 비행정보만 가져옴

In [8]:
# 1990년에 있었던 비행 정보
us_carrier_df.filter(col('Year') == 1990).show()

In [9]:
# DL항공사의 1990년 비행 정보
# us_carrier_df.filter(col('UniqueCarrier') == 'DL').filter(col('Year') == 1990).show()
# 혹은
us_carrier_df.filter((col('UniqueCarrier') == 'DL') & (col('Year') == 1990)).show()

In [10]:
# DL항공사의 1990년 비행 횟수
us_carrier_df.filter((col('UniqueCarrier') == 'DL') & (col('Year') == 1990)).count()

In [11]:
# SQL문을 이용해 조건 검색
spark.sql("SELECT * FROM global_temp.us_carrier WHERE UniqueCarrier == 'DL' AND Year == 1990").show()

In [12]:
# 3. 운행 거리의 평균, 최소, 최대값은?

# SQL
spark.sql("SELECT MEAN(distance), MIN(distance), MAX(distance) FROM global_temp.us_carrier").show()


In [13]:
from pyspark.sql.functions import min, mean, max

# distance 컬럼에 대해 aggregation 수행.
# aggregation: 특정 컬럼값 전체에 대해 수행되어야하는 연산
us_carrier_df.select(mean('distance'), min('distance'), max('distance')).show()

In [14]:
# distance가 0인 비행?
# 운항이 취소되거나 회항한 항공편.(Cancelled, Diverted)
# 출발지와 도착지가 같음(Origin, Dest)
us_carrier_df.filter(col('distance') == 0).show()

In [15]:
# 가장 먼 거리를 운행한 항공편들
# 뉴욕(JFK) <--> 하와이(HNL)
us_carrier_df.filter(col('distance') == 4983).show()

In [16]:
# distance에 대한 주요 통계값 출력
us_carrier_df.describe('distance').show()

In [17]:
# 4. 1987년과 1993년의 전체 운항 노선 수는?
# 운항노선 = (출발지, 도착지)

line_distinct_1987_df = us_carrier_df.filter(col('Year') == 1987) \
  .select('Origin', 'Dest') \
  .distinct()

line_distinct_1987_df.show()

In [18]:
# 정렬해서 출력
line_distinct_1987_df.orderBy('Origin', 'Dest').show()

In [19]:
line_distinct_1987_df.count()

In [20]:
# SQL
spark.sql("SELECT COUNT(DISTINCT(Origin, Dest)) AS COUNT FROM global_temp.us_carrier WHERE Year = 1987").show()

In [21]:
us_carrier_df.filter(col('Year') == 1993) \
  .select('Origin', 'Dest') \
  .distinct() \
  .count()

In [22]:
# 5. 각 항공에는 얼마나 많은 도착 기록이 있는가?

dest_by_arrival_df = us_carrier_df.groupby('Dest').count()
dest_by_arrival_df.show()

In [23]:
# 항공명 기준 정렬
dest_by_arrival_df.orderBy('Dest').show()

In [24]:
# 도착수 기준 정렬
from pyspark.sql.functions import desc

dest_by_arrival_df.orderBy(desc('count')).show()

In [25]:
# SQL
spark.sql("SELECT Dest, COUNT(*) FROM global_temp.us_carrier GROUP BY Dest ORDER BY Dest").show()

In [26]:
# 6. 평균적으로 실제 비행 시간과 예상 비행 시간 간의 차이가 가장 큰 노선은?
# 각 노선별 실제 비행 시간 평균
actual_elapsed_df = us_carrier_df.groupBy('Origin', 'Dest').mean('ActualElapsedTime')
actual_elapsed_df.show()

In [27]:
# 예상 시간은 항상 동일한 것이 원칙.
# 편의상 예상 시간 중 최소값 선택
expected_elapsed_df = us_carrier_df.groupBy('Origin', 'Dest').min('CRSElapsedTime')
expected_elapsed_df.show()

In [28]:
# 실제 평균 시간 테이블과 예상 시간 테이블 조인
joined_df = actual_elapsed_df\
  .join(expected_elapsed_df, (actual_elapsed_df['Origin'] == expected_elapsed_df['Origin']) & (actual_elapsed_df['Dest'] == expected_elapsed_df['Dest']))

joined_df.show()

In [29]:
# 동일한 컬럼 제거 및 컬럼명 수정
joined_aligned_df = joined_df\
  .select(actual_elapsed_df['Origin'], actual_elapsed_df['Dest'], 'avg(ActualElapsedTime)', col('min(CRSElapsedTime)').alias('CRSElapsedTime'))

joined_aligned_df.show()

In [30]:
# 실제 평균 시간과 예상 시간의 차이
from pyspark.sql.functions import abs

difference_df = joined_aligned_df \
  .select('Origin', 'Dest', abs(col('avg(ActualElapsedTime)') - col('CRSElapsedTime')))

difference_df.show()

In [31]:
# 컬럼명 변경 및 차이값 내림차순 정렬
difference_df \
  .select('Origin', 'Dest', \
          col('abs((avg(ActualElapsedTime) - CRSElapsedTime))').alias('difference'))\
  .orderBy(desc('difference'))\
  .show()

In [32]:
# 7. 각 항공별로 다른 공항으로 향하는 운항 수는 몇%?

# 공항별 출발 운항 수
count_per_origin_df = us_carrier_df\
  .groupBy('Origin')\
  .count()\
  .select('Origin', col('count').alias('total'))\
  .orderBy('Origin')

count_per_origin_df.show()

In [33]:
# 공항, 목적지별 운항 수
count_per_origin_dest_df = us_carrier_df\
  .groupBy('Origin', 'Dest')\
  .count()\
  .orderBy('Origin', 'Dest')

count_per_origin_dest_df.show()

In [34]:
joined_df = count_per_origin_df \
  .join(count_per_origin_dest_df, count_per_origin_df['Origin'] == count_per_origin_dest_df['Origin'])\
  .drop(count_per_origin_df['Origin'])

# 표시 순서 변경 + (Origin, Dest) 순으로 정렬.
joined_reordered_df = joined_df\
  .select('Origin', 'Dest', 'count', 'total')\
  .orderBy('Origin', 'Dest')

joined_reordered_df.show()

In [35]:
# 공항별 운항수% 계산
from pyspark.sql.functions import round

origin_to_dest_rate = joined_reordered_df.select('Origin', 'Dest', \
                                                 round((col('count') / col('total') * 100), 2).alias('rate'))

origin_to_dest_rate.show()

In [36]:
# Databricks cloud의 display로 확인
display(origin_to_dest_rate)

Origin,Dest,rate
ABE,ALB,0.0
ABE,ATL,13.45
ABE,AVP,1.32
ABE,AZO,0.0
ABE,BDL,0.0
ABE,BHM,0.0
ABE,BWI,2.08
ABE,CLE,4.77
ABE,CLT,5.9
ABE,CVG,5.6


In [37]:
# 8. 해당 공항에서 출발, 도착을 모두 하는 항공사는 얼마나 있는가?
# 공항별 출발 항공사
from pyspark.sql.functions import collect_list

depart_carriers = us_carrier_df.select('Origin', 'UniqueCarrier').distinct().groupBy('Origin')\
.agg(collect_list('UniqueCarrier').alias('departs')) # collect_list 함수를 사용해 항공사들('UniqueCarrier')을 별도로 묶음!!!

depart_carriers.show()

In [38]:
# 공항별 도착 항공사
arrival_carriers = us_carrier_df.select('Dest', 'UniqueCarrier').distinct().groupBy('Dest').agg(collect_list('UniqueCarrier').alias('arrivals'))

arrival_carriers.show()

In [39]:
# 앞에 두개 합치고
departs_arrivals = depart_carriers.join(arrival_carriers, depart_carriers['Origin'] == arrival_carriers['Dest'])\
  .select(col('Origin').alias('airport'), 'departs', 'arrivals')

departs_arrivals.show()

In [40]:
# 각 공항에서 출발, 도착 모두 하는 항공사의 목록 = 공항별 departs와 arriavl의 교집합
# 도착만 하는 항공사의 목록 = 공항별 arrival에서 departs를 뺀 차집합
# 출발만 하는 항공사의 목록 = 공항별 departs에서 arrival을 뺀 차집합

from pyspark.sql.functions import array_intersect, array_except # 교집합, 차집합 함수

# 새 컬럼 추가 withColumn
departs_arrivals_both = departs_arrivals.withColumn('both', array_intersect('departs', 'arrivals'))\
  .withColumn('arrival_only', array_except('arrivals', 'departs'))\
  .withColumn('depart_only', array_except('departs', 'arrivals'))\
  .drop('departs')\
  .drop('arrivals')

departs_arrivals_both.show()

In [41]:
# 특이한 항공들(도착만 혹은 출발만이 있는....) 추려냄. filter
# 배열 크기가 0 이상인 인스턴스들
from pyspark.sql.functions import size # 배열크기

departs_arrivals_both.filter((size(departs_arrivals_both['arrival_only']) > 0) | (size(departs_arrivals_both['depart_only']) > 0)).show(100)

In [42]:
# 9. 각 노선별로 다음 네 항공사의 운행은 몇 개?
# UA, TW, DL, US

# pivot 테이블 이용
us_carrier_df\
  .groupBy('Origin', 'Dest')\
  .pivot('UniqueCarrier', ['UA', 'TW', 'DL', 'US'])\
  .count()\
  .orderBy('Origin', 'Dest')\
  .show()

In [43]:
# 10. 각 공항의 PageRank?
# PageRank란 방향성이 있는 그래프에서(Directed Graph) 무작위로 그래프를 탐색할 때 임의의 정점(Node)에 도착할 확률을 가리킴.
# 가장 번잡한 공항 예측?
# 각 공항을 정점(Node), 공항에서 다른 공항으로의 운항을 유향 간선(Directed Edge)
# spark의 확장 패키지인 graphframes를 이용해 페이지랭크 구하기

# 출발지
origin_df = us_carrier_df\
  .select(col('Origin').alias('id'))\
  .distinct()

# 도착지
dest_df = us_carrier_df\
  .select(col('Dest').alias('id'))\
  .distinct()

In [44]:
# graphframes를 구성하기 위한 조건 1
# 정점(vertice)을 가리키는 DataFrame에는 'id' column이 있어야 함.
# 전체 공항 목록
vertice_df = origin_df\
  .union(dest_df)\
  .distinct()

vertice_df.show()

In [45]:
# graphframes를 구성하기 위한 조건 2
# 간선(edge)을 가리키는 DataFrame에는 'src', 'dst' column이 있어야 함.
# 출발, 도착 쌍
edge_df = us_carrier_df\
  .select(col('Origin').alias('src'), col('Dest').alias('dst'))\
  .distinct()

edge_df.show()

In [46]:
from graphframes import *

gf = GraphFrame(vertice_df, edge_df)

print(gf)

In [47]:
# 페이지랭크 구하기
# 초기화 확률은 0.15로, 최대 반복 수는 10
pg_results_df = gf.pageRank(resetProbability=0.15, maxIter=10)

In [48]:
# vertices로 연산이 완료된 정점 정보 확인
pg_results_df.vertices.orderBy(desc('pagerank')).show() # 역순을 정렬

In [49]:
# Databricks cloud의 display로 확인
display(pg_results_df.vertices.orderBy(desc('pagerank')))

id,pagerank
ATL,8.569511327329835
SLC,6.837090454041103
DFW,6.818256025766324
ORD,6.503610249541604
MSP,5.888771938630684
DEN,5.620870225568939
IAH,5.365047285685108
CVG,4.762090722016809
LAX,4.419828105868707
DTW,4.351968303170751
