# Transformation metrics
This script is responsible create all metrics necessary and upload them to PostgreSQL

In [1]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [446]:
from pyspark.sql import SparkSession, Window
from pyspark.conf import SparkConf
from pyspark.sql.functions import explode, from_unixtime, col, to_date, sum, avg, udf, timestamp_diff
from pyspark.sql.types import DateType, TimestampType, StructType, DoubleType, StructField, StringType
from prophet import Prophet

from glob import glob
import requests
import json
from collections import defaultdict
import locale
import os

DB_URL = "jdbc:postgresql://postgres:5432/themeparkwizard"
PROPERTIES_CUSTOM = {"user": os.environ['POSTGRES_USER'],"password": os.environ['POSTGRES_PASSWORD'], "driver": "org.postgresql.Driver"}

spark = SparkSession.builder \
    .appName("MetricBuilder") \
    .config("spark.jars", "jars/postgresql-42.7.7.jar") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .getOrCreate()

ImportError: cannot import name 'timestamp_diff' from 'pyspark.sql.functions' (/usr/local/spark/python/pyspark/sql/functions.py)

In [456]:
@udf(returnType=TimestampType())
def min_hour(data):
    if isinstance(data, list):
        for ee in data:
            if ee.type == 'Early Entry':
                return ee.startTime

@udf(returnType=TimestampType())
def max_hour(data):
    if isinstance(data, list):
        for op in data:
            if op.type == 'Operating':
                return op.endTime

def save_into_postgres(df, table, mode):
    df.write.jdbc(url=DB_URL, table=table, mode=mode, properties=PROPERTIES_CUSTOM)

def agg_avg_time_compute():
    print('Computing agg_avg_time...')
    result_avg = spark.sql("""
    SELECT 
        extracted_date,
        id as entity_id, 
        AVG(queue.STANDBY.waitTime) AS avg_standby_waittime
    FROM datalake_table
    WHERE entity_type = 'ATTRACTION' AND queue.STANDBY.waitTime is not null
    GROUP BY 1, 2
    ORDER BY 1
    """)
    result_avg.printSchema()
    save_into_postgres(result_avg, "themeparkwizard.agg_avg_time", 'append')
    # result_avg.write.jdbc(url=DB_URL, table="themeparkwizard.agg_avg_time", mode='append', properties=PROPERTIES_CUSTOM)

