In [1]:
from libstdm.common.datasets import findspark
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.storagelevel import *
import pandas as pd
from pyspark.sql import Window
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

In [2]:
spark = findspark.get_spark_session(
    "analyze-safety-broadcast-scv",
    is_local=False
)

2025-11-13 18:24:08,221 [INFO] [findspark.py:62] set spark.yarn.dist.archives = hdfs://DClusterNmg3/user/xinsi_traffic/libstdm/libstdm-hello-latest.tgz#conda
2025-11-13 18:24:08,222 [INFO] [findspark.py:74] set spark yarn.queue = root.xinyewusibu_lukuang_dev
2025-11-13 18:24:08,222 [INFO] [findspark.py:78] set spark.jars = hdfs://DClusterNmg3/user/xinsi_traffic/libstdm/jars/pytraffic.jar


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Application Id: application_1758876150248_23677297, Tracking URL: http://bigdata-nmg-hdprm00.nmg01:8088/proxy/application_1758876150248_23677297/


# 2025-11-11 一次下发量、播放量、播放透出率

## 每个城市下发量、播报量

In [4]:
df =spark.sql("""
SELECT
            dt,
            city_id,
            city_name,
            -- 首次一次播报
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN broadcast_count
                    ELSE 0
                END
            ) AS first_brd_cnt,
            -- 一次下发
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN server_count
                    ELSE 0
                END
            ) AS first_server_cnt
        FROM
            traffic_online.dwd_map_traffic_safty_bdc_count_di
        WHERE
            dt = '2025-11-11'
        GROUP BY
            dt,
            city_id,
            city_name
        having
            (
                 first_brd_cnt + first_server_cnt 
            ) != 0
            order by city_id
            limit 100
""").toPandas()
print('每个城市下发量、播报量')
print(df)



每个城市下发量、播报量
            dt  city_id city_name  first_brd_cnt  first_server_cnt
0   2025-11-11        0      None           9236             34434
1   2025-11-11        1       北京市        3407044          23900631
2   2025-11-11        2       深圳市        1680337          12350920
3   2025-11-11        3       广州市        3024505          17476471
4   2025-11-11        4       上海市        2560031          16742195
5   2025-11-11        5       杭州市        1152144           7896048
6   2025-11-11        6       武汉市        1170476           5902638
7   2025-11-11        7       天津市        1336831           6077948
8   2025-11-11        8       沈阳市         296607           1428205
9   2025-11-11        9       郑州市         646260           3659198
10  2025-11-11       10       西安市        1181114           4992569
11  2025-11-11       11       南京市         766196           5081325
12  2025-11-11       12       济南市         384082           2270576
13  2025-11-11       13       青岛市         846036  

                                                                                

## 所有城市播报量、下发量

In [5]:
df =spark.sql("""
SELECT
            dt,
            -- 首次一次播报
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN broadcast_count
                    ELSE 0
                END
            ) AS first_brd_cnt,
            -- 一次下发
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN server_count
                    ELSE 0
                END
            ) AS first_server_cnt
        FROM
            traffic_online.dwd_map_traffic_safty_bdc_count_di
        WHERE
            dt = '2025-11-11'
        group by dt
        having
            (
                 first_brd_cnt + first_server_cnt 
            ) != 0
            limit 100
""").toPandas()
print('所有城市播报量、下发量')
print(df)



所有城市播报量、下发量
           dt  first_brd_cnt  first_server_cnt
0  2025-11-11       53475494         279623563


                                                                                

## 每个城市每公里播报量、下发量、播放透出率

