In [None]:
from pyspark.sql.functions import coalesce, col, date_add, datediff, lag, last, lit, when
from pyspark.sql.window import Window

# S3 bucket that holds the input Parquet exports read by this notebook and
# receives the datasets it writes under the "output/" prefix.
BUCKET_NAME = "marcial-tmf-oct22-msf-data"

Contacts

In [None]:
# Load the contacts export from S3.
# The "header", "quote" and "escape" reader options were removed: they are CSV
# options and mean nothing for a Parquet source, which carries its own schema.
df_con = spark.read.format("parquet").load(
    f"s3://{BUCKET_NAME}/contacts/MSF_Contact.parquet"
)

In [None]:
# Rename the generic "id" column so the contact key stays distinguishable
# once contacts are joined with other tables.
df_con = df_con.withColumnRenamed(existing="id", new="con_id")

Tasks

In [None]:
# Load the tasks export from S3.
# The "header", "quote" and "escape" reader options were removed: they are CSV
# options and mean nothing for a Parquet source, which carries its own schema.
df_tasks = spark.read.format("parquet").load(
    f"s3://{BUCKET_NAME}/tasks/MSF_Task.parquet"
)

In [None]:
# Rename the generic "id" column so the task key stays distinguishable
# once tasks are joined with other tables.
df_tasks = df_tasks.withColumnRenamed(existing="id", new="task_id")

Merge with contacts

In [None]:
# Join every task to its contact (tasks.whoid -> contacts.con_id) and keep only
# "MSF Aumento de Cuota" tasks whose contact is flagged 'Socio' in
# msf_isactiverecurringdonor__c, together with the contact and task attributes
# used by the later cells.
#
# The row filter runs BEFORE the projection: it reads two columns
# (msf_isactiverecurringdonor__c and subject) that the select() does not keep,
# so filtering after the select only worked because Spark silently re-resolved
# the dropped columns. The former `con_id == whoid` predicate is omitted because
# the inner join condition already guarantees it.
df_tasks_cons = (
    df_con
    .join(df_tasks, df_con.con_id == df_tasks.whoid, "inner")
    .where(
        (df_con.msf_isactiverecurringdonor__c == "Socio")
        & (df_tasks.subject == "MSF Aumento de Cuota")
    )
    .select(
        # Contact attributes
        df_con.msf_seniority__c,
        df_con.msf_birthyear__c,
        df_con.msf_entrycampaign__c,
        df_con.npo02__averageamount__c,
        df_con.msf_begindatemsf__c,
        df_con.msf_datefirstrecurringdonorquota__c,
        df_con.gender__c,
        df_con.msf_ltvcont__c,
        df_con.msf_ltvdesc__c,
        df_con.msf_recencyrecurringdonorcont__c,
        df_con.msf_rfvrecurringdonor__c,
        df_con.npo02__totaloppamount__c,
        df_con.msf_valuetotalcont__c,
        df_con.msf_valuetotaldesc__c,
        df_con.msf_lifetime__c,
        # Task attributes
        df_tasks.whoid,
        df_tasks.activitydate,
        df_tasks.msf_channel__c,
        df_tasks.msf_inboundoutbound__c,
        df_tasks.msf_closetype__c,
    )
)

In [None]:
# Per contact (whoid), walk the tasks in activitydate order and attach the date
# of the preceding task plus the number of days between the two.
windowSpec = Window.partitionBy("whoid").orderBy("activitydate")

previous_task_date = lag("activitydate").over(windowSpec)
df_tasks_cons = df_tasks_cons.withColumn("previous_modification_date", previous_task_date)
df_tasks_cons = df_tasks_cons.withColumn(
    "days_elapsed_since_previous_modification",
    datediff(col("activitydate"), col("previous_modification_date")),
)

-----------------------------------------------------------------------------------

