In [None]:
# Check whether this instance of the notebook already has files present (list the
# current working directory), and thus determine which of the later steps are
# required prior to reading in the file and handling the data.
!ls


In [None]:
# set-up spark (NB if Apache amend versions on download site we will need to amend path in wget command)
## NOTE that this version would make use of Hadoop if installed BUT that HDFS & Hadoop is not installed on our Colab
## (we are only using a single node (probably as a VM) so we will not be able to benefit from parallelism)
!clear
!echo welcome

!rm -f spark-3.3.[01]-bin-hadoop3.tgz* 
!rm -rf spark-3.3.[01]-bin-hadoop3

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar -xf spark-3.3.2-bin-hadoop3.tgz

!ls -alt
print("standalone Spark is now installed")

In [None]:
# init spark (ensure SPARK_HOME set to same version as we download earlier)
!pip3 install findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
# the next line gives us 'local' mode. try 'local[2]' to use 2 cores or 'master:NNNN' to run on Spark standalone cluster at port NNNN
spark_conf = SparkConf().setMaster('local[2]').setAppName('MyApp')
sc = SparkContext(conf=spark_conf)
# see what we have by examining the Spark User Interface
from pyspark.sql import *
from pyspark.sql.functions import *
SparkSession.builder.getOrCreate()
## 

In [None]:
## this is how one could upload a file into Colab using the Colab GUI (uncomment both lines below if you want to try it)

#from google.colab import files
#files.upload()


In [None]:
# get file for given year from TfL open data
!wget https://cycling.data.tfl.gov.uk/usage-stats/cyclehireusagestats-2014.zip
!unzip cyclehireusagestats-2014.zip

In [None]:
# at this point we have Spark initialised and we have a number of CSV files. 
# NB you can also download the zipfile to your host machine and try opening one of the CSV files in Excel (Windows)
# (in Linux, easiest to open a file manager GUI then double-click on .csv file to open associated spreadsheet app)

In [None]:
# read in file
!ls
file="./1. Journey*csv"
spark = SparkSession.builder.appName("bikes").getOrCreate()
j_df = (spark.read.format("csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load(file))

# show top 10
j_df.show(10)

In [None]:
# total "Duration" (seconds) per start station, largest total first
journey_cols = j_df.select(
    "StartStation Id", "StartStation Name",
    "EndStation Id", "EndStation Name",
    "Duration",
)
# one row per (id, name) pair; the aggregate column is named "sum(Duration)"
start_totals = journey_cols.groupBy("StartStation Id", "StartStation Name").sum("Duration")
start_df = start_totals.orderBy("sum(Duration)", ascending=False)
# print the 10 start stations with the largest summed duration
start_df.show(10)
print('Note "Duration" is in seconds (see above table)')

In [None]:
# total "Duration" (seconds) per end station, largest total first
# select only the grouping keys and the column being summed, each exactly once
# (listing a column twice yields a DataFrame with duplicate column names)
end_df = (
    j_df.select("EndStation Id", "EndStation Name", "Duration")
    .groupBy("EndStation Id", "EndStation Name")
    .sum("Duration")  # aggregate column is named "sum(Duration)"
    .orderBy("sum(Duration)", ascending=False)
)
# print the 10 end stations with the largest summed duration
end_df.show(10)

In [None]:
# count the rows whose Duration is exactly zero
zero_duration_df = j_df.filter("Duration = 0")
zero_duration_df.count()

In [None]:
# TO DO: determine how many rows have Duration of over a day

In [None]:
# try simple plot via pandas (we cover plotting again in demo#3)
import matplotlib.pyplot as plt
import pandas as pd


# drop zero-length rides before plotting
non_zeros_df = j_df.filter("Duration > 0")
# plot those with duration less than 1800 seconds (0.5 hrs);
# only the "Duration" column is converted to a pandas DataFrame
pd_df = non_zeros_df.filter("Duration<1800").select("Duration").toPandas()
# len() gives the number of rows as a single number
# (pandas DataFrame.count() returns a per-column Series, which prints as a table)
print("sample of ", len(pd_df))

ax = pd_df.plot(kind="hist") # do without and then repeat but set #bins same as Excel (presume 94 bins)
# label the figure so it can be read on its own
ax.set_xlabel("Duration (seconds)")
ax.set_title("Ride durations between 0 and 1800 seconds")
plt.show()

In [None]:
# TO DO: determine %age of rides >3 days and interpret what this means
# TO DO: choose a different input file and compare outputs, discuss what this means (=> similar to the first steps you will take in your assignment)

ideas for what we would want to do next
*   read all files and get top 10 source and top 10 destination stations
*   link with geo data to plot routes (heatmap for popularity)
*   use geo data to group (e.g. all Hyde Park as single entity)
*   compare top sites by month (or by weather (new dataset))
*   what dataset would we need to 'join' to determine safest routes?
*   can we determine popular routes by 'mode' (e.g. short, day hire, commuting etc)










>