<a href="https://colab.research.google.com/github/subhendu115/Spark/blob/main/Scenario-based-Spark-Interview-Question.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [160]:
!pip install pyspark



In [161]:
# Mount Google Drive under /content/gdrive/ so the sample datasets can be read
# like local files. force_remount=True asks Colab to mount again even when a
# mount is presumably already present from an earlier run.
from google.colab import drive
drive.mount('/content/gdrive/', force_remount=True)

Mounted at /content/gdrive/


In [162]:
from pyspark.sql import SparkSession

In [163]:
import os

# Create (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .appName("MultiDelim")
    .master("local[*]")
    .getOrCreate()
)

# Show the notebook's working directory for reference.
print(os.getcwd())

/content


In [164]:
# Root folder of the sample datasets on the mounted Google Drive.
# The value ends with "/", so file names can be appended to it directly.
basePath = "/content/gdrive/MyDrive/SparkData/dataset-master/"

**Multiple Delimiter**

In [165]:
# Read a file whose fields are separated by the multi-character delimiter "~|".
# The first line is used as the header. basePath already ends with "/", so the
# file name is appended without a leading slash (avoids a "//" in the path).
df = (
    spark.read.format("csv")
    .option("sep", "~|")
    .option("header", True)
    .load(basePath + "multi-delim.dat")
)

In [166]:
# Display the parsed rows: the "~|" separated fields become the columns id, name and Age.
df.show()

+---+-----------+---+
| id|       name|Age|
+---+-----------+---+
|  1|       gsfg| 23|
|  1|  ssdgdsgjs| 20|
|  1|svfgbfbggjs| 21|
|  1|     svfscv| 29|
|  1|   svcvvbjs| 23|
|  1| ssgsrhjbjs| 28|
|  1|  svfhfhbjs| 23|
|  1|  svfdgdbjs| 25|
|  1|   svfdgbjs| 22|
|  1| svdgngjbjs| 25|
+---+-----------+---+



In [167]:
# Alternative approach: read the same file as plain text, so every line becomes
# one row in a single "value" column. The text source does not interpret a
# "header" option (the header line is returned as an ordinary first row), so the
# option is not set here. basePath already ends with "/".
df1 = spark.read.format("text").load(basePath + "multi-delim.dat")

In [168]:
# Display the raw lines; the header line "id~|name~|Age" is still present as the first row.
df1.show()

+------------------+
|             value|
+------------------+
|     id~|name~|Age|
|       1~|gsfg~|23|
|  1~|ssdgdsgjs~|20|
|1~|svfgbfbggjs~|21|
|     1~|svfscv~|29|
|   1~|svcvvbjs~|23|
| 1~|ssgsrhjbjs~|28|
|  1~|svfhfhbjs~|23|
|  1~|svfdgdbjs~|25|
|   1~|svfdgbjs~|22|
| 1~|svdgngjbjs~|25|
+------------------+



In [169]:
# A Row has one index per DataFrame column. This DataFrame has a single column,
# so index 0 holds the entire text line.
first_row = df1.first()
print(first_row)
print(first_row[0])

Row(value='id~|name~|Age')
id~|name~|Age


In [170]:
# The first raw line is the header; splitting it on the same "~|" delimiter
# yields the list of column names used as the schema.
header = df1.first()[0]
schm = header.split('~|')
schm

['id', 'name', 'Age']

In [171]:
# Drop the header line, split every remaining line on "~|" and name the
# resulting fields with the column names taken from the header.
# The DataFrame has only one column, hence row[0]; the index would differ for a
# DataFrame/RDD with more columns.
data_lines = df1.filter(df1['value'] != header)
split_rows = data_lines.rdd.map(lambda row: row[0].split("~|"))
df_in = split_rows.toDF(schm)
df_in.show()

+---+-----------+---+
| id|       name|Age|
+---+-----------+---+
|  1|       gsfg| 23|
|  1|  ssdgdsgjs| 20|
|  1|svfgbfbggjs| 21|
|  1|     svfscv| 29|
|  1|   svcvvbjs| 23|
|  1| ssgsrhjbjs| 28|
|  1|  svfhfhbjs| 23|
|  1|  svfdgdbjs| 25|
|  1|   svfdgbjs| 22|
|  1| svdgngjbjs| 25|
+---+-----------+---+



**Merge two DataFrames with different fields**

In [172]:
# Load the two CSV files whose schemas differ; both use their first line as the header.
csv_with_header = spark.read.format('csv').option('header', True)
df1 = csv_with_header.load(basePath + "input.csv")
df2 = csv_with_header.load(basePath + "input1.csv")

