# Here are some of my PySpark codes

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    explode,
    split,
    count,
    sum,
    avg,
    when,
    desc,
    broadcast,
)
import os
import random
import string
import json

In [1]:
spark = (
    SparkSession.builder.appName("dataframPractices")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/20 04:57:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/20 04:57:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Partitioning

In [9]:
sales = spark.read.option("header", "true").csv("/data/practice/sales.csv")

Check default partitioning

In [10]:
print(f"Default partitions: {sales.rdd.getNumPartitions()}")

Default partitions: 1


Repartition to specific number

In [12]:
sales_10p = sales.repartition(10)
print(f"After repartition(10): {sales_10p.rdd.getNumPartitions()}")

After repartition(10): 10


[Stage 4:>                                                          (0 + 1) / 1]

Repartition by key

In [13]:
sales_by_date = sales.repartition("timestamp")
print(f"After repartition by timestamp: {sales_by_date.rdd.getNumPartitions()}")

After repartition by timestamp: 3


narrow transformation example - doesn't shuffle all data 

In [14]:
sales_coalesceed = sales_10p.coalesce(4)
print(f"After coalesce(4): {sales_coalesceed.rdd.getNumPartitions()}")

After coalesce(4): 4


Examine partition sizes

In [15]:

print("Partition distribution:")
part_sizes = sales_10p.rdd.glom().map(len).collect()
for i, size in enumerate(part_sizes):
    print(f"Partition {i}: {size} records")


Partition distribution:
Partition 0: 10000 records
Partition 1: 10000 records
Partition 2: 10000 records
Partition 3: 10000 records
Partition 4: 10000 records
Partition 5: 10000 records
Partition 6: 10000 records
Partition 7: 10000 records
Partition 8: 10000 records
Partition 9: 10000 records


                                                                                

# Narrow Transformations

Check the jobs http://localhost:4041/

Filter - doesn't require shuffling

In [20]:
high_value_sales = sales.filter(col("price").cast("double") > 500)
print(f"High value sales count: {high_value_sales.count()}")

High value sales count: 50516


Map - doesn't require shuffling

In [26]:
sales_with_total = sales.withColumn(
    "total_value", col("quantity").cast("int") * col("price").cast("double")
)
sales_with_total.show(2) 
# sales_with_total.limit(5).toPandas()

+--------+----------+-------+--------+-----+----------+-----------+
|order_id|product_id|user_id|quantity|price| timestamp|total_value|
+--------+----------+-------+--------+-----+----------+-----------+
|       0|       553|   4397|       8|490.6|2023-08-18|     3924.8|
|       1|       441|   6066|       2|23.87|2023-10-09|      47.74|
+--------+----------+-------+--------+-----+----------+-----------+
only showing top 2 rows



Sample - doesn't require shuffling

In [None]:

sample_sales = sales.sample(fraction=0.1, seed=42)
print(f"Sample size: {sample_sales.count()}")

# Wide Transformations

requires shuffle 


In [27]:
print("\033[31mSchema:\033[0m")
print(f"\033[31m{sales.printSchema()}\033[0m")

[31mSchema:[0m
root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)
 |-- timestamp: string (nullable = true)

[31mNone[0m


In [48]:
sales_by_product = sales_10p.groupBy("product_id").agg(
    count("*").alias("order_count"),
    sum(col("quantity").cast("int")).alias("total_quantity"),
    avg(col("price").cast("double")).alias("total_price"),
)
sales_by_product.show(2)

+----------+-----------+--------------+------------------+
|product_id|order_count|total_quantity|       total_price|
+----------+-----------+--------------+------------------+
|       691|        117|           676| 527.1941025641026|
|       467|         84|           502|476.77404761904756|
+----------+-----------+--------------+------------------+
only showing top 2 rows



In [44]:
users = spark.read.option("header", "true").csv("/data/practice/users.csv")

In [47]:
sales_with_user = sales_10p.join(users, "user_id")
print("Sales joined with users:")
sales_with_user.select("order_id", "user_id", "name", "country", "price").show(5)

Sales joined with users:
+--------+-------+--------+-------+------+
|order_id|user_id|    name|country| price|
+--------+-------+--------+-------+------+
|   25941|   8950|jkbhyoft| France|949.69|
|   13463|   4086|qfgieltd|  India|653.83|
|    1143|   9538|gzcpaecn|  China|361.53|
|   15590|   1180|zpvjtsye|  India| 27.94|
|   11401|   6656|bkojubrf|  China|943.61|
+--------+-------+--------+-------+------+
only showing top 5 rows



In [49]:
# orderBy/sort are wide transformation
sorted_sales = sales_10p.orderBy(desc("price"))
print("Sales sorted by price:")
sorted_sales.show(5)

Sales sorted by price:
+--------+----------+-------+--------+------+----------+
|order_id|product_id|user_id|quantity| price| timestamp|
+--------+----------+-------+--------+------+----------+
|   10866|       249|   1438|       9|999.98|2023-06-15|
|   24923|       950|    812|       5|999.96|2023-02-06|
|   63684|       483|   8612|       7|999.93|2023-08-07|
|   52482|       680|   7484|      10|999.91|2023-04-12|
|   57854|       975|   5551|       3|999.91|2023-06-01|
+--------+----------+-------+--------+------+----------+
only showing top 5 rows



# Optimizations

In [51]:
products = spark.read.option("header", "true").csv("/data/practice/products.csv")

Broadcast join (optimization for joining with small tables). Products table is small, so we can broadcast it.

In [52]:
sales_with_prod = sales.join(broadcast(products), "product_id")

Caching/persisting: Cache a DataFrame we'll use multiple times

In [53]:
cached_sales = sales.cache()

Use the cached DataFrame

In [54]:
print(f"Total sales count (from cache): {cached_sales.count()}")
print(
    f"Average price (from cache): {cached_sales.agg({'price': 'avg'}).collect()[0][0]}"
)

Total sales count (from cache): 100000
Average price (from cache): 504.6348144999969


Partition pruning

In [55]:

sales_by_month = sales.withColumn("month", split(col("timestamp"), "-")[1])
sales_by_month.write.partitionBy("month").mode("overwrite").parquet(
    "/data/practice/sales_partitioned"
)

                                                                                

Read back with partition filtering

In [56]:

january_sales = spark.read.parquet("/data/practice/sales_partitioned").filter(
    col("month") == "01"
)
print(f"January sales count (using partition pruning): {january_sales.count()}")


January sales count (using partition pruning): 8462


In [61]:
sales.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)
 |-- timestamp: string (nullable = true)



25/03/28 15:01:26 ERROR TaskSchedulerImpl: Lost executor 1 on 172.19.0.4: worker lost: Not receiving heartbeat for 60 seconds
25/03/28 15:01:26 ERROR TaskSchedulerImpl: Lost executor 0 on 172.19.0.5: worker lost: Not receiving heartbeat for 60 seconds
25/03/28 15:01:26 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_240_0 !
25/03/31 05:06:03 ERROR TaskSchedulerImpl: Lost executor 2 on 172.19.0.4: worker lost: Not receiving heartbeat for 60 seconds
25/03/31 05:06:03 ERROR TaskSchedulerImpl: Lost executor 3 on 172.19.0.5: worker lost: Not receiving heartbeat for 60 seconds


# practice

In [1]:
from pyspark.sql import SparkSession

In [57]:
spark = SparkSession.builder.appName("hamooon2").master("spark://spark-master:7077").getOrCreate()
sc1 = spark.sparkContext

25/03/28 12:05:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [58]:
rdd = sc1.textFile("/data/Common_sense.txt")
rdd1 = rdd.map(lambda x: x.split(" "))

In [59]:
rdd1.take(2)


[['The', 'Project', 'Gutenberg', 'eBook', 'of', 'Common', 'Sense'],
 ['', '', '', '', '']]