In [52]:
from pyspark.sql import functions as F, Window as W
from datetime import date

# 입력(Silver) / 출력(Test)
SILVER_PATH = "s3a://warehouse/silver/gdelt_events/"
OUT_BASE = "s3a://warehouse/gold_table_test/"

# 안전 가드(운영 gold/silver에 쓰지 않기)
assert not OUT_BASE.startswith(
    "s3a://warehouse/gold/"
), "Don't write to production gold/"
assert not OUT_BASE.startswith("s3a://warehouse/silver/"), "Don't write under silver/"

# 로드
dfx = spark.read.parquet(SILVER_PATH)

# event_date 파생 (day: yyyyMMdd → date)
if "event_date" not in dfx.columns and "day" in dfx.columns:
    dfx = dfx.withColumn(
        "event_date", F.to_date(F.col("day").cast("string"), "yyyyMMdd")
    )

# 임시 국가코드 정규화: action_geo(FIPS2) 우선 → actor1 → actor2
dfx = (
    dfx.withColumn(
        "action_geo_country_code_clean",
        F.when(
            F.col("action_geo_country_code").rlike("^[A-Z]{2}$"),
            F.col("action_geo_country_code"),
        ),
    ).withColumn(
        "country_code_final",
        F.coalesce(
            "action_geo_country_code_clean",
            "actor1_country_code",
            "actor2_country_code",
        ),
    )
).cache()

print("Rows:", dfx.count(), "Cols:", len(dfx.columns))



Rows: 4177 Cols: 66


                                                                                

In [53]:
def build_daily_gold(dfx):
    # 기본 집계
    base = dfx.groupBy("event_date").agg(
        F.count("*").alias("events_total"),
        F.sum("num_articles").alias("articles_total"),
        F.sum("num_sources").alias("sources_total"),
        F.sum("num_mentions").alias("mentions_total"),
        F.avg("avg_tone").alias("avg_tone_mean"),
        F.expr("percentile_approx(avg_tone, 0.5)").alias("avg_tone_median"),
        F.avg("goldstein_scale").alias("goldstein_mean"),
        F.avg(F.abs(F.col("goldstein_scale"))).alias("goldstein_abs_mean"),
        F.avg(F.when(F.col("avg_tone") < 0, 1).otherwise(0)).alias("neg_ratio"),
    )

    # 쿼드 분포(1~4)
    quad = (
        dfx.groupBy("event_date", "quad_class")
        .count()
        .groupBy("event_date")
        .pivot("quad_class", [1, 2, 3, 4])
        .sum("count")
        .na.fill(0)
        .withColumnRenamed("1", "quad1_cnt")
        .withColumnRenamed("2", "quad2_cnt")
        .withColumnRenamed("3", "quad3_cnt")
        .withColumnRenamed("4", "quad4_cnt")
    )

    # 당일 Top 국가(기사 수 기준)
    w = W.partitionBy("event_date").orderBy(
        F.desc("sum_articles"), F.col("country_code_final")
    )
    top_country = (
        dfx.groupBy("event_date", "country_code_final")
        .agg(F.sum("num_articles").alias("sum_articles"))
        .withColumn("rn", F.row_number().over(w))
        .filter("rn=1")
        .select(
            "event_date",
            F.col("country_code_final").alias("top_country_iso3"),
            F.col("sum_articles").alias("top_country_articles"),
        )
    )

    # DQ 지표
    dq = dfx.groupBy("event_date").agg(
        F.avg(
            F.when(F.col("action_geo_country_code").rlike("^[A-Z]{2}$"), 1).otherwise(0)
        ).alias("dq_action_geo_alpha_ratio"),
        F.avg(
            F.when(F.col("action_geo_country_code").rlike("^[0-9]+$"), 1).otherwise(0)
        ).alias("dq_action_geo_numeric_ratio"),
        F.avg(
            F.when(F.col("actor1_country_code").rlike("^[A-Z]{3}$"), 1).otherwise(0)
        ).alias("dq_actor1_iso3_ratio"),
        F.avg(
            F.when(
                F.col("action_geo_lat").isNotNull()
                & F.col("action_geo_long").isNotNull(),
                1,
            ).otherwise(0)
        ).alias("dq_latlong_coverage"),
    )

    daily = (
        base.join(quad, "event_date", "left")
        .join(top_country, "event_date", "left")
        .join(dq, "event_date", "left")
        .withColumn("updated_at", F.current_timestamp())
    )
    return daily


