In [1]:
from pyspark.sql import SparkSession

def buildSpark(master="local", app_name="SparkConnectionTest",
               jars="postgresql-42.7.4.jar"):
    """Create (or reuse) a SparkSession and print its effective configuration.

    Args:
        master: Spark master URL passed to ``builder.master``.
        app_name: Application name passed to ``builder.appName``.
        jars: Value for the ``spark.jars`` setting (here the PostgreSQL JDBC
            driver jar).

    Returns:
        The SparkSession returned by ``builder.getOrCreate()``.

    Note:
        ``getOrCreate()`` may hand back an already-running session, in which
        case not every builder option is guaranteed to take effect.
    """
    spark = (
        SparkSession.builder
        .master(master)
        .appName(app_name)
        .config("spark.jars", jars)
        .getOrCreate()
    )

    # Check the Spark connection by dumping the active configuration.
    print("=== Spark Session Info ===")
    print(spark.sparkContext.getConf().getAll())
    return spark

import os

# JDBC settings for the PostgreSQL database.
# Each value can be overridden through an environment variable so credentials
# do not have to be edited into the notebook; the fallbacks keep the original
# local setup working unchanged.
# NOTE(review): the fallback password is still hardcoded here — prefer setting
# DWH_DB_PASSWORD in the environment and dropping the default.
jdbc_url = os.environ.get(
    "DWH_JDBC_URL", "jdbc:postgresql://postgres_db:5432/mydatabase"
)
db_properties = {
    "user": os.environ.get("DWH_DB_USER", "myuser"),
    "password": os.environ.get("DWH_DB_PASSWORD", "mypassword"),
    "driver": "org.postgresql.Driver",
}

def runSQL(spark, command, url=None, properties=None, raise_on_error=False):
    """Read a JDBC table expression into a Spark DataFrame.

    ``command`` is passed as the ``table`` argument of ``spark.read.jdbc``, so
    it should be something valid in a SQL FROM clause: a table name or a
    parenthesised, aliased subquery such as ``"(SELECT 1) as test"``.

    Args:
        spark: Active SparkSession.
        command: Table name or aliased subquery to read.
        url: JDBC URL; defaults to the module-level ``jdbc_url``.
        properties: JDBC connection properties; defaults to the module-level
            ``db_properties``.
        raise_on_error: When True, re-raise the failure after reporting it
            instead of returning ``None``.

    Returns:
        The DataFrame on success, or ``None`` if the read failed and
        ``raise_on_error`` is False.
    """
    try:
        df = spark.read.jdbc(
            url=jdbc_url if url is None else url,
            table=command,
            properties=db_properties if properties is None else properties,
        )
    except Exception as e:
        print("PostgreSQL Connection Failed!")
        print(e)
        if raise_on_error:
            raise
        # Make the failure result explicit; callers must check for None.
        return None
    print("PostgreSQL Connection Successful!")
    return df

# Set up the Spark session for the connectivity check.
spark = buildSpark()
try:
    # Check that PostgreSQL is reachable with the configured JDBC settings.
    runSQL(spark, "(SELECT 1) as test")
finally:
    # Always release the session: the getOrCreate() call in the next cell could
    # otherwise return this already-running session instead of a new one.
    spark.stop()

=== Spark Session Info ===
[('spark.master', 'local'), ('spark.executor.id', 'driver'), ('spark.app.submitTime', '1734975901024'), ('spark.driver.port', '44879'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=fal

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

# Create (or reuse) the SparkSession for the DWH load; spark.jars points at the
# PostgreSQL JDBC driver jar.
spark = (
    SparkSession.builder
    .appName("DWH Loader")
    .config("spark.jars", "postgresql-42.7.4.jar")
    .getOrCreate()
)

import os

# PostgreSQL connection settings.
# Each value can be overridden through an environment variable so credentials
# do not have to be edited into the notebook; the fallbacks keep the original
# local setup working unchanged.
# NOTE(review): the fallback password is still hardcoded here — prefer setting
# DWH_DB_PASSWORD in the environment and dropping the default.
jdbc_url = os.environ.get(
    "DWH_JDBC_URL", "jdbc:postgresql://postgres_db:5432/mydatabase"
)
db_properties = {
    "user": os.environ.get("DWH_DB_USER", "myuser"),
    "password": os.environ.get("DWH_DB_PASSWORD", "mypassword"),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Read source data from PostgreSQL.
def read_table(table_name):
    """Return a DataFrame backed by the JDBC table ``table_name``.

    Uses the module-level ``spark`` session together with ``jdbc_url`` and
    ``db_properties``; the session-setup cell must have run (and the session
    must still be active) before this is called.
    """
    jdbc_reader = spark.read
    return jdbc_reader.jdbc(url=jdbc_url, table=table_name, properties=db_properties)

# Load the source tables.
# NOTE(review): source1 and source2_orders are not used by the loads below in
# this cell — confirm whether they are still needed.
source1 = read_table("source1.craft_market_wide")
source2_products = read_table("source2.craft_market_masters_products")
source2_orders = read_table("source2.craft_market_orders_customers")
source3_craftsmans = read_table("source3.craft_market_craftsmans")
source3_customers = read_table("source3.craft_market_customers")
source3_orders = read_table("source3.craft_market_orders")


def _append_to_dwh(frame, target_table, columns):
    """Append the given ``columns`` of ``frame`` to ``target_table`` over JDBC.

    Uses the module-level ``jdbc_url`` and ``db_properties``.
    NOTE: mode="append" means re-running this cell writes the same rows again
    (or fails if the target table enforces unique keys) — the load is not
    idempotent.
    """
    frame.select(*[col(name) for name in columns]).write.jdbc(
        url=jdbc_url,
        table=target_table,
        mode="append",
        properties=db_properties,
    )


# 1. Populate dwh.d_customers
d_customers = source3_customers.withColumn("load_dttm", current_timestamp())
_append_to_dwh(d_customers, "dwh.d_customers", [
    "customer_id",
    "customer_name",
    "customer_address",
    "customer_birthday",
    "customer_email",
    "load_dttm",
])

# 2. Populate dwh.d_products
d_products = source2_products.withColumn("load_dttm", current_timestamp())
_append_to_dwh(d_products, "dwh.d_products", [
    "product_id",
    "product_name",
    "product_description",
    "product_type",
    "product_price",
    "load_dttm",
])

# 3. Populate dwh.d_craftsmans
d_craftsmans = source3_craftsmans.withColumn("load_dttm", current_timestamp())
_append_to_dwh(d_craftsmans, "dwh.d_craftsmans", [
    "craftsman_id",
    "craftsman_name",
    "craftsman_address",
    "craftsman_birthday",
    "craftsman_email",
    "load_dttm",
])

# 4. Populate dwh.f_orders
f_orders = source3_orders.withColumn("load_dttm", current_timestamp())
_append_to_dwh(f_orders, "dwh.f_orders", [
    "order_id",
    "product_id",
    "craftsman_id",
    "customer_id",
    "order_created_date",
    "order_completion_date",
    "order_status",
    "load_dttm",
])

print("DWH tables have been successfully populated!")

# End the session. read_table() uses the module-level `spark`, so re-run the
# session-setup cell before re-running this one.
spark.stop()