## Spark Initialize

In [None]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import Window


# Configure Spark settings

ram = 16
cpu = 8*3
# Define the application name and setup session
appName = "Connect To ClickHouse via PySpark"
spark = (SparkSession.builder
         .appName(appName)
         .config("spark.executor.memory", f"{ram}g")
         .config("spark.driver.maxResultSize", f"{ram}g")
         .config("spark.driver.memory", f"{ram}g")
         .config("spark.executor.memoryOverhead", f"{ram}g")
         .getOrCreate()
         )


25/09/03 22:40:27 WARN Utils: Your hostname, cougar resolves to a loopback address: 127.0.1.1; using 192.168.1.150 instead (on interface wlp10s0)
25/09/03 22:40:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/03 22:40:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Заказы пеших курьеров

У компании по доставке еды есть БД в которой содержится таблица заказов пеших курьеров [couriers_orders.parquet](https://drive.google.com/file/d/1ZJlyPkU8W3qlSJI57qYg1e_PQIxCjwZN/view?usp=sharing)  .



In [2]:
couriers_orders = spark.read.parquet("data/couriers_orders.parquet")
couriers_orders.show(5)

+-------------------+----------+--------+--------+-----------+
|               date|courier_id|order_id|distance|travel_time|
+-------------------+----------+--------+--------+-----------+
|2021-07-12 00:00:00|        10|       1|     1.9|      36.17|
|2021-07-02 00:00:00|         3|       2|    3.98|      21.34|
|2021-04-15 00:00:00|         6|       3|    3.98|      43.33|
|2021-07-16 00:00:00|        10|       4|    2.85|      14.01|
|2021-06-11 00:00:00|        10|       5|    4.89|      32.09|
+-------------------+----------+--------+--------+-----------+
only showing top 5 rows



In [105]:
couriers_orders.printSchema()

root
 |-- date: timestamp_ntz (nullable = true)
 |-- courier_id: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- distance: double (nullable = true)
 |-- travel_time: double (nullable = true)



### EDA

In [111]:
# check null and nan values in each column spark df, nan doesn't work for timestamp dtype
def null_nan_counts(dfs: SparkDataFrame):
    exprs = []
    for c in dfs.columns:
        dtype = dict(dfs.dtypes)[c]
        if dtype in ['double', 'float']:
            expr = F.count(F.when(F.isnull(c) | F.isnan(c), c)).alias(c)
        else:
            expr = F.count(F.when(F.isnull(c), c)).alias(c)
        exprs.append(expr)
    return dfs.select(*exprs).show()
    

null_nan_counts(couriers_orders)

+----+----------+--------+--------+-----------+
|date|courier_id|order_id|distance|travel_time|
+----+----------+--------+--------+-----------+
|   0|         0|       0|       0|          0|
+----+----------+--------+--------+-----------+



In [34]:
couriers_orders.select(F.col("distance"), F.col("travel_time")).describe().show()

+-------+------------------+------------------+
|summary|          distance|       travel_time|
+-------+------------------+------------------+
|  count|              1666|              1666|
|   mean|2.7334933973589397| 34.81731692677075|
| stddev|1.2954651948730809|14.475623110887007|
|    min|               0.5|             10.01|
|    max|               5.0|             59.97|
+-------+------------------+------------------+



### Вопрос №1.1:

В конце каждого месяца компания выдает премию для своих курьеров, средняя скорость доставки за прошедший месяц которых больше средней скорости среди всех курьеров. Сколько курьеров получили премию за июнь 2021 года.


In [76]:
# travel time in min and distance in km, add speed column to couriers_orders km/h and filter only june orders
couriers_orders_june = couriers_orders.withColumn("order_speed", F.expr("distance / (travel_time / 60)")) \
    .filter((F.col('date') >= F.lit('2021-06-01')) & (F.col('date') <= F.lit('2021-06-30')))

couriers_orders_june.show(5)

+-------------------+----------+--------+--------+-----------+------------------+
|               date|courier_id|order_id|distance|travel_time|       order_speed|
+-------------------+----------+--------+--------+-----------+------------------+
|2021-06-11 00:00:00|        10|       5|    4.89|      32.09| 9.143035213462136|
|2021-06-14 00:00:00|         4|       9|    4.13|      29.34| 8.445807770961146|
|2021-06-27 00:00:00|         8|      10|    1.04|      12.56| 4.968152866242038|
|2021-06-27 00:00:00|         1|      19|    1.85|      13.56| 8.185840707964601|
|2021-06-28 00:00:00|         2|      25|    4.02|      12.43|19.404666130329844|
+-------------------+----------+--------+--------+-----------+------------------+
only showing top 5 rows



In [77]:
# add column with mean speed by courier_id

# compute mean speed per courier
mean_speed_courier = (
    couriers_orders_june.groupBy("courier_id")
    .agg(F.mean("order_speed").alias("mean_speed_by_courier"))
)

# compute overall mean (as DataFrame, not collected scalar)
overall_mean_df = mean_speed_courier.agg(
    F.mean("mean_speed_by_courier").alias("overall_mean_speed")
)

# Filter couriers whose mean speed is greater than overall mean
couriers_bonus = mean_speed_courier.filter(F.col("mean_speed_by_courier") > overall_mean_df.collect()[0]["overall_mean_speed"])
couriers_bonus.show(), couriers_bonus.count()

+----------+---------------------+
|courier_id|mean_speed_by_courier|
+----------+---------------------+
|         6|    6.791953132084712|
|         1|   6.7493254093092405|
|        10|     6.58047952209051|
|         8|    6.826371958342846|
|         2|    6.528423151187261|
|         4|    7.692341132350769|
+----------+---------------------+



(None, 6)

### Вопрос №1.2:
используйте данные из предыдущего вопроса №1.1

Компания хочет понять, насколько равномерно курьеры работают в течение месяца. Для этого нужно найти ID курьера с наибольшей разницей между максимальной и минимальной средней дневной скоростью в июне 2021 года.

In [92]:
daily_mean_speed_june = couriers_orders_june.groupBy("date", "courier_id").agg(F.mean("order_speed").alias("daily_mean_speed"))
daily_mean_speed_june.show(5)

+-------------------+----------+------------------+
|               date|courier_id|  daily_mean_speed|
+-------------------+----------+------------------+
|2021-06-21 00:00:00|         6| 8.125091054426372|
|2021-06-25 00:00:00|         9| 8.552623561272851|
|2021-06-25 00:00:00|         8| 5.324176574701887|
|2021-06-30 00:00:00|         8|1.7039153083515375|
|2021-06-03 00:00:00|         6| 1.403067638923812|
+-------------------+----------+------------------+
only showing top 5 rows



In [99]:
courier_speed_stats = daily_mean_speed_june.groupBy("courier_id")\
    .agg(F.max("daily_mean_speed").alias("max_daily_speed_by_courier"))\
    .join(
        daily_mean_speed_june.groupBy("courier_id")\
            .agg(F.min("daily_mean_speed").alias("min_daily_speed_by_courier")),
        "courier_id"
    )\
    .withColumn("speed_diff", F.col("max_daily_speed_by_courier") - F.col("min_daily_speed_by_courier"))


courier_speed_stats.show(5)

courier_speed_stats\
    .filter(F.col("speed_diff") == courier_speed_stats.select(F.max("speed_diff")).first()[0])\
    .select("courier_id", "speed_diff").show()


+----------+--------------------------+--------------------------+------------------+
|courier_id|max_daily_speed_by_courier|min_daily_speed_by_courier|        speed_diff|
+----------+--------------------------+--------------------------+------------------+
|         7|         11.33508936970837|        1.0535557506584723|10.281533619049899|
|         6|         25.49718574108818|         1.403067638923812| 24.09411810216437|
|         9|        13.315508021390373|        0.7547857793983591|12.560722241992014|
|         5|        17.651376146788987|         1.104111823559212|16.547264323229776|
|         1|        25.342333654773388|        1.6655313351498635|23.676802319623523|
+----------+--------------------------+--------------------------+------------------+
only showing top 5 rows

+----------+------------------+
|courier_id|        speed_diff|
+----------+------------------+
|         4|24.802606662802628|
+----------+------------------+



## Покупки клиентов

У нас есть данные о покупках клиентов [purchases.parquet](https://drive.google.com/file/d/1LWqMEDVX5M3iEGPv7zZh2NnlqSMvOcom/view?usp=sharing). Проанализируйте интервалы времени между последовательными покупками для каждого клиента в наборе данных о покупках - напишите код для вычисления разницы в днях между текущей покупкой и предыдущей покупкой каждого клиента. Отобразите результат в новом столбце days_between_purchases.

### Вопрос №2.1:

Какое количество NaN в столбце days_between_purchases?

In [100]:
purchases = spark.read.parquet("data/purchases.parquet")
purchases.show(5)

+-----------+-------------------+
|customer_id|      purchase_date|
+-----------+-------------------+
|          2|2021-01-01 00:00:00|
|          7|2021-01-01 00:00:00|
|          7|2021-01-01 00:00:00|
|         11|2021-01-01 00:00:00|
|         21|2021-01-01 00:00:00|
+-----------+-------------------+
only showing top 5 rows



In [114]:
purchases_days_bt = purchases.withColumn('days_between_purchases', F.datediff(
    F.col('purchase_date'), F.lag('purchase_date')\
        .over(Window.partitionBy('customer_id').orderBy('purchase_date'))))

purchases_days_bt.show(5)

+-----------+-------------------+----------------------+
|customer_id|      purchase_date|days_between_purchases|
+-----------+-------------------+----------------------+
|          1|2021-01-14 00:00:00|                  NULL|
|          1|2021-01-18 00:00:00|                     4|
|          1|2021-01-28 00:00:00|                    10|
|          1|2021-02-05 00:00:00|                     8|
|          1|2021-02-06 00:00:00|                     1|
+-----------+-------------------+----------------------+
only showing top 5 rows



In [126]:
# find count of rows where days_between_purchases is null
purchases_days_bt.filter(F.col('days_between_purchases').isNull()).count()

50

### Вопрос №2.2 :
(используйте данные из предыдущего вопроса №2.1)

У какого количества уникальных клиентов разница между текущей покупкой и предыдущей покупкой равна 20-ти дням?

In [131]:
purchases_days_bt.filter(F.col('days_between_purchases') == 20).groupBy('customer_id').count().count()

10