In [28]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, broadcast, rank, row_number)
from pyspark.sql.window import Window

In [3]:
# Reuse the active Spark session if there is one, otherwise start a new one
# registered under the application name "fact_mapping".
spark = (
    SparkSession.builder
    .appName("fact_mapping")
    .getOrCreate()
)

In [4]:
# Load the raw transactions; the first CSV line supplies the column names and
# column types are inferred from the data.
df_fact = spark.read.csv(
    "source/Fact_Transactions.csv",
    header=True,
    inferSchema=True,
)

In [4]:
# Preview the raw fact rows to confirm the columns that were loaded.
df_fact.show()

+-------------+---------------+-----------+------------+------+
|TransactionID|ProductCategory|StoreRegion|CustomerType|Amount|
+-------------+---------------+-----------+------------+------+
|            1|         Laptop|      North|      Retail|  1200|
|            2|         Mobile|      South|   Wholesale|   800|
|            3|         Tablet|       East|      Retail|   300|
|            4|         Laptop|       West|      Retail|  1500|
|            5|         Tablet|      North|   Wholesale|   400|
|            6|         Mobile|       East|      Retail|   900|
|            7|         Mobile|      South|      Retail|   850|
|            8|         Laptop|       West|   Wholesale|  1700|
|            9|         Tablet|       East|      Retail|   350|
|           10|         Mobile|      North|      Retail|   950|
+-------------+---------------+-----------+------------+------+



#### Replace dimension columns with their respective dimension keys

In [5]:
# Product-category dimension, read from the warehouse parquet folder.
dim_product = spark.read.format("parquet").load("warehouse/dim_product_category")

In [6]:
# Look up the product-category key for each fact row. The left join keeps
# every fact row; only the key column is taken from the dimension.
df_fact2 = (
    df_fact
    .join(
        dim_product,
        on=df_fact["ProductCategory"] == dim_product["ProductCategory"],
        how="left",
    )
    .select(df_fact["*"], dim_product["ProductCategoryKey"])
)

In [7]:
# Store-region dimension, read from the warehouse parquet folder.
dim_store_region = spark.read.format("parquet").load("warehouse/dim_store_region")

In [8]:
# Look up the store-region key for each fact row. The left join keeps every
# fact row; only the key column is taken from the dimension.
df_fact3 = (
    df_fact2
    .join(
        dim_store_region,
        on=df_fact2["StoreRegion"] == dim_store_region["StoreRegion"],
        how="left",
    )
    .select(df_fact2["*"], dim_store_region["StoreRegionKey"])
)

In [9]:
# Customer-type dimension, read from the warehouse parquet folder.
dim_customer_type = spark.read.format("parquet").load("warehouse/dim_customer_type")

In [10]:
# Look up the customer-type key for each fact row. The dimension is passed
# with a broadcast hint; the left join keeps every fact row.
df_fact4 = (
    df_fact3
    .join(
        broadcast(dim_customer_type),
        on=df_fact3["CustomerType"] == dim_customer_type["CustomerType"],
        how="left",
    )
    .select(df_fact3["*"], dim_customer_type["CustomerTypeKey"])
)

#### Add the custom mapping key of the best-matching mapping rule

In [11]:
# Custom mapping rules; the preview below shows "ANY" used as a wildcard value
# and a Priority column used to rank competing rules.
dim_custom_mapping = spark.read.format("parquet").load("warehouse/dim_custom_mapping")
dim_custom_mapping.show()

