<a href="https://colab.research.google.com/github/mehulpatel21/mehulpatel21/blob/main/pyspark_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

sample_data  spark-2.3.1-bin-hadoop2.7	spark-2.3.1-bin-hadoop2.7.tgz


In [None]:
"""
for each loan_ar_id, there will be multiple records with different snapshot_dates
for each record, create the following:
  1) lag_tot_bill_am - start from earliest record with 0, and then assign prev snapshot's tot_bill_am - done
  2) increase counter from N=1 through N=25 = done
  3) bill3m - if tot_bill_am > lag_tot_bill_am then 1 else 0 (if N=1, bill3m = 0)
  4) bill_chng_dt - Start with bill3m = 1 and assign snapshot_date as bill_chng_dt, keep going until you encounter another 1.
  5) mnth_sinc_bill_chng - difference between snapshot_date and bill_chng_dt
"""


'2.3.1'

In [97]:
full_df = spark.read.csv("/content/test_mnth_sinc_bill_chg.csv", sep=",", header='True')

In [None]:
full_df.show(60, False)

In [98]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [None]:
df1 = full_df.withColumn('snapshot_dt',to_date(full_df.snapshot_dt, 'yyyy-MM-dd'))
df1.show(60, False)

In [100]:
windowSpec = Window.partitionBy("loan_ar_id").orderBy(col("snapshot_dt").asc())

In [101]:
# increase counter from N=1 through N=25 = done
df_wind = df1.withColumn("N", row_number().over(windowSpec))

In [None]:
df_wind.show(60, False)
df_wind.printSchema()

In [103]:
# lag_tot_bill_am - start from earliest record with 0, and then assign prev snapshot's tot_bill_am
lag_condition = when(col("N") == 0, 0).when(col("N") > 0, lag(df_wind.tot_bill_am).over(windowSpec))
df_wind = df_wind.withColumn("lag_tot_bill_am", lag_condition)

In [None]:
df_wind.show(60, False)

In [109]:
# bill3m - if tot_bill_am > lag_tot_bill_am then 1 else 0 (if N=1, bill3m = 0)
bill3m_condition = when(col("N") == 1, 0).when(col("tot_bill_am") > col("lag_tot_bill_am"), 1).otherwise(0)
df_wind = df_wind.withColumn("bill3m", bill3m_condition)


In [None]:
df_wind.show(60, False)

In [136]:
# bill_chng_dt - Start with bill3m = 1 and assign snapshot_date as bill_chng_dt, keep going until you encounter another 1.
bill_chng_dt_condition = when(col("N") == 1, None)\
                            .when(col("bill3m") == 1, col("snapshot_dt"))


df_wind = df_wind.withColumn("bill_chng_dt", bill_chng_dt_condition)

wind = (
    Window.partitionBy('loan_ar_id').orderBy('snapshot_dt').rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

final = df_wind.withColumn("bill_chng_dt", last("bill_chng_dt", ignorenulls=True).over(wind)).drop("temp_bill_chng_dt")





In [None]:
final.show(60, False)

In [142]:
# mnth_sinc_bill_chng - difference between snapshot_date and bill_chng_dt
from pyspark.sql.types import IntegerType
final_df = final.withColumn("mnth_sinc_bill_chng", months_between(col("snapshot_dt"), col("bill_chng_dt")))
final_df = final_df.withColumn("mnth_sinc_bill_chng", final_df.mnth_sinc_bill_chng.cast(IntegerType()))



In [143]:
final_df.show(60, False)

+----------+-----------+-----------+---+---------------+------+------------+-------------------+
|loan_ar_id|snapshot_dt|tot_bill_am|N  |lag_tot_bill_am|bill3m|bill_chng_dt|mnth_sinc_bill_chng|
+----------+-----------+-----------+---+---------------+------+------------+-------------------+
|112       |2019-12-31 |200.00     |1  |null           |0     |null        |null               |
|112       |2020-01-31 |200.00     |2  |200.00         |0     |null        |null               |
|112       |2020-02-28 |200.00     |3  |200.00         |0     |null        |null               |
|112       |2020-03-31 |200.00     |4  |200.00         |0     |null        |null               |
|112       |2020-04-30 |200.00     |5  |200.00         |0     |null        |null               |
|112       |2020-05-31 |200.00     |6  |200.00         |0     |null        |null               |
|112       |2020-06-30 |200.00     |7  |200.00         |0     |null        |null               |
|112       |2020-07-31 |200.00