# Joining Datasets in Spark:
For this example you will use data from the Bay Area Bike Share Data Challenge. The Bay Area Bike
Share program enables members to pick up bikes from designated stations and then drop off the
bikes at the same station or a different one. Bay Area Bike Share has made trip data available for
public use through the group’s Open Data program. For more information, see these sites:

1.   [Open-Data](http://www.bayareabikeshare.com/open-data) 
 
2.   [System-Data](https://www.fordgobike.com/system-data )


To make your job easier, the data files for this exercise are available in this book’s AWS S3 bucket:

1.     [station-csv](https://s3.amazonaws.com/sparkusingpython/bike-share/stations/stations.csv)

2.     [status-csv](https://s3.amazonaws.com/sparkusingpython/bike-share/status/status.csv)

3.     [trips-csv](https://s3.amazonaws.com/sparkusingpython/bike-share/trips/trips.csv)

4.     [weather-csv](https://s3.amazonaws.com/sparkusingpython/bike-share/weather/weather.csv)

You can download these files to your local Spark installation and access them locally. For this
exercise, you should download the files and store them in your $SPARK_HOME/data directory as
follows:

<pre>
   - data
      -bike-share
         -stations
            stations.csv
         -status
            status.csv
         -trips
            trips.csv
         -weather
           weather.csv
</pre>

In this exercise, you will use this data to return the average number of bikes available by the hour
for one week (February 22 to February 28) for stations located in the San Jose area only

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf

configure = SparkConf().setAppName("joining-datasets").setMaster("local")
sc = SparkContext(conf = configure)

spark = SparkSession.builder \
        .appName("joining-datasets") \
        .getOrCreate()
    
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1590559680408'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'joining-datasets'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '60737'),
 ('spark.driver.host', 'DESKTOP-I7971JS')]

In [2]:
stations = sc.textFile('file:///C:\\Spark\\data\\bike-share\\stations')
#FOR LINUX USERS,
# stations = sc.textFile("file://opt/Spark/data/bike-share/stations")
status = sc.textFile('file:///C:\\Spark\\data\\bike-share\\status')
#FOR LINUX USERS,
# status = sc.textFile("file://opt/Spark/data/bike-share/status")

In [4]:
status.take(4)

['10,9,6,"2015-02-28 23:59:01"',
 '10,9,6,"2015-02-28 23:58:02"',
 '10,9,6,"2015-02-28 23:57:02"',
 '10,8,7,"2015-02-28 23:56:02"']

In [5]:
# Split the status data into discrete fields, projecting only the fields necessary, and
# decompose the date string so that you can filter records by date more easily in the next step:
status2 = status.map(lambda x:x.split(',')) \
                .map(lambda x: (x[0], x[1], x[2], x[3].replace('"',''))) \
                .map(lambda x: (x[0], x[1], x[2], x[3].split(' '))) \
                .map(lambda x: (x[0], x[1], x[2], x[3][0].split('-'), x[3][1].split(':'))) \
                .map(lambda x: (int(x[0]), int(x[1]), int(x[3][0]), int(x[3][1]), int(x[3][2]),int(x[4][0])))
status2.take(5)
#Schema after operations:
#[(Station_id,bikes_available,year,month,day,hour)]

[(10, 9, 2015, 2, 28, 23),
 (10, 9, 2015, 2, 28, 23),
 (10, 9, 2015, 2, 28, 23),
 (10, 8, 2015, 2, 28, 23),
 (10, 8, 2015, 2, 28, 23)]

In [15]:
# Because status.csv is the biggest of the datasets (more than 36 million records), restrict
# the dataset to only the dates required and then drop the date fields because they are no
# longer necessary:
status3 = status2.filter(lambda x: x[2]==2015 and x[3]==2 and x[4]>=22) \
                .map(lambda x: (x[0] , x[1],x[5]))

status3.take(5)

[(10, 9, 23), (10, 9, 23), (10, 9, 23), (10, 8, 23), (10, 8, 23)]

In [16]:
stations.take(4)

['2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013',
 '3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013',
 '4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013',
 '5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013']

In [17]:
# Filter the stations dataset to include only stations where landmark='San Jose':
stations2 = stations.map(lambda x:x.split(',')) \
                    .filter(lambda x:x[5]=='San Jose') \
                    .map(lambda x: (int(x[0]), x[1]))

stations2.collect()

[(2, 'San Jose Diridon Caltrain Station'),
 (3, 'San Jose Civic Center'),
 (4, 'Santa Clara at Almaden'),
 (5, 'Adobe on Almaden'),
 (6, 'San Pedro Square'),
 (7, 'Paseo de San Antonio'),
 (8, 'San Salvador at 1st'),
 (9, 'Japantown'),
 (10, 'San Jose City Hall'),
 (11, 'MLK Library'),
 (12, 'SJSU 4th at San Carlos'),
 (13, 'St James Park'),
 (14, 'Arena Green / SAP Center'),
 (16, 'SJSU - San Salvador at 9th'),
 (80, 'Santa Clara County Civic Center'),
 (84, 'Ryland Park')]

Convert both RDDs to key/value pair RDDs to prepare for a join() operation:

In [18]:
status_kv = status3.keyBy(lambda x: x[0])
stations_kv = stations2.keyBy(lambda x: x[0])

Join the status_kv key/value pair RDD to the stations_kv key/value pair RDD by their
keys (station_id):

In [19]:
joined = status_kv.join(stations_kv)
joined.take(5)

[(10, ((10, 9, 23), (10, 'San Jose City Hall'))),
 (10, ((10, 9, 23), (10, 'San Jose City Hall'))),
 (10, ((10, 9, 23), (10, 'San Jose City Hall'))),
 (10, ((10, 8, 23), (10, 'San Jose City Hall'))),
 (10, ((10, 8, 23), (10, 'San Jose City Hall')))]

Clean the joined RDD:

In [20]:
cleaned = joined.map(lambda x: (x[0], x[1][0][1], x[1][0][2], x[1][1][1]))
cleaned.take(5)
# The schema for the cleaned RDD is as follows:
# [(station_id,bikes_available,hour,name),...]

[(10, 9, 23, 'San Jose City Hall'),
 (10, 9, 23, 'San Jose City Hall'),
 (10, 9, 23, 'San Jose City Hall'),
 (10, 8, 23, 'San Jose City Hall'),
 (10, 8, 23, 'San Jose City Hall')]

Create a key/value pair with the key as a tuple consisting of the station name and the hour
and then compute the averages by each hour for each station:

In [25]:
avgbyhour = cleaned.keyBy(lambda x: (x[3],x[2])) \
                   .mapValues(lambda x: (x[1], 1)) \
                   .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                   .mapValues(lambda x: (x[0]/x[1]))
avgbyhour.take(5)

# The schema for the cleaned RDD is as follows:
# [((name,hour),bikes_available),...]

[(('San Jose City Hall', 23), 7.29047619047619),
 (('San Jose City Hall', 21), 7.383333333333334),
 (('San Jose City Hall', 19), 7.35),
 (('San Jose City Hall', 17), 7.038095238095238),
 (('San Jose City Hall', 15), 6.754761904761905)]

Find the top 10 averages by station and hour by using the sortBy() function:

In [34]:
topavail = avgbyhour.keyBy(lambda x: x[1]) \
                    .sortByKey(ascending=False) \
                    .map(lambda x: (x[1][0][0],x[1][0][1],x[0]))
topavail.take(10)

[('San Jose Diridon Caltrain Station', 17, 16.590476190476192),
 ('San Jose Diridon Caltrain Station', 7, 16.492857142857144),
 ('San Jose Diridon Caltrain Station', 6, 16.34285714285714),
 ('San Jose Diridon Caltrain Station', 18, 16.21904761904762),
 ('San Jose Diridon Caltrain Station', 19, 15.64047619047619),
 ('San Jose Diridon Caltrain Station', 22, 15.516666666666667),
 ('San Jose Diridon Caltrain Station', 0, 15.445238095238095),
 ('San Jose Diridon Caltrain Station', 20, 15.416666666666666),
 ('San Jose Diridon Caltrain Station', 1, 15.392857142857142),
 ('San Jose Diridon Caltrain Station', 4, 15.383333333333333)]