In [173]:
# Display both inputs side by side: df2 carries an extra Gender column that df1 lacks.
df1.show()
df2.show()

+---------+---+
|     Name|Age|
+---------+---+
|bsfbssdds| 28|
|  bsfbssd| 25|
|  bsfbssd| 26|
|   bsfbss| 20|
|  bsfbsdf| 25|
|   bsfbss| 26|
|   bsfbsd| 23|
+---------+---+

+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|bsfbssdds| 28|     M|
|  bsfbssd| 25|     F|
|  bsfbssd| 26|     M|
|   bsfbss| 20|     M|
|  bsfbsdf| 25|     M|
|   bsfbss| 26|     F|
|   bsfbsd| 23|     M|
+---------+---+------+



In [174]:
# Columns that exist in one DataFrame but are missing from the other.
# List comprehensions keep the original column order, whereas a set difference
# would return the names in arbitrary order and make the order of the columns
# added later non-deterministic.
list1 = [name for name in df1.columns if name not in df2.columns]  # only in df1
list2 = [name for name in df2.columns if name not in df1.columns]  # only in df2
print(list1)
print(list2)

[]
['Gender']


In [175]:
from pyspark.sql.functions import lit

# Give each DataFrame the columns it lacks, filled with NULL.
# The NULL literal is cast to the type the column has in the other DataFrame so
# the new column is typed instead of being an untyped (void) column.
# The loop variable is deliberately not called "col": that name is imported
# from pyspark.sql.functions in other cells and would be shadowed.
for missing_name in list1:
  df2 = df2.withColumn(missing_name, lit(None).cast(df1.schema[missing_name].dataType))
for missing_name in list2:
  df1 = df1.withColumn(missing_name, lit(None).cast(df2.schema[missing_name].dataType))

In [176]:
# Stack the two DataFrames. unionByName matches columns by name, so the result
# is correct even when the two inputs list their columns in a different order;
# unionAll/union match purely by position and would silently misalign them.
df = df1.unionByName(df2)
df.show()

+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|bsfbssdds| 28|  NULL|
|  bsfbssd| 25|  NULL|
|  bsfbssd| 26|  NULL|
|   bsfbss| 20|  NULL|
|  bsfbsdf| 25|  NULL|
|   bsfbss| 26|  NULL|
|   bsfbsd| 23|  NULL|
|bsfbssdds| 28|     M|
|  bsfbssd| 25|     F|
|  bsfbssd| 26|     M|
|   bsfbss| 20|     M|
|  bsfbsdf| 25|     M|
|   bsfbss| 26|     F|
|   bsfbsd| 23|     M|
+---------+---+------+



**Create a DataFrame from a file that contains 5 pipe-delimited records (4 fields each) on a single line**

In [177]:
# Read the file without a header and without a custom separator: the whole
# pipe-delimited line ends up in the single column _c0.
df = (
    spark.read.format('csv')
    .option('header', False)
    .load(basePath + "same_line_input.txt")
)
df.show()

+--------------------+
|                 _c0|
+--------------------+
|Azar|BE|Bigdata|9...|
+--------------------+



In [178]:
from pyspark.sql.functions import regexp_replace,split,explode,col

In [179]:
# Step 1: append a "-" marker after every group of four "field|" chunks, so the
# boundary between two records becomes "|-".
# NOTE(review): this relies on no field starting with "-"; confirm for real data.
marked = df.withColumn('chk', regexp_replace('_c0', "(.*?\\|){4}", "$0-"))
marked.show(20, False)

# Step 2: split on the "|-" boundaries and explode into one row per record,
# keeping only the exploded column.
one_record_per_row = explode(split('chk', '\\|-'))
df1 = marked.withColumn('col_explode', one_record_per_row).select('col_explode')
df1.show(20, False)

+--------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+
|_c0                                                                                               |chk                                                                                                   |
+--------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+
|Azar|BE|Bigdata|985632|bhshds|msc|cs|6666|shdj|BE|ML|64456|hksdh|BE|FSD|6466|jsda|Bsc|Bigdata|6444|Azar|BE|Bigdata|985632|-bhshds|msc|cs|6666|-shdj|BE|ML|64456|-hksdh|BE|FSD|6466|-jsda|Bsc|Bigdata|6444|
+--------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------

In [180]:
# Split every record on "|" into its four fields and name them.
# An RDD map is used here; a DataFrame-only alternative would start from
# split(df1['col_explode'], '\\|'), but assigning the column names dynamically
# is harder that way.
# No intermediate collect() is issued: its result was discarded and it only
# triggered an extra Spark job.
rdd = df1.select('col_explode').rdd.map(lambda x: x[0].split('|'))
df2 = rdd.toDF(['Name','Degree','Subject','Contact'])
df2.show(20,False)

