<h1>Merge RDD rows by Start/End Pattern</h1>

<p>Concatenate blocks of multiple lines based on Start/End patterns.</p>
<p>Example: https://stackoverflow.com/questions/50865236/convert-multiple-rdd-rows-into-one-row-in-pyspark</p>

<p>In this case, we merge the lines between the <b>STARTING</b> and <b>STOP</b> marks. Since the same block (from STARTING to STOP) can be spread across multiple partitions, it is not easy to handle the Start and End marks at the same time.</p>

<p>We can handle this task in two steps:</p>

<ol>
 <li>Split the data at each '<b>STOP</b>' line with a function such as <b>fold()</b>, <b>aggregate()</b> or <b>reduce()</b>, and
    mark each STARTING line with a preceding NUL character: '<b>\0</b>' </li>
 <li>Reload the data into an RDD and remove the unrelated text before the <b>NUL</b> + '<b>STARTING</b>' using the <b>map()</b> function </li>
</ol>

<p>The code below was tested under IPython:</p>

<p>Sample text:</p>
<pre>
skip0 STARTING
skip0
STARTING |1|TH|TGG|132|8|T|Fall|
EVENT 1|56|HT|JUP||||||||
EVENT 2|BHT|987|231|||||||||||||||||
STOP|HFR|0.5|90|
skip1
skip1
skip1
STARTING |8|TH|TGG|12|8|T|Fall|
EVENT 1|6|HT|UP||||||||
EVENT 2|BT|987|31|||||||||||||||||
STOP|FR|0.5|90|
skip2
skip2
</pre>

<h4>Note:</h4>
<ol>
    <li>For a large amount of data, merging everything into one list might cause memory or disk space issues. The following method, which replaces the fold() action with the foldByKey() transformation, may or may not resolve the issue. Since the order of lines matters for this problem, the first map() using a constant key is probably required. My conclusion is that map/reduce and reduce's cousins such as fold(), aggregate() and combiners are probably not appropriate for string concatenation, since they do not reduce the size of the data chunks.
<pre>
    rdd.map(lambda x: (1, x))  \
       .foldByKey([''], merge) \
       .map(lambda x: x[1])    \
       .saveAsTextFile("/path/to/file")
</pre>
    </li>
    <li>The same result can be obtained more easily by running the following awk command line:
<pre>
    awk '/^STARTING/{f=1; ORS=" "}/^STOP/{ORS="\n"; f=0}f==1||/^STOP/' merge-orig.txt > merge-new.txt
</pre>
    </li>
</ol>
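<p>For readers who prefer Python to awk, the single-process approach from note 2 can be sketched roughly as below. This is only an illustrative sketch: the function name <b>merge_blocks</b> and its parameters are hypothetical, not part of the original notebook. Like the awk command, it joins the lines of a block with a space. Unlike the awk command, it silently drops a STOP line that has no preceding STARTING line.</p>

```python
def merge_blocks(lines, start='STARTING', stop='STOP', sep=' '):
    """Yield one joined string per STARTING..STOP block; other lines are dropped.

    Hypothetical helper for illustration only.
    """
    block = None  # None means we are currently outside a block
    for line in lines:
        if block is None:
            # outside a block: only a STARTING line opens one
            if line.startswith(start):
                block = [line]
        else:
            # inside a block: keep every line until the STOP line closes it
            block.append(line)
            if line.startswith(stop):
                yield sep.join(block)
                block = None


sample = """skip0
STARTING |1|TH|TGG|132|8|T|Fall|
EVENT 1|56|HT|JUP||||||||
STOP|HFR|0.5|90|
skip1
STARTING |8|TH|TGG|12|8|T|Fall|
EVENT 1|6|HT|UP||||||||
STOP|FR|0.5|90|
skip2"""

merged_blocks = list(merge_blocks(sample.splitlines()))
for row in merged_blocks:
    print(row)
```

<p>Because this reads the lines strictly in order in one process, it avoids the partition-boundary problem entirely, at the cost of not being distributed.</p>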




In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder                   \
                    .master("local[2]")        \
                    .appName("pyspark-test1")  \
                    .getOrCreate()
sc = spark.sparkContext

In [None]:
# read the text file and check the partitions
rdd = sc.textFile("/test/pyspark/merge-1.txt", minPartitions=20)
rdd.glom().collect()

<pre>
[[u'skip0', u'skip0STARTING'],
 [u'STARTING |1|TH|TGG|132|8|T|Fall|'],
 [],
 [],
 [u'EVENT 1|56|HT|JUP||||||||'],
 [],
 [u'EVENT 2|BHT|987|231|||||||||||||||||'],
 [],
 [u'STOP|HFR|0.5|90|'],
 [],
 [u'skip1', u'skip1'],
 [u'skip1', u'STARTING |8|TH|TGG|12|8|T|Fall|'],
 [],
 [],
 [u'EVENT 1|6|HT|UP||||||||'],
 [u'EVENT 2|BT|987|31|||||||||||||||||'],
 [],
 [],
 [u'STOP|FR|0.5|90|'],
 [u'skip2'],
 [u'skip2']]
</pre>

In [None]:
# fold function to merge data in and between partitions
# Note: we put a NUL char before 'STARTING' so that the unwanted text can be split off later
def merge(x, y):
    if isinstance(y, list):
        x[-1] += y[0]
        x = x + y[1:]
    else:
        if y.startswith('STARTING'):
            x[-1] += '\0' + y
        else:
            x[-1] += y
        if y.startswith('STOP'):
            x.append('')
    return x
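
<p>To see how merge() behaves both inside a partition (where y is a line) and between partitions (where y is a list of partial results), the fold can be simulated in plain Python without Spark. This is a sketch under assumptions: <b>simulate_fold</b> and the shortened sample partitions are hypothetical, and the simulation assumes the per-partition results are combined in partition order, which is consistent with the output shown in this notebook.</p>

```python
from functools import reduce


def merge(x, y):
    # same logic as the notebook's merge() function
    if isinstance(y, list):
        # y is the partial result of another partition
        x[-1] += y[0]
        x = x + y[1:]
    else:
        # y is a single line of text
        if y.startswith('STARTING'):
            x[-1] += '\0' + y
        else:
            x[-1] += y
        if y.startswith('STOP'):
            x.append('')
    return x


def simulate_fold(partitions, zero, op):
    """Hypothetical stand-in for rdd.fold(): fold each partition starting
    from a fresh copy of zero, then fold the partial results in order."""
    partials = [reduce(op, part, list(zero)) for part in partitions]
    return reduce(op, partials, list(zero))


# made-up, shortened partitions; one block is split across three partitions
partitions = [
    ['skip0', 'STARTING |1|A|'],
    [],
    ['EVENT 1|x|', 'STOP|y|'],
    ['skip1', 'STARTING |2|B|', 'STOP|z|'],
    ['skip2'],
]

merged = simulate_fold(partitions, [''], merge)
print(merged)
```

<p>Each list element ends at a STOP line, and the last element holds whatever trailed the final STOP.</p>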

In [None]:
# result of the first step: each block is terminated at its 'STOP' line, while the unwanted text before the 'STARTING' line is still present
rdd.fold([''], merge)

<pre>
[u'skip0skip0STARTING\x00STARTING |1|TH|TGG|132|8|T|Fall|EVENT 1|56|HT|JUP||||||||EVENT 2|BHT|987|231|||||||||||||||||STOP|HFR|0.5|90|',
 u'skip1skip1skip1\x00STARTING |8|TH|TGG|12|8|T|Fall|EVENT 1|6|HT|UP||||||||EVENT 2|BT|987|31|||||||||||||||||STOP|FR|0.5|90|',
 u'skip2skip2']
</pre>

In [None]:
# load the data into an RDD again and remove the preceding text with the map() function
rdd1 = sc.parallelize(rdd.fold([''], merge)[:-1])
rdd1.map(lambda x: x[x.find('\0STARTING')+1:]).collect()

<pre>
[u'STARTING |1|TH|TGG|132|8|T|Fall|EVENT 1|56|HT|JUP||||||||EVENT 2|BHT|987|231|||||||||||||||||STOP|HFR|0.5|90|',
 u'STARTING |8|TH|TGG|12|8|T|Fall|EVENT 1|6|HT|UP||||||||EVENT 2|BT|987|31|||||||||||||||||STOP|FR|0.5|90|']
</pre>
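
<p>The lambda in the second step returns the whole string unchanged when the NUL + 'STARTING' marker is missing, because find() returns -1 in that case. A slightly more defensive variant could look like the sketch below. The name <b>strip_before_mark</b> is hypothetical and not part of the original notebook.</p>

```python
MARK = '\0STARTING'


def strip_before_mark(merged_line, mark=MARK):
    """Return the text from 'STARTING' onward, or None if the marker is absent.

    Hypothetical helper for illustration only.
    """
    pos = merged_line.find(mark)
    if pos < 0:
        return None              # no STARTING line was seen in this element
    return merged_line[pos + 1:]  # +1 skips the NUL character itself


rows = [
    'skip1skip1\0STARTING |8|TH|STOP|FR|0.5|90|',
    'skip2skip2',
]
cleaned = [strip_before_mark(r) for r in rows]
print(cleaned)
```

<p>With an RDD, this could be applied as rdd1.map(strip_before_mark).filter(lambda s: s is not None), so that elements without a STARTING mark are dropped instead of passed through.</p>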