In [7]:
from pyspark.sql import SparkSession
import re

# Obtain the Spark session for this notebook (getOrCreate reuses an already
# running session when there is one) and keep its SparkContext for RDD work.
spark = (
    SparkSession.builder
    .appName("MergeNewsgroupFiles")
    .getOrCreate()
)
sc = spark.sparkContext

def parse_file(content):
    """Split the text of one merged newsgroup file into its documents.

    A document starts with a four-line header of the form::

        Newsgroup: ...
        Document_id: <digits>
        From: ...
        Subject: ...

    and its article text runs from the end of that header to the start of the
    next header (or to the end of ``content`` for the last document).

    Args:
        content: Full text of a file, with LF or CRLF line endings.

    Returns:
        A list of ``(document_id, article)`` tuples in order of appearance,
        where ``document_id`` is an ``int`` and ``article`` is the stripped
        article text with any leading archive-metadata lines removed. The
        list is empty when ``content`` contains no header.
    """
    # `\r?\n` accepts both LF and CRLF line endings. With a bare `\n` the digits
    # of Document_id could never be followed by "\r\n", so CRLF files produced
    # no matches at all and were silently dropped.
    pattern = (r'Newsgroup:.*\r?\nDocument_id:\s*(\d+)\r?\n'
               r'From:.*\r?\nSubject:.*\r?\n')
    matches = list(re.finditer(pattern, content))

    # Metadata lines that may sit at the very top of an article body.
    leading_headers = ('Archive-name:', 'Alt-atheism-archive-name:',
                       'Last-modified:', 'Version:')
    documents = []

    for i, match in enumerate(matches):
        document_id = int(match.group(1))

        # The article ends where the next document's header begins.
        if i + 1 < len(matches):
            article_end = matches[i + 1].start()
        else:
            article_end = len(content)
        article = content[match.end():article_end].strip()

        # Skip only the consecutive metadata lines at the top; the scan stops
        # at the first line that is not one of them.
        article_lines = article.split('\n')
        first_kept = 0
        while (first_kept < len(article_lines)
               and article_lines[first_kept].startswith(leading_headers)):
            first_kept += 1

        article = '\n'.join(article_lines[first_kept:]).strip()
        documents.append((document_id, article))

    return documents

# Load each file matching the relative glob "*.txt" as a (path, text) pair.
rdd = sc.wholeTextFiles("*.txt")

# Only the text half of each pair is parsed; one file expands into many
# (document_id, article) records.
documents_rdd = rdd.flatMap(lambda path_and_text: parse_file(path_and_text[1]))

# Name the two tuple fields as columns and sort by document id.
df = documents_rdd.toDF(["Document_id", "Article"]).orderBy("Document_id")

df.show(5)


+-----------+--------------------+
|Document_id|             Article|
+-----------+--------------------+
|       8514|CALL FOR PRESENTA...|
|       9136|In article <1993M...|
|       9138|Geoffrey S. Elbo ...|
|       9139|>     There's one...|
|       9140|jorge@erex.East.S...|
+-----------+--------------------+
only showing top 5 rows



In [8]:
# Fetch the first row once and test it for None, instead of running a separate
# df.count() job just to detect emptiness: DataFrame.first() returns None when
# the DataFrame has no rows.
first_row = df.first()
if first_row is not None:
    print(f"Document_id: {first_row['Document_id']}")
    print(f"Article:\n{first_row['Article']}")
else:
    print("The DataFrame is empty.")

Document_id: 8514
Article:
CALL FOR PRESENTATIONS
	
      NAVY SCIENTIFIC VISUALIZATION AND VIRTUAL REALITY SEMINAR

			Tuesday, June 22, 1993

	    Carderock Division, Naval Surface Warfare Center
	      (formerly the David Taylor Research Center)
			  Bethesda, Maryland

SPONSOR: NESS (Navy Engineering Software System) is sponsoring a 
one-day Navy Scientific Visualization and Virtual Reality Seminar.  
The purpose of the seminar is to present and exchange information for
Navy-related scientific visualization and virtual reality programs, 
research, developments, and applications.

PRESENTATIONS: Presentations are solicited on all aspects of 
Navy-related scientific visualization and virtual reality.  All 
current work, works-in-progress, and proposed work by Navy 
organizations will be considered.  Four types of presentations are 
available.

     1. Regular presentation: 20-30 minutes in length
     2. Short presentation: 10 minutes in length
     3. Video presentation: a stand-alo

In [9]:
# Number of rows in df, i.e. how many (Document_id, Article) records it holds.
df.count()

18299

In [10]:
# Collapse to a single partition before writing so the output is one part file.
# Any previous output at this path is replaced, and a header row is included.
(
    df.coalesce(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .csv('df_Mid_Size.csv')
)

print("df_Mid_Size has been exported to 'df_Mid_Size.csv'")

df_Mid_Size has been exported to 'df_Mid_Size.csv'