daily = build_daily_gold(dfx)
daily.orderBy("event_date").show(5, truncate=False)

                                                                                

+----------+------------+--------------+-------------+--------------+-------------------+-----------------+--------------------+------------------+------------------+---------+---------+---------+---------+----------------+--------------------+-------------------------+---------------------------+--------------------+-------------------+--------------------------+
|event_date|events_total|articles_total|sources_total|mentions_total|avg_tone_mean      |avg_tone_median  |goldstein_mean      |goldstein_abs_mean|neg_ratio         |quad1_cnt|quad2_cnt|quad3_cnt|quad4_cnt|top_country_iso3|top_country_articles|dq_action_geo_alpha_ratio|dq_action_geo_numeric_ratio|dq_actor1_iso3_ratio|dq_latlong_coverage|updated_at                |
+----------+------------+--------------+-------------+--------------+-------------------+-----------------+--------------------+------------------+------------------+---------+---------+---------+---------+----------------+--------------------+----------------------

| Date       | Events | Articles | Tone(mean) | Goldstein(mean) | Q1/Q2/Q3/Q4 | Top Country (Articles) | DQ numeric ratio |
| ---------- | -----: | -------: | ---------: | --------------: | ----------- | ---------------------- | ---------------: |
| 2025-09-09 |     17 |       94 |      -0.64 |            1.40 | 10/3/3/1    | USA (47)               |         **1.00** |
| 2025-08-28 |      4 |       20 |      -3.86 |            3.40 | 4/0/0/0     | ISR (20)               |         **1.00** |
| 2025-08-10 |      7 |       40 |      -2.22 |           -0.17 | 1/2/3/1     | *(null)* (25)          |         **1.00** |
| 2025-08-05 |     10 |       44 |      -5.59 |            6.20 | 2/8/0/0     | USA (28)               |         **1.00** |
| 2024-09-09 |     17 |       94 |      -0.64 |            1.40 | 10/3/3/1    | USA (47)               |         **1.00** |
| 2024-09-04 |     24 |       70 |      -6.15 |            1.41 | 22/0/0/2    | USA (40)               |         **1.00** |


In [54]:
# 저장 경로
OUT_DAILY = OUT_BASE + "gdelt_daily_summary/"
OUT_DQ = OUT_BASE + "dq/action_geo_numeric_daily/"
OUT_SAMPLE = OUT_BASE + "sample/gdelt_daily_summary/"

# 일일 요약 저장 (테스트: overwrite, 운영 전환 시 append+partitionBy 권장)
(daily.repartition(1).write.mode("overwrite").parquet(OUT_DAILY))

# DQ 요약(숫자 국가코드 비율)
numeric_daily = (
    dfx.groupBy("event_date")
    .agg(
        F.sum(
            F.when(F.col("action_geo_country_code").rlike("^[0-9]+$"), 1).otherwise(0)
        ).alias("numeric_cnt"),
        F.count("*").alias("rows"),
    )
    .withColumn("numeric_ratio", F.col("numeric_cnt") / F.col("rows"))
)
(numeric_daily.repartition(1).write.mode("overwrite").parquet(OUT_DQ))

# 오늘 있으면 오늘, 없으면 최신 1행
today_str = date.today().isoformat()
has_today = dfx.where(F.col("event_date") == today_str).limit(1).count() > 0
latest_date = dfx.agg(F.max("event_date")).first()[0]
target_date = today_str if has_today else latest_date

