In [1]:
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import requests
import os
import tqdm.notebook

In [2]:
MONTHS = [7, 8, 9] # місяці 2023 року, які будемо аналізувати

## Завантажимо датасет

In [3]:
for month in tqdm.notebook.tqdm(MONTHS):
    file_name = f'fhvhv_tripdata_2023-{month:02}.parquet'
    
    if not os.path.exists(file_name):
        url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}'
        r = requests.get(url, allow_redirects=True, stream=True)

        file_size = int(r.headers.get('Content-Length', 0))
        
        temp_file_name = f'{file_name}.tmp'
        chunk_size=10 * 1024

        progress_bar = tqdm.notebook.tqdm(total=file_size, unit='iB', unit_scale=True)

        with open(temp_file_name, 'wb') as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                progress_bar.update(len(chunk))
                f.write(chunk)
        os.rename(temp_file_name, file_name)

  0%|          | 0/3 [00:00<?, ?it/s]

## Створимо датафрейм із файлів датасету та поглянемо на колонки та їхні типи

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
df = spark.read.parquet(*[f'fhvhv_tripdata_2023-{month:02}.parquet' for month in MONTHS])

In [6]:
df.show(5)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [7]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

## Також поглянемо на додаткову таблицю із назвами районів

In [8]:
zone_lookup = spark.read.csv('taxi+_zone_lookup.csv', header=True)

In [9]:
zone_lookup.show(10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 10 rows



## Виконаємо необхідні розрахунки для поставленої задачі.

Алгоритм -- в коментаріях нижче.

In [10]:
df.select('base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
          'congestion_surcharge', 'airport_fee', 'tips', 'driver_pay').show(5)

+-------------------+-----+----+---------+--------------------+-----------+----+----------+
|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|
+-------------------+-----+----+---------+--------------------+-----------+----+----------+
|              22.34|  0.0|0.61|     1.98|                 0.0|        0.0| 0.0|     20.42|
|              25.83|  0.0|0.71|     2.29|                 0.0|        0.0| 0.0|     23.03|
|              26.51|  0.0|0.73|     2.35|                2.75|        0.0|6.46|     18.61|
|              16.32|  0.0|0.45|     1.45|                2.75|        0.0| 0.0|      8.13|
|              83.97|  0.0|2.31|     7.45|                2.75|        0.0| 0.0|     59.05|
+-------------------+-----+----+---------+--------------------+-----------+----+----------+
only showing top 5 rows



In [11]:
(
    df

    # Додаємо всі види стягнень, які пасажир заплатив за поїздку у колонку "passenger_pay".
    .withColumn(
        'passenger_pay',
        sum(F.col(c) for c in [
            'base_passenger_fare', 'tolls', 'bcf', 'sales_tax',
            'congestion_surcharge', 'airport_fee', 'tips'
        ])
    )

    # Залишаємо лише потрібні колонки, необхідні для нашої задачі.
    .select('PULocationID', 'tips', 'passenger_pay')

    # Для кожного району рахуємо середню складову чайових.
    .groupby('PULocationID')
    .agg((F.sum(F.col('tips')) / F.sum(F.col('passenger_pay'))).alias('tip_to_pay_ratio'))

    # Підтягуємо назви районів із допоміжної таблиці
    .join(zone_lookup, on=(df['PULocationID'] == zone_lookup['LocationID']), how='left')

    # Залишаємо лише назву району та відсоток чайових.
    .select('Zone', 'tip_to_pay_ratio')

    # Сортуємо за відсотком чайових.
    .orderBy(F.col('tip_to_pay_ratio').desc())

    # Показуємо райони з найвищим відсотком чайових.
    .show(truncate=False)
)

+----------------------------+-------------------+
|Zone                        |tip_to_pay_ratio   |
+----------------------------+-------------------+
|Newark Airport              |0.1787984558840526 |
|Windsor Terrace             |0.06491645753500011|
|LaGuardia Airport           |0.06153435664089218|
|Green-Wood Cemetery         |0.06119669752620329|
|Columbia Street             |0.05657467526480542|
|Battery Park                |0.05598614774837693|
|Carroll Gardens             |0.05531883061902894|
|Flushing Meadows-Corona Park|0.05524631901946194|
|Central Park                |0.05452574305293669|
|Upper West Side South       |0.05363349731024332|
|Park Slope                  |0.05353460264887645|
|Gowanus                     |0.0532265609234798 |
|Cobble Hill                 |0.0527557379093849 |
|Times Sq/Theatre District   |0.05249115469712333|
|Prospect Heights            |0.0500361889189264 |
|Midtown North               |0.04951462031414925|
|Midtown Center              |0