# **Cancer Line of Therapy - Analysis**


**Step 1 : Installing and importing PySpark into the notebook**

In [None]:
!pip3 install pyspark
import pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=51983a3aa8d6bb5e79efd14971f77cce86313aa81e8a9ae7fe55f3f8cdfe2256
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


**Step 2 : Starting a Spark Session and Setting logs**

In [None]:
# Create (or reuse, if one already exists) a local-mode Spark session
# registered under the application name "Line-of-Therapy"
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Line-of-Therapy')
    .getOrCreate()
)

In [None]:
# Restrict Spark's logging to ERROR-level messages
spark.sparkContext.setLogLevel("ERROR")

# Keep a handle on the SparkContext and show it as the cell's output
sc = spark.sparkContext
sc

**Step 3 : Importing the Data from CSV File and checking the Schema**

In [None]:
# Read the CSV file: the first row supplies the column names and
# Spark infers the data type of every column from the values
file_path = "/content/sample_data/cancer_data2.csv"
df = spark.read.csv(
    file_path,
    header=True,
    inferSchema=True,
)

# Inspect the inferred schema - in particular how the date columns were typed
df.printSchema()

# Preview the loaded rows
df.show()

root
 |-- patient_ID: string (nullable = true)
 |-- drug_date: string (nullable = true)
 |-- drug: string (nullable = true)
 |-- days_of_supply: integer (nullable = true)
 |-- class: string (nullable = true)
 |-- drug_start_date: date (nullable = true)
 |-- drug_end_date: date (nullable = true)

+----------+----------+-----------+--------------+-----+---------------+-------------+
|patient_ID| drug_date|       drug|days_of_supply|class|drug_start_date|drug_end_date|
+----------+----------+-----------+--------------+-----+---------------+-------------+
| PATID_001|  5-Dec-13| PACLITAXEL|             0|CHEMO|     2013-12-05|   2013-12-05|
| PATID_001|  5-Dec-13|CARBOPLATIN|             0|CHEMO|     2013-12-05|   2013-12-05|
| PATID_001| 31-Dec-13|CARBOPLATIN|             0|CHEMO|     2013-12-31|   2013-12-31|
| PATID_001| 31-Dec-13| PACLITAXEL|             0|CHEMO|     2013-12-31|   2013-12-31|
| PATID_001| 30-Jan-14|CARBOPLATIN|             0|CHEMO|     2014-01-30|   2014-01-30|
| PATID

**Step 4 : Importing necessary libraries and defining the Window Specification**

In [None]:
#Importing Necessary Libraries

from pyspark.sql.functions import (
    col,
    collect_set,
    concat_ws,
    datediff,
    lag,
    lit,
    max as sql_max,
    row_number,
    sort_array,
    sum,
    when,
)
from pyspark.sql.window import Window
#import pyspark.sql.functions as F

# Window specification: each patient's rows in chronological order of drug start date.
# Several drugs are recorded on the same day for a patient, so drug_end_date and drug are
# added as tie-breakers. Without them the relative order of same-day rows is arbitrary, and
# anything that depends on row order over this window (lag(), running sums) can change
# from one run to the next.
date_window = Window.partitionBy("patient_ID").orderBy("drug_start_date", "drug_end_date", "drug")


**Step 5 : Calculating the gap between treatments to determine when a new line of therapy should start**

In [None]:
df = (
    df
    # 'previous_end_date': drug end date of the preceding row for the same patient,
    # following the order defined by date_window (NULL on a patient's first row)
    .withColumn("previous_end_date", lag(col("drug_end_date")).over(date_window))
    # 'days_between': days from the previous treatment's end date to the current
    # treatment's start date; rows with no previous treatment get 0
    .withColumn(
        "days_between",
        when(
            col("previous_end_date").isNotNull(),
            datediff(col("drug_start_date"), col("previous_end_date")),
        ).otherwise(0),
    )
)

