In [1]:
value = 'Spark'
print("This is %s Notebook!" % value)

In [2]:
### 데이터 저장소 연결하기(AWS S3 bucket에 연결)
ACCESS_KEY_ID = '<ACCESS_KEY_ID>'
SECRET_ACCESS_KEY = '<SECRET_ACCESS_KEY>'
ENCODED_SECRET_ACCESS_KEY = SECRET_ACCESS_KEY.replace("/","%2F")

dbutils.fs.mount("s3a://%s:%s@edwith-pyspark-dataset" % (ACCESS_KEY_ID, ENCODED_SECRET_ACCESS_KEY), "/mnt/us-carrier-dataset")

In [3]:
display(dbutils.fs.ls('/mnt/us-carrier-dataset'))

path,name,size
dbfs:/mnt/us-carrier-dataset/1987.csv,1987.csv,127162942
dbfs:/mnt/us-carrier-dataset/1988.csv,1988.csv,501039472
dbfs:/mnt/us-carrier-dataset/1989.csv,1989.csv,486518821
dbfs:/mnt/us-carrier-dataset/1990.csv,1990.csv,509194687
dbfs:/mnt/us-carrier-dataset/1991.csv,1991.csv,491210093
dbfs:/mnt/us-carrier-dataset/1992.csv,1992.csv,492313731
dbfs:/mnt/us-carrier-dataset/1993.csv,1993.csv,490753652
dbfs:/mnt/us-carrier-dataset/1994.csv,1994.csv,501558665
dbfs:/mnt/us-carrier-dataset/1995.csv,1995.csv,530751568
dbfs:/mnt/us-carrier-dataset/1996.csv,1996.csv,533922363


In [4]:
### SparkSession 객체 생성 후, 데이터 Load, 데이터 확인하기
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark EDA example on us aircraft data").getOrCreate()

raw_df = spark.read.csv("/mnt/us-carrier-dataset", header='True', inferSchema='True')
# 데이터 Schema(스키마) 형태 표시!
raw_df.printSchema()

In [5]:
# 데이터 개수 : 약 1억2천건!!
raw_df.count()

In [6]:
# 데이터 출력
raw_df.show(5)
# display(raw_df)

In [7]:
### 데이터 전처리!!
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType, IntegerType, StringType

# 1) NA값을 null로 바꾸기!
def cleanStr(value):
  if (value==None) | (value=='NA'):
    return None
  
# 2) Type 변경!
def str2int(value):
  if value=='NA':
    return None
  else:
    return int(value)
def int2Bool(value):
  return False if value==0 else True

### UDF(User Define Func) 만들기!
# 위에서 정의한 함수들을 우리가 앞으로 사용할 Spark SQL 함수를 사용자가 정의할 수 있도록 해주는 기능입니다.
cleanStrFunc = udf(cleanStr, StringType())
intFunc = udf(str2int, IntegerType())
boolFunc = udf(int2Bool, BooleanType())


us_carrier_df = raw_df \
.drop('DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'AirTime', 'ArrDelay', 'DepDelay', 'TaxiIn', 'TaxiOut', 'CancellationCode', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay') \  # 사용하지 않을 column 삭제
.withColumn('ActualElapsedTime', intFunc('ActualElapsedTime')) \ # 'NA' 항목 null로 변경하고 Integer 형으로 변경
.withColumn('CRSElapsedTime', intFunc('CRSElapsedTime')) \ 
.withColumn('Distance', intFunc('Distance')) \ 
.withColumn('Cancelled', boolFunc('Cancelled')) \ # 'NA' 항목 null로 변경하고 Boolean 형으로 변경
.withColumn('Diverted', boolFunc('Diverted')) 

us_carrier_df.printSchema()
us_carrier_df.show(5)


In [8]:
# 클러스터 메모리상에 남겨두고 삭제하지 않도록 설정!!
us_carrier_df.cache()

