In [None]:
import builtins
import json
import math
import tempfile

import gdown
import numpy as np
import pandas as pd
import pyspark
import requests
from pyspark.sql import SparkSession
from shapely.geometry import Point
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StructType, StructField, IntegerType, FloatType, StringType, BinaryType, ArrayType
from pyproj import Transformer
from pyspark.sql.window import Window


In [None]:
from pyspark.sql import SparkSession
# Local Spark session using every available core; the UI is pinned to port 4040.
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark Tutorial') \
    .config('spark.ui.port', '4040') \
    .getOrCreate()

spark

# Download Data

In [None]:
import os
import gdown

# Where the raw Traffy export lives locally and where it is fetched from.
folder = "data"
output = os.path.join(folder, "bangkok_traffy.csv")
url = "https://drive.google.com/uc?id=19QkF8i1my99gjbyHe7de_qZNwgrca6R5"

os.makedirs(folder, exist_ok=True)

# Only hit Google Drive when the file is not already cached on disk.
if os.path.exists(output):
    print("File already exists. Skip downloading.")
else:
    print("File not found. Downloading...")
    gdown.download(url, output, quiet=False)
    print("Download completed!")

In [None]:
# Explicit schema for the Traffy CSV: every field is nullable; only `star`
# (float) and `count_reopen` (int) are numeric, the rest are read as strings.
manual_schema = (
    StructType()
    .add("ticket_id", StringType(), True)
    .add("type", StringType(), True)
    .add("organization", StringType(), True)
    .add("comment", StringType(), True)
    .add("photo", StringType(), True)
    .add("photo_after", StringType(), True)
    .add("coords", StringType(), True)
    .add("address", StringType(), True)
    .add("subdistrict", StringType(), True)
    .add("district", StringType(), True)
    .add("province", StringType(), True)
    .add("timestamp", StringType(), True)
    .add("state", StringType(), True)
    .add("star", FloatType(), True)
    .add("count_reopen", IntegerType(), True)
    .add("last_activity", StringType(), True)
)

In [None]:
# Comments contain embedded newlines and doubled quotes, hence multiLine and
# the quote/escape settings.
df = (
    spark.read
    .schema(manual_schema)
    .options(header=True, sep=',', quote='"', escape='"', multiLine=True)
    .csv(output)
)
print(df.count())

In [None]:
# Display the DataFrame repr (column names and types) as the cell output.
df

In [None]:
# (row count, column count) of the raw frame; count() triggers a full scan.
(df.count(), len(df.columns))

In [None]:
# Columns that the downstream steps never read; remove them up front.
cols_to_drop = [
    'ticket_id',
    'photo',
    'photo_after',
    'address',
    'province',
    'star',
    'last_activity',
    'organization',
    'state',
    'type',
    'district',
    'subdistrict',
]
df = df.drop(*cols_to_drop)

In [None]:
# Keep only rows with a usable comment: non-null, trimmed, and non-empty.
df = (
    df.filter(col('comment').isNotNull())
      .withColumn('comment', trim(col('comment').cast("string")))
      .filter(length(col('comment')) > 0)
)

In [None]:
# Parse the timestamp string, discard rows that fail to parse, and keep
# only tickets reported in 2022.
df = (
    df.withColumn('timestamp', to_timestamp(col('timestamp')))
      .filter(col('timestamp').isNotNull())
      .filter(year(col('timestamp')) == 2022)
)

In [None]:
# Preview the first 5 rows after the timestamp filter.
df.show(5)

In [None]:
# `coords` is split on ","; item 0 is taken as longitude and item 1 as
# latitude, and both are cast to double.
coord_parts = split(col('coords'), ',')

df = (
    df.withColumn('lng', coord_parts.getItem(0).cast(DoubleType()))
      .withColumn('lat', coord_parts.getItem(1).cast(DoubleType()))
      .drop('coords')
)

df.show(5)

In [None]:
# (row count, column count) after cleaning and coordinate extraction.
(df.count(), len(df.columns))

