## テレコムサンプルデータジェネレーター
Databricks Labs Data Generatorを使用してテレコム関連のデータを生成

### パラメータ設定

In [0]:
%run ./00_環境設定

### ライブラリインポート

In [0]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import string

### 指名や地域等の値のパターンを定義

In [0]:
# ランダムデータ生成用のマスターリスト
LAST_NAMES = ['田中', '鈴木', '佐藤', '高橋', '渡辺', '伊藤', '山本', '中村', '小林', '加藤',
              '吉田', '山田', '佐々木', '山口', '松本', '井上', '木村', '林', '斎藤', '清水']
FIRST_NAMES = ['太郎', '花子', '一郎', '美咲', '健太', '由美', '直樹', '愛子', '大輔', '美穂',
               '翔太', '結衣', '和也', 'さくら', '健一', '真由美', '隆', '優子', '修', '恵子']
PREFECTURES = ['東京都', '大阪府', '神奈川県', '愛知県', '埼玉県', '千葉県', '兵庫県', '北海道',
               '福岡県', '静岡県', '茨城県', '広島県', '京都府', '宮城県', '新潟県']
CITIES = ['渋谷区', '新宿区', '港区', '中央区', '品川区', '北区', '中区', '西区', '南区', '東区',
          '札幌市', '横浜市', '名古屋市', '福岡市', '仙台市', '広島市', '京都市']
RADIO_TYPES = ['LTE', '5G', '5G']  # 5Gの比率を高める
BANDS = ['800MHz', '1.7GHz', '2.1GHz', '3.5GHz', '3.7GHz', '4.5GHz']
DST_IPS = ['172.217.26.238', '210.130.1.42', '13.114.40.30', '52.192.72.89',
           '157.240.1.35', '104.244.42.129', '8.8.8.8', '1.1.1.1']
PROTOCOLS = ['TCP', 'UDP']
DST_PORTS = [80, 443, 53, 8080, 22, 21, 25, 110]

# 都道府県の東西日本区分
EAST_JAPAN_PREFECTURES = [
    '北海道', '青森県', '岩手県', '宮城県', '秋田県', '山形県', '福島県',
    '茨城県', '栃木県', '群馬県', '埼玉県', '千葉県', '東京都', '神奈川県',
    '新潟県', '山梨県', '長野県', '静岡県'
]
WEST_JAPAN_PREFECTURES = [
    '富山県', '石川県', '福井県', '岐阜県', '愛知県', '三重県',
    '滋賀県', '京都府', '大阪府', '兵庫県', '奈良県', '和歌山県',
    '鳥取県', '島根県', '岡山県', '広島県', '山口県',
    '徳島県', '香川県', '愛媛県', '高知県',
    '福岡県', '佐賀県', '長崎県', '熊本県', '大分県', '宮崎県', '鹿児島県', '沖縄県'
]

### ヘルパー関数

In [0]:
def generate_imsi():
    """ランダムなIMSIを生成（440 + 2桁 + 9桁の数字）"""
    return f"4402{random.randint(0, 99):02d}{random.randint(100000000, 999999999)}"

def generate_msisdn():
    """ランダムな携帯電話番号を生成"""
    prefix = random.choice(['090', '080', '070'])
    return f"{prefix}{random.randint(10000000, 99999999)}"

