
In [1]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 4.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ab005d7d2de3e1f0864a0629c5bde12245025603b528be39d56f4f69c1285f0d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Colab").getOrCreate()
# Optional builder settings, left disabled here; if used, chain them before getOrCreate():
#   .master("local")
#   .config("spark.ui.port", "4050")


In [22]:
from pyspark import SparkContext

# Initialize SparkContext; in Spark Shell, this is already created for you and named 'sc'
sc = SparkContext.getOrCreate()

# Create an RDD with 6 integers
data = [4, 6, 3, 1, 5, 4]
rdd = sc.parallelize(data)

# Print the RDD
print(rdd.collect())


[4, 6, 3, 1, 5, 4]


In [None]:
# Partitioning in Spark
# An RDD can be partitioned in several ways. Partitioning by key is often the most effective,
# but it applies only to key-value RDDs. The first two methods below work on any RDD.

# 1. repartition(numPartitions): increases or decreases the number of partitions.
#    It performs a full shuffle of the data, so it can be expensive for large RDDs.

rdd_repartitioned = rdd.repartition(2)  # Redistribute the RDD across 2 partitions

# 2. coalesce(numPartitions): reduces the number of partitions by merging existing ones,
#    which avoids a full shuffle.

rdd_coalesced = rdd.coalesce(2)  # Merge down to at most 2 partitions

# 3. Transformations such as mapPartitions apply custom logic to each partition as a whole.
#    They do not change how the data is partitioned; the function receives an iterator
#    over one partition and must return an iterator.

rdd_per_partition = rdd.mapPartitions(lambda partition: ...)  # Placeholder: replace ... with per-partition logic that returns an iterator
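
The remark above about partitioning with key-value pairs can be illustrated without a cluster. The sketch below is a plain-Python model, not Spark itself: it places each pair in bucket `partition_func(key) % num_partitions`, which mirrors the rule that `RDD.partitionBy(numPartitions, partitionFunc)` applies to key-value RDDs. The helper names `first_letter_partitioner` and `partition_by_key` and the sample pairs are assumptions made for illustration.

```python
# Plain-Python model of key-based partitioning (no Spark required).
# Helper names are hypothetical; this only mimics the bucket-assignment rule.

def first_letter_partitioner(key):
    """Deterministic stand-in for a hash function: code point of the key's first character."""
    return ord(key[0])

def partition_by_key(pairs, num_partitions, partition_func):
    """Place each (key, value) pair in bucket partition_func(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions

pairs = [("a", 4), ("b", 6), ("a", 3), ("c", 1), ("b", 5), ("c", 4)]
partitions = partition_by_key(pairs, 2, first_letter_partitioner)

# Print each partition index with the pairs assigned to it
for index, partition in enumerate(partitions):
    print(index, partition)
```

Every pair that shares a key lands in the same partition, which is why per-key operations benefit from this layout. On a real key-value RDD the analogous call would be `rdd.partitionBy(2, first_letter_partitioner)`.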

In [23]:
# map transformation: square each element
rdd_map = rdd.map(lambda x: x * x)
print(rdd_map.collect())

[16, 36, 9, 1, 25, 16]


In [24]:
# mapPartitions transformation

# Create RDD with 2 partitions
rdd_partitioned = sc.parallelize(data, 2)

# Apply mapPartitions transformation
rdd_map_partitions = rdd_partitioned.mapPartitions(lambda partition: map(lambda x: x*x, partition))

# Print the result partition by partition.
# glom() gathers the elements of each partition into a list for easier viewing.
print(rdd_map_partitions.glom().collect())


[[16, 36, 9], [1, 25, 16]]
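
The difference between `map` and `mapPartitions` is easier to see by counting how often the supplied function is called. The following is a plain-Python sketch, not Spark code: each inner list stands in for one partition, and the names `square`, `square_partition`, and `calls` are hypothetical.

```python
# Plain-Python model of map vs. mapPartitions (no Spark required).
# Each inner list stands in for one partition of the RDD.

partitions = [[4, 6, 3], [1, 5, 4]]
calls = {"map": 0, "mapPartitions": 0}

def square(x):
    """Called once per element, as with rdd.map."""
    calls["map"] += 1
    return x * x

def square_partition(iterator):
    """Called once per partition, as with rdd.mapPartitions; takes and yields an iterator."""
    calls["mapPartitions"] += 1
    for x in iterator:
        yield x * x

# map-style: the function sees one element at a time
mapped = [[square(x) for x in part] for part in partitions]
# mapPartitions-style: the function sees a whole partition as an iterator
mapped_partitions = [list(square_partition(iter(part))) for part in partitions]

print(mapped)
print(mapped_partitions)
print(calls)
```

Both approaches produce the same values, but the per-partition function runs twice rather than six times. That makes `mapPartitions` a reasonable place for setup work that should happen once per partition rather than once per element.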


In [25]:
# Sort the RDD in ascending order (the default)
rdd_sorted_asc = rdd.sortBy(lambda x: x)

# Sort the RDD in descending order
rdd_sorted_desc = rdd.sortBy(lambda x: x, ascending=False)

print(rdd_sorted_asc.collect())
print(rdd_sorted_desc.collect())

[1, 3, 4, 4, 5, 6]
[6, 5, 4, 4, 3, 1]


In [26]:
# distinct transformation: remove duplicate elements (output order is not guaranteed)
rdd_dist = rdd.distinct()
print(rdd_dist.collect())

[4, 6, 3, 1, 5]


In [27]:
# filter transformation: keep only the even elements
rdd_filtered = rdd.filter(lambda x: x % 2 == 0)
print(rdd_filtered.collect())

[4, 6, 4]


In [35]:
# reduceByKey transformation

from pyspark import SparkContext

# Reuse the SparkContext that is already running; getOrCreate() does not start a second one
sc = SparkContext.getOrCreate()

# Create an RDD of key-value pairs (this rebinds `data` and `rdd` from the earlier cells)
data = [("a", 4), ("b", 6), ("a", 3), ("c", 1), ("b", 5), ("c", 4)]
rdd = sc.parallelize(data)

# Sum the values for each key; the order of keys in the result is not guaranteed
rdd_reduce_by_key = rdd.reduceByKey(lambda x, y: x + y)
print("reduceByKey result:")
print(rdd_reduce_by_key.collect())



reduceByKey result:
[('b', 11), ('c', 5), ('a', 7)]
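
As a rough picture of what `reduceByKey` computes, the sketch below models it in plain Python: values are first combined per key inside each partition, and the per-partition results are then merged with the same function. This is a simplified illustration, not Spark's implementation. The split of the data into two partitions and the helper name `combine` are assumptions.

```python
# Plain-Python model of reduceByKey (no Spark required).
# The split into two partitions is an assumption for illustration.

partitions = [
    [("a", 4), ("b", 6), ("a", 3)],
    [("c", 1), ("b", 5), ("c", 4)],
]

def combine(pairs, func):
    """Merge the values that share a key using func; returns a dict of key -> value."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

def add(x, y):
    return x + y

# Step 1: combine values per key within each partition
local = [combine(part, add) for part in partitions]

# Step 2: merge the per-partition results with the same function
result = combine((item for d in local for item in d.items()), add)

print(local)
print(sorted(result.items()))
```

Because the function is applied in two stages and in no fixed order, it should be associative and commutative, as addition is here. The sorted totals match the `reduceByKey` output above: `a` is 7, `b` is 11, `c` is 5.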