In [None]:
# 1. Define a pandas UDF that builds a WKT string.
# @pandas_udf(StringType()) declares that the function returns a Spark StringType column.
@pandas_udf(StringType())
def create_point_wkt(lng: pd.Series, lat: pd.Series) -> pd.Series:
    """Build a WKT point string ("POINT (x y)") for every lng/lat pair.

    Args:
        lng: Longitudes (x) for one batch of rows.
        lat: Latitudes (y) for the same rows.

    Returns:
        A Series of WKT strings. Rows where either coordinate is missing
        yield None (a Spark null) instead of the misleading "POINT (nan nan)".
    """
    wkt_values = [
        None if pd.isna(x) or pd.isna(y) else Point(x, y).wkt
        for x, y in zip(lng, lat)
    ]
    # dtype=object keeps an empty batch from defaulting to float64, which
    # would not convert to a string column.
    return pd.Series(wkt_values, dtype=object)

# 2. Apply the UDF to the DataFrame.
# Adds a new column named 'geometry_wkt'.
df = df.withColumn(
    "geometry_wkt",
    create_point_wkt(col("lng"), col("lat"))
)

# 3. Inspect the result.
df.select("lng", "lat", "geometry_wkt").show(5, truncate=False)
df.printSchema()

In [None]:
# cleaned_df is another name for the same DataFrame object as df (no copy is made).
cleaned_df = df

# Number of rows
print("Number of rows:", cleaned_df.count())

# Number of columns
print("Number of columns:", len(cleaned_df.columns))

# Schema of each column
cleaned_df.printSchema()

# Non-null count per column (count() skips nulls)
from pyspark.sql.functions import col, count

cleaned_df.select([count(col(c)).alias(c) for c in cleaned_df.columns]).show()

In [None]:
# Count nulls per column (the Spark equivalent of pandas' isna().sum()).
columns_to_check = cleaned_df.columns

# For each column: map null -> 1 and non-null -> 0, then sum the flags.
# `sum` here is the Spark aggregate brought in by the star import.
null_counts_expr = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in columns_to_check
]

# The aggregate yields a single row holding one null count per column.
null_counts_df = cleaned_df.select(null_counts_expr)
null_counts_df.show(truncate=False)

## count_reopen

In [None]:
# pandas equivalent, kept for reference:
# percentiles = [0.90, 0.95, 0.97, 0.98, 0.99, 0.995, 0.999]
# cleaned_df['count_reopen'].quantile(percentiles)

# Percentiles to compute
percentiles = [0.90, 0.95, 0.97, 0.98, 0.99, 0.995, 0.999]

# Build the percentile expression:
# percentile_approx(column, array_of_percentiles, accuracy)
percentile_expr = percentile_approx(
    col('count_reopen'),
    array([lit(p) for p in percentiles]), # array literal of the requested percentiles
    100000 # accuracy argument (higher is more precise)
).alias('percentile_values')

# Evaluate the expression over the whole frame
percentile_df = cleaned_df.select(percentile_expr)

# Show the result
percentile_df.show(truncate=False)

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Bring the single column over from Spark as a pandas DataFrame for plotting.
count_reopen_pd = cleaned_df.select("count_reopen").toPandas()

fig, ax = plt.subplots(figsize=(8, 4))
ax.hist(count_reopen_pd['count_reopen'], bins=50)
ax.set_title("Distribution of count_reopen (Before)")
ax.set_xlabel("count_reopen")
ax.set_ylabel("Frequency")
plt.show()

In [None]:
# Horizontal boxplot of the raw count_reopen values.
count_reopen_pd = cleaned_df.select("count_reopen").toPandas()

fig, ax = plt.subplots(figsize=(6, 2))
ax.boxplot(count_reopen_pd['count_reopen'], vert=False)
ax.set_title("Boxplot of count_reopen (Before)")
plt.show()