# Show the DataFrame including the new 'previous_end_date' and 'days_between' columns
df.show(truncate=False)


+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+
|patient_ID|drug_date |drug       |days_of_supply|class|drug_start_date|drug_end_date|previous_end_date|days_between|
+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+
|PATID_001 |5-Dec-13  |PACLITAXEL |0             |CHEMO|2013-12-05     |2013-12-05   |NULL             |0           |
|PATID_001 |5-Dec-13  |CARBOPLATIN|0             |CHEMO|2013-12-05     |2013-12-05   |2013-12-05       |0           |
|PATID_001 |31-Dec-13 |CARBOPLATIN|0             |CHEMO|2013-12-31     |2013-12-31   |2013-12-05       |26          |
|PATID_001 |31-Dec-13 |PACLITAXEL |0             |CHEMO|2013-12-31     |2013-12-31   |2013-12-31       |0           |
|PATID_001 |30-Jan-14 |CARBOPLATIN|0             |CHEMO|2014-01-30     |2014-01-30   |2013-12-31       |30          |
|PATID_001 |30-Jan-14 |PACLITAXEL |0             |CHEMO|

**Step 6 : Flagging new lines of therapy based on the gap in days**

In [None]:
# Maximum number of days allowed between treatments before a new line of therapy starts
gap_days = 45

df = (
    df
    # 'new_line' is 1 when the gap to the previous treatment exceeds gap_days, otherwise 0
    .withColumn("new_line", when(col("days_between") > gap_days, 1).otherwise(0))
    # Running total of the flags from the patient's first row up to the current row:
    # every row of the same line of therapy ends up with the same 'cumulative_line' value
    .withColumn(
        "cumulative_line",
        sum("new_line").over(
            date_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
        ),
    )
)

# Show the DataFrame including the new 'new_line' and 'cumulative_line' columns
df.show(truncate=False)


+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+--------+---------------+
|patient_ID|drug_date |drug       |days_of_supply|class|drug_start_date|drug_end_date|previous_end_date|days_between|new_line|cumulative_line|
+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+--------+---------------+
|PATID_001 |5-Dec-13  |PACLITAXEL |0             |CHEMO|2013-12-05     |2013-12-05   |NULL             |0           |0       |0              |
|PATID_001 |5-Dec-13  |CARBOPLATIN|0             |CHEMO|2013-12-05     |2013-12-05   |2013-12-05       |0           |0       |0              |
|PATID_001 |31-Dec-13 |CARBOPLATIN|0             |CHEMO|2013-12-31     |2013-12-31   |2013-12-05       |26          |0       |0              |
|PATID_001 |31-Dec-13 |PACLITAXEL |0             |CHEMO|2013-12-31     |2013-12-31   |2013-12-31       |0           |0       |0              |

**Step 7: Collecting unique drugs for each therapy line**

In [None]:
df = (
    df
    # 'drugs_in_line': the distinct drugs given within each therapy line (patient + cumulative_line).
    # collect_set returns its elements in no guaranteed order, so the array is sorted to make
    # the regimen label reproducible and identical for every line that uses the same drugs.
    .withColumn(
        "drugs_in_line",
        sort_array(
            collect_set("drug").over(Window.partitionBy("patient_ID", "cumulative_line"))
        ),
    )
    # 'Line_regimen': the sorted drug names joined with " + " into a single label per line
    .withColumn("Line_regimen", concat_ws(" + ", col("drugs_in_line")))
)

# Show the DataFrame including the new 'drugs_in_line' and 'Line_regimen' columns
df.show(truncate=False)


