In [None]:
# Если работать в colab
# !pip install pyspark==3.5.3

In [1]:
from pyspark.sql import SparkSession

from pyspark.sql import functions as F # типо functions

from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, FloatType

## PySpark simple EDA

Датасет взят: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Данные за 2025 - July

1. Скачайте датасет
2. Поместите в удобное место на диске
3. Пропишите путь до скачанного файла

In [None]:
# Инициализация SparkSession

# Можно задать желаемые параметры конфигурации

# spark = (
#     SparkSession.builder 
#     .appName("IntroBD") # Позволяет задать название
#     .master("local[4]") # "*" - использовать все доступные ядра на машине
#     .config("spark.executor.cores", "2") # Кол-во рабочих процессов
#     .config("spark.driver.memory", "2g") # Память для драйвера
#     .config("spark.executor.memory", "1g") # Кол-во памяти для каждого рабочего процесса
#     .config("spark.sql.shuffle.partitions", "12") # Кол-во партиций (фрагментов данных)
#     .getOrCreate()
# )
# spark

# Указание памяти: 1g - 1GB, 500m - 500 MB

# Пока рекомендуется использовать стандарные параметры, создавать ссесию следующим образом:
# spark = SparkSession.builder.getOrCreate()

In [2]:
spark = SparkSession.builder.getOrCreate()
spark

In [6]:
# Инициализируем df к источнику данных, представленных файлом fhvhv_tripdata_2025-07.parquet
path2data_file = "../../../spark_sem_draft/data/fhvhv_tripdata_2025-07.parquet" # Путь до файла

df = spark.read.parquet(path2data_file, header=True, inferSchema=False)

In [7]:
# Кол-во строк в df
print(f'total rows: {df.count()}')

total rows: 19653012


In [None]:
# Схема
df.printSchema()

In [None]:
# Типы колонок
df.dtypes

In [None]:
# Удобный вертикальный вывод
df.show(1, vertical=True)

In [None]:
# df разбит на 12 партиций
df.rdd.getNumPartitions()

In [None]:
# Определение минимальной, максимальной даты
df.select(F.min("request_datetime"), F.max("request_datetime")).show()

In [None]:
# Сколько строк с датой меньше заданной
df.filter(df.request_datetime < "2025-07-01").count()

In [None]:
# Создание нового столбца
df = df.withColumn("timestamp_rd", F.col("request_datetime").cast("timestamp"))

In [None]:
df.explain()

In [None]:
# Если в датафрейме пресутствует формат даты, который не устраивает, можно сделать конвертацию

df = df.withColumn("request_datetime_time_convert", F.to_timestamp(F.col("request_datetime"), "dd/MM/yyyy HH:mm"))

In [None]:
df.dtypes

In [None]:
# Определить определить количество уникальных значений в колонке PULocationID, DOLocationID
print(df.select(df.PULocationID).distinct().count())
print(df.select(df.DOLocationID).distinct().count())

# Для вывода уникальных значений - заменить count() на show()

In [None]:
df.select(df.shared_request_flag).distinct().count()

In [None]:
# Для работы с датами есть прекрасные функции, которые позволяют выбирать из даты желаемое значение
# Например: F.year, F.month, F.day, F.dayofweek, F.dayofweek, F.dayofyear

df = df.withColumn("timestamp_rd_day", F.day("timestamp_rd"))

In [None]:
# Преобразование к желаемому типу данных
df = df.withColumn("timestamp_rd_day", F.col("timestamp_rd_day").cast("int"))

In [None]:
# На схеме можно заметить новые значения
df.printSchema()

In [None]:
df.filter(df.timestamp_rd < "2025-07-01").count()

# Можно сделать сравнения через between
# df.filter(df.timestamp_rd.between("2025-07-01", "2025-07-02")).show()

# Добавить 7 дней
# F.date_add(F.col("event_time"), 7)

# Разница между двумя датами
# F.datediff(F.current_date(), F.col("event_time"))

In [None]:
# Получение описательных статистик для числовых признаков (выполняется долго 1.5+ мин)
%time df.describe().show()