In [None]:
# log(1 + x) transform of count_reopen. The Spark function is referenced through
# the `F` namespace because a later cell runs `from math import log1p`, which
# rebinds the bare name `log1p` and would break this cell if it were re-run.
cleaned_df = cleaned_df.withColumn(
    "count_reopen_log",
    F.log1p(F.col("count_reopen"))
)

In [None]:
# Histogram of the log-transformed column, for comparison with the raw one.
count_reopen_log_pd = cleaned_df.select("count_reopen_log").toPandas()

fig, ax = plt.subplots(figsize=(8, 4))
ax.hist(count_reopen_log_pd['count_reopen_log'], bins=50)
ax.set_title("Distribution of count_reopen_log (After)")
ax.set_xlabel("count_reopen_log")
ax.set_ylabel("Frequency")
plt.show()

In [None]:
# Horizontal boxplot of the log-transformed column.
count_reopen_log_pd = cleaned_df.select("count_reopen_log").toPandas()

fig, ax = plt.subplots(figsize=(6, 2))
ax.boxplot(count_reopen_log_pd['count_reopen_log'], vert=False)
ax.set_title("Boxplot of count_reopen_log (After)")
plt.show()

In [None]:
# Quantiles of count_reopen; a relative error of 0.0 asks approxQuantile for
# exact values.
percentiles = [0.90, 0.95, 0.97, 0.98, 0.99, 0.995, 0.999]
result = cleaned_df.approxQuantile("count_reopen", percentiles, 0.0)

print(result)


In [None]:
# Skewness of the raw column, then of the log-transformed column, to compare
# the two.
cleaned_df.select(skewness("count_reopen")).show()
cleaned_df.select(skewness("count_reopen_log")).show()

In [None]:
# pandas-style shape for the cleaned frame.
num_rows, num_cols = cleaned_df.count(), len(cleaned_df.columns)
print("Shape:", (num_rows, num_cols))

In [None]:
# Preview the first 20 rows of the cleaned frame.
cleaned_df.show(20)

In [None]:
# Print each column name of the cleaned frame on its own line.
for col_name in cleaned_df.columns:
    print(col_name)


# Web scraping
impact_to_public

In [None]:
# Department Store
# Download the department-store CSV from the Bangkok open-data portal and
# load it into Spark through a temporary file.

url_department = "https://data.bangkok.go.th/dataset/d8f814ac-cbaf-43c3-9576-f533b2554776/resource/438101c3-5535-4fe2-bc5e-83aa73703d4a/download/department_store.csv"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
}

# requests has no default timeout; without one a stalled server would hang
# the notebook indefinitely.
response = requests.get(url_department, headers=headers, timeout=60)
response.raise_for_status()

# delete=False: Spark reads the file lazily, so it must outlive this block.
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
    tmp.write(response.content)
    temp_path = tmp.name

df_department = spark.read.csv(temp_path, header=True, inferSchema=True)

df_department.printSchema()
df_department.show(5)

In [None]:
# Community
# Query the ArcGIS REST layer for every community record and load the
# attribute tables into Spark through a temporary JSON Lines file.
url_community = "https://cpudgiapp.bangkok.go.th/arcgis/rest/services/Community/Service_Community_Public/MapServer/0/query"
params = {
    "where": "1=1",
    "outFields": "*",
    "f": "json",
    "returnGeometry": "true"
}
headers = {"User-Agent": "Mozilla/5.0"}

# requests has no default timeout; set one so a stalled server cannot hang the run.
response = requests.get(url_community, headers=headers, params=params, timeout=60)
response.raise_for_status()
data = response.json()

# ArcGIS can report a failed query inside an HTTP 200 body, so raise_for_status()
# alone is not enough; fail with a readable message instead of a KeyError.
if "error" in data or "features" not in data:
    raise RuntimeError(f"Community query failed: {data.get('error', data)}")
if data.get("exceededTransferLimit"):
    print("Warning: community query hit the server transfer limit; the result is truncated.")

features = [f["attributes"] for f in data["features"]]

# delete=False: Spark reads the file lazily, so it must outlive this block.
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
    for feature in features:
        tmp.write(json.dumps(feature) + "\n")
    temp_path = tmp.name