sample_row = daily.where(F.col("event_date") == target_date)
(
    sample_row.coalesce(1)
    .write.mode("overwrite")
    .parquet(OUT_SAMPLE + f"date={target_date}/")
)

print("✅ wrote:")
print(" - daily :", OUT_DAILY)
print(" - dq    :", OUT_DQ)
print(" - sample:", OUT_SAMPLE + f"date={target_date}/")

                                                                                

✅ wrote:
 - daily : s3a://warehouse/gold_table_test/gdelt_daily_summary/
 - dq    : s3a://warehouse/gold_table_test/dq/action_geo_numeric_daily/
 - sample: s3a://warehouse/gold_table_test/sample/gdelt_daily_summary/date=2025-09-09/


In [55]:
# 스키마 → Markdown
schema_rows = [(f.name, f.dataType.simpleString()) for f in daily.schema.fields]
md = ["| 컬럼 | 타입 |", "|---|---|"] + [f"| {n} | {t} |" for n, t in schema_rows]
print("\n".join(md))

# DQ 임계치와 값
TH = {
    "dq_action_geo_alpha_ratio": (">=", 0.95),
    "dq_action_geo_numeric_ratio": ("<=", 0.05),
    "dq_actor1_iso3_ratio": (">=", 0.95),
    "dq_latlong_coverage": (">=", 0.70),
}
dq_cols = list(TH.keys())
dq_vals = (sample_row.select(*dq_cols).toPandas().iloc[0]).to_dict()
defs = {
    "dq_action_geo_alpha_ratio": "action_geo_country_code가 알파벳 2자(FIPS)인 비율",
    "dq_action_geo_numeric_ratio": "action_geo_country_code가 숫자인 비율(매핑 오류 지표)",
    "dq_actor1_iso3_ratio": "actor1_country_code가 3자 코드(CAMEO/ISO3 유사) 비율",
    "dq_latlong_coverage": "action_geo_lat/long 둘 다 존재하는 비율",
}

lines = ["| 지표 | 정의(요지) | 임계치 | 값 | PASS? |", "|---|---|---:|---:|:--:|"]
for k, (op, thr) in TH.items():
    v = float(dq_vals.get(k, 0) if dq_vals.get(k, None) is not None else 0)
    ok = (v >= thr) if op == ">=" else (v <= thr)
    lines.append(
        f"| {k} | {defs[k]} | {op} {thr:.2f} | {v:.2f} | {'✅' if ok else '❌'} |"
    )

print("\n".join(lines))

| 컬럼 | 타입 |
|---|---|
| event_date | date |
| events_total | bigint |
| articles_total | bigint |
| sources_total | bigint |
| mentions_total | bigint |
| avg_tone_mean | double |
| avg_tone_median | double |
| goldstein_mean | double |
| goldstein_abs_mean | double |
| neg_ratio | double |
| quad1_cnt | bigint |
| quad2_cnt | bigint |
| quad3_cnt | bigint |
| quad4_cnt | bigint |
| top_country_iso3 | string |
| top_country_articles | bigint |
| dq_action_geo_alpha_ratio | double |
| dq_action_geo_numeric_ratio | double |
| dq_actor1_iso3_ratio | double |
| dq_latlong_coverage | double |
| updated_at | timestamp |
| 지표 | 정의(요지) | 임계치 | 값 | PASS? |
|---|---|---:|---:|:--:|
| dq_action_geo_alpha_ratio | action_geo_country_code가 알파벳 2자(FIPS)인 비율 | >= 0.95 | 0.00 | ❌ |
| dq_action_geo_numeric_ratio | action_geo_country_code가 숫자인 비율(매핑 오류 지표) | <= 0.05 | 1.00 | ❌ |
| dq_actor1_iso3_ratio | actor1_country_code가 3자 코드(CAMEO/ISO3 유사) 비율 | >= 0.95 | 0.53 | ❌ |
| dq_latlong_coverage | action_geo

In [56]:
# 날짜별 요약
numeric_daily.orderBy("event_date").show(10, truncate=False)