In [None]:
# Load the recurring donations export from S3.
# The "header", "quote" and "escape" reader options were removed: they are CSV
# options and mean nothing for a Parquet source, which carries its own schema.
df_rec = spark.read.format("parquet").load(
    f"s3://{BUCKET_NAME}/recurringdonations/MSF_RecurringDonation.parquet"
)

In [None]:
# Rename the generic "id" column so the recurring-donation key stays
# distinguishable once it is joined with other tables.
df_rec = df_rec.withColumnRenamed(existing="id", new="rec_id")

In [None]:
# Load the quota modifications export from S3.
# The "header", "quote" and "escape" reader options were removed: they are CSV
# options and mean nothing for a Parquet source, which carries its own schema.
df_quo = spark.read.format("parquet").load(
    f"s3://{BUCKET_NAME}/quotamodifications/MSF_QuotaModification.parquet"
)

In [None]:
# Rename the generic "id" column so the quota-modification key stays
# distinguishable once it is joined with other tables.
df_quo = df_quo.withColumnRenamed(existing="id", new="quo_id")

In [None]:
# Quota modifications that belong to recurring donations whose
# npe03__open_ended_status__c is 'Open', keyed by contact.
#
# Changes with respect to the previous version (same resulting rows):
#   * the recurring-donation -> modification key (rec_id ==
#     msf_recurringdonation__c) is part of the join condition instead of a
#     trailing filter, so the join itself states how the two tables relate;
#   * the status filter runs BEFORE the projection, because it reads a column
#     the select() does not keep;
#   * the two contact-key predicates that merely repeated the inner-join
#     conditions were removed.
df_rec_quo = (
    df_con
    .join(df_rec, df_con.con_id == df_rec.npe03__contact__c, "inner")
    .join(
        df_quo,
        (df_con.con_id == df_quo.msf_contactid__c)
        & (df_rec.rec_id == df_quo.msf_recurringdonation__c),
        "inner",
    )
    .where(df_rec.npe03__open_ended_status__c == "Open")
    .select(
        df_con.con_id,
        df_quo.msf_changedate__c,
        df_quo.msf_leadsource3__c,
        df_quo.msf_newamount__c,
        df_quo.msf_newannualizedquota__c,
        df_quo.msf_newrecurringperiod__c,
    )
)

In [None]:
# Per contact (con_id), walk the quota modifications in msf_changedate__c order
# and attach, from the preceding modification: its date, the days elapsed since
# it, and its "new" amount / annualized quota / recurring period (exposed here
# as the "old" values of the current row).
windowSpec = Window.partitionBy("con_id").orderBy(col("msf_changedate__c"))

df_rec_quo = df_rec_quo.withColumn(
    "previous_change_date", lag("msf_changedate__c").over(windowSpec)
)
df_rec_quo = df_rec_quo.withColumn(
    "days_elapsed_since_previous_change",
    datediff(col("msf_changedate__c"), col("previous_change_date")),
)
for old_name, new_name in (
    ("msf_oldamount__c", "msf_newamount__c"),
    ("msf_oldannualizedquota__c", "msf_newannualizedquota__c"),
    ("msf_oldrecurringperiod__c", "msf_newrecurringperiod__c"),
):
    df_rec_quo = df_rec_quo.withColumn(old_name, lag(col(new_name)).over(windowSpec))

In [None]:
# Number of days after a task during which a quota modification of the same
# contact is matched to that task.
FOLLOW_UP_WINDOW_DAYS = 60

# Left join: every task row is kept; it is paired with each quota modification
# of the same contact dated strictly after the task and at most
# FOLLOW_UP_WINDOW_DAYS days later. Tasks without such a modification get nulls.
# date_add() makes the day arithmetic explicit instead of relying on the
# implicit `date + int` coercion of the previous `activitydate + 60`.
df_merge = df_tasks_cons.join(
    df_rec_quo,
    (df_rec_quo.con_id == df_tasks_cons.whoid)
    & (df_rec_quo.msf_changedate__c > df_tasks_cons.activitydate)
    & (
        df_rec_quo.msf_changedate__c
        <= date_add(df_tasks_cons.activitydate, FOLLOW_UP_WINDOW_DAYS)
    ),
    "left",
)

