In [1]:
import gzip
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf

# Attach to the Spark session that is already running in this process, or
# start a new one with default settings if there is none.
spark = (
    SparkSession.builder
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/07 19:53:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Spark user defined function (UDF): fastText language identification (LID)
sc = spark.sparkContext

# Register the model file and the classifier module with the SparkContext so
# they are distributed with the job.
sc.addFile('lid218e.bin')
sc.addPyFile('fasttext_lang_classifier.py')

# Kept below the addPyFile call on purpose — presumably so the import resolves
# to the module registered above; confirm before hoisting it to the imports cell.
import fasttext_lang_classifier

# No return type is passed, so Spark treats the UDF result as a string column.
udf_predict_language = F.udf(fasttext_lang_classifier.predict_language)


In [3]:
# Load bitext
# Root of the parallel corpora. This is the only machine-specific value in the
# cell; point it at the local copy of the data.
BITEXT_DIR = '/Users/jkim/src/MultitaskNMT_data/Indo-Pacific/bilingual'

# One entry per corpus:
# (source language tag, gzipped source file, target language tag, gzipped target file)
datasets = [
    ('eng_Latn', f'{BITEXT_DIR}/en-bug/wikimedia.bug-en.en.gz',
     'bug_Latn', f'{BITEXT_DIR}/en-bug/wikimedia.bug-en.bug.gz'),
    ('eng_Latn', f'{BITEXT_DIR}/en-ilo/XLEnt.en-ilo.en.gz',
     'ilo_Latn', f'{BITEXT_DIR}/en-ilo/XLEnt.en-ilo.ilo.gz'),
]


def _read_bitext(srclang, srcf, tgtlang, tgtf):
    """Read one line-aligned corpus into (srclang, src, tgtlang, tgt) tuples.

    Both files are opened as UTF-8 text inside a ``with`` block, so the file
    handles are closed as soon as the corpus has been read instead of being
    left open for the garbage collector. Lines are paired by position and
    stripped of surrounding whitespace.

    NOTE: ``zip`` stops at the shorter file, so a length mismatch between the
    two sides drops the extra lines silently.
    """
    with gzip.open(srcf, 'rt', encoding='utf-8') as src_lines, \
         gzip.open(tgtf, 'rt', encoding='utf-8') as tgt_lines:
        return [(srclang, src.strip(), tgtlang, tgt.strip())
                for src, tgt in zip(src_lines, tgt_lines)]


# All rows are materialised on the driver before being handed to Spark.
df = spark.createDataFrame(
    data=[row for corpus in datasets for row in _read_bitext(*corpus)],
    schema=["SRCLANG", "SRC", "TGTLANG", "TGT"],
)
print(f'{df.count()=}')

24/04/07 19:53:45 WARN TaskSetManager: Stage 0 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 8) / 8]

df.count()=79086


24/04/07 19:53:49 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
# Add LID predictions
# Run the fastText UDF over both sides of every sentence pair. The result is
# cached because each later action on this frame (count, filter, write) would
# otherwise re-run the Python UDF over all rows.
df = (
    df.withColumn('SRCLANG_PREDICTED', udf_predict_language(col('SRC')))
      .withColumn('TGTLANG_PREDICTED', udf_predict_language(col('TGT')))
      .cache()
)
df.show(5)
df.tail(5)

24/04/07 19:53:51 WARN TaskSetManager: Stage 3 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+--------------------+--------+--------------------+-----------------+-----------------+
| SRCLANG|                 SRC| TGTLANG|                 TGT|SRCLANG_PREDICTED|TGTLANG_PREDICTED|
+--------+--------------------+--------+--------------------+-----------------+-----------------+
|eng_Latn|On May 22, 2013, ...|bug_Latn|On May 22, 2018, ...|         eng_Latn|         eng_Latn|
|eng_Latn|He passed away tw...|bug_Latn|He passed away tw...|         eng_Latn|         eng_Latn|
|eng_Latn|He died at the ag...|bug_Latn|He died at the ag...|         eng_Latn|         eng_Latn|
|eng_Latn|Pearl Fernandez M...|bug_Latn| Pearl was born 1983|         eng_Latn|         eng_Latn|
|eng_Latn|Pearl Sinthia Fer...|bug_Latn|Pearl Sinthia Fer...|         eng_Latn|         eng_Latn|
+--------+--------------------+--------+--------------------+-----------------+-----------------+
only showing top 5 rows