# 대상일 상세(상위 코드 + 샘플 행)
fail_today = dfx.where(
    (F.col("event_date") == target_date)
    & F.col("action_geo_country_code").rlike("^[0-9]+$")
)
print("[오류 행 수]", fail_today.count())

(
    fail_today.groupBy("action_geo_country_code")
    .count()
    .orderBy(F.desc("count"))
    .show(10, truncate=False)
)

(
    fail_today.select(
        "global_event_id",
        "event_date",
        "action_geo_country_code",
        "action_geo_type",
        "action_geo_fullname",
        "action_geo_lat",
        "action_geo_long",
        "source_file",
    ).show(10, truncate=False)
)

+----------+-----------+----+-------------+
|event_date|numeric_cnt|rows|numeric_ratio|
+----------+-----------+----+-------------+
|2024-09-04|24         |24  |1.0          |
|2024-09-09|17         |17  |1.0          |
|2025-08-05|10         |10  |1.0          |
|2025-08-10|7          |7   |1.0          |
|2025-08-28|4          |4   |1.0          |
|2025-09-02|7          |7   |1.0          |
|2025-09-03|12         |12  |1.0          |
|2025-09-04|2318       |2318|1.0          |
|2025-09-08|4          |4   |1.0          |
|2025-09-09|1774       |1774|1.0          |
+----------+-----------+----+-------------+

[오류 행 수] 1774
+-----------------------+-----+
|action_geo_country_code|count|
+-----------------------+-----+
|4                      |598  |
|3                      |384  |
|2                      |374  |
|1                      |295  |
|0                      |103  |
|5                      |20   |
+-----------------------+-----+

+---------------+----------+--------------------

| 지표                              | 정의(요지)                                           |     임계치 |                                             값 | PASS? |
| ------------------------------- | ------------------------------------------------ | ------: | --------------------------------------------: | :---: |
| dq\_action\_geo\_alpha\_ratio   | `action_geo_country_code`가 **알파벳 2자(FIPS)** 인 비율 | >= 0.95 |                                      **0.00** |   ❌   |
| dq\_action\_geo\_numeric\_ratio | `action_geo_country_code`가 **숫자**인 비율(매핑오류 지표)   | <= 0.05 |                                      **1.00** |   ❌   |
| dq\_actor1\_iso3\_ratio         | `actor1_country_code`가 **3자(CAMEO/ISO3 유사)** 비율  | >= 0.95 | **0.76** (2025-09-09) / **0.53** (2024-09-04) |   ❌   |
| dq\_latlong\_coverage           | `action_geo_lat/long` **둘다 존재** 비율               | >= 0.70 |                                      **0.00** |   ❌   |


ActionGeo 블록 컬럼이 서로 밀려 매핑되었음

국가코드 위치에 타입(0~4) 또는 다른 값이 들어옴

type/lat/long/fullname 등 열 정렬이 어긋남

In [57]:
def generate_gold_and_dq(dfx, out_base="s3a://warehouse/gold_table_test/"):
    daily = build_daily_gold(dfx)
    out_daily = out_base + "gdelt_daily_summary/"
    daily.write.mode("overwrite").parquet(out_daily)

    numeric_daily = (
        dfx.groupBy("event_date")
        .agg(
            F.sum(
                F.when(F.col("action_geo_country_code").rlike("^[0-9]+$"), 1).otherwise(
                    0
                )
            ).alias("numeric_cnt"),
            F.count("*").alias("rows"),
        )
        .withColumn("numeric_ratio", F.col("numeric_cnt") / F.col("rows"))
    )
    out_dq = out_base + "dq/action_geo_numeric_daily/"
    numeric_daily.write.mode("overwrite").parquet(out_dq)
    return out_daily, out_dq


paths = generate_gold_and_dq(dfx, OUT_BASE)
print("✅ re-run paths:", paths)

✅ re-run paths: ('s3a://warehouse/gold_table_test/gdelt_daily_summary/', 's3a://warehouse/gold_table_test/dq/action_geo_numeric_daily/')
