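# Median Week

For each app in `CITY`, this notebook does four things:

- bins the traffic maps into `TIME_RESOLUTION_IN_MINUTES`-minute slots;
- sums the `up` and `dn` maps within each slot;
- computes the median map for every (day of week, time of day) pair;
- writes the result back to HDFS as Parquet.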

In [1]:
import getpass
import os
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_functions
from pyspark.sql import types as spark_types
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf

In [2]:
# Matplotlib defaults: serif text at 14 pt. 'CMU Serif Roman' is tried first,
# and the serif fonts already configured stay behind it as fallbacks.
existing_serif_fonts = plt.rcParams['font.serif']
plt.rcParams.update({
    'font.family': 'serif',
    'font.serif': ['CMU Serif Roman'] + existing_serif_fonts,
    'font.size': 14,
})

In [3]:
# Resolve the current user name. Unlike os.getlogin(), getpass.getuser() does not
# need a controlling terminal (a Jupyter kernel started by a service may not have
# one). It checks LOGNAME/USER/LNAME/USERNAME first, then the password database.
USER = getpass.getuser()
WORKING_DIR = f'/home/{USER}/data/Land_use'
DATA_DIR = f'{WORKING_DIR}/data'
METROPOLES_SHAPE = f'{DATA_DIR}/cities'
IMG_DIR = f'{WORKING_DIR}/images'

# City whose traffic maps are processed; also used in the HDFS input/output paths.
CITY = 'Lyon'

# Width of the time bins the traffic maps are aggregated into.
TIME_RESOLUTION_IN_MINUTES = 30
TIME_RESOLUTION_IN_SECONDS = TIME_RESOLUTION_IN_MINUTES * 60

In [4]:
# Spark settings for this job. They are applied to the builder in this order.
# NOTE(review): both memoryOverhead values (8g) exceed spark.executor.memory (3g) — confirm intended.
SPARK_CONF = {
    'spark.network.timeout': 300,
    'spark.dynamicAllocation.enabled': 'true',
    'spark.shuffle.service.enabled': 'true',
    'spark.dynamicAllocation.initialExecutors': 1,
    'spark.dynamicAllocation.maxExecutors': 10,
    'spark.dynamicAllocation.minExecutors': 0,
    'spark.driver.maxResultSize': '120g',
    'spark.executor.cores': 1,
    'spark.executor.memory': '3g',
    'spark.memory.fraction': 0.6,
    'spark.cores.max': 10,
    'spark.executor.memoryOverhead': '8g',
    'spark.driver.memoryOverhead': '8g',
}

spark_builder = (
    SparkSession.builder
    .master('spark://santiago:7077')
    .appName('0-Traffic_Median_week')
)
for conf_key, conf_value in SPARK_CONF.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

# Timestamp casts and date_format calls later in the notebook are rendered in this zone.
spark.conf.set("spark.sql.session.timeZone", "Europe/Paris")

2023-08-14 13:21:42,095 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-08-14 13:21:42,380 WARN spark.SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


### Load Data

In [5]:
# Traffic maps for the selected CITY, stored as Parquet on HDFS.
filename = f'hdfs://santiago:9000/HRDS/Traffic_maps/{CITY}_traffic_maps.parquet'
sdf_traffic = spark.read.format('parquet').load(filename)

sdf_traffic.show(2)

[Stage 1:>                                                          (0 + 1) / 1]

+----+-------------------+-----------+--------------------+--------------------+
|city|                app|       time|      traffic_map_up|      traffic_map_dn|
+----+-------------------+-----------+--------------------+--------------------+
|Lyon|Amazon Web Services|  1.55286E9|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|
|Lyon|Amazon Web Services|1.5529509E9|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|
+----+-------------------+-----------+--------------------+--------------------+
only showing top 2 rows



                                                                                

In [6]:
# ⚠️ Small-test mode: keep only the first SMALL_TEST_N_ROWS rows so the whole
# pipeline runs quickly. Set SMALL_TEST_N_ROWS = None to process the full dataset.
SMALL_TEST_N_ROWS = 50

if SMALL_TEST_N_ROWS is not None:
    source_schema = sdf_traffic.schema
    sample_rows = sdf_traffic.take(SMALL_TEST_N_ROWS)
    # Rebuild straight from the Row objects with the source schema. Column types
    # stay exactly as read (a pandas round trip re-infers them), no pandas
    # `iteritems` workaround is needed, and an empty sample still has a schema.
    sdf_traffic = spark.createDataFrame(sample_rows, schema=source_schema)
    sdf_traffic.show(2)

2023-08-14 13:21:54,743 WARN scheduler.TaskSetManager: Stage 3 contains a task of very large size (53824 KiB). The maximum recommended task size is 1000 KiB.
[Stage 3:>                                                          (0 + 1) / 1]

+----+-------------------+-----------+--------------------+--------------------+
|city|                app|       time|      traffic_map_up|      traffic_map_dn|
+----+-------------------+-----------+--------------------+--------------------+
|Lyon|Amazon Web Services|  1.55286E9|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|
|Lyon|Amazon Web Services|1.5529509E9|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|
+----+-------------------+-----------+--------------------+--------------------+
only showing top 2 rows



                                                                                

## Aggregate by 30-minute time bin, summing both directions (`up` + `dn`)

In [7]:
# Snap each epoch-seconds `time` down to the start of its TIME_RESOLUTION_IN_SECONDS
# bin, expose the bin start as a timestamp column `datetime`, and drop the raw seconds.
time_bin_start = (
    spark_functions.floor(spark_functions.col('time') / TIME_RESOLUTION_IN_SECONDS)
    * TIME_RESOLUTION_IN_SECONDS
)
sdf_traffic = (
    sdf_traffic
    .withColumn('time', time_bin_start)
    .withColumn('datetime', spark_functions.col('time').cast(spark_types.TimestampType()))
    .drop('time')
)

sdf_traffic.show(5)

2023-08-14 13:23:55,847 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (53824 KiB). The maximum recommended task size is 1000 KiB.
[Stage 4:>                                                          (0 + 1) / 1]

+----+-------------------+--------------------+--------------------+-------------------+
|city|                app|      traffic_map_up|      traffic_map_dn|           datetime|
+----+-------------------+--------------------+--------------------+-------------------+
|Lyon|Amazon Web Services|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|2019-03-17 23:00:00|
|Lyon|Amazon Web Services|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|2019-03-19 00:00:00|
|Lyon|Amazon Web Services|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|2019-03-22 23:30:00|
|Lyon|Amazon Web Services|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|2019-03-26 20:00:00|
|Lyon|Amazon Web Services|[[0.0, 0.0, 0.0, ...|[[0.0, 0.0, 0.0, ...|2019-03-28 14:30:00|
+----+-------------------+--------------------+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [8]:
schema_traffic_map = spark_types.ArrayType(spark_types.ArrayType(spark_types.FloatType()))

@pandas_udf(schema_traffic_map)
def compute_traffic_map_udf(traffic_map_up: pd.Series, traffic_map_dn: pd.Series) -> schema_traffic_map:
    """Aggregate UDF: element-wise sum of every `up` and `dn` traffic map in a group.

    The (Series, Series) -> non-Series type hints make Spark run this as a
    Series-to-scalar (grouped aggregate) pandas UDF inside `groupBy(...).agg(...)`.

    Args:
        traffic_map_up: one 2-D map (nested array) per row of the group.
        traffic_map_dn: the matching maps; they must have the same shape as the `up` maps.

    Returns:
        One 2-D map (list of lists of floats): the sum over all rows of up + dn.

    Raises:
        ValueError: if the maps in the group do not all share one shape.
    """
    # Nested arrays presumably arrive as object arrays of 1-D arrays. Rebuild each map
    # as a dense float64 matrix so the sum is vectorised and accumulated in double
    # precision before Spark casts the result back to FloatType.
    up_maps = np.stack([np.asarray(list(traffic_map), dtype=np.float64) for traffic_map in traffic_map_up])
    dn_maps = np.stack([np.asarray(list(traffic_map), dtype=np.float64) for traffic_map in traffic_map_dn])

    total_traffic_map = (up_maps + dn_maps).sum(axis=0)

    return total_traffic_map.tolist()

In [9]:
# One row per (city, app, time bin), holding the summed up + dn traffic map.
summed_traffic_map_col = compute_traffic_map_udf('traffic_map_up', 'traffic_map_dn').alias('traffic_map')
sdf_traffic_agg = (
    sdf_traffic
    .groupBy('city', 'app', 'datetime')
    .agg(summed_traffic_map_col)
)
sdf_traffic_agg.show(5)

2023-08-14 13:27:28,580 WARN scheduler.TaskSetManager: Stage 5 contains a task of very large size (53824 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----+-------------------+-------------------+--------------------+
|city|                app|           datetime|         traffic_map|
+----+-------------------+-------------------+--------------------+
|Lyon|Amazon Web Services|2019-05-10 15:00:00|[[0.0, 0.0, 0.0, ...|
|Lyon|Amazon Web Services|2019-05-21 15:30:00|[[0.0, 0.0, 0.0, ...|
|Lyon|Amazon Web Services|2019-05-19 03:30:00|[[0.0, 0.0, 0.0, ...|
|Lyon|    Apple App Store|2019-04-15 05:30:00|[[0.0, 0.0, 0.0, ...|
|Lyon|Amazon Web Services|2019-04-27 23:30:00|[[0.0, 0.0, 0.0, ...|
+----+-------------------+-------------------+--------------------+
only showing top 5 rows



## Median Week

### Add day of week and time of day

In [10]:
# Split each bin timestamp into a time of day ('HH:mm:ss') and a full weekday name
# ('EEEE'). Both are formatted in the Spark session time zone.
sdf_traffic_agg = (
    sdf_traffic_agg
    .withColumn('time', spark_functions.date_format('datetime', 'HH:mm:ss'))
    .withColumn('day_of_week', spark_functions.date_format('datetime', 'EEEE'))
)

sdf_traffic_agg.show(5)

2023-08-14 13:30:21,880 WARN scheduler.TaskSetManager: Stage 13 contains a task of very large size (53824 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----+-------------------+-------------------+--------------------+--------+-----------+
|city|                app|           datetime|         traffic_map|    time|day_of_week|
+----+-------------------+-------------------+--------------------+--------+-----------+
|Lyon|Amazon Web Services|2019-05-10 15:00:00|[[0.0, 0.0, 0.0, ...|15:00:00|     Friday|
|Lyon|Amazon Web Services|2019-05-21 15:30:00|[[0.0, 0.0, 0.0, ...|15:30:00|    Tuesday|
|Lyon|Amazon Web Services|2019-05-19 03:30:00|[[0.0, 0.0, 0.0, ...|03:30:00|     Sunday|
|Lyon|    Apple App Store|2019-04-15 05:30:00|[[0.0, 0.0, 0.0, ...|05:30:00|     Monday|
|Lyon|Amazon Web Services|2019-04-27 23:30:00|[[0.0, 0.0, 0.0, ...|23:30:00|   Saturday|
+----+-------------------+-------------------+--------------------+--------+-----------+
only showing top 5 rows



### UDF for median week

In [11]:
schema_traffic_map = spark_types.ArrayType(spark_types.ArrayType(spark_types.FloatType()))

@pandas_udf(schema_traffic_map)
def compute_median_traffic_map_udf(agg_traffic_map: pd.Series) -> schema_traffic_map:
    """Aggregate UDF: element-wise median of all traffic maps in a group.

    Used with `groupBy(..., 'time', 'day_of_week')`, so each cell of the result is
    the median over every date that falls on the same weekday and time slot.

    Args:
        agg_traffic_map: one 2-D map (nested array) per row of the group.

    Returns:
        One 2-D map (list of lists of floats) with the per-cell median.

    Raises:
        ValueError: if the maps in the group do not all share one shape.
    """
    # Stack the maps into a dense (n_maps, rows, cols) float64 array. np.stack fails
    # fast on mismatched shapes, where np.array would build a ragged object array.
    stacked_maps = np.stack([np.asarray(list(traffic_map), dtype=np.float64) for traffic_map in agg_traffic_map])
    median_week_traffic_map = np.median(stacked_maps, axis=0)

    return median_week_traffic_map.tolist()

In [12]:
# Collapse all dates into one median map per (city, app, time of day, weekday).
median_traffic_map_col = compute_median_traffic_map_udf('traffic_map').alias('traffic_map')
sdf_traffic_agg_median_week = (
    sdf_traffic_agg
    .groupBy('city', 'app', 'time', 'day_of_week')
    .agg(median_traffic_map_col)
)
sdf_traffic_agg_median_week.show(2)

2023-08-14 13:30:31,892 WARN scheduler.TaskSetManager: Stage 21 contains a task of very large size (53824 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----+-------------------+--------+-----------+--------------------+
|city|                app|    time|day_of_week|         traffic_map|
+----+-------------------+--------+-----------+--------------------+
|Lyon|Amazon Web Services|13:00:00|   Thursday|[[0.0, 0.0, 0.0, ...|
|Lyon|Amazon Web Services|19:00:00|    Tuesday|[[0.0, 0.0, 0.0, ...|
+----+-------------------+--------+-----------+--------------------+
only showing top 2 rows



### Save file

In [None]:
# Write the median-week maps to HDFS as Parquet, replacing any previous output.
median_week_output_path = f'hdfs://santiago:9000/land_use/{CITY}/{CITY}_traffic_maps_median_week.parquet'
sdf_traffic_agg_median_week.write.mode('overwrite').parquet(median_week_output_path)