In [2]:
# importing basic libraries
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.window import *


In [3]:
# creating spark session object
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
# Sample data for the first DataFrame
data1 = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Cathy"),
    (4, "David"),
    (5, "Eva"),
    (6, "Frank"),
    (7, "Grace"),
    (8, "Henry"),
    (9, "Ivy"),
    (10, "Jack"),
]

# Columns for the first DataFrame
columns1 = ["id", "name"]

# Creating the first DataFrame
df1 = spark.createDataFrame(data1, columns1)

# Show the first DataFrame
df1.show(2,False)

print(f"df1 partition size {df1.rdd.getNumPartitions()}")



+---+-----+
|id |name |
+---+-----+
|1  |Alice|
|2  |Bob  |
+---+-----+
only showing top 2 rows

df1 partition size 4


In [8]:
# Sample data for the second DataFrame
data2 = [
    (1, "Math", 85),
    (2, "English", 78),
    (3, "History", 90),
    (4, "Science", 88),
    (5, "Geography", 76),
    (6, "Art", 95),
    (7, "Music", 80),
    (8, "Physical Education", 92),
    (9, "Computer Science", 89),
    (10, "Biology", 91),
]

# Columns for the second DataFrame
columns2 = ["id", "subject", "score"]

# Creating the second DataFrame
df2 = spark.createDataFrame(data2, columns2)

# Show the second DataFrame
df2.show(2,False)

print(f"df2 partition size {df2.rdd.getNumPartitions()}")


+---+-------+-----+
|id |subject|score|
+---+-------+-----+
|1  |Math   |85   |
|2  |English|78   |
+---+-------+-----+
only showing top 2 rows

df2 partition size 4


# Testing spark sql join repartition output

In [17]:


# Check the number of partitions of the original DataFrames
print(f"df1 partitions: {df1.rdd.getNumPartitions()} with record count {df1.count()}")
print(f"df2 partitions: {df2.rdd.getNumPartitions()} with record count {df2.count()}")

# Join the two DataFrames
# spark.conf.set("spark.sql.shuffle.partitions", 200)  # Example: Set to 4 partitions  ----- default=200

joined_df = df1.join(df2, on="id", how="inner")
print(f"joined_df.count(): {joined_df.count()}")
# Check the number of partitions after the join
print(f"Number of partitions after join: {joined_df.rdd.getNumPartitions()}")

df1 partitions: 4 with record count 1000
df2 partitions: 4 with record count 1000
joined_df.count(): 1000
Number of partitions after join: 4


In [28]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import random


# Function to generate synthetic data
def generate_data(num_records):
    data = [(i, f"name_{i}") for i in range(num_records)]
    return data

# Generate two datasets with 1000 records each
data1 = generate_data(1000000)
data2 = generate_data(1000000)

# Create two DataFrames
df1 = spark.createDataFrame(data1, ["id", "name"])  # repartition to 4 for testing
df2 = spark.createDataFrame(data2, ["id", "name"])  # repartition to 4 for testing

# Check the number of partitions of the original DataFrames
print(f"df1 partitions: {df1.rdd.getNumPartitions()}")
print(f"df2 partitions: {df2.rdd.getNumPartitions()}")

# Perform a join operation on the two DataFrames
joined_df = df1.join(df2, on="id")
print(f"""joined_df.groupBy("id").count(): {joined_df.groupBy("id").count().show()}""")

# Check the number of partitions after the join
print(f"Number of partitions after join: {joined_df.rdd.getNumPartitions()}")

# Optionally, repartition the result if you want to enforce a specific number of partitions
# joined_df_repartitioned = joined_df.repartition(4)  # Adjust this number based on your cluster resources
# print(f"Number of partitions after repartitioning: {joined_df_repartitioned.rdd.getNumPartitions()}")

# Show some results from the joined DataFrame
# joined_df.show(10)


df1 partitions: 4
df2 partitions: 4
+---+-----+
| id|count|
+---+-----+
|  0|    1|
|  6|    1|
|  7|    1|
|  9|    1|
| 19|    1|
| 22|    1|
| 25|    1|
| 26|    1|
| 29|    1|
| 31|    1|
| 32|    1|
| 34|    1|
| 39|    1|
| 43|    1|
| 50|    1|
| 54|    1|
| 57|    1|
| 58|    1|
| 65|    1|
| 68|    1|
+---+-----+
only showing top 20 rows

joined_df.groupBy("id").count(): None
Number of partitions after join: 4


# Dealing with spark rdd

In [34]:
# Find the min temperature value for each station

data = [
('ITE00100554',18000101,'TMAX',-75),
('ITE00100554',18000101,'TMIN',-148),
('GM000010962',18000101,'PRCP',0),
('EZE00100082',18000101,'TMAX',-86),
('EZE00100082',18000101,'TMIN',-135)
]

df = spark.createDataFrame(data,["st_id","zip","temp_type","temp_value"])
df.select("st_id","temp_value").groupBy("st_id").agg(min("temp_value")).show(5,False)


+-----------+---------------+
|st_id      |min(temp_value)|
+-----------+---------------+
|ITE00100554|-148           |
|GM000010962|0              |
|EZE00100082|-135           |
+-----------+---------------+

