<a href="https://colab.research.google.com/github/zwelshman/collections/blob/main/learning-lab/notebooks/introductory_notebooks/Introduction_to_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

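# Introduction to PySpark

This notebook builds two small example DataFrames: diagnosis records per person, and a lookup that labels each diagnostic code as `include` or `exclude`. It then uses joins and conditional columns to flag people whose position-1 diagnosis is an `include` code and who also have an `exclude` code in a later diagnostic position.

## Set up the back end for a Spark session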

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [None]:
from pyspark.sql import functions as F

# Toy diagnosis records, one row per (person, diagnosis). Later cells treat
# diagnostic_position == 1 specially. Person 2 has one row with no code (None).
diagnosis_rows = [
    # person 1
    ("1", "2020-02-01", 1, "A1"),
    ("1", "2021-02-11", 2, "A2"),
    ("1", "2020-02-01", 3, "B1"),
    # person 2
    ("2", "2020-02-01", 1, "A3"),
    ("2", "2021-02-23", 2, "B1"),
    ("2", "2020-05-01", 3, None),
    # person 3
    ("3", "2020-01-01", 1, "A1"),
    ("3", "2021-02-23", 2, "B1"),
    ("3", "2020-05-01", 3, "B2"),
    # person 4
    ("4", "2020-01-01", 1, "A1"),
    ("4", "2021-02-23", 2, "A2"),
    ("4", "2020-05-01", 3, "A3"),
]
diagnosis_columns = ["person_id", "date", "diagnostic_position", "diagnostic_code"]

df_a = spark.createDataFrame(diagnosis_rows, diagnosis_columns)
# Dates are given as ISO strings; convert them to DateType so date arithmetic works.
df_a = df_a.withColumn("date", F.to_date("date"))

display(df_a)

person_id,date,diagnostic_position,diagnostic_code
1,2020-02-01,1,A1
1,2021-02-11,2,A2
1,2020-02-01,3,B1
2,2020-02-01,1,A3
2,2021-02-23,2,B1
2,2020-05-01,3,
3,2020-01-01,1,A1
3,2021-02-23,2,B1
3,2020-05-01,3,B2
4,2020-01-01,1,A1


In [None]:
# Lookup table mapping each diagnostic code to a category label ('include' or
# 'exclude') that the flagging logic in later cells tests against.
# (The dead, commented-out date conversion copied from df_a was removed: this
# table has no date column.)
df_b = spark.createDataFrame(
    [
        ('A1', 'include'),
        ('A2', 'include'),
        ('A3', 'include'),
        ('B1', 'exclude'),
        ('B2', 'exclude'),
        ('B3', 'include'),
    ],
    ['diagnostic_code', 'category'],
)

display(df_b)

diagnostic_code,category
A1,include
A2,include
A3,include
B1,exclude
B2,exclude
B3,include


In [None]:
from pyspark.sql import functions as F

# Attach each diagnosis's category from df_b.
# Joining on the column *name* (not df_a.diagnostic_code == df_b.diagnostic_code)
# keeps a single diagnostic_code column. The expression form keeps two identically
# named columns, which makes any later F.col("diagnostic_code") ambiguous.
# Inner join: rows whose code is None or not present in df_b are dropped.
df_joined = df_a.join(df_b, on="diagnostic_code", how="inner")
display(df_joined)


person_id,date,diagnostic_position,diagnostic_code,diagnostic_code.1,category
1,2021-02-11,2,A2,A2,include
4,2021-02-23,2,A2,A2,include
3,2020-05-01,3,B2,B2,exclude
1,2020-02-01,3,B1,B1,exclude
2,2021-02-23,2,B1,B1,exclude
3,2021-02-23,2,B1,B1,exclude
2,2020-02-01,1,A3,A3,include
4,2020-05-01,3,A3,A3,include
1,2020-02-01,1,A1,A1,include
3,2020-01-01,1,A1,A1,include


In [None]:
# Row-level marker: 1 when the row is an 'include' code in diagnostic position 1, else 0.
is_position1_include = (F.col("diagnostic_position") == 1) & (F.col("category") == "include")

df_joined = df_joined.withColumn(
    "include_flag",
    F.when(is_position1_include, 1).otherwise(0),
)
display(df_joined)

person_id,date,diagnostic_position,diagnostic_code,diagnostic_code.1,category,include_flag
1,2021-02-11,2,A2,A2,include,0
4,2021-02-23,2,A2,A2,include,0
3,2020-05-01,3,B2,B2,exclude,0
1,2020-02-01,3,B1,B1,exclude,0
2,2021-02-23,2,B1,B1,exclude,0
3,2021-02-23,2,B1,B1,exclude,0
2,2020-02-01,1,A3,A3,include,1
4,2020-05-01,3,A3,A3,include,0
1,2020-02-01,1,A1,A1,include,1
3,2020-01-01,1,A1,A1,include,1


In [None]:
# Keep only 'exclude' codes recorded in a diagnostic position greater than 1.
is_exclude = F.col("category") == "exclude"
is_later_position = F.col("diagnostic_position") > 1
df_b_values = df_joined.filter(is_exclude & is_later_position)
display(df_b_values)

person_id,date,diagnostic_position,diagnostic_code,diagnostic_code.1,category,include_flag
3,2020-05-01,3,B2,B2,exclude,0
1,2020-02-01,3,B1,B1,exclude,0
2,2021-02-23,2,B1,B1,exclude,0
3,2021-02-23,2,B1,B1,exclude,0


In [None]:
# Pair every row of df_joined (alias df1) with each 'exclude' row in position > 1
# belonging to the same person (alias df2). Both sides share column names, so the
# aliases are what let later cells refer to df1.date, df2.category, etc.
# df1 is not filtered, so exclude rows are also paired with themselves.
all_rows = df_joined.alias("df1")
exclude_rows = df_b_values.alias("df2")
same_person = F.col("df1.person_id") == F.col("df2.person_id")

df_self_joined = all_rows.join(exclude_rows, same_person, "inner")
display(df_self_joined)


person_id,date,diagnostic_position,diagnostic_code,diagnostic_code.1,category,include_flag,person_id.1,date.1,diagnostic_position.1,diagnostic_code.2,diagnostic_code.3,category.1,include_flag.1
3,2020-05-01,3,B2,B2,exclude,0,3,2020-05-01,3,B2,B2,exclude,0
3,2020-05-01,3,B2,B2,exclude,0,3,2021-02-23,2,B1,B1,exclude,0
3,2021-02-23,2,B1,B1,exclude,0,3,2020-05-01,3,B2,B2,exclude,0
3,2021-02-23,2,B1,B1,exclude,0,3,2021-02-23,2,B1,B1,exclude,0
3,2020-01-01,1,A1,A1,include,1,3,2020-05-01,3,B2,B2,exclude,0
3,2020-01-01,1,A1,A1,include,1,3,2021-02-23,2,B1,B1,exclude,0
1,2021-02-11,2,A2,A2,include,0,1,2020-02-01,3,B1,B1,exclude,0
1,2020-02-01,3,B1,B1,exclude,0,1,2020-02-01,3,B1,B1,exclude,0
1,2020-02-01,1,A1,A1,include,1,1,2020-02-01,3,B1,B1,exclude,0
2,2021-02-23,2,B1,B1,exclude,0,2,2021-02-23,2,B1,B1,exclude,0


In [None]:
# Days from the df1 row's date to the paired df2 (exclude) row's date, computed for
# every pair. The value is negative when the df2 date is earlier than the df1 date.
days_df1_to_df2 = F.datediff(F.col("df2.date"), F.col("df1.date"))
df_self_joined = df_self_joined.withColumn("date_diff", days_df1_to_df2)
display(df_self_joined)

person_id,date,diagnostic_position,diagnostic_code,diagnostic_code.1,category,include_flag,person_id.1,date.1,diagnostic_position.1,diagnostic_code.2,diagnostic_code.3,category.1,include_flag.1,date_diff
3,2020-05-01,3,B2,B2,exclude,0,3,2020-05-01,3,B2,B2,exclude,0,0
3,2020-05-01,3,B2,B2,exclude,0,3,2021-02-23,2,B1,B1,exclude,0,298
3,2021-02-23,2,B1,B1,exclude,0,3,2020-05-01,3,B2,B2,exclude,0,-298
3,2021-02-23,2,B1,B1,exclude,0,3,2021-02-23,2,B1,B1,exclude,0,0
3,2020-01-01,1,A1,A1,include,1,3,2020-05-01,3,B2,B2,exclude,0,121
3,2020-01-01,1,A1,A1,include,1,3,2021-02-23,2,B1,B1,exclude,0,419
1,2021-02-11,2,A2,A2,include,0,1,2020-02-01,3,B1,B1,exclude,0,-376
1,2020-02-01,3,B1,B1,exclude,0,1,2020-02-01,3,B1,B1,exclude,0,0
1,2020-02-01,1,A1,A1,include,1,1,2020-02-01,3,B1,B1,exclude,0,0
2,2021-02-23,2,B1,B1,exclude,0,2,2021-02-23,2,B1,B1,exclude,0,0


In [None]:

# When True, an exclude diagnosis only counts if it is dated strictly after the
# include diagnosis (date_diff > 0, computed in the previous cell). The default False
# keeps the current behaviour, where same-day or earlier exclude codes also count.
# This replaces the dead, commented-out clause that used to sit inside F.when(...).
REQUIRE_EXCLUDE_AFTER_INCLUDE = False

# Pair-level flag: 1 when df1 is an 'include' code in position 1 and df2 is an
# 'exclude' code in position > 1 for the same person; 0 otherwise.
flag_condition = (
    (F.col("df1.diagnostic_position") == 1)
    & (F.col("df1.category") == "include")
    & (F.col("df2.category") == "exclude")
    & (F.col("df2.diagnostic_position") > 1)
)
if REQUIRE_EXCLUDE_AFTER_INCLUDE:
    flag_condition = flag_condition & (F.col("date_diff") > 0)

df_self_joined = df_self_joined.withColumn(
    "exclude_flag",
    F.when(flag_condition, 1).otherwise(0),
)

display(df_self_joined)

person_id,date,diagnostic_position,diagnostic_code,diagnostic_code.1,category,include_flag,person_id.1,date.1,diagnostic_position.1,diagnostic_code.2,diagnostic_code.3,category.1,include_flag.1,date_diff,exclude_flag
3,2020-05-01,3,B2,B2,exclude,0,3,2020-05-01,3,B2,B2,exclude,0,0,0
3,2020-05-01,3,B2,B2,exclude,0,3,2021-02-23,2,B1,B1,exclude,0,298,0
3,2021-02-23,2,B1,B1,exclude,0,3,2020-05-01,3,B2,B2,exclude,0,-298,0
3,2021-02-23,2,B1,B1,exclude,0,3,2021-02-23,2,B1,B1,exclude,0,0,0
3,2020-01-01,1,A1,A1,include,1,3,2020-05-01,3,B2,B2,exclude,0,121,1
3,2020-01-01,1,A1,A1,include,1,3,2021-02-23,2,B1,B1,exclude,0,419,1
1,2021-02-11,2,A2,A2,include,0,1,2020-02-01,3,B1,B1,exclude,0,-376,0
1,2020-02-01,3,B1,B1,exclude,0,1,2020-02-01,3,B1,B1,exclude,0,0,0
1,2020-02-01,1,A1,A1,include,1,1,2020-02-01,3,B1,B1,exclude,0,0,1
2,2021-02-23,2,B1,B1,exclude,0,2,2021-02-23,2,B1,B1,exclude,0,0,0
