## 1. TLC Trip, Coordinates 데이터 로딩

In [None]:
# [+] PySpark 시작
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('assgn-tlc-trip-arc-layer')
sc = SparkContext(conf=conf)

In [None]:
# 데이터 파일 경로 및 이름
path = './data/'
coord_file = 'taxi_zone_lookup_coordinates_v2.csv'
trip_file = 'fhvhv_tripdata_2020-03_short.csv'

In [None]:
# [+] coordinates 데이터 파일을 읽어 RDD로 생성
coord_lines = sc.textFile('./data/' + coord_file)

# [+] RDD 값 5개 출력
coord_lines.take(5)

In [None]:
# [+] Trip 데이터 파일을 읽어 RDD로 생성
trip_lines = sc.textFile('./data/' + trip_file)

# [+] RDD 값 5개 출력
trip_lines.take(5)

In [None]:
# [+] coord_lines의 헤더 제거
coord_header = coord_lines.first()
coord_filtered_lines = coord_lines.filter(lambda row:row != coord_header)

In [None]:
coord_filtered_lines.first()

In [None]:
# [+] trip_lines의 헤더 제거
trip_header = trip_lines.first()
trip_filtered_lines = trip_lines.filter(lambda row:row != trip_header)

In [None]:
trip_filtered_lines.first()

In [None]:
# [+] 헤더가 제거된 coord_filtered_lines 값 5개 출력
coord_filtered_lines.take(5)

In [None]:
# [+] 헤더가 제거된 trip_filtered_lines 값 5개 출력
trip_filtered_lines.take(5)

In [None]:
# [+] Key-Value RDD 생성
coord_kv = coord_filtered_lines.map(lambda x: ((x.split(",")[0]),(x.split(",")[4], x.split(",")[5])))

In [None]:
# [+] coord_kv 값 5개 출력
coord_kv.take(5)

In [None]:
# latitude, longitude 값을 float 타입으로 변환
coord_kv = coord_kv.mapValues(lambda x: [float(x[0]), float(x[1])])

In [None]:
# [+] coord_kv 값 5개 출력
coord_kv.take(5)

In [None]:
# [+] Key-Value RDD 생성
trip_kv = trip_filtered_lines.map(lambda x: ((x.split(",")[4]),(x.split(",")[5])))

In [None]:
# [+] trip_kv 값 5개 출력
trip_kv.take(5)

In [None]:
# [+] trip_kv와 coord_kv의 조인 연산
pu_joined = trip_kv.join(coord_kv)

In [None]:
# [+] pu_joined 5개 값 출력
pu_joined.take(5)

In [None]:
# Key(PULocationID) 제거
pu_joined = pu_joined.values()
pu_joined.take(5)

In [None]:
# [+] pu_joined와 coord_kv의 조인 연산
pudo_joined = pu_joined.join(coord_kv)

In [None]:
# [+] pudo_joined 값 5개 출력
pudo_joined.take(5)

In [None]:
# Key(DOLocationID) 제거
pudo_joined = pudo_joined.values()
pudo_joined.take(5)

### 2.6 결과 출력 및 DataFrame으로 변환

In [None]:
# [+] pudo_joined를 List 객체로 출력
coord_lst = pudo_joined.collect()

In [None]:
# 샘플 출력
coord_lst[0]

In [None]:
# 승차위치와 하차위치 리스트 결합
res_lst = []

for i in range(len(coord_lst)):
    res_lst.append(list(coord_lst[i][0] + coord_lst[i][1]))

In [None]:
# 샘플 출력
res_lst[0]

In [None]:
# 결과를 DataFrame 객체에 저장
import pandas as pd
df = pd.DataFrame(res_lst, columns=["latitude_pu", "longitude_pu",
                                    "latitude_do", "longitude_do"
                                    ])

In [None]:
df

---

## 3. TLC 승차 기록 시각화

In [None]:
# pydeck 설치 명령어
!pip install pydeck

In [None]:
# pydeck 임포트
import pydeck as pdk

In [None]:
# 시각화 옵션
GREEN_RGB = [0, 255, 0, 40]
RED_RGB = [240, 100, 0, 40]

In [None]:
%%time

# ArcLayer 시각화 설정
arc_layer = pdk.Layer(
    "ArcLayer",
    data=df.sample(100000),
    get_width="S000 * 2",
    get_source_position=["longitude_pu", "latitude_pu"],
    get_target_position=["longitude_do", "latitude_do"],
    get_tilt=15,
    get_source_color=RED_RGB,
    get_target_color=GREEN_RGB,
    pickable=True,
    auto_highlight=True,
)

# 초기화면 설정
view_state = pdk.ViewState(
    latitude=40.6928,
    longitude=-74.1854,
    bearing=45,
    pitch=50,
    zoom=8,
)


# 렌더링 옵션
TOOLTIP_TEXT = {"html": "{S000} trips <br /> Pickup Locations in red; Dropoff Locations in green"}
r = pdk.Deck(arc_layer, initial_view_state=view_state, tooltip=TOOLTIP_TEXT)
r.to_html("arc_layer.html")