### Iteration in DF / RDD
Spark is a distributed data-processing engine, so prefer Spark's built-in transformations and actions over iterating through rows one by one. Built-in operations run in parallel across the executors and can be optimised by Spark.

#### Methods for looping
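The methods below are used for complex transformations. map() returns one output record per input record, although the number of columns can change; mapPartitions() is usually used the same way but may return any number of records. Neither brings data to the driver. In PySpark they are available on RDDs, not on DataFrames.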
1. map()
2. mapPartitions() 
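The difference between the two methods can be pictured with a plain-Python model. This is not Spark: a list of lists stands in for an RDD's partitions (an assumption for illustration), and the sketch only counts how often the user function is called.

```python
# Plain-Python model of an RDD: each inner list stands in for one partition.
# It illustrates call counts only; Spark itself is not used.
partitions = [[1000, 5000], [2000, 3000, 4000]]

calls = {"map": 0, "mapPartitions": 0}

def double_salary(salary):
    # Called once for every record, like the function passed to rdd.map()
    calls["map"] += 1
    return salary * 2

def double_partition(rows):
    # Called once for every partition, like the function passed to
    # rdd.mapPartitions(); it receives an iterator and returns an iterable
    calls["mapPartitions"] += 1
    for salary in rows:
        yield salary * 2

mapped = [[double_salary(s) for s in part] for part in partitions]
partition_mapped = [list(double_partition(iter(part))) for part in partitions]

print(mapped == partition_mapped)  # True: same result either way
print(calls)  # {'map': 5, 'mapPartitions': 2}
```

The output is identical, but the per-partition function runs 2 times instead of 5, which is why mapPartitions() is preferred when each call has expensive setup.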

The methods below also iterate over rows, but they are actions and return nothing; they are used for side effects. They are available on both DataFrames and RDDs.

1. foreach() - e.g. writing data to a database row by row
2. foreachPartition() - e.g. writing data to a database in batches, one batch per partition
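A minimal sketch of the batch pattern: the function below is the kind of thing you would pass to `df.foreachPartition(...)`. It assumes a hypothetical `employees` table; SQLite is only a stand-in so the sketch runs locally (on a real cluster every executor needs network access to a shared database), and a `namedtuple` stands in for Spark's `Row`.

```python
import os
import sqlite3
import tempfile
from collections import namedtuple
from contextlib import closing

# Hypothetical target: a SQLite file standing in for the real database.
DB_PATH = os.path.join(tempfile.mkdtemp(), "employees.db")
with closing(sqlite3.connect(DB_PATH)) as conn:
    conn.execute("CREATE TABLE employees (location TEXT, salary INTEGER)")
    conn.commit()

def save_partition(rows):
    """Write every row of one partition with one connection and one batch.

    In Spark this would be used as df.foreachPartition(save_partition),
    and `rows` would be an iterator of Row objects.
    """
    batch = [(row.location, row.salary) for row in rows]
    if not batch:  # a partition can be empty; skip opening a connection
        return
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.executemany("INSERT INTO employees VALUES (?, ?)", batch)
        conn.commit()

# Local stand-in for Spark Rows, so the sketch runs without a cluster.
FakeRow = namedtuple("FakeRow", ["location", "salary"])
save_partition(iter([FakeRow("chennai", 1000)]))  # "partition" 1
save_partition(iter([FakeRow("trichy", 5000)]))   # "partition" 2

with closing(sqlite3.connect(DB_PATH)) as conn:
    saved = conn.execute(
        "SELECT location, salary FROM employees ORDER BY salary"
    ).fetchall()
print(saved)  # [('chennai', 1000), ('trichy', 5000)]
```

Opening one connection per partition instead of per row is the main saving over foreach().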

The methods below work on DataFrames, are actions, and bring data to the driver.
1. collect() brings the data of all partitions to the driver at once
2. toLocalIterator() brings one partition at a time to the driver and returns an iterator over its rows
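The memory difference can be pictured with another plain-Python model (not Spark; the partition sizes are made up). collect() holds every row at once, while toLocalIterator() holds roughly one partition at a time, so the driver needs about as much memory as the largest partition.

```python
# Plain-Python model (not Spark). Each inner list stands in for one partition
# that the driver would fetch from the executors; the sizes are made up.
partitions = [["r1", "r2"], ["r3", "r4", "r5"], ["r6"]]

def collect(parts):
    # Like df.collect(): every row of every partition ends up in one list
    return [row for part in parts for row in part]

def to_local_iterator(parts, stats):
    # Like df.toLocalIterator(): fetch one partition, yield its rows, move on
    for part in parts:
        buffered = list(part)  # only the current partition is held
        stats["max_buffered"] = max(stats["max_buffered"], len(buffered))
        yield from buffered

all_rows = collect(partitions)
print(len(all_rows))  # 6: all rows held on the driver at once

stats = {"max_buffered": 0}
processed = 0
for row in to_local_iterator(partitions, stats):
    processed += 1  # handle the row, then let it go
print(processed, stats["max_buffered"])  # 6 3: largest partition size
```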

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("learning").getOrCreate()


In [2]:
spark.getActiveSession()

In [3]:
# creating a DataFrame with a nested "name" struct column
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [(("padmanabhan", "s"), "chennai", 1000),
        (("karthik", "k"), "trichy", 5000)]

schema = StructType([
    StructField("name", StructType([
        StructField("firstName", StringType(), False),
        StructField("lastName", StringType(), False),
    ])),
    StructField("location", StringType(), True),
    StructField("salary", IntegerType(), True),
])

df = spark.createDataFrame(data,schema=schema)

In [6]:
df.show(truncate=False)

                                                                                

+----------------+--------+------+
|name            |location|salary|
+----------------+--------+------+
|{padmanabhan, s}|chennai |1000  |
|{karthik, k}    |trichy  |5000  |
+----------------+--------+------+



In [11]:
# for simple transformations, use select() and withColumn()

# create a new column with the updated salary
# concatenate first and last name, separated by ","

from pyspark.sql.functions import col, lit, concat_ws

(df.withColumn("new_salary", df.salary * 10)
   .withColumn("full name", concat_ws(",", df.name.firstName, df.name.lastName))
   .show())

                                                                                

+----------------+--------+------+----------+-------------+
|            name|location|salary|new_salary|    full name|
+----------------+--------+------+----------+-------------+
|{padmanabhan, s}| chennai|  1000|     10000|padmanabhan,s|
|    {karthik, k}|  trichy|  5000|     50000|    karthik,k|
+----------------+--------+------+----------+-------------+



In [14]:
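# the same using select(); select() also sets the column order, so location is shown last
# lit() is not needed here: col("salary") * 10 is already a column expression

df.select(concat_ws(",", col("name.firstName"), col("name.lastName")).alias("full name"),
          (col("salary") * 10).alias("modified salary"),
          df.location).show()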

                                                                                

+-------------+---------------+--------+
|    full name|modified salary|location|
+-------------+---------------+--------+
|padmanabhan,s|          10000| chennai|
|    karthik,k|          50000|  trichy|
+-------------+---------------+--------+



In [15]:
# map()

# DataFrames don't have map(), so convert the DataFrame to an RDD, apply map(), then convert back to a DataFrame
# mapPartitions() calls the function once per partition instead of once per record

rdd = df.rdd.map(lambda x: (x[0][0] + "," + x[0][1], x[2] * 2))
df2 = rdd.toDF(["full-name", "modified salary"])
df2.show()

                                                                                

+-------------+---------------+
|    full-name|modified salary|
+-------------+---------------+
|padmanabhan,s|           2000|
|    karthik,k|          10000|
+-------------+---------------+
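The same result can be produced with mapPartitions(). Below is a sketch of a function you could pass as `df.rdd.mapPartitions(full_name_partition)`; the name `full_name_partition` is hypothetical, and `namedtuple` stand-ins replace Spark's `Row` so it runs without a cluster.

```python
from collections import namedtuple

# Stand-ins for pyspark.sql.Row so the sketch runs without Spark (assumption).
Name = namedtuple("Name", ["firstName", "lastName"])
Emp = namedtuple("Emp", ["name", "location", "salary"])

def full_name_partition(rows):
    # Runs once per partition: expensive setup (connections, lookup tables)
    # would go here so it is not repeated for every record.
    separator = ","
    for row in rows:
        yield (row.name.firstName + separator + row.name.lastName, row.salary * 2)

# In Spark (not run here):
#   df.rdd.mapPartitions(full_name_partition).toDF(["full-name", "modified salary"])
partition = iter([
    Emp(Name("padmanabhan", "s"), "chennai", 1000),
    Emp(Name("karthik", "k"), "trichy", 5000),
])
result = list(full_name_partition(partition))
print(result)  # [('padmanabhan,s', 2000), ('karthik,k', 10000)]
```

Writing the function as a generator means rows are produced lazily, so a large partition is not copied into a list first.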



In [19]:
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstName: string (nullable = false)
 |    |-- lastName: string (nullable = false)
 |-- location: string (nullable = true)
 |-- salary: integer (nullable = true)



In [20]:
# instead of index positions, fields can also be accessed by column name
rdd2 = df.rdd.map(lambda x : (x["name"]["firstName"]+"-"+x["name"]["lastName"],x["salary"]*2))
rdd2.toDF(["full-name","mod sal"]).show()

                                                                                

+-------------+-------+
|    full-name|mod sal|
+-------------+-------+
|padmanabhan-s|   2000|
|    karthik-k|  10000|
+-------------+-------+



In [21]:
# fields can also be accessed as attributes: x.name.firstName, x.name.lastName, x.salary

In [25]:
# using a custom function

def func(x):
    fullName = x.name.firstName + "--" + x.name.lastName
    return (fullName, x.salary * 2)

rdd3 = df.rdd.map(func)  # the function can be passed directly; no lambda needed
rdd3.take(10)

                                                                                

[('padmanabhan--s', 2000), ('karthik--k', 10000)]

In [4]:
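# foreach() is an action, whereas map() is a transformation
# foreach() is available on both DataFrames and RDDs
# the function runs on the executors, so on a cluster print() output goes to the executor logs, not the driver
df.foreach(lambda x: print("data :", x))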

data : Row(name=Row(firstName='padmanabhan', lastName='s'), location='chennai', salary=1000)
data : Row(name=Row(firstName='karthik', lastName='k'), location='trichy', salary=5000)
                                                                                

In [8]:
# Using collect() and a loop

# show() only displays the DataFrame, while collect() returns a Python list of Row objects
# collect() brings all the data into driver memory, so avoid it when the data volume is large

dataList = df.collect()
for rec in dataList:
    for val in rec:  # a Row is a tuple, so this iterates over its field values
        print(val)

Row(firstName='padmanabhan', lastName='s')
chennai
1000
Row(firstName='karthik', lastName='k')
trichy
5000


                                                                                

In [12]:
# Using toLocalIterator()

# Instead of bringing all data into driver memory at once, toLocalIterator() fetches one partition at a time,
# so the driver needs roughly as much memory as the largest partition

# a for loop stops cleanly when the iterator is exhausted (no bare except needed)
for row in df.toLocalIterator():
    print(row)

Row(name=Row(firstName='padmanabhan', lastName='s'), location='chennai', salary=1000)
Row(name=Row(firstName='karthik', lastName='k'), location='trichy', salary=5000)
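When rows from toLocalIterator() must be handled in chunks on the driver (for example, to send them to an external service), a small helper built on `itertools.islice` can group any iterator without materialising it. The helper name `in_batches` is hypothetical, and `range(7)` stands in for `df.toLocalIterator()`.

```python
from itertools import islice

def in_batches(rows, size):
    """Group any row iterator (e.g. df.toLocalIterator()) into lists of `size`."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, size))
        if not batch:
            return
        yield batch

# Stand-in for df.toLocalIterator(); in Spark: in_batches(df.toLocalIterator(), 1000)
batches = list(in_batches(range(7), 3))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```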

