In [1]:
file_name = 'DiaryNotes_sample_Mar28.csv'
col1 = 'clt Id'
col2 = 'Text Entry'
col3 = 'Translated Entry'
col4 = 'Language Detected'
col5 = 'Confidence'
output_directory_name = 'outputdir'

In [2]:
import json
from ibm_watson import LanguageTranslatorV3
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

import configparser
import traceback
import logging

config = configparser.ConfigParser()
config.read('config.ini')
apikey = config['DEFAULT']['ApiKey']
url = config['DEFAULT']['Url']


# local external file, gitignore
# to encrypt apikey

# read into array
# pass array to translate service
# iterate following the original order
# O(2*N)


# load document
spark = SparkSession \
    .builder \
    .appName("Test") \
    .getOrCreate()

# create spark dataframe, immutable
df = spark.read \
    .options(header=True, encoding='UTF-8') \
    .csv(file_name)


# col1
client_id_list = [data[0] for data in df.select((col(col1))).collect()]
# col2
to_translate_list = [data[0] for data in df.select((col(col2))).collect()]
# col3
translated_list = []
# col4    
detected_lang_list = []
# col5
detected_lang_confidence_list = []

def translateService(to_translate):
    try:
        if (to_translate == ''):
            return ''
        authenticator = IAMAuthenticator(apikey)
        language_translator = LanguageTranslatorV3(
            version='2018-05-01',
            authenticator=authenticator
        )

        language_translator.set_service_url(url)
        if (to_translate != '' and to_translate is not None):
            language = language_translator.identify(
                to_translate).get_result()
            # TODO need to persist for each line
            # TODO print out on new column
            detected_language = json.loads(json.dumps(language, indent=2))['languages'][0]['language']
            detected_lang_confidence = json.loads(json.dumps(language, indent=2))['languages'][0]['confidence']
            global detected_lang_list
            detected_lang_list.append(detected_language)
            global detected_lang_confidence_list
            detected_lang_confidence_list.append(detected_lang_confidence)
            model_used = ''
            if(detected_language == 'en'):
                model_used = 'en-fr'
            else:
                model_used = 'fr-en'
            translation = language_translator.translate(
                text=to_translate,
                model_id=model_used).get_result()
            translated_result = json.loads(json.dumps(translation, indent=2, ensure_ascii=False))['translations'][0]['translation']
            return translated_result
    except Exception as e:
        logging.error(traceback.format_exc())
        # print('translated length: ' + str(len(translated_list)))
        # print('detected lang length : ' + str(len(detected_lang_list)))
        # print('detected conf length : ' + str(len(detected_lang_confidence_list)))

for text in to_translate_list:
    translated = translateService(text)
    translated_list.append(translated)

# print('to translate length: ' + str(len(to_translate_list)))
# print('translated length: ' + str(len(translated_list)))
# print('detected lang length : ' + str(len(detected_lang_list)))
# print('detected conf length : ' + str(len(detected_lang_confidence_list)))

# headers
headers = [col1, col2, col3, col4, col5]

merged_columns_tuple_list = [(client_id_list[i], 
                              to_translate_list[i], 
                              translated_list[i],
                              detected_lang_list[i],
                              detected_lang_confidence_list[i]) for i in range(0, len(detected_lang_list))]

# parse df_new for output
df_new = spark.createDataFrame(merged_columns_tuple_list, schema = headers)
# df_new.show()
# generate as output file
df_new.coalesce(1).write.csv(output_directory_name, header=True)


22/04/26 20:51:04 WARN Utils: Your hostname, LAPTOP-USGLV2R6 resolves to a loopback address: 127.0.1.1; using 172.28.45.209 instead (on interface eth0)
22/04/26 20:51:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/26 20:51:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

to translate length: 218
translated length: 218
detected lang length : 189
detected conf length : 189
+------+--------------------+--------------------+-----------------+------------------+
|clt Id|          Text Entry|    Translated Entry|Language Detected|        Confidence|
+------+--------------------+--------------------+-----------------+------------------+
|   101|Called Joe @ 900-...|Appelé Joe @ 900-...|               en|0.9999999999790786|
|   101|The RTP sent Nov ...|Le RTP envoyé en ...|               en|0.9999999999930513|
|   101|Sending RTP to  G...|Envoi de RTP au g...|               en|0.9999999999993567|
|   101|TRANSFÉRÉ DE L'IN...|TRANSFERRED FROM ...|               fr|0.9992467534254423|
|   101|Called TP at (999...|Appelée TP au (99...|               en|0.9999999999319831|
|   101|RTP on RT acct ha...|RTP on RT acct ha...|               en|0.9999986303707714|
|   102|Recevied the foll...|Récupéré les paie...|               en|0.9998525906794822|
|   102|*** Comple