# def operating_ratio_compute(): 
#     print('Computing operating ratio by day...')
#     result_ratio = spark.sql("""
#     WITH lag_status AS (
#         SELECT
#             extracted_date,
#             extracted_at_time,
#             start_time,
#             end_time,
#             id as entity_id,
#             status,
#             lag(status, 1) OVER (PARTITION BY id, start_time, end_time ORDER BY extracted_at_time) as prev_status
#         FROM datalake_table
#         WHERE entity_type = 'ATTRACTION'
#         ORDER BY 5, 2
#     ),
#     time_diff_operational AS (
#         SELECT
#             extracted_date,
#             extracted_at_time,
#             start_time,
#             end_time,
#             entity_id,
#             status,
#             prev_status,
#             lead(extracted_at_time, 1) OVER (PARTITION BY entity_id ORDER BY extracted_at_time) as next_time,
#             CASE WHEN status <> 'OPERATING' OR status IS NULL THEN
#                 CASE WHEN next_time IS NOT NULL THEN next_time - extracted_at_time
#                 ELSE end_time - extracted_at_time
#                 END
#             ELSE INTERVAL '0 DAY'
#             END AS diff_op
#         FROM lag_status
#         WHERE (status = 'OPERATING' and prev_status <> 'OPERATING') or (status <> 'OPERATING' and prev_status = 'OPERATING')
#         order by entity_id, extracted_at_time
#     ),
#     dates AS (
#         SELECT 
#             id as entity_id,
#             explode(sequence(to_date(min(extracted_date)), to_date(max(extracted_date)))) as extracted_date
#         FROM datalake_table
#         GROUP BY 1
#     ),
#     pre_compute_amount AS (
#         SELECT
#             d.extracted_date,
#             MIN(COALESCE(extracted_at_time, CAST(d.extracted_date as timestamp))) as extracted_ts,
#             d.extracted_date + 1 as next_date,
#             d.entity_id,
#             status,
#             prev_status,
#             next_time,
#             CASE
#             WHEN status = 'OPERATING' THEN
#                 CASE 
#                     WHEN prev_status <> 'OPERATING' AND to_date(next_time) = d.extracted_date THEN 
#                         SUM(diff_op)
#                     ELSE extracted_ts - CAST(d.extracted_date as timestamp)
#                 END
#             ELSE 
#                 CASE 
#                     WHEN prev_status = 'OPERATING' AND to_date(next_time) = d.extracted_date THEN 
#                         SUM(diff_op)
#                     ELSE CAST(next_date as timestamp) - extracted_ts
#                 END
#             END as closed_by_day,
#             CASE
#             WHEN SUM(diff_op) - closed_by_day >= INTERVAL '0 SECOND' THEN
#                 SUM(diff_op)
#             ELSE INTERVAL '0 SECOND'
#             END as sum_time_not_operating
#         FROM time_diff_operational tdo
#         RIGHT JOIN dates d
#         ON tdo.extracted_date = d.extracted_date AND d.entity_id = tdo.entity_id
#         GROUP BY 1,3, 4, 5,6,7
#         ORDER BY 1,2,3,4
#     ),
#     second_pre_compute AS (
#         SELECT 
#             entity_id,
#             extracted_date,
#             next_date,
#             closed_by_day,
#             status,
#             sum_time_not_operating,
#             SUM(sum_time_not_operating - closed_by_day) over (partition by entity_id order by extracted_date RANGE BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING) as cumsum,
#             CASE
#                 WHEN cumsum = INTERVAL '0 DAY' THEN
#                     INTERVAL '0 DAY'
#                 ELSE closed_by_day
#             END AS new_closed_by_day
#         FROM pre_compute_amount
#         GROUP BY 1,2,3,4,5,6
#         ORDER BY entity_id
        
#     )
#     SELECT 
#         entity_id,
#         extracted_date,
#         new_closed_by_day,
#         status,
#         sum_time_not_operating,
#         cumsum,
#         CASE
#         WHEN cumsum < INTERVAL '1 DAY' and cumsum >= INTERVAL '0 DAY' THEN
#             CASE 
#             WHEN status = 'OPERATING' THEN
#                 new_closed_by_day
#             ELSE new_closed_by_day - cumsum
#             END
#         ELSE closed_by_day
#         END AS final_sum
#     FROM second_pre_compute
#     ORDER BY entity_id
#     """)
#     result_ratio.printSchema()
#     result_ratio.show(70, truncate=False)

#     spark.sql("""
#     WITH lag_status AS (
#         SELECT
#             extracted_date,
#             extracted_at_time,
#             start_time,
#             end_time,
#             id as entity_id,
#             status,
#             lag(status, 1) OVER (PARTITION BY id, start_time, end_time ORDER BY extracted_at_time) as prev_status
#         FROM datalake_table
#         WHERE entity_type = 'ATTRACTION'
#         ORDER BY 5, 2
#     )
#         SELECT
#             extracted_date,
#             extracted_at_time,
#             start_time,
#             end_time,
#             entity_id,
#             status,
#             prev_status,
#             lead(extracted_at_time, 1) OVER (PARTITION BY entity_id ORDER BY extracted_at_time) as next_time,
#             CASE 
#             WHEN status <> 'OPERATING' THEN
#                 CASE WHEN next_time IS NOT NULL THEN next_time - extracted_at_time
#                 ELSE end_time - extracted_at_time
#                 END
#             ELSE INTERVAL '0 DAY'
#             END AS diff_op
#         FROM lag_status
#         WHERE (status = 'OPERATING' and prev_status <> 'OPERATING') or (status <> 'OPERATING' and prev_status = 'OPERATING')
#         order by entity_id, extracted_at_time""").show(40, truncate=False)
# ------------------------------------------------------------------------------------------------------------------------



# SUM(diff_op) OVER (PARTITION BY d.extracted_date ORDER BY extracted_at_time) as sum_time_not_operating

    #--unix_timestamp(next_time) - unix_timestamp(extracted_at_time)
    #--unix_timestamp(end_time) - unix_timestamp(extracted_at_time)