+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+--------+---------------+-------------------------------------+-------------------------------------+
|patient_ID|drug_date |drug       |days_of_supply|class|drug_start_date|drug_end_date|previous_end_date|days_between|new_line|cumulative_line|drugs_in_line                        |Line_regimen                         |
+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+--------+---------------+-------------------------------------+-------------------------------------+
|PATID_001 |5-Dec-13  |PACLITAXEL |0             |CHEMO|2013-12-05     |2013-12-05   |NULL             |0           |0       |0              |[PACLITAXEL, CARBOPLATIN]            |PACLITAXEL + CARBOPLATIN             |
|PATID_001 |5-Dec-13  |CARBOPLATIN|0             |CHEMO|2013-12-05     |2013-12-05   |2013-12-05       |0           |0      

**Step 8 : Generating line identifiers**

In [None]:
# 'cumulative_line' starts at 0, so add 1 and prefix the number with "L"
# to obtain the identifiers L1, L2, ...
df = df.withColumn(
    "Line",
    concat_ws("", lit("L"), (col("cumulative_line") + 1).cast("string")),
)

# Show the DataFrame including the new 'Line' column
df.show(truncate=False)

+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+--------+---------------+-------------------------------------+-------------------------------------+----+
|patient_ID|drug_date |drug       |days_of_supply|class|drug_start_date|drug_end_date|previous_end_date|days_between|new_line|cumulative_line|drugs_in_line                        |Line_regimen                         |Line|
+----------+----------+-----------+--------------+-----+---------------+-------------+-----------------+------------+--------+---------------+-------------------------------------+-------------------------------------+----+
|PATID_001 |5-Dec-13  |PACLITAXEL |0             |CHEMO|2013-12-05     |2013-12-05   |NULL             |0           |0       |0              |[PACLITAXEL, CARBOPLATIN]            |PACLITAXEL + CARBOPLATIN             |L1  |
|PATID_001 |5-Dec-13  |CARBOPLATIN|0             |CHEMO|2013-12-05     |2013-12-05   |2013-12-05       |

**Step 9 : Selecting Distinct Lines and generating the desired output**

In [None]:
result_df = (
    df
    # Keep only the columns needed for the summary and drop exact duplicate rows
    .select("patient_ID", "drug_start_date", "Line_regimen", "class", "Line")
    .distinct()
    # 'row_num' numbers the rows of each patient's line by start date so the first one can be
    # picked below. 'class' is a tie-breaker: if the first day of a line has rows of more than
    # one class, the row that gets number 1 is then the same on every run instead of arbitrary.
    .withColumn(
        "row_num",
        row_number().over(
            Window.partitionBy("patient_ID", "Line").orderBy("drug_start_date", "class")
        ),
    )
)

# Intermediate view: every distinct row together with its position inside its line
result_df.show(truncate=False)

# Keep the first entry of each line - it marks the start of that line of therapy
result_df = result_df.filter(col("row_num") == 1).drop("row_num")

# Final output DataFrame
result_df.show(truncate=False)

+----------+---------------+-------------------------------------+-----+----+-------+
|patient_ID|drug_start_date|Line_regimen                         |class|Line|row_num|
+----------+---------------+-------------------------------------+-----+----+-------+
|PATID_001 |2013-12-05     |PACLITAXEL + CARBOPLATIN             |CHEMO|L1  |1      |
|PATID_001 |2013-12-31     |PACLITAXEL + CARBOPLATIN             |CHEMO|L1  |2      |
|PATID_001 |2014-01-30     |PACLITAXEL + CARBOPLATIN             |CHEMO|L1  |3      |
|PATID_001 |2015-09-14     |PACLITAXEL + CARBOPLATIN             |CHEMO|L2  |1      |
|PATID_001 |2015-10-08     |PACLITAXEL + CARBOPLATIN             |CHEMO|L2  |2      |
|PATID_001 |2015-10-29     |PACLITAXEL + CARBOPLATIN             |CHEMO|L2  |3      |
|PATID_001 |2015-11-19     |PACLITAXEL + CARBOPLATIN             |CHEMO|L2  |4      |
|PATID_001 |2018-04-18     |LETROZOLE                            |HORMO|L3  |1      |
|PATID_001 |2018-06-22     |LIPOSOMAL + CARBOPLATIN + 