+------+------+-------+-------+
|Name  |Degree|Subject|Contact|
+------+------+-------+-------+
|Azar  |BE    |Bigdata|985632 |
|bhshds|msc   |cs     |6666   |
|shdj  |BE    |ML     |64456  |
|hksdh |BE    |FSD    |6466   |
|jsda  |Bsc   |Bigdata|6444   |
+------+------+-------+-------+



**Sample Explode, Explode_outer,PosExplode, PosExplode_Outer**

In [181]:
# Load the pipe-separated sample file; the Subject column holds a
# comma-separated list (or NULL) that the following cell explodes.
df = (
    spark.read.format('csv')
    .option('Sep', '|')
    .option('header', True)
    .load(basePath + "input4.csv")
)
df.show(200, False)

+-----+---+-----------------+
|Name |Age|Subject          |
+-----+---+-----------------+
|hbsbc|23 |ssf,fffs,dfdf,sdf|
|wwd  |43 |ssf,fffs,sdf     |
|rer  |33 |dfdf,sdf         |
|ewer |23 |ssf,fffs,dfdf,   |
|dwsf |23 |NULL             |
+-----+---+-----------------+



In [182]:
from pyspark.sql.functions import split,explode_outer,posexplode,posexplode_outer

# The comma-separated Subject string as an array column; shared by all four
# variants below. (explode itself is imported in an earlier cell.)
subjects = split('Subject','\\,')

# explode: one output row per array element; rows with a NULL Subject are dropped.
df1 = df.withColumn('Qualification', explode(subjects))
df1.show(200,False)

# explode_outer: same as explode, but rows with a NULL Subject are kept.
df2 = df.withColumn('Qualification', explode_outer(subjects))
df2.show(200,False)

# posexplode: like explode (NULL rows dropped) plus an extra position column.
df3 = df.select('*', posexplode(subjects))
df3.show(200,False)

# posexplode_outer: like explode_outer (NULL rows kept) plus an extra position column.
df4 = df.select('*', posexplode_outer(subjects))
df4.show(200,False)

+-----+---+-----------------+-------------+
|Name |Age|Subject          |Qualification|
+-----+---+-----------------+-------------+
|hbsbc|23 |ssf,fffs,dfdf,sdf|ssf          |
|hbsbc|23 |ssf,fffs,dfdf,sdf|fffs         |
|hbsbc|23 |ssf,fffs,dfdf,sdf|dfdf         |
|hbsbc|23 |ssf,fffs,dfdf,sdf|sdf          |
|wwd  |43 |ssf,fffs,sdf     |ssf          |
|wwd  |43 |ssf,fffs,sdf     |fffs         |
|wwd  |43 |ssf,fffs,sdf     |sdf          |
|rer  |33 |dfdf,sdf         |dfdf         |
|rer  |33 |dfdf,sdf         |sdf          |
|ewer |23 |ssf,fffs,dfdf,   |ssf          |
|ewer |23 |ssf,fffs,dfdf,   |fffs         |
|ewer |23 |ssf,fffs,dfdf,   |dfdf         |
|ewer |23 |ssf,fffs,dfdf,   |             |
+-----+---+-----------------+-------------+

+-----+---+-----------------+-------------+
|Name |Age|Subject          |Qualification|
+-----+---+-----------------+-------------+
|hbsbc|23 |ssf,fffs,dfdf,sdf|ssf          |
|hbsbc|23 |ssf,fffs,dfdf,sdf|fffs         |
|hbsbc|23 |ssf,fffs,dfdf,sdf|df

**Create Dataframe from Json**

In [183]:
# Load a CSV whose "request" column contains JSON text. The double quote is set
# as the escape character and multiline parsing is enabled for this file.
df = (
    spark.read.format('csv')
    .option('header', True)
    .option('escape', '\"')
    .option('multiline', True)
    .load(basePath + "dummy2.csv")
)
df.show(200, False)

+-------------+--------------+-------------------------------------------------------------------------------+
|PartitionDate|Status        |request                                                                        |
+-------------+--------------+-------------------------------------------------------------------------------+
|2020-06-30   |Internal Error|{"Response":{"MessageId" : 15432 }}                                            |
|2020-06-30   |Success       |{"Response":{"MessageId" : 15432,"Latitude":"-176.2989","longitude":"7.3614" }}|
+-------------+--------------+-------------------------------------------------------------------------------+