# 추후 SQL문 형태로 질의할 때 사용하기 위해 DataFrame을 전역 임시 뷰로 저장합니다!
us_carrier_df.createGlobalTempView("us_carrier")

In [9]:
### 1. 얼마나 많은 항공사가 있을까요?
# `UniqueCarrier` 항목만을 가지는 DataFrame
carriers_only_df = us_carrier_df.select("UniqueCarrier")
carriers_only_df.show(5)

In [10]:
# 중복된 갓을 가지지 않는 유니크한 값만 추출하기 위해 distinct() 함수 호출!
carriers_only_distinct_df = carriers_only_df.distinct()
carriers_only_distinct_df.show(5)

In [11]:
# SQL문을 바로 호출
spark.sql("SELECT DISTINCT UniqueCarrier FROM global_temp.us_carrier").show(5)

In [12]:
### 2. DL 항공사는 1990년에 얼마나 비행을 했나요?
from pyspark.sql.functions import col

us_carrier_df.filter((col('UniqueCarrier')=='DL') & (col('Year')==1990)).count()

# spark.sql("SELECT * FROM global_temp.us_carrier WHERE UniqueCarrier=='DL' AND Year==1990").show(5)


In [13]:
### 3. 운행 거리의 평균, 최소, 최대값은 얼마인가요?
from pyspark.sql.functions import mean, min, max

us_carrier_df.select(mean('distance'), min('distance'), max('distance')).show()

# spark.sql("SELECT MEAN(distance), MIN(distance), MAX(distance) FROM global_temp.us_carrier").show()

In [14]:
# 운항이 취소되거나 이륙 후 회항하여 출발지와 도착지가 같은 경우!
# 이 경우 출발지(Origin)와 도착지(Dest)가 같은 것도 특징입니다
us_carrier_df.filter(col('distance')==0).show(5)

In [15]:
us_carrier_df.describe('distance').show(5)

In [16]:
### 1987년의 전체 운항 노선 수는 어떠한가요?
# 1987년의 출발지와 목적지의 유니크한 값을 출력!!
line_distinct_1987_df = us_carrier_df.filter(col('Year') == 1987) \
  .select('Origin', 'Dest') \
  .distinct() \
  .orderBy('Origin','Dest')

line_distinct_1987_df.show(5)

# spark.sql("SELECT COUNT(DISTINCT(Origin, Dest)) AS COUNT FROM global_temp.us_carrier WHERE Year=1987").show()

In [17]:
### 5. 각 공항에는 얼마나 많은 도착 기록이 있나요?
from pyspark.sql.functions import desc

dest_by_arrival_df = us_carrier_df.groupby('Dest').count()
dest_by_arrival_df.show(5)
dest_by_arrival_df.orderBy(desc('count')).show(5)

# spark.sql("SELECT Dest, Count(*) FROM global_temp.us_carrier GROUP BY Dest ORDER BY Count(*) DESC").show()

In [18]:
### 6. 평균적으로 실제 비행 시간과 예상 비행 시간의 가장 크게 차이가 나는 노선은 어디인가요?
#  실제 평균 비행 시간
actual_elapsed_df = us_carrier_df.groupBy('Origin','Dest').mean('ActualElapsedTime')
actual_elapsed_df.show(5)

#  예상 최소 비행 시간
expected_elapsed_df = us_carrier_df.groupby('Origin','Dest').min('CRSElapsedTime')
expected_elapsed_df.show(5)

#  JOIN
joined_df = actual_elapsed_df.join(expected_elapsed_df, (actual_elapsed_df['Origin']==expected_elapsed_df['Origin']) & (actual_elapsed_df['Dest']==expected_elapsed_df['Dest']))
joined_df.show(5)

In [19]:
joined_aligned_df = joined_df.select(actual_elapsed_df['Origin'], actual_elapsed_df['Dest'], 'avg(ActualElapsedTime)', col('min(CRSElapsedTime)').alias('CRSElapsedTime'))
joined_aligned_df.show(5)

