In [None]:
# List what is already available in the working directory, to decide which of the steps
# below are still required before reading in the file and handling the data.
# If you see more than "sample_data" (e.g. the Spark directory or the TfL csv files
# created by later cells), you can jump to the relevant step below.
!ls

In [None]:
# set-up spark (NB if Apache amend versions on download site we will need to amend path in wget command)
print("\nWelcome to advanced top sites")
!ls
!rm -f spark-3.2.[01]-bin-hadoop3.2.tgz* 
!rm -rf spark-3.2.[01]-bin-hadoop3.2
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar -xf spark-3.2.1-bin-hadoop3.2.tgz
!ls -alt


In [None]:
# install findspark if not already installed
!pip3 install findspark


In [None]:
# Initialise Spark. SPARK_HOME must point at the same Spark version unpacked by the set-up cell.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
import findspark
findspark.init()  # make the pyspark under SPARK_HOME importable
from pyspark import SparkConf, SparkContext
# 'local[2]' runs Spark locally on 2 cores; use 'local' for a single core, or a master URL
# such as 'spark://HOST:PORT' to run on a Spark standalone cluster.
spark_conf = SparkConf().setMaster('local[2]').setAppName('MyApp')
# getOrCreate (instead of SparkContext(conf=...)) makes this cell safe to re-run: constructing a
# second SparkContext raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=spark_conf)
# see what we have by examining the Spark User Interface (its address is in sc.uiWebUrl)
from pyspark.sql import *
from pyspark.sql.functions import *
# NOTE(review): the wildcard import above shadows Python built-ins such as sum, max, min and round
# with their Spark column-function versions; use builtins.sum(...) etc. if you need the originals.
# "spark" (SparkSession) and "sc" (SparkContext) are the key handles into the Spark API;
# getOrCreate reuses the SparkContext created above.
spark = SparkSession.builder.appName("bikes").getOrCreate()

In [None]:
### # OPTIONAL: upload any given file from your own machine using the Google Colab files API/GUI
### # (uncomment the two lines below to use it; not needed when the data is fetched with wget below)
### from google.colab import files
### files.upload()


In [None]:
# get file for given year from TfL open data
!wget https://cycling.data.tfl.gov.uk/usage-stats/cyclehireusagestats-2014.zip
!unzip cyclehireusagestats-2014.zip



In [None]:
# Read the first journey extract into a DataFrame called "j_df" (see week 5 for the Spark read API).
# The glob matches the csv unzipped above. header=true takes column names from the first line;
# inferSchema=true makes Spark scan the data to choose column types instead of reading all as strings.
file = "./1. Journey*csv"
j_df = (spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(file))


In [None]:

# Show the top 10 rows - if this gives an error, check the previous step/s (the load of j_df).
j_df.show(10)
print("duration is in SECONDS")

# See how many entries (rows) are in the data; count() runs a full Spark job.
numRows = j_df.count()
print(f"there are {numRows} rows")

# tail(2) collects the last 2 rows to the driver as a list of Row objects.
# An unsorted Spark DataFrame has no guaranteed row order, so "last" only reflects how the
# file happened to be read - sort explicitly if the order matters.
j_df.tail(2)

Group and Sort

In [None]:
# Group by start station (Id together with its Name, so the result stays readable) and sum the
# ride "Duration" per station, sorted with the largest total first.
# The result has one row per (StartStation Id, StartStation Name) plus a "sum(Duration)" column.
start_df = (j_df
            .groupBy("StartStation Id", "StartStation Name")
            .sum("Duration")
            .orderBy("sum(Duration)", ascending=False))
# show top 10
start_df.show(10)

In [None]:
# TO DO: group by ending destination, sum the ride times and sort

Conversions

In [None]:
# More useful perhaps than seconds would be converting ride durations to minutes.
j_df.show(3)
# selectExpr takes a SQL expression string; we can .show() the result without assigning
# it to a new DataFrame
j_df.selectExpr("Duration/60").show(3)

# 'alias' gives the derived column a more meaningful name (col() refers to a column by name)
j_df.select((col("Duration")/60).alias('mins')).show(3)

# Keep the station columns alongside the converted duration.
# Named minutes_df (not min_df) so it is not confused with the per-station *minimum*
# DataFrame called min_df that is built in the "Using a join" section.
minutes_df = j_df.select("StartStation Id", "StartStation Name", "EndStation Id", "EndStation Name",
                         (col("Duration")/60).alias('minutes'))
minutes_df.groupBy("StartStation Id", "StartStation Name").sum("minutes").orderBy("sum(minutes)", ascending=False).show(5)

# NOTE: Spark's "/" always produces a double, so minutes display as e.g. 5.0. They are whole
# numbers only if every Duration is a multiple of 60 seconds (presumably the case if the
# original spreadsheet recorded no seconds - check the data).

In [None]:
# TO DO: group by destination and show total ride duration in hours

Multiple Input Files

In [None]:
# Load several extracts into ONE DataFrame. This presumes all files share an identical schema
# (same columns in the same order) - worth testing before trusting the combined result.
file1 = "./1. Journey*csv"  # 05 Jan to 02 Feb
file2 = "./2. Journey*csv"  # 03 Feb to 01 Mar
file3 = "./3. Journey*csv"  # 02 Mar to 31 Mar
# Together these 3 files span Jan-Mar, which we treat as "winter" in the UK.
# DataFrameReader.csv accepts a Python list of paths and reads them all into a single DataFrame.
winter_df = spark.read.csv([file1, file2, file3], header=True, inferSchema=True)

In [None]:
# TO DO: how many rows do we have in the winter_df DF?

In [None]:
# TO DO: group by "StartStation Id" and sum their "Duration", sorted with largest number listed first


Aggregates

In [None]:
# Find stats regarding the duration of cycle rides in winter_df (the Jan-Mar files)
# TO DO: print out the average and maximum values of "Duration" for winter_df
print("Stats of rides Jan-Mar inclusive (in seconds)")


In [None]:
# Per-start-station duration statistics for the Jan-Mar ("winter") rides, each listed largest first.
print("Sorted grouped-by stats of rides Jan-Mar inclusive")
by_start = winter_df.groupBy("StartStation Name")

# mean ride duration per start station; the dict form of agg names the column "avg(Duration)"
by_start.agg({"Duration": "mean"}).sort("avg(Duration)", ascending=False).show(8)

# longest single ride per start station
by_start.agg({"Duration": "max"}).orderBy("max(Duration)", ascending=False).show(8)

# total ride time per start station, kept in an intermediary DataFrame for reuse
winterSums_df = by_start.agg({"Duration": "sum"})
winterSums_df.orderBy("sum(Duration)", ascending=False).show(8)

# TO DO: look at the minimum values for the sorted grouped-by stats, and at the smallest
# total Durations: what can you conclude?

Using a "join"

In [None]:
# Several aggregate columns for one grouping: compute each statistic as its own DataFrame
# keyed by start station, then combine them with a join.
by_start = winter_df.groupBy("StartStation Name")
min_df = by_start.agg({"Duration": "min"})
max_df = by_start.agg({"Duration": "max"})

# Inner join on the shared key: one row per station carrying min(Duration) and max(Duration).
new_df = min_df.join(max_df, on="StartStation Name", how="inner")
new_df.show()


In [None]:
# Add mean and sum aggregates to the min/max ones and sort stations by total ride time.
# Relies on min_df and max_df from the previous ("Using a join") cell - run that cell first.
mean_df = winter_df.groupBy("StartStation Name").agg({"Duration": "mean"})
sum_df = winter_df.groupBy("StartStation Name").agg({"Duration": "sum"})
# Build the DataFrame first and call .show() separately: show() prints and returns None, so
# chaining it onto the assignment would leave new_df set to None instead of the joined stats.
new_df = (min_df
          .join(max_df, on=["StartStation Name"], how="inner")
          .join(mean_df, on=["StartStation Name"], how="inner")
          .join(sum_df, on=["StartStation Name"], how="inner")
          .sort("sum(Duration)", ascending=False))
new_df.show()

In [None]:
# End of the guided exercise; the research questions that follow are open-ended.
print("exercise completed. please consider how you would use Spark to answer the following research questions")

# Research Hypothesis (example 1)
We might state our thinking as "more riding happens in summer than in winter". To turn this into a testable hypothesis, we need to be more precise:

Steps
*   define winter & summer (say, winter = January–March (JFM) and summer = June–August (JJA)?)
*   define "riding" (sum of duration of rides in given time period); do we want to exclude any rides?
*   if using TfL2014 data, we are only testing for London in that year

Now we can state our hypothesis, which we then test to see whether it holds:
"During 2014, more riding happens in London in Summer than Winter, where Summer is defined as JJA and Winter as JFM". And we can state our test for whether this is true: "We define the quantity of riding as the sum of the duration of the relevant rides, and will use TfL 2014 data (citation) to determine if the hypothesis holds for 2014."

We then perform the test by
*   find sum of duration of all valid rides for each period
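*   The hypothesis is true (for a given year) if the sum of all valid rides in summer exceeds the sum of all valid rides in winter for that year. (Note that JJA has 92 days and JFM only 90, so comparing the average riding per day would be a fairer test.)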

The hypothesis is perhaps too narrow, so you might want to think about how to extend it beyond London in 2014.


# Research Hypothesis (example 2)
Test the hypothesis "people ride for longer in June than in January". 

Possible outline steps
*   find the mean journey duration in January
*   find the mean journey duration in June
*   The hypothesis is true if the June mean > the January mean