In [None]:
# Forward fill: within each contact (whoid), ordered by activitydate and then
# previous_modification_date, replace each listed column with the last non-null
# value seen so far in that ordering.
window_spec = Window.partitionBy("whoid").orderBy(
    col("activitydate"), col("previous_modification_date")
)

columns_to_fill = [
    "msf_newamount__c",
    "msf_newannualizedquota__c",
    "msf_newrecurringperiod__c",
    "previous_change_date",
    "msf_oldamount__c",
    "msf_oldannualizedquota__c",
    "msf_oldrecurringperiod__c",
]

# The loop variable must not be called `col`: that rebinds the imported
# pyspark `col` function to a plain string and breaks every later `col(...)`
# call, including a re-run of this very cell.
for column_name in columns_to_fill:
    df_merge = df_merge.withColumn(
        column_name, last(column_name, ignorenulls=True).over(window_spec)
    )

In [None]:
# Re-import `col`: a loop in an earlier cell may have rebound the name to a string.
from pyspark.sql.functions import col

# Backward fill: the window is ordered by activitydate DESCENDING, so the last
# non-null value "seen so far" comes from tasks with a later (or equal) date.
# This fills the "previous quota" columns that are still null after the
# forward fill.
window_spec = Window.partitionBy("whoid").orderBy(col("activitydate").desc())

columns_to_fill = [
    "previous_change_date",
    "msf_oldamount__c",
    "msf_oldannualizedquota__c",
    "msf_oldrecurringperiod__c",
]

# The loop variable must not be called `col`: that rebinds the imported
# pyspark `col` function to a plain string and breaks later `col(...)` calls.
for column_name in columns_to_fill:
    df_merge = df_merge.withColumn(
        column_name, last(column_name, ignorenulls=True).over(window_spec)
    )

In [None]:
from pyspark.sql.functions import datediff, col

# Days between the task date and the (filled) date of the previous quota change.
days_since_quota_change = datediff(col("activitydate"), col("previous_change_date"))
df_merge = df_merge.withColumn(
    "days_elapsed_since_previous_quota_change", days_since_quota_change
)

In [None]:
from pyspark.sql.functions import coalesce

# Where the primary column is null, fall back to its counterpart:
#   days since previous task  -> days since previous quota change
#   new amount / quota / period -> old amount / quota / period
for target_name, fallback_name in (
    ("days_elapsed_since_previous_modification", "days_elapsed_since_previous_quota_change"),
    ("msf_newamount__c", "msf_oldamount__c"),
    ("msf_newannualizedquota__c", "msf_oldannualizedquota__c"),
    ("msf_newrecurringperiod__c", "msf_oldrecurringperiod__c"),
):
    df_merge = df_merge.withColumn(
        target_name, coalesce(col(target_name), col(fallback_name))
    )

In [None]:
# Persist the merged task dataset, replacing any previous run's output.
task_merge_writer = df_merge.write.mode("overwrite").format("parquet")
task_merge_writer.save(f"s3://{BUCKET_NAME}/output/task_merge")

In [None]:
# Reload the merged task dataset written by the previous step.
# The "header", "quote" and "escape" reader options were removed: they are CSV
# options and mean nothing for a Parquet source, which carries its own schema.
df_task_merge = spark.read.format("parquet").load(
    f"s3://{BUCKET_NAME}/output/task_merge"
)

In [None]:
# Turn the birth year into an age.
# Relies on `lit` and `col` from pyspark.sql.functions; `lit` used to be missing
# from the imports, which made this cell fail with a NameError on a fresh kernel.
currentYear = 2023  # hard-coded reference year used for the age computation
df_task_merge = df_task_merge.withColumn(
    "age", lit(currentYear) - col("msf_birthyear__c")
)