In [20]:
# 이제 평균 비행 시간과 예상 비행 시간의 차이를 구해 보도록 하겠습니다.
from pyspark.sql.functions import abs

difference_df = joined_aligned_df.select('Origin','Dest', abs(col('avg(ActualElapsedTime)') - col('CRSElapsedTime')))
difference_df.show(5)

In [21]:
difference_df.select('Origin', 'Dest', col('abs((avg(ActualElapsedTime) - CRSElapsedTime))').alias('difference')).orderBy(desc('difference')).show(5)

In [22]:
### 7. 각 공항별로 다른 공항으로 향하는 운항 수가 몇 %인가요?
# 1. 공항별로 출발 운항 수를 구한다
count_per_origin_df = us_carrier_df.groupBy('Origin').count().select('Origin', col('count').alias('total')).orderBy('Origin')
count_per_origin_df.show(5)

In [23]:
# 2. 공항별로 목적지별 운항 수를 구한다
count_per_origin_dest_df = us_carrier_df.groupBy('Origin','Dest').count().orderBy('Origin','Dest')
count_per_origin_dest_df.show()

In [24]:
# 3. 1과 2의 결과물을 합칩니다
joined_df = count_per_origin_df.join(count_per_origin_dest_df, count_per_origin_df['Origin']==count_per_origin_dest_df['Origin']).drop(count_per_origin_dest_df['Origin'])
joined_df.show(5)

# 표시 순서 변경 + (Origin, Dest) 순으로 정렬.
joined_reordered_df = joined_df.select('Origin','Dest','Count','total').orderBy('Origin','Dest')
joined_reordered_df.show()

In [25]:
# 4. 마지막으로 count / total을 계산해서 마무리합니다. -> 반올림(round)
from pyspark.sql.functions import round

origin_to_dest_rate = joined_reordered_df.select('Origin','Dest', round((col('count') / col('total') * 100) ,2).alias('rate'))
origin_to_dest_rate.show(5)

In [26]:
### 8. 해당 공항에서 출발, 도착을 모두 하는 항공사는 얼마나 있을까요?
from pyspark.sql.functions import collect_list

# 공항별로 출발 항공편을 운행하는 항공사들을 묶어 보도록 하겠습니다.
depart_carriers = us_carrier_df.select('Origin','UniqueCarrier').distinct().groupBy('Origin').agg(collect_list('UniqueCarrier').alias('departs'))
depart_carriers.show(5)

# 공항별로 도착 항공편 항공사들을 묶어 보도록 하겠습니다
arrival_carriers = us_carrier_df.select('Dest', 'UniqueCarrier').distinct().groupBy('Dest').agg(collect_list('UniqueCarrier').alias('arrivals'))
arrival_carriers.show(5)

In [27]:
# 이제 join을 사용해 둘을 합칩니다.
# 'Origin' 과 'Dest'는 중복되니까 'Origin'의 이름은 'airport'로 바꾸겠습니다.
departs_arrivals = depart_carriers.join(arrival_carriers, depart_carriers['Origin']==arrival_carriers['Dest']).select(col('origin').alias('airport'),'departs','arrivals')
departs_arrivals.show(5)

In [28]:
# 공항별로 departs와 arriavl의 교집합을 구하면 각 공항에서 출발, 도착 모두 하는 항공사의 목록이 나옵니다
# 공항별로 arrival에서 departs를 뺀 차집합을 구하면 도착만 하는 항공사의 목록이 나옵니다
# 공항별로 departs에서 arrival을 뺀 차집합을 구하면 출발만 하는 항공사의 목록이 나옵니다
# 새 column 추가는 withColumn 함수를, 삭제는 drop 함수를 사용합니다
# 교집합과 차집합은 각각 array_intersect, array_except 함수를 사용합니다
from pyspark.sql.functions import array_intersect, array_except

departs_arrivals_both = departs_arrivals\
.withColumn('both', array_intersect('departs','arrivals'))\
.withColumn('arrival_only', array_except('arrivals', 'departs'))\
.withColumn('depart_only', array_except('departs','arrivals'))\
.drop('departs','arrivals')

