In [2]:
import sys, re
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.master("local").appName("join-dataset-test1").getOrCreate()

### stations.csv
- station_id : 스테이션 ID 번호
- name : 스테이션 이름
- lat : 위도
- long : 경도
- dockcount : 스테이션에 설치된 독 수
- landmark : 도시
- installation : 스테이션이 설치된 날짜
- ex) 
 9,Japantown,37.348742,-121.894715,15,San Jose,8/5/2013

In [3]:
stations = spark_session.sparkContext.textFile("./data/stations.csv") \
	.map(lambda x: x.split(',')) \
	.filter(lambda x: x[5] == 'San Jose') \
	.map(lambda x: (int(x[0]), x[1])) \
	.keyBy(lambda x: x[0])
print(stations.take(5))

[(2, (2, 'San Jose Diridon Caltrain Station')), (3, (3, 'San Jose Civic Center')), (4, (4, 'Santa Clara at Almaden')), (5, (5, 'Adobe on Almaden')), (6, (6, 'San Pedro Square'))]


### status.csv
- station_id : 스테이션 ID 번호
- bikes_available : 사용가능한 자전거 수
- docks_available : 사용가능한 독 수
- time : 날짜 및 시간, PST
- ex)
10,9,6,"2015-02-28 23:59:01"

In [5]:

tmp1 = spark_session.sparkContext.textFile("./data/status.csv")\
	.map(lambda x: x.split(',')) \
	.map(lambda x: (x[0], x[1], x[2], x[3].replace('"',''))) 
tmp2 = tmp1.map(lambda x: (x[0], x[1], x[2], x[3].split(' '))) \
	.map(lambda x: (x[0], x[1], x[2], x[3][0].split('-'), x[3][1].split(':'))) \
	.map(lambda x: (int(x[0]), int(x[1]), int(x[3][0]), int(x[3][1]), int(x[3][2]), int(x[4][0])))
status = tmp2.filter(lambda x: x[2]==2015 and x[3]==2 and x[4]>=22) \
	.map(lambda x: (x[0], x[1], x[5])) \
	.keyBy(lambda x: x[0])
print(tmp1.take(5))
print(tmp2.take(5))
print(status.take(5))

[('10', '9', '6', '2015-02-28 23:59:01'), ('10', '9', '6', '2015-02-28 23:58:02'), ('10', '9', '6', '2015-02-28 23:57:02'), ('10', '8', '7', '2015-02-28 23:56:02'), ('10', '8', '7', '2015-02-28 23:55:02')]
[(10, 9, 2015, 2, 28, 23), (10, 9, 2015, 2, 28, 23), (10, 9, 2015, 2, 28, 23), (10, 8, 2015, 2, 28, 23), (10, 8, 2015, 2, 28, 23)]
[(10, (10, 9, 23)), (10, (10, 9, 23)), (10, (10, 9, 23)), (10, (10, 8, 23)), (10, (10, 8, 23))]


In [8]:
joined = status.join(stations)
cleaned = joined.map(lambda x: (x[0], x[1][0][1], x[1][0][2], x[1][1][1]))

print(joined.take(5))
print(cleaned.take(5))

[(10, ((10, 9, 23), (10, 'San Jose City Hall'))), (10, ((10, 9, 23), (10, 'San Jose City Hall'))), (10, ((10, 9, 23), (10, 'San Jose City Hall'))), (10, ((10, 8, 23), (10, 'San Jose City Hall'))), (10, ((10, 8, 23), (10, 'San Jose City Hall')))]
[(10, 9, 23, 'San Jose City Hall'), (10, 9, 23, 'San Jose City Hall'), (10, 9, 23, 'San Jose City Hall'), (10, 8, 23, 'San Jose City Hall'), (10, 8, 23, 'San Jose City Hall')]


In [9]:
tmp1 = cleaned.keyBy(lambda x: (x[3],x[2]))
tmp2 = tmp1.mapValues(lambda x: (x[1], 1)) \
	.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
	.mapValues(lambda x: (x[0]/x[1]))
topavail = tmp2.keyBy(lambda x: x[1]) \
	.sortByKey(ascending=False) \
	.map(lambda x: (x[1][0][0], x[1][0][1], x[0])) \
	.persist()
print(tmp1.take(5))
print(tmp2.take(5))
print(topavail.take(5))

[(('San Jose City Hall', 23), (10, 9, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 9, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 9, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 8, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 8, 23, 'San Jose City Hall'))]
[(('San Jose City Hall', 22), 7.3619047619047615), (('San Jose City Hall', 20), 7.354761904761904), (('San Jose City Hall', 18), 7.242857142857143), (('San Jose City Hall', 16), 6.697619047619048), (('San Jose City Hall', 14), 7.252380952380952)]
[('San Jose Diridon Caltrain Station', 17, 16.590476190476192), ('San Jose Diridon Caltrain Station', 7, 16.492857142857144), ('San Jose Diridon Caltrain Station', 6, 16.34285714285714), ('San Jose Diridon Caltrain Station', 18, 16.21904761904762), ('San Jose Diridon Caltrain Station', 19, 15.64047619047619)]


In [18]:
print(cleaned.keyBy(lambda x: (x[3],x[2])).take(5))

[(('San Jose City Hall', 23), (10, 9, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 9, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 9, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 8, 23, 'San Jose City Hall')), (('San Jose City Hall', 23), (10, 8, 23, 'San Jose City Hall'))]


In [19]:
top10stations = topavail.take(10)
for stationinfo in top10stations:
	print(str(stationinfo))

('San Jose Diridon Caltrain Station', 17, 16.590476190476192)
('San Jose Diridon Caltrain Station', 7, 16.492857142857144)
('San Jose Diridon Caltrain Station', 6, 16.34285714285714)
('San Jose Diridon Caltrain Station', 18, 16.21904761904762)
('San Jose Diridon Caltrain Station', 19, 15.64047619047619)
('San Jose Diridon Caltrain Station', 22, 15.516666666666667)
('San Jose Diridon Caltrain Station', 0, 15.445238095238095)
('San Jose Diridon Caltrain Station', 20, 15.416666666666666)
('San Jose Diridon Caltrain Station', 1, 15.392857142857142)
('San Jose Diridon Caltrain Station', 4, 15.383333333333333)