df_community = spark.read.json(temp_path)
df_community.printSchema()
df_community.show(5)


In [None]:
# School
# Query the ArcGIS REST layer for every school record and load the attribute
# tables into Spark through a temporary JSON Lines file.
url_school = "https://bmagis.bangkok.go.th/arcgis/rest/services/riskbkk/RISK_ADMIN_bma_school/FeatureServer/0/query"

params = {
    "where": "1=1",
    "outFields": "*",
    "f": "json",
    "returnGeometry": "true"
}
headers = {"User-Agent": "Mozilla/5.0"}

# requests has no default timeout; set one so a stalled server cannot hang the run.
response = requests.get(url_school, headers=headers, params=params, timeout=60)
response.raise_for_status()
data = response.json()

# ArcGIS can report a failed query inside an HTTP 200 body, so raise_for_status()
# alone is not enough; fail with a readable message instead of a KeyError.
if "error" in data or "features" not in data:
    raise RuntimeError(f"School query failed: {data.get('error', data)}")
if data.get("exceededTransferLimit"):
    print("Warning: school query hit the server transfer limit; the result is truncated.")

features = [f["attributes"] for f in data["features"]]

# delete=False: Spark reads the file lazily, so it must outlive this block.
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
    for feature in features:
        tmp.write(json.dumps(feature) + "\n")
    temp_path = tmp.name

df_school= spark.read.json(temp_path)
df_school.printSchema()
df_school.show(5)


In [None]:
# Hospital
url_hospital = "https://bmagis.bangkok.go.th/arcgis/rest/services/riskbkk/RISK_ADMIN_Hospital/FeatureServer/0/query"

# Same query parameters as the Community/School requests.
params = {
    "where": "1=1",
    "outFields": "*",
    "f": "json",
    "returnGeometry": "true"
}
headers = {"User-Agent": "Mozilla/5.0"}

# Fetch the JSON payload. requests has no default timeout; set one so a
# stalled server cannot hang the run.
response = requests.get(url_hospital, headers=headers, params=params, timeout=60)
response.raise_for_status()
data_hospital = response.json()

# ArcGIS can report a failed query inside an HTTP 200 body, so raise_for_status()
# alone is not enough; fail with a readable message instead of a KeyError.
if "error" in data_hospital or "features" not in data_hospital:
    raise RuntimeError(f"Hospital query failed: {data_hospital.get('error', data_hospital)}")
if data_hospital.get("exceededTransferLimit"):
    print("Warning: hospital query hit the server transfer limit; the result is truncated.")

# Extract attributes
features = [f["attributes"] for f in data_hospital["features"]]

# Write a temporary JSON Lines file (same pattern as the CSV download above).
# delete=False: Spark reads the file lazily, so it must outlive this block.
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp:
    for feature in features:
        tmp.write(json.dumps(feature) + "\n")
    temp_path = tmp.name

# Load into Spark
df_hospital = spark.read.json(temp_path)
df_hospital.printSchema()
df_hospital.show(5)

