#Overview

Here we use PySpark to process the dataset. PySpark can be helpful to process huge datasets. Here is a brief description of the steps: <br><br>

1. First import the text file into a PySpark dataframe
2. Split the dataframe into two columns one for the English sentences and the second for the German sentences
3. Define an UDF with steps described above to clean up each columns
4. Save the dataframe into a CSV file

# Libraries

In [0]:
import re
import string 
import pandas as pd 
from unicodedata import normalize
from pyspark.sql.functions import regexp_extract, lower
import pyspark.sql.functions as F
from pyspark.sql.functions import *

In [0]:
spark

In [0]:
sqlContext

Out[113]: <pyspark.sql.context.SQLContext at 0x7f3544068370>

In [0]:
# Shows a dict of PySpark informations. We can check if 'sc', 'sqlContext', and 'spark' are present.
# locals()

In [0]:
sc

# Import Data

In [0]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/Bookings.csv,Bookings.csv,170118,1674344235000
dbfs:/FileStore/tables/English-German.csv/,English-German.csv/,0,0
dbfs:/FileStore/tables/Facilities.csv,Facilities.csv,494,1674344235000
dbfs:/FileStore/tables/Members.csv,Members.csv,3313,1674344236000
dbfs:/FileStore/tables/NASA_access_log_Aug95.txt,NASA_access_log_Aug95.txt,167813770,1675580793000
dbfs:/FileStore/tables/NASA_access_log_Jul95.txt,NASA_access_log_Jul95.txt,205242368,1675580851000
dbfs:/FileStore/tables/deu.txt,deu.txt,40721427,1687189983000


In [0]:
# File location and type
file_location = "dbfs:/FileStore/tables/deu.txt"
file_type = "txt"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# df = spark.read.format(file_type) \
#   .option("inferSchema", infer_schema) \
#   .option("header", first_row_is_header) \
#   .option("sep", delimiter) \
#   .load(file_location)

# df = spark.read.text('dbfs:/FileStore/tables/deu.txt')
df = spark.read.text(file_location)

# display(df) # display is too long on github use show instead
df.show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------+
|value                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
|Go.\tGeh.\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8597805 (Roujin)                      |
|Hi.\tHallo!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #380701 (cburgmer)                    |
|Hi.\tGrüß Gott!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #659813 (Esperantostern)          |
|Run!\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #941078 (Fingerhut)             |
|Run.\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #4008918 (JSakuragi) & #941078 (Fingerhut)           |
|Wow!\tPotzdonner!\tCC-BY 2.0 (France) Attribution: tatoeba.org #52027 (Zifre) & #2122382 (Pfirs

In [0]:
df.printSchema()

root
 |-- value: string (nullable = true)



In [0]:
df.head(3)

Out[118]: [Row(value='Go.\tGeh.\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8597805 (Roujin)'),
 Row(value='Hi.\tHallo!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #380701 (cburgmer)'),
 Row(value='Hi.\tGrüß Gott!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #659813 (Esperantostern)')]

In [0]:
type(df)

Out[119]: pyspark.sql.dataframe.DataFrame

In [0]:
df_rdd = df.rdd
type(df_rdd)

Out[120]: pyspark.rdd.RDD

In [0]:
df_rdd.take(5)

Out[121]: [Row(value='Go.\tGeh.\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8597805 (Roujin)'),
 Row(value='Hi.\tHallo!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #380701 (cburgmer)'),
 Row(value='Hi.\tGrüß Gott!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #659813 (Esperantostern)'),
 Row(value='Run!\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #941078 (Fingerhut)'),
 Row(value='Run.\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #4008918 (JSakuragi) & #941078 (Fingerhut)')]

In [0]:
df.show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------+
|value                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
|Go.\tGeh.\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8597805 (Roujin)                      |
|Hi.\tHallo!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #380701 (cburgmer)                    |
|Hi.\tGrüß Gott!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #659813 (Esperantostern)          |
|Run!\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #941078 (Fingerhut)             |
|Run.\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #4008918 (JSakuragi) & #941078 (Fingerhut)           |
|Wow!\tPotzdonner!\tCC-BY 2.0 (France) Attribution: tatoeba.org #52027 (Zifre) & #2122382 (Pfirs

In [0]:
df_rdd.take(10)