In [6]:
df = spark.sql("""
SELECT
    a.dt,
    a.city_name,
    -- 每公里下发量
    sum(a.first_server_cnt) / sum(b.sec_finish_order_dis) AS km_server_cnt,
    -- 每公里播报量
    sum(a.first_brd_cnt) / sum(b.sec_finish_order_dis) AS km_brd_cnt,
    -- 播放透出率 = 总播报次数 / 总下发次数
    sum(a.first_brd_cnt) / sum(a.first_server_cnt) AS server_brd_rate
FROM
(SELECT
            dt,
            city_id,
            city_name,
            -- 首次一次播报
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN broadcast_count
                    ELSE 0
                END
            ) AS first_brd_cnt,
            -- 一次下发
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN server_count
                    ELSE 0
                END
            ) AS first_server_cnt
        FROM
            traffic_online.dwd_map_traffic_safty_bdc_count_di
        WHERE
            dt = '2025-11-11'
        GROUP BY
            dt,
            city_id,
            city_name
        having
            (
                 first_brd_cnt + first_server_cnt 
            ) != 0
            order by city_id) a

JOIN
(SELECT
            dt,
            city_id,
            SUM(sec_finish_order_dis) AS sec_finish_order_dis,
            COUNT(DISTINCT order_id) AS order_count
        FROM
            (
                SELECT
                    dt,
                    city_id,
                    biz_type,
                    order_id,
                    sec_finish_order_dis
                FROM
                    traffic_online.dwd_map_traffic_safty_bdc_count_di
                WHERE
                    dt >= "2025-11-11"
                GROUP BY
                    dt,
                    city_id,
                    biz_type,
                    order_id,
                    sec_finish_order_dis
                    ) aa
                    GROUP BY
                    dt,
                    city_id) b

ON 
        a.dt = b.dt
        AND a.city_id = b.city_id
        group by a.dt ,a.city_name
        limit 100
""").toPandas()
print("每个城市每公里播报量、下发量、播放透出率")
print(df)



每个城市每公里播报量、下发量、播放透出率
            dt    city_name  km_server_cnt  km_brd_cnt  server_brd_rate
0   2025-11-11          银川市       0.919990    0.221858         0.241153
1   2025-11-11          忻州市       0.738699    0.222867         0.301702
2   2025-11-11          北京市       1.150773    0.164043         0.142550
3   2025-11-11    文山壮族苗族自治州       0.959619    0.234564         0.244435
4   2025-11-11        齐齐哈尔市       0.784619    0.216639         0.276108
5   2025-11-11          松原市       0.774786    0.204420         0.263840
6   2025-11-11          铜川市       1.023962    0.253893         0.247951
7   2025-11-11          莱芜市       1.299568    0.090351         0.069524
8   2025-11-11          安康市       0.874046    0.247803         0.283513
9   2025-11-11          内江市       0.920732    0.191164         0.207622
10  2025-11-11    西双版纳傣族自治州       1.012255    0.260114         0.256965
11  2025-11-11         葫芦岛市       0.770514    0.201221         0.261152
12  2025-11-11          清远市       1.109601 

                                                                                

## 所有城市每公里播报量、下发量、播放透出率

In [7]:
df = spark.sql("""
SELECT
    a.dt,
    -- 每公里下发量
    sum(a.first_server_cnt) / sum(b.sec_finish_order_dis) AS km_server_cnt,
    -- 每公里播报量
    sum(a.first_brd_cnt) / sum(b.sec_finish_order_dis) AS km_brd_cnt,
    -- 播放透出率 = 总播报次数 / 总下发次数
    sum(a.first_brd_cnt) / sum(a.first_server_cnt) AS server_brd_rate
FROM
(SELECT
            dt,
            city_id,
            city_name,
            -- 首次一次播报
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN broadcast_count
                    ELSE 0
                END
            ) AS first_brd_cnt,
            -- 一次下发
            SUM(
                CASE
                    WHEN model_version = 3
                    AND is_sight = 0
                    AND is_sec_broad = 0 THEN server_count
                    ELSE 0
                END
            ) AS first_server_cnt
        FROM
            traffic_online.dwd_map_traffic_safty_bdc_count_di
        WHERE
            dt = '2025-11-11'
        GROUP BY
            dt,
            city_id,
            city_name
        having
            (
                 first_brd_cnt + first_server_cnt 
            ) != 0
            order by city_id) a

JOIN
(SELECT
            dt,
            city_id,
            SUM(sec_finish_order_dis) AS sec_finish_order_dis,
            COUNT(DISTINCT order_id) AS order_count
        FROM
            (
                SELECT
                    dt,
                    city_id,
                    biz_type,
                    order_id,
                    sec_finish_order_dis
                FROM
                    traffic_online.dwd_map_traffic_safty_bdc_count_di
                WHERE
                    dt >= "2025-11-11"
                GROUP BY
                    dt,
                    city_id,
                    biz_type,
                    order_id,
                    sec_finish_order_dis
                    ) aa
                    GROUP BY
                    dt,
                    city_id) b

ON 
        a.dt = b.dt
        AND a.city_id = b.city_id
        group by a.dt
        limit 100
""").toPandas()
print("所有城市每公里播报量、下发量、播放透出率")
print(df)



所有城市每公里播报量、下发量、播放透出率
           dt  km_server_cnt  km_brd_cnt  server_brd_rate
0  2025-11-11       1.060689    0.202847         0.191241


                                                                                

In [8]:
spark.stop()