In [20]:
!pip install pyspark
!pip install findspark


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [4]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [14]:
df = spark.read.parquet("yellow_tripdata_2021-02.parquet")
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [15]:
df.show(3)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 07:40:47|  2021-02-01 07:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

## Total Trip on Feb 15

In [16]:
df = df \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_dt') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_dt')

df.registerTempTable('nytaxi_tab')
df.show(1)



+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|          pickup_dt|         dropoff_dt|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1|2021-02-01 07:40:47|2021-02-01 07:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.0|         0

In [18]:
total_trip_feb15 = spark.sql(""" 

    SELECT COUNT(pickup_dt) AS total_trip_feb15
    FROM nytaxi_tab
    WHERE pickup_dt >= '2021-02-15 00:00:00' AND pickup_dt < '2021-02-16 00:00:00'

""")

total_trip_feb15.show(1)

+----------------+
|total_trip_feb15|
+----------------+
|           43686|
+----------------+



### Longest Trip per Day

In [19]:
df.createOrReplaceTempView('data_view')

In [20]:
longesttrip_perday = df.withColumn("pickup_dt" , to_date(df['pickup_dt']))\
                      .select(['pickup_dt','trip_distance'])\
                      .where("pickup_dt >= '2021-02-01' ")\
                      .groupby(F.col('pickup_dt')).agg(F.max('trip_distance').alias('longest_trip')).sort(desc("longest_trip"))
longesttrip_perday.show(5)

+----------+------------+
| pickup_dt|longest_trip|
+----------+------------+
|2021-02-16|   221188.25|
|2021-02-20|   188054.03|
|2021-02-08|   186617.92|
|2021-02-07|   186510.67|
|2021-02-03|   186079.73|
+----------+------------+
only showing top 5 rows



## Top 5 most frequent dispatching_base_num

In [24]:
df_1 = spark.read.parquet("fhvhv_tripdata_2021-02.parquet", header=True, inferSchema=True)
df_1.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [25]:
df_1.show(1)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [26]:
df1_top5 = df_1.groupBy("dispatching_base_num").count() \
                    .orderBy(F.col('count').desc())

df1_top5.show(5)

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
+--------------------+-------+
only showing top 5 rows



##Top 5 most common location pairs (PULocationID and DOLocationID)

In [27]:
df2_top5 = df.where("PUlocationID IS NOT NULL AND DOlocationID IS NOT NULL") \
                      .groupBy(["PUlocationID",'DOlocationID']) \
                      .count() \
                      .orderBy(F.col('count').desc())
df2_top5.show(5)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|         237|         236|11455|
|         236|         237| 9901|
|         236|         236| 8819|
|         237|         237| 7324|
|         264|         264| 5732|
+------------+------------+-----+
only showing top 5 rows



In [28]:
top5_location_pairs_fhv = df_1.where("PUlocationID IS NOT NULL AND DOlocationID IS NOT NULL") \
                          .groupBy(["PUlocationID",'DOlocationID']) \
                          .count() \
                          .orderBy(F.col('count').desc())
top5_location_pairs_fhv.show(5)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|          76|          76|45041|
|          26|          26|37329|
|          39|          39|28026|
|          61|          61|25976|
|          14|          14|17934|
+------------+------------+-----+
only showing top 5 rows



## Write all result to Bigquery table.

In [32]:
gcs_bucket = 'data-fellowship-9'
bq_dataset = 'task6_iykra'
bq_table = 'nyc_taxi_rec'

df_wiki_en_totals.write \
  .format("bigquery") \
  .option("table","{}.{}".format(bq_dataset, bq_table)) \
  .option("temporaryGcsBucket", gcs_bucket) \
  .mode('overwrite') \
  .save()

NameError: name 'df_wiki_en_totals' is not defined