+---------------+-----------+------------+-----------------+--------+----------------+
|ProductCategory|StoreRegion|CustomerType|     MappingLabel|Priority|CustomMappingKey|
+---------------+-----------+------------+-----------------+--------+----------------+
|         Tablet|       East|      Retail|        Side Head|       1|               0|
|         Laptop|       West|      Retail|      Premium Box|       1|               1|
|         Mobile|      North|   Wholesale|North Dist Mobile|       1|               2|
|         Tablet|      South|      Retail|      Tablet Push|       1|               3|
|         Laptop|      North|   Wholesale|    Laptop Supply|       1|               4|
|         Laptop|        ANY|      Retail|   Regular Laptop|       2|               5|
|            ANY|        ANY|   Wholesale| Normal Wholesale|       3|               6|
|            ANY|        ANY|      Retail|    Normal Retail|       3|               7|
|            N/A|        N/A|         N/A| 

In [35]:
# Attach every custom-mapping rule that matches a transaction and rank the
# candidates so the most specific one (lowest Priority) gets rn == 1.
#
# The ranking window must be partitioned per transaction. Partitioning by
# descriptive columns (category / region / amount) would put two different
# transactions that happen to share those values into the same partition, and
# the later `rn == 1` filter would then silently drop all but one of them.
# NOTE(review): this assumes TransactionID uniquely identifies a fact row.
#
# CustomMappingKey is a secondary sort key so that rules sharing the same
# Priority are ranked deterministically between runs.
window_spec = Window.partitionBy(df_fact4["TransactionID"]).orderBy(
    col("Priority"), col("CustomMappingKey")
)

# A rule matches when each of its three attributes equals the transaction's
# value or is the wildcard "ANY". The left join keeps transactions without any
# matching rule (their CustomMappingKey stays null).
df_fact5 = (
    df_fact4.join(
        dim_custom_mapping,
        ((df_fact4.ProductCategory == dim_custom_mapping.ProductCategory) | (dim_custom_mapping.ProductCategory == "ANY"))
        & ((df_fact4.StoreRegion == dim_custom_mapping.StoreRegion) | (dim_custom_mapping.StoreRegion == "ANY"))
        & ((df_fact4.CustomerType == dim_custom_mapping.CustomerType) | (dim_custom_mapping.CustomerType == "ANY")),
        "left",
    )
    .withColumn("rn", row_number().over(window_spec))
    .select([df_fact4["*"], dim_custom_mapping.CustomMappingKey, "rn"])
)

In [36]:
# Inspect the joined rows: one row per matching mapping rule, with its rank in rn.
df_fact5.show()

+-------------+---------------+-----------+------------+------+------------------+--------------+---------------+----------------+---+
|TransactionID|ProductCategory|StoreRegion|CustomerType|Amount|ProductCategoryKey|StoreRegionKey|CustomerTypeKey|CustomMappingKey| rn|
+-------------+---------------+-----------+------------+------+------------------+--------------+---------------+----------------+---+
|            1|         Laptop|      North|      Retail|  1200|                 0|             0|              0|               5|  1|
|            1|         Laptop|      North|      Retail|  1200|                 0|             0|              0|               7|  2|
|            4|         Laptop|       West|      Retail|  1500|                 0|             3|              0|               1|  1|
|            4|         Laptop|       West|      Retail|  1500|                 0|             3|              0|               5|  2|
|            4|         Laptop|       West|      Retail

In [39]:
# Keep only the top-ranked mapping (rn == 1) and order the result by
# TransactionID.
df_fact6 = (
    df_fact5
    .where(col("rn") == 1)
    .sort(col("TransactionID"))
)

In [40]:
# Check the rows that remain after keeping only rn == 1.
df_fact6.show()

+-------------+---------------+-----------+------------+------+------------------+--------------+---------------+----------------+---+
|TransactionID|ProductCategory|StoreRegion|CustomerType|Amount|ProductCategoryKey|StoreRegionKey|CustomerTypeKey|CustomMappingKey| rn|
+-------------+---------------+-----------+------------+------+------------------+--------------+---------------+----------------+---+
|            1|         Laptop|      North|      Retail|  1200|                 0|             0|              0|               5|  1|
|            2|         Mobile|      South|   Wholesale|   800|                 1|             1|              1|               6|  1|
|            3|         Tablet|       East|      Retail|   300|                 2|             2|              0|               0|  1|
|            4|         Laptop|       West|      Retail|  1500|                 0|             3|              0|               1|  1|
|            5|         Tablet|      North|   Wholesale

