## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
import pyspark.sql.functions as F
file_location = "/FileStore/tables/yellow_tripdata_2018_01-2.parquet"
file_type = "parquet"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)
print(df.count())

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,
1,2018-01-01T00:29:29.000+0000,2018-01-01T00:32:48.000+0000,3,0.5,1,N,143,143,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:38:08.000+0000,2018-01-01T00:48:24.000+0000,2,1.7,1,N,50,239,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,,
1,2018-01-01T00:49:29.000+0000,2018-01-01T00:51:53.000+0000,1,0.7,1,N,239,238,1,4.0,0.5,0.5,1.0,0.0,0.3,6.3,,
1,2018-01-01T00:56:38.000+0000,2018-01-01T01:01:05.000+0000,1,1.0,1,N,238,24,1,5.5,0.5,0.5,1.7,0.0,0.3,8.5,,
1,2018-01-01T00:17:04.000+0000,2018-01-01T00:22:24.000+0000,1,0.7,1,N,170,170,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,


8760687


In [0]:
# Create a view or table

temp_table_name = "yellow_tripdata_2018_01-2_parquet"

df.createOrReplaceTempView("temp_table_name")
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "yellow_tripdata_2018_01-2_parquet"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
#Question1: Query 1. - Add a column named as ""Revenue"" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount'
df = df.withColumn("Revenue", 
     df["Fare_amount"]+
     df["Extra"]+
     df["MTA_tax"]+
     df["improvement_surcharge"]+
     df["Tip_amount"]+
     df["Tolls_amount"]+
     df["Total_amount"])   
display(df)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,Revenue
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,11.6
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,,30.6
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,,16.6
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,,69.6
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,,33.1
1,2018-01-01T00:29:29.000+0000,2018-01-01T00:32:48.000+0000,3,0.5,1,N,143,143,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,11.6
1,2018-01-01T00:38:08.000+0000,2018-01-01T00:48:24.000+0000,2,1.7,1,N,50,239,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,,,24.700000000000003
1,2018-01-01T00:49:29.000+0000,2018-01-01T00:51:53.000+0000,1,0.7,1,N,239,238,1,4.0,0.5,0.5,1.0,0.0,0.3,6.3,,,12.6
1,2018-01-01T00:56:38.000+0000,2018-01-01T01:01:05.000+0000,1,1.0,1,N,238,24,1,5.5,0.5,0.5,1.7,0.0,0.3,8.5,,,17.0
1,2018-01-01T00:17:04.000+0000,2018-01-01T00:22:24.000+0000,1,0.7,1,N,170,170,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,,13.6


In [0]:
#Query 2. - Increasing count of total passengers in New York City by area
df.groupBy("PULocationID").count().sort("passenger_count").show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-4454216353532562>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0;31m#Query 2. - Increasing count of total passengers in New York City by area[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mgroupBy[0m[0;34m([0m[0;34m"PULocationID"[0m[0;34m)[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0msort[0m[0;34m([0m[0;34m"passenger_count"[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m          

In [0]:
#Query 3. - Realtime Average fare/total earning amount earned by 2 vendors
df.groupBy("VendorID").avg("Fare_amount").show()


+--------+------------------+
|VendorID|  avg(Fare_amount)|
+--------+------------------+
|       1|11.959975333672647|
|       2|12.467037370438032|
+--------+------------------+



In [0]:
#Query 4. - Moving Count of payments made by each payment mode
df.groupBy("payment_type").count().show()

+------------+-------+
|payment_type|  count|
+------------+-------+
|           1|6106416|
|           3|  43204|
|           2|2599215|
|           4|  11852|
+------------+-------+



In [0]:
df.filter("PULocationID!=DOLocationID").groupBy("VendorID").max("passenger_count").show()

+--------+--------------------+
|VendorID|max(passenger_count)|
+--------+--------------------+
|       1|                   9|
|       2|                   9|
+--------+--------------------+



In [0]:
df.groupBy("PULocationID").max("passenger_count").show();