24/04/07 19:53:52 WARN TaskSetManager: Stage 4 contains a task of very large size (2596 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[Row(SRCLANG='eng_Latn', SRC='Arthur Merrill', TGTLANG='ilo_Latn', TGT='家用摄像头网上专卖店满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满意意满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满意意了我是我是我是我是我是我是我是非常满意！。', SRCLANG_PREDICTED='eng_Latn', TGTLANG_PREDICTED='bos_Latn'),
 Row(SRCLANG='eng_Latn', SRC='High Roller Records', TGTLANG='ilo_Latn', TGT='家用摄像头网上专卖店满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满意意满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满意意了我是我是我是我是我是我是我是非常满意！。', SRCLANG_PREDICTED='eng_Latn', TGTLANG_PREDICTED='bos_Latn'),
 Row(SRCLANG='eng_Latn', SRC='Ripon', TGTLANG='ilo_Latn', TGT='家用摄像头网上专卖店满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满意意满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满满意意了我是我是我是我是我是我是我是非常满意！。', SRCLANG_PREDICTED='vie_Latn', TGTLANG_PREDICTED='bos_Latn'),
 Row(SRCLANG='eng_Latn', SRC='Geotechnical and Environmental Rese

In [5]:
# Report the row count after the prediction columns were added; the label is
# spelled out so the printed line reads "df.count()=<n>".
print(f'df.count()={df.count()!r}')

24/04/07 19:54:00 WARN TaskSetManager: Stage 5 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.


df.count()=79086


In [8]:
# Count the pairs whose declared language equals the predicted language on
# both the source and the target side. The f-string "=" specifier prints the
# expression text verbatim in front of the count, so editing the expression
# also changes the printed label.
print(f'{df.select(["SRCLANG", "SRCLANG_PREDICTED", "TGTLANG", "TGTLANG_PREDICTED"]).where((df.SRCLANG == df.SRCLANG_PREDICTED) & (df.TGTLANG == df.TGTLANG_PREDICTED)).count()=}')

24/04/07 19:58:17 WARN TaskSetManager: Stage 11 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.

df.select(["SRCLANG", "SRCLANG_PREDICTED", "TGTLANG", "TGTLANG_PREDICTED"]).where((df.SRCLANG == df.SRCLANG_PREDICTED) & (df.TGTLANG == df.TGTLANG_PREDICTED)).count()=11570


                                                                                

In [9]:
# Keep only the pairs where the predicted language equals the declared
# language on both the source and the target side.
# The filtered frame is cached: it is consumed by four actions below (one
# count and three writes), and without the cache each of them would recompute
# the filter and the prediction columns it depends on.
langmatch = df.filter(
    (F.col("SRCLANG") == F.col("SRCLANG_PREDICTED"))
    & (F.col("TGTLANG") == F.col("TGTLANG_PREDICTED"))
).cache()
print(f'{langmatch.count()=}')

# Export the same rows in three formats; an existing output directory is
# replaced. The CSV is written with Spark's default options.
langmatch.write.mode("overwrite").csv("wikimedia.bug-en__XLEnt.en-ilo.csv")
langmatch.write.mode("overwrite").parquet("wikimedia.bug-en__XLEnt.en-ilo.parquet")
langmatch.write.mode("overwrite").json("wikimedia.bug-en__XLEnt.en-ilo.json")

24/04/07 20:05:28 WARN TaskSetManager: Stage 14 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

langmatch.count()=11570


24/04/07 20:08:18 WARN TaskSetManager: Stage 17 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.
24/04/07 20:11:59 WARN TaskSetManager: Stage 18 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.
24/04/07 20:16:11 WARN TaskSetManager: Stage 19 contains a task of very large size (2355 KiB). The maximum recommended task size is 1000 KiB.
                                                                                