In [17]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, year, month, monotonically_increasing_id

# Spark session with the PostgreSQL JDBC driver jar on the classpath.
spark = (
    SparkSession.builder
    .appName("ETL to Star Schema")
    .config("spark.jars", "postgresql-42.6.0.jar")
    .getOrCreate()
)

# Connection settings. Credentials can be supplied via environment variables
# (PG_URL / PG_USER / PG_PASSWORD) so they need not live in the notebook; the
# defaults reproduce the original local setup.
pg_url = os.environ.get("PG_URL", "jdbc:postgresql://bigdata_postgres_db:5432/lab2")
pg_props = {
    "user": os.environ.get("PG_USER", "debug"),
    "password": os.environ.get("PG_PASSWORD", "pswd"),
    "driver": "org.postgresql.Driver",
}


In [18]:
# Raw, denormalized source table; every dimension and the fact table derive from it.
df = spark.read.jdbc(url=pg_url, table="mock_data", properties=pg_props)
df.printSchema()
df.show(n=1)

root
 |-- id: integer (nullable = true)
 |-- customer_first_name: string (nullable = true)
 |-- customer_last_name: string (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- customer_postal_code: string (nullable = true)
 |-- customer_pet_type: string (nullable = true)
 |-- customer_pet_name: string (nullable = true)
 |-- customer_pet_breed: string (nullable = true)
 |-- seller_first_name: string (nullable = true)
 |-- seller_last_name: string (nullable = true)
 |-- seller_email: string (nullable = true)
 |-- seller_country: string (nullable = true)
 |-- seller_postal_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_price: decimal(10,2) (nullable = true)
 |-- product_quantity: integer (nullable = true)
 |-- sale_date: timestamp (nullable = true)
 |-- sale_customer_id: integer (nullable = tru

In [9]:
from pyspark.sql.functions import col

# Customer dimension. The fact join matches customers on email only, so the
# dimension is deduplicated on that key: two source rows with the same email but
# different attributes would otherwise produce two d_customers rows and multiply
# the matching fact rows. One row per email is kept (which one is not guaranteed).
customers_df = df.select(
    col("customer_first_name").alias("first_name"),
    col("customer_last_name").alias("last_name"),
    col("customer_age").alias("age"),
    col("customer_email").alias("email"),
    col("customer_country").alias("country"),
    col("customer_postal_code").alias("postal_code"),
    col("customer_pet_type").alias("pet_type"),
    col("customer_pet_name").alias("pet_name"),
    col("customer_pet_breed").alias("pet_breed"),
    "pet_category"
).dropDuplicates(["email"])

# Load into postgres table d_customers.
# overwrite + truncate empties the existing table instead of dropping it, so its
# DDL (including the `id` column, which Spark never writes) is kept, while rows
# from earlier runs are removed -- re-running the notebook no longer duplicates
# the dimension. cascadeTruncate makes PostgreSQL run TRUNCATE ... CASCADE, which
# is required if f_sales references this table via a foreign key; the fact cell
# rewrites f_sales afterwards anyway.
(
    customers_df.write
    .option("truncate", "true")
    .option("cascadeTruncate", "true")
    .jdbc(url=pg_url, table="d_customers", mode="overwrite", properties=pg_props)
)


In [10]:
from pyspark.sql.functions import col

# Product dimension. The fact join identifies a product by
# (name, category, price, brand, size, material), so the dimension is made unique
# on exactly that combination; otherwise a product that differs only in, e.g.,
# rating or color would appear twice and multiply the matching fact rows.
# One row per key is kept (which one is not guaranteed).
products_df = df.select(
    col("product_name").alias("name"),
    col("product_category").alias("category"),
    col("product_price").alias("price"),
    col("product_quantity").alias("quantity"),
    col("product_weight").alias("weight"),
    col("product_color").alias("color"),
    col("product_size").alias("size"),
    col("product_brand").alias("brand"),
    col("product_material").alias("material"),
    col("product_description").alias("description"),
    col("product_rating").alias("rating"),
    col("product_reviews").alias("reviews"),
    col("product_release_date").alias("release_date"),
    col("product_expiry_date").alias("expiry_date")
).dropDuplicates(["name", "category", "price", "brand", "size", "material"])

# Load into postgres table d_products.
# overwrite + truncate keeps the table DDL (including the database-side `id`)
# but removes rows from earlier runs, so re-running the notebook is idempotent.
# cascadeTruncate is needed if f_sales holds a foreign key to this table.
(
    products_df.write
    .option("truncate", "true")
    .option("cascadeTruncate", "true")
    .jdbc(url=pg_url, table="d_products", mode="overwrite", properties=pg_props)
)

In [11]:
from pyspark.sql.functions import col

# Seller dimension. The fact join matches sellers on email only, so the
# dimension is deduplicated on that key to prevent fact-row fan-out.
# One row per email is kept (which one is not guaranteed).
sellers_df = df.select(
    col("seller_first_name").alias("first_name"),
    col("seller_last_name").alias("last_name"),
    col("seller_email").alias("email"),
    col("seller_country").alias("country"),
    col("seller_postal_code").alias("postal_code")
).dropDuplicates(["email"])

# Load into postgres table d_sellers.
# overwrite + truncate keeps the table DDL (including the database-side `id`)
# but removes rows from earlier runs, so re-running the notebook is idempotent.
# cascadeTruncate is needed if f_sales holds a foreign key to this table.
(
    sellers_df.write
    .option("truncate", "true")
    .option("cascadeTruncate", "true")
    .jdbc(url=pg_url, table="d_sellers", mode="overwrite", properties=pg_props)
)

In [12]:
from pyspark.sql.functions import col

# Supplier dimension. The fact join matches suppliers on email only, so the
# dimension is deduplicated on that key to prevent fact-row fan-out.
# One row per email is kept (which one is not guaranteed).
suppliers_df = df.select(
    col("supplier_name").alias("name"),
    col("supplier_contact").alias("contact"),
    col("supplier_email").alias("email"),
    col("supplier_phone").alias("phone"),
    col("supplier_address").alias("address"),
    col("supplier_city").alias("city"),
    col("supplier_country").alias("country")
).dropDuplicates(["email"])

# Load into postgres table d_suppliers.
# overwrite + truncate keeps the table DDL (including the database-side `id`)
# but removes rows from earlier runs, so re-running the notebook is idempotent.
# cascadeTruncate is needed if f_sales holds a foreign key to this table.
(
    suppliers_df.write
    .option("truncate", "true")
    .option("cascadeTruncate", "true")
    .jdbc(url=pg_url, table="d_suppliers", mode="overwrite", properties=pg_props)
)

In [13]:
from pyspark.sql.functions import col

# Store dimension. The fact join matches stores on email only, so the
# dimension is deduplicated on that key to prevent fact-row fan-out.
# One row per email is kept (which one is not guaranteed).
stores_df = df.select(
    col("store_name").alias("name"),
    col("store_location").alias("location"),
    col("store_city").alias("city"),
    col("store_state").alias("state"),
    col("store_country").alias("country"),
    col("store_phone").alias("phone"),
    col("store_email").alias("email")
).dropDuplicates(["email"])

# Load into postgres table d_stores.
# overwrite + truncate keeps the table DDL (including the database-side `id`)
# but removes rows from earlier runs, so re-running the notebook is idempotent.
# cascadeTruncate is needed if f_sales holds a foreign key to this table.
(
    stores_df.write
    .option("truncate", "true")
    .option("cascadeTruncate", "true")
    .jdbc(url=pg_url, table="d_stores", mode="overwrite", properties=pg_props)
)

In [14]:
from pyspark.sql.functions import col, year, month, dayofmonth, dayofweek

# Time dimension: one row per distinct sale_date, plus its calendar parts.
# Spark's dayofweek numbers days 1 = Sunday ... 7 = Saturday.
times_df = (
    df.select(col("sale_date").alias("date"))
    .dropDuplicates()
    .withColumn("year", year("date"))
    .withColumn("month", month("date"))
    .withColumn("day", dayofmonth("date"))
    .withColumn("weekday", dayofweek("date"))
)

# Load into postgres table d_times.
# overwrite + truncate keeps the table DDL (including the database-side `id`)
# but removes rows from earlier runs, so re-running the notebook does not
# duplicate dates (a duplicated date would multiply fact rows in the date join).
# cascadeTruncate is needed if f_sales holds a foreign key to this table.
(
    times_df.write
    .option("truncate", "true")
    .option("cascadeTruncate", "true")
    .jdbc(url=pg_url, table="d_times", mode="overwrite", properties=pg_props)
)

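Теперь свяжем все данные с таблицей фактов. Для этого сначала прочитаем таблицы измерений обратно из PostgreSQL: столбцы `id` Spark в них не записывает, а в таблице фактов нужны именно эти идентификаторы.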

In [19]:
# Read the dimension tables back from Postgres. Spark never writes their `id`
# columns, so the ids used as foreign keys in the fact table must come from here.
(
    dim_customers,
    dim_sellers,
    dim_products,
    dim_stores,
    dim_suppliers,
    dim_times,
) = (
    spark.read.jdbc(url=pg_url, table=dim_table, properties=pg_props)
    for dim_table in (
        "d_customers",
        "d_sellers",
        "d_products",
        "d_stores",
        "d_suppliers",
        "d_times",
    )
)

Теперь соединим данные в соответствии с исходной таблицей: для каждой строки `mock_data` найдём идентификаторы соответствующих записей в таблицах измерений.

In [27]:
from pyspark.sql.functions import col, broadcast, monotonically_increasing_id

# Short aliases so columns can be addressed as "cust.id", "raw.sale_quantity", ...
raw = df.alias("raw")
cust = dim_customers.alias("cust")
sell = dim_sellers.alias("sell")
prod = dim_products.alias("prod")
store = dim_stores.alias("store")
sup = dim_suppliers.alias("sup")
dt = dim_times.alias("dt")

# Join every dimension to the source rows. Left joins keep every sale even if a
# dimension row is missing (the FK is then NULL). Each join key must identify at
# most one dimension row, otherwise fact rows are multiplied (checked below).
fact_df = (
    raw
    .join(cust, raw["customer_email"] == cust["email"], "left")
    .join(sell, raw["seller_email"] == sell["email"], "left")
    # Composite product key. eqNullSafe (SQL <=>) treats NULL = NULL as a match;
    # with plain == a NULL in any of these columns silently left product_id NULL.
    .join(
        prod,
        raw["product_name"].eqNullSafe(prod["name"])
        & raw["product_category"].eqNullSafe(prod["category"])
        & raw["product_price"].eqNullSafe(prod["price"])
        & raw["product_brand"].eqNullSafe(prod["brand"])
        & raw["product_size"].eqNullSafe(prod["size"])
        & raw["product_material"].eqNullSafe(prod["material"]),
        "left",
    )
    .join(store, raw["store_email"] == store["email"], "left")
    .join(sup, raw["supplier_email"] == sup["email"], "left")
    # Broadcast the time dimension to the executors instead of shuffling for this join.
    .join(broadcast(dt), raw["sale_date"] == dt["date"], "left")
    .select(
        col("dt.id").alias("date_id"),
        col("cust.id").alias("customer_id"),
        col("sell.id").alias("seller_id"),
        col("prod.id").alias("product_id"),
        col("store.id").alias("store_id"),
        col("sup.id").alias("supplier_id"),
        col("raw.sale_quantity").cast("int").alias("quantity"),
        col("raw.sale_total_price").cast("decimal(10,2)").alias("total_price"),
    )
)

# Guard against join fan-out: every source sale must map to exactly one fact row.
fact_count = fact_df.count()
source_count = df.count()
print(fact_count)
assert fact_count == source_count, (
    f"f_sales would have {fact_count} rows but mock_data has {source_count}: "
    "some dimension is not unique on its join key"
)

10000


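На этапе join возникли проблемы: сначала слияние выполнялось по неуникальным полям, вследствие чего в `f_sales` получилось около 11 000 000 записей, а их запись в базу заняла много времени. Поэтому ключ соединения с каждым измерением должен однозначно определять одну строку этого измерения.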

Нужные столбцы уже отобраны в `select` выше; теперь записываем таблицу фактов в `f_sales`.

In [28]:
# Four partitions -> at most four JDBC connections inserting concurrently.
fact_df = fact_df.repartition(4)

# Overwrite f_sales. With truncate=true Spark empties the existing table instead of
# dropping and recreating it, so the table definition is preserved; rows are sent
# to PostgreSQL in batches of 500. Connection user/password/driver come from pg_props.
fact_df.write.jdbc(
    url=pg_url,
    table="f_sales",
    mode="overwrite",
    properties={**pg_props, "batchsize": "500", "truncate": "true"},
)


В данном случае без настройки пакетной загрузки (`batchsize`) можно было обойтись: 10 000 строк — это немного.