In [None]:
def clean_and_convert_to_gdf(df, col_map, place_type, drop_zero=True):
    """Normalise a public-place table to the columns name, lat, lng, geometry, type.

    Despite the name, the result is a Spark DataFrame (not a GeoDataFrame);
    the geometry is carried as a WKT string.

    Args:
        df: Source Spark DataFrame for one place category.
        col_map: Dict with keys "name", "lat", "lng" giving the source column names.
        place_type: Label written into the "type" column (also used in log output).
        drop_zero: When True, rows whose lat or lng equals 0 are removed.

    Returns:
        Spark DataFrame with columns name, lat, lng, geometry ("POINT(lng lat)"), type.
    """
    # Step 1: select & rename
    df_clean = df.select(
        F.col(col_map["name"]).alias("name"),
        F.col(col_map["lat"]).alias("lat"),
        F.col(col_map["lng"]).alias("lng")
    )

    # Step 2: clean name
    df_clean = df_clean.withColumn("name", F.trim(F.col("name").cast(StringType())))
    df_clean = df_clean.filter((F.col("name").isNotNull()) & (F.col("name") != ""))

    # Step 3: convert numeric
    df_clean = df_clean.withColumn("lat", F.col("lat").cast(DoubleType()))
    df_clean = df_clean.withColumn("lng", F.col("lng").cast(DoubleType()))

    # Step 4: drop missing / zero
    df_clean = df_clean.filter(F.col("lat").isNotNull() & F.col("lng").isNotNull())
    if drop_zero:
        df_clean = df_clean.filter((F.col("lat") != 0) & (F.col("lng") != 0))

    # Step 5: detect coordinate system
    # A "latitude" above 1000 cannot be degrees, so it is treated as metres.
    max_lat = df_clean.agg(F.max("lat")).collect()[0][0]

    # max_lat is None when no row survived the filters; skip the conversion
    # instead of failing on `None > 1000`.
    if max_lat is not None and max_lat > 1000:
        # Projected (UTM) coordinates -> convert to WGS84
        print(f"[{place_type}] Detected UTM coordinates → converting to WGS84...")

        transformer = Transformer.from_crs("EPSG:32647", "EPSG:4326", always_xy=True)

        def utm_to_wgs84(lng, lat):
            # always_xy=True: input is (easting, northing), output is (lon, lat).
            x, y = transformer.transform(lng, lat)
            return float(y), float(x)

        schema = StructType([
            StructField("lat", DoubleType(), True),
            StructField("lng", DoubleType(), True)
        ])
        convert_udf = F.udf(utm_to_wgs84, schema)
        df_clean = df_clean.withColumn("coords", convert_udf(F.col("lng"), F.col("lat")))
        df_clean = df_clean.withColumn("lat", F.col("coords.lat")).withColumn("lng", F.col("coords.lng"))
        df_clean = df_clean.drop("coords")

    # Step 6: create geometry column (WKT)
    df_clean = df_clean.withColumn("geometry", F.concat(F.lit("POINT("), F.col("lng"), F.lit(" "), F.col("lat"), F.lit(")")))

    # Step 7: add type column
    df_clean = df_clean.withColumn("type", F.lit(place_type))

    return df_clean

In [None]:
# Per-dataset names of the source columns holding the place name, latitude
# and longitude.
column_mapping = {
    "department": dict(name="name", lat="lat", lng="lng"),
    "community": dict(name="CMT_NAME", lat="LAT", lng="LON"),
    "school": dict(name="NAME", lat="Y", lng="X"),
    "hospital": dict(name="NAME", lat="Y", lng="X"),
}


In [None]:
# Normalise each source table to the shared name/lat/lng/geometry/type layout.
gdf_department = clean_and_convert_to_gdf(
    df=df_department, col_map=column_mapping["department"], place_type="department"
)
gdf_community = clean_and_convert_to_gdf(
    df=df_community, col_map=column_mapping["community"], place_type="community"
)
gdf_school = clean_and_convert_to_gdf(
    df=df_school, col_map=column_mapping["school"], place_type="school"
)
gdf_hospital = clean_and_convert_to_gdf(
    df=df_hospital, col_map=column_mapping["hospital"], place_type="hospital"
)

gdf_hospital.show()

In [None]:
# Stack all four place categories into a single DataFrame.
gdf_public_place = (
    gdf_department
    .unionByName(gdf_community)
    .unionByName(gdf_school)
    .unionByName(gdf_hospital)
)

# Total number of rows
print("Total public places:", gdf_public_place.count())

# Sequential 1-based row index. The window has no partitioning, so Spark
# gathers all rows into one partition to number them.
windowSpec = Window.orderBy(monotonically_increasing_id())
gdf_public_place = gdf_public_place.withColumn("index", row_number().over(windowSpec))

# Reorder the columns so that "index" comes first.
cols = ["index"] + [c for c in gdf_public_place.columns if c != "index"]
gdf_public_place = gdf_public_place.select(*cols)

