<a href="https://colab.research.google.com/github/kuldeep27396/pyspark_exercises/blob/master/pyspark_questions1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=f5d0b9a86e9501bed5aa220aef8708a4403ed9f27902c3fe72efe035d2790472
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

# Initialize Spark session
spark = SparkSession.builder.appName("SCD_Type1").getOrCreate()

# Sample data
data = [
    (1, "GC001", "Active"),
    (2, "GC002", "Active"),
    (3, "GC003", "Expired")
]

columns = ["id", "gift_card_id", "status"]

df = spark.createDataFrame(data, columns)
df.show(5)


+---+------------+-------+
| id|gift_card_id| status|
+---+------------+-------+
|  1|       GC001| Active|
|  2|       GC002| Active|
|  3|       GC003|Expired|
+---+------------+-------+



In [4]:

# Assume the status of GC002 changes to Blocked
updated_data = [(2, "GC002", "Blocked")]

df_update = spark.createDataFrame(updated_data, columns)

# Perform SCD Type 1 (overwrite)
df_scd1 = df.join(df_update, on="gift_card_id", how="left")\
    .select(df["id"], df["gift_card_id"],
            when(df_update["status"].isNotNull(), df_update["status"]).otherwise(df["status"]).alias("status"))

df_scd1.show()

+---+------------+-------+
| id|gift_card_id| status|
+---+------------+-------+
|  1|       GC001| Active|
|  3|       GC003|Expired|
|  2|       GC002|Blocked|
+---+------------+-------+



In [5]:
from pyspark.sql.functions import lit, col, max

# Sample data with versioning
data_scd2 = [
    (1, "GC001", "Active", 1),
    (2, "GC002", "Active", 1),
    (3, "GC003", "Expired", 1)
]

columns_scd2 = ["id", "gift_card_id", "status", "version"]

df_scd2 = spark.createDataFrame(data_scd2, columns_scd2)

# Assume the status of GC002 changes to Blocked
updated_data_scd2 = [(2, "GC002", "Blocked", 2)]

df_update_scd2 = spark.createDataFrame(updated_data_scd2, columns_scd2)

# Perform SCD Type 2 (add new row)
df_scd2_final = df_scd2.union(df_update_scd2)

df_scd2_final.show()


+---+------------+-------+-------+
| id|gift_card_id| status|version|
+---+------------+-------+-------+
|  1|       GC001| Active|      1|
|  2|       GC002| Active|      1|
|  3|       GC003|Expired|      1|
|  2|       GC002|Blocked|      2|
+---+------------+-------+-------+



In [10]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

