In [124]:
%cleanup -f

In [1]:
%%configure -f
{
    "numExecutors": 12,
    "driverMemory": "20g"
}

In [69]:
import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import OneHotEncoder, Normalizer, MinMaxScaler
from pyspark.ml.linalg import SparseVector, DenseVector, Vectors, VectorUDT
import numpy as np


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1673843097496_0003,pyspark,idle,Link,Link,,✔


In [11]:
# Session-level Spark settings applied before any heavy work.
# NOTE(review): a value of '-1' for autoBroadcastJoinThreshold presumably disables
# automatic broadcast joins — confirm against the Spark configuration docs.
for conf_key, conf_value in (
    ('spark.default.parallelism', 1200),
    ('spark.sql.shuffle.partitions', 1200),
    ('spark.sql.autoBroadcastJoinThreshold', '-1'),
):
    spark.conf.set(conf_key, conf_value)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
@f.udf(returnType=VectorUDT())
def sparse_vector_sum(vectors):
    """Element-wise sum of a list of ML vectors, returned as a SparseVector.

    Args:
        vectors: list of vectors (e.g. the result of ``collect_list`` on a
            vector column). All vectors must have the same size.

    Returns:
        A SparseVector holding only the non-zero entries of the sum, or None
        when ``vectors`` is None, empty, or contains only None entries.
    """
    total = None
    for vec in vectors or ():
        if vec is None:
            continue
        # toArray() gives an explicit dense ndarray, so the addition does not
        # depend on numpy coercing the vector object implicitly.
        dense = vec.toArray()
        # astype() copies, so the first input vector is never mutated.
        total = dense.astype(np.float64) if total is None else total + dense
    if total is None:
        return None
    return SparseVector(
        len(total),
        {int(i): float(total[i]) for i in np.flatnonzero(total)},
    )

@f.udf(returnType=ArrayType(FloatType()))
def to_dense(sparse_vector):
    """Expand a vector into a list of Python floats with one extra trailing 0.0.

    The trailing 0.0 is the utilisation placeholder slot.
    """
    dense_values = DenseVector(sparse_vector)
    expanded = list(map(float, dense_values))
    return expanded + [0.0]

@f.udf(returnType=ArrayType(FloatType()))
def to_dense_flat(sparse_vector):
    """Expand a vector into a list of Python floats (no placeholder appended)."""
    dense_values = DenseVector(sparse_vector)
    return list(map(float, dense_values))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Load the flattened NSW dataset and keep only rows that have a utilisation value.
valid_flat_nsw = (
    spark.read
    .parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/valid_flat_nsw/')
    .where(f.col('utilisation').isNotNull())
)
valid_flat_nsw.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|   Subs_Id|hour|         sector_name|  event_code_feature|           cell_name|          state|       utilisation|trace_id|customer_experience_index|
+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|2100006261|  01|202583_RandwickCe...|     (30,[13],[1.0])|202583_RandwickCe...|New South Wales|10.772307692307692|       0|                     97.0|
|2100006261|  04|202583_RandwickCe...|      (30,[8],[1.0])|202583_RandwickCe...|New South Wales|12.544615384615383|       1|                     26.0|
|2100006261|  07|202583_RandwickCe...|     (30,[13],[1.0])|202583_RandwickCe...|New South Wales|24.883076923076924|       2|                     93.0|
|2100006261|  10|202583_RandwickCe...|      (30,[8],[1.0])|202583_RandwickCe...|New South Wale

In [34]:
# Keep a single hour ('19') and renumber trace_id as a dense, 0-based rank.
# NOTE(review): Window.orderBy without partitionBy — Spark typically ranks all
# rows in a single partition for this; confirm that is acceptable at this size.
valid_flat_nsw_hr = (
    valid_flat_nsw
    .where(f.col('hour') == '19')
    .withColumn('trace_id', f.dense_rank().over(Window.orderBy('trace_id')) - 1)
)

valid_flat_nsw_hr.persist()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[Subs_Id: string, hour: string, sector_name: string, event_code_feature: vector, cell_name: string, state: string, utilisation: double, trace_id: int, customer_experience_index: float]

In [36]:
# Highest trace_id after renumbering.
valid_flat_nsw_hr.agg(f.max('trace_id')).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|max(trace_id)|
+-------------+
|       690625|
+-------------+

