In [None]:
pip install pyspark

# <mark>_<u>**Explode is very powerful: one output row per array element**</u>_</mark>

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

# Start (or reuse) the Spark session for this demo.
spark = (
    SparkSession.builder
    .appName("ExplodeExample")
    .getOrCreate()
)

# Each row holds an array column; explode() fans it out into one row per element.
data = [
    (1, ["apple", "banana", "cherry"]),
    (2, ["grape", "orange"]),
]
df = spark.createDataFrame(data, ["id", "fruits"])

# Keep the original columns and append the exploded element as "fruit".
exploded_df = df.select("id", "fruits", explode("fruits").alias("fruit"))
exploded_df.show()

24/12/05 05:41:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+--------------------+------+
| id|              fruits| fruit|
+---+--------------------+------+
|  1|[apple, banana, c...| apple|
|  1|[apple, banana, c...|banana|
|  1|[apple, banana, c...|cherry|
|  2|     [grape, orange]| grape|
|  2|     [grape, orange]|orange|
+---+--------------------+------+



In [None]:
from pyspark.sql import SparkSession

# Return the session that is already running, or start a default one
# (e.g. after a kernel restart when this cell is run first).
spark = SparkSession.builder.getOrCreate()

In [3]:
# (student_name, id, salary, address): ids run 1..5, salary is 1000 * id.
data1 = [
    (name, n, n * 1000, f"Address{n}")
    for n, name in enumerate(["Alice", "Bob", "Charlie", "David", "Eve"], start=1)
]

# (id, age, married_status): ids 1..5 line up with data1 so a join on "id" matches every row.
data2 = [
    (n, age, status)
    for n, (age, status) in enumerate(
        [(25, "Married"), (30, "Single"), (28, "Married"), (32, "Single"), (27, "Married")],
        start=1,
    )
]

In [4]:
# Column names are applied positionally to the tuple fields of data1 / data2.
df1 = spark.createDataFrame(data1).toDF("student_name", "id", "salary", "address")
df2 = spark.createDataFrame(data2).toDF("id", "age", "married_status")


In [24]:
# Inner equi-join on the shared column name; joining by name (not by an
# expression) leaves a single "id" column in the result.
joined_df = df1.join(df2, on="id", how="inner")
joined_df.show()

+---+------------+------+--------+---+--------------+
| id|student_name|salary| address|age|married_status|
+---+------------+------+--------+---+--------------+
|  1|       Alice|  1000|Address1| 25|       Married|
|  2|         Bob|  2000|Address2| 30|        Single|
|  3|     Charlie|  3000|Address3| 28|       Married|
|  4|       David|  4000|Address4| 32|        Single|
|  5|         Eve|  5000|Address5| 27|       Married|
+---+------------+------+--------+---+--------------+



# <mark>_<u>**Broadcast Join**</u>_</mark>

In [25]:
# (id, department) lookup table: ids 1..5 map to DeptA..DeptE.
data3 = [(n, f"Dept{letter}") for n, letter in enumerate("ABCDE", start=1)]

df3 = spark.createDataFrame(data3, ["id", "department"])

# The "broadcast" hint suggests Spark broadcast the small df3 for the second
# join instead of shuffling both sides.
joined_df = (
    df1.join(df2, "id")
    .join(df3.hint("broadcast"), "id")
)
joined_df.show()

+---+------------+------+--------+---+--------------+----------+
| id|student_name|salary| address|age|married_status|department|
+---+------------+------+--------+---+--------------+----------+
|  1|       Alice|  1000|Address1| 25|       Married|     DeptA|
|  2|         Bob|  2000|Address2| 30|        Single|     DeptB|
|  3|     Charlie|  3000|Address3| 28|       Married|     DeptC|
|  4|       David|  4000|Address4| 32|        Single|     DeptD|
|  5|         Eve|  5000|Address5| 27|       Married|     DeptE|
+---+------------+------+--------+---+--------------+----------+



# <mark>_<u>**Distinct Function**</u>_</mark>

In [27]:
# distinct() keeps one copy of each fully identical row.
distinctDF = joined_df.distinct()
print(f"Distinct count: {distinctDF.count()}")
distinctDF.show(truncate=False)

Distinct count: 5
+---+------------+------+--------+---+--------------+----------+
|id |student_name|salary|address |age|married_status|department|
+---+------------+------+--------+---+--------------+----------+
|1  |Alice       |1000  |Address1|25 |Married       |DeptA     |
|2  |Bob         |2000  |Address2|30 |Single        |DeptB     |
|3  |Charlie     |3000  |Address3|28 |Married       |DeptC     |
|4  |David       |4000  |Address4|32 |Single        |DeptD     |
|5  |Eve         |5000  |Address5|27 |Married       |DeptE     |
+---+------------+------+--------+---+--------------+----------+



# <mark>_<u>**dropDuplicates**</u>_</mark>

In [28]:
# dropDuplicates() with no column list deduplicates on whole rows, like distinct().
# Use a dedicated name: reassigning df2 here would clobber the (id, age,
# married_status) frame that the join cells above read, so re-running those
# cells would silently join against the already-joined frame.
dedupDF = joined_df.dropDuplicates()
print("Distinct count: " + str(dedupDF.count()))
dedupDF.show(truncate=False)

Distinct count: 5


[Stage 57:>                                                         (0 + 2) / 2]                                                                                

# <mark>_<u>**dropDuplicates on Two Columns**</u>_</mark>

In [29]:
# Deduplicate on the (department, salary) pair only. When several rows share a
# pair, which one survives is presumably arbitrary — don't rely on it.
dropDisDF = joined_df.dropDuplicates(["department", "salary"])
print(f"Distinct count of department & salary : {dropDisDF.count()}")
dropDisDF.show(truncate=False)

Distinct count of department & salary : 5
+---+------------+------+--------+---+--------------+----------+
|id |student_name|salary|address |age|married_status|department|
+---+------------+------+--------+---+--------------+----------+
|1  |Alice       |1000  |Address1|25 |Married       |DeptA     |
|2  |Bob         |2000  |Address2|30 |Single        |DeptB     |
|3  |Charlie     |3000  |Address3|28 |Married       |DeptC     |
|4  |David       |4000  |Address4|32 |Single        |DeptD     |
|5  |Eve         |5000  |Address5|27 |Married       |DeptE     |
+---+------------+------+--------+---+--------------+----------+



# <mark>_<u>**repartition**</u>_</mark>

In [30]:
# Redistribute joined_df into exactly 6 partitions and confirm the count.
# A dedicated name avoids overwriting df2, which the join cells above read as
# the (id, age, married_status) frame.
repartitionedDF = joined_df.repartition(6)
print(repartitionedDF.rdd.getNumPartitions())

6


# <mark>_<u>**UDF Example**</u>_</mark>

In [32]:
# Small (Seqno, Name) table with lower-case names; both columns are strings.
columns = ["Seqno", "Name"]
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
]

df = spark.createDataFrame(data, columns)
df.show(truncate=False)

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def convertCase(str):
    """Capitalize the first letter of every space-separated word.

    Registered as the Spark SQL UDF ``convertUDF``, so it must tolerate SQL
    NULLs, which arrive as ``None``.

    Args:
        str: The input text. (The name shadows the builtin ``str``; it is kept
            so any keyword callers keep working.)

    Returns:
        The text with each word's first character upper-cased and the rest
        unchanged, or ``None`` when the input is ``None``.
    """
    if str is None:
        # Propagate NULL instead of raising AttributeError inside the executor.
        return None
    # split(" ") (not split()) preserves runs of spaces; join() avoids the
    # trailing space the previous loop appended after the last word.
    return " ".join(word[:1].upper() + word[1:] for word in str.split(" "))

# Expose convertCase to Spark SQL under the name "convertUDF" with a string result.
spark.udf.register("convertUDF", convertCase, StringType())

# Make df queryable from SQL, then apply the UDF to every Name.
df.createOrReplaceTempView("NAME_TABLE")
(
    spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE")
    .show(truncate=False)
)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



[Stage 94:>                                                         (0 + 1) / 1]

# <mark>_<u>**Complex Data Types (Array Columns)**</u>_</mark>

In [12]:
# Each dict becomes one row; the "items" list becomes an array column.
data = [
    {"id": 1, "items": [f"item{n}" for n in (1, 2, 3)]},
    {"id": 2, "items": [f"item{n}" for n in (4, 5)]},
]

df = spark.createDataFrame(data)
df.show()

+---+--------------------+
| id|               items|
+---+--------------------+
|  1|[item1, item2, it...|
|  2|      [item4, item5]|
+---+--------------------+



# <mark>_<u>**Using Python (UDF) and explode**</u>_</mark>

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, udf, size, collect_set, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

# Reuses the running session if one exists. The WARN in the output notes that
# only runtime SQL configurations take effect in that case.
spark = SparkSession.builder.appName("NestedDataFrame").getOrCreate()

# Two rows whose arrays have different lengths (5 and 2 elements).
data = [
    {"id": 1, "items": ["item1", "item2", "item3", "item56", "item546"]},
    {"id": 2, "items": ["item4", "item5"]},
]

# Explicit schema rather than inference: integer id, array-of-string items.
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("items", ArrayType(StringType()), True),
    ]
)

# total_items is the array length, computed before exploding so every exploded
# row can carry its parent's count.
df = spark.createDataFrame(data, schema=schema).withColumn("total_items", size("items"))

# Define a custom function to extract a feature (adjust the return type as needed)
def extract_feature(item):
    """Placeholder per-item feature extractor, used as a string-typed Spark UDF.

    Currently ignores ``item`` and always returns the constant ``"hello"``;
    replace the body with real logic. The return value should stay a ``str``
    (or ``None``) to match the UDF's declared ``StringType()``.
    """
    return "hello"


# Explode the `items` array and apply the custom function
# first() is the only name this cell needs beyond the imports at the top of the
# cell; the earlier duplicate re-import of SparkSession/explode/udf/... is dropped.
from pyspark.sql.functions import first

# Wrap the Python function once as a string-returning Spark UDF.
extract_feature_udf = udf(extract_feature, StringType())

# One row per array element; each exploded row keeps its parent's total_items.
# Note: explode() drops rows whose array is null or empty (explode_outer keeps them).
exploded_df = (
    df.withColumn("item", explode("items"))
    .withColumn("feature", extract_feature_udf("item"))
)

# total_items is identical on every exploded row of an id, so first() recovers it.
result_df = (
    exploded_df.groupBy("id")
    .agg(
        first("total_items").alias("total_items"),
        collect_set("feature").alias("unique_features"),
    )
)

df.show()
result_df.show()

24/12/05 05:45:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---+--------------------+-----------+
| id|               items|total_items|
+---+--------------------+-----------+
|  1|[item1, item2, it...|          5|
|  2|      [item4, item5]|          2|
+---+--------------------+-----------+



# <mark>_<u>**Using SQL and explode**</u>_</mark>

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, udf, size, collect_set, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

data = [
    {"id": 1, "items": ["item1", "item2", "item3", "item56", "item546"]},
    {"id": 2, "items": ["item4", "item5"]},
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("items", ArrayType(StringType()), True),
])

df = spark.createDataFrame(data, schema=schema)

# Register the original DataFrame as a temporary view for SQL access.
df.createOrReplaceTempView("original_data")
df.show()

# Add the array length, explode to one row per item, and expose the result to
# SQL as "exploded_data" (columns: id, items, total_items, item).
# A parenthesized chain replaces trailing backslashes: a backslash continuation
# followed by a comment-only line turns into a SyntaxError once that comment is removed.
exploded_df = (
    df.withColumn("total_items", size("items"))
    .withColumn("item", explode("items"))
)
exploded_df.createOrReplaceTempView("exploded_data")
exploded_df.show()



+---+--------------------+-----------+
| id|               items|total_items|
+---+--------------------+-----------+
|  1|[item1, item2, it...|          5|
|  2|      [item4, item5]|          2|
+---+--------------------+-----------+

+---+--------------------+-----------+-------+
| id|               items|total_items|   item|
+---+--------------------+-----------+-------+
|  1|[item1, item2, it...|          5|  item1|
|  1|[item1, item2, it...|          5|  item2|
|  1|[item1, item2, it...|          5|  item3|
|  1|[item1, item2, it...|          5| item56|
|  1|[item1, item2, it...|          5|item546|
|  2|      [item4, item5]|          2|  item4|
|  2|      [item4, item5]|          2|  item5|
+---+--------------------+-----------+-------+



In [None]:
# exploded_data (previous cell) has id, items, total_items and item, but no
# feature column, so referencing `feature` directly fails. Compute the feature
# inside the query with extract_feature registered as a SQL function.
spark.udf.register("extract_feature", extract_feature, StringType())

# SQL query to calculate total items and unique features grouped by id
sql_query = """
SELECT id,
       first(total_items) AS total_items,
       collect_set(extract_feature(item)) AS unique_features
FROM exploded_data
GROUP BY id
"""

# Execute the SQL query on the temporary view "exploded_data"
result_df_1 = spark.sql(sql_query)

# Display the results (optional)
result_df_1.show()

spark.sql("select distinct id, items, total_items from exploded_data").show()