# Import libraries

In [1]:
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# Make the project root (two levels up from the notebook's working directory)
# importable so the `src.*` imports below resolve. Guarded so re-running this
# cell does not keep appending duplicate entries to sys.path.
PROJECT_ROOT = os.path.abspath(os.path.join("..", ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

In [None]:
from src.etl.bronze.extract.data_structure_extract_strategy import DataExtractor
import src.analysis.EDA.data_inspection_strategy as BaseInspector
import src.etl.transform.data_cleansing_strategy as BaseCleansing
import src.etl.transform.data_transform_strategy as BaseTransform
import src.etl.transform.data_combining_strategy as BaseCombining
import src.analysis.visualization.data_univariate_visualization_strategy as BaseViz
import src.etl.transform.data_encoding_strategy as BaseEncoder

In [None]:
import os
from urllib.parse import urlparse

# Parse the Postgres DSN into its components. The DSN is read from the PG_DSN
# environment variable so no password is stored in the notebook; the fallback
# deliberately carries no password.
parsed = urlparse(os.environ.get("PG_DSN", "postgresql://postgres@localhost:5433/postgres"))

In [None]:
# Sanity-check the parsed DSN components. The password is never echoed:
# cell outputs are saved into the notebook file and are easy to leak.
print(parsed.hostname)
print(parsed.port)
print(parsed.path.lstrip("/"))
print(parsed.username)
print("***" if parsed.password else parsed.password)

In [None]:
import os

from src.function.index.sf_generator import SnowflakeGenerator, log_running
import pandas as pd


# ---------------------------------------------------------
# 1️⃣ Create the SnowflakeGenerator instance
# ---------------------------------------------------------
# The DSN comes from the PG_DSN environment variable instead of a hardcoded
# credential. The fallback has no password; supply one via PG_DSN (or, if the
# underlying driver is libpq-based, PGPASSWORD / ~/.pgpass — TODO confirm which
# driver SnowflakeGenerator uses).
dsn = os.environ.get("PG_DSN", "postgresql://postgres@localhost:5433/postgres")

sf = SnowflakeGenerator(
    dsn=dsn,
    source_name="bronze.newsapi",
    datacenter_id=0,
    version_no=1,
    usercreate="system"
)

# ---------------------------------------------------------
# 2️⃣ Sample DataFrame (news or transaction data)
# ---------------------------------------------------------
data = {
    "title": [
        "Apple announces new MacBook Pro",
        "Tesla stock surges 10%",
        "Google launches new AI model",
        "Amazon opens new data center in Tokyo",
        "Meta unveils AR glasses prototype",
    ],
    "source": ["BBC", "Reuters", "CNN", "Bloomberg", "TechCrunch"],
}
df = pd.DataFrame(data)

print("ก่อนเพิ่ม sf_id:")
print(df, "\n")



In [None]:

# ---------------------------------------------------------
# 3️⃣ Generate a Snowflake ID for every row
# ---------------------------------------------------------
# ✅ Approach 1 — call sf.generate() once per row of the frame.
df["sf_id"] = [sf.generate() for _row_position in range(df.shape[0])]


In [None]:
# Display the frame now that it carries the sf_id column.
df

In [None]:
# ✅ Approach 2 — request all IDs in a single bulk call (the original note says
# this is much faster). This overwrites the sf_id values from approach 1.
df["sf_id"] = sf.bulk_generate(df.shape[0])

In [None]:



# Print the frame with its assigned sf_id column.
print("หลังเพิ่ม sf_id:")
print(df, "\n")

# ---------------------------------------------------------
# 4️⃣ Inspect ID components (timestamp, datacenter, etc.)
# ---------------------------------------------------------
# NOTE(review): despite the "decode" heading, this does not decode the sf_id
# values already in `df` — it calls bulk_generate again for 3 *new* IDs and
# asks for a DataFrame back (return_df=True). Presumably that frame breaks each
# new ID into its components; TODO confirm against SnowflakeGenerator, and
# consider decoding df["sf_id"] directly instead.
decoded_df = sf.bulk_generate(count=3, return_df=True)
print("ตัวอย่าง decode:")
print(decoded_df)

In [2]:
import os

from src.function.index.sf_generator import SnowflakeGenerator, log_running
import pandas as pd


# ---------------------------------------------------------
# 1️⃣ Create the SnowflakeGenerator instance
# ---------------------------------------------------------
# Credentials come from the PG_DSN environment variable, never from the
# notebook source; the fallback carries no password.
dsn = os.environ.get("PG_DSN", "postgresql://postgres@localhost:5433/postgres")

# ✅ Only the source name is passed (per the original note, datacenter_id and
# worker_id are read from the DB).
sf = SnowflakeGenerator(dsn, "bronze.newsapi")

df = pd.DataFrame({
    "title": ["Apple launches iPhone", "Tesla opens factory", "Meta rebrands AI"],
    "url": ["https://news1", "https://news2", "https://news3"]
})

# ✅ Generate all IDs in one bulk call.
df["sf_id"] = sf.bulk_generate_fast(len(df))

# ✅ Log the run with the range of IDs just issued. Values are cast to built-in
# int so the DB layer receives plain Python integers rather than numpy scalars,
# and logging is skipped when no IDs were generated (min/max of an empty
# column would raise).
if not df.empty:
    log_running(
        dsn=dsn,
        source_name="bronze.newsapi",
        version_no=sf.config.version_no,
        job_name="bronze_ingest_newsapi",
        id_start=int(df["sf_id"].min()),
        id_end=int(df["sf_id"].max()),
        usercreate="airflow"
    )

df


Unnamed: 0,title,url,sf_id
0,Apple launches iPhone,https://news1,240845924421603328
1,Tesla opens factory,https://news2,240845924421603329
2,Meta rebrands AI,https://news3,240845924421603330


In [3]:
import os

from pyspark.sql import SparkSession
from src.function.index.sf_generator import SnowflakeGenerator

# PostgreSQL JDBC driver jar (replaces the non-existent "/path/to/..." placeholder).
# Override with POSTGRES_JDBC_JAR; the default is the jar path used by the local
# test session elsewhere in this notebook.
jdbc_jar = os.environ.get("POSTGRES_JDBC_JAR", "/opt/spark_drivers/postgresql-42.7.3.jar")

# === Create the Spark session ===
# NOTE: getOrCreate() reuses an already-running session if one exists, in which
# case driver-classpath settings may not take effect.
spark = (
    SparkSession.builder
    .appName("SFID_Backfill_NewsAPI")
    .config("spark.jars", jdbc_jar)
    .config("spark.driver.extraClassPath", jdbc_jar)  # JDBC driver
    .getOrCreate()
)

# === Connection settings (credentials from the PG_DSN environment variable) ===
dsn = os.environ.get("PG_DSN", "postgresql://postgres@localhost:5433/postgres")

try:
    # === Create the SnowflakeGenerator instance ===
    sf = SnowflakeGenerator(
        dsn=dsn,
        source_name="bronze.newsapi",   # must match system.source_registry
    )

    # === Call process_source to assign sf_id ===
    sf.process_source(
        spark,
        table_name="bronze.source_news_articles_newsapi",
        timestamp_col="createdate"      # take the timestamp from the createdate column
    )
finally:
    # Stop the session even if the backfill raises or the cell is interrupted,
    # so a re-run does not pick up a stale session.
    spark.stop()


KeyboardInterrupt: 

In [3]:
import os

from pyspark.sql import SparkSession

# The JDBC jar path is configurable via POSTGRES_JDBC_JAR instead of being
# hardcoded twice; the default is the original path.
jdbc_jar = os.environ.get("POSTGRES_JDBC_JAR", "/opt/spark_drivers/postgresql-42.7.3.jar")

# Local-mode session on all cores, with the PostgreSQL JDBC driver added via
# spark.jars and spark.driver.extraClassPath.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .config("spark.jars", jdbc_jar)
    .config("spark.driver.extraClassPath", jdbc_jar)
    .getOrCreate()
)

print(spark.version)


KeyboardInterrupt: 

In [4]:
import pyspark

In [5]:
from pyspark.sql import SparkSession

In [None]:
# Get (or reuse) a Spark session named "test" with default configuration.
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)