Out[123]: [Row(value='Go.\tGeh.\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8597805 (Roujin)'),
 Row(value='Hi.\tHallo!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #380701 (cburgmer)'),
 Row(value='Hi.\tGrüß Gott!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #659813 (Esperantostern)'),
 Row(value='Run!\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #941078 (Fingerhut)'),
 Row(value='Run.\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #4008918 (JSakuragi) & #941078 (Fingerhut)'),
 Row(value='Wow!\tPotzdonner!\tCC-BY 2.0 (France) Attribution: tatoeba.org #52027 (Zifre) & #2122382 (Pfirsichbaeumchen)'),
 Row(value='Wow!\tDonnerwetter!\tCC-BY 2.0 (France) Attribution: tatoeba.org #52027 (Zifre) & #2122391 (Pfirsichbaeumchen)'),
 Row(value='Duck!\tKopf runter!\tCC-BY 2.0 (France) Attribution: tatoeba.org #280158 (CM) & #9968521 (wolfgangth)'),
 Row(value='Fire!\tFeuer!\tCC-BY 2.0 (France) Attribution: tatoeba.or

In [0]:
print(df.count(), len(df.columns))

261499 1


# Process Data

In [0]:
sample_sentences = [item['value'] for item in df.take(100)]
sample_sentences

Out[125]: ['Go.\tGeh.\tCC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8597805 (Roujin)',
 'Hi.\tHallo!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #380701 (cburgmer)',
 'Hi.\tGrüß Gott!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #659813 (Esperantostern)',
 'Run!\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #941078 (Fingerhut)',
 'Run.\tLauf!\tCC-BY 2.0 (France) Attribution: tatoeba.org #4008918 (JSakuragi) & #941078 (Fingerhut)',
 'Wow!\tPotzdonner!\tCC-BY 2.0 (France) Attribution: tatoeba.org #52027 (Zifre) & #2122382 (Pfirsichbaeumchen)',
 'Wow!\tDonnerwetter!\tCC-BY 2.0 (France) Attribution: tatoeba.org #52027 (Zifre) & #2122391 (Pfirsichbaeumchen)',
 'Duck!\tKopf runter!\tCC-BY 2.0 (France) Attribution: tatoeba.org #280158 (CM) & #9968521 (wolfgangth)',
 'Fire!\tFeuer!\tCC-BY 2.0 (France) Attribution: tatoeba.org #1829639 (Spamster) & #1958697 (Tamy)',
 'Help!\tHilfe!\tCC-BY 2.0 (France) Attribution: tatoeba

## English-German Split

In [0]:
first_column_pattern = r'^(.+?)\t'
for item in sample_sentences:
    print(re.search(first_column_pattern, item))
    print(re.search(first_column_pattern, item).group(1))

second_column_pattern = r'\t(.+?)\t'
for item in sample_sentences:
    print(re.search(second_column_pattern, item))
    print(re.search(second_column_pattern, item).group(1))

<re.Match object; span=(0, 4), match='Go.\t'>
Go.
<re.Match object; span=(0, 4), match='Hi.\t'>
Hi.
<re.Match object; span=(0, 4), match='Hi.\t'>
Hi.
<re.Match object; span=(0, 5), match='Run!\t'>
Run!
<re.Match object; span=(0, 5), match='Run.\t'>
Run.
<re.Match object; span=(0, 5), match='Wow!\t'>
Wow!
<re.Match object; span=(0, 5), match='Wow!\t'>
Wow!
<re.Match object; span=(0, 6), match='Duck!\t'>
Duck!
<re.Match object; span=(0, 6), match='Fire!\t'>
Fire!
<re.Match object; span=(0, 6), match='Help!\t'>
Help!
<re.Match object; span=(0, 6), match='Help!\t'>
Help!
<re.Match object; span=(0, 6), match='Stay.\t'>
Stay.
<re.Match object; span=(0, 6), match='Stop!\t'>
Stop!
<re.Match object; span=(0, 6), match='Stop!\t'>
Stop!
<re.Match object; span=(0, 6), match='Wait!\t'>
Wait!
<re.Match object; span=(0, 6), match='Wait.\t'>
Wait.
<re.Match object; span=(0, 7), match='Begin.\t'>
Begin.
<re.Match object; span=(0, 7), match='Do it.\t'>
Do it.
<re.Match object; span=(0, 7), match='Do it.

In [0]:
first_column_pattern = r'^(.+?)\t'
first_column = [re.search(first_column_pattern, item).group(1) if re.search(first_column_pattern, item) else 'no match' for item in sample_sentences]
first_column

Out[127]: ['Go.',
 'Hi.',
 'Hi.',
 'Run!',
 'Run.',
 'Wow!',
 'Wow!',
 'Duck!',
 'Fire!',
 'Help!',
 'Help!',
 'Stay.',
 'Stop!',
 'Stop!',
 'Wait!',
 'Wait.',
 'Begin.',
 'Do it.',
 'Do it.',
 'Go on.',
 'Hello!',
 'Hello!',
 'Hello.',
 'Hurry!',
 'Hurry!',
 'I hid.',
 'I hid.',
 'I ran.',
 'I see.',
 'I see.',
 'I try.',
 'I try.',
 'I won!',
 'I won!',
 'I won.',
 'Oh no!',
 'Relax.',
 'Shoot!',
 'Shoot!',
 'Smile.',
 'Sorry?',
 'Ask me.',
 'Ask me.',
 'Ask me.',
 'Attack!',
 'Attack!',
 'Buy it.',
 'Cheers!',
 'Eat it.',
 'Eat up.',
 'Eat up.',
 'Eat up.',
 'Freeze!',
 'Freeze!',
 'Go now.',
 'Got it!',
 'Got it!',
 'Got it!',
 'Got it?',
 'Got it?',
 'Got it?',
 'He ran.',
 'He ran.',
 'Hop in.',
 'Hop in.',
 'Hug me.',
 'Hug me.',
 'Hug me.',
 'I care.',
 'I fell.',
 'I fell.',
 'I fell.',
 'I fell.',
 'I fell.',
 'I fled.',
 'I fled.',
 'I know.',
 'I lied.',
 'I lost.',
 'I paid.',
 'I paid.',
 'I pass.',
 'I sang.',
 'I spit.',
 'I spit.',
 'I swim.',
 'I wept.',
 'I wept.',
 

In [0]:
second_column_pattern = r'\t(.+?)\t'
second_column = [re.search(second_column_pattern, item).group(1) if re.search(second_column_pattern, item) else 'no match' for item in sample_sentences]
second_column

Out[128]: ['Geh.',
 'Hallo!',
 'Grüß Gott!',
 'Lauf!',
 'Lauf!',
 'Potzdonner!',
 'Donnerwetter!',
 'Kopf runter!',
 'Feuer!',
 'Hilfe!',
 'Zu Hülf!',
 'Bleib!',
 'Stopp!',
 'Anhalten!',
 'Warte!',
 'Warte.',
 'Fang an.',
 'Mache es!',
 'Tue es.',
 'Mach weiter.',
 'Hallo!',
 'Sers!',
 'Hallo!',
 'Beeil dich!',
 'Schnell!',
 'Ich versteckte mich.',
 'Ich habe mich versteckt.',
 'Ich rannte.',
 'Ich verstehe.',
 'Aha.',
 'Ich versuche es.',
 'Ich probiere es.',
 'Ich hab gewonnen!',
 'Ich habe gewonnen!',
 'Ich habe gewonnen.',
 'Oh, Nein!',
 'Entspann dich.',
 'Feuer!',
 'Schieß!',
 'Lächeln!',
 'Entschuldigung?',
 'Frag mich!',
 'Fragt mich!',
 'Fragen Sie mich!',
 'Angriff!',
 'Attacke!',
 'Kauf’s!',
 'Zum Wohl!',
 'Iss es.',
 'Iss fertig!',
 'Iss auf.',
 'Iss auf!',
 'Keine Bewegung!',
 'Stehenbleiben!',
 'Geh jetzt!',
 'Verstanden!',
 'Ich hab’s!',
 'Aha!',
 'Kapiert?',
 'Verstanden?',
 'Einverstanden?',
 'Er rannte.',
 'Er lief.',
 'Mach mit!',
 'Spring rein!',
 'Drück mich!',
 'N

In [0]:
df_pair = df.select(regexp_extract('value', first_column_pattern, 1).alias('English'), 
                    regexp_extract('value', second_column_pattern, 1).alias('German'), )
df_pair.show(10, truncate=False)

+-------+-------------+
|English|German       |
+-------+-------------+
|Go.    |Geh.         |
|Hi.    |Hallo!       |
|Hi.    |Grüß Gott!   |
|Run!   |Lauf!        |
|Run.   |Lauf!        |
|Wow!   |Potzdonner!  |
|Wow!   |Donnerwetter!|
|Duck!  |Kopf runter! |
|Fire!  |Feuer!       |
|Help!  |Hilfe!       |
+-------+-------------+
only showing top 10 rows



## Lowercase

In [0]:
column_name = 'English'
df_2 = df_pair.withColumn(column_name, lower(F.col(column_name)))
column_name = 'German'
df_2 = df_2.withColumn(column_name, lower(F.col(column_name)))
df_2.show(10, truncate=False)

+-------+-------------+
|English|German       |
+-------+-------------+
|go.    |geh.         |
|hi.    |hallo!       |
|hi.    |grüß gott!   |
|run!   |lauf!        |
|run.   |lauf!        |
|wow!   |potzdonner!  |
|wow!   |donnerwetter!|
|duck!  |kopf runter! |
|fire!  |feuer!       |
|help!  |hilfe!       |
+-------+-------------+
only showing top 10 rows



In [0]:
df_2 = df_pair.select(
    [F.regexp_replace(F.col(column_name), r',|\.|&|\\|\||-|_|!', '').alias(column_name)
    for column_name in df_pair.columns]
    )
df_2.show(10)


+-------+------------+
|English|      German|
+-------+------------+
|     Go|         Geh|
|     Hi|       Hallo|
|     Hi|   Grüß Gott|
|    Run|        Lauf|
|    Run|        Lauf|
|    Wow|  Potzdonner|
|    Wow|Donnerwetter|
|   Duck| Kopf runter|
|   Fire|       Feuer|
|   Help|       Hilfe|
+-------+------------+
only showing top 10 rows



## Removed non-printable chars

In [0]:
# re_print = re.compile('[^%s]' % re.escape(string.printable))
non_printable = r'[^%s]' % re.escape(string.printable)
# non_printable = r'[\.\!]'
# non_printable = r',|\.|&|\\|\||-|_|!'
print(non_printable)
df_2 = df_pair.select(
    [F.regexp_replace(F.col(column_name), non_printable, '').alias(column_name)
    for column_name in df_pair.columns]
    )
df_2.show(10)

[^0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"\#\$%\&'\(\)\*\+,\-\./:;<=>\?@\[\\\]\^_`\{\|\}\~\ \	\
\\\]
+-------+-------------+
|English|       German|
+-------+-------------+
|    Go.|         Geh.|
|    Hi.|       Hallo!|
|    Hi.|     Gr Gott!|
|   Run!|        Lauf!|
|   Run.|        Lauf!|
|   Wow!|  Potzdonner!|
|   Wow!|Donnerwetter!|
|  Duck!| Kopf runter!|
|  Fire!|       Feuer!|
|  Help!|       Hilfe!|
+-------+-------------+
only showing top 10 rows



## Remove puncuations

In [0]:
# re_print = re.compile('[^%s]' % re.escape(string.printable))
punctuations = r'[%s]' % re.escape(string.punctuation)
# non_printable = r'[\.\!]'
# non_printable = r',|\.|&|\\|\||-|_|!'
print(non_printable)
df_2 = df_pair.select(
    [F.regexp_replace(F.col(column_name), punctuations, '').alias(column_name)
    for column_name in df_pair.columns]
    )
df_2.show(10)

[^0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"\#\$%\&'\(\)\*\+,\-\./:;<=>\?@\[\\\]\^_`\{\|\}\~\ \	\
\\\]
+-------+------------+
|English|      German|
+-------+------------+
|     Go|         Geh|
|     Hi|       Hallo|
|     Hi|   Grüß Gott|
|    Run|        Lauf|
|    Run|        Lauf|
|    Wow|  Potzdonner|
|    Wow|Donnerwetter|
|   Duck| Kopf runter|
|   Fire|       Feuer|
|   Help|       Hilfe|
+-------+------------+
only showing top 10 rows



## Encode and then Decode

In [0]:
df_2 = df_pair.withColumn('German', decode(col('German'), 'UTF-8').cast('string'))
df_2.show(10, truncate=False)

+-------+-------------+
|English|German       |
+-------+-------------+
|Go.    |Geh.         |
|Hi.    |Hallo!       |
|Hi.    |Grüß Gott!   |
|Run!   |Lauf!        |
|Run.   |Lauf!        |
|Wow!   |Potzdonner!  |
|Wow!   |Donnerwetter!|
|Duck!  |Kopf runter! |
|Fire!  |Feuer!       |
|Help!  |Hilfe!       |
+-------+-------------+
only showing top 10 rows



##User Defined Function (UDF)

UDFs are black boxed to PySpark. Therefore, cannot apply some optimizations that PySpark does on dataframes or datasets. It is better to use Spark SQL built-in functions.

In [0]:
def clean_sentence(sentence):
    # To remove non-printable chars
    re_print = re.compile('[^%s]' % re.escape(string.printable))
    # To remove punctuation chars 
    table = str.maketrans('', '', string.punctuation)
    # 
    #
    cleaned_sentence = normalize('NFD', sentence).encode('ascii', 'ignore')
    cleaned_sentence = cleaned_sentence.decode('UTF-8')
    cleaned_sentence = cleaned_sentence.split()
    cleaned_sentence = [word.lower() for word in cleaned_sentence]
    #Remove punctuation chars 
    cleaned_sentence = [word.translate(table) for word in cleaned_sentence]
    # Remove non-printable chars
    cleaned_sentence = [re_print.sub('', word) for word in cleaned_sentence]
    # Remove words with numbers? how to deal with numbers? 
    # How to deal with upper case? And , . ? % these signs? 
    cleaned_sentence = [word for word in cleaned_sentence if word.isalpha()] 
    cleaned_sentence = ' '.join(cleaned_sentence)
    #
    #
    return cleaned_sentence


my_udf = udf(lambda x:clean_sentence(x), StringType())
df_pair.select(df_pair.German, my_udf(df_pair.German).alias('German')).show(10)
df_pair.select(df_pair.English, my_udf(df_pair.English).alias('English')).show(10)


+-------------+------------+
|       German|      German|
+-------------+------------+
|         Geh.|         geh|
|       Hallo!|       hallo|
|   Grüß Gott!|    gru gott|
|        Lauf!|        lauf|
|        Lauf!|        lauf|
|  Potzdonner!|  potzdonner|
|Donnerwetter!|donnerwetter|
| Kopf runter!| kopf runter|
|       Feuer!|       feuer|
|       Hilfe!|       hilfe|
+-------------+------------+
only showing top 10 rows

+-------+-------+
|English|English|
+-------+-------+
|    Go.|     go|
|    Hi.|     hi|
|    Hi.|     hi|
|   Run!|    run|
|   Run.|    run|
|   Wow!|    wow|
|   Wow!|    wow|
|  Duck!|   duck|
|  Fire!|   fire|
|  Help!|   help|
+-------+-------+
only showing top 10 rows



In [0]:
df_cleaned = df_pair.withColumn('German', my_udf(df_pair.German).alias('German'))
df_cleaned = df_cleaned.withColumn('English', my_udf(df_pair.English).alias('English'))  
df_cleaned.show(20, truncate=False)

+-------+------------+
|English|German      |
+-------+------------+
|go     |geh         |
|hi     |hallo       |
|hi     |gru gott    |
|run    |lauf        |
|run    |lauf        |
|wow    |potzdonner  |
|wow    |donnerwetter|
|duck   |kopf runter |
|fire   |feuer       |
|help   |hilfe       |
|help   |zu hulf     |
|stay   |bleib       |
|stop   |stopp       |
|stop   |anhalten    |
|wait   |warte       |
|wait   |warte       |
|begin  |fang an     |
|do it  |mache es    |
|do it  |tue es      |
|go on  |mach weiter |
+-------+------------+
only showing top 20 rows



In [0]:
df_cleaned.count()

Out[137]: 261499

#Save the data

In [0]:
%fs ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/Bookings.csv,Bookings.csv,170118,1674344235000
dbfs:/FileStore/tables/English-German.csv/,English-German.csv/,0,0
dbfs:/FileStore/tables/Facilities.csv,Facilities.csv,494,1674344235000
dbfs:/FileStore/tables/Members.csv,Members.csv,3313,1674344236000
dbfs:/FileStore/tables/NASA_access_log_Aug95.txt,NASA_access_log_Aug95.txt,167813770,1675580793000
dbfs:/FileStore/tables/NASA_access_log_Jul95.txt,NASA_access_log_Jul95.txt,205242368,1675580851000
dbfs:/FileStore/tables/deu.txt,deu.txt,40721427,1687189983000


In [0]:
#%fs rm -r /FileStore/tables/English-German.csv

## CSV file

In [0]:
df_cleaned.coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').save('dbfs:/FileStore/tables/English-German.csv')

## Pickle file - Does not work

In [0]:
# filename = '/dbfs/FileStore/tables/english-german.pkl'
# with open(filename, 'wb') as f:
#     dump(df_cleaned, f)