<a href="https://colab.research.google.com/github/sassgabai/pyspark-projects/blob/main/SCD_type_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 3.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=99c6957ee82d8932ca7b5495e43a3dd0b36de9f66a2c1d1e441338fbeb03f6d7
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

In [None]:
# Start (or reuse) the Spark session for this notebook.
# The app name is only a label (e.g. in the Spark UI); it now matches what the
# notebook actually builds, a type 2 slowly changing dimension.
spark = (
    SparkSession.builder
    .appName('scd type 2')
    .getOrCreate()
)

In [None]:
# Explicit schema for the snapshot CSV files: every column is nullable,
# numeric attributes are integers and snapshot_date is parsed as a date.
schema = (
    T.StructType()
    .add('ad_id', T.IntegerType(), True)
    .add('street', T.StringType(), True)
    .add('city', T.StringType(), True)
    .add('zip', T.IntegerType(), True)
    .add('state', T.StringType(), True)
    .add('beds', T.IntegerType(), True)
    .add('baths', T.IntegerType(), True)
    .add('sq_ft', T.IntegerType(), True)
    .add('type', T.StringType(), True)
    .add('price', T.IntegerType(), True)
    .add('snapshot_date', T.DateType(), True)
)

In [None]:
# Load every CSV under /content into one DataFrame, using the first line of
# each file as the header and the explicit schema instead of type inference.
pdf = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('/content/*.csv')
)

In [None]:
# Window over the snapshots of a single ad_id, oldest snapshot first.
window_spec = (
    Window
    .partitionBy('ad_id')
    .orderBy(F.col('snapshot_date').asc())
)

In [None]:
# SCD type 2 change detection: keep exactly one row per version of each ad_id.
#   * the first snapshot of an ad_id always opens its first version
#   * a later snapshot opens a new version only when at least one tracked
#     attribute differs from the immediately preceding snapshot of that ad_id
#
# Attributes are compared column by column against the previous row (lag) with
# a null-safe equality. This keeps the comparison positional and typed, so a
# value that merely moves between columns (e.g. beds and baths swapping) is
# still a change, a null on either side is handled, and rows that carry null
# attributes are not discarded.

# ad_id is the partition key and snapshot_date changes on every snapshot by
# construction, so neither of them can signal a new version.
key_cols = ('ad_id', 'snapshot_date')
tracked_cols = [name for name in pdf.columns if name not in key_cols]

# True when any tracked attribute differs from the previous snapshot.
# eqNullSafe never yields null, so the OR-chain is always a plain boolean.
attrs_changed = F.lit(False)
for name in tracked_cols:
    previous_value = F.lag(name).over(window_spec)
    attrs_changed = attrs_changed | ~F.col(name).eqNullSafe(previous_value)

# Window functions cannot be used directly inside filter(), so the flag is
# materialised as a helper column first and dropped again afterwards; the
# resulting DataFrame has the same columns as the input.
pdf = (
    pdf
    .withColumn('row_number', F.row_number().over(window_spec))
    .withColumn('is_new_version', (F.col('row_number') == 1) | attrs_changed)
    .filter(F.col('is_new_version'))
    .drop('row_number', 'is_new_version')
)

In [None]:
# start_date is the row's own snapshot_date; end_date is the snapshot_date of
# the next remaining row of the same ad_id (null when there is no later row).

next_snapshot_date = F.lead('snapshot_date').over(window_spec)

pdf = (
    pdf
    .withColumn('start_date', F.col('snapshot_date').cast(T.DateType()))
    .withColumn('end_date', next_snapshot_date.cast(T.DateType()))
)

In [None]:
# Rows without a successor (null end_date) are the currently valid version:
# give them the far-future sentinel 9999-12-31 and flag them with current=True.
#
# The sentinel is cast to a date before it is combined with end_date. Mixing a
# bare string literal with the date column would make Spark coerce the whole
# column to string (default, non-ANSI type coercion), and end_date would no
# longer be a DateType column.

open_end_date = F.lit('9999-12-31').cast(T.DateType())

pdf = pdf.withColumn('end_date', F.coalesce(F.col('end_date'), open_end_date))\
         .withColumn('current', F.col('end_date') == open_end_date)

In [None]:
# Preview the result: first 20 rows, long cell values truncated.
pdf.show(n=20, truncate=True)

+------+--------------------+--------------+-----+-----+----+-----+-----+-----------+------+-------------+----------+----------+-------+
| ad_id|              street|          city|  zip|state|beds|baths|sq_ft|       type| price|snapshot_date|start_date|  end_date|current|
+------+--------------------+--------------+-----+-----+----+-----+-----+-----------+------+-------------+----------+----------+-------+
|100001|        3526 HIGH ST|    SACRAMENTO|95838|   CA|   2|    1|  836|Residential| 59222|   2021-12-01|2021-12-01|2021-12-05|  false|
|100001|        3526 HIGH ST|    SACRAMENTO|95838|   CA|   2|    2|  836|Residential| 59222|   2021-12-05|2021-12-05|9999-12-31|   true|
|100002|         51 OMAHA CT|    SACRAMENTO|95823|   CA|   3|    1| 1167|Residential| 68212|   2021-12-01|2021-12-01|9999-12-31|   true|
|100003|      2796 BRANCH ST|    SACRAMENTO|95815|   CA|   2|    1|  796|Residential| 68880|   2021-12-01|2021-12-01|9999-12-31|   true|
|100004|    2805 JANETTE WAY|    SACRAMEN