In [None]:
# Определяет кореляцию между двумя признаками
df.stat.corr("PULocationID", "DOLocationID")

In [None]:
# Определяет ковариацию между двумя признаками
df.stat.cov("PULocationID", "DOLocationID")

In [None]:
# Подвыборка из всего набора данных (df) fraction=0.05 - объем, seed=42 - сид, для воспроизведения эксперимента
sampled = df.sample(fraction=0.05, seed=42)

In [None]:
# Можно определить объем полученной подвыборки
sampled.count()

## HW 1

Скачать датасет `High Volume For-Hire Vehicle Trip Records` за любой месяц с: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page


* `Yellow Taxi Trip Records` - таблицы с записями о поездках на желтых такси
* `Green Taxi Trip Records` - таблицы с записями о поездках на зеленых такси (у них ограниченые маршруты)
* `For-Hire Vehicle Trip Records` - таблицы с записями компаний, в которых совершается более 10.000+ поездок в сутки
* `High Volume For-Hire Vehicle Trip Records` - таблицы с записями поездок 

Файл с описанием колонок в датасетах `High Volume For-Hire Vehicle Trip Records`: https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf (тут можно ознакомиться с значениями каждых из колонок)

---
#### 1. Фильтрация
Прочитать датасет и вывести его схему, количество строк. 

Выполнить фильтрацию по чаевым и расстоянию поездки (оставить строки, где водитель получил чаевые и расстояние поездки было больше 5 миль).

Сколько строк осталось после фильтрации?

In [None]:
### your code here 

#### 2. Подсчет общей цены поездки. 

Работаем с изначальным датасетом

Создать новую колонку (`total_receipt`), которая будет являться суммой `base_passenger_fare`, `tolls`, `sales_tax`, `congestion_surcharge`, `airport_fee` и `tips`.

Вывести топ-15 самых дорогих поездок (`total_receipt`).

In [None]:
### your code here 

#### 3. Конвертация времени
Создать новую колонку (`trip_time_minutes`) - время поездки в минутах (из `trip_time`).

Вывести первые 5 строк только с ID зоной посадки, ID зоной высадки, `trip_time_minutes`.

In [None]:
### your code here 

---
#### 4. Самые крупные чаевые  
Сгруппировать данные по зоне высадки (`DOLocationID`) и определить:
* Средний размер чаевых (`tips`), задать название колонке `mean_tips`
* Максимальное расстояние поездки (`trip_miles`)

Отсортировать результат по средней сумме чаевых (`mean_tips`).

Вывести топ-10 зон с самыми высокими средними чаевыми (`mean_tips`) 

In [None]:
### your code here 

#### 5. Совместные поездки
Отфильтровать данные по полю `shared_request_flag`, оставить только те, где пассажир согласился на совместную поездку. 

Сгруппировать по зоне посадки (`PULocationID`) и посчитать кол-во запросов. 

In [None]:
### your code here 

#### 6. SQL
Зарегистрировать датафрейм как таблицу.

Написать SQL-запрос, который получает сумму всех выплат водителям (поле `driver_pay`), для каждой компании `hvfhs_license_num`.

Вывести компании и полученные суммы

(Сумма может получиться с E, например, 4.56E+08 = 456 000 000)

In [None]:
### your code here 

---
## Оконные функции

Позволяют выполнять расчеты по группе строк (окну) и не "сворачивают" данные как groupby. Каждая строка получает результат, зависящий от окна. 

Используются для:
* Ранжирования внутри группы
* Накопительной суммы
* Сравнение с предыдушей/следующей строкой
* Определение скользящего среднего 
* Определение доли от общей суммы в группе
* и др. где нужно определять какое-то значение в окне

#### 7. Оконные функции (Дополнительно)
Для каждой зоны посадки, найти 3 самые длительные поездки

In [None]:
### your code here 

#### 8. Самые загруженные часы
Извлечь час из колонки `request_datetime`. 

Определить количество поездок для каждого часа.

Определить самый загруженный час.

Дополнительно: построить график или гистограмму количество поездок по часам дня

In [None]:
### your code here 

#### 9. Время ожидания 
Определить среднее время ожидания такси

In [None]:
### your code here 