In [37]:
# Row counts per (trace, sector), largest first.
(
    valid_flat_nsw_hr
    .groupBy('trace_id', 'sector_name')
    .count()
    .orderBy(f.desc('count'))
    .show()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+-----+
|trace_id|         sector_name|count|
+--------+--------------------+-----+
|    6477|   227598_LurneaM5_2|    6|
|  121189|   228034_Rookwood_2|    6|
|    7458|227544_RevesbyHei...|    6|
|   12652|     227393_Coogee_3|    6|
|   24858|202803_FairfieldS...|    6|
|   28836|227350_BellevueHi...|    6|
|   45843|228311_Mcmahonsso...|    6|
|   56592|227818_WarwickFar...|    6|
|   58226|227096_NorthBondi...|    6|
|   61777|     227587_Sefton_2|    6|
|   62236|227581_MountPritc...|    6|
|   73120| 227365_Hurstville_3|    6|
|   74322|227489_RedfernSou...|    6|
|   90664| 227846_Werrington_3|    6|
|   94096|227707_EppingSouth_3|    6|
|   95126|227602_WestEpping...|    6|
|  102550| 202934_Camperdown_1|    6|
|  106076|227915_NarellenTo...|    6|
|  111413|   228034_Rookwood_2|    6|
|  112349|  202099_Penshurst_3|    6|
+--------+--------------------+-----+
only showing top 20 rows

In [38]:
# Row counts per (trace, cell), largest first.
(
    valid_flat_nsw_hr
    .groupBy('trace_id', 'cell_name')
    .count()
    .orderBy(f.desc('count'))
    .show()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+-----+
|trace_id|           cell_name|count|
+--------+--------------------+-----+
|       0|228053_PicnicPoin...|    1|
|       1|201645_Paddington...|    1|
|       2|240028_Telarah_L1...|    1|
|       3|201574_MarsfieldW...|    1|
|       4|202647_Berrima_L0...|    1|
|       4|202647_Berrima_L1...|    1|
|       5|252006_RiverwoodS...|    1|
|       6|240137_Morpeth_L1...|    1|
|       6|240137_Morpeth_L1...|    1|
|       6|240137_Morpeth_L0...|    1|
|       7|227563_SaltPanCre...|    1|
|       8|201656_CabbageTrl...|    1|
|       8|201656_CabbageTrl...|    1|
|       9|202644_BywongHill...|    1|
|      10|241168_ManlyHospT...|    1|
|      10|227811_ManlySouth...|    1|
|      10|227811_ManlySouth...|    1|
|      10|241168_ManlyHospT...|    1|
|      11|201803_Penhust594...|    1|
|      12|227535_Jannali_L0...|    1|
+--------+--------------------+-----+
only showing top 20 rows

In [39]:
# Row counts per (trace, subscriber), largest first.
(
    valid_flat_nsw_hr
    .groupBy('trace_id', 'Subs_Id')
    .count()
    .orderBy(f.desc('count'))
    .show()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+-----+
|trace_id|   Subs_Id|count|
+--------+----------+-----+
|  656178| 391540187|   68|
|  578142| 390469037|   65|
|  191279| 374119754|   65|
|  473025| 388742141|   62|
|  389801| 386296534|   61|
|  348697| 384869344|   60|
|     279|2100012126|   58|
|  397985| 386559394|   58|
|  191311| 374122733|   57|
|  193764| 374649986|   57|
|  422759| 387397008|   57|
|  292425| 381393558|   56|
|  499501| 389203341|   56|
|  661590| 391601183|   56|
|  235364| 378126148|   55|
|  117005| 365078768|   55|
|  409296| 386947531|   55|
|  558031| 390180924|   54|
|  617102| 391042030|   54|
|   67515| 358302148|   54|
+--------+----------+-----+
only showing top 20 rows

In [40]:
# Sanity check: number of rows with a null utilisation.
valid_flat_nsw_hr.where(f.col('utilisation').isNull()).count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [54]:
@f.udf(returnType=ArrayType(ArrayType(StringType())))
def construct_nodes(edge_input):
    """
    Build the de-duplicated node list for one group of edges.

    Each element of ``edge_input`` is indexed positionally: position 0 is the
    user, position 1 the cell and position 2 the sector.

    node types:
    0 - user
    1 - cell
    2 - sector

    Returns a list of ``[node_name, node_type]`` pairs in first-seen order.
    The type code is emitted as a string ("0", "1", "2") so every element
    matches the declared ArrayType(ArrayType(StringType())) return type.
    An empty or None input yields an empty list.
    """
    node_list = []
    seen = set()  # (name, type) pairs already emitted; O(1) membership test
    for edge in edge_input or ():
        for node_type in (0, 1, 2):
            key = (edge[node_type], node_type)
            if key not in seen:
                seen.add(key)
                node_list.append([edge[node_type], str(node_type)])
    return node_list


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
# Write node overview: one row per (trace_id, node) with a per-trace, 0-based node_id.
# node_id is ordered by (node_type, node_name). Ordering by node_type alone leaves
# ties, so row_number() could number nodes of the same type differently on each run.
(
    valid_flat_nsw_hr
    .groupBy('trace_id')
    .agg(
        construct_nodes(
            f.collect_list(f.struct('Subs_Id', 'cell_name', 'sector_name'))
        ).alias('node_list')
    )
    .withColumn('node_id_type', f.explode('node_list'))
    .drop('node_list')
    .withColumn('node_name', f.col('node_id_type')[0])
    .withColumn('node_type', f.col('node_id_type')[1])
    .drop('node_id_type')
    .withColumn(
        'node_id',
        f.row_number().over(
            Window.partitionBy('trace_id').orderBy('node_type', 'node_name')
        ) - 1,
    )
    .write.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/dataset_hr/node_overview/')
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [96]:
# Per-(trace, subscriber) feature: element-wise sum of the event-code vectors.
subs_feature = (
    valid_flat_nsw_hr
    .select('trace_id', 'Subs_Id', 'cell_name', 'sector_name', 'event_code_feature', 'utilisation')
    .groupBy('trace_id', 'Subs_Id')
    .agg(sparse_vector_sum(f.collect_list('event_code_feature')).alias('event_code_feature'))
    .persist()
)
subs_feature.show()
subs_feature.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+--------------------+
|trace_id|   Subs_Id|  event_code_feature|
+--------+----------+--------------------+
|       0|2100006348|      (30,[8],[1.0])|
|       1|2100006351|      (30,[8],[1.0])|
|       2|2100006360|     (30,[13],[1.0])|
|       3|2100006447|(30,[10,13],[1.0,...|
|       4|2100006452|     (30,[13],[9.0])|
|       5|2100006479|      (30,[8],[1.0])|
|       6|2100006486|(30,[11,12,13],[1...|
|       7|2100006508|     (30,[13],[1.0])|
|       8|2100006510|    (30,[13],[12.0])|
|       9|2100006519|(30,[7,8],[1.0,1.0])|
|      10|2100006522|(30,[7,13],[22.0,...|
|      11|2100006525|(30,[7,8],[1.0,1.0])|
|      12|2100006528|(30,[11,12,13],[1...|
|      13|2100006543|(30,[6,7,8,11,13,...|
|      14|2100006548|      (30,[8],[1.0])|
|      15|2100006553|     (30,[13],[3.0])|
|      16|2100006566|(30,[8,11,12,13],...|
|      17|2100006571|(30,[9,10,13,16],...|
|      18|2100006597|(30,[7,8,11,12,13...|
|      19|2100006598|     (30,[13],[1.0])|
+--------+-

In [97]:
# Per-(trace, cell) feature: element-wise sum of the event-code vectors.
cell_feature = (
    valid_flat_nsw_hr
    .select('trace_id', 'Subs_Id', 'cell_name', 'sector_name', 'event_code_feature', 'utilisation')
    .groupBy('trace_id', 'cell_name')
    .agg(sparse_vector_sum(f.collect_list('event_code_feature')).alias('event_code_feature'))
    .persist()
)
cell_feature.show()
cell_feature.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+--------------------+
|trace_id|           cell_name|  event_code_feature|
+--------+--------------------+--------------------+
|       0|228053_PicnicPoin...|      (30,[8],[1.0])|
|       1|201645_Paddington...|      (30,[8],[1.0])|
|       2|240028_Telarah_L1...|     (30,[13],[1.0])|
|       3|201574_MarsfieldW...|(30,[10,13],[1.0,...|
|       4|202647_Berrima_L0...|     (30,[13],[8.0])|
|       4|202647_Berrima_L1...|     (30,[13],[1.0])|
|       5|252006_RiverwoodS...|      (30,[8],[1.0])|
|       6|240137_Morpeth_L0...|(30,[12,13],[1.0,...|
|       6|240137_Morpeth_L1...|(30,[11,13],[1.0,...|
|       6|240137_Morpeth_L1...|     (30,[13],[1.0])|
|       7|227563_SaltPanCre...|     (30,[13],[1.0])|
|       8|201656_CabbageTrl...|     (30,[13],[6.0])|
|       8|201656_CabbageTrl...|     (30,[13],[6.0])|
|       9|202644_BywongHill...|(30,[7,8],[1.0,1.0])|
|      10|227811_ManlySouth...|(30,[7,13],[10.0,...|
|      10|227811_ManlySouth...|(30,[7,13],[2.0

In [95]:
# Per-(trace, sector) feature: summed event-code vector plus the maximum
# utilisation seen in the group.
sector_feature = (
    valid_flat_nsw_hr
    .select('trace_id', 'Subs_Id', 'cell_name', 'sector_name', 'event_code_feature', 'utilisation')
    .groupBy('trace_id', 'sector_name')
    .agg(
        sparse_vector_sum(f.collect_list('event_code_feature')).alias('event_code_feature'),
        f.max('utilisation').alias('utilisation'),
    )
    .persist()
)
sector_feature.show()
sector_feature.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+--------------------+------------------+
|trace_id|         sector_name|  event_code_feature|       utilisation|
+--------+--------------------+--------------------+------------------+
|       0|228053_PicnicPoint_3|      (30,[8],[1.0])|28.719999999999995|
|       1|201645_Paddington...|      (30,[8],[1.0])|33.221538461538465|
|       2|    240028_Telarah_2|     (30,[13],[1.0])|            17.805|
|       3|201574_MarsfieldW...|(30,[10,13],[1.0,...| 57.44615384615385|
|       4|    202647_Berrima_1|     (30,[13],[9.0])|15.850000000000001|
|       5|252006_RiverwoodS...|      (30,[8],[1.0])| 90.54666666666667|
|       6|    240137_Morpeth_3|(30,[11,12,13],[1...|             27.92|
|       7|227563_SaltPanCre...|     (30,[13],[1.0])| 70.74153846153847|
|       8|201656_CabbageTrl...|    (30,[13],[12.0])|            70.075|
|       9| 202644_BywongHill_2|(30,[7,8],[1.0,1.0])| 52.54666666666666|
|      10| 227811_ManlySouth_1|(30,[7,13],[12.0,...|54.464615384

In [98]:
# Normalizer with p=1: reads 'event_code_feature', writes 'feature_norm'.
normaler = Normalizer(
    p=1,
    inputCol='event_code_feature',
    outputCol='feature_norm',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [99]:
# Add the 'feature_norm' column to each of the three feature frames.
# NOTE: the frames are reassigned to the same names, so re-running this cell
# feeds already-transformed frames back into the normalizer.
subs_feature, cell_feature, sector_feature = (
    normaler.transform(frame)
    for frame in (subs_feature, cell_feature, sector_feature)
)

subs_feature.show(n=5)
cell_feature.show(n=5)
sector_feature.show(n=5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+--------------------+--------------------+
|trace_id|   Subs_Id|  event_code_feature|        feature_norm|
+--------+----------+--------------------+--------------------+
|       0|2100006348|      (30,[8],[1.0])|      (30,[8],[1.0])|
|       1|2100006351|      (30,[8],[1.0])|      (30,[8],[1.0])|
|       2|2100006360|     (30,[13],[1.0])|     (30,[13],[1.0])|
|       3|2100006447|(30,[10,13],[1.0,...|(30,[10,13],[0.25...|
|       4|2100006452|     (30,[13],[9.0])|     (30,[13],[1.0])|
+--------+----------+--------------------+--------------------+
only showing top 5 rows

+--------+--------------------+--------------------+--------------------+
|trace_id|           cell_name|  event_code_feature|        feature_norm|
+--------+--------------------+--------------------+--------------------+
|       0|228053_PicnicPoin...|      (30,[8],[1.0])|      (30,[8],[1.0])|
|       1|201645_Paddington...|      (30,[8],[1.0])|      (30,[8],[1.0])|
|       2|240028_Telarah_L1..

In [101]:
@f.udf(returnType=VectorUDT())
def add_sparse31(v1, val):
    """Append ``val`` as one extra trailing dimension of the sparse vector ``v1``.

    The result has size ``v1.size + 1``; entries equal to 0.0 (including a zero
    ``val``) are not stored. Reads ``v1.indices`` / ``v1.values``, so ``v1`` is
    expected to be a SparseVector.
    """
    extra_slot = v1.size
    entries = {idx: value for idx, value in zip(v1.indices, v1.values)}
    entries[extra_slot] = val
    stored = {}
    for idx, value in entries.items():
        if value != 0.0:
            stored[idx] = value
    return SparseVector(extra_slot + 1, stored)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [103]:
# Convert utilisation from a 0-100 percentage to the 0-1 range.
# NOTE: overwrites the column in place, so running this cell twice divides by 100 twice.
sector_feature = sector_feature.withColumn(
    'utilisation',
    f.col('utilisation') / 100.0,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [104]:
# Build 'util_vec' = normalised feature + one trailing slot.
# Subscriber and cell nodes get 0 in that slot; sector nodes get their utilisation.
subs_feature, cell_feature = (
    frame.withColumn('util_vec', add_sparse31(f.col('feature_norm'), f.lit(0)))
    for frame in (subs_feature, cell_feature)
)
sector_feature = sector_feature.withColumn(
    'util_vec',
    add_sparse31(f.col('feature_norm'), f.col('utilisation')),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [105]:
# Preview two untruncated rows of each feature frame.
subs_feature.show(n=2, truncate=False)
cell_feature.show(n=2, truncate=False)
sector_feature.show(n=2, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+------------------+--------------+--------------+
|trace_id|Subs_Id   |event_code_feature|feature_norm  |util_vec      |
+--------+----------+------------------+--------------+--------------+
|0       |2100006348|(30,[8],[1.0])    |(30,[8],[1.0])|(31,[8],[1.0])|
|1       |2100006351|(30,[8],[1.0])    |(30,[8],[1.0])|(31,[8],[1.0])|
+--------+----------+------------------+--------------+--------------+
only showing top 2 rows

+--------+---------------------------+------------------+--------------+--------------+
|trace_id|cell_name                  |event_code_feature|feature_norm  |util_vec      |
+--------+---------------------------+------------------+--------------+--------------+
|0       |228053_PicnicPoint_L21C_3  |(30,[8],[1.0])    |(30,[8],[1.0])|(31,[8],[1.0])|
|1       |201645_PaddingtonNth_L21C_2|(30,[8],[1.0])    |(30,[8],[1.0])|(31,[8],[1.0])|
+--------+---------------------------+------------------+--------------+--------------+
only showing top 2 ro

In [72]:
# scaler = MinMaxScaler(inputCol='util_vec', outputCol='feature_scaled')
# subs_feature = scaler.fit(subs_feature).transform(subs_feature)

# scaler = MinMaxScaler(inputCol='util_vec', outputCol='feature_scaled')
# cell_feature = scaler.fit(cell_feature).transform(cell_feature)

# scaler = MinMaxScaler(inputCol='util_vec', outputCol='feature_scaled')
# sector_feature = scaler.fit(sector_feature).transform(sector_feature)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [100]:
# subs_feature.show(2, False)
# cell_feature.show(2, False)
# sector_feature.show(2, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [75]:
# subs_feature.write.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/dataset_hr/subs_feature/')
# cell_feature.write.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/dataset_hr/cell_feature/')
# sector_feature.write.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/dataset_hr/sector_feature/')

# subs_feature = spark.read.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/dataset_hr/subs_feature/')
# cell_feature = spark.read.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/dataset_hr/cell_feature/')
# sector_feature = spark.read.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/case_study/dataset_hr/sector_feature/')



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [106]:
# Stack subscriber, cell and sector features into one frame with a common
# (trace_id, node_name, util_vec) layout. union() matches columns by position.
node_feature = (
    subs_feature
    .withColumnRenamed('Subs_Id', 'node_name')
    .select('trace_id', 'node_name', 'util_vec')
    .union(
        cell_feature
        .withColumnRenamed('cell_name', 'node_name')
        .select('trace_id', 'node_name', 'util_vec')
    )
    .union(
        sector_feature
        .withColumnRenamed('sector_name', 'node_name')
        .select('trace_id', 'node_name', 'util_vec')
    )
)

node_feature.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+--------------------+
|trace_id| node_name|            util_vec|
+--------+----------+--------------------+
|       0|2100006348|      (31,[8],[1.0])|
|       1|2100006351|      (31,[8],[1.0])|
|       2|2100006360|     (31,[13],[1.0])|
|       3|2100006447|(31,[10,13],[0.25...|
|       4|2100006452|     (31,[13],[1.0])|
|       5|2100006479|      (31,[8],[1.0])|
|       6|2100006486|(31,[11,12,13],[0...|
|       7|2100006508|     (31,[13],[1.0])|
|       8|2100006510|     (31,[13],[1.0])|
|       9|2100006519|(31,[7,8],[0.5,0.5])|
|      10|2100006522|(31,[7,13],[0.333...|
|      11|2100006525|(31,[7,8],[0.5,0.5])|
|      12|2100006528|(31,[11,12,13],[0...|
|      13|2100006543|(31,[6,7,8,11,13,...|
|      14|2100006548|      (31,[8],[1.0])|
|      15|2100006553|     (31,[13],[1.0])|
|      16|2100006566|(31,[8,11,12,13],...|
|      17|2100006571|(31,[9,10,13,16],...|
|      18|2100006597|(31,[7,8,11,12,13...|
|      19|2100006598|     (31,[13],[1.0])|
+--------+-

In [107]:
# All node features belonging to trace 0.
node_feature.where('trace_id = 0').show(n=10, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-------------------------+-------------------------------------+
|trace_id|node_name                |util_vec                             |
+--------+-------------------------+-------------------------------------+
|0       |2100006348               |(31,[8],[1.0])                       |
|0       |228053_PicnicPoint_L21C_3|(31,[8],[1.0])                       |
|0       |228053_PicnicPoint_3     |(31,[8,30],[1.0,0.28719999999999996])|
+--------+-------------------------+-------------------------------------+

In [108]:
# All node features belonging to trace 1.
node_feature.where('trace_id = 1').show(n=10, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------------------------+-------------------------------------+
|trace_id|node_name                  |util_vec                             |
+--------+---------------------------+-------------------------------------+
|1       |2100006351                 |(31,[8],[1.0])                       |
|1       |201645_PaddingtonNth_L21C_2|(31,[8],[1.0])                       |
|1       |201645_PaddingtonNth_2     |(31,[8,30],[1.0,0.33221538461538463])|
+--------+---------------------------+-------------------------------------+

In [88]:
# Customer-experience-index extract for partition part_dt=2023-01-11.
cei_df = spark.read.format('parquet').load(
    's3://kl-prod-bi-analy-data-extract/features/kl-prod/Mobility/customer_exp_index_v3/part_dt=2023-01-11/'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [111]:
# CEI components for two sample subscribers at local hour 19.
(
    cei_df
    .where("Subs_Id in ('2100006519', '2100006351') AND local_hr = 19")
    .select(
        'Subs_Id',
        'customer_experience_utilisation',
        'customer_experience_irregular',
        'customer_experience_index',
    )
    .show()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------------------------------+-----------------------------+-------------------------+
|   Subs_Id|customer_experience_utilisation|customer_experience_irregular|customer_experience_index|
+----------+-------------------------------+-----------------------------+-------------------------+
|2100006519|                           75.0|                         95.0|                     89.0|
|2100006351|                           67.0|                          0.0|                     20.0|
+----------+-------------------------------+-----------------------------+-------------------------+

In [112]:
# The same two subscribers in the single-hour frame.
valid_flat_nsw_hr.where("Subs_Id in ('2100006519', '2100006351') AND hour = 19").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|   Subs_Id|hour|         sector_name|  event_code_feature|           cell_name|          state|       utilisation|trace_id|customer_experience_index|
+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|2100006351|  19|201645_Paddington...|      (30,[8],[1.0])|201645_Paddington...|New South Wales|33.221538461538465|       1|                     20.0|
|2100006519|  19| 202644_BywongHill_2|(30,[7,8],[1.0,1.0])|202644_BywongHill...|New South Wales| 52.54666666666666|       9|                     89.0|
+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+

In [110]:
# Rows whose customer_experience_index is above 70.
valid_flat_nsw_hr.where('customer_experience_index > 70').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|   Subs_Id|hour|         sector_name|  event_code_feature|           cell_name|          state|       utilisation|trace_id|customer_experience_index|
+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|2100006360|  19|    240028_Telarah_2|     (30,[13],[1.0])|240028_Telarah_L1...|New South Wales|            17.805|       2|                    100.0|
|2100006452|  19|    202647_Berrima_1|     (30,[13],[8.0])|202647_Berrima_L0...|New South Wales|15.850000000000001|       4|                     95.0|
|2100006452|  19|    202647_Berrima_1|     (30,[13],[1.0])|202647_Berrima_L1...|New South Wales|15.850000000000001|       4|                     95.0|
|2100006486|  19|    240137_Morpeth_3|     (30,[13],[1.0])|240137_Morpeth_L1...|New South Wale

In [114]:
# Raw tokenised events for dt=2023-01-10; show one subscriber's events in hour 19.
df = spark.read.parquet('s3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/tokenised_ebmr_sample/dt=2023-01-10/')
(
    df
    .withColumn('hour', f.hour('event_timestamp'))
    .where("Subs_Id = 2100006519 AND hour = 19")
    .show(n=50, truncate=False)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-------------+------+--------------------+--------+----------+------+-------+----+
|    event_timestamp|event_type_cd|result|            loc_info|     TAC|   Subs_Id|prefix|matched|hour|
+-------------------+-------------+------+--------------------+--------+----------+------+-------+----+
|2023-01-10 19:09:46|            7|     0|L_505_03__20295__...|35204513|2100006519|   614|   true|  19|
|2023-01-10 19:09:44|            7|     0|L_505_03__20295__...|35204513|2100006519|   614|   true|  19|
|2023-01-10 19:09:36|            7|     0|L_505_03__20295__...|35204513|2100006519|   614|   true|  19|
|2023-01-10 19:10:04|            7|     0|L_505_03__20295__...|35204513|2100006519|   614|   true|  19|
|2023-01-10 19:10:05|            7|     0|L_505_03__20295__...|35204513|2100006519|   614|   true|  19|
|2023-01-10 19:10:05|            7|     0|L_505_03__20295__...|35204513|2100006519|   614|   true|  19|
|2023-01-10 19:09:53|            8|     0|L_505_03__20295__...|3

In [115]:
# The same two subscribers in the full (all-hours) frame, before trace_id renumbering.
valid_flat_nsw.where("Subs_Id in ('2100006519', '2100006351') AND hour = 19").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|   Subs_Id|hour|         sector_name|  event_code_feature|           cell_name|          state|       utilisation|trace_id|customer_experience_index|
+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+
|2100006351|  19|201645_Paddington...|      (30,[8],[1.0])|201645_Paddington...|New South Wales|33.221538461538465|      62|                     20.0|
|2100006519|  19| 202644_BywongHill_2|(30,[7,8],[1.0,1.0])|202644_BywongHill...|New South Wales| 52.54666666666666|     227|                     89.0|
+----------+----+--------------------+--------------------+--------------------+---------------+------------------+--------+-------------------------+

In [116]:
# Per-location event features for dt=2023-01-10; preview the two sample subscribers.
event_feature_data = spark.read.parquet(
    's3://kl-prod-tpgt-knowledge-lake-sandpit/TempDir/tmp/ebmr_event_feature/dt=2023-01-10/'
)
(
    event_feature_data
    .where("Subs_Id in ('2100006519', '2100006351') AND hour = 19")
    .show(n=10, truncate=False)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------+----+---------------------------+
|Subs_Id   |loc_info                |hour|event_code_feature         |
+----------+------------------------+----+---------------------------+
|2100006519|L_505_03__20295__7239702|19  |(30,[7,13],[2.0,1.0])      |
|2100006519|L_505_03__20295__7212595|19  |(30,[7],[1.0])             |
|2100006519|L_505_03__20295__7192372|19  |(30,[7,13],[8.0,3.0])      |
|2100006519|L_505_03__20295__7255821|19  |(30,[7],[1.0])             |
|2100006519|L_505_03__20295__7211276|19  |(30,[13],[1.0])            |
|2100006519|L_505_03__20295__6009907|19  |(30,[7],[3.0])             |
|2100006519|L_505_03__20295__7192332|19  |(30,[7,8,13],[6.0,1.0,1.0])|
|2100006519|L_505_03__20295__7194391|19  |(30,[7,8],[5.0,1.0])       |
|2100006519|L_505_03__20295__7194381|19  |(30,[7],[4.0])             |
|2100006519|L_505_03__20295__7192352|19  |(30,[7,8],[4.0,1.0])       |
+----------+------------------------+----+---------------------------+
only s

In [118]:
# Join the sample subscribers' event features to NSW cells on location id.
# NOTE: cell_info is only defined in a later cell of this notebook (In [117]),
# so that cell must have been executed before this one.
(
    event_feature_data
    .where("Subs_Id in ('2100006519', '2100006351') AND hour = 19")
    .join(
        cell_info.where(f.col('state') == 'New South Wales'),
        on=event_feature_data.loc_info == cell_info.glbl_loc,
        how='inner',
    )
    .show(n=60, truncate=False)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------+----+--------------------+------------------------+--------------------------+---------------------------+---+---+---------------+
|Subs_Id   |loc_info                |hour|event_code_feature  |glbl_loc                |glbl_loc_id               |cell_name                  |mcc|mnc|state          |
+----------+------------------------+----+--------------------+------------------------+--------------------------+---------------------------+---+---+---------------+
|2100006351|L_505_03__20243__5184564|19  |(30,[8],[1.0])      |L_505_03__20243__5184564|L_505_03_20243_|_005184564|201645_PaddingtonNth_L21C_2|505|03 |New South Wales|
|2100006519|L_505_03__20292__5240332|19  |(30,[7,8],[1.0,1.0])|L_505_03__20292__5240332|L_505_03_20292_|_005240332|202644_BywongHill_L18P_2   |505|03 |New South Wales|
+----------+------------------------+----+--------------------+------------------------+--------------------------+---------------------------+---+---+---------

In [123]:
# Look up one location id in cell_info.
# NOTE: cell_info is only defined in a later cell of this notebook (In [117]).
cell_info.where("glbl_loc = 'L_505_03__20295__7239702'").show(n=20, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------+--------------------------+-------------------------+---+---+----------------------------+
|glbl_loc                |glbl_loc_id               |cell_name                |mcc|mnc|state                       |
+------------------------+--------------------------+-------------------------+---+---+----------------------------+
|L_505_03__20295__7239702|L_505_03_20295_|_007239702|240114_ForrestEast_L18K_2|505|03 |Australian Capital Territory|
+------------------------+--------------------------+-------------------------+---+---+----------------------------+

In [117]:
# cell info
@f.udf(returnType=StringType())
def resolve_cell_name(cell_name):
    """
    Resolve a cell name to match with EDS event data.

    Splits ``cell_name`` on "_" and rebuilds it from fields 0, 1, 3 and 4
    (m2000 id, site name, technology, cell number), i.e. field 2 is dropped.

    Returns:
        The rebuilt name; ``cell_name`` unchanged when it has fewer than five
        "_"-separated fields; None when ``cell_name`` is None.
    """
    # Explicit guards replace a blanket `except Exception`, so unexpected
    # errors are no longer silently swallowed.
    if cell_name is None:
        return None
    split_parts = cell_name.split("_")
    if len(split_parts) < 5:
        return cell_name
    m2000id, site_name, technology, cell_number = (
        split_parts[i] for i in (0, 1, 3, 4)
    )
    return f"{m2000id}_{site_name}_{technology}_{cell_number}"

# Source tables for the cell lookup (all from the part_dt=2023-01-11 partitions).
locglbl = spark.read.parquet(
    's3://kl-prod-bi-analy-data-extract/features/kl-prod/ICE/EDS_locglbl_v3_extract/part_dt=2023-01-11/'
)
cell_lookup = spark.read.parquet(
    's3://kl-prod-bi-analy-data-extract/features/kl-prod/ICE/TD_cell_look_up/part_dt=2023-01-11/'
)
site_df = spark.read.parquet(
    's3://kl-prod-bi-analy-data-extract/features/kl-prod/ICE/TD_site_table/part_dt=2023-01-11/'
)

# cell_info: one row per location id with its adjusted cell name and state.
# Built as locglbl JOIN cell_lookup (on the global location id) JOIN site_df
# (on the m2000 id, taken from the first "_"-separated field of Cell_Name).
# NOTE: cells In [118] and In [123] earlier in this notebook use cell_info,
# so this cell has to be executed before them.
cell_info = (
    locglbl.join(
        cell_lookup.filter(
            f.col("Radio_Type_Cd").isin(["L", "U", "N"])
        ),  # NOTE(review): this keeps 'L', 'U' and 'N'. The previous comment said 3G ('U') and 5G ('N') were ignored, which this filter does not do — confirm intent.
        on=locglbl.glbl_loc_id == cell_lookup.Glbl_loc_Id,
        how="inner",
    )
    .withColumn("m2000_id", f.split(f.col("Cell_Name"), "_").getItem(0))
    .join(
        site_df,  # optional state filter, currently disabled: .filter(f.col("sa6_state_nm") == sa6_nm),
        on="m2000_id",
        how="inner",
    )
    # Adjust the cell name so it matches the names used in the EDS event data.
    .withColumn("cell_name_adjusted", resolve_cell_name(f.col("Cell_Name")))
    .select(
        locglbl.glbl_loc,
        locglbl.glbl_loc_id,
        f.col("cell_name_adjusted").alias("cell_name"),
        locglbl.mcc,
        locglbl.mnc,
        site_df.state
    )
)
cell_info.show(10, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------+--------------------------+----------------------------+---+---+---------------+
|glbl_loc                  |glbl_loc_id               |cell_name                   |mcc|mnc|state          |
+--------------------------+--------------------------+----------------------------+---+---+---------------+
|L_505_03__20211__005822525|L_505_03_20211_|_005822525|200140_DarlingParkNth_L21H_1|505|03 |New South Wales|
|L_505_03__20211__005822527|L_505_03_20211_|_005822527|200140_DarlingParkNth_L21H_3|505|03 |New South Wales|
|L_505_03__20211__5822527  |L_505_03_20211_|_005822527|200140_DarlingParkNth_L21H_3|505|03 |New South Wales|
|U_505_03_211__9674_       |U_505_03_00211_09674_09674|200140_DarlingParkNth_U21B_1|505|03 |New South Wales|
|U_505_03_00211__9674_     |U_505_03_00211_09674_09674|200140_DarlingParkNth_U21B_1|505|03 |New South Wales|
|U_505_03_00211__9677_     |U_505_03_00211_09677_09677|200140_DarlingParkNth_U21A_1|505|03 |New South Wales|
|U_505_03_00211__24