# Show a sample (show() prints the first 20 rows by default).
gdf_public_place.show()




In [None]:
# Seeded ~1% random sample of the places table; print up to 5 untruncated rows.
gdf_public_place.sample(fraction=0.01, seed=42).show(5, truncate=False)

In [None]:
# Give the cases the same kind of sequential index, reusing the window
# specification defined in the public-places cell.
cleaned_df = cleaned_df.withColumn("index", row_number().over(windowSpec))

# Reorder the columns so that "index" comes first.
cols = ["index"] + [c for c in cleaned_df.columns if c != "index"]
cleaned_df = cleaned_df.select(*cols)

cleaned_df.show()

In [None]:
# These are additional names for the same DataFrame objects; plain assignment
# does not copy any data.
copy_cleaned_df = cleaned_df
copy_gdf_public_place = gdf_public_place

In [None]:
# Preview the places table that will be passed to the impact computation.
copy_gdf_public_place.show()

In [None]:
# Display the cases DataFrame repr (column names and types) as the cell output.
copy_cleaned_df
# print(copy_gdf_public_place.columns)



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lit, array
from pyspark.sql.types import ArrayType, DoubleType
from math import radians, sin, cos, sqrt, atan2, log1p

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import ArrayType, DoubleType
from math import radians, sin, cos, sqrt, atan2, log1p