def generate_cell_id():
    """ランダムな基地局IDを生成"""
    numbers = ''.join([str(random.randint(0, 9)) for _ in range(5)])
    letters = ''.join([random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ') for _ in range(2)])
    return numbers + letters

def generate_area_code():
    """ランダムなエリアコードを生成"""
    return f"{random.randint(10000, 99999)}"
  
def get_area_region(location):
    """locationから都道府県を抽出し、東日本/西日本を判定"""
    for pref in EAST_JAPAN_PREFECTURES:
        if location.startswith(pref):
            return 'east'
    for pref in WEST_JAPAN_PREFECTURES:
        if location.startswith(pref):
            return 'west'
    return 'unknown'

def generate_unique_code(length=5):
    chars = string.ascii_lowercase + string.digits
    return ''.join(random.sample(chars, length))

### 追加するファイル名に付与するサフィックスの生成

In [0]:
random_suffix = generate_unique_code()

### 加入者マスタのCDF作成 (リカバリ用)

In [0]:
import glob
import os

# CSVファイルの一覧を取得
subscriber_files = glob.glob(os.path.join(subscriber_master_cdf_dir, '*.csv'))

# 全てのCSVファイルを読み込んで結合
subscriber_master = pd.concat([pd.read_csv(f) for f in subscriber_files], ignore_index=True)

In [0]:
# 加入者マスターのChange Data Feed
num_insert_sub = int(NUM_SUBSCRIBERS * 0.2)  # 10%の新規レコード
num_update_sub = int(NUM_SUBSCRIBERS * 0.1)  # 5%の更新レコード
num_delete_sub = int(NUM_SUBSCRIBERS * 0.1)  # 5%の削除レコード

subscriber_cdf_records = []

# INSERT: 新規レコード追加
print(f"  加入者マスター - INSERT: {num_insert_sub}件")
for _ in range(num_insert_sub):
    act_date = datetime(2024, 6, 1) + timedelta(days=random.randint(0, 180))
    subscriber_cdf_records.append({
        'imsi': generate_imsi(),
        'msisdn': generate_msisdn(),
        'subscriber_name': f"{random.choice(LAST_NAMES)} {random.choice(FIRST_NAMES)}",
        'contract_type': random.choice(['個人', '法人']),
        'subscriber_status': random.choice(['有効', '有効', '有効', '有効', '無効']),
        'activation_date': act_date,
        'expiry_date': datetime(2027, 1, 1) + timedelta(days=random.randint(0, 1095)),
        'subscriber_last_updated': act_date + timedelta(days=random.randint(1, 30),
                                             hours=random.randint(0, 23),
                                             minutes=random.randint(0, 59),
                                             seconds=random.randint(0, 59)),
        'operation': 'INSERT'
    })

# UPDATE: 既存レコードの更新
print(f"  加入者マスター - UPDATE: {num_update_sub}件")
update_indices = random.sample(range(len(subscriber_master)), num_update_sub)
for idx in update_indices:
    record = subscriber_master.iloc[idx].copy()
    # 一部のフィールドを変更
    record['msisdn'] = generate_msisdn()
    record['subscriber_status'] = random.choice(['有効', '無効'])
    record['subscriber_last_updated'] = datetime.now() - timedelta(days=random.randint(0, 30),
                                                        hours=random.randint(0, 23),
                                                        minutes=random.randint(0, 59),
                                                        seconds=random.randint(0, 59))
    record['operation'] = 'UPDATE'
    subscriber_cdf_records.append(record.to_dict())

# DELETE: 既存レコードの削除
print(f"  加入者マスター - DELETE: {num_delete_sub}件")
delete_indices = random.sample([i for i in range(len(subscriber_master)) if i not in update_indices], 
                               num_delete_sub)
for idx in delete_indices:
    record = subscriber_master.iloc[idx].copy()
    record['operation'] = 'DELETE'
    subscriber_cdf_records.append(record.to_dict())

subscriber_cdf = pd.DataFrame(subscriber_cdf_records)

subscriber_cdf["activation_date"] = pd.to_datetime(subscriber_cdf["activation_date"])
subscriber_cdf["expiry_date"] = pd.to_datetime(subscriber_cdf["expiry_date"])
subscriber_cdf["subscriber_last_updated"] = pd.to_datetime(subscriber_cdf["subscriber_last_updated"])

In [0]:
subscriber_cdf.to_csv(f'{subscriber_master_backfill_dir}subscriber_master_backfill_{random_suffix}.csv', index=False, encoding='utf-8-sig')

### 基地局マスタのCDF作成

In [0]:
cell_master_sdf = spark.read.table(cell_master_source_table)
cell_master = cell_master_sdf.toPandas()

In [0]:
# 基地局マスターのChange Data Feed
num_insert_cell = int(NUM_CELLS * 0.1)  # 10%の新規レコード
num_update_cell = int(NUM_CELLS * 0.05)  # 5%の更新レコード
num_delete_cell = int(NUM_CELLS * 0.05)  # 5%の削除レコード

cell_cdf_records = []

# INSERT: 新規レコード追加
print(f"  基地局マスター - INSERT: {num_insert_cell}件")
for _ in range(num_insert_cell):
    cell_cdf_records.append({
        'cell_id': generate_cell_id(),
        'location': f"{random.choice(PREFECTURES)}{random.choice(CITIES)}",
        'radio_type': random.choice(RADIO_TYPES),
        'band': random.choice(BANDS),
        'area_code': generate_area_code(),
        'cell_status': random.choice(['running', 'running', 'running', 'maintenance']),
        'cell_last_updated': datetime(2025, 1, 1) + timedelta(days=random.randint(0, 300),
                                                          hours=random.randint(0, 23),
                                                          minutes=random.randint(0, 59),
                                                          seconds=random.randint(0, 59)),
        'operation': 'INSERT'
    })

# UPDATE: 既存レコードの更新
print(f"  基地局マスター - UPDATE: {num_update_cell}件")
update_indices_cell = random.sample(range(len(cell_master)), num_update_cell)
for idx in update_indices_cell:
    record = cell_master.iloc[idx].copy()
    # 一部のフィールドを変更
    record['status'] = random.choice(['running', 'maintenance'])
    record['last_updated'] = datetime.now() - timedelta(days=random.randint(0, 30),
                                                        hours=random.randint(0, 23),
                                                        minutes=random.randint(0, 59),
                                                        seconds=random.randint(0, 59))
    record['operation'] = 'UPDATE'
    cell_cdf_records.append(record.to_dict())

# # DELETE: 既存レコードの削除
# print(f"  基地局マスター - DELETE: {num_delete_cell}件")
# delete_indices_cell = random.sample([i for i in range(len(cell_master)) if i not in update_indices_cell], 
#                                     num_delete_cell)
# for idx in delete_indices_cell:
#     record = cell_master.iloc[idx].copy()
#     record['operation'] = 'DELETE'
#     cell_cdf_records.append(record.to_dict())

cell_cdf = pd.DataFrame(cell_cdf_records)

In [0]:
cell_cdf_sdf = spark.createDataFrame(cell_cdf)
cell_cdf_sdf.createOrReplaceTempView("cell_cdf_temp")

spark.sql(f"""
MERGE INTO {cell_master_source_table} AS t
USING cell_cdf_temp as s
ON t.cell_id = s.cell_id
WHEN MATCHED AND s.operation = 'UPDATE' THEN
  UPDATE SET *
WHEN MATCHED AND s.operation = 'DELETE' THEN
  DELETE
WHEN NOT MATCHED AND s.operation = 'INSERT' THEN
  INSERT *
""")

### 通信ログの追加データ作成

In [0]:
# 各cell_idのエリアを判定してマッピングを作成
cell_master['region'] = cell_master['location'].apply(get_area_region)
cell_id_to_region = dict(zip(cell_master['cell_id'], cell_master['region']))

In [0]:
print("\n" + "=" * 60)
print("新規レコード用の通信ログ生成中...")
print("=" * 60)

# INSERTされた新規imsiとcell_idを抽出
new_imsi_list = subscriber_cdf[subscriber_cdf['operation'] == 'INSERT']['imsi'].tolist()
sampled_existing_imsi_list = [str(random.choice(list(subscriber_master.imsi))) for _ in range(int(num_insert_sub * 2))]
candidate_imsi_list = new_imsi_list + sampled_existing_imsi_list

new_cell_id_list = cell_cdf[cell_cdf['operation'] == 'INSERT']['cell_id'].tolist()

# 新規Cell_IDのエリアをマッピングに追加
for _, row in cell_cdf[cell_cdf['operation'] == 'INSERT'].iterrows():
    cell_id_to_region[row['cell_id']] = get_area_region(row['location'])


print(f"  新規IMSI数: {len(candidate_imsi_list)}")
print(f"  新規Cell_ID数: {len(new_cell_id_list)}")

# 新規通信ログのレコード数（元の10%程度）
num_new_logs = int(NUM_LOGS * 0.2)
print(f"  生成する通信ログ数: {num_new_logs}件")

if len(candidate_imsi_list) > 0 and len(new_cell_id_list) > 0:
    # 新規通信ログを生成
    # タイムスタンプは最新の日時を使用
    new_log_base_time = datetime.now() - timedelta(days=random.randint(0, 7))
    
    new_traffic_data = {
        'timestamp': [new_log_base_time + timedelta(seconds=random.randint(0, 604800)) 
                     for _ in range(num_new_logs)],
        'imsi': [random.choice(candidate_imsi_list) for _ in range(num_new_logs)],
        'src_ip': [f"10.{random.randint(10, 250)}.{random.randint(1, 250)}.{random.randint(1, 250)}" 
                  for _ in range(num_new_logs)],
        'dst_ip': [random.choice(DST_IPS) for _ in range(num_new_logs)],
        'protocol': [random.choice(PROTOCOLS) for _ in range(num_new_logs)],
        'src_port': [random.randint(30000, 65000) for _ in range(num_new_logs)],
        'dst_port': [random.choice(DST_PORTS) for _ in range(num_new_logs)],
        'cell_id': [random.choice(new_cell_id_list) for _ in range(num_new_logs)],
        'session_duration': [random.randint(1, 5000) for _ in range(num_new_logs)],
        'bytes_sent': [random.randint(64, 10000000) for _ in range(num_new_logs)],
        'bytes_received': [random.randint(128, 50000000) for _ in range(num_new_logs)],
        'throughput_mbps': [round(random.uniform(0.5, 200.0), 1) for _ in range(num_new_logs)]
    }
    
    network_traffic_log_new = pd.DataFrame(new_traffic_data)
    network_traffic_log_new = network_traffic_log_new.sort_values('timestamp').reset_index(drop=True)
    
    # データ型の設定
    network_traffic_log_new['src_port'] = network_traffic_log_new['src_port'].astype(int)
    network_traffic_log_new['dst_port'] = network_traffic_log_new['dst_port'].astype(int)
    network_traffic_log_new['session_duration'] = network_traffic_log_new['session_duration'].astype(int)
    network_traffic_log_new['bytes_sent'] = network_traffic_log_new['bytes_sent'].astype(int)
    network_traffic_log_new['bytes_received'] = network_traffic_log_new['bytes_received'].astype(int)
    
    # 新規通信ログをエリア別に分割して保存
    network_traffic_log_new['region'] = network_traffic_log_new['cell_id'].map(cell_id_to_region)
    log_new_east = network_traffic_log_new[network_traffic_log_new['region'] == 'east'].drop('region', axis=1)
    log_new_west = network_traffic_log_new[network_traffic_log_new['region'] == 'west'].drop('region', axis=1)
    
    log_new_east.to_csv(f'{network_traffic_log_east_dir}network_traffic_log_east_{random_suffix}.csv', index=False, encoding='utf-8-sig')
    log_new_west.to_csv(f'{network_traffic_log_west_dir}network_traffic_log_west_{random_suffix}.csv', index=False, encoding='utf-8-sig')
    
    print(f"\n  ✓ 新規通信ログを生成しました")
    print(f"    - 使用された新規IMSI数: {len(set(network_traffic_log_new['imsi']))}")
    print(f"    - 使用された新規Cell_ID数: {len(set(network_traffic_log_new['cell_id']))}")
    print(f"    - 東日本エリア: {len(log_new_east)}件")
    print(f"    - 西日本エリア: {len(log_new_west)}件")
else:
    print("\n  ⚠ 新規IMSIまたはCell_IDがないため、新規通信ログは生成されませんでした")
