In [0]:
%pip install h3==3.7.7

In [0]:
%pip install shapely

In [0]:
%sql

use catalog trueanalytics_data;


In [0]:
import h3
import pyspark
import pyspark.sql.functions as F
from pyspark.databricks.sql import functions as H
from pyspark.sql.functions import *
import pyspark.sql.types as T
from pyspark.sql import Window
from functools import partial
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

In [0]:
def save_to_parquet(df, save_path):
    """Write ``df`` to ``save_path`` in parquet format, replacing existing output.

    Args:
        df: Object exposing the DataFrame writer API (``df.write``).
        save_path: Destination path handed to the writer's ``save``.

    Returns:
        None. A confirmation line is printed once ``save`` has returned.
    """
    writer = df.write.format('parquet')
    writer = writer.mode('overwrite')
    # NOTE(review): "header" is normally a CSV writer option; it is passed
    # here unchanged -- confirm whether it has any effect for parquet.
    writer = writer.option("header", "true")
    writer.save(save_path)

    print("successfully save parquet to ", save_path)
    return None

def save_to_csv(df, save_path):
    """Write ``df`` to ``save_path`` as CSV with a header row, replacing existing output.

    The frame is coalesced to one partition before the write is issued.

    Args:
        df: Object exposing ``coalesce`` and the DataFrame writer API.
        save_path: Destination path handed to the writer's ``save``.

    Returns:
        None. A confirmation line is printed once ``save`` has returned.
    """
    single_partition = df.coalesce(1)
    writer = single_partition.write.format('csv')
    writer = writer.mode('overwrite')
    writer = writer.option("header", "true")
    writer.save(save_path)

    print("successfully save csv to ", save_path)
    return None

In [0]:
# parameter: par_day
# Notebook widget holding the partition day to process; the value is read back
# as a string and is used as a string further down (filter value, file name).
dbutils.widgets.text("par_day", "20251128")
par_day = dbutils.widgets.get("par_day")

# Reject non-numeric input early with an explicit error.
# `par_month` is the name this cell has always assigned; despite the name it
# holds the integer form of par_day.
try:
    par_month = int(par_day)
except ValueError:
    raise ValueError("par_day value must be numeric")

# Compare the parsed integer, not the raw string: a string never equals the
# int 0, so testing `par_day != 0` could never reach the exit branch.
if par_month == 0:
    # The message is passed to dbutils.notebook.exit unchanged; presumably a
    # calling job may match on it, so the text is deliberately left as-is.
    dbutils.notebook.exit("Aborting as ondition not met. Further tasks will be skipped")

# debug
display(par_day)

In [0]:
# Fixed par_day value used to pick the customer360 snapshot partition.
date_cst360 = 20251101

# Output folder and per-day parquet name for the report.
path_staging = 'gs://tdg-ds-tech-delivery/2025/Hatyai_Flooding/'
report_name = f"Flooding_V2_{par_day}.parquet"

In [0]:
# Hourly CDR geo aggregates restricted to the requested par_day partition,
# keeping only the columns used by the later joins and the H3 lookup.
df_cdr_hour = (
    spark.read.table('trueanalytics_data.trueanalytics_bus.fact_cdr_geo_agg_hour_v2')
    .where(F.col('par_day') == par_day)
    .select(
        'msisdn',
        'cellsite_sk',
        'user_estimated_lat',
        'user_estimated_long',
        'par_day',
    )
)

In [0]:
# Songkhla, Yala, Narathiwat, Trang, Phatthalung, Satun, Pattani, and Nakhon Si Thammarat
# (the filter below uses lower-case spellings of these names)
cellsite_provinces = [
    'songkhla',
    'yala',
    'narathiwat',
    'trang',
    'phatthalung',
    'satun',
    'pattani',
    'nakhon si thammarat',
]

df_cellsite = (
    spark.read.table('trueanalytics_data.trueanalytics_base.acsappo_dim_cellsite_location')
    .select('province_en', 'district_en', 'cellsite_sk')
    .where(F.col('province_en').isin(cellsite_provinces))
)

In [0]:
# Keep only CDR rows whose cell site is in the selected provinces; the
# filtered cell-site table is sent to the join with a broadcast hint.
df_footfall = df_cdr_hour.join(
    F.broadcast(df_cellsite),
    on="cellsite_sk",
    how="inner",
)

In [0]:
def latlon_to_h3_dynamic(lat, lon, res):
    """Return the H3 index of a coordinate at resolution ``res``.

    Args:
        lat: Latitude, or None.
        lon: Longitude, or None.
        res: H3 resolution; converted with ``int`` before use.

    Returns:
        The value returned by ``h3.geo_to_h3``, or None when any argument
        is None.
    """
    if lat is None or lon is None or res is None:
        return None
    return h3.geo_to_h3(lat, lon, int(res))

# The return type is taken from the explicit `pyspark.sql.types as T` import.
# A bare `StringType` is never imported by name in this notebook and would
# only resolve if `from pyspark.sql.functions import *` happened to expose it.
latlon_to_h3_dynamic_udf = F.udf(latlon_to_h3_dynamic, T.StringType())

# Tag every footfall row with its resolution-9 H3 index.
df_footfall_H3 = df_footfall.withColumn(
    "h3_index_res9",  latlon_to_h3_dynamic_udf(F.col("user_estimated_lat"), F.col("user_estimated_long"), F.lit(9))
)

In [0]:
# display(df_footfall_H3.limit(5))

In [0]:
# Province names as spelled in the H3 location mapping table filter.
h3_provinces = [
    'Songkhla',
    'Yala',
    'Narathiwat',
    'Trang',
    'Phatthalung',
    'Satun',
    'Pattani',
    'Nakhon Si Thammarat',
]