departs_arrivals_both.show(5)

In [29]:
# 대부분의 경우 'arrival_only'나 'depart_only'가 비어있는 것을 확인할 수 있습니다
# 비행기가 어딘가에 간 뒤에 다시 돌아와야 하므로 매우 자연스러운 결과라고 할 수 있습니다
# 오히려 비어있지 않은 공항들이 신기해 보이는데, 이런 것들만 따로 추려보도록 하겠습니다
# 배열의 크기를 기준으로 filtering을 하면 되는데, 배열의 크기는 size 함수를 사용해 알아낼 수 있습니다.
from pyspark.sql.functions import size

departs_arrivals_both.filter((size(departs_arrivals_both['arrival_only'])>0) | (size(departs_arrivals_both['depart_only'])>0)).show(5)

In [30]:
### 9. 각 노선별로 다음 네 항공사의 운행은 몇 개인가요? : UA, TW, DL, US
us_carrier_df\
  .groupBy('Origin', 'Dest')\
  .pivot('UniqueCarrier', ['UA', 'TW', 'DL', 'US'])\
  .count()\
  .orderBy('Origin', 'Dest')\
  .show(5)

In [31]:
# 먼저 ABE-ATL 노선에는 주어진 항공사 중 DL만이 취항하고 있는가?
us_carrier_df.filter((col('Origin')=='ABE') & (col('Dest')=='ATL')).select('UniqueCarrier').distinct().show()

In [32]:
### 10. 각 공항의 PageRank는 어떠한가요?
# PageRank란 방향성이 있는 그래프에서(Directed Graph) 무작위로 그래프를 탐색할 때 임의의 정점(Node)에 도착할 확률을 가리킵니다(참조)

# 1. 정점(=전체 공항)과 간선(=전체 운항)을 가리키는 DataFrame을 구합니다
origin_df = us_carrier_df.select(col('Origin').alias('id')).distinct()
dest_df = us_carrier_df.select(col('Dest').alias('id')).distinct()


# 먼저 전체 공항 목록부터 구해보겠습니다
# 출발지 공항과 도착지 공항을 저장하는 DataFrame을 구한 뒤 union 메소드를 사용해서 합치면 됩니다
vertice_df = origin_df.union(dest_df).distinct() # 합집합(union) 
vertice_df.show()

In [33]:
# 다음은 간선 목록입니다. 위에서 보았듯이 이 경우는 groupBy()를 쓰면 쉽게 구할 수 있습니다
# 간선(edge)을 가리키는 DataFrame에는 'src', 'dst' column이 있어야 함.
edge_df = us_carrier_df.select(col('Origin').alias('src'), col('Dest').alias('dst')).distinct()
edge_df.show(5)

In [34]:
# 지금부터 GraphFrames의 기능을 써보겠습니다
from graphframes import GraphFrame

gf = GraphFrame(vertice_df, edge_df)
# GraphFrame(v:[id: string], e:[src: string, dst: string])
print(gf)

In [35]:
# 이제 PageRank 값을 구하는 연산을 호출해주기만 하면 됩니다
# 초기화 확률은 0.15로, 최대 반복 수는 10으로 하겠습니다
# 이 작업은 시간이 조금 소요됩니다
pg_results_df = gf.pageRank(resetProbability=0.15, maxIter=10)
pg_results_df.vertices.orderBy(desc('pagerank')).show()

# display(pg_results_df.vertices.orderBy(desc('pagerank')))

In [36]:
display(pg_results_df.vertices.orderBy(desc('pagerank')))

id,pagerank
ATL,8.569511327329833
SLC,6.837090454041102
DFW,6.818256025766323
ORD,6.503610249541602
MSP,5.888771938630683
DEN,5.620870225568938
IAH,5.365047285685106
CVG,4.762090722016808
LAX,4.419828105868706
DTW,4.35196830317075
