Notebook for the chicago_spark project.  
Aggregating data by geo keys (districts, areas) and time intervals

## Imports

In [1]:
import os
import sys
import warnings

In [2]:
import pandas as pd
from scipy import stats
from scipy.signal import welch
from sklearn.neighbors import NearestNeighbors
import numpy as np

In [3]:
from importlib import reload
import time
from tqdm import tqdm
from functools import reduce
from itertools import islice
from collections import defaultdict

In [4]:
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
from mpl_toolkits.mplot3d import Axes3D
from IPython.display import display, HTML

In [5]:
from enviserv.dictan import DictAnalyzer # dictionary analysis
import pandserv as pds # formatting of small pandas DataFrames

In [6]:
from sparkserv import SparkApp, Cols
# SparkApp bundles the functions that create the Spark application,
# including detection of the master-node IP and connection to the cluster

# Cols is a class for building short aliases for column names;
# the original field names are left unchanged

In [7]:
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType
from pyspark.sql.window import Window
from pyspark.sql import Row

In [8]:
# geo libraries
import geopandas as gpd
from shapely import wkt
from shapely.geometry import Point

## Creating the session, loading data

In [9]:
spark_app = SparkApp(my_logger_create_level = 'INFO')

INFO:spark_app:spark_master_ip: 172.18.0.2
INFO:spark_app:pyspark version: 3.4.1
INFO:spark_app:starting building spark app object: pyspark-taxi-forecasting
INFO:spark_app:Spark app object built as: <pyspark.sql.session.SparkSession object at 0x7fd20d1f4ad0>
INFO:spark_app:Spark object can be accessed as the SparkApp_object.spark property


In [10]:
spark_master_ip = spark_app.get_spark_master_ip()
# print(spark_master_ip)

INFO:spark_app:spark_master_ip: 172.18.0.2


In [11]:
# spark = spark_app.build_spark_app(spark_master_ip=spark_master_ip)
# spark = spark_app.spark

To shut down the Spark session cleanly (for example, when switching between notebooks), stop it completely. For this I use the `.stop_spark_app()` method of the `SparkApp` class.

In [12]:
# spark_app.stop_spark_app()

In [13]:
print(spark_app.spark)

<pyspark.sql.session.SparkSession object at 0x7fd20d1f4ad0>


In [14]:
spark_app.build_spark_app()

INFO:spark_app:pyspark version: 3.4.1
INFO:spark_app:starting building spark app object: pyspark-taxi-forecasting
INFO:spark_app:Spark app object built as: <pyspark.sql.session.SparkSession object at 0x7fd20d1f4ad0>
INFO:spark_app:Spark object can be accessed as the SparkApp_object.spark property


Get the standard `spark` object from the `spark_app` object created above.

In [15]:
spark = spark_app.spark

Checking that the Spark object works on the cluster. If everything is fine, the test should finish fairly quickly and display the test DataFrame:  
```txt
+------------+-----------+
|student_name|student_age|
+------------+-----------+
|       Alice|         10|
|         Bob|         20|
+------------+-----------+
```

If the cluster's base images were built incorrectly, the test may hang.

In [16]:
spark_app.test_spark_functionality()

Spark session created successfully.
DataFrame created successfully.
Alias DataFrame created successfully.
DataFrame data matches expected result.
+------------+-----------+
|student_name|student_age|
+------------+-----------+
|       Alice|         10|
|         Bob|         20|
+------------+-----------+

DataFrame show output matches expected output.

*      ____              __    *
*     / __/__  ___ _____/ /__  *
*    _\ \/ _ \/ _ `/ __/  '_/  *
*   /__ / .__/\_,_/_/ /_/\_\   *
*      /_/                     * 
        


In [17]:
da = DictAnalyzer()

In [18]:
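# Simple random sampling: keeps each row with probability `percentage`
# (a fraction in [0, 1], not a percent), so the sample size is approximate
def random_sample_dataframe(dataframe, percentage, seed=None):
    # Draw a uniform random number in [0, 1) for each row and keep rows below the threshold;
    # pass `seed` to make the sample reproducible
    df = dataframe.filter(f.rand(seed) < percentage)
    
    return df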
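Filtering on `f.rand() < percentage` is a Bernoulli sample: every row is kept independently with that probability, so the result contains roughly, not exactly, `percentage * N` rows. Spark's built-in `DataFrame.sample(fraction=...)` without replacement behaves the same way and is likewise not exact. The plain-Python sketch below (the name `bernoulli_sample` is hypothetical) applies the same rule to a list to show how the sample size varies between seeds.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (same rule as rand() < fraction)."""
    rng = random.Random(seed)
    # rng.random() is uniform in [0, 1), mirroring Spark's rand()
    return [row for row in rows if rng.random() < fraction]


rows = list(range(100_000))
sizes = [len(bernoulli_sample(rows, 0.01, seed=s)) for s in range(5)]
print(sizes)  # each close to 1000, but generally not exactly 1000
```

If an exact number of rows is required, a different approach is needed (for example, ordering by a random column and taking a limit), at the cost of a full sort.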

In [None]:
# %%time
# agg_hour.coalesce(1).write.csv("/work/data/taxis_agg_hour_growth.csv", header=True, mode="overwrite")

In [17]:
%%time
agg_hour = spark.read.load('/work/data/taxis_agg_hour_growth.csv',
                           format='csv', header='true', inferSchema='true')

CPU times: user 18 ms, sys: 1.11 ms, total: 19.2 ms
Wall time: 34.2 s


In [18]:
agg_hour.count()

3079450

In [21]:
agg_hour.cache()

DataFrame[ct: bigint, ca: int, hour_start: timestamp, trips_p: int, time_p: int, miles_p: double, velocity_p: double, farem_p: double, tipsm_p: double, tollsm_p: double, extrasm_p: double, totalm_p: double, comp1_p: int, comp2_p: int, comp3_p: int, comp4_p: int, comp5_p: int, compless5_p: int, trips_d: int, time_d: int, miles_d: double, velocity_d: double, farem_d: double, tipsm_d: double, tollsm_d: double, extrasm_d: double, totalm_d: double, comp1_d: int, comp2_d: int, comp3_d: int, comp4_d: int, comp5_d: int, compless5_d: int, cumulative_balance: int, trips_p_growth_1_to_0: double, trips_p_growth_2_to_1: double, trips_p_growth_3_to_2: double, trips_p_growth_4_to_3: double, trips_d_growth_1_to_0: double, trips_d_growth_2_to_1: double, trips_d_growth_3_to_2: double, trips_d_growth_4_to_3: double, velocity_p_growth_1_to_0: double, velocity_p_growth_2_to_1: double, velocity_p_growth_3_to_2: double, velocity_p_growth_4_to_3: double, velocity_d_growth_1_to_0: double, velocity_d_growth_2_t

In [None]:
agg_null = agg_hour.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in agg_hour.columns]).toPandas()

In [None]:
agg_null = agg_null.transpose()

In [None]:
agg_null[agg_null.iloc[:, 0] > 0]
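The null-count expression used for `agg_null` works because `f.when(cond, c)` returns the column name `c` as a literal for rows where the value is null and returns null otherwise, while `f.count` skips nulls. Only the null rows are therefore counted. The plain-Python sketch below mirrors that logic on a list of dicts; `count_nulls` and the sample rows are hypothetical.

```python
def count_nulls(rows, columns):
    """Per-column null counts, mirroring count(when(col.isNull(), c))."""
    counts = {}
    for c in columns:
        # when(...) yields a non-null marker only for rows where the value is null
        marked = [c if row.get(c) is None else None for row in rows]
        # count(...) ignores nulls, so only the marked rows are counted
        counts[c] = sum(1 for m in marked if m is not None)
    return counts


rows = [
    {"ct": 1, "trips_p": 3, "velocity_p": None},
    {"ct": 2, "trips_p": None, "velocity_p": None},
    {"ct": 3, "trips_p": 5, "velocity_p": 0.4},
]
null_counts = count_nulls(rows, ["ct", "trips_p", "velocity_p"])
# Keep only columns that contain nulls, as agg_null[agg_null.iloc[:, 0] > 0] does
print({c: n for c, n in null_counts.items() if n > 0})  # {'trips_p': 1, 'velocity_p': 2}
```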

Load the list of fields to exclude (highly correlated fields).

In [19]:
excl_f = pd.read_csv('/work/data/excluded_fields.csv')

In [20]:
# Convert the DataFrame into a dict: (ct, ca) -> list of excluded fields
def get_excuded_fields_to_dict(df):
    excluded_fields_tot = {}
    grouped = df.groupby(['ct', 'ca'])
    for (ct, ca), group in grouped:
        excluded_fields_tot[(ct, ca)] = group['excluded_field'].tolist()
    return excluded_fields_tot
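The same `(ct, ca) -> list of fields` mapping can be built in one pass without `groupby`, using `collections.defaultdict` (already imported in In [3]). This sketch assumes input tuples of `(ct, ca, excluded_field)`, matching the columns the function above reads; `group_excluded_fields` is a hypothetical name.

```python
from collections import defaultdict


def group_excluded_fields(records):
    """Map (ct, ca) -> list of excluded fields, keeping the input order per key."""
    grouped = defaultdict(list)
    for ct, ca, field in records:
        grouped[(ct, ca)].append(field)
    return dict(grouped)


records = [
    (10000000000, 91, "time_p"),
    (10000000000, 91, "miles_p"),
    (12000000001, 53, "time_p"),
]
print(group_excluded_fields(records))
# {(10000000000, 91): ['time_p', 'miles_p'], (12000000001, 53): ['time_p']}
```

With pandas the input could be `excl_f[['ct', 'ca', 'excluded_field']].itertuples(index=False, name=None)`. One difference: `groupby` sorts the keys by default, whereas this version keeps them in order of first appearance.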

In [21]:
excluded_fields_tot = get_excuded_fields_to_dict(excl_f)

In [22]:
da.print_dict(dict(islice(excluded_fields_tot.items(), 3)))

{
    (10000000000, 91): ['time_p', 'miles_p', 'farem_p', 'comp1_p', 'comp3_p', 'comp4_p', 'comp5_p', 'extrasm_p']
    (12000000001, 53): ['time_p', 'farem_p', 'trips_d', 'time_d', 'miles_d', 'farem_d', 'totalm_p', 'velocity_p_growth_3_to_2']
    (12000000002, 75): ['time_p', 'miles_p', 'farem_p', 'tipsm_p', 'comp5_p', 'trips_d', 'time_d', 'miles_d', 'farem_d', 'tipsm_d', 'comp4_p']
}


We will assume that excluding these fields is enough to eliminate multicollinearity in the linear models.
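This assumption can be checked rather than taken on faith. A common diagnostic is the variance inflation factor, VIF_j = 1 / (1 - R_j²), where R_j² comes from regressing feature j on all the other features plus an intercept. The NumPy sketch below assumes a dense numeric matrix, for example the remaining fields of one (ct, ca) slice collected with `toPandas()`. The function name `vif` and the synthetic data are hypothetical.

```python
import numpy as np


def vif(X: np.ndarray) -> np.ndarray:
    """Variance inflation factor for each column of X (n_samples x n_features)."""
    n, k = X.shape
    out = np.empty(k)
    for j in range(k):
        y = X[:, j]
        # Regress column j on an intercept plus all other columns
        A = np.column_stack([np.ones(n), np.delete(X, j, axis=1)])
        coef, *_ = np.linalg.lstsq(A, y, rcond=None)
        resid = y - A @ coef
        r2 = 1.0 - (resid @ resid) / ((y - y.mean()) ** 2).sum()
        out[j] = np.inf if r2 >= 1.0 else 1.0 / (1.0 - r2)
    return out


rng = np.random.default_rng(0)
a = rng.normal(size=500)
b = rng.normal(size=500)
c = a + 0.01 * rng.normal(size=500)  # almost a copy of `a`
v = vif(np.column_stack([a, b, c]))
print(np.round(v, 1))  # `a` and `c` get very large VIFs, `b` stays near 1
```

A frequently quoted rule of thumb treats a VIF above 5 to 10 as a sign of problematic collinearity.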

In [89]:
# %%time
# agg_hour.coalesce(1).write.csv("/work/data/taxis_agg_hour_result.csv", header=True, mode="overwrite")

CPU times: user 48.1 ms, sys: 10.2 ms, total: 58.3 ms
Wall time: 3min 17s


In [23]:
%%time
data = spark.read.load('/work/data/taxis_agg_hour_result.csv',
                       format='csv', header='true', inferSchema='true')

CPU times: user 16.1 ms, sys: 567 µs, total: 16.7 ms
Wall time: 48.2 s


In [27]:
print(pds.gvf(data.count()))
data.cache()

3'062'043


DataFrame[ct: bigint, ca: int, hour_start: timestamp, time_p: int, miles_p: double, velocity_p: double, farem_p: double, tipsm_p: double, tollsm_p: double, extrasm_p: double, totalm_p: double, comp1_p: int, comp2_p: int, comp3_p: int, comp4_p: int, comp5_p: int, compless5_p: int, trips_d: int, time_d: int, miles_d: double, velocity_d: double, farem_d: double, tipsm_d: double, tollsm_d: double, extrasm_d: double, totalm_d: double, comp1_d: int, comp2_d: int, comp3_d: int, comp4_d: int, comp5_d: int, compless5_d: int, cumulative_balance: int, trips_p_growth_1_to_0: double, trips_p_growth_2_to_1: double, trips_p_growth_3_to_2: double, trips_p_growth_4_to_3: double, trips_d_growth_1_to_0: double, trips_d_growth_2_to_1: double, trips_d_growth_3_to_2: double, trips_d_growth_4_to_3: double, velocity_p_growth_1_to_0: double, velocity_p_growth_2_to_1: double, velocity_p_growth_3_to_2: double, velocity_p_growth_4_to_3: double, velocity_d_growth_1_to_0: double, velocity_d_growth_2_to_1: double, v

### Select a few fields and one district for a check

In [28]:
f_to_sel = [
    'ct', 'ca', 'hour_start', 'trips_target', 'time_p', 'miles_p', 'velocity_p',
]

data_sample = data.select(*f_to_sel).filter(f.col('ct')==17031090200)
data_sample.show(3)
# data_sample.cache()

+-----------+---+-------------------+------------+------+-------+--------------------+
|         ct| ca|         hour_start|trips_target|time_p|miles_p|          velocity_p|
+-----------+---+-------------------+------------+------+-------+--------------------+
|17031090200|  6|2021-01-08 01:00:00|           1|     0|    0.0|                 0.0|
|17031090200|  6|2021-01-08 02:00:00|           2|   706|   3.24|0.004589235127478754|
|17031090200|  6|2021-01-08 03:00:00|           1|   990|    2.9|0.002929292929292929|
+-----------+---+-------------------+------------+------+-------+--------------------+
only showing top 3 rows



Split the DataFrame into training and test sets: test is 2024, train is 2021-2023.

In [67]:
data = agg_hour

In [70]:
# Split the data into training and test sets
train_data = data.filter(f.year(f.col("hour_start")) < 2024)
test_data = data.filter(f.year(f.col("hour_start")) == 2024)
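Splitting by calendar year, rather than shuffling rows, keeps every test hour strictly after every training hour, so no information from 2024 leaks into training. The plain-Python sketch below applies the same rule to `(hour_start, value)` pairs. `split_by_year` and the sample rows are illustrative only. As with the `== 2024` filter, any years after the test year are dropped from both sets.

```python
from datetime import datetime


def split_by_year(rows, test_year):
    """Train: hours before test_year; test: hours within test_year (later years are dropped)."""
    train = [r for r in rows if r[0].year < test_year]
    test = [r for r in rows if r[0].year == test_year]
    return train, test


rows = [
    (datetime(2021, 1, 8, 1), 1),
    (datetime(2023, 12, 31, 23), 4),
    (datetime(2024, 1, 1, 0), 2),
    (datetime(2024, 6, 1, 12), 7),
]
train, test = split_by_year(rows, 2024)
print(len(train), len(test))  # 2 2
```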

In [None]:
# fields_to_shift = [
#     'trips_p'
# ]

# for field in fields_to_shift:
#     agg_sample = a______e.withColumn(
#         f"{field}_shifted", f.lead(f.col(field), 1).over(window_ct_ca)
#     )
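The commented-out cell shifts a field one step ahead within each window partition. `window_ct_ca` is not defined in this section. Assuming it is `Window.partitionBy('ct', 'ca').orderBy('hour_start')`, `f.lead(col, 1)` gives each row the value from the next row of the same (ct, ca) and null for the last row of each partition. "Next row" is not the same as "next clock hour": if an hour is missing, the following available hour is used. The plain-Python sketch below reproduces this behavior. `lead_by_group` is a hypothetical name, and the integer `hour_start` values stand in for timestamps.

```python
from itertools import groupby
from operator import itemgetter


def lead_by_group(rows, value_key, offset=1):
    """Add '<value_key>_shifted': the value `offset` rows ahead within each (ct, ca), ordered by hour_start."""
    out = []
    ordered = sorted(rows, key=itemgetter("ct", "ca", "hour_start"))
    for _, group in groupby(ordered, key=itemgetter("ct", "ca")):
        group = list(group)
        for i, row in enumerate(group):
            # Past the end of the partition lead() returns null, here None
            nxt = group[i + offset][value_key] if i + offset < len(group) else None
            out.append({**row, f"{value_key}_shifted": nxt})
    return out


rows = [
    {"ct": 1, "ca": 6, "hour_start": 1, "trips_p": 5},
    {"ct": 1, "ca": 6, "hour_start": 2, "trips_p": 3},
    {"ct": 1, "ca": 6, "hour_start": 4, "trips_p": 8},  # hour 3 is missing
    {"ct": 2, "ca": 7, "hour_start": 1, "trips_p": 2},
]
result = lead_by_group(rows, "trips_p")
print([r["trips_p_shifted"] for r in result])  # [3, 8, None, None]
```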