def compute_public_impact(
    gdf_cases,
    gdf_places,
    max_distance=1000,
    weights=None
):
    """Score every case by its proximity to nearby public places.

    For each place type, every place within ``max_distance`` metres of a case
    contributes ``max_distance - distance`` points. The points are summed,
    compressed with log1p, rescaled to 0-100 against the largest value in the
    data, and combined into ``public_impact`` as a weighted sum.

    Args:
        gdf_cases: Spark DataFrame of cases with numeric "lat" and "lng" columns.
        gdf_places: Spark DataFrame of places with "lat", "lng" and "type" columns.
        max_distance: Search radius in metres.
        weights: Mapping of place type -> weight. Defaults to
            school 0.3, department 0.2, community 0.2, hospital 0.3.

    Returns:
        ``gdf_cases`` with one extra double column, "public_impact". Cases with
        a missing coordinate get a score of 0.
    """
    # Build the default here rather than in the signature so that no dict is
    # shared between calls.
    if weights is None:
        weights = {"school": 0.3, "department": 0.2, "community": 0.2, "hospital": 0.3}
    place_types = list(weights.keys())
    temp_suffixes = ("_distances", "_point_scores", "_raw_score", "_score")

    print("เริ่มฟังก์ชัน compute_public_impact")

    # -----------------------------
    # Haversine function
    # -----------------------------
    def haversine(lat1, lon1, lat2, lon2):
        """Great-circle distance in metres between two lat/lon points in degrees."""
        R = 6371000  # meters
        phi1, phi2 = math.radians(lat1), math.radians(lat2)
        dphi = math.radians(lat2 - lat1)
        dlambda = math.radians(lon2 - lon1)
        a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
        # Rounding can push `a` slightly outside [0, 1], which would make
        # sqrt(1 - a) fail.
        a = builtins.min(1.0, builtins.max(0.0, a))
        return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    # -----------------------------
    # Prepare places dict: type -> list of (lat, lng)
    # -----------------------------
    places_dict = {}
    for t in place_types:
        place_rows = (
            gdf_places.filter(col("type") == t)
            .select("lat", "lng")
            .dropna()
            .collect()
        )
        # The column is named "lng"; reading `row.lon` would raise here.
        places_dict[t] = [(row["lat"], row["lng"]) for row in place_rows]
    # Print only the number of places per type; the full dict can hold
    # thousands of coordinates.
    print("เตรียม places dict:", {t: len(locs) for t, locs in places_dict.items()})

    # -----------------------------
    # Compute distances UDF
    # -----------------------------
    def compute_distances(case_lat, case_lon):
        """Return, per place type, the distances (m) of places within max_distance."""
        if case_lat is None or case_lon is None:
            return [[] for _ in place_types]
        nearby = []
        for t in place_types:
            dists = []
            for plat, plon in places_dict[t]:
                d = haversine(case_lat, case_lon, plat, plon)
                if d <= max_distance:
                    dists.append(d)
            nearby.append(dists)
        return nearby

    compute_distances_udf = udf(compute_distances, ArrayType(ArrayType(DoubleType())))
    # The cases frame names its longitude column "lng" (not "lon").
    gdf_cases = gdf_cases.withColumn("distances_array", compute_distances_udf(col("lat"), col("lng")))
    print("คำนวณ distances_array เรียบร้อย")

    # -----------------------------
    # Split distances_array into <type>_distances
    # -----------------------------
    for i, t in enumerate(place_types):
        gdf_cases = gdf_cases.withColumn(f"{t}_distances", col("distances_array")[i])
    gdf_cases = gdf_cases.drop("distances_array")
    print("แยก distances array เป็น <type>_distances")

    # -----------------------------
    # Inverse distance → _point_scores
    # -----------------------------
    def inverse_distance(dists):
        # builtins.max is required: the notebook's star import from
        # pyspark.sql.functions rebinds the bare name `max` to the Spark aggregate.
        return [float(builtins.max(0.0, max_distance - d)) for d in dists] if dists else []

    inverse_distance_udf = udf(inverse_distance, ArrayType(DoubleType()))
    for t in place_types:
        gdf_cases = gdf_cases.withColumn(f"{t}_point_scores", inverse_distance_udf(col(f"{t}_distances")))
    print("คำนวณ _point_scores เรียบร้อย")

    # -----------------------------
    # Raw score → log1p(sum)
    # -----------------------------
    def raw_score(dists):
        # Must return a float: a Python int from a DoubleType UDF becomes null.
        # builtins.sum / math.log1p avoid the names rebound by the star import.
        return float(math.log1p(builtins.sum(dists))) if dists else 0.0

    raw_score_udf = udf(raw_score, DoubleType())
    for t in place_types:
        gdf_cases = gdf_cases.withColumn(f"{t}_raw_score", raw_score_udf(col(f"{t}_point_scores")))
    print("คำนวณ _raw_score เรียบร้อย")

    # -----------------------------
    # Normalize 0–100 → _score
    # -----------------------------
    # Fetch every maximum in one aggregation so the Python UDFs run once
    # rather than once per place type.
    max_row = (
        gdf_cases.agg({f"{t}_raw_score": "max" for t in place_types}).collect()[0]
        if place_types else {}
    )
    for t in place_types:
        max_val = max_row[f"max({t}_raw_score)"]
        # max_val is None when the frame is empty.
        if max_val is not None and max_val > 0:
            gdf_cases = gdf_cases.withColumn(f"{t}_score", col(f"{t}_raw_score") / max_val * 100)
        else:
            gdf_cases = gdf_cases.withColumn(f"{t}_score", lit(0.0))
    print("Normalize _score เป็น 0–100 เรียบร้อย")

    # -----------------------------
    # Weighted sum → public_impact
    # -----------------------------
    gdf_cases = gdf_cases.withColumn("public_impact", lit(0.0))
    for t, w in weights.items():
        gdf_cases = gdf_cases.withColumn("public_impact", col("public_impact") + col(f"{t}_score") * w)
    print("คำนวณ public_impact เรียบร้อย")

    # -----------------------------
    # Show intermediate results (sample)
    # -----------------------------
    # The message now says "first 5 rows", which is what is printed; take(5)
    # avoids collecting the whole frame to the driver.
    print("แสดงผลตัวอย่าง 5 แถวแรก:")
    preview = gdf_cases.select("lat", "lng", "public_impact", *[f"{t}_score" for t in place_types])
    for row in preview.take(5):
        print(row)

    # -----------------------------
    # Drop temp columns
    # -----------------------------
    # Drop exactly the helper columns created above, so an input column whose
    # name merely contains e.g. "_score" is left alone.
    temp_cols = [f"{t}{suffix}" for t in place_types for suffix in temp_suffixes]
    gdf_cases = gdf_cases.drop(*temp_cols)

    print("เสร็จสิ้น compute_public_impact")
    return gdf_cases