#     WITH lag_status AS (
# unix_timestamp(extracted_at_time) - lag(unix_timestamp(extracted_at_time), 1) OVER (PARTITION BY entity_id, start_time, end_time ORDER BY extracted_at_time) as diff_op
                #     WHEN end_time = next_end_time THEN
                #     unix_timestamp(next_time) - unix_timestamp(extracted_at_time)
                # WHEN end_time <> next_end_time THEN
                #     CASE 
                #     WHEN status = 'OPERATING' THEN
                #         unix_timestamp(end_time) - unix_timestamp(extracted_at_time)
                #     ELSE
                #         unix_timestamp(next_time) - unix_timestamp(extracted_at_time)
# ),

        # --time_diff_operational AS (
#     time_diff_operational AS (
#         SELECT
#             extracted_date,
#             extracted_at_time,
#             start_time,
#             end_time,
#             entity_id,
#             unix_timestamp(extracted_at_time) - lag(unix_timestamp(extracted_at_time), 1) OVER (PARTITION BY entity_id, start_time, end_time ORDER BY extracted_at_time) as diff_op
#         FROM lag_status
#         WHERE status = 'OPERATING'
#     )
#     SELECT
#         to_date(start_time) as extracted_date,
#         entity_id,
#         COALESCE(SUM(diff_op)/60, 0) AS sum_time_not_operating
#     FROM time_diff_operational
#     GROUP BY 1,2
#     ORDER BY 2,1

    
    # result_ratio.printSchema()
    # save_into_postgres(result_ratio, "themeparkwizard.amount_not_operating_by_day", 'append')

In [457]:
# df_dl = spark.read.orc('datalake_layer/epcot').cache()

In [458]:
# operating_ratio_compute()

In [459]:
schema = StructType([
    StructField("extracted_date", DateType(), False),
    StructField("entity_id", StringType(), False),
    StructField("avg_standby_waittime", DoubleType(), False),
])
save_into_postgres(spark.createDataFrame([], schema), "themeparkwizard.agg_avg_time", 'overwrite')

schema = StructType([
    StructField("extracted_date", DateType(), False),
    StructField("entity_id", StringType(), False),
    StructField("sum_time_not_operating", DoubleType(), False),
])
save_into_postgres(spark.createDataFrame([], schema), "themeparkwizard.amount_not_operating_by_day", 'overwrite')

In [443]:
# Load dim_park_entity
df_parks = spark.read.json('general_schemas_tables/park_by_entity_meta_new.json')
df_parks.write.jdbc(url=DB_URL, table=f"themeparkwizard.dim_park_entity", mode='overwrite', properties=PROPERTIES_CUSTOM)

In [460]:

for path in glob('datalake_layer/*'):
    print(f'Transforming {path} ...')
    df_dl = spark.read.orc(path).cache()
    # df_dl_working_hour = df_dl.withColumn('start_time', min_hour(col('operatingHours')))\
    #                 .withColumn('end_time', max_hour(col('operatingHours')))\
    #                 .filter(col('extracted_at_time').between(col('start_time'), col('end_time')))
    # df_dl_working_hour.createOrReplaceTempView('datalake_table_working_hour')
    df_dl.createOrReplaceTempView('datalake_table')
    agg_avg_time_compute()
    #operating_ratio_compute()
# df_dl.printSchema()

