# Investigate Bike Sharing Usage before/after COVID-19

Table of contents:
1. NiceRide Data Exploration, Downloading and Cleaning
2. COVID-19 Data Crawling
3. Data Analysis and Visualization

To run this notebook, click Run all; it should execute through to the last cell.

## 1. NiceRide Data Exploration, Downloading and Cleaning

### 1.1 Download sample data

This cell is not meant to be run. It only documents the data exploration step and the target schema for data cleaning.

#### Commands
```sh
wget https://s3.amazonaws.com/niceride-data/201804-niceride-tripdata.csv.zip
wget https://s3.amazonaws.com/niceride-data/201904-niceride-tripdata.csv.zip
wget https://s3.amazonaws.com/niceride-data/202004-niceride-tripdata.csv.zip
wget https://s3.amazonaws.com/niceride-data/202104-niceride-tripdata.csv.zip
unzip 201804-niceride-tripdata.csv.zip
unzip 201904-niceride-tripdata.csv.zip
unzip 202004-niceride-tripdata.csv.zip
unzip 202104-niceride-tripdata.csv.zip
```

### 1.2 Explore data

#### Commands
```sh
echo "2018"
head 201804-niceride-tripdata.csv -n 3
echo ""
echo ""
echo "2019"
head 201904-niceride-tripdata.csv -n 3
echo ""
echo ""
echo "2020"
head 202004-niceride-tripdata.csv -n 3
echo ""
echo ""
echo "2021"
head 202104-niceride-tripdata.csv -n 3
```

#### Output
```
2018
"tripduration","start_time","end_time","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender","bike type"
1373,"2018-04-24 16:03:04.2700","2018-04-24 16:25:57.6010",170,"Boom Island Park",44.99254,-93.270256,2,"100 Main Street SE",44.984892,-93.256551,2,"Customer",1969,0,"Classic"
1730,"2018-04-24 16:38:40.5210","2018-04-24 17:07:31.1070",2,"100 Main Street SE",44.984892,-93.256551,13,"North 2nd Street & 4th Ave N",44.986087,-93.272459,2,"Customer",1969,0,"Classic"


2019
"tripduration","start_time","end_time","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender","bike type"
3568,"2019-04-22 09:03:33.7210","2019-04-22 10:03:02.6670",188,"Sanford Hall",44.980831,-93.240282,190,"Weisman Art Museum",44.973428353028844,-93.23731899261473,988,"Subscriber",1998,2,"Classic"
223,"2019-04-22 09:35:15.0170","2019-04-22 09:38:58.4050",188,"Sanford Hall",44.980831,-93.240282,190,"Weisman Art Museum",44.973428353028844,-93.23731899261473,1215,"Subscriber",1997,1,"Classic"


2020
ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
37276F98FD2F1372,docked_bike,2020-04-29 17:41:02,2020-04-29 18:20:55,Lake Street & West River Parkway,149,Coldwater Spring,155,44.9485,-93.2062,44.905,-93.1983,member
52B4BA53A4AF9262,docked_bike,2020-04-11 19:28:44,2020-04-11 19:47:35,Portland Ave & Washington Ave,91,Franklin & 28th Ave,47,44.9782,-93.2602,44.9627,-93.2309,casual


2021
ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
D2EB377B1A895A6E,classic_bike,2021-04-16 02:08:11,2021-04-16 02:12:54,Central Ave NE & 14th Ave NE,30204,Logan Park,30104,45.002526,-93.247162,44.99882,-93.25276,casual
50B388CD58CFAAB7,classic_bike,2021-04-24 20:17:47,2021-04-24 20:35:44,Central Ave NE & 14th Ave NE,30204,Logan Park,30104,45.002526,-93.247162,44.99882,-93.25276,casual
```

#### Raw tables
- 2018 & 2019:

| Column | tripduration | start_time | end_time  | start station id | start station name | start station latitude | start station longitude | end station id | end station name | end station latitude | end station longitude | bikeid | usertype | birth year | gender | bike type |
| ------ | ------------ | ---------- | --------- | ---------------- | ------------------ | ---------------------- | ----------------------- | -------------- | ---------------- | -------------------- | --------------------- | ------ | -------- | ---------- | ------ | --------- |
| Type   | int          | timestamp  | timestamp | int              | string             | double                 | double                  | int            | string           | double               | double                | int    | string   | int        | int    | string    |

- 2020 & 2021:

| Column | ride_id | rideable_type | started_at | ended_at  | start_station_name | start_station_id | end_station_name | end_station_id | start_lat | start_lng | end_lat | end_lng | member_casual |
| ------ | ------- | ------------- | ---------- | --------- | ------------------ | ---------------- | ---------------- | -------------- | --------- | --------- | ------- | ------- | ------------- |
| Type   | string  | string        | timestamp  | timestamp | string             | int              | string           | int            | double    | double    | double  | double  | string        |

#### Target tables
- Trip table

| Column              | trip_id   | start_time | start_station_id                  | end_time  | end_station_id                  | usertype                                    |
| ------------------- | --------- | ---------- | --------------------------------- | --------- | ------------------------------- | ------------------------------------------- |
| Type                | uuid      | timestamp  | int                               | timestamp | int                             | int (0 for casual, 1 for member/subscriber) |
| Column in 2018/2019 | generated | start_time | looked up via start station name  | end_time  | looked up via end station name  | usertype                                    |
| Column in 2020/2021 | generated | started_at | looked up via start_station_name  | ended_at  | looked up via end_station_name  | member_casual                               |

- Bike station table

| Column | station_id | station_name | lat    | lon    |
|--------|------------|--------------|--------|--------|
| Type   | int        | string       | double | double |
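The mapping above can be sketched in plain Python for individual CSV rows. This is a minimal illustration, not the notebook's Spark code. `COLUMN_MAP`, `to_trip_row` and `read_trips` are hypothetical names. Station IDs are left as names here, because the notebook assigns new IDs from the station table later. The sample rows are copied from the `head` output in section 1.2.

```python
import csv
import io

# Hypothetical mapping from each raw layout to the trip-table fields.
COLUMN_MAP = {
    "2018_2019": {"start_time": "start_time", "end_time": "end_time",
                  "start_station": "start station name",
                  "end_station": "end station name", "usertype": "usertype"},
    "2020_2021": {"start_time": "started_at", "end_time": "ended_at",
                  "start_station": "start_station_name",
                  "end_station": "end_station_name", "usertype": "member_casual"},
}
MEMBER_LABELS = {"Subscriber", "member"}  # encoded as 1, anything else as 0


def to_trip_row(raw, layout):
    """Map one csv.DictReader row to the trip-table fields (station names, not IDs)."""
    cols = COLUMN_MAP[layout]
    return {
        "start_time": raw[cols["start_time"]],
        "start_station": raw[cols["start_station"]].strip(),
        "end_time": raw[cols["end_time"]],
        "end_station": raw[cols["end_station"]].strip(),
        "usertype": 1 if raw[cols["usertype"]] in MEMBER_LABELS else 0,
    }


def read_trips(csv_text, layout):
    """Parse CSV text that starts with a header line and map every data row."""
    return [to_trip_row(row, layout) for row in csv.DictReader(io.StringIO(csv_text))]


# Header and first data row copied from the `head` output in section 1.2.
SAMPLE_2018 = '''"tripduration","start_time","end_time","start station id","start station name","start station latitude","start station longitude","end station id","end station name","end station latitude","end station longitude","bikeid","usertype","birth year","gender","bike type"
1373,"2018-04-24 16:03:04.2700","2018-04-24 16:25:57.6010",170,"Boom Island Park",44.99254,-93.270256,2,"100 Main Street SE",44.984892,-93.256551,2,"Customer",1969,0,"Classic"'''

SAMPLE_2020 = '''ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
37276F98FD2F1372,docked_bike,2020-04-29 17:41:02,2020-04-29 18:20:55,Lake Street & West River Parkway,149,Coldwater Spring,155,44.9485,-93.2062,44.905,-93.1983,member'''

print(read_trips(SAMPLE_2018, "2018_2019"))
print(read_trips(SAMPLE_2020, "2020_2021"))
```

The two layouts differ only in column names, so a lookup table keeps the conversion logic in one function.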

### 1.3 Download data

The following two cells first download and unzip the monthly files (April to November, 2018 to 2021) from S3. They then copy the files from the driver's local file system to the Databricks File System (DBFS).

In [0]:
%sh

rm -rf *-niceride-tripdata.csv
rm -rf *-niceride-tripdata.csv.zip

for year in 2018 2019 2020 2021
do
  for month in 04 05 06 07 08 09 10 11
  do
    wget https://s3.amazonaws.com/niceride-data/$year$month-niceride-tripdata.csv.zip
    unzip $year$month-niceride-tripdata.csv.zip
    rm $year$month-niceride-tripdata.csv.zip
  done
done

mv 2001906-niceride-tripdata.csv 201906-niceride-tripdata.csv  # the 2019-06 archive unpacks to a misnamed file; rename it to the standard pattern


In [0]:
dbutils.fs.rm("dbfs:/p3/", True)  # recursively delete any previous copy

for i in range(2018, 2022):
  for j in range(4, 12):
    file_name = f"{i}{j:02d}-niceride-tripdata.csv"
    dbutils.fs.cp(f"file:/databricks/driver/{file_name}", f"dbfs:/p3/{i}/{file_name}")  # copy the file from local disk (file:/) to DBFS (dbfs:/)
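One monthly archive (2019-06) unpacks to a misnamed file. A missing file would likely make the copy step fail, so it can help to check for missing files before copying to DBFS. The sketch below uses hypothetical helpers that are not part of the original notebook. It assumes the same years and months (April to November) as the download loop.

```python
# Hypothetical helpers (not in the original notebook) that list the monthly files
# the download loop expects and report which ones are absent.
def expected_files(years=range(2018, 2022), months=range(4, 12)):
    # months 4..11 correspond to April through November, as in the shell loop
    return [f"{y}{m:02d}-niceride-tripdata.csv" for y in years for m in months]


def missing_files(found, years=range(2018, 2022), months=range(4, 12)):
    found = set(found)
    return [name for name in expected_files(years, months) if name not in found]


# Example: a directory where 2019-06 still carries the misnamed file.
listing = [n for n in expected_files() if n != "201906-niceride-tripdata.csv"]
listing.append("2001906-niceride-tripdata.csv")
print(missing_files(listing))  # ['201906-niceride-tripdata.csv']
```

On the driver, this could be called as `missing_files(os.listdir("/databricks/driver"))` after `import os`. That assumes the files were unzipped into that working directory, as the `file:/databricks/driver/` paths above suggest.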

### 1.4 Data cleaning

#### 1.4.1 Load data

In [0]:
from pyspark.sql.types import StructType, IntegerType, DoubleType, TimestampType, StringType

# specify the schema explicitly
schema_2018_2019 = StructType() \
  .add("tripduration", IntegerType(), True) \
  .add("start_time", TimestampType(), True) \
  .add("end_time", TimestampType(), True) \
  .add("start_station_id", IntegerType(), True) \
  .add("start_station_name", StringType(), True) \
  .add("start_lat", DoubleType(), True) \
  .add("start_lon", DoubleType(), True) \
  .add("end_station_id", IntegerType(), True) \
  .add("end_station_name", StringType(), True) \
  .add("end_lat", DoubleType(), True) \
  .add("end_lon", DoubleType(), True) \
  .add("bikeid", IntegerType(), True) \
  .add("usertype", StringType(), True) \
  .add("birth_year", IntegerType(), True) \
  .add("gender", IntegerType(), True) \
  .add("bike_type", StringType(), True)

schema_2020_2021 = StructType() \
  .add("ride_id", StringType(), True) \
  .add("rideable_type", StringType(), True) \
  .add("start_time", TimestampType(), True) \
  .add("end_time", TimestampType(), True) \
  .add("start_station_name", StringType(), True) \
  .add("start_station_id", IntegerType(), True) \
  .add("end_station_name", StringType(), True) \
  .add("end_station_id", IntegerType(), True) \
  .add("start_lat", DoubleType(), True) \
  .add("start_lon", DoubleType(), True) \
  .add("end_lat", DoubleType(), True) \
  .add("end_lon", DoubleType(), True) \
  .add("usertype", StringType(), True)

# read all CSV files under each year's directory into a single DataFrame
df_2018 = spark.read.option("header", True).schema(schema_2018_2019).csv("dbfs:/p3/2018")
df_2019 = spark.read.option("header", True).schema(schema_2018_2019).csv("dbfs:/p3/2019")
df_2020 = spark.read.option("header", True).schema(schema_2020_2021).csv("dbfs:/p3/2020")
df_2021 = spark.read.option("header", True).schema(schema_2020_2021).csv("dbfs:/p3/2021")

# drop every row that has a null in any column (this also checks columns not used later, such as birth_year in 2018/2019)
df_2018 = df_2018.na.drop("any")
df_2019 = df_2019.na.drop("any")
df_2020 = df_2020.na.drop("any")
df_2021 = df_2021.na.drop("any")

# create temp view
df_2018.createOrReplaceTempView("raw_2018")
df_2019.createOrReplaceTempView("raw_2019")
df_2020.createOrReplaceTempView("raw_2020")
df_2021.createOrReplaceTempView("raw_2021")

#### 1.4.2 Extract common columns and union trip data

In [0]:
# extract common columns from four DataFrames and union into one view
spark.sql("""
  select start_time, end_time, start_station_name, start_lat, start_lon, end_station_name, end_lat, end_lon, usertype
  from raw_2018
  union
  select start_time, end_time, start_station_name, start_lat, start_lon, end_station_name, end_lat, end_lon, usertype
  from raw_2019
  union
  select start_time, end_time, start_station_name, start_lat, start_lon, end_station_name, end_lat, end_lon, usertype
  from raw_2020
  union
  select start_time, end_time, start_station_name, start_lat, start_lon, end_station_name, end_lat, end_lon, usertype
  from raw_2021;
""").createOrReplaceTempView("raw_trip_union")

# generate uuid as trip id
spark.sql("""
  select uuid() as trip_id, *
  from raw_trip_union
  order by start_time;
""").createOrReplaceTempView("raw_trip_all")

#### 1.4.3 Extract station information from trips

In [0]:
# extract distinct station information (trimmed name, lat, lon) from all trip data
spark.sql("""
  select distinct trim(start_station_name) as station_name, start_lat as lat, start_lon as lon from raw_trip_union
  union
  select distinct trim(end_station_name) as station_name, end_lat as lat, end_lon as lon from raw_trip_union;
""").createOrReplaceTempView("raw_station")

# a station name can appear with several coordinate pairs; average them into one location per station
spark.sql("""
  select station_name, avg(lat) as lat, avg(lon) as lon
  from raw_station
  group by station_name
  order by station_name;
""").createOrReplaceTempView("raw_station_latlon")

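# assign each station a new ID and save the station data as a Databricks table
# (monotonically_increasing_id() yields unique, increasing IDs, but they are not guaranteed to be consecutive)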
spark.sql("""
  select monotonically_increasing_id()+1 as station_id, * 
  from raw_station_latlon
  order by station_name;
""").write.saveAsTable("station", mode="overwrite")

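The three station queries above can be mirrored in plain Python on a handful of rows, which makes the logic easy to check by hand. This is an illustrative sketch with a hypothetical function name (`build_station_table`). It uses `enumerate`, which produces consecutive IDs, whereas Spark's `monotonically_increasing_id()` only guarantees unique, increasing values. In Spark SQL, `row_number() over (order by station_name)` would be one way to get consecutive IDs.

```python
from collections import defaultdict


def build_station_table(trips):
    """Hypothetical helper mirroring the station queries above.

    trips: iterable of (start_name, start_lat, start_lon, end_name, end_lat, end_lon).
    Returns (station_id, station_name, lat, lon) tuples ordered by name.
    """
    # 1) distinct (trimmed name, lat, lon) triples from both trip ends;
    #    str.strip() plays the role of SQL trim()
    points = set()
    for s_name, s_lat, s_lon, e_name, e_lat, e_lon in trips:
        points.add((s_name.strip(), s_lat, s_lon))
        points.add((e_name.strip(), e_lat, e_lon))

    # 2) group the distinct coordinates by station name
    coords = defaultdict(list)
    for name, lat, lon in points:
        coords[name].append((lat, lon))

    # 3) average per station and number the stations 1, 2, 3, ... in name order
    table = []
    for station_id, name in enumerate(sorted(coords), start=1):
        lats = [lat for lat, _ in coords[name]]
        lons = [lon for _, lon in coords[name]]
        table.append((station_id, name, sum(lats) / len(lats), sum(lons) / len(lons)))
    return table


# The first two rows come from the 2018 sample in section 1.2; the third is
# made up to show trimming and coordinate averaging.
trips = [
    ("Boom Island Park", 44.99254, -93.270256, "100 Main Street SE", 44.984892, -93.256551),
    ("100 Main Street SE", 44.984892, -93.256551, "North 2nd Street & 4th Ave N", 44.986087, -93.272459),
    ("100 Main Street SE ", 44.9848, -93.2566, "Boom Island Park", 44.99254, -93.270256),
]
for row in build_station_table(trips):
    print(row)
```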
#### 1.4.4 Trip data transformation: apply the UDF, split trips by year and save as tables

In [0]:
# define a UDF that encodes usertype: 1 for member/subscriber, 0 for everything else
def usertype_transfer(usertype):
  if usertype == "Subscriber" or usertype == "member": 
    return 1
  else:
    return 0

# register UDF to Spark
spark.udf.register("udf_usertype_transfer", usertype_transfer, IntegerType())

# map station names to the new station IDs and encode usertype;
# trim() is needed because the station table stores trimmed names, and an untrimmed name would silently drop the trip from this join
spark.sql("""
  select t.trip_id, t.start_time, s1.station_id as start_station_id, t.end_time, s2.station_id as end_station_id, udf_usertype_transfer(t.usertype) as usertype
  from raw_trip_all t, station s1, station s2
  where trim(t.start_station_name) = s1.station_name
    and trim(t.end_station_name) = s2.station_name
  order by start_time;
""").createOrReplaceTempView("trip_all")

# save each year's trips as its own table (trip_2018, trip_2019, trip_2020, trip_2021)
for year in range(2018, 2022):
  spark.sql(f"""
    select *
    from trip_all
    where year(start_time) = {year}
    order by start_time;
  """).write.saveAsTable(f"trip_{year}", mode="overwrite")

## 2. COVID-19 Data Crawling

### 2.1 Install package

`pandas.read_html` needs an HTML parser, so install the lxml package first.

In [0]:
%pip install lxml

Python interpreter will be restarted.
Collecting lxml
  Downloading lxml-4.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl (6.9 MB)
Installing collected packages: lxml
Successfully installed lxml-4.8.0


### 2.2 Crawl data to Pandas dataframe

In [0]:
import pandas as pd

url = "https://www.health.state.mn.us/diseases/coronavirus/situation.html"
pdf = pd.read_html(url, attrs={"id": "casetable"})[0]  # parse the HTML table whose id is "casetable"
pdf = pdf[['Specimen collection date', 'Confirmed cases  (PCR positive)']]  # keep only the date and case-count columns
pdf = pdf.rename(columns={'Specimen collection date': 'date', 'Confirmed cases  (PCR positive)': 'count'})  # rename the columns

### 2.3 Load data into Spark and convert the date type

In [0]:
df = spark.createDataFrame(pdf)  # import Pandas dataframe into Spark directly
df.createOrReplaceTempView("covid_import")

# convert the date string to a date type; drop rows whose date does not match the M/d/yy format
spark.sql("""
  select to_date(date, "M/d/yy") as date, count 
  from covid_import
  where isnotnull(to_date(date, "M/d/yy"))
  order by to_date(date, "M/d/yy");
""").write.saveAsTable("covid_case", mode="overwrite")

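The date filter above relies on `to_date` returning null for strings that do not match `M/d/yy`. Here is the same idea in plain Python, as a sketch with a hypothetical function name and made-up sample rows.

```python
from datetime import datetime


def parse_case_rows(rows):
    """Hypothetical helper: keep (date, count) rows whose date parses as M/d/yy.

    Mirrors the Spark filter above, where non-matching strings yield null and
    those rows are dropped. Results are sorted by date.
    """
    parsed = []
    for raw_date, count in rows:
        try:
            # %m and %d also accept single-digit values such as "3/5/20"
            day = datetime.strptime(raw_date, "%m/%d/%y").date()
        except ValueError:
            continue  # not an M/d/yy date, so skip the row
        parsed.append((day, count))
    return sorted(parsed)


# Made-up sample rows; "Unknown" stands in for any non-date entry.
sample = [("3/7/20", 1), ("3/5/20", 1), ("Unknown", 5), ("3/6/20", 0)]
for day, count in parse_case_rows(sample):
    print(day.isoformat(), count)
```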
### 2.4 A quick view of the cleaned data

In [0]:
display(spark.sql("select * from station limit 3;"))

station_id,station_name,lat,lon
1,100 Main Street SE,44.984851700564974,-93.25640492090396
2,10th Street E & Cedar Street,44.950189,-93.09759
3,11th Ave S & S 2nd Street,44.97651009112594,-93.25212727867236


In [0]:
display(spark.sql("select * from trip_2018 limit 3;"))

trip_id,start_time,start_station_id,end_time,end_station_id,usertype
7043e69d-c16b-494c-8d51-e762978298d7,2018-04-12T08:49:49.855+0000,156,2018-04-12T09:31:20.522+0000,103,1
b8f9ea5d-d2e0-4f13-bf39-b9c66f673d37,2018-04-12T09:29:22.149+0000,10,2018-04-12T09:33:10.525+0000,102,1
5f56bec4-e7d8-49fa-8588-f57e2113a941,2018-04-12T10:15:30.340+0000,230,2018-04-12T10:16:51.143+0000,272,1


In [0]:
display(spark.sql("select * from trip_2019 limit 3;"))

trip_id,start_time,start_station_id,end_time,end_station_id,usertype
cf6e2ad9-8ce6-4000-8702-6ad6b3326530,2019-04-22T09:03:33.721+0000,224,2019-04-22T10:03:02.667+0000,262,1
7f179eb2-bd32-4d31-872a-67a0284c9054,2019-04-22T09:35:15.017+0000,224,2019-04-22T09:38:58.405+0000,262,1
db65619a-451a-43a6-aa46-193563c59652,2019-04-22T09:37:10.081+0000,200,2019-04-22T09:46:33.891+0000,92,0


In [0]:
display(spark.sql("select * from trip_2020 limit 3;"))

trip_id,start_time,start_station_id,end_time,end_station_id,usertype
04a83145-eccf-4729-8ddf-015e75785cc8,2020-04-01T13:47:57.000+0000,181,2020-04-01T13:48:29.000+0000,181,0
854a4745-9b03-4111-b1a9-808d3eb95793,2020-04-01T13:48:56.000+0000,181,2020-04-01T13:49:20.000+0000,181,0
a7605d75-f34b-4384-8026-6c30603ab660,2020-04-01T13:49:36.000+0000,181,2020-04-01T13:49:41.000+0000,181,0


In [0]:
display(spark.sql("select * from trip_2021 limit 3;"))

trip_id,start_time,start_station_id,end_time,end_station_id,usertype
0e42270a-0564-4cf1-b28c-f18195ddca5d,2021-04-06T09:23:53.000+0000,48,2021-04-06T09:50:56.000+0000,62,0
c6b86359-8ef1-4f3d-a442-473b28ac7e64,2021-04-06T10:35:58.000+0000,214,2021-04-06T10:36:21.000+0000,214,1
b5f1f8a1-b609-45c4-8150-0b8a402a577e,2021-04-06T10:36:00.000+0000,214,2021-04-06T10:36:05.000+0000,214,1


In [0]:
display(spark.sql("select * from covid_case limit 3;"))

date,count
2020-03-05,1
2020-03-06,0
2020-03-07,1


## 3. Data Analysis and Visualization

### 3.1 Global level

#### 3.1.1 Annual usage change (2018 - 2021)

In [0]:
import pandas as pd

# count trips in each year
member_count = []
casual_count = []
total_count = []
for i in range(2018, 2022):
  member = spark.sql(f"select count(1) from trip_{i} where usertype=1;").first()[0]
  casual = spark.sql(f"select count(1) from trip_{i} where usertype=0;").first()[0]
  member_count.append(member)
  casual_count.append(casual)
  total_count.append(member+casual)

years = list(range(2018, 2022))
d = {'Year':years, 'Total':total_count, 'Member': member_count, 'Casual': casual_count}
pdf = pd.DataFrame(data=d)
display(pdf)


import plotly.graph_objects as go

# plot the annual count as stacked bar chart
fig = go.Figure(data=[
    go.Bar(name='Member', x=years, y=member_count),
    go.Bar(name='Casual', x=years, y=casual_count)
])

fig.update_layout(
  barmode='stack',
  title_text="Annual Trips 2018-2021"
)
fig.show()

Year,Total,Member,Casual
2018,391601,114625,276976
2019,258931,118045,140886
2020,159221,59577,99644
2021,146039,57437,88602
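The annual table above can be turned into year-over-year percentage changes with a few lines of Python. The counts below are copied from that output. `pct_change` and `yoy_table` are illustrative names, not part of the notebook.

```python
# Trip counts copied from the annual table above.
annual = {
    2018: {"Total": 391601, "Member": 114625, "Casual": 276976},
    2019: {"Total": 258931, "Member": 118045, "Casual": 140886},
    2020: {"Total": 159221, "Member": 59577, "Casual": 99644},
    2021: {"Total": 146039, "Member": 57437, "Casual": 88602},
}


def pct_change(old, new):
    """Percentage change from old to new."""
    return 100.0 * (new - old) / old


def yoy_table(counts):
    """Map (previous_year, year) to the rounded percentage change of each column."""
    years = sorted(counts)
    return {
        (prev, cur): {col: round(pct_change(counts[prev][col], counts[cur][col]), 1)
                      for col in counts[cur]}
        for prev, cur in zip(years, years[1:])
    }


for (prev, cur), changes in yoy_table(annual).items():
    print(f"{prev} -> {cur}: " + ", ".join(f"{col} {val:+.1f}%" for col, val in changes.items()))
```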


#### 3.1.2 Correlation analysis: NiceRide daily usage VS COVID-19 daily cases

In [0]:
# daily covid case count
pdf_case = spark.sql("select * from covid_case order by date;").toPandas()

# count bike trip by date, union the results
pdf_trip = spark.sql("""
  select * from (
  select date(start_time) as date, count(1) as count from trip_2018 group by date(start_time)
  union
  select date(start_time) as date, count(1) as count from trip_2019 group by date(start_time)
  union
  select date(start_time) as date, count(1) as count from trip_2020 group by date(start_time)
  union
  select date(start_time) as date, count(1) as count from trip_2021 group by date(start_time)
) all_trip
order by date;
""").toPandas()


import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])

# Add trace: bike trip
fig.add_trace(
    go.Scatter(x=pdf_trip["date"], y=pdf_trip["count"], name="NiceRide trips"),
    secondary_y=False,
)

# Add trace: covid case
fig.add_trace(
    go.Scatter(x=pdf_case["date"], y=pdf_case["count"], name="COVID-19 cases"),
    secondary_y=True,
)

# Add figure title
fig.update_layout(
    title_text="NiceRide Trips VS COVID-19 Cases"
)

# Set x-axis title
fig.update_xaxes(title_text="Date")

# Set y-axes titles
fig.update_yaxes(title_text="NiceRide Trips", secondary_y=False)
fig.update_yaxes(title_text="COVID-19 Cases", secondary_y=True)

fig.show()

The two datasets cover different date ranges. The bike trip data spans April to November of each year from 2018 to 2021. The COVID-19 case data starts in March 2020 and runs up to the date it was scraped. The following cell therefore joins the two datasets on date and plots the time series over their common dates only.

In [0]:
# count daily trips in 2020
spark.sql("""
    select date(start_time) as date, count(1) as count from trip_2020 group by date(start_time) order by date(start_time)
""").createOrReplaceTempView("trip_count_2020")

# count daily trips in 2021
spark.sql("""
    select date(start_time) as date, count(1) as count from trip_2021 group by date(start_time) order by date(start_time)
""").createOrReplaceTempView("trip_count_2021")

# join trip count with case count
pdf_join_2020 = spark.sql("""
  select t.date, t.count as trip_count, c.count as case_count
  from trip_count_2020 t, covid_case c
  where t.date = c.date
  order by t.date
""").toPandas()

pdf_join_2021 = spark.sql("""
  select t.date, t.count as trip_count, c.count as case_count
  from trip_count_2021 t, covid_case c
  where t.date = c.date
  order by t.date
""").toPandas()

import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Create two side-by-side subplots (2020 and 2021), each with a secondary y-axis
fig = make_subplots(rows=1, cols=2, specs=[[{"secondary_y": True}, {"secondary_y": True}]])

# Add traces
fig.add_trace(
    go.Scatter(x=pdf_join_2020["date"], y=pdf_join_2020["trip_count"], name="2020 NiceRide trips", marker=dict(color="LimeGreen")),
    secondary_y=False, row=1, col=1
)

fig.add_trace(
    go.Scatter(x=pdf_join_2020["date"], y=pdf_join_2020["case_count"], name="2020 COVID-19 cases", marker=dict(color="LightCoral")),
    secondary_y=True, row=1, col=1
)

fig.add_trace(
    go.Scatter(x=pdf_join_2021["date"], y=pdf_join_2021["trip_count"], name="2021 NiceRide trips", marker=dict(color="LimeGreen")),
    secondary_y=False, row=1, col=2
)

fig.add_trace(
    go.Scatter(x=pdf_join_2021["date"], y=pdf_join_2021["case_count"], name="2021 COVID-19 cases", marker=dict(color="LightCoral")),
    secondary_y=True, row=1, col=2
)

# Add figure title
fig.update_layout(
    title_text="NiceRide Trips VS COVID-19 Cases"
)

# Set x-axis title
fig.update_xaxes(title_text="2020", row=1, col=1)
fig.update_xaxes(title_text="2021", row=1, col=2)

# Set y-axes titles
fig.update_yaxes(title_text="NiceRide Trips", secondary_y=False)
fig.update_yaxes(title_text="COVID-19 Cases", secondary_y=True)

fig.show()
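This subsection is titled a correlation analysis, but the cells only plot the two series. A correlation coefficient on the joined daily data could be computed as sketched below. Pearson measures linear association; Spearman, which is Pearson applied to ranks, measures monotonic association. The helpers are plain Python with hypothetical names, so they can be checked without Spark.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences (ZeroDivisionError if one is constant)."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mx, my = sum(xs) / n, sum(ys) / n
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    syy = sum((y - my) ** 2 for y in ys)
    return sxy / math.sqrt(sxx * syy)


def ranks(values):
    """1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg = (i + j) / 2 + 1
        for k in range(i, j + 1):
            result[order[k]] = avg
        i = j + 1
    return result


def spearman(xs, ys):
    """Spearman correlation = Pearson correlation of the ranks."""
    return pearson(ranks(xs), ranks(ys))


# Toy example with made-up numbers (not NiceRide data).
print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))
print(spearman([1, 2, 3, 4], [1, 4, 9, 16]))
```

With the DataFrames above, `pearson(pdf_join_2020["trip_count"].tolist(), pdf_join_2020["case_count"].tolist())` would give the 2020 coefficient. pandas' `Series.corr` offers the same Pearson calculation directly. Both series also change over the course of each riding season, so a coefficient on its own is best read cautiously rather than as evidence of cause and effect.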

### 3.2 Station level usage change (2019-2020)

In [0]:
import plotly.graph_objects as go
mapbox_access_token = "pk.eyJ1IjoieXVjaHVhbmh1YW5nIiwiYSI6ImNsMnAzM3gwZjJncjEzZXFoNmtlMXBnYzEifQ.iXpZWWFabItApIebtz4yWg"  # Mapbox needs access token to render the map

# function to generate SQL that finds the top 10 stations by change in trip count (2019 vs 2020)
# start_or_end: only accepts "start" or "end"
# sort_order: "" (ascending) lists the stations that increased most, "desc" those that decreased most
# note: the inner join only compares stations that have trips in both years
def generate_sql_station_level(start_or_end, sort_order=""):
  return f"""
    select s.station_id, s.station_name, s.lat, s.lon, c1.count-c2.count as diff
    from 
    (
      select {start_or_end}_station_id, count(1) as count
      from trip_2019
      group by {start_or_end}_station_id
    ) c1, 
    (
      select {start_or_end}_station_id, count(1) as count
      from trip_2020
      group by {start_or_end}_station_id
    ) c2, 
    station s
    where c1.{start_or_end}_station_id = c2.{start_or_end}_station_id
      and c1.{start_or_end}_station_id = s.station_id
    order by c1.count-c2.count {sort_order}
    limit 10;
  """


def gen_sql_get_decrease_station(start_or_end):
  return generate_sql_station_level(start_or_end, "desc")

def gen_sql_get_increase_station(start_or_end):
  return generate_sql_station_level(start_or_end, "")

# function to plot the stations on the map
def plot_station_on_map(df, start_or_end, increase_or_descrise=""):
  pdf = df.toPandas()  # convert Spark DataFrame to Pandas DataFrame
  
  fig = go.Figure(data=go.Scattermapbox())
  for row in pdf.itertuples(index=False):
    if increase_or_descrise == "":
      marker_text = f"{row.station_name} (id:{row.station_id}), increase:{abs(row.diff)}"
    else:
      marker_text = f"{row.station_name} (id:{row.station_id}), decrease:{row.diff}"
    
    fig.add_trace(
      go.Scattermapbox(
        lat=[row.lat],
        lon=[row.lon],
        text=marker_text,
        name=marker_text,
        marker=dict(size=12)
      )
    )

  if increase_or_descrise == "":
    fig_title = f"Top 10 {start_or_end} stations increase most 2019-2020"
  else:
    fig_title = f"Top 10 {start_or_end} stations decrease most 2019-2020"
  
  fig.update_layout(
    title=fig_title,
    mapbox=dict(
      accesstoken=mapbox_access_token,
      center=go.layout.mapbox.Center(  # initial map center
        lat=44.97,
        lon=-93.25
      ),
      zoom=11  # initial zoom level
   ))
  fig.show()

# public function to be called
# start_or_end: only accepts "start" or "end"
# increase_or_decrease: only accepts "increase" or "decrease"
def query_and_plot_station(start_or_end, increase_or_decrease):
  if increase_or_decrease == "increase":
    df = spark.sql(gen_sql_get_increase_station(start_or_end))
    display(df) # display the result
    plot_station_on_map(df, start_or_end) # display the map
  else:
    df = spark.sql(gen_sql_get_decrease_station(start_or_end))
    display(df)
    plot_station_on_map(df, start_or_end, "desc")
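The station-level SQL above joins the 2019 and 2020 counts with an inner join. A station that has trips in only one of the two years therefore never appears in either ranking. The plain-Python sketch below keeps such stations by treating a missing year as 0 trips and uses the same sign convention (diff = 2019 count minus 2020 count). The function names are hypothetical. In Spark SQL, the same effect would need a `full outer join` with `coalesce(count, 0)`.

```python
from collections import Counter


def station_diffs(ids_2019, ids_2020):
    """diff = (trips in 2019) - (trips in 2020) per station id, as in the SQL.

    Stations seen in only one year are kept with a count of 0 for the other year.
    """
    c19, c20 = Counter(ids_2019), Counter(ids_2020)
    return {sid: c19[sid] - c20[sid] for sid in c19.keys() | c20.keys()}


def top_changes(diffs, n=10, decrease=False):
    """Most negative diffs first (largest increases) unless decrease=True."""
    return sorted(diffs.items(), key=lambda kv: kv[1], reverse=decrease)[:n]


# Made-up station ids, one entry per trip. Stations 3 and 4 each appear in only
# one year and would be dropped by the inner join.
d = station_diffs([1, 1, 1, 2, 3], [1, 2, 2, 2, 4])
print(top_changes(d, n=2))                 # largest increases
print(top_changes(d, n=2, decrease=True))  # largest decreases
```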

In [0]:
# The "diff" column in the table is defined as "(count in 2019) - (count in 2020)"
# A negative "diff" value means an increase
# A positive "diff" value means a decrease

# top 10 start stations increase most
query_and_plot_station("start", "increase")

station_id,station_name,lat,lon,diff
138,Lake of the Isles Park,44.951464544006186,-93.30830438948183,-891
24,30th Ave SE & SE 4th Street,44.971367929790624,-93.21344397873796,-733
253,W Lake of the Isles Pkwy & W 21st Street,44.96145397673092,-93.3047217534618,-643
129,Lagoon Ave & S Hennepin Ave,44.94935206213592,-93.29862215728156,-557
120,Hiawatha Ave & E 50th Street,44.91301376436241,-93.21020445705383,-543
130,Lake Calhoun Center,44.94847798892731,-93.31731659498152,-472
135,Lake Street & Humboldt,44.94867769745908,-93.3007607859604,-470
179,NE Water Street & NE 13th Ave,44.99977604233513,-93.27319690017822,-433
192,Northeast Athletic Field Park,45.00192804114834,-93.23805879633174,-410
169,N 26th Ave & Theodore Wirth Pkwy,45.0057252135765,-93.31636420506584,-407


In [0]:
# top 10 end stations increase most
query_and_plot_station("end", "increase")

station_id,station_name,lat,lon,diff
181,NRM Bike Shop 2,44.983218,-93.223675,-957
138,Lake of the Isles Park,44.951464544006186,-93.30830438948183,-898
120,Hiawatha Ave & E 50th Street,44.91301376436241,-93.21020445705383,-667
253,W Lake of the Isles Pkwy & W 21st Street,44.96145397673092,-93.3047217534618,-603
135,Lake Street & Humboldt,44.94867769745908,-93.3007607859604,-558
129,Lagoon Ave & S Hennepin Ave,44.94935206213592,-93.29862215728156,-514
24,30th Ave SE & SE 4th Street,44.971367929790624,-93.21344397873796,-508
169,N 26th Ave & Theodore Wirth Pkwy,45.0057252135765,-93.31636420506584,-426
179,NE Water Street & NE 13th Ave,44.99977604233513,-93.27319690017822,-425
130,Lake Calhoun Center,44.94847798892731,-93.31731659498152,-398


In [0]:
# top 10 start stations decrease most
query_and_plot_station("start", "decrease")

station_id,station_name,lat,lon,diff
210,S 5th Street & Nicollet Mall,44.97877148442826,-93.27047608072412,4401
1,100 Main Street SE,44.984851700564974,-93.25640492090396,3845
119,Hennepin County Government Center,44.97683920723105,-93.26652638183424,3747
275,YWCA Downtown,44.97225740279114,-93.27627755726664,3348
262,Weisman Art Museum,44.97353695079061,-93.2372757526141,2935
272,Willey Hall,44.97242945490809,-93.24284129361776,2916
261,Washington Ave SE & Union Street SE,44.973928100679984,-93.23217580120888,2358
200,Portland Ave & Washington Ave,44.978151045576105,-93.26028674402396,2110
159,Mill City Quarter,44.98049188728323,-93.26176370963393,2075
231,South 2nd Street & 3rd Ave S,44.9808918066702,-93.26356354518936,2075


In [0]:
# top 10 end stations decrease most
query_and_plot_station("end", "decrease")

station_id,station_name,lat,lon,diff
210,S 5th Street & Nicollet Mall,44.97877148442826,-93.27047608072412,4537
1,100 Main Street SE,44.984851700564974,-93.25640492090396,4133
119,Hennepin County Government Center,44.97683920723105,-93.26652638183424,3347
272,Willey Hall,44.97242945490809,-93.24284129361776,2626
275,YWCA Downtown,44.97225740279114,-93.27627755726664,2535
262,Weisman Art Museum,44.97353695079061,-93.2372757526141,2530
261,Washington Ave SE & Union Street SE,44.973928100679984,-93.23217580120888,2517
200,Portland Ave & Washington Ave,44.978151045576105,-93.26028674402396,2328
230,Social Sciences,44.97060015700933,-93.24359089007685,2271
186,Nicollet Mall & 10th Street,44.97395759427608,-93.27436145075762,2208


### 3.3 Trip level usage change (2019-2020)

In [0]:
import plotly.graph_objects as go
mapbox_access_token = "pk.eyJ1IjoieXVjaHVhbmh1YW5nIiwiYSI6ImNsMnAzM3gwZjJncjEzZXFoNmtlMXBnYzEifQ.iXpZWWFabItApIebtz4yWg"

# function to generate sql to find top trips
# increase_or_descrise: input "" for increasing trips, input "desc" for decreasing trips
def generate_sql_trip_level(increase_or_descrise=""):
  return f"""
    select 
      s1.station_id as start_station_id, 
      s1.station_name as start_station_name, 
      s1.lat as start_lat, 
      s1.lon as start_lon,
      s2.station_id as end_station_id, 
      s2.station_name as end_station_name, 
      s2.lat as end_lat, 
      s2.lon as end_lon,
      c1.count - c2.count as diff
    from 
    (
      select start_station_id, end_station_id, count(1) as count
      from trip_2019
      group by start_station_id, end_station_id
    ) c1, 
    (
      select start_station_id, end_station_id, count(1) as count
      from trip_2020
      group by start_station_id, end_station_id
    ) c2, 
    station s1, station s2
    where c1.start_station_id = c2.start_station_id
      and c1.end_station_id = c2.end_station_id
      and c1.start_station_id = s1.station_id
      and c1.end_station_id = s2.station_id
    order by c1.count - c2.count {increase_or_descrise}
    limit 10;
  """

def gen_sql_get_decrease_trip():
  return generate_sql_trip_level("desc")
  
def gen_sql_get_increase_trip():
  return generate_sql_trip_level()


def plot_trip_on_map(df, increase_or_descrise=""):
  pdf = df.toPandas()
  fig = go.Figure(data=go.Scattermapbox())
  for row in pdf.itertuples(index=False):

    if increase_or_descrise == "":
      marker_text = f"start:{row.start_station_name} (id:{row.start_station_id})->end:{row.end_station_name} (id:{row.end_station_id}), increase:{abs(row.diff)}"
    else:
      marker_text = f"start:{row.start_station_name} (id:{row.start_station_id})->end:{row.end_station_name} (id:{row.end_station_id}), decrease:{row.diff}"

    fig.add_trace(
      go.Scattermapbox(
        lat=[row.start_lat, row.end_lat],
        lon=[row.start_lon, row.end_lon],
        text=marker_text,
        name=marker_text,
        mode = "markers+lines",  # plot trips as lines
        marker=dict(size=12)
      )
    )

  if increase_or_descrise == "":
    fig_title = f"Top 10 trips increase most 2019-2020"
  else:
    fig_title = f"Top 10 trips decrease most 2019-2020"

  fig.update_layout(
    title=fig_title,
    mapbox=dict(
      accesstoken=mapbox_access_token,
      center=go.layout.mapbox.Center(
        lat=44.97,
        lon=-93.25
      ),
      zoom=11
   ))
  fig.show()

# public function to be called
# increase_or_decrease: only accepts "increase" or "decrease"
def query_and_plot_trip(increase_or_decrease):
  if increase_or_decrease == "increase":
    df = spark.sql(gen_sql_get_increase_trip())
    display(df)
    plot_trip_on_map(df)
  else:
    df = spark.sql(gen_sql_get_decrease_trip())
    display(df)
    plot_trip_on_map(df, "desc")

In [0]:
# The "diff" column in the table is defined as "(count in 2019) - (count in 2020)"
# A negative "diff" value means an increase
# A positive "diff" value means a decrease

# top 10 trips increase most
query_and_plot_trip("increase")

start_station_id,start_station_name,start_lat,start_lon,end_station_id,end_station_name,end_lat,end_lon,diff
130,Lake Calhoun Center,44.94847798892731,-93.31731659498152,130,Lake Calhoun Center,44.94847798892731,-93.31731659498152,-1462
37,6th Ave SE & University Ave,44.98332983380283,-93.24812568849768,37,6th Ave SE & University Ave,44.98332983380283,-93.24812568849768,-996
135,Lake Street & Humboldt,44.94867769745908,-93.3007607859604,135,Lake Street & Humboldt,44.94867769745908,-93.3007607859604,-695
3,11th Ave S & S 2nd Street,44.97651009112594,-93.25212727867236,3,11th Ave S & S 2nd Street,44.97651009112594,-93.25212727867236,-590
247,W 36th Street & W Calhoun Parkway,44.93779912651373,-93.32027690694711,247,W 36th Street & W Calhoun Parkway,44.93779912651373,-93.32027690694711,-553
120,Hiawatha Ave & E 50th Street,44.91301376436241,-93.21020445705383,120,Hiawatha Ave & E 50th Street,44.91301376436241,-93.21020445705383,-441
134,Lake Nokomis,44.91592274607331,-93.24274092452008,134,Lake Nokomis,44.91592274607331,-93.24274092452008,-427
138,Lake of the Isles Park,44.951464544006186,-93.30830438948183,138,Lake of the Isles Park,44.951464544006186,-93.30830438948183,-334
264,West 15th Street & Willow,44.96810773947872,-93.28195172393583,264,West 15th Street & Willow,44.96810773947872,-93.28195172393583,-330
24,30th Ave SE & SE 4th Street,44.971367929790624,-93.21344397873796,24,30th Ave SE & SE 4th Street,44.971367929790624,-93.21344397873796,-328
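All ten station pairs in the table above start and end at the same station. One way to follow up is to compare the share of such round trips per year. The helper below is a hypothetical sketch that works on (start_station_id, end_station_id) pairs.

```python
def round_trip_share(pairs):
    """Hypothetical helper: fraction of (start_station_id, end_station_id) pairs
    that start and end at the same station. Returns 0.0 when there are no trips."""
    pairs = list(pairs)
    if not pairs:
        return 0.0
    return sum(1 for start, end in pairs if start == end) / len(pairs)


# Made-up pairs: two round trips out of four.
print(round_trip_share([(130, 130), (37, 37), (1, 2), (3, 4)]))
```

In Spark SQL, the same share could be computed per table without collecting rows to the driver, for example `select avg(case when start_station_id = end_station_id then 1 else 0 end) from trip_2020`.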


In [0]:
# top 10 trips decrease most
query_and_plot_trip("decrease")

start_station_id,start_station_name,start_lat,start_lon,end_station_id,end_station_name,end_lat,end_lon,diff
1,100 Main Street SE,44.984851700564974,-93.25640492090396,1,100 Main Street SE,44.984851700564974,-93.25640492090396,734
262,Weisman Art Museum,44.97353695079061,-93.2372757526141,272,Willey Hall,44.97242945490809,-93.24284129361776,584
272,Willey Hall,44.97242945490809,-93.24284129361776,262,Weisman Art Museum,44.97353695079061,-93.2372757526141,582
262,Weisman Art Museum,44.97353695079061,-93.2372757526141,230,Social Sciences,44.97060015700933,-93.24359089007685,581
132,Lake Harriet Bandshell,44.92934037272726,-93.30712101196174,132,Lake Harriet Bandshell,44.92934037272726,-93.30712101196174,580
230,Social Sciences,44.97060015700933,-93.24359089007685,262,Weisman Art Museum,44.97353695079061,-93.2372757526141,411
275,YWCA Downtown,44.97225740279114,-93.27627755726664,210,S 5th Street & Nicollet Mall,44.97877148442826,-93.27047608072412,402
57,Cedar Field Park,44.95715202075472,-93.24839643176104,57,Cedar Field Park,44.95715202075472,-93.24839643176104,369
119,Hennepin County Government Center,44.97683920723105,-93.26652638183424,66,Chicago Ave & S Washington Ave,44.97753179252417,-93.2569107181978,337
272,Willey Hall,44.97242945490809,-93.24284129361776,69,Coffman Union,44.9733731441144,-93.23475714012643,312