+------------+--------------------+
|PULocationID|max(passenger_count)|
+------------+--------------------+
|          26|                   6|
|          29|                   6|
|          65|                   6|
|         191|                   6|
|         222|                   6|
|         243|                   6|
|          54|                   6|
|          19|                   4|
|         113|                   7|
|         112|                   6|
|         167|                   6|
|         155|                   6|
|         237|                   6|
|         241|                   6|
|          22|                   6|
|         198|                   6|
|         130|                   6|
|         196|                   6|
|           7|                   6|
|          77|                   5|
+------------+--------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col

In [0]:
#Query 6. - Most no of passenger between a route of two location.
a=df.select("PULocationID","DOLocationID","passenger_count")
a.filter("PULocationID!=DOLocationID").sort(col("passenger_count").desc()).show()


+------------+------------+---------------+
|PULocationID|DOLocationID|passenger_count|
+------------+------------+---------------+
|         236|         138|              9|
|         194|         265|              9|
|         264|         265|              9|
|         264|         262|              9|
|         264|          68|              9|
|         161|          95|              9|
|         264|           1|              9|
|         264|         181|              9|
|         138|         265|              9|
|         161|           1|              9|
|         264|         261|              9|
|         264|          50|              9|
|         230|         265|              9|
|         138|         265|              8|
|         264|         257|              8|
|         132|         265|              8|
|         264|         164|              8|
|         264|         137|              8|
|         264|         265|              8|
|         138|         265|     

In [0]:
# Query 7. - Get top pickup locations with most passengers in last 5/10 seconds. 
a=df.select("PULocationID","tpep_pickup_datetime","passenger_count")
a.sort(col("passenger_count").desc(), col("tpep_pickup_datetime").desc()).show()

+------------+--------------------+---------------+
|PULocationID|tpep_pickup_datetime|passenger_count|
+------------+--------------------+---------------+
|          87| 2018-01-30 09:38:43|              9|
|         230| 2018-01-29 11:33:02|              9|
|           1| 2018-01-28 20:14:35|              9|
|         144| 2018-01-28 03:19:59|              9|
|         264| 2018-01-26 22:22:20|              9|
|          79| 2018-01-25 23:20:53|              9|
|         264| 2018-01-24 23:38:03|              9|
|         161| 2018-01-23 13:35:56|              9|
|         138| 2018-01-22 13:13:59|              9|
|           1| 2018-01-21 15:17:42|              9|
|           1| 2018-01-21 13:47:04|              9|
|         163| 2018-01-20 21:37:15|              9|
|         264| 2018-01-18 17:36:26|              9|
|         236| 2018-01-17 12:53:00|              9|
|         264| 2018-01-14 17:34:30|              9|
|         164| 2018-01-14 02:51:00|              9|
|         16

In [0]:
a=df.select("passenger_count", "trip_distance", "Revenue", "tpep_pickup_datetime").distinct()
a.sort(col("Revenue").desc()).show()

+---------------+-------------+------------------+--------------------+
|passenger_count|trip_distance|           Revenue|tpep_pickup_datetime|
+---------------+-------------+------------------+--------------------+
|              1|          4.8|           16033.6| 2018-01-13 19:06:28|
|              1|          0.0|           11452.6| 2018-01-30 15:59:04|
|              1|          4.0|            6028.6| 2018-01-17 10:50:36|
|              1|          1.2|            6013.6| 2018-01-24 12:39:15|
|              1|          0.0|            5389.6| 2018-01-11 16:04:51|
|              1|       484.91| 4835.620000000001| 2018-01-12 17:36:09|
|              1|          0.0|            4620.6| 2018-01-19 09:44:15|
|              1|          0.0|            2038.6| 2018-01-26 08:30:54|
|              1|          2.0|            2034.6| 2018-01-29 09:02:42|
|              1|          5.4|            2007.0| 2018-01-20 18:13:16|
|              1|          0.2|1960.7199999999998| 2018-01-25 21