Transforming datalake_layer/animal_kingdom ...
Computing agg_avg_time...
root
 |-- extracted_date: date (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- avg_standby_waittime: double (nullable = true)

Transforming datalake_layer/epcot ...
Computing agg_avg_time...
root
 |-- extracted_date: date (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- avg_standby_waittime: double (nullable = true)

Transforming datalake_layer/hollywood_studios ...
Computing agg_avg_time...
root
 |-- extracted_date: date (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- avg_standby_waittime: double (nullable = true)

Transforming datalake_layer/magic_kingdom ...
Computing agg_avg_time...
root
 |-- extracted_date: date (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- avg_standby_waittime: double (nullable = true)

Transforming datalake_layer/universal_epic_universe ...
Computing agg_avg_time...
root
 |-- extracted_date: date (nullable = true)
 |-- entity_

In [454]:
# 5a43d1a7-ad53-4d25-abfe-25625f0da304
df_magic = spark.read.orc('datalake_layer/magic_kingdom').where("id = '5a43d1a7-ad53-4d25-abfe-25625f0da304'").select('extracted_at_time', 'queue.STANDBY.waitTime')\
                        .orderBy('extracted_at_time', ascending=False).show(40)

+-------------------+--------+
|  extracted_at_time|waitTime|
+-------------------+--------+
|2025-08-17 02:04:42|      60|
|2025-08-17 01:59:42|      60|
|2025-08-17 01:54:42|      60|
|2025-08-17 01:49:42|      70|
|2025-08-17 01:44:42|      70|
|2025-08-17 01:39:42|      80|
|2025-08-17 01:34:42|      80|
|2025-08-17 01:29:42|      80|
|2025-08-17 01:24:41|      80|
|2025-08-17 01:19:42|      80|
|2025-08-17 01:14:42|      80|
|2025-08-17 01:09:41|      80|
|2025-08-17 01:04:41|      80|
|2025-08-17 00:59:41|      80|
|2025-08-17 00:54:41|      80|
|2025-08-17 00:49:41|      80|
|2025-08-17 00:44:41|      80|
|2025-08-17 00:39:41|      80|
|2025-08-17 00:34:41|      80|
|2025-08-17 00:29:41|      90|
|2025-08-17 00:24:41|      90|
|2025-08-17 00:19:41|      90|
|2025-08-17 00:14:41|      90|
|2025-08-17 00:09:40|      85|
|2025-08-17 00:04:40|      85|
|2025-08-16 23:59:40|      85|
|2025-08-16 23:54:40|      85|
|2025-08-16 23:49:40|      85|
|2025-08-16 23:44:40|      85|
|2025-08

In [4]:
df_dl.createOrReplaceTempView('datalake_table')
test_df = spark.sql("""
SELECT 
    extracted_date,
    name AS attraction_name, 
    AVG(queue.STANDBY.waitTime) AS avg_standby_waittime
FROM datalake_table
WHERE entity_type = 'ATTRACTION' AND queue.STANDBY.waitTime is not null
GROUP BY 1, 2
ORDER BY 1
""")
test_df.printSchema()
test_df.write.jdbc(url=DB_URL, table="themeparkwizard.agg_avg_time_epcot", mode='overwrite', properties=PROPERTIES_CUSTOM)

root
 |-- extracted_date: date (nullable = true)
 |-- attraction_name: string (nullable = true)
 |-- avg_standby_waittime: double (nullable = true)



In [9]:
df_dl_to_model = df_dl.where("name == 'Guardians of the Galaxy: Cosmic Rewind' AND queue.STANDBY.waitTime is not null")\
                        .select('extracted_at_time', 'queue.STANDBY.waitTime')\
                        .orderBy('extracted_at_time')\
                        .withColumnRenamed('extracted_at_time', 'ds')\
                        .withColumnRenamed('queue.STANDBY.waitTime', 'y')
df_dl_to_model.show()

+-------------------+--------+
|                 ds|waitTime|
+-------------------+--------+
|2025-07-25 12:35:04|      30|
|2025-07-25 12:40:04|      30|
|2025-07-25 12:45:06|      30|
|2025-07-25 12:50:05|      45|
|2025-07-25 12:55:08|      45|
|2025-07-25 13:00:05|      45|
|2025-07-25 13:05:05|      60|
|2025-07-25 13:10:05|      70|
|2025-07-25 13:15:05|      80|
|2025-07-25 13:20:04|      80|
|2025-07-25 13:25:05|      80|
|2025-07-25 13:30:04|      80|
|2025-07-25 13:35:05|      80|
|2025-07-25 13:40:05|      80|
|2025-07-25 13:45:04|      70|
|2025-07-25 13:50:04|      70|
|2025-07-25 13:55:04|      70|
|2025-07-25 14:00:06|      70|
|2025-07-25 14:05:04|      70|
|2025-07-25 14:10:05|      70|
+-------------------+--------+
only showing top 20 rows



In [12]:
model = Prophet()
model.fit(df_dl_to_model)

PySparkValueError: [CANNOT_CONVERT_COLUMN_INTO_BOOL] Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [None]:
model.predict()

In [44]:
INTERVAL_OF_MINUTES = 60 * 15

In [48]:
rest_df = spark.sql(f"""
WITH wait_by_party AS (
    SELECT
        extracted_date,
        date_format(cast(floor(try_divide(extracted_at, {INTERVAL_OF_MINUTES}))*{INTERVAL_OF_MINUTES} as timestamp), 'HH:mm') as time_of_the_day, --extracted_date,
        name AS attraction_name,
        CASE 
        WHEN da_exp.partySize <= 2 THEN
            'Small group (<= 2)'
        WHEN da_exp.partySize > 2 AND da_exp.partySize <= 4 THEN
            'Medium group (3 and 4)'
        WHEN da_exp.partySize > 4 AND da_exp.partySize <= 6 THEN
            'Medium group (5 ant 6)'
        WHEN da_exp.partySize > 6 THEN
            'Big group (> 6)'
        END as party_size,
        COALESCE(AVG(da_exp.waitTime), 0) as avg_wait_time,
        STDDEV(da_exp.waitTime) AS stddev_wait_time
    FROM datalake_table
    LATERAL VIEW EXPLODE(diningAvailability) as da_exp
    WHERE entity_type = 'SHOW' --AND name = 'Garden Grill Restaurant'
    GROUP BY 1,2,3,4
)
SELECT 
    *
FROM wait_by_party
ORDER BY 1, 2, 3, 4
""").show(50, truncate=False)

+--------------+---------------+---------------+----------+-------------+----------------+
|extracted_date|time_of_the_day|attraction_name|party_size|avg_wait_time|stddev_wait_time|
+--------------+---------------+---------------+----------+-------------+----------------+
+--------------+---------------+---------------+----------+-------------+----------------+



In [46]:
rest_df.printSchema()
rest_df.write.jdbc(url=DB_URL, table="themeparkwizard.restaurant_wait_time_epcot", mode='overwrite', properties=PROPERTIES_CUSTOM)

root
 |-- extracted_date: date (nullable = true)
 |-- time_of_the_day: string (nullable = true)
 |-- attraction_name: string (nullable = true)
 |-- party_size: string (nullable = true)
 |-- avg_wait_time: double (nullable = false)
 |-- stddev_wait_time: double (nullable = true)



In [None]:
# function = 4**(10/r)*q*d/AVG(q)

In [444]:
4**(10/4.6)*20*0.8/

16.28981135980903

In [445]:
4**(10/4.6)*20*1/

20.362264199761288

In [None]:
# GET TABLE TO EXPLORE GEN_ALG
# with number_row as (
#     select
#         entity_id,
#         name,
#         entity_name,
#         latitude,
#         longitude,
#         wait_time,
#         extracted_at_time,
#         rating,
#         row_number() over (partition by entity_id order by extracted_at_time) as rn
#     from themeparkwizard.predictions_table pt
#     left join themeparkwizard.dim_park_entity dpe using(entity_id)
#     where was_predicted = 1
# ),
# avg_by_entity AS (
#     SELECT
#         entity_id,
#         AVG(avg_standby_waittime) as alltime_avg_waittime
#     FROM themeparkwizard.agg_avg_time
#     GROUP BY 1
# ),
# first_group as (
#     select extracted_at_time,
#            wait_time,
#            entity_name,
#            latitude,
#            longitude,
#            name,
#            rating,
#            alltime_avg_waittime
#     from number_row
#              left join avg_by_entity
#                        using (entity_id)
#     where rn <> 1
#     order by extracted_at_time, entity_name
# )
# select
#     extracted_at_time,
#     name,
#     a.entity_name as src_node,
#     b.entity_name as dst_node,
#     SQRT(POWER((b.latitude - a.latitude)*111, 2) + POWER((b.longitude - a.longitude)*111, 2)) AS euclidean_distance,
#     b.wait_time,
#     b.alltime_avg_waittime,
#     b.rating
# from first_group a
# full join first_group b
# using(extracted_at_time, name)
# where a.entity_name <> b.entity_name