# Part A - Working with the RDD API
## Question A.1
### A.1.1 Read the English transcripts with Spark, and count the number of lines.

In [1]:
from pyspark.sql import SparkSession

# Connection and resource settings for the shared standalone cluster
SPARK_MASTER_URL = "spark://192.168.2.250:7077"
SPARK_APP_NAME = "MasudulIslam"
SPARK_CONF = {
    # Per-executor resources
    "spark.executor.memory": "1g",
    "spark.executor.cores": "2",
    # Fixed ports for the driver and the block manager
    "spark.driver.port": "9999",
    "spark.blockManager.port": "10005",
    # Dynamic allocation: the external shuffle service is disabled, so shuffle
    # tracking is enabled instead; idle executors are released after 30s
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.shuffle.service.enabled": "false",
    "spark.dynamicAllocation.executorIdleTimeout": "30s",
}

# Apply every setting to the builder; each builder call returns the builder itself
session_builder = SparkSession.builder.master(SPARK_MASTER_URL).appName(SPARK_APP_NAME)
for conf_key, conf_value in SPARK_CONF.items():
    session_builder = session_builder.config(conf_key, conf_value)

# getOrCreate() reuses a session already running in this kernel, if any
spark = session_builder.getOrCreate()

# RDD-level entry point used throughout Part A
sc = spark.sparkContext

# HDFS directory holding the Europarl v7 Bulgarian-English transcripts
base_path = "hdfs://192.168.2.250:9000/europarl/"

# Build the English lineage; count() is the action that actually reads the file
eng_transcripts_path = base_path + "europarl-v7.bg-en.en"
eng_transcripts_rdd = sc.textFile(eng_transcripts_path)
eng_line_count = eng_transcripts_rdd.count()
print("Number of lines in the English transcripts:", eng_line_count)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/21 09:23:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:>                                                          (0 + 2) / 2]

Number of lines in the English transcripts: 406934


24/02/21 09:30:14 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
24/02/21 09:30:14 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:981)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

### A.1.2 Do the same with the other language (so that you have a separate lineage of RDDs for each).

In [4]:
# Build a separate lineage for the Bulgarian side of the corpus and count its lines
bg_transcripts_path = base_path + "europarl-v7.bg-en.bg"
bg_transcripts_rdd = sc.textFile(bg_transcripts_path)
bg_line_count = bg_transcripts_rdd.count()
print("Number of lines in the Bulgarian transcripts:", bg_line_count)

[Stage 1:>                                                          (0 + 2) / 2]

Number of lines in the Bulgarian transcripts: 406934


                                                                                

### A.1.3 Verify that the line counts are the same for the two languages.

In [5]:
# A parallel corpus must have one line per language for every aligned sentence
line_counts_match = eng_line_count == bg_line_count
print(
    "The line counts are the same for both languages."
    if line_counts_match
    else "The line counts are different."
)

The line counts are the same for both languages.


### A.1.4 Count the number of partitions.

In [6]:
# Number of partitions in each language's lineage
eng_partitions = eng_transcripts_rdd.getNumPartitions()
bg_partitions = bg_transcripts_rdd.getNumPartitions()

for partition_message, partition_total in (
    ("Number of partitions in the English RDD:", eng_partitions),
    ("Number of partitions in the Bulgarian RDD:", bg_partitions),
):
    print(partition_message, partition_total)

Number of partitions in the English RDD: 2
Number of partitions in the Bulgarian RDD: 2


## Question A.2
### A.2.1 Pre-process the text from both RDDs by doing the following:
- Lowercase the text.
- Tokenize the text by splitting each line on whitespace.

In [8]:
# Lowercase and tokenize a single line of a transcript
def preprocess_line(line):
    """Lower-case ``line`` and split it into tokens on runs of whitespace.

    No other normalization is applied, so punctuation stays attached to the
    neighbouring word (e.g. "parliament:"). An empty or whitespace-only line
    yields an empty list.
    """
    lowered_line = line.lower()
    tokens = lowered_line.split()
    return tokens

# Tokenize every line of both corpora; map() is lazy and keeps a 1:1 line mapping
eng_processed_rdd, bg_processed_rdd = (
    transcripts.map(preprocess_line)
    for transcripts in (eng_transcripts_rdd, bg_transcripts_rdd)
)

### A.2.2 Inspect 10 entries from each of your RDDs to verify your pre-processing.

In [9]:
# Show the first 10 tokenized lines of each language to eyeball the pre-processing
processed_samples = (
    ("Sample from the English processed RDD:", eng_processed_rdd),
    ("Sample from the Bulgarian processed RDD:", bg_processed_rdd),
)
for sample_heading, sample_rdd in processed_samples:
    print(sample_heading)
    print(sample_rdd.take(10))

Sample from the English processed RDD:


                                                                                

[['membership', 'of', 'parliament:', 'see', 'minutes'], ['approval', 'of', 'minutes', 'of', 'previous', 'sitting:', 'see', 'minutes'], ['membership', 'of', 'parliament:', 'see', 'minutes'], ['verification', 'of', 'credentials:', 'see', 'minutes'], ['documents', 'received:', 'see', 'minutes'], ['written', 'statements', 'and', 'oral', 'questions', '(tabling):', 'see', 'minutes'], ['petitions:', 'see', 'minutes'], ['texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes'], ['action', 'taken', 'on', "parliament's", 'resolutions:', 'see', 'minutes'], ['agenda', 'for', 'next', 'sitting:', 'see', 'minutes']]
Sample from the Bulgarian processed RDD:


[Stage 3:>                                                          (0 + 1) / 1]

[['състав', 'на', 'парламента:', 'вж.', 'протоколи'], ['одобряване', 'на', 'протокола', 'от', 'предишното', 'заседание:', 'вж', 'протоколите'], ['състав', 'на', 'парламента:', 'вж.', 'протоколи'], ['проверка', 'на', 'пълномощията:', 'вж.', 'протоколи'], ['внасяне', 'на', 'документи:', 'вж.', 'протоколи'], ['въпроси', 'с', 'искане', 'за', 'устен', 'отговор', 'и', 'писмени', 'декларации', '(внасяне):', 'вж.', 'протокола'], ['петиции:', 'вж.', 'протоколи'], ['предаване', 'на', 'текстове', 'на', 'споразумения', 'от', 'съвета:', 'вж.', 'протоколи'], ['действия,', 'предприети', 'вследствие', 'резолюции', 'на', 'парламента:', 'вж.', 'протокола'], ['дневен', 'ред', 'на', 'следващото', 'заседание:', 'вж.', 'протоколи']]


                                                                                

### A.2.3 Verify that the line counts still match after the pre-processing.

In [10]:
# Re-count after tokenization: map() produces exactly one output per input line,
# so the two lineages should still have equal lengths
eng_processed_count = eng_processed_rdd.count()
bg_processed_count = bg_processed_rdd.count()
assert eng_processed_count == bg_processed_count, "Line counts differ after pre-processing"
print("Line counts still match after pre-processing.")

[Stage 5:>                                                          (0 + 2) / 2]

Line counts still match after pre-processing.


                                                                                

## Question A.3
### A.3.1 Use Spark to compute the 10 most frequently occurring words in the English language corpus. Repeat for the other language.

In [11]:
from operator import add


def count_words(processed_rdd):
    """Count how often each token occurs in an RDD of token lists.

    Args:
        processed_rdd: RDD whose elements are lists of tokens (one list per line).

    Returns:
        An RDD of ``(word, count)`` pairs.
    """
    return (
        processed_rdd
        .flatMap(lambda tokens: tokens)
        .map(lambda word: (word, 1))
        .reduceByKey(add)
    )


def top_n_words(word_counts_rdd, n=10):
    """Return the ``n`` most frequent ``(word, count)`` pairs, highest count first.

    Ties on count are broken alphabetically by word, so the ranking is a total
    order and does not depend on how the data happens to be partitioned.
    """
    return word_counts_rdd.takeOrdered(n, key=lambda pair: (-pair[1], pair[0]))


# Compute the 10 most frequent words in the English corpus
eng_word_counts = count_words(eng_processed_rdd)
top_10_eng_words = top_n_words(eng_word_counts, 10)
print("Top 10 words in the English corpus:", top_10_eng_words)

# Compute the 10 most frequent words in the Bulgarian corpus
bg_word_counts = count_words(bg_processed_rdd)
top_10_bg_words = top_n_words(bg_word_counts, 10)
print("Top 10 words in the Bulgarian corpus:", top_10_bg_words)

                                                                                

Top 10 words in the English corpus: [('the', 752615), ('of', 363742), ('to', 328387), ('and', 294459), ('in', 240247), ('a', 166056), ('that', 162448), ('is', 158054), ('for', 123519), ('we', 110215)]


[Stage 8:>                                                          (0 + 2) / 2]

Top 10 words in the Bulgarian corpus: [('на', 604938), ('да', 330186), ('и', 328079), ('за', 261271), ('в', 228108), ('от', 168749), ('се', 150472), ('е', 129681), ('че', 114145), ('с', 95262)]


                                                                                

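### A.3.2 Verify that your results are reasonable.

Both rankings are dominated by short function words (articles, prepositions, conjunctions, particles and pronouns), which is what we expect from a large natural-language corpus. Several Bulgarian entries are direct counterparts of English ones, e.g. *и* ↔ *and*, *за* ↔ *for*, *в* ↔ *in*, *че* ↔ *that*.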

In [12]:
# Print each language's ranking as "word: count", one pair per line
word_rankings = (
    ("Top 10 English words and their counts:", top_10_eng_words),
    ("\nTop 10 Bulgarian words and their counts:", top_10_bg_words),
)
for ranking_heading, ranking in word_rankings:
    print(ranking_heading)
    for word, count in ranking:
        print(f"{word}: {count}")

Top 10 English words and their counts:
the: 752615
of: 363742
to: 328387
and: 294459
in: 240247
a: 166056
that: 162448
is: 158054
for: 123519
we: 110215

Top 10 Bulgarian words and their counts:
на: 604938
да: 330186
и: 328079
за: 261271
в: 228108
от: 168749
се: 150472
е: 129681
че: 114145
с: 95262


## Question A.4
### A.4.1 Use this parallel corpus to mine some translations in the form of word pairs, for the two languages. Do this by pairing words found on short lines with the same number of words respectively. We (incorrectly) assume the words stay in the same order when translated.

In [13]:
from operator import add

# Sentences with at most this many words count as "short" for pairing purposes
MAX_SHORT_SENTENCE_WORDS = 5
# Number of most frequent candidate translation pairs to display
NUM_TOP_PAIRS = 10

# 1. Key the lines by their line number: zipWithIndex() yields (tokens, index)
#    and the map() turns that into (index, tokens)
eng_indexed = eng_processed_rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
bg_indexed = bg_processed_rdd.zipWithIndex().map(lambda x: (x[1], x[0]))

# 2. Swap the key and value so that the line number is the key
#    (already done by the map() in step 1, so no separate transformation is needed)

# 3. Join the two RDDs on the line number key, giving (index, (eng_tokens, bg_tokens))
paired_sentences = eng_indexed.join(bg_indexed)

# 4. Drop line pairs where either side is empty (an empty token list is falsy)
non_empty_pairs = paired_sentences.filter(lambda x: x[1][0] and x[1][1])

# 5. Keep only pairs where both sentences are short
small_sentence_pairs = non_empty_pairs.filter(
    lambda x: len(x[1][0]) <= MAX_SHORT_SENTENCE_WORDS
    and len(x[1][1]) <= MAX_SHORT_SENTENCE_WORDS
)

# 6. Keep only pairs with the same number of words in each sentence
equal_length_pairs = small_sentence_pairs.filter(lambda x: len(x[1][0]) == len(x[1][1]))

# 7. Pair words position by position (we incorrectly assume translation keeps word order)
word_pairs = equal_length_pairs.flatMap(lambda x: zip(x[1][0], x[1][1]))

# 8. Count occurrences of each (english_word, bulgarian_word) pair
word_pair_counts = word_pairs.map(lambda pair: (pair, 1)).reduceByKey(add)

# 9. Print the most frequent pairs. Ties on count are broken by the word pair
#    itself, so the ranking does not depend on partitioning.
most_frequent_word_pairs = word_pair_counts.takeOrdered(
    NUM_TOP_PAIRS, key=lambda x: (-x[1], x[0])
)
print("Most frequent translation pairs:")
for pair, count in most_frequent_word_pairs:
    print(f"{pair}: {count}")



Most frequent translation pairs:
('(applause)', '(ръкопляскания)'): 911
('written', 'писмени'): 687
('is', 'е'): 626
('(rule', '(член'): 566
('of', 'на'): 492
('statements', 'изявления'): 457
('149)', '149)'): 423
('see', 'вж.'): 349
('thank', 'благодаря'): 298
('that', 'това'): 297


                                                                                