In [28]:
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import regexp_replace, lit, split, explode, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [22]:
# Build the Spark session (or pick up an existing one) used by every later cell.
spark = (
    SparkSession.builder
    .appName("Same Delimiter Line Break")
    .getOrCreate()
)

In [23]:
# Handle to the SparkContext that backs the session.
# `schema` is intentionally not assigned here: the schema cell that follows
# defines it as a StructType, and a placeholder such as ('org') would only be
# a plain string (parentheses alone do not make a tuple) that is never read.
sc = spark.sparkContext

In [24]:
# Schema for the input data frame: one nullable string column named `org`.
schema = StructType(
    [StructField(name='org', dataType=StringType(), nullable=True)]
)

In [26]:
# Load the file through the CSV reader using the explicit one-column schema:
# no header row and no type inference.
# NOTE(review): no separator option is set, so the reader's default applies;
# confirm the input never contains that character, otherwise a line would be
# split across columns instead of landing whole in `org`.
df = (
    spark.read
    .option('header', 'False')
    .option('inferSchema', 'False')
    .schema(schema)
    .csv('same_delim_lineBreak.txt')
)

In [27]:
# Display the loaded rows without shortening long cell values.
df.show(truncate=False)

+-------------------------------------------------------+
|org                                                    |
+-------------------------------------------------------+
|Keerthan Adivikolanu|29|M|Harika Vemula|28|F|Baby A|1|M|
+-------------------------------------------------------+



In [31]:
# Mark record boundaries in a new column `chk`: each run of three
# pipe-terminated fields is written back unchanged ($0 is the whole match)
# followed by a '-' marker, giving "...|29|M|-Harika ...".
df2 = df.withColumn(
    'chk',
    regexp_replace(col('org'), '(.*?\\|){3}', '$0-'),
)
df2.show(truncate=False)

+-------------------------------------------------------+---------------------------------------------------------+
|org                                                    |chk                                                      |
+-------------------------------------------------------+---------------------------------------------------------+
|Keerthan Adivikolanu|29|M|Harika Vemula|28|F|Baby A|1|M|Keerthan Adivikolanu|29|M|-Harika Vemula|28|F|-Baby A|1|M|
+-------------------------------------------------------+---------------------------------------------------------+



In [34]:
# Split `chk` on the literal "|-" boundary marker and explode the resulting
# array so each record gets its own row; `org` is kept alongside for comparison.
# The pattern is a raw string because '\|' is not a valid Python escape
# sequence and a normal literal triggers an invalid-escape warning; r'\|-'
# hands the identical regex to Spark.
df3 = df2.withColumn('chk', explode(split(col('chk'), r'\|-')))
df3.show(truncate=0)

+-------------------------------------------------------+-------------------------+
|org                                                    |chk                      |
+-------------------------------------------------------+-------------------------+
|Keerthan Adivikolanu|29|M|Harika Vemula|28|F|Baby A|1|M|Keerthan Adivikolanu|29|M|
|Keerthan Adivikolanu|29|M|Harika Vemula|28|F|Baby A|1|M|Harika Vemula|28|F       |
|Keerthan Adivikolanu|29|M|Harika Vemula|28|F|Baby A|1|M|Baby A|1|M               |
+-------------------------------------------------------+-------------------------+



In [35]:
# Same split-and-explode as a select, keeping only the exploded records.
# No alias is given, so the output column gets explode's default name `col`.
# The pattern is a raw string because '\|' is not a valid Python escape
# sequence and a normal literal triggers an invalid-escape warning; r'\|-'
# hands the identical regex to Spark.
df4 = df2.select(explode(split(col('chk'), r'\|-')))
df4.show(truncate=0)

+-------------------------+
|col                      |
+-------------------------+
|Keerthan Adivikolanu|29|M|
|Harika Vemula|28|F       |
|Baby A|1|M               |
+-------------------------+



In [46]:
# Naive variant, kept for comparison: str() is applied to the whole Row, so the
# Row's repr wrapper ("Row(col='...')") leaks into the first and last fields of
# every split result. The following cell shows the correct approach, which
# indexes into the Row before splitting.

df_final = df4.rdd.map(lambda row: str(row).split('|'))
df_final.collect()

[["Row(col='Keerthan Adivikolanu", '29', "M')"],
 ["Row(col='Harika Vemula", '28', "F')"],
 ["Row(col='Baby A", '1', "M')"]]

In [48]:
# Correct variant: pull the string out of each single-column Row by position,
# then split it on '|' into its individual fields.
df_final2 = df4.rdd.map(lambda row: row[0].split('|'))
df_final2.collect()

[['Keerthan Adivikolanu', '29', 'M'],
 ['Harika Vemula', '28', 'F'],
 ['Baby A', '1', 'M']]

In [None]:
# Convert the RDD of three-field lists back into a DataFrame with named columns.
# NOTE(review): the original cell broke off inside the first column name, so it
# could not even be parsed. "Age" and "Gender" are inferred from the sample
# values (29 / M, 28 / F, 1 / M); confirm the intended column names.
df_final2.toDF(['Name', 'Age', 'Gender'])