In [None]:
# Remove the fields that must not be part of the feature set.
# NOTE(review): "previous_purchase_date" is not created by any visible cell, so
# dropping it looks like a no-op here; confirm whether another column was meant.
columnsToDrop = [
    "msf_birthyear__c",
    "msf_entrycampaign__c",
    "npo02__averageamount__c",
    "msf_begindatemsf__c",
    "msf_datefirstrecurringdonorquota__c",
    "msf_lifetime__c",
    "msf_inboundoutbound__c",
    "previous_purchase_date",
    "con_id",
    "msf_changedate__c",
    "msf_leadsource3__c",
]
df_task_merge = df_task_merge.drop(*columnsToDrop)

In [1]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

# One-hot encode gender__c into "gender_encoded".
#
# OneHotEncoder works on numeric category indices, so the raw column is first
# mapped to indices with a StringIndexer and both stages are chained in a
# Pipeline (previously imported but unused). gender__c presumably holds string
# labels (TODO confirm against the contacts schema); feeding it to the encoder
# directly fails on a string column.
# handleInvalid="keep" sends null/unseen values to an extra index instead of
# raising during fit/transform.
gender_indexer = StringIndexer(
    inputCol="gender__c", outputCol="gender_index", handleInvalid="keep"
)
encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_encoded")

# Fit both stages on the data
encoder_model = Pipeline(stages=[gender_indexer, encoder]).fit(df_task_merge)

# Apply them and discard the intermediate index column
df_task_merge = encoder_model.transform(df_task_merge).drop("gender_index")

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Collapse the task outcome into two labels: every value other than "Positivo"
# becomes "Negativo".
# Relies on `when` from pyspark.sql.functions; it used to be missing from the
# imports, which made this cell fail with a NameError on a fresh kernel.
# NOTE(review): rows whose msf_closetype__c is null stay null, because the
# comparison evaluates to null and the `otherwise` branch keeps the original
# value. Confirm that is the intended label for them.
condition = df_task_merge["msf_closetype__c"] != "Positivo"
replacement_value = "Negativo"

df_task_merge = df_task_merge.withColumn(
    "msf_closetype__c",
    when(condition, replacement_value).otherwise(df_task_merge["msf_closetype__c"]),
)

In [None]:
# Final column layout: identifier and date first, then the features, with the
# msf_closetype__c label last. Only the columns listed here are kept.
# NOTE(review): "age" and "gender_encoded", computed in earlier cells, are not
# listed and are therefore discarded by this select. Confirm that is intended.
desired_column_order = [
    "whoid",
    "activitydate",
    "msf_seniority__c",
    "gender__c",
    "msf_ltvcont__c",
    "msf_ltvdesc__c",
    "msf_recencyrecurringdonorcont__c",
    "msf_rfvrecurringdonor__c",
    "npo02__totaloppamount__c",
    "msf_valuetotalcont__c",
    "msf_valuetotaldesc__c",
    "msf_channel__c",
    "days_elapsed_since_previous_modification",
    "msf_newamount__c",
    "msf_newannualizedquota__c",
    "msf_newrecurringperiod__c",
    "msf_closetype__c",
]
df_task_merge = df_task_merge.select(desired_column_order)

In [None]:
# Persist the dataset that still carries whoid and activitydate, replacing any
# previous run's output.
id_date_writer = df_task_merge.write.mode("overwrite").format("parquet")
id_date_writer.save(f"s3://{BUCKET_NAME}/output/task_merge_id_date")

In [None]:
# Remove the identifier and date columns from the dataset.
columnsToDrop = [
    "whoid",
    "activitydate",
]
df_task_merge = df_task_merge.drop(*columnsToDrop)

In [None]:
# Persist the dataset without identifier and date columns, replacing any
# previous run's output.
clean_writer = df_task_merge.write.mode("overwrite").format("parquet")
clean_writer.save(f"s3://{BUCKET_NAME}/output/task_merge_clean")