## Overview

This notebook shows you how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside of Databricks. This notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# Where the uploaded file lives in DBFS, and which Spark reader format to use
file_location = "/FileStore/tables/yellow_tripdata_2018_01.parquet"
file_type = "parquet"

# Reader settings that are only meaningful for CSV input
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The CSV settings are passed unconditionally; as the notebook template notes,
# they are ignored for other file types.
df = (
    spark.read.format(file_type)
    .options(
        inferSchema=infer_schema,
        header=first_row_is_header,
        sep=delimiter,
    )
    .load(file_location)
)

display(df)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,
1,2018-01-01T00:29:29.000+0000,2018-01-01T00:32:48.000+0000,3,0.5,1,N,143,143,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:38:08.000+0000,2018-01-01T00:48:24.000+0000,2,1.7,1,N,50,239,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,,
1,2018-01-01T00:49:29.000+0000,2018-01-01T00:51:53.000+0000,1,0.7,1,N,239,238,1,4.0,0.5,0.5,1.0,0.0,0.3,6.3,,
1,2018-01-01T00:56:38.000+0000,2018-01-01T01:01:05.000+0000,1,1.0,1,N,238,24,1,5.5,0.5,0.5,1.7,0.0,0.3,8.5,,
1,2018-01-01T00:17:04.000+0000,2018-01-01T00:22:24.000+0000,1,0.7,1,N,170,170,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,


In [0]:
# Register the DataFrame under a name that SQL cells can query.
# createOrReplaceTempView overwrites any temp view already registered under this name.

temp_table_name = "yellow_tripdata_2018_01_parquet"

df.createOrReplaceTempView(name=temp_table_name)

In [0]:
%sql

-- Read every column of every row from the temp view registered earlier in this notebook

SELECT *
FROM `yellow_tripdata_2018_01_parquet`

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,
1,2018-01-01T00:29:29.000+0000,2018-01-01T00:32:48.000+0000,3,0.5,1,N,143,143,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:38:08.000+0000,2018-01-01T00:48:24.000+0000,2,1.7,1,N,50,239,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,,
1,2018-01-01T00:49:29.000+0000,2018-01-01T00:51:53.000+0000,1,0.7,1,N,239,238,1,4.0,0.5,0.5,1.0,0.0,0.3,6.3,,
1,2018-01-01T00:56:38.000+0000,2018-01-01T01:01:05.000+0000,1,1.0,1,N,238,24,1,5.5,0.5,0.5,1.7,0.0,0.3,8.5,,
1,2018-01-01T00:17:04.000+0000,2018-01-01T00:22:24.000+0000,1,0.7,1,N,170,170,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,


In [0]:
# A temp view is only available to this particular notebook. If you'd like other users to be
# able to query this data, you can also create a table from the DataFrame.
# Once saved, the table persists across cluster restarts and can be queried by various users
# from different notebooks.
# To do so, choose your table name and uncomment the bottom line.
#
# NOTE(review): the name below is the same string used for the temp view created earlier in
# this notebook; presumably an unqualified query keeps resolving to the temp view while it
# exists - confirm before enabling the write.
# NOTE(review): the write below specifies no save mode, so re-running it after the table
# exists presumably fails - confirm and add an explicit mode if re-runs are expected.

permanent_table_name = "yellow_tripdata_2018_01_parquet"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
# Print each column's name, data type and nullability for the loaded DataFrame.
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [0]:
%sql
-- Same full-table query as the earlier SQL cell, with the view name written without backticks.
SELECT * FROM yellow_tripdata_2018_01_parquet

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,
1,2018-01-01T00:29:29.000+0000,2018-01-01T00:32:48.000+0000,3,0.5,1,N,143,143,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:38:08.000+0000,2018-01-01T00:48:24.000+0000,2,1.7,1,N,50,239,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,,
1,2018-01-01T00:49:29.000+0000,2018-01-01T00:51:53.000+0000,1,0.7,1,N,239,238,1,4.0,0.5,0.5,1.0,0.0,0.3,6.3,,
1,2018-01-01T00:56:38.000+0000,2018-01-01T01:01:05.000+0000,1,1.0,1,N,238,24,1,5.5,0.5,0.5,1.7,0.0,0.3,8.5,,
1,2018-01-01T00:17:04.000+0000,2018-01-01T00:22:24.000+0000,1,0.7,1,N,170,170,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,


In [0]:
# Render the DataFrame again; in top-to-bottom order no derived columns have been added yet.
display(df)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,
1,2018-01-01T00:29:29.000+0000,2018-01-01T00:32:48.000+0000,3,0.5,1,N,143,143,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:38:08.000+0000,2018-01-01T00:48:24.000+0000,2,1.7,1,N,50,239,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,,
1,2018-01-01T00:49:29.000+0000,2018-01-01T00:51:53.000+0000,1,0.7,1,N,239,238,1,4.0,0.5,0.5,1.0,0.0,0.3,6.3,,
1,2018-01-01T00:56:38.000+0000,2018-01-01T01:01:05.000+0000,1,1.0,1,N,238,24,1,5.5,0.5,0.5,1.7,0.0,0.3,8.5,,
1,2018-01-01T00:17:04.000+0000,2018-01-01T00:22:24.000+0000,1,0.7,1,N,170,170,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,


In [0]:
# Re-print the schema; df has not been modified since the earlier printSchema() cell.
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [0]:

# The wildcard import is kept on purpose: later cells (Q5, Q6) use col, sum, max,
# desc and to_date without importing them again. It shadows the Python builtins
# sum and max for the rest of the notebook.
from pyspark.sql.functions import *

# Columns added together, in this order, to form "revenue".
# NOTE(review): total_amount is part of the sum alongside its apparent components
# (the displayed revenue is twice the displayed total_amount for the sample rows) -
# confirm this is the intended definition.
revenue_columns = [
    "fare_amount",
    "extra",
    "mta_tax",
    "improvement_surcharge",
    "tip_amount",
    "tolls_amount",
    "total_amount",
]

# Build the expression left to right so the additions happen in the listed order.
revenue_expr = col(revenue_columns[0])
for column_name in revenue_columns[1:]:
    revenue_expr = revenue_expr + col(column_name)

df = df.withColumn("revenue", revenue_expr)
display(df.select("revenue"))

revenue
11.6
30.6
16.6
69.6
33.1
11.6
24.700000000000003
12.6
17.0
13.6


In [0]:
#Q2
# Total passenger count for every pickup / drop-off location pair.
route_groups = df.groupBy("PULocationID", "DOLocationID")
passengers_per_route = route_groups.agg({"passenger_count": "sum"})
display(passengers_per_route)

PULocationID,DOLocationID,sum(passenger_count)
79,116,580
246,249,5073
234,144,7047
161,193,100
231,261,5812
143,211,260
90,231,6447
142,144,971
90,142,2835
249,225,400


In [0]:
#Q3
# Sum of total_amount for each vendor.
vendor_groups = df.groupBy("VendorID")
amount_per_vendor = vendor_groups.agg({"total_amount": "sum"})
display(amount_per_vendor)

VendorID,sum(total_amount)
1,58181947.048458464
2,77530629.1266784


In [0]:
#Q4
# Number of trips recorded for each payment_type code.
payment_type_counts = df.groupBy("payment_type").count()
display(payment_type_counts)

payment_type,count
1,6106416
3,43204
2,2599215
4,11852


In [0]:
#Q5
# Per vendor and pickup date: total passengers and total distance, ordered by
# total passengers, largest first.
# to_date, col and sum come from the pyspark.sql.functions wildcard import run in
# an earlier cell (sum here is Spark's aggregate, not the Python builtin).
pickup_date = to_date(col("tpep_pickup_datetime"))
df = df.withColumn("date", pickup_date)

# NOTE(review): the grouping key is spelled 'VendorId' while the schema column is
# 'VendorID'; presumably this relies on case-insensitive column resolution - confirm.
daily_vendor_totals = df.groupBy('VendorId', 'date').agg(
    sum("passenger_count").alias("total_passenger_count"),
    sum("trip_distance").alias("total_distance"),
)
display(daily_vendor_totals.orderBy(col("total_passenger_count").desc()))


VendorId,date,total_passenger_count,total_distance
2,2018-01-26,354497,513454.3299999982
2,2018-01-25,351475,518394.0400000009
2,2018-01-18,349639,508573.5399999963
2,2018-01-13,349181,500069.0500000009
2,2018-01-19,341985,500606.6799999969
2,2018-01-12,335281,503578.94999999343
2,2018-01-31,333799,477673.2199999911
2,2018-01-27,332070,459611.73000000376
2,2018-01-24,330770,483133.2800000022
2,2018-01-17,323704,466203.35000000615


In [0]:
#Q6
# Largest single-trip passenger count seen for each pickup / drop-off pair,
# ordered from highest to lowest.
# max and col come from the pyspark.sql.functions wildcard import run in an
# earlier cell (max here is Spark's aggregate, not the Python builtin).
route_groups = df.groupBy("PULocationID", "DOLocationID")
max_passengers_per_route = route_groups.agg(
    max("passenger_count").alias("maximum_passenger")
)
display(max_passengers_per_route.orderBy(col("maximum_passenger").desc()))


PULocationID,DOLocationID,maximum_passenger
88,88,9
236,138,9
161,1,9
164,164,9
194,265,9
138,265,9
87,87,9
264,50,9
264,68,9
265,265,9


In [0]:
#Q7
# Total passengers per pickup location for trips whose pickup time falls within
# the last LOOKBACK_SECONDS seconds before the latest pickup in the data,
# ordered from most to fewest passengers.
from pyspark.sql.functions import *
from datetime import timedelta

# Length of the look-back window, measured back from the latest pickup timestamp.
LOOKBACK_SECONDS = 10

# max/sum/desc/col below are the Spark functions from the wildcard import above,
# not the Python builtins.
latest_pickup = df.select(max("tpep_pickup_datetime").alias("timestamp")).first()["timestamp"]

# With no non-null pickup timestamps the aggregate yields None; fail with a clear
# message instead of an opaque TypeError from `None - timedelta`.
if latest_pickup is None:
    raise ValueError("No pickup timestamps found; cannot compute the look-back window")

diff_timestamp = latest_pickup - timedelta(seconds=LOOKBACK_SECONDS)

# Strictly-greater-than comparison: a pickup exactly LOOKBACK_SECONDS old is excluded.
recent_trips = df.filter(df["tpep_pickup_datetime"] > diff_timestamp)

df_filter = (
    recent_trips
    .groupBy("PULocationID")
    .agg(sum("passenger_count").alias("total_passenger"))
    .sort(desc(col("total_passenger")))
)

display(df_filter)

PULocationID,total_passenger
48,2