In [None]:
# Score every case against the combined public-places table, using the
# function's default radius and weights.
gdf_public_impact = compute_public_impact(copy_cleaned_df, copy_gdf_public_place)


In [None]:
# Sort by public_impact, highest first. Sorting by column name avoids the `F`
# alias, which is only imported in a later cell and so is undefined here on a
# fresh Run All.
gdf_public_impact.orderBy("public_impact", ascending=False).show(5, truncate=False)


In [None]:
from pyspark.sql import functions as F

# Sorted view, highest impact first.
sorted_df = gdf_public_impact.orderBy(F.col("public_impact").desc())

# Draw 10 random rows. Ordering by rand() replaces any earlier sort, so the
# sample is taken from the unsorted frame directly; the seed makes repeated
# runs over the same data pick the same rows.
sampled_df = gdf_public_impact.orderBy(F.rand(seed=42)).limit(10)

# Show the sample
sampled_df.show(truncate=False)


In [None]:
import matplotlib.pyplot as plt

# Pull the public_impact column into pandas as a Series.
public_impact_values = gdf_public_impact.select("public_impact").toPandas()["public_impact"]

fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(public_impact_values, bins=30, edgecolor="black")
ax.set_title("Distribution of Public Impact Score", fontsize=16)
ax.set_xlabel("Public Impact Score", fontsize=14)
ax.set_ylabel("Frequency", fontsize=14)
ax.grid(alpha=0.3)
fig.tight_layout()
plt.show()


In [None]:
from pyspark.sql import functions as F

# Skewness of public_impact: the aggregate returns one row with one value.
skew_row = gdf_public_impact.select(F.skewness("public_impact")).collect()[0]
skew_value = skew_row[0]

print(skew_value)


In [None]:
import matplotlib.pyplot as plt

# Convert to pandas for plotting (sample first if the dataset is large).
pdf = gdf_public_impact.select("lng", "lat", "public_impact").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
points = ax.scatter(
    pdf["lng"],
    pdf["lat"],
    c=pdf["public_impact"],
    cmap="viridis",
    s=10
)
fig.colorbar(points, ax=ax, label="Public Impact")
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
ax.set_title("Spatial Distribution of Public Impact")
fig.tight_layout()
plt.show()


In [None]:
# Drop the geometry column. In this notebook the cases frame calls it
# "geometry_wkt", so dropping only "geometry" silently did nothing (Spark
# ignores unknown column names in drop()).
df_export = gdf_public_impact.drop("geometry", "geometry_wkt")

# Write as CSV. Spark creates a directory with this name containing part files.
df_export.write.csv("gdf_public_impact.csv", header=True, mode="overwrite")



In [None]:
# Spark DataFrames have no to_file() (that is a GeoPandas method), so build
# the GeoJSON FeatureCollection by hand from the lng/lat columns.
geojson_features = []
for case_row in gdf_public_impact.dropna(subset=["lng", "lat"]).toLocalIterator():
    properties = case_row.asDict()
    point_lng = properties.pop("lng")
    point_lat = properties.pop("lat")
    # The point is already carried by the feature geometry.
    properties.pop("geometry_wkt", None)
    geojson_features.append({
        "type": "Feature",
        "geometry": {"type": "Point", "coordinates": [point_lng, point_lat]},
        "properties": properties,
    })

with open("gdf_public_impact.geojson", "w", encoding="utf-8") as geojson_file:
    # default=str serialises timestamps; ensure_ascii=False keeps Thai text readable.
    json.dump(
        {"type": "FeatureCollection", "features": geojson_features},
        geojson_file,
        ensure_ascii=False,
        default=str,
    )

# Mini LLM
mini_llm_risk

# AI/ML