#### Final Cleanup 

In [43]:
# Drop the descriptive dimension columns (now represented by their keys) and
# the helper ranking column. Column names are passed as strings because
# DataFrame.drop only accepts several Column objects from PySpark 3.4 onwards;
# string names work on every version and remove the same columns.
df_fact7 = df_fact6.drop("ProductCategory", "StoreRegion", "CustomerType", "rn")

In [44]:
# Confirm that only TransactionID, Amount and the key columns remain.
df_fact7.show()

+-------------+------+------------------+--------------+---------------+----------------+
|TransactionID|Amount|ProductCategoryKey|StoreRegionKey|CustomerTypeKey|CustomMappingKey|
+-------------+------+------------------+--------------+---------------+----------------+
|            1|  1200|                 0|             0|              0|               5|
|            2|   800|                 1|             1|              1|               6|
|            3|   300|                 2|             2|              0|               0|
|            4|  1500|                 0|             3|              0|               1|
|            5|   400|                 2|             0|              1|               6|
|            6|   900|                 1|             2|              0|               7|
|            7|   850|                 1|             1|              0|               7|
|            8|  1700|                 0|             3|              1|               6|
|         

In [45]:
# Replace a null CustomMappingKey with -1; other columns are left untouched.
df_fact_final = df_fact7.na.fill({"CustomMappingKey": -1})

In [46]:
# Review the final fact rows before they are written out.
df_fact_final.show()

+-------------+------+------------------+--------------+---------------+----------------+
|TransactionID|Amount|ProductCategoryKey|StoreRegionKey|CustomerTypeKey|CustomMappingKey|
+-------------+------+------------------+--------------+---------------+----------------+
|            1|  1200|                 0|             0|              0|               5|
|            2|   800|                 1|             1|              1|               6|
|            3|   300|                 2|             2|              0|               0|
|            4|  1500|                 0|             3|              0|               1|
|            5|   400|                 2|             0|              1|               6|
|            6|   900|                 1|             2|              0|               7|
|            7|   850|                 1|             1|              0|               7|
|            8|  1700|                 0|             3|              1|               6|
|         

In [47]:
# Persist the fact table as parquet, replacing any previous output at this path.
df_fact_final.write.parquet("warehouse/fact_transactions", mode="overwrite")

In [48]:
# Read the written fact table back from the warehouse and display it.
df_transactions = spark.read.format("parquet").load("warehouse/fact_transactions")
df_transactions.show()

+-------------+------+------------------+--------------+---------------+----------------+
|TransactionID|Amount|ProductCategoryKey|StoreRegionKey|CustomerTypeKey|CustomMappingKey|
+-------------+------+------------------+--------------+---------------+----------------+
|            1|  1200|                 0|             0|              0|               5|
|            2|   800|                 1|             1|              1|               6|
|            3|   300|                 2|             2|              0|               0|
|            4|  1500|                 0|             3|              0|               1|
|            5|   400|                 2|             0|              1|               6|
|            6|   900|                 1|             2|              0|               7|
|            7|   850|                 1|             1|              0|               7|
|            8|  1700|                 0|             3|              1|               6|
|         

In [None]:
# Export the fact table as a single CSV file via pandas (the whole table is
# collected to the driver by toPandas()).
output_csv_path = "output_csv/fact_transactions.csv"

# pandas' to_csv does not create missing parent directories, so make sure the
# target folder exists before writing.
os.makedirs(os.path.dirname(output_csv_path), exist_ok=True)

# Row order is not guaranteed when reading parquet back, so sort explicitly to
# get a stable, reproducible export.
df_transactions.orderBy("TransactionID").toPandas().to_csv(output_csv_path, index=False)