# H3 cell metadata (location names, centroid, border points) for those provinces.
df_h3_loc = (
    spark.read.table('trueanalytics_data.gdb_intel.h3_loc_mapping')
    .select(
        'h3_index_res9',
        'province',
        'district',
        'centroid_lat',
        'centroid_lng',
        'border_points',
    )
    .where(F.col('province').isin(h3_provinces))
)

# The inner join drops footfall rows whose H3 cell is not in the mapping subset.
df_footfall_H3_filtered = df_footfall_H3.join(
    df_h3_loc,
    on='h3_index_res9',
    how='inner',
)

# debug
# check_4 = df_h3_loc.groupby(F.col('province')).agg(F.count('h3_index_res9'))
# display(check_4)

In [0]:
# Age group per msisdn, read from the customer360 snapshot partition
# selected by date_cst360.
cust360 = (
    spark.read.table('trueanalytics_data.customer360.customer360_snapshot')
    .where(F.col('par_day') == date_cst360)
    .select('msisdn', 'demo_age_group_v1_age_grp_cat')
)

# Left join: footfall rows without a snapshot match keep a null age group.
df_footfall_H3_cust360 = df_footfall_H3_filtered.join(
    cust360,
    on='msisdn',
    how='left',
)
df_footfall_H3_cust360.columns


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-7125284755293972>, line 1[0m
[0;32m----> 1[0m cust360 [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mtable([38;5;124m'[39m[38;5;124mtrueanalytics_data.customer360.customer360_snapshot[39m[38;5;124m'[39m)[38;5;241m.[39mfilter(F[38;5;241m.[39mcol([38;5;124m'[39m[38;5;124mpar_day[39m[38;5;124m'[39m) [38;5;241m==[39m date_cst360)[38;5;241m.[39mselect([38;5;124m'[39m[38;5;124mmsisdn[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124mdemo_age_group_v1_age_grp_cat[39m[38;5;124m'[39m)
[1;32m      2[0m df_footfall_H3_cust360 [38;5;241m=[39m df_footfall_H3_filtered[38;5;241m.[39mjoin(cust360, on[38;5;241m=[39m[38;5;124m'[39m[38;5;124mmsisdn[39m[38;5;124m'[39m, how[38;5;241m=[39m[38;5;124m'[39m[38;5;124mleft[39m[38;5;124m'[39m)
[1;32m      3

In [0]:
# Output flag column -> age-group label it is matched against.
# Insertion order fixes the order in which the columns are appended.
age_group_labels = {
    'Age_0_17': '<=17',
    'Age_18_22': '18-22',
    'Age_23_26': '23-26',
    'Age_27_30': '27-30',
    'Age_31_40': '31-40',
    'Age_41_50': '41-50',
    'Age_50_plus': '>50',
}
age_group_col = F.col('demo_age_group_v1_age_grp_cat')

df_format = df_footfall_H3_cust360
# One 0/1 indicator column per labelled age group.
for flag_name, group_label in age_group_labels.items():
    df_format = df_format.withColumn(
        flag_name,
        F.when(age_group_col == group_label, F.lit(1)).otherwise(F.lit(0)),
    )

# Age_Unidentified: set when the age group is null.
df_format = df_format.withColumn(
    'Age_Unidentified',
    F.when(age_group_col.isNull(), F.lit(1)).otherwise(F.lit(0)),
)

In [0]:
# Grouping keys: one output row per day and H3 cell (with its location attributes).
report_group_keys = [
    'par_day',
    'h3_index_res9',
    'centroid_lat',
    'centroid_lng',
    'border_points',
    'province',
    'district',
]

# Indicator columns, in the order their counts appear in the result.
age_flag_names = [
    'Age_0_17',
    'Age_18_22',
    'Age_23_26',
    'Age_27_30',
    'Age_31_40',
    'Age_41_50',
    'Age_50_plus',
    'Age_Unidentified',
]

# N_unique counts distinct msisdn per group; each age column counts distinct
# msisdn among rows where that indicator equals 1.
df_result = df_format.groupby(report_group_keys).agg(
    F.countDistinct('msisdn').alias('N_unique'),
    *[
        F.countDistinct(F.when(F.col(flag) == 1, F.col("msisdn"))).alias(flag)
        for flag in age_flag_names
    ]
)

In [0]:
# Final column order of the report, before renaming.
report_columns = [
    "par_day",
    "h3_index_res9",
    "centroid_lat",
    "centroid_lng",
    "border_points",
    "province",
    "district",
    "N_unique",
    "Age_0_17",
    "Age_18_22",
    "Age_23_26",
    "Age_27_30",
    "Age_31_40",
    "Age_41_50",
    "Age_50_plus",
    "Age_Unidentified",
]

# Select in report order, then expose par_day as "date" and N_unique as "density".
df_report = (
    df_result
    .select(*report_columns)
    .withColumnRenamed("par_day", "date")
    .withColumnRenamed("N_unique", "density")
)

In [0]:
# Write the report under the staging folder using the per-day file name.
report_path = f"{path_staging}{report_name}"
save_to_parquet(df_report, report_path)

# Unit Test

In [0]:
# df_test = df_report.withColumn("sum_Age", F.col("Age_0_17") + F.col("Age_18_22") + F.col("Age_23_26") + F.col("Age_27_30") + F.col("Age_31_40") + F.col("Age_41_50") + F.col("Age_50_plus") + F.col("Age_Unidentified"))

In [0]:
# check_age = df_test.filter(F.col("density") < F.col("sum_Age"))
# assert check_age.count() == 0

In [0]:
# check_density = df_test.filter((F.col('density')==0))
# assert check_density.count() == 0