# Define the schema explicitly
schema_scd3 = StructType([
    StructField("id", IntegerType(), True),
    StructField("gift_card_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("previous_status", StringType(), True)  # Explicitly defining the type of previous_status
])

# Sample data with previous status
data_scd3 = [
    (1, "GC001", "Active", None),
    (2, "GC002", "Active", None),
    (3, "GC003", "Expired", None)
]

# Create DataFrame with the explicit schema
df_scd3 = spark.createDataFrame(data_scd3, schema=schema_scd3)

# Assume the status of GC002 changes to Blocked
updated_data_scd3 = [(2, "GC002", "Blocked", "Active")]

# Create DataFrame for updated data with the same schema
df_update_scd3 = spark.createDataFrame(updated_data_scd3, schema=schema_scd3)

# Perform SCD Type 3 (add new column for previous status)
df_scd3_final = df_scd3.join(df_update_scd3, on="gift_card_id", how="left")\
    .select(
        df_scd3["id"],
        df_scd3["gift_card_id"],
        coalesce(df_update_scd3["status"], df_scd3["status"]).alias("status"),
        coalesce(df_update_scd3["previous_status"], df_scd3["previous_status"]).alias("previous_status")
    )

df_scd3_final.show()


+---+------------+-------+---------------+
| id|gift_card_id| status|previous_status|
+---+------------+-------+---------------+
|  1|       GC001| Active|           NULL|
|  3|       GC003|Expired|           NULL|
|  2|       GC002|Blocked|         Active|
+---+------------+-------+---------------+



In [11]:
# Sample current table data
data_current = [
    (1, "GC001", "Active"),
    (2, "GC002", "Active"),
    (3, "GC003", "Expired")
]

# Sample historical table data
data_history = [
    (1, "GC002", "Active"),
    (2, "GC003", "Active")
]

columns_current = ["id", "gift_card_id", "status"]
columns_history = ["id", "gift_card_id", "status"]

df_current = spark.createDataFrame(data_current, columns_current)
df_history = spark.createDataFrame(data_history, columns_history)

# Assume the status of GC002 changes to Blocked
updated_data_current = [(2, "GC002", "Blocked")]

df_update_current = spark.createDataFrame(updated_data_current, columns_current)

# Perform SCD Type 4 (add to history, update current)
df_history_final = df_history.union(df_current.filter(col("gift_card_id") == "GC002"))
df_current_final = df_current.join(df_update_current, on="gift_card_id", how="left")\
    .select(df_current["id"], df_current["gift_card_id"],
            coalesce(df_update_current["status"], df_current["status"]).alias("status"))

df_history_final.show()
df_current_final.show()


+---+------------+------+
| id|gift_card_id|status|
+---+------------+------+
|  1|       GC002|Active|
|  2|       GC003|Active|
|  2|       GC002|Active|
+---+------------+------+

+---+------------+-------+
| id|gift_card_id| status|
+---+------------+-------+
|  1|       GC001| Active|
|  3|       GC003|Expired|
|  2|       GC002|Blocked|
+---+------------+-------+



In [12]:
# Sample dimension table data
data_dim = [
    (1, "GC001", "Active", 1),
    (2, "GC002", "Active", 1)
]

columns_dim = ["id", "gift_card_id", "status", "surrogate_key"]

df_dim = spark.createDataFrame(data_dim, columns_dim)

# Perform SCD Type 5 (combining Type 1 and Type 4)
df_dim_scd5 = df_dim.withColumn("surrogate_key", col("surrogate_key") + 1)

df_dim_scd5.show()


+---+------------+------+-------------+
| id|gift_card_id|status|surrogate_key|
+---+------------+------+-------------+
|  1|       GC001|Active|            2|
|  2|       GC002|Active|            2|
+---+------------+------+-------------+



In [14]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Sample data
data_scd6 = [
    (1, "GC001", "Active", "None", 1),
    (2, "GC002", "Active", "None", 1),
    (3, "GC003", "Expired", "None", 1)
]

columns_scd6 = ["id", "gift_card_id", "status", "previous_status", "version"]

df_scd6 = spark.createDataFrame(data_scd6, columns_scd6)

# Assume the status of GC002 changes to Blocked
updated_data_scd6 = [(2, "GC002", "Blocked", "Active", 2)]

df_update_scd6 = spark.createDataFrame(updated_data_scd6, columns_scd6)

# Perform SCD Type 6 (hybrid approach)
df_scd6_final = df_scd6.union(df_update_scd6)\
    .withColumn("row_num", row_number().over(Window.partitionBy("gift_card_id").orderBy(col("version").desc())))\
    .filter(col("row_num") == 1)\
    .drop("row_num")

df_scd6_final.show()


+---+------------+-------+---------------+-------+
| id|gift_card_id| status|previous_status|version|
+---+------------+-------+---------------+-------+
|  1|       GC001| Active|           None|      1|
|  2|       GC002|Blocked|         Active|      2|
|  3|       GC003|Expired|           None|      1|
+---+------------+-------+---------------+-------+



In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

# Initialize Spark session
spark = SparkSession.builder.appName("InsuranceFraudDetection").getOrCreate()

# Sample JSON data
json_data = '''
[
    {
        "claim_number": "CLM001",
        "policy_holder": "John Doe",
        "claim_date": "2024-08-01",
        "contacts": {
            "primary_contact": {
                "name": "John Doe",
                "phone": "123-456-7890",
                "email": "john.doe@example.com"
            },
            "secondary_contact": {
                "name": "Jane Doe",
                "phone": "987-654-3210",
                "email": "jane.doe@example.com"
            }
        },
        "items_bought": [
            {
                "item_id": "ITM001",
                "item_name": "Laptop",
                "quantity": 1,
                "price": 1200.00
            },
            {
                "item_id": "ITM002",
                "item_name": "Smartphone",
                "quantity": 2,
                "price": 800.00
            }
        ]
    },
    {
        "claim_number": "CLM002",
        "policy_holder": "Alice Smith",
        "claim_date": "2024-08-02",
        "contacts": {
            "primary_contact": {
                "name": "Alice Smith",
                "phone": "555-123-4567",
                "email": "alice.smith@example.com"
            },
            "secondary_contact": {
                "name": "Bob Smith",
                "phone": "555-987-6543",
                "email": "bob.smith@example.com"
            }
        },
        "items_bought": [
            {
                "item_id": "ITM003",
                "item_name": "Tablet",
                "quantity": 1,
                "price": 600.00
            }
        ]
    }
]
'''

# Read JSON from the string
df = spark.read.json(spark.sparkContext.parallelize([json_data]))

# Display the DataFrame
df.show(truncate=False)




+----------+------------+--------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+-------------+
|claim_date|claim_number|contacts                                                                                                |items_bought                                                 |policy_holder|
+----------+------------+--------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+-------------+
|2024-08-01|CLM001      |{{john.doe@example.com, John Doe, 123-456-7890}, {jane.doe@example.com, Jane Doe, 987-654-3210}}        |[{ITM001, Laptop, 1200.0, 1}, {ITM002, Smartphone, 800.0, 2}]|John Doe     |
|2024-08-02|CLM002      |{{alice.smith@example.com, Alice Smith, 555-123-4567}, {bob.smith@example.com, Bob Smith, 555-987-6543}}|[{ITM003, Tablet, 600.0, 1}]              

In [18]:
# Flatten the contacts field
df_contacts = df.select(
    col("claim_number"),
    col("policy_holder"),
    col("claim_date"),
    col("contacts.primary_contact.name").alias("primary_contact_name"),
    col("contacts.primary_contact.phone").alias("primary_contact_phone"),
    col("contacts.primary_contact.email").alias("primary_contact_email"),
    col("contacts.secondary_contact.name").alias("secondary_contact_name"),
    col("contacts.secondary_contact.phone").alias("secondary_contact_phone"),
    col("contacts.secondary_contact.email").alias("secondary_contact_email")
)

df_contacts.show(truncate=False)


+------------+-------------+----------+--------------------+---------------------+-----------------------+----------------------+-----------------------+-----------------------+
|claim_number|policy_holder|claim_date|primary_contact_name|primary_contact_phone|primary_contact_email  |secondary_contact_name|secondary_contact_phone|secondary_contact_email|
+------------+-------------+----------+--------------------+---------------------+-----------------------+----------------------+-----------------------+-----------------------+
|CLM001      |John Doe     |2024-08-01|John Doe            |123-456-7890         |john.doe@example.com   |Jane Doe              |987-654-3210           |jane.doe@example.com   |
|CLM002      |Alice Smith  |2024-08-02|Alice Smith         |555-123-4567         |alice.smith@example.com|Bob Smith             |555-987-6543           |bob.smith@example.com  |
+------------+-------------+----------+--------------------+---------------------+-----------------------+----

In [19]:
# Explode the items_bought field
df_items = df.select(
    col("claim_number"),
    col("policy_holder"),
    col("claim_date"),
    explode("items_bought").alias("item")
).select(
    col("claim_number"),
    col("policy_holder"),
    col("claim_date"),
    col("item.item_id").alias("item_id"),
    col("item.item_name").alias("item_name"),
    col("item.quantity").alias("quantity"),
    col("item.price").alias("price")
)

df_items.show(truncate=False)


+------------+-------------+----------+-------+----------+--------+------+
|claim_number|policy_holder|claim_date|item_id|item_name |quantity|price |
+------------+-------------+----------+-------+----------+--------+------+
|CLM001      |John Doe     |2024-08-01|ITM001 |Laptop    |1       |1200.0|
|CLM001      |John Doe     |2024-08-01|ITM002 |Smartphone|2       |800.0 |
|CLM002      |Alice Smith  |2024-08-02|ITM003 |Tablet    |1       |600.0 |
+------------+-------------+----------+-------+----------+--------+------+

