# RDD 프로그래밍을 이용한 TLC Trip 데이터 시각화
![](https://i.ibb.co/YD7NbWR/tlc-arclayer.jpg)
#### TLC 데이터의 승하차 위치(fhvhv... 파일)와 좌표 값(taxi_zone... 파일)의 결합을 통해 승차 기록을 시각화
#### [+]로 표시된 코드를 완성하세요.
### 준비사항
+ 데이터 파일('taxi_zone_lookup_coordinates_v2.csv', 'fhvhv_tripdata_2020-03_short.csv')을 data 폴더에 저장

## 1. TLC Trip, Coordinates 데이터 로딩

#### PySpark 시작
+ App name: 'assgn-tlc-trip-arc-layer'

In [None]:
# [+] PySpark 시작
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('assgn-tlc-trip-arc-layer')
sc = SparkContext(conf=conf) 

In [None]:
# 데이터 파일 경로 및 이름
path = './data/'
coord_file = 'taxi_zone_lookup_coordinates_v2.csv'
trip_file = 'fhvhv_tripdata_2020-03_short.csv'

**입력**: *coord_lines*
```
'LocationID,Borough,Zone,service_zone,latitude,longitude',
'1,EWR,Newark Airport,EWR,40.69287997,-74.18544993',
'2,Queens,Jamaica Bay,Boro Zone,40.6057,-73.8713',
'3,Bronx,Allerton/Pelham Gardens,Boro Zone,40.86521003,-73.8435548',
'4,Manhattan,Alphabet City,Yellow Zone,40.72599,-73.98057',
...
```
 
**출력**: *trip_lines*
```
'hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag',
'HV0005,B02510,2020-03-01 00:03:40,2020-03-01 00:23:39,81,159,',
'HV0005,B02510,2020-03-01 00:28:05,2020-03-01 00:38:57,168,119,',
'HV0003,B02764,2020-03-01 00:03:07,2020-03-01 00:15:04,137,209,1',
'HV0003,B02764,2020-03-01 00:18:42,2020-03-01 00:38:42,209,80,'
...
```

In [None]:
# [+] coordinates 데이터 파일을 읽어 RDD로 생성
coord_lines = sc.textFile(path + coord_file)

# [+] RDD 값 5개 출력
coord_lines.take(5)

In [None]:
# [+] Trip 데이터 파일을 읽어 RDD로 생성
trip_lines = sc.textFile(path + trip_file)

# [+] RDD 값 5개 출력
trip_lines.take(5)

## 2. 승하차 위치 정보 획득을 위한 RDD 처리
#### 수행 단계:
+ 헤더 제거: coord, trip 데이터 헤더 제거
+ Key-Value RDD 변환: 두 RDD로부터 필요한 값만 추출
    1. **coord 데이터**(<span style='color: blue; font-weight: bold'>Key</span>, <span style='color: purple; font-weight: bold'>Value</span>): (<span style='color: blue; font-weight: bold'>LocationID</span>, <span style='color: purple; font-weight: bold'>(latitude, longitude)</span>)
    2. **trip 데이터**[<span style='color: blue; font-weight: bold'>Key</span>, <span style='color: purple; font-weight: bold'>Value</span>]: (<span style='color: blue; font-weight: bold'>PULocationID</span>, <span style='color: purple; font-weight: bold'>DOLocationID</span>)
+ RDD 조인:  **LocationID**를 중심으로 두 RDD를 결합
    1. PULocationID를 기준으로 조인 &rarr; **승차 위치**에 대한 **좌표** 획득
    2. DOLocationID를 기준으로 조인 &rarr; **하차 위치**에 대한 **좌표** 획득
+ 결과 출력: RDD &rarr; List &rarr; DataFrame &rarr; 시각화

### 2.1 RDD 헤더 제거

In [None]:
# [+] coord_lines의 헤더 제거
coord_header = coord_lines.first()
coord_filtered_lines = coord_lines.filter(lambda x: x!= coord_header)

In [None]:
# [+] trip_lines의 헤더 제거
trip_header = trip_lines.first()
trip_filtered_lines = trip_lines.filter(lambda x: x!= trip_header)

In [None]:
# [+] 헤더가 제거된 coord_filtered_lines 값 5개 출력
coord_filtered_lines.take(5)

In [None]:
# [+] 헤더가 제거된 trip_filtered_lines 값 5개 출력
trip_filtered_lines.take(5)

### 2.2 coord RDD를 Key-Value RDD로 변환
+ Key: LocationID(0 번째 값)
+ Value: latitude(위도, 4 번째 값), longitude(경도, 5번째 값)

**입력**: *coord_filtered_lines*:
```
'1,EWR,Newark Airport,EWR,40.69287997,-74.18544993',
'2,Queens,Jamaica Bay,Boro Zone,40.6057,-73.8713',
'3,Bronx,Allerton/Pelham Gardens,Boro Zone,40.86521003,-73.8435548',
'4,Manhattan,Alphabet City,Yellow Zone,40.72599,-73.98057',
'5,Staten Island,Arden Heights,Boro Zone,40.5564,-74.1735'
...
```
**출력**: *coord_kv*:
```
('1', ['40.69287997', '-74.18544993']),
('2', ['40.6057', '-73.8713']),
('3', ['40.86521003', '-73.8435548']),
('4', ['40.72599', '-73.98057']),
('5', ['40.5564', '-74.1735'])
...
```

#### (LocationID, (latitude, longitude)) 형태의 Key-Value RDD 생성
+ Hint 1: ```x.split(',')```: ```','``` 문자를 기준으로 토크나이징
+ Hint 2: ```x.split(',')[4:6]```: 토크나이징 결과에서 4,5번 값을 선택

In [None]:
# [+] Key-Value RDD 생성
coord_kv = coord_filtered_lines.map(lambda x : (x.split(',')[0],(x.split(',')[4:6])))

In [None]:
# [+] coord_kv 값 5개 출력
coord_kv.take(5)

#### 위도(latitude), 경도(longitude) 값 모두 문자열 상태이므로, float으로 변환
+ 위도, 경도 데이터를 올바르게 처리하기 위해서는 반드시 float과 같은 수치형으로 표현

In [None]:
# latitude, longitude 값을 float 타입으로 변환
coord_kv = coord_kv.mapValues(lambda x: [float(x[0]), float(x[1])])

In [None]:
# [+] coord_kv 값 5개 출력
coord_kv.take(5)

### 2.3 Trip RDD를 Key-Value RDD로 변환
+ Key: PULocationID(4번째 값) = Pickup Location ID(승차 위치)
+ Value: DOLocationID(5번째 값) = Dropoff Location ID(하차 위치)

**입력**: *coord_filtered_lines*

```
'HV0005,B02510,2020-03-01 00:03:40,2020-03-01 00:23:39,81,159,',
'HV0005,B02510,2020-03-01 00:28:05,2020-03-01 00:38:57,168,119,',
'HV0003,B02764,2020-03-01 00:03:07,2020-03-01 00:15:04,137,209,1',
'HV0003,B02764,2020-03-01 00:18:42,2020-03-01 00:38:42,209,80,',
'HV0003,B02764,2020-03-01 00:44:24,2020-03-01 00:58:44,256,226,'
...

```

**출력**: *trip_kv*
```
['81', '159'], 
['168', '119'], 
['137', '209'], 
['209', '80'], 
['256', '226'],
...
```

#### [PULocationID, DOLocationID] 형태의 Key-Value RDD 생성

In [None]:
# [+] Key-Value RDD 생성
trip_kv = trip_filtered_lines.map(lambda x: (x.split(',')[4], x.split(',')[5]))

In [None]:
# [+] trip_kv 값 5개 출력
trip_kv.take(5)

### 2.4 Coord, Trip RDD의 첫 번째 조인

**입력 1**: trip_kv
```
['81', '159'], 
['168', '119'], 
['137', '209'], 
['209', '80'], 
['256', '226'],
...
```

**입력 2**: coord_kv
```
('1', ['40.69287997', '-74.18544993']),
('2', ['40.6057', '-73.8713']),
('3', ['40.86521003', '-73.8435548']),
('4', ['40.72599', '-73.98057']),
('5', ['40.5564', '-74.1735'])
...
```

**출력**: *pu_joined*
```
('209', ('80', ['40.7072', '-74.0027'])),
('209', ('37', ['40.7072', '-74.0027'])),
('209', ('13', ['40.7072', '-74.0027'])),
('209', ('127', ['40.7072', '-74.0027'])),
('209', ('39', ['40.7072', '-74.0027'])),
 ...
 ```

In [None]:
# [+] trip_kv와 coord_kv의 조인 연산
pu_joined = trip_kv.join(coord_kv)

In [None]:
# [+] pu_joined 5개 값 출력
pu_joined.take(5)

In [None]:
# Key(PULocationID) 제거
pu_joined = pu_joined.values()
pu_joined.take(5)

### 2.5 Coord, Trip RDD의 두 번째 조인

**입력 1**: *pu_joined*
```
('80', ['40.7072', '-74.0027']),
('37', ['40.7072', '-74.0027']),
('13', ['40.7072', '-74.0027']),
('127', ['40.7072', '-74.0027']),
('39', ['40.7072', '-74.0027'])
```

**입력 2**: *coord_kv*
```
('1', ['40.69287997', '-74.18544993']),
('2', ['40.6057', '-73.8713']),
('3', ['40.86521003', '-73.8435548']),
('4', ['40.72599', '-73.98057']),
('5', ['40.5564', '-74.1735'])
...
```

**출력**: *pudo_joined*
```
('40', (['40.7072', '-74.0027'], ['40.6802', '-74.00163'])),
('40', (['40.7072', '-74.0027'], ['40.6802', '-74.00163'])),
('40', (['40.7072', '-74.0027'], ['40.6802', '-74.00163'])),
('40', (['40.7072', '-74.0027'], ['40.6802', '-74.00163'])),
('40', (['40.7072', '-74.0027'], ['40.6802', '-74.00163']))
 ```

In [None]:
# [+] pu_joined와 coord_kv의 조인 연산
pudo_joined = pu_joined.join(coord_kv)

In [None]:
# [+] pudo_joined 값 5개 출력
pudo_joined.take(5)

In [None]:
# Key(DOLocationID) 제거
pudo_joined = pudo_joined.values()
pudo_joined.take(5)

### 2.6 결과 출력 및 DataFrame으로 변환

In [None]:
# [+] pudo_joined를 List 객체로 출력
coord_lst = pudo_joined.collect()

In [None]:
# 샘플 출력
coord_lst[0]

In [None]:
# 승차위치와 하차위치 리스트 결합
res_lst = []

for i in range(len(coord_lst)):
    res_lst.append(list(coord_lst[i][0] + coord_lst[i][1]))

In [None]:
# 샘플 출력
res_lst[0]

In [None]:
# 결과를 DataFrame 객체에 저장
import pandas as pd
df = pd.DataFrame(res_lst, columns=["latitude_pu", "longitude_pu",
                                    "latitude_do", "longitude_do"
                                    ])

In [None]:
df

---

## 3. TLC 승차 기록 시각화

#### 시각화 라이브러리 pydeck 설치
+ pydeck: Uber의 대규모 WebGL 기반 데이터 시각화 라이브러리인 Dec.gl의 Python 버전
+ pydeck API 참고 [(link)](https://deckgl.readthedocs.io/en/latest/index.html)

In [None]:
# pydeck 설치 명령어
!pip install pydeck

In [None]:
# pydeck 임포트
import pydeck as pdk

In [None]:
# 시각화 옵션
GREEN_RGB = [0, 255, 0, 40]
RED_RGB = [240, 100, 0, 40]

#### 최종 결과를 ArcLayer로 시각화
+ <span style="color: red; font-weight: bold">빨간선</span>: 출발지(승차 위치), <span style="color: green; font-weight: bold">초록선</span>: 도착치(하차 위치) 
+ ```df.sample(n)```: 컴퓨터 성능에 따라 n 값을 적절히 조정
+ Zoom In/Out, 로테이션(우클릭 드래그) 기능 등을 이용하여 시각화된 결과를 살펴보기

In [None]:
%%time

# ArcLayer 시각화 설정
arc_layer = pdk.Layer(
    "ArcLayer",
    data=df.sample(100000),
    get_width="S000 * 2",
    get_source_position=["longitude_pu", "latitude_pu"],
    get_target_position=["longitude_do", "latitude_do"],
    get_tilt=15,
    get_source_color=RED_RGB,
    get_target_color=GREEN_RGB,
    pickable=True,
    auto_highlight=True,
)

# 초기화면 설정
view_state = pdk.ViewState(
    latitude=40.6928,
    longitude=-74.1854,
    bearing=45,
    pitch=50,
    zoom=8,
)


# 렌더링 옵션
TOOLTIP_TEXT = {"html": "{S000} trips <br /> Pickup Locations in red; Dropoff Locations in green"}
r = pdk.Deck(arc_layer, initial_view_state=view_state, tooltip=TOOLTIP_TEXT)
r.to_html("arc_layer.html")