In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found anywhere under the Kaggle input directory.
for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# ติดตั้ง Java 8 และ Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz
!mv spark-3.5.5-bin-hadoop3 spark

# ติดตั้ง findspark
!pip install -q findspark

# ตั้งค่าตัวแปรสภาพแวดล้อม
import os
import sys

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/kaggle/working/spark"
os.environ["PYTHONPATH"] = "/kaggle/working/spark/python:/kaggle/working/spark/python/lib"

sys.path.append("/kaggle/working/spark/python")
sys.path.append("/kaggle/working/spark/python/lib")

# เริ่มใช้งาน Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# สร้าง Spark session
spark = SparkSession.builder.master("local[*]").appName("MySparkApp").getOrCreate()
spark



In [None]:
# Locations of the two input CSV files.
cluster_info_path = '/kaggle/input/all-traffy-cluster/cluster_info_df.csv'
df_path = '/kaggle/input/all-traffy-cluster/df_with_clustering.csv'

# Both files have a header row; column types are inferred by Spark.
csv_options = dict(header=True, inferSchema=True)
df = spark.read.csv(df_path, **csv_options)
cluster_info = spark.read.csv(cluster_info_path, **csv_options)

In [None]:
# Inspect rows whose cluster or text_cluster is null before building the id.
# A null `cluster` is tested on its own: `cluster != -1` evaluates to NULL for
# such rows, so AND-ing it with isNull() can never be true. Rows with
# cluster == -1 are excluded from the text_cluster check.
cluster_col = F.col("cluster")
text_cluster_col = F.col("text_cluster")

df.filter(
    cluster_col.isNull()
    | (text_cluster_col.isNull() & (cluster_col != -1))
).select("cluster", "text_cluster").show()

# Inspect rows whose concatenated id is malformed (no '_' separator).
# The separator matches the one used when the real cluster_id is built later;
# concat_ws skips null parts, so a missing part leaves the id without a '_'.
df.withColumn(
    "cluster_id", F.concat_ws('_', cluster_col, text_cluster_col.cast("int"))
).filter(~F.col("cluster_id").contains('_')).select("cluster", "text_cluster", "cluster_id").show()


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Temporary per-row id, used only to give the rows a global order.
df = df.withColumn("row_id", F.monotonically_increasing_id())

# Un-partitioned window over that order: row_number() numbers every row 1..N.
window_spec = Window.orderBy("row_id")

# A row is noise when its cluster is -1 or it has no text_cluster.
is_noise = (F.col("cluster") == "-1") | (F.col("text_cluster").isNull())
noise_label = F.concat(F.lit("noise_"), F.row_number().over(window_spec))

# Noise rows get their own "noise_<n>" cluster label; all others keep theirs.
# The helper column is dropped once the labels are assigned.
df = df.withColumn(
    "cluster",
    F.when(is_noise, noise_label).otherwise(F.col("cluster")),
).drop("row_id")



In [None]:
from pyspark.sql import functions as F

# Combine cluster and the integer-cast text_cluster into one key,
# "<cluster>_<text_cluster>", then drop the two source columns.
df_cluster_key = F.concat_ws('_', F.col('cluster'), F.col('text_cluster').cast("int"))
df = df.withColumn("cluster_id", df_cluster_key).drop('cluster', 'text_cluster')

In [None]:
# Rewrite cluster_info's cluster_id as "<cluster_id>_<text_cluster>", the same
# '_' + integer-cast format used for df, then drop the now-redundant column.
info_cluster_key = F.concat_ws('_', F.col('cluster_id'), F.col('text_cluster').cast("int"))
cluster_info = cluster_info.withColumn("cluster_id", info_cluster_key).drop('text_cluster')

In [None]:
# Mean latitude/longitude of the rows in each cluster, exposed as lat/lon.
coord_aggregates = [
    F.mean(source).alias(target)
    for source, target in (("latitude", "lat"), ("longitude", "lon"))
]
mean_coords = df.groupBy("cluster_id").agg(*coord_aggregates)

In [None]:
# Right join: keep every cluster_id present in mean_coords (i.e. in df), even
# when cluster_info has no matching row for it.
cluster_info = cluster_info.join(
    mean_coords,
    on='cluster_id',
    how='right',
)

In [None]:
from pyspark.sql import functions as F

# For every noise cluster (id starting with "noise_"), take its first
# non-null comment to use as a stand-in description.
noise_rows = df.filter(F.col("cluster_id").startswith("noise_"))
noise_desc = noise_rows.groupBy("cluster_id").agg(
    F.first("comment", ignorenulls=True).alias("noise_comment")
)

# Attach the stand-in comment to cluster_info and use it wherever
# cluster_desc is missing; existing descriptions are left untouched.
with_noise_comment = cluster_info.join(noise_desc, on="cluster_id", how="left")
filled_desc = (
    F.when(F.col("cluster_desc").isNull(), F.col("noise_comment"))
     .otherwise(F.col("cluster_desc"))
)
df_cluster_filled = with_noise_comment.withColumn("cluster_desc", filled_desc).drop("noise_comment")

In [None]:
from pyspark.sql import functions as F

# Count how many rows each organization has within each cluster.
df_organization_count = df.groupBy("cluster_id", "organization").count()

# Pick the organization with the highest count in each cluster.
# Ordering by count alone leaves ties to be broken arbitrarily by row_number(),
# so the chosen organization could differ between runs. The organization name is
# added as a secondary key (nulls last) to make the choice deterministic and to
# prefer a named organization over a null one when counts are equal.
top_org_window = Window.partitionBy("cluster_id").orderBy(
    F.col("count").desc(),
    F.col("organization").asc_nulls_last(),
)
df_max_organization = df_organization_count.withColumn(
    "rank", F.row_number().over(top_org_window)
).filter(F.col("rank") == 1).drop("rank", "count")

# Join the result back onto df_cluster_filled.
df_cluster_filled = df_cluster_filled.join(
    df_max_organization.select("cluster_id", "organization"),
    on="cluster_id",
    how="left"
)

# แสดงผลลัพธ์


In [None]:
# Export both results as single-part CSV directories with a header row.
# mode("overwrite") replaces the output of a previous run; the default save mode
# raises an error when the target path already exists, so re-running this cell
# would otherwise fail.
df.coalesce(1).write.mode("overwrite").option("header", "true").csv("/kaggle/working/df_with_cluster_id")
df_cluster_filled.coalesce(1).write.mode("overwrite").option("header", "true").csv("/kaggle/working/cluster_information")