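###What is an Action in RDD?

An action is an RDD operation that triggers the actual computation and either returns a result to the driver (for example `collect()` or `count()`) or writes data to external storage (for example `saveAsTextFile()`). Executors are the worker processes responsible for running tasks, while the driver is the JVM process that coordinates the executors and schedules the tasks.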

In [0]:
from pyspark import SparkContext  # SparkContext is the entry point for RDDs; SparkSession is used to create DataFrames

sc = SparkContext.getOrCreate()

print(sc)



<SparkContext master=local[8] appName=Databricks Shell>


In [0]:
my_rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,11,12,13,14])

In [0]:
type(my_rdd)

Out[9]: pyspark.rdd.RDD

In [0]:
type(my_rdd.collect())


Out[10]: list

In [0]:
my_rdd.take(3)   # first 3 elements

Out[7]: [1, 2, 3]

In [0]:
my_rdd.top(3)   # 3 largest elements, in descending order

Out[9]: [14, 13, 12]

In [0]:
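# only the value of the last expression is displayed in the cell output
my_rdd.min()    # 1
my_rdd.max()    # 14
my_rdd.sum()    # 105
my_rdd.count()  # 14
my_rdd.mean()   # 7.5
my_rdd.stdev()  # 4.03 (population standard deviation)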

Out[16]: 4.031128874149275

In [0]:
my_rdd.takeSample(True, 5)   # with replacement: the same element can be picked more than once


Out[17]: [10, 8, 7, 11, 8]

In [0]:
my_rdd.takeSample(False, 5)  # without replacement: each element is picked at most once

Out[18]: [12, 1, 6, 5, 14]

| No. | RDD Action | Expected Result |
|---|---|---|
| 1 | collect() | Return all elements of the RDD to the driver as an in-memory list |
| 2 | take(3) | First 3 elements of the RDD |
| 3 | top(3) | 3 largest elements of the RDD, in descending order |
| 4 | count() | Total number of elements in the RDD |
| 5 | min() | Minimum element of the RDD |
| 6 | max() | Maximum element of the RDD |
| 7 | sum() | Sum of the elements (assumes numeric elements) |
| 8 | mean() | Mean of the elements (assumes numeric elements) |
| 9 | stdev() | Population standard deviation of the elements (assumes numeric elements) |
| 10 | takeSample(True, 3) | Sample of exactly 3 elements, drawn with replacement |

| No. | RDD Action | Expected Result |
|---|---|---|
| 11 | reduce() | Aggregate the elements of the RDD using an associative and commutative function |
| 12 | countByKey() | Count the number of elements for each key of a pair RDD and return the result to the driver as a dictionary |
| 13 | countByValue() | Return the count of each unique value in the RDD as a dictionary of (value, count) pairs |
| 14 | fold() | Aggregate the elements of each partition, then the results of all partitions, using a zero value and an associative function |
| 15 | sc.range() | Create a new RDD of ints from start to end (exclusive); this is a SparkContext method, not an action |
| 16 | variance() | Population variance of the elements (divides by N) |
| 17 | sampleVariance() | Sample variance of the elements, which corrects for bias in estimating the variance by dividing by N-1 instead of N |
| 18 | saveAsTextFile() | Save the RDD as a text file, using string representations of the elements |
| 19 | saveAsPickleFile() | Save the RDD as a SequenceFile of serialized (pickled) objects |
| 20 | stats() | Return a StatCounter with count, mean, stdev, max and min in one pass |

In [0]:
#reduce
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd.reduce(lambda a,b : a+b)

Out[21]: 55
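`reduce()` first reduces each partition locally and then combines the per-partition results, which is why the function should be associative and commutative. The plain-Python sketch below only simulates that two-step behaviour; `split_into_partitions` and `simulated_rdd_reduce` are hypothetical helpers, not PySpark APIs, and the contiguous slicing is an assumption chosen to resemble how `parallelize` splits a list.

```python
from functools import reduce
from operator import add, sub

# Plain-Python simulation; the helpers below are hypothetical, not PySpark APIs.
def split_into_partitions(data, num_partitions):
    """Contiguous slices: partition i gets data[i*n//k:(i+1)*n//k]."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

def simulated_rdd_reduce(data, func, num_partitions):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(func, part)
                for part in split_into_partitions(data, num_partitions) if part]
    return reduce(func, partials)

data = list(range(1, 11))
print(simulated_rdd_reduce(data, add, 1))  # 55
print(simulated_rdd_reduce(data, add, 4))  # 55: addition is associative and commutative
print(simulated_rdd_reduce(data, sub, 1))  # -53
print(simulated_rdd_reduce(data, sub, 4))  # 17: subtraction is not associative, so the result depends on partitioning
```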

In [0]:
#countByValue

rdd = sc.parallelize([1,3,1,2,3,1,3,1,3,2,4,2,5,4,3])
rdd.countByValue()

Out[23]: defaultdict(int, {1: 4, 3: 5, 2: 3, 4: 2, 5: 1})

In [0]:
#countByKey
x = sc.parallelize([('A', 1), ('B', 2),('A', 3),('B', 4),('A', 5)])

x.countByKey()

Out[24]: defaultdict(int, {'A': 3, 'B': 2})
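Both `countByValue()` and `countByKey()` bring a plain dictionary back to the driver, so they suit results with a small number of distinct values or keys. As a rough local analogue (plain Python, not Spark), `collections.Counter` produces the same counts for the data used in the two cells above:

```python
from collections import Counter

# Local analogue only; the data is copied from the two cells above.
values = [1, 3, 1, 2, 3, 1, 3, 1, 3, 2, 4, 2, 5, 4, 3]
pairs = [('A', 1), ('B', 2), ('A', 3), ('B', 4), ('A', 5)]

by_value = Counter(values)             # like rdd.countByValue()
by_key = Counter(k for k, _ in pairs)  # like pair_rdd.countByKey(): counts keys, ignores values

print(dict(by_value))  # {1: 4, 3: 5, 2: 3, 4: 2, 5: 1}
print(dict(by_key))    # {'A': 3, 'B': 2}
```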

In [0]:
#fold: aggregate the elements of each partition, then aggregate the per-partition results; the zero value (0) is used for every partition and for the final merge
from operator import add 
x_fold = sc.parallelize([1,2,3,4,5])
x_fold.fold(0, add)

Out[26]: 15
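The zero value passed to `fold()` is applied once per partition and once more when the partition results are combined, so it should be a neutral element (0 for addition, 1 for multiplication). The sketch below simulates that in plain Python with a hypothetical `simulated_fold` helper and an assumed four-partition layout; it shows how a non-neutral zero value inflates the result.

```python
from functools import reduce
from operator import add

def simulated_fold(partitions, zero_value, op):
    """Hypothetical model of RDD.fold: zero value used per partition and again at the end."""
    partition_results = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, partition_results, zero_value)

partitions = [[1], [2], [3], [4, 5]]       # assumed layout of [1, 2, 3, 4, 5]
print(simulated_fold(partitions, 0, add))  # 15: 0 is neutral for addition
print(simulated_fold(partitions, 1, add))  # 20: 15 + 1 per partition (4) + 1 for the final merge
```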

In [0]:
#Range 

x_range = sc.parallelize(range(1,10))   # sc.range(1, 10) creates the same RDD directly

x_range.collect()

Out[29]: [1, 2, 3, 4, 5, 6, 7, 8, 9]

In [0]:
#variance

x_var = sc.parallelize([1,2,3,4,5,6,7,8,9])

x_var.variance()

Out[30]: 6.666666666666667

In [0]:
#sampleVariance
x_var.sampleVariance()
 

Out[31]: 7.5
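The two results differ only in the denominator: `variance()` divides the sum of squared deviations by N, while `sampleVariance()` divides by N-1 (the same distinction applies to `stdev()` and `sampleStdev()`). A short plain-Python check of the numbers above, using the standard `statistics` module for comparison (`pvariance` is the population form, `variance` the sample form):

```python
import math
import statistics

data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
n = len(data)
mean = sum(data) / n                                     # 5.0
squared_deviations = sum((x - mean) ** 2 for x in data)  # 60.0

population_variance = squared_deviations / n        # like rdd.variance()
sample_variance = squared_deviations / (n - 1)      # like rdd.sampleVariance()

print(population_variance, statistics.pvariance(data))  # both about 6.6667
print(sample_variance, statistics.variance(data))        # both 7.5
```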

In [0]:
#saveAsTextFile and saveAsPickleFile are not demonstrated in this cell

pass

In [0]:
#stats

sc.parallelize([1,2,3,4,5]).stats()

Out[35]: (count: 5, mean: 3.0, stdev: 1.4142135623730951, max: 5.0, min: 1.0)
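`stats()` computes count, mean, standard deviation, max and min together by building partial statistics per partition and then merging them. The sketch below is a plain-Python model of that idea, loosely based on the count/mean/M2 merge used by Spark's `StatCounter`; the `Stats` class and the three-partition split are illustrative assumptions, and `stdev()` here is the population standard deviation, matching the output above.

```python
import math
from dataclasses import dataclass

@dataclass
class Stats:
    """Hypothetical stand-in for Spark's StatCounter."""
    count: int = 0
    mean: float = 0.0
    m2: float = 0.0                  # sum of squared deviations from the mean
    max_value: float = -math.inf
    min_value: float = math.inf

    def add(self, x):
        """Welford's update: fold one value into the running statistics."""
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)
        self.max_value = max(self.max_value, x)
        self.min_value = min(self.min_value, x)
        return self

    def merge(self, other):
        """Combine statistics from another partition (pairwise variance formula)."""
        if other.count == 0:
            return self
        total = self.count + other.count
        delta = other.mean - self.mean
        self.m2 += other.m2 + delta * delta * self.count * other.count / total
        self.mean += delta * other.count / total
        self.count = total
        self.max_value = max(self.max_value, other.max_value)
        self.min_value = min(self.min_value, other.min_value)
        return self

    def stdev(self):
        """Population standard deviation, like StatCounter.stdev()."""
        return math.sqrt(self.m2 / self.count)

partitions = [[1, 2], [3], [4, 5]]   # assumed split of [1, 2, 3, 4, 5]
partials = []
for part in partitions:              # one Stats object per partition
    s = Stats()
    for x in part:
        s.add(x)
    partials.append(s)

result = Stats()
for p in partials:                   # merge the partition results
    result.merge(p)
print(result.count, result.mean, result.stdev(), result.max_value, result.min_value)
```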

In [0]:
x_map = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
y_map = x_map.map(lambda x: (x, x**2))
print(x_map.collect())
print(y_map.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49), (8, 64), (9, 81), (10, 100)]


In [0]:
x_fil = sc.parallelize(range(1,12))
y_fil = x_fil.filter(lambda x: x%2 == 0)
y_fil.collect()

Out[45]: [2, 4, 6, 8, 10]

In [0]:
#flatMap: the function returns an iterable (here a tuple) for each element, and the results are flattened into one RDD

x_fm = sc.parallelize(range(1,13))
y_fm=x_fm.flatMap(lambda x: (x**2 , 100*x))
y_fm.collect()

Out[48]: [1, 100, 4, 200, 9, 300, 16, 400, 25, 500, 36, 600, 49, 700, 64, 800, 81, 900, 100, 1000, 121, 1100, 144, 1200]

In [0]:
#mapPartitions: similar to map, but the function runs once per partition and receives an iterator over that partition's elements

x_par=sc.parallelize(range(1,16),3)
def func(x):
    yield sum(x)
    
y_par=x_par.mapPartitions(func)

#glom() turns each partition into a list, so collect() shows the partition layout
print(x_par.glom().collect())

print(y_par.glom().collect())

[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15]]
[[15], [40], [65]]


In [0]:
#mapPartitionsWithIndex


x_parIndex=sc.parallelize(range(1,16),3)

def funcIndex(partitionIndex, iterator): 
    yield (partitionIndex,sum(iterator))

y_pari = x_parIndex.mapPartitionsWithIndex(funcIndex)

print(y_pari.glom().collect())

[[(0, 15)], [(1, 40)], [(2, 65)]]
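To make the partition layout concrete, here is a plain-Python model of `mapPartitions` and `mapPartitionsWithIndex`. The helper names are hypothetical, and the slicing rule (partition `i` gets elements `i*n//k` up to `(i+1)*n//k`) is an assumption that reproduces the `glom()` output shown above. Because the function sees a whole partition at once, this pattern is commonly used for per-partition setup, such as opening one connection per partition instead of one per element.

```python
# Plain-Python model; these helpers are hypothetical, not PySpark APIs.
def slice_into_partitions(data, num_slices):
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]

def map_partitions(partitions, func):
    # func receives an iterator over one partition and yields zero or more results
    return [list(func(iter(part))) for part in partitions]

def map_partitions_with_index(partitions, func):
    # same, but func also receives the partition index
    return [list(func(i, iter(part))) for i, part in enumerate(partitions)]

def func(iterator):
    yield sum(iterator)

def func_index(partition_index, iterator):
    yield (partition_index, sum(iterator))

parts = slice_into_partitions(list(range(1, 16)), 3)
print(parts)                                          # [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15]]
print(map_partitions(parts, func))                    # [[15], [40], [65]]
print(map_partitions_with_index(parts, func_index))   # [[(0, 15)], [(1, 40)], [(2, 65)]]
```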


In [0]:
#sample ==> sample(withReplacement, fraction, seed= None)

x = sc.parallelize(range(1,10))
y= x.sample(False,0.2)   # fraction is the expected share of elements, not an exact count; with True an element may appear more than once, with False at most once

print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[4, 8]
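In Spark, `sample(False, fraction)` keeps each element independently with probability `fraction` (Bernoulli sampling), while `sample(True, fraction)` lets each element appear a Poisson-distributed number of times with mean `fraction`, so the result size is only approximately `fraction * count`. `takeSample(withReplacement, num)` instead returns exactly `num` elements (or all of them when sampling without replacement from a smaller RDD). The plain-Python sketch below mimics these behaviours with the `random` module; the function names are hypothetical and the samplers are simplified stand-ins for Spark's own.

```python
import math
import random

# Simplified stand-ins for Spark's samplers; hypothetical helpers.
def bernoulli_sample(data, fraction, rng):
    """Like sample(False, fraction): keep each element with probability `fraction`."""
    return [x for x in data if rng.random() < fraction]

def poisson_count(rng, lam):
    """Knuth's method for one Poisson(lam) draw; fine for small lam."""
    limit, k, p = math.exp(-lam), 0, 1.0
    while True:
        p *= rng.random()
        if p <= limit:
            return k
        k += 1

def poisson_sample(data, fraction, rng):
    """Like sample(True, fraction): each element appears Poisson(fraction) times."""
    out = []
    for x in data:
        out.extend([x] * poisson_count(rng, fraction))
    return out

def take_sample(data, with_replacement, num, rng):
    """Like takeSample: exactly `num` elements (fewer only without replacement on small data)."""
    if with_replacement:
        return rng.choices(data, k=num)
    return rng.sample(data, min(num, len(data)))

rng = random.Random(42)
data = list(range(1, 10))
print(bernoulli_sample(data, 0.2, rng))   # size varies around 0.2 * 9
print(poisson_sample(data, 0.2, rng))     # may contain repeats
print(take_sample(data, False, 5, rng))   # always 5 distinct elements
```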


In [0]:
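#Union and Intersection

rdd_1 = sc.parallelize(range(0,5))
rdd_2 = sc.parallelize(range(3,8))

rdd_1.union(rdd_2).collect()          # [0, 1, 2, 3, 4, 3, 4, 5, 6, 7]: duplicates are kept
rdd_1.intersection(rdd_2).collect()   # only this last expression is displayed below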

Out[64]: [3, 4]

In [0]:
#distinct 

x = sc.parallelize(['A','A','B','B','C','D','E'])

y=x.distinct()

print(y.collect())

['B', 'C', 'A', 'D', 'E']


In [0]:
#groupByKey

x = sc.parallelize([('B',2),('A',3),('C',3),('A',12),('D',3)])

y=x.groupByKey()

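y.mapValues(list).collect()   # groupByKey yields (key, ResultIterable); mapValues(list) makes the grouped values printable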





In [0]:
#Cache and persist  https://www.youtube.com/watch?v=Q2qpCGtIQgU&list=PL50mYnndduIHGS49Q_tve1f7aW4NHjvgQ&index=7&ab_channel=TechLake

#Pyspark Tutorial: 7 TechLake 

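# rdd_emp = sc.textFile("file")   # SparkContext has no .read; spark.read.csv("file") would return a DataFrame

# rdd_emp.cache()

# rdd_emp.unpersist()

# from pyspark import StorageLevel
# rdd_emp.persist(StorageLevel.MEMORY_AND_DISK)   # the _SER levels are not available in PySpark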

In [0]:
#Broadcast variable
broadcast_v = sc.broadcast([1,2,3,4,5,6,7])
print(type(broadcast_v))

print(broadcast_v.value)
#broadcast_v.unpersist()   # deletes the cached copies on the executors; they are re-sent if the variable is used again
broadcast_v.destroy()      # releases all data and metadata; the variable cannot be used afterwards

<class 'pyspark.broadcast.Broadcast'>
[1, 2, 3, 4, 5, 6, 7]

In [0]:
help(broadcast_v)

Help on Broadcast in module pyspark.broadcast object:

class Broadcast(typing.Generic)
 |  Broadcast(sc: Optional[ForwardRef('SparkContext')] = None, value: Optional[~T] = None, pickle_registry: Optional[ForwardRef('BroadcastPickleRegistry')] = None, path: Optional[str] = None, sock_file: Optional[BinaryIO] = None)
 |  
 |  A broadcast variable created with :meth:`SparkContext.broadcast`.
 |  Access its value through :attr:`value`.
 |  
 |  Examples
 |  --------
 |  >>> from pyspark.context import SparkContext
 |  >>> sc = SparkContext('local', 'test')
 |  >>> b = sc.broadcast([1, 2, 3, 4, 5])
 |  >>> b.value
 |  [1, 2, 3, 4, 5]
 |  >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
 |  [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
 |  >>> b.unpersist()
 |  
 |  >>> large_broadcast = sc.broadcast(range(10000))
 |  
 |  Method resolution order:
 |      Broadcast
 |      typing.Generic
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, sc: Optional[ForwardRef('S

###RDD Transformation

A transformation creates a new RDD from an existing one, for example `map`, `filter` and `flatMap`.

Lazy execution: Spark delays execution until an action is called, so that it can first build the full lineage (the DAG of transformations) and plan an optimized execution.

In [0]:
#map #filter

x_map = sc.parallelize(range(1,11))

x_lambda = x_map.map(lambda x: (x,x**2))

# print(x_map.collect())
# print(x_lambda.collect())

#filter
# x_filter = x_map.filter(lambda x: x> 5)
# print(x_filter.collect())

#flatMap

# x_flatmap = x_map.flatMap(lambda x: (x, x**3))
# print(x_flatmap.collect())

#mappartitions

# x_mapp = x_map.repartition(2)

# def f(iterator):
#     yield sum(iterator)
    
# x_mapp.mapPartitions(f).glom().collect()

#groupByKey 

x = sc.parallelize([('A',1),('B',2),('C',3),('A',23),('B',1),('E',12)])
x.groupByKey().collect()   # values are ResultIterable objects; use .mapValues(list) to see them

Out[11]: [('B', <pyspark.resultiterable.ResultIterable at 0x7f7efe64ae80>),
 ('C', <pyspark.resultiterable.ResultIterable at 0x7f7efe50ae20>),
 ('A', <pyspark.resultiterable.ResultIterable at 0x7f7efe50ad60>),
 ('E', <pyspark.resultiterable.ResultIterable at 0x7f7efe50ad90>)]

####cache, persist and unpersist

Both caching and persisting are used to keep Spark RDDs, DataFrames and Datasets for reuse. The difference is that the RDD `cache()` method always uses the default storage level (MEMORY_ONLY), whereas `persist()` stores the data at a user-defined storage level.

Storage-level :

MEMORY_ONLY

MEMORY_AND_DISK

MEMORY_ONLY_SER

MEMORY_AND_DISK_SER

DISK_ONLY

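OFF_HEAP

The `_SER` levels belong to the Scala/Java API. In PySpark, stored objects are always serialized with pickle, so the serialized variants are not exposed.

Spark automatically monitors cache usage on each node and drops old data partitions in a least-recently-used (LRU) fashion. To remove an RDD instead of waiting for it to fall out of the cache, use the `RDD.unpersist()` method.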

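```
from pyspark import StorageLevel

rdd.persist(StorageLevel.DISK_ONLY)   # keep the partitions on disk only
rdd.unpersist()                       # remove the RDD from the cache
rdd.cache()                           # for an RDD, same as persist(StorageLevel.MEMORY_ONLY)
```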

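######Advantages:
Time and cost efficient: a cached RDD is not recomputed from its lineage every time an action uses it
Reduces the execution time of repeated and iterative jobs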

####Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

```
broadcast_v = sc.broadcast([1, 2, 3, 4, 5, 6])
broadcast_v.value        # [1, 2, 3, 4, 5, 6]
broadcast_v.unpersist()  # delete the cached copies on the executors
broadcast_v.destroy()    # release all data and metadata; cannot be used afterwards
```

####Accumulators

Accumulators are variables that are only "added" to through an associative and commutative operation, and can therefore be efficiently supported in parallel.

```
accum_v = sc.accumulator(2)   # initial value 2
sc.parallelize([1,2,3,4,5,6,7,8,9]).foreach(lambda x: accum_v.add(x))
accum_v.value                 # read on the driver: 2 + 45 = 47
```

In [0]:
accum_v = sc.accumulator(2)
sc.parallelize([1,2,3,4,5,6,7,8,9]).foreach(lambda x:accum_v.add(x))
accum_v.value

Out[19]: 47
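The result 47 is the accumulator's initial value 2 plus 1+2+...+9 = 45. The plain-Python sketch below models how this happens, under the simplifying assumption of one task per partition: each task adds into its own local copy starting from zero, and the driver merges the task results into the initial value. The helper name is hypothetical. Note that Spark only guarantees exactly-once updates for accumulators used inside actions such as `foreach`; updates made inside transformations may be applied more than once if a task is re-executed.

```python
def simulated_foreach_accumulator(initial_value, partitions):
    """Hypothetical model: one task per partition, each with a zero-initialised local copy."""
    task_results = []
    for partition in partitions:
        local = 0                   # each task starts from the zero value, not from 2
        for x in partition:
            local += x              # accum_v.add(x) inside the task
        task_results.append(local)  # sent back to the driver when the task finishes
    return initial_value + sum(task_results)  # driver merges the task updates

print(simulated_foreach_accumulator(2, [[1, 2, 3], [4, 5, 6], [7, 8, 9]]))  # 47
print(simulated_foreach_accumulator(2, [[1, 2, 3, 4, 5, 6, 7, 8, 9]]))     # 47
```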

###RDD Transformations: Joins
 
 Join
 
 leftOuterJoin
 
 rightOuterJoin
 
 fullOuterJoin
 
 cartesian()
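The four join variants differ only in which keys are kept and where `None` is filled in. The plain-Python sketch below (a hypothetical `join` helper, not a PySpark API) reproduces the shape of the results: pair RDD joins return `(key, (left_value, right_value))`, roughly like `x.join(y)`, `x.leftOuterJoin(y)`, `x.rightOuterJoin(y)` and `x.fullOuterJoin(y)`, while `x.cartesian(y)` pairs every element with every other. Spark does not guarantee the output order, so the order here is only for readability.

```python
from collections import defaultdict

def _group(pairs):
    """Collect all values for each key, keeping first-seen key order."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def join(left, right, how="inner"):
    """Hypothetical helper mimicking pair-RDD join results: (key, (left, right))."""
    lg, rg = _group(left), _group(right)
    if how == "inner":        # like x.join(y)
        keys = [k for k in lg if k in rg]
    elif how == "left":       # like x.leftOuterJoin(y)
        keys = list(lg)
    elif how == "right":      # like x.rightOuterJoin(y)
        keys = list(rg)
    elif how == "full":       # like x.fullOuterJoin(y)
        keys = list(lg) + [k for k in rg if k not in lg]
    else:
        raise ValueError(f"unknown join type: {how}")
    result = []
    for k in keys:
        for lv in lg.get(k, [None]):      # key missing on the left -> None
            for rv in rg.get(k, [None]):  # key missing on the right -> None
                result.append((k, (lv, rv)))
    return result

def cartesian(left, right):
    """Like x.cartesian(y): every element of left paired with every element of right."""
    return [(a, b) for a in left for b in right]

x = [("a", 1), ("b", 4)]
y = [("a", 2), ("a", 3), ("c", 5)]
for how in ("inner", "left", "right", "full"):
    print(how, join(x, y, how))
```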

####Diff between RDD, Dataframe and Dataset(Java and Scala)

######RDD Features:

Distributed collection

1. Immutable
2. Fault tolerant
3. Lazy evaluation
4. Functional transformations ==> operations are either transformations or actions
5. Data processing formats ==> Structured and Unstructured data
6. Programming Languages supported  => Java, Scala, R and Python 

######DataFrame Features:

1. Distributed collection of Row objects
2. Data processing (csv, json, parquet, etc.)
3. Optimization using the Catalyst optimizer
4. Hive compatibility
5. Tungsten execution engine
6. Supported in Java, Scala, R and Python

######Dataset features
1. Provides best of both RDD and Dataframe
2. Encoders
3. Type safety
4. supported in Java and Scala

####Disadvantages of RDD, Dataframe and Dataset(Java and Scala)

######RDD
1. There is no built-in optimizer, so we have to optimize each and every RDD ourselves.
2. RDDs don't infer the schema of the ingested data, so we have to specify it.

######Dataframe
The main drawback of the DataFrame API is that it doesn't provide compile-time type safety; as a result, the user is limited when the structure of the data is unknown.

######Dataset
They require typecasting into strings.

####Read and write json file 

```
# Create a sample JSON file first (dbutils is available in Databricks notebooks)
dbutils.fs.put("/tmp/test.json", """
{"string":"string1", "int":1, "array":[1,2,3], "dict": {"key":"value1"}}
{"string":"string2", "int":2, "array":[3,4,6], "dict": {"key":"value2"}}
{"string":"string3", "int":3, "array":[7,8,9], "dict": {"key":"value3"}}
""", True)   # True = overwrite an existing file

emp_json = spark.read.json("/tmp/test.json")   # one JSON object per line
emp_text = spark.read.text("/tmp/test.json")   # raw lines in a single "value" column, to see the file as it is

display(emp_json)
```


Using a SQL temporary view on JSON files to read JSON data

```
%sql
CREATE TEMPORARY VIEW emp_json_table
USING json
OPTIONS (path="/temp/emp_details.json");

SELECT * FROM emp_json_table

```


If each record spans multiple lines, for example a file containing a JSON array of pretty-printed objects:

```
[
  {"string":"string1",
    "int":1,
    "array":
          [1,2,3],
    "dict":
         {"key":"value1"}},
  {"string":"string2", "int":2, "array":[3,4,6], "dict": {"key":"value2"}},
  {"string":"string3", "int":3, "array":[7,8,9], "dict": {"key":"value3"}}
]
```

then we need to set the *multiLine* option to `true` while reading the file:
```
emp_json = spark.read.option("multiLine", "true").json("/tmp/emp_data.json")
```


RDD storing one JSON object per string
```
my_data = ['{"name":"Ravi", "add":{"state":"Karnataka","city":"Bengaluru"}}']
my_rdd = sc.parallelize(my_data)
my_df = spark.read.json(my_rdd)   # spark.read.json also accepts an RDD of JSON strings
my_df.printSchema()
```

Writing into a file
```

emp_json.write.mode("append").json("/tmp/emp_data.json")   # writes a directory of part files

```

##Reading and writing Excel files

Install the library for Excel files:

Databricks account -> Clusters -> Libraries -> Install new -> Maven -> search for `spark-excel`, then select a version, for example `com.crealytics:spark-excel_2.12:0.13.5`

```
excel_read_data = spark.read.format("com.crealytics.spark.excel").option("header", "true").option("inferSchema", "true").load("test.xlsx")

excel_read_data.write.format("com.crealytics.spark.excel").option("header", "true").save("emp_data_DF.xlsx")
```

##Interview Questions

####How to handle bad records in Spark, and what are the types?

There are three modes available while reading data and creating a DataFrame.

Dealing with bad records: when reading a CSV file with a specified schema, the data in the files may not match the schema, so Spark needs to know what to do with the records that don't fit.

######mode: behaviour
1. PERMISSIVE: keeps corrupt records and puts their raw text in a `_corrupt_record` column (the name can be changed with the `columnNameOfCorruptRecord` option). This is the default mode; for CSV, the column is only filled if it is included in the supplied schema.
2. DROPMALFORMED: ignores all corrupted records.
3. FAILFAST: throws an exception when it meets a corrupted record.


`badRecordsPath` option (Databricks): stores rejected records in an external location together with the error message.
1. Use the `badRecordsPath` option to save corrupt records to the specified directory.
2. If an input file cannot be found, the error information is also logged under `badRecordsPath`.
3. If `badRecordsPath` is specified, the `mode` option is not allowed to be set.
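The three modes can be compared on a toy reader. The plain-Python sketch below is a simplified model of the behaviour, not Spark's implementation: `read_json_lines` is a hypothetical helper, and unlike Spark it does not fill the other columns with nulls for a corrupt row. In PySpark the mode is chosen with, for example, `spark.read.option("mode", "DROPMALFORMED").json(path)`.

```python
import json

MODES = {"PERMISSIVE", "DROPMALFORMED", "FAILFAST"}

def read_json_lines(lines, mode="PERMISSIVE", corrupt_column="_corrupt_record"):
    """Hypothetical toy model of Spark's JSON read modes (one JSON object per line)."""
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    rows = []
    for line in lines:
        try:
            record = json.loads(line)
            if not isinstance(record, dict):
                raise ValueError("not a JSON object")
            rows.append(record)
        except ValueError:                            # json.JSONDecodeError is a ValueError
            if mode == "PERMISSIVE":
                rows.append({corrupt_column: line})   # keep the raw text of the bad line
            elif mode == "DROPMALFORMED":
                continue                              # silently skip the bad line
            else:                                     # FAILFAST
                raise ValueError(f"Malformed record: {line!r}")
    return rows

lines = ['{"id": 1, "name": "a"}', '{"id": 2, "name": ', '{"id": 3, "name": "c"}']
print(read_json_lines(lines, "PERMISSIVE"))
print(read_json_lines(lines, "DROPMALFORMED"))
try:
    read_json_lines(lines, "FAILFAST")
except ValueError as err:
    print("FAILFAST:", err)
```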

###How to get all available dataframes in pyspark

In [0]:
#Answer
from pyspark.sql import DataFrame
print([k for (k,v) in globals().items() if isinstance(v,DataFrame)])   # globals() returns every name in the notebook's global scope; keep only DataFrames

# print(globals())

[]


###How to add the source file name as a column in a DataFrame?

answer: using the input_file_name() function together with withColumn() we can add a new column containing the file names

from pyspark.sql.functions import input_file_name

df = spark.read.csv("/tmp/*.csv")

df.withColumn("file_name", input_file_name()).limit(3)

###Get the number of rows in each file of a DataFrame?

ans: using the input_file_name() function and the groupBy transformation we can get the number of rows for each file

from pyspark.sql.functions import input_file_name

df = spark.read.csv("/tmp/*.csv")

df.withColumn("file_name", input_file_name()).groupBy("file_name").count()

###How to add PartitionId in a DataFrame
Ans: using the spark_partition_id() function with withColumn we can get the partition ID and add it to the DataFrame

from pyspark.sql.functions import spark_partition_id

df.withColumn('partitionId', spark_partition_id())


######Row count by partitionId

df.withColumn('partitionId', spark_partition_id()).groupBy('partitionId').count()

###How to add Sequence generated surrogate key as a column in dataframe

ans: using the monotonically_increasing_id() or hash functions we can generate a sequence or surrogate key

```
from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("key",monotonically_increasing_id())
```

But the above has a disadvantage: the IDs are unique and increasing but not consecutive, and they depend on how the data is partitioned. When we remove a row or reprocess the data, the IDs can change, i.e., the same row may get a different ID on every run.
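The reason for this behaviour is the ID layout documented for `monotonically_increasing_id()`: the partition ID goes in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below reproduces that layout with hypothetical helpers, and shows that the same row gets a different ID when the data is split into partitions differently.

```python
def monotonic_id(partition_id, row_in_partition):
    """Documented layout: partition id in the upper 31 bits, row number in the lower 33 bits."""
    return (partition_id << 33) + row_in_partition

def assign_ids(partitions):
    """Hypothetical helper: the ID each row would get under a given partition layout."""
    return {row: monotonic_id(p, r)
            for p, part in enumerate(partitions)
            for r, row in enumerate(part)}

before = assign_ids([["e1", "e2", "e3"], ["e4", "e5", "e6"]])
after = assign_ids([["e1", "e2"], ["e3", "e4"], ["e5", "e6"]])
print(sorted(before.values()))    # [0, 1, 2, 8589934592, 8589934593, 8589934594]
print(before["e3"], after["e3"])  # 2 8589934592: same row, different ID
```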


#####Using `MD5`

```
from pyspark.sql.functions import col, md5

df.withColumn("Key", md5(col("EMPNO").cast("string")))  # based on the column value, so the key stays the same however often we reprocess
```

But the above has a disadvantage: MD5 produces a 128-bit hash, so on very large data there is a small chance that two different values get the same key.

#####Using `Sha2`

```
from pyspark.sql.functions import col, sha2

df.withColumn("Key", sha2(col("EMPNO").cast("string"), 256))  # based on the column value, so the key stays the same however often we reprocess
```
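Unlike `monotonically_increasing_id()`, a hash depends only on the input value, so it is stable across reruns. The plain-Python sketch below uses `hashlib` to compute the same kind of hex digests for a few hypothetical EMPNO values; it assumes the column is cast to a string first, in which case the digests should match Spark's `md5` and `sha2(..., 256)` for the same UTF-8 text. The `collision_probability` helper uses the standard birthday-bound approximation to put the duplicate risk into numbers.

```python
import hashlib

def md5_key(value):
    """Hex MD5 of the value's string form (like md5(col.cast("string")))."""
    return hashlib.md5(str(value).encode("utf-8")).hexdigest()

def sha256_key(value):
    """Hex SHA-256 of the value's string form (like sha2(col.cast("string"), 256))."""
    return hashlib.sha256(str(value).encode("utf-8")).hexdigest()

def collision_probability(n, bits):
    """Birthday-bound approximation: chance that any two of n keys collide."""
    return n * (n - 1) / 2 / 2 ** bits

empnos = [7369, 7499, 7521]          # hypothetical EMPNO values
first_run = [md5_key(e) for e in empnos]
second_run = [md5_key(e) for e in empnos]
print(first_run == second_run)                    # True: rerunning gives the same keys
print(len(first_run[0]), len(sha256_key(7369)))   # 32 64 hex characters
print(collision_probability(10**9, 128))          # about 1.5e-21 for a billion MD5 keys
```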


`row_number()` can also be used, but it is not recommended: a window without `partitionBy` moves all rows into a single partition.

###What is a Global Temporary view and a Temporary view?

ans:  
   - Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates.

   - If you want a temporary view that is `shared among all sessions and kept alive until the Spark application terminates`, 
        you can create a `global temporary view` (application level).

   - A global temporary view is tied to the system-preserved database `global_temp`, and we must use the qualified name to refer to it, i.e., 
          ```select * from global_temp.view1```

###How to get list of databases, tables and columns using spark.catalog

ans:
```
display(spark.catalog.listDatabases())
display(spark.catalog.listTables("database_name"))
display(spark.catalog.listColumns("table_name","database_name"))
```

###How partitions are created?

- Spark's tasks process data as partitions read from disk into memory. Data on disk is laid out in chunks or contiguous file blocks, depending on the store. By default, file blocks on data storage range in size from 64MB to 128MB; for example, on HDFS and S3 the default size is 128MB. A contiguous collection of these blocks constitutes a partition.

```
spark.conf.get("spark.sql.files.maxPartitionBytes")   # default is 128MB
spark.conf.set("spark.sql.files.maxPartitionBytes", "300MB")
```
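Raising `maxPartitionBytes` does not always change the partition count, because Spark also caps the split size by the total input size divided by the available parallelism. The plain-Python sketch below models that rule for file sources (split size = min(maxPartitionBytes, max(openCostInBytes, bytesPerCore)), with `spark.sql.files.openCostInBytes` defaulting to 4MB); it is an approximation that ignores how several small files are packed into one partition, and the function names are hypothetical.

```python
import math

MB = 1024 * 1024

def max_split_bytes(file_sizes, parallelism,
                    max_partition_bytes=128 * MB,   # spark.sql.files.maxPartitionBytes default
                    open_cost_in_bytes=4 * MB):     # spark.sql.files.openCostInBytes default
    """Approximate split size Spark uses when reading files (a model, not Spark's code)."""
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

def estimate_partitions(file_size, split_bytes):
    """A single splittable file is cut into chunks of at most split_bytes."""
    return math.ceil(file_size / split_bytes)

one_gb_file = [1024 * MB]
default_split = max_split_bytes(one_gb_file, parallelism=8)
print(default_split // MB, estimate_partitions(1024 * MB, default_split))  # 128 8

bigger_split = max_split_bytes(one_gb_file, parallelism=8, max_partition_bytes=300 * MB)
print(estimate_partitions(1024 * MB, bigger_split))  # still 8: bytes per core is the limit here
```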

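With parallelize, an RDD gets `sc.defaultParallelism` partitions by default, which is 8 on this cluster (master `local[8]`).
With textFile, the default minimum number of partitions is `min(sc.defaultParallelism, 2)`, i.e. 2; larger files can be split into more partitions.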
```
rdd=sc.parallelize([1,2,3,4])
rdd.getNumPartitions()  #8
```

```
rdd=sc.textFile("/tmp/a.txt")
rdd.getNumPartitions()   #2
```

In [0]:
rdd=sc.parallelize([1,2,3,4])
rdd.getNumPartitions()

Out[3]: 8

###How to get shuffle partitions count from spark configuration?
```
spark.conf.get("spark.sql.shuffle.partitions")   #default is 200 
spark.conf.set("spark.sql.shuffle.partitions", "20")  ##setting it to 20
```

###Path Glob Filter

- `pathGlobFilter` is used to only include files whose file names match the pattern

###Recursive File Lookup 

- `recursiveFileLookup` is used to recursively load files, and it disables partition inferring
- Its default value is false. If the data source explicitly specifies the partitionSpec when recursiveFileLookup is true, an exception will be thrown
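A plain-Python model of the two options, using hypothetical file paths and helper names: `pathGlobFilter` matches the glob against the file name only, and partition inference turns `key=value` directory names into columns, which `recursiveFileLookup` switches off. `fnmatch` is used here as a stand-in for Hadoop glob syntax, which supports a few extra patterns such as `{a,b}`.

```python
import fnmatch
import posixpath

def apply_path_glob_filter(paths, pattern):
    """Keep paths whose file name (not the directory part) matches the glob."""
    return [p for p in paths if fnmatch.fnmatchcase(posixpath.basename(p), pattern)]

def partition_columns(path, root, recursive_file_lookup=False):
    """Columns inferred from key=value directories between root and the file."""
    if recursive_file_lookup:
        return {}  # recursiveFileLookup=true disables partition inference
    relative_dir = posixpath.relpath(posixpath.dirname(path), root)
    parts = [] if relative_dir == "." else relative_dir.split("/")
    return dict(part.split("=", 1) for part in parts if "=" in part)

paths = [                            # hypothetical files
    "/FileStore/tables/sample1.json",
    "/FileStore/tables/sample2.json",
    "/FileStore/tables/emp.csv",
    "/FileStore/tables/emp/year=2023/part-0.csv",
]
print(apply_path_glob_filter(paths, "sample*.json"))
print(partition_columns(paths[3], "/FileStore/tables/emp"))        # {'year': '2023'}
print(partition_columns(paths[3], "/FileStore/tables/emp", True))  # {}
```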

In [0]:
# from pyspark.sql.functions import input_file_name
# df=spark.read.option("pathGlobFilter", "sample*.json").json("dbfs:/FileStore/tables") \
#     .withColumn("file_name", input_file_name())

# display(df.select("file_name").distinct())

# #recursive
# df=spark.read.option("recursiveFileLookup","true").csv("/FileStore/tables/emp/", header = True ).withColumn("file_name", input_file_name())



#Pyspark