In [184]:
from pyspark.sql.functions import col,json_tuple,to_json,from_json

In [185]:
# Step 1: extract the nested "Response" object from the request JSON. The
# un-aliased json_tuple output column is referred to as c0 below.
df1 = df.select('*', json_tuple('request', 'Response'))

# Step 2: extract the individual fields from that object, then drop the two
# intermediate JSON columns. Fields absent from a record come back as NULL.
response_fields = json_tuple('c0', 'MessageId', 'Latitude', 'longitude').alias(
    'MessageId', 'Latitude', 'longitude'
)
df2 = df1.select('*', response_fields).drop('request', 'c0')
df2.show(200, False)

+-------------+--------------+---------+---------+---------+
|PartitionDate|Status        |MessageId|Latitude |longitude|
+-------------+--------------+---------+---------+---------+
|2020-06-30   |Internal Error|15432    |NULL     |NULL     |
|2020-06-30   |Success       |15432    |-176.2989|7.3614   |
+-------------+--------------+---------+---------+---------+



**Remove first 7 lines and create dataframe from remaining records**

In [205]:
# Read the file as plain text lines (requesting 2 partitions) and split every
# line on commas. This is a naive split: commas inside quoted text are split
# too, as the quoted description line in the output shows.
raw_lines = spark.sparkContext.textFile(basePath+'pageview.csv', 2)
inp = raw_lines.map(lambda line: line.split(','))
print(inp.getNumPartitions())
inp.collect()

2


[['Site', 'www.learntospark.com'],
 ['Desccription', '"Complete guide to learn Spark', 'AI', 'ML"'],
 ['Page Views of each blog'],
 ['20200817-20200817'],
 [''],
 ['Total data in page', '12'],
 [''],
 ['Page', 'Date', 'Pageviews', 'Unique_Pageviews', 'Sessions'],
 ['Guide to Install Spark', '2020-08-17', '1140', '986', '800'],
 ['Spark MAP vs FlatMap', '2020-08-17', '836', '800', '128'],
 ['Spark Architechture', '2020-08-17', '1569', '1345', '1400'],
 ['Azure Function for Mp3 to text', '2020-08-17', '350', '245', '234'],
 ['Scala Vs Python', '2020-08-17', '200', '150', '130'],
 ['Spark Window Function', '2020-08-17', '789', '546', '560'],
 ['Natural Language Processing', '2020-08-17', '467', '456', '100'],
 ['Spark Linear Interpolation - Time Series',
  '2020-08-17',
  '698',
  '345',
  '349'],
 ['Spark case statement', '2020-08-17', '234', '196', '120'],
 ['Spark Scenario Based Questions', '2020-08-17', '712', '329', '137'],
 ['Spark v3.0 Delta Lake', '2020-08-17', '333', '198', '39']

In [210]:
from itertools import islice

# The file starts with 7 preamble lines followed by the column-header row;
# all 8 are skipped and the column names are supplied explicitly below.
LEADING_LINES_TO_SKIP = 8


def _skip_leading_lines(partition_index, records):
    """Return the partition's records without the leading non-data lines.

    Only partition 0 is trimmed; every other partition is passed through
    unchanged. islice skips lazily, so the partition is never materialized as
    a list.
    NOTE(review): assumes all skipped lines fall inside partition 0, which
    holds for this small file - confirm before reusing on larger inputs.
    """
    if partition_index == 0:
        return islice(records, LEADING_LINES_TO_SKIP, None)
    return records


rdd_drop = inp.mapPartitionsWithIndex(_skip_leading_lines)
schm = ['Page','Date','PageViews','Uniq_views','Session']
out_df = spark.createDataFrame(rdd_drop,schm)
out_df.show(200,False)

+----------------------------------------+----------+---------+----------+-------+
|Page                                    |Date      |PageViews|Uniq_views|Session|
+----------------------------------------+----------+---------+----------+-------+
|Guide to Install Spark                  |2020-08-17|1140     |986       |800    |
|Spark MAP vs FlatMap                    |2020-08-17|836      |800       |128    |
|Spark Architechture                     |2020-08-17|1569     |1345      |1400   |
|Azure Function for Mp3 to text          |2020-08-17|350      |245       |234    |
|Scala Vs Python                         |2020-08-17|200      |150       |130    |
|Spark Window Function                   |2020-08-17|789      |546       |560    |
|Natural Language Processing             |2020-08-17|467      |456       |100    |
|Spark Linear Interpolation - Time Series|2020-08-17|698      |345       |349    |
|Spark case statement                    |2020-08-17|234      |196       |120    |
|Spa