In [1]:
from datetime import datetime
from delta import DeltaTable
import ConnectionConfig as cc
from pyspark.sql.functions import *
from pyspark.sql.functions import create_map, lit, col
from itertools import chain
# Project helper from ConnectionConfig; presumably prepares the local Spark/Java
# environment before a session is started below — TODO confirm in ConnectionConfig.
cc.setupEnvironment()

In [2]:
# Start the local Spark session used by every later cell, then select the
# "VeloBike" connection profile (presumably consumed by cc.create_jdbc() and
# cc.get_Property() in the JDBC read — confirm in ConnectionConfig).
# NOTE(review): the meaning of the second argument (4) is defined by
# ConnectionConfig.startLocalCluster — presumably a local core/worker count.
spark = cc.startLocalCluster("DIM_USER_TWO", 4)

# Not the last expression of the cell, so its result is never displayed.
spark.getActiveSession()

cc.set_connectionProfile("VeloBike")

In [3]:
# Single (naive, local-time) timestamp for this run; it is interpolated into the
# upsert SQL as both the start of new versions and the end of replaced ones.
run_timestamp = datetime.today()

In [4]:
# Load the raw `velo_users` table from PostgreSQL over JDBC. The URL and the
# credentials come from ConnectionConfig (presumably the profile selected above).
jdbc_url = cc.create_jdbc()
print(jdbc_url)

user_df = (
    spark.read.format("jdbc")
    .options(
        driver="org.postgresql.Driver",
        url=jdbc_url,
        dbtable="velo_users",
        user=cc.get_Property("username"),
        password=cc.get_Property("password"),
    )
    .load()
)

jdbc:postgresql://localhost:5433/velodb


In [5]:
# Standardise the free-text `city` column using an ordered list of
# (case-insensitive regex, canonical name) rules rendered into a SQL CASE.
# Rules are tried top-down and the first match wins, so order matters whenever a
# value matches several patterns; values matching no rule keep their original text.
# NOTE(review): the NULL rule and the Berchem rule are redundant. `NULL RLIKE ...`
# is NULL, so a NULL city already falls through to ELSE. Any value matching the
# Berchem pattern also matches the following Antwerpen pattern, with the same
# result. Both are kept as-is.
CITY_RULES = [
    ("(?i).*Berchem.*Antwerpen.*", "Antwerpen"),
    ("(?i).*Antwerpen.*", "Antwerpen"),
    ("(?i).*Rumst.*", "Rumst"),
    ("(?i).*Ranst.*", "Ranst"),
    ("(?i).*Zandhoven.*", "Zandhoven"),
    ("(?i).*Kontich.*", "Kontich"),
    ("(?i).*Melsele.*", "Melsele"),
    ("(?i).*Antw.*", "Antwerpen"),
    ("(?i).*Schilde.*", "Schilde"),
    ("(?i).*Zwijndrecht.*", "Zwijndrecht"),
    ("(?i).*Bazel.*", "Bazel"),
    ("(?i).*Broechem.*", "Broechem"),
]

city_when_sql = "\n".join(
    f"        WHEN city RLIKE '{pattern}' THEN '{canonical}'"
    for pattern, canonical in CITY_RULES
)

user_df.createOrReplaceTempView("users")

users_original = spark.sql(f"""
SELECT
    userid,
    street,
    number,
    zipcode,
    CASE
        WHEN city IS NULL THEN city
{city_when_sql}
        ELSE city
    END AS city,
    country_code
FROM users
""")

# Later cells read the cleaned rows through this view.
users_original.createOrReplaceTempView("users_standardized")


In [6]:
# Initial SCD type-2 load of the user dimension. Every standardised user becomes
# the current version (current = TRUE), valid from a fixed start date (1999-01-01)
# until a far-future end date (2100-12-12). user_sk is a generated uuid key.
# NOTE(review): concat() returns NULL as soon as one argument is NULL. md5_hash is
# therefore NULL for any user with a NULL street/number/zipcode/city/country_code,
# and the city CASE explicitly keeps NULL cities. The change-detection cells
# recompute this exact expression, so any change to it must be made there too.
df_dim_user = spark.sql("""
SELECT
    uuid()                                    AS user_sk,
    userid,
    street,
    number,
    zipcode,
    city,
    country_code,
    to_timestamp('1999-01-01', 'yyyy-MM-dd')  AS start_time,
    to_timestamp('2100-12-12', 'yyyy-MM-dd')  AS end_time,
    md5(concat(street, number, zipcode, city, country_code)) AS md5_hash,
    TRUE                                      AS current
FROM users_standardized
""")

df_dim_user.show(20)

+--------------------+------+--------------------+--------+-------+-----------+------------+-------------------+-------------------+--------------------+-------+
|             user_sk|userid|              street|  number|zipcode|       city|country_code|         start_time|           end_time|            md5_hash|current|
+--------------------+------+--------------------+--------+-------+-----------+------------+-------------------+-------------------+--------------------+-------+
|f9917513-e464-4d0...|     1|         Somméstraat|    156 |   2060|  Antwerpen|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|fddd2c34acf178b1f...|   true|
|768b047f-ba43-4fa...|     2|          Europalaan|     43 |   2610|  Antwerpen|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|66e547d8ea2e7de14...|   true|
|ef8e09de-e02b-475...|     3|   Maria Clarastraat|     80 |   2160|  Wommelgem|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|5372c35ac3d3a8b04...|   true|
|cebc5098-734d-49a...|     4

In [6]:
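# City names in the source were inconsistent; where one value lists several places, a single canonical name was chosen.
# Alternative approach kept for reference only: the active standardisation is the SQL CASE that builds users_standardized.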
#
# city_mapping = {
#     "Wilrijk (Antwerpen)": "Wilrijk",
#     "Wommelgem": "Wommelgem",
#     "Burcht/Zwijndrecht": "Zwijndrecht",
#     "Kontich/Waarloos": "Kontich",
#     "Brasschaat": "Brasschaat",
#     "Bazel/Kruibeke/Rupelmonde": "Kruibeke",
#     "Antwerpen": "Antwerpen",
#     "Schoten": "Schoten",
#     "Merksem (Antwerpen)": "Merksem",
#     "Vremde": "Vremde",
#     "Mortsel": "Mortsel",
#     "Borgerhout (Antwerpen)": "Borgerhout",
#     "Reet/Rumst/Terhagen": "Rumst",
#     "Hoboken (Antwerpen)": "Hoboken",
#     "Lint": "Lint",
#     "Aartselaar": "Aartselaar",
#     "» Antwerpen «/Ekeren (Antwerpen)": "Ekeren",
#     "Hove": "Hove",
#     "Edegem": "Edegem",
#     "Antwerpen/Berendrecht/Lillo/Zandvliet": "Zandvliet"
# }
#
# mapping_expr = create_map([lit(x) for x in chain(*city_mapping.items())])
#
# df_dim_user = df_dim_user.withColumn("city_cleaned", mapping_expr[col("city")])
#
# df_dim_user.select("city", "city_cleaned").distinct().show(truncate=False)

+-------------------------------------+------------+
|city                                 |city_cleaned|
+-------------------------------------+------------+
|Merksem (Antwerpen)                  |Merksem     |
|Lint                                 |Lint        |
|Aartselaar                           |Aartselaar  |
|Wilrijk (Antwerpen)                  |Wilrijk     |
|Wommelgem                            |Wommelgem   |
|Hove                                 |Hove        |
|Burcht/Zwijndrecht                   |Zwijndrecht |
|Kontich/Waarloos                     |Kontich     |
|Vremde                               |Vremde      |
|Brasschaat                           |Brasschaat  |
|Reet/Rumst/Terhagen                  |Rumst       |
|» Antwerpen «/Ekeren (Antwerpen)     |Ekeren      |
|Schoten                              |Schoten     |
|Edegem                               |Edegem      |
|Borgerhout (Antwerpen)               |Borgerhout  |
|Hoboken (Antwerpen)                  |Hoboken

In [7]:
# # df_dim_user.filter(col("city").rlike("(?i).*Berchem.*")).select("city").distinct().show(truncate=False)
#
# df_dim_user = df_dim_user.withColumn(
#     "city_cleaned",
#     when(col("city").isNull(), col("city"))  # Keep NULL as NULL
#     .when(col("city").rlike("(?i).*Berchem.*Antwerpen.*"), lit("Antwerpen"))
#     .when(col("city").rlike("(?i).*Antwerpen.*"), lit("Antwerpen"))
#     .when(col("city").rlike("(?i).*Rumst.*"), lit("Rumst"))
#     .when(col("city").rlike("(?i).*Ranst.*"), lit("Ranst"))
#     .when(col("city").rlike("(?i).*Zandhoven.*"), lit("Zandhoven"))
#     .when(col("city").rlike("(?i).*Kontich.*"), lit("Kontich"))
#     .when(col("city").rlike("(?i).*Melsele.*"), lit("Melsele"))
#     .when(col("city").rlike("(?i).*Antw.*"), lit("Antwerpen"))
#     .when(col("city").rlike("(?i).*Schilde.*"), lit("Schilde"))
#     .when(col("city").rlike("(?i).*Zwijndrecht.*"), lit("Zwijndrecht"))
#     .when(col("city").rlike("(?i).*Bazel.*"), lit("Bazel"))
#     .when(col("city").rlike("(?i).*Broechem.*"), lit("Broechem"))
#     .otherwise(col("city"))
# )
#
# df_dim_user = df_dim_user.withColumn(
#     "source_md5_hash",
#     expr("md5(concat(userid, street, number, zipcode, city_cleaned, countrycode))")
# )
#
#
# df_dim_user = df_dim_user.drop("city").withColumnRenamed("city_cleaned", "city")
#
#
#


In [8]:
# Sanity check of the city standardisation: number of users per cleaned city name.
city_counts = df_dim_user.groupBy("city").count()
city_counts.show(100)

+-----------+-----+
|       city|count|
+-----------+-----+
|  Wommelgem| 1136|
| Brasschaat| 2130|
|    Melsele| 3015|
|  Antwerpen|30648|
|    Schoten| 2886|
|Zwijndrecht| 1319|
|    Kontich| 1832|
|     Vremde|  502|
|    Mortsel| 1347|
|      Bazel| 1470|
|       Lint|  740|
| Aartselaar|  979|
|       Hove|  848|
|     Edegem| 1660|
|  Zandhoven|  883|
|      Ranst| 2062|
|   Boechout| 1387|
|       Niel|  825|
|    Schilde| 2009|
|    Schelle|  813|
|      Rumst| 1509|
+-----------+-----+



In [9]:
# Register the freshly built dimension as the `dimUser` temp view and preview
# its first 20 rows through that view.
df_dim_user.createOrReplaceTempView("dimUser")
spark.table("dimUser").show()

+--------------------+------+--------------------+--------+-------+-----------+------------+-------------------+-------------------+--------------------+-------+
|             user_sk|userid|              street|  number|zipcode|       city|country_code|         start_time|           end_time|            md5_hash|current|
+--------------------+------+--------------------+--------+-------+-----------+------------+-------------------+-------------------+--------------------+-------+
|f9917513-e464-4d0...|     1|         Somméstraat|    156 |   2060|  Antwerpen|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|fddd2c34acf178b1f...|   true|
|768b047f-ba43-4fa...|     2|          Europalaan|     43 |   2610|  Antwerpen|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|66e547d8ea2e7de14...|   true|
|ef8e09de-e02b-475...|     3|   Maria Clarastraat|     80 |   2160|  Wommelgem|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|5372c35ac3d3a8b04...|   true|
|cebc5098-734d-49a...|     4

In [10]:
# The initial load sets current = TRUE on every row, so this should show a single group.
df_dim_user.groupBy("current").count().show()

+-------+-----+
|current|count|
+-------+-----+
|   true|60000|
+-------+-----+



In [11]:
# Persist the initial load as the Delta table `dimUser_current`.
# NOTE(review): mode "overwrite" replaces the whole table each time this cell runs.
# Any SCD2 history written by the MERGE further down is lost on re-execution —
# confirm this is intended.
df_dim_user.write.saveAsTable("dimUser_current", format="delta", mode="overwrite")


In [12]:
# Open the Delta table written above by its catalog name instead of a hardcoded
# relative path. "spark-warehouse/dimuser_current" only worked when the notebook's
# working directory was the parent of a default-located warehouse.
# The name is qualified with the current database. An unqualified
# `dimUser_current` would resolve to the temp view registered just below, once
# this cell has been run before, rather than to the Delta table.
dt_userDim = DeltaTable.forName(
    spark, f"{spark.catalog.currentDatabase()}.dimUser_current"
)

# Expose the table to SQL. This temp view shadows the catalog table of the same
# name for every unqualified `dimUser_current` reference in later cells.
dt_userDim.toDF().createOrReplaceTempView("dimUser_current")

In [13]:
# Row count of the dimension, read through the temp view registered above.
spark.table("dimUser_current").count()

60000

In [14]:
# Source side of the SCD2 comparison: the freshly standardised operational users.
# Every column is prefixed `source_` so it cannot collide with the dimension's
# columns after the join in the change-detection cell.
#
# Read from users_standardized, NOT from dimUser_current:
# - comparing the dimension with a projection of itself can never surface an
#   upstream change;
# - once the table holds closed (current = false) versions, those old rows would
#   be fed back in and reported as "changes" against the user's current row.
#
# The hash expression must stay identical to the one that produced md5_hash in
# the dimension, otherwise every user is reported as changed.
df_dim_user_new = spark.sql("""
SELECT
    uuid() as source_user_sk,
    userid as source_userid,
    street as source_street,
    number as source_number,
    zipcode as source_zipcode,
    city as source_city,
    country_code as source_countrycode,
    md5(concat(street, number, zipcode, city, country_code)) as source_md5_hash
FROM users_standardized
""")

df_dim_user_new.createOrReplaceTempView("dimUserNew")

# Preview the first rows of the source side.
spark.table("dimUserNew").show()

+--------------------+-------------+--------------------+-------------+--------------+-----------+------------------+--------------------+
|      source_user_sk|source_userid|       source_street|source_number|source_zipcode|source_city|source_countrycode|     source_md5_hash|
+--------------------+-------------+--------------------+-------------+--------------+-----------+------------------+--------------------+
|c38a6dca-d1bd-410...|            1|         Somméstraat|         156 |          2060|  Antwerpen|                BE|fddd2c34acf178b1f...|
|96a7873a-0afb-45f...|            2|          Europalaan|          43 |          2610|  Antwerpen|                BE|66e547d8ea2e7de14...|
|2e952b6e-1d26-4f8...|            3|   Maria Clarastraat|          80 |          2160|  Wommelgem|                BE|5372c35ac3d3a8b04...|
|c68ba20d-3695-444...|            4|Graaf Joseph de P...|          15 |          2900|    Schoten|                BE|92680e7e5a3c54a58...|
|090319b0-fd38-468...|     

In [15]:
# Rows that need a new SCD2 version:
# - users with no current dimension row (dwh.userid IS NULL after the left join);
# - users whose attribute hash differs from their current dimension row.
# `<=>` is Spark's null-safe equality. With plain `<>`, a NULL hash on either side
# (concat() returns NULL if any input column is NULL) evaluates to NULL, and the
# row was silently excluded from the changes.
# NOTE(review): a change is still missed when BOTH hashes are NULL. Fixing that
# needs a NULL-tolerant hash in both hashing cells.
detectedChanges = spark.sql("""
    SELECT *
    FROM dimUserNew source
    LEFT OUTER JOIN dimUser_current dwh
        ON dwh.userid = source.source_userid
        AND dwh.current = true
    WHERE dwh.userid IS NULL
        OR NOT (dwh.md5_hash <=> source.source_md5_hash)
""")

detectedChanges.createOrReplaceTempView("detectedChanges")

detectedChanges.show()

+--------------+-------------+-------------+-------------+--------------+-----------+------------------+---------------+-------+------+------+------+-------+----+------------+----------+--------+--------+-------+
|source_user_sk|source_userid|source_street|source_number|source_zipcode|source_city|source_countrycode|source_md5_hash|user_sk|userid|street|number|zipcode|city|country_code|start_time|end_time|md5_hash|current|
+--------------+-------------+-------------+-------------+--------------+-----------+------------------+---------------+-------+------+------+------+-------+----+------------+----------+--------+--------+-------+
+--------------+-------------+-------------+-------------+--------------+-----------+------------------+---------------+-------+------+------+------+-------+----+------------+----------+--------+--------+-------+



In [16]:
detectedChanges.count()

0

In [17]:
# Rows to MERGE for every detected change (SCD type 2):
#   * the new version of the user, current from this run onwards;
#   * for users that already had a current dimension row, that row again with
#     end_time set to this run and current = false, so the MERGE can close it.
#     `current IS NOT NULL` keeps only rows where the left join found a dimension row.
# Both halves use the same run timestamp, so the closed and the new version meet exactly.
# UNION (rather than UNION ALL) also removes duplicate rows, e.g. identical closing
# rows if several source rows ever join to the same dimension row.
run_ts_sql = f"to_timestamp('{run_timestamp}')"

df_upserts = spark.sql(f"""
SELECT
    source_user_sk      AS user_sk,
    source_userid       AS user_id,
    source_street       AS street,
    source_number       AS number,
    source_zipcode      AS zipcode,
    source_city         AS city,
    source_countrycode  AS country_code,
    {run_ts_sql}        AS start_time,
    to_timestamp('2100-12-12', 'yyyy-MM-dd') AS end_time,
    source_md5_hash     AS md5,
    true                AS current
FROM detectedChanges

UNION

SELECT
    user_sk,
    userid,
    street,
    number,
    zipcode,
    city,
    country_code,
    start_time,
    {run_ts_sql}        AS end_time,
    md5_hash,
    false
FROM detectedChanges
WHERE current IS NOT NULL
""")

df_upserts.createOrReplaceTempView("upserts")
df_upserts.show(20)


+-------+-------+------+------+-------+----+------------+----------+--------+---+-------+
|user_sk|user_id|street|number|zipcode|city|country_code|start_time|end_time|md5|current|
+-------+-------+------+------+-------+----+------------+----------+--------+---+-------+
+-------+-------+------+------+-------+----+------------+----------+--------+---+-------+



In [18]:
# Debug: inspect up to 100 pending upserts through their temp view.
spark.table("upserts").show(100)

+-------+-------+------+------+-------+----+------------+----------+--------+---+-------+
|user_sk|user_id|street|number|zipcode|city|country_code|start_time|end_time|md5|current|
+-------+-------+------+------+-------+----+------------+----------+--------+---+-------+
+-------+-------+------+------+-------+----+------------+----------+--------+---+-------+



In [19]:
# Apply the upserts to the dimension as an SCD type-2 MERGE:
#  * a source row with current = false matches the user's currently active
#    dimension row and closes it (copies end_time and the false flag);
#  * any source row that does not match is inserted. The new current versions
#    never match, because the ON clause requires source.current = false.
# `dimUser_current` here is the temp view registered over the Delta table.
spark.sql("""
MERGE INTO dimUser_current AS target
USING upserts AS source
    ON  target.userid  = source.user_id
    AND source.current = false
    AND target.current = true
WHEN MATCHED THEN
    UPDATE SET
        target.end_time = source.end_time,
        target.current  = source.current
WHEN NOT MATCHED THEN
    INSERT (user_sk, userid, street, number, zipcode, city, country_code,
            start_time, end_time, md5_hash, current)
    VALUES (source.user_sk, source.user_id, source.street, source.number,
            source.zipcode, source.city, source.country_code,
            source.start_time, source.end_time, source.md5, source.current)
""")

# Debug: preview the dimension after the merge.
dt_userDim.toDF().show(100)

+--------------------+------+--------------------+--------+-------+-----------+------------+-------------------+-------------------+--------------------+-------+
|             user_sk|userid|              street|  number|zipcode|       city|country_code|         start_time|           end_time|            md5_hash|current|
+--------------------+------+--------------------+--------+-------+-----------+------------+-------------------+-------------------+--------------------+-------+
|f9917513-e464-4d0...|     1|         Somméstraat|    156 |   2060|  Antwerpen|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|fddd2c34acf178b1f...|   true|
|768b047f-ba43-4fa...|     2|          Europalaan|     43 |   2610|  Antwerpen|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|66e547d8ea2e7de14...|   true|
|ef8e09de-e02b-475...|     3|   Maria Clarastraat|     80 |   2160|  Wommelgem|          BE|1999-01-01 00:00:00|2100-12-12 00:00:00|5372c35ac3d3a8b04...|   true|
|cebc5098-734d-49a...|     4

In [20]:
# Publish the full contents of the dimension as the Delta table `userDim`.
dt_userDim.toDF().write.saveAsTable("userDim", format="delta", mode="overwrite")


In [21]:
# Export the same data as a plain Parquet table, collapsed to a single partition first.
(
    dt_userDim.toDF()
    .repartition(1)
    .write.saveAsTable("userDim_pq", format="parquet", mode="overwrite")
)



In [22]:
# Stop the Spark session started at the top of the notebook; later cells cannot use `spark`.
spark.stop()