# Slowly Changing dimension 

In [15]:
# A slowly changing dimension (SCD) type defines the strategy used to apply updates to the data.
# The easiest strategy is to make in-place changes, much like a SQL update:
# UPDATE table SET a = "1", b = "2" WHERE id = 30. The problem with such queries is that,
# despite their simplicity, they make your data less safe because the previous values are lost.
# Imagine having to "roll back" a change and apply different business logic,
# maybe because the original logic was misunderstood. With in-place changes and
# no raw data to regenerate the dataset from scratch, that can be hard.
# That's where slowly changing dimension types come in.

# type 0 - no specific action performed.
# type 1 - data is overwritten in-place, like with my UPDATE...SET example
# type 2 - uses a concept of active/inactive row. When new information is present for a row, 
# it's used to build a completely new row flagged as "active". Previous "active" row passes to "inactive" state. In addition to the state transition, the columns with dates storing the validity period are updated accordingly.
# type 3 - the table has one extra column per updatable field.
# The currently used values are stored in "current_"-prefixed columns, whereas the previously used values are stored in "previous_"-prefixed columns. The history is therefore limited by the number of "previous" columns.
# type 4 - the table looks like an append-only log file system. 
# Every change is added at the end of the table. In addition to that, every new row for given key changes the validity period of the previously active row.
# type 6 - the combination of types 1, 2 and 3 (1+2+3 = 6), so you will retrieve a table with: "previous"-prefixed column, validity period and active/inactive flag.

# Implementing Type 1

In [16]:
# Datasets we have 

# employee.csv

"""
emp_id,emp_name,emp_city,emp_salary
1, John, Sydney, 35000.00
2, Peter, Melbourne, 45000.00
3, Sam, Sydney,55000.00

"""

# employee_delta.csv

"""
emp_id,emp_name,emp_city,emp_salary
2, Peter, Melbourne, 55000.00
5, Jessie, Brisbane, 42000.00

"""

'\nemp_id,emp_name,emp_city,emp_salary\n2, Peter, Melbourne, 55000.00\n5, Jessie, Brisbane, 42000.00\n\n'

In [17]:
# Objective

# 1) UPDATE the record where emp_id=2 with the new salary info from "employee_delta.csv".

# 2) INSERT the records that are new in "employee_delta.csv".

# NOTE: We don’t have to do DELETE as it is normally done as a logical delete with a new field “active=y” or “active=n”.

In [18]:
# Step 1 

# Initialize pyspark

import os
import sys
import glob
# Point this process at the local Spark, Java and Hadoop installations.
# Raw strings keep the Windows backslashes literal: "\s", "\P", "\J" and "\j"
# are not valid escape sequences and trigger warnings on recent Python versions.
os.environ['SPARK_HOME'] = r'C:\spark3'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_231'
os.environ['HADOOP_HOME'] = r'C:\spark3'

# Locate the Python sources and the py4j archive inside the Spark installation.
spark_python = os.path.join(os.environ['SPARK_HOME'], 'python')
py4j_archives = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))
if not py4j_archives:
    # Fail with an explicit message instead of a bare IndexError on [0].
    raise FileNotFoundError(
        "No py4j-*.zip archive found under "
        + os.path.join(spark_python, 'lib')
        + "; check SPARK_HOME"
    )
py4j = py4j_archives[0]

# Put both locations at the front of the import path so pyspark can be imported.
sys.path[:0] = [spark_python, py4j]
os.environ['PYTHONPATH'] = py4j

import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession

# Create (or reuse) the session used by the rest of the notebook.
spark = SparkSession.builder.appName("SCD").enableHiveSupport().getOrCreate()

In [19]:
# Step 2 : Inner join two dataframes to find the “emp_id” that is in both employee.csv & employee_delta.csv.
# Load the current employee data and the incoming delta. Both files carry a
# header row, and Spark is asked to infer the column types.
employees_df = spark.read.csv(
    "C:/Users/ShoryaSharma/Desktop/employee.csv",
    header="true",
    inferSchema="true",
)
employees_delta_df = spark.read.csv(
    "C:/Users/ShoryaSharma/Desktop/employee_delta.csv",
    header="true",
    inferSchema="true",
)

# An inner join on emp_id keeps only the employees present in both files,
# i.e. the rows for which the delta supplies replacement values.
emp_updated = employees_df.join(
    employees_delta_df,
    on=employees_df["emp_id"] == employees_delta_df["emp_id"],
    how="inner",
)
emp_updated.show()

+------+--------+---------+----------+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+------+--------+---------+----------+
|     2|   Peter|Melbourne|     45000|     2|   Peter|Melbourne|     55000|
+------+--------+---------+----------+------+--------+---------+----------+



In [20]:
# step 3: Let’s SELECT the column values from employee_delta.csv as it will update the values in employee.csv.

# The joined frame holds every column twice (once per source). Keep only the
# delta side, because those values replace the ones in employee.csv.
emp_updated = emp_updated.select(
    employees_delta_df["emp_id"],
    employees_delta_df["emp_name"],
    employees_delta_df["emp_city"],
    employees_delta_df["emp_salary"],
)

emp_updated.show()

+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+
|     2|   Peter|Melbourne|     55000|
+------+--------+---------+----------+



In [21]:
# step 4: do a "left outer join" and keep only the records of "employee.csv" that have no match in "employee_delta.csv" (the unchanged rows).

# A left outer join keeps every row of employees_df. Rows without a match in
# the delta have a null delta emp_id, so keeping only those nulls leaves the
# employees that the delta does not touch.
emp_no_change_df = (
    employees_df.join(
        employees_delta_df,
        on=employees_df["emp_id"] == employees_delta_df["emp_id"],
        how="leftouter",
    )
    .where(employees_delta_df["emp_id"].isNull())
    .select(
        employees_df["emp_id"],
        employees_df["emp_name"],
        employees_df["emp_city"],
        employees_df["emp_salary"],
    )
)

emp_no_change_df.show()

+------+--------+--------+----------+
|emp_id|emp_name|emp_city|emp_salary|
+------+--------+--------+----------+
|     1|    John|  Sydney|     35000|
|     3|     Sam|  Sydney|     55000|
+------+--------+--------+----------+



In [22]:
# step 5: do a "right outer join" and keep only the records of "employee_delta.csv" that have no match in "employee.csv" (the new rows).

# A right outer join keeps every row of employees_delta_df. Rows without a
# match in employee.csv have a null employees_df emp_id, so keeping only those
# nulls leaves the employees that exist only in the delta.
emp_new_df = (
    employees_df.join(
        employees_delta_df,
        on=employees_df["emp_id"] == employees_delta_df["emp_id"],
        how="rightouter",
    )
    .where(employees_df["emp_id"].isNull())
    .select(
        employees_delta_df["emp_id"],
        employees_delta_df["emp_name"],
        employees_delta_df["emp_city"],
        employees_delta_df["emp_salary"],
    )
)

emp_new_df.show()

+------+--------+--------+----------+
|emp_id|emp_name|emp_city|emp_salary|
+------+--------+--------+----------+
|     5|  Jessie|Brisbane|     42000|
+------+--------+--------+----------+



In [23]:
# step 6: Union all three dataframes – emp_updated,emp_no_change_df, and emp_new_df to give us the final values.

# Stack the three disjoint sets (updated, unchanged, new) into one frame and
# sort by emp_id. union keeps duplicates, exactly like its alias unionAll.
emp_final = (
    emp_updated
    .union(emp_no_change_df)
    .union(emp_new_df)
    .orderBy("emp_id")
)

emp_final.show()

+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+
|     1|    John|   Sydney|     35000|
|     2|   Peter|Melbourne|     55000|
|     3|     Sam|   Sydney|     55000|
|     5|  Jessie| Brisbane|     42000|
+------+--------+---------+----------+



In [24]:
# Stage the merged result as a single-partition CSV (with header) in a
# temporary location, replacing any previous staging output.
# NOTE(review): presumably staged because emp_final is still lazily derived
# from employee.csv, which a later cell overwrites - confirm.
(
    emp_final.coalesce(1)
    .write.format("csv")
    .mode("overwrite")
    .options(header="true", sep=",")
    .save("C:/Users/ShoryaSharma/Documents/employee_temp.csv")
)

In [25]:
# Reload the staged result from the temporary location (header row, inferred
# types) so that emp_final is now read from the staged files.
emp_final = spark.read.options(header="true", inferSchema="true").csv(
    "C:/Users/ShoryaSharma/Documents/employee_temp.csv"
)

# Overwrite the original employee.csv path with the merged data, written as a
# single-partition CSV with a header row.
(
    emp_final.coalesce(1)
    .write.format("csv")
    .mode("overwrite")
    .options(header="true", sep=",")
    .save("C:/Users/ShoryaSharma/Desktop/employee.csv")
)

In [27]:
# Read the overwritten employee.csv back (header row, no type inference) and
# display it to confirm the merged result.
(
    spark.read.option("header", "true")
    .csv("C:/Users/ShoryaSharma/Desktop/employee.csv")
    .show()
)

+------+--------+---------+----------+
|emp_id|emp_name| emp_city|emp_salary|
+------+--------+---------+----------+
|     1|    John|   Sydney|     35000|
|     2|   Peter|Melbourne|     55000|
|     3|     Sam|   Sydney|     55000|
|     5|  Jessie| Brisbane|     42000|
+------+--------+---------+----------+

