## Translate text on Azure Data Lake Store using Azure Databricks and Azure Translate API

This example reads text from a CSV file on an Azure Data Lake Store (ADLS), translates it into English, and writes the result back to ADLS.
The text can be in various languages; the Translate API automatically infers the source language.

Requires:
 - Python 3
 - An Azure AD service principal with access to the ADLS
 - A Translate service provisioned in Azure and its subscription key

In [2]:
### Variables ###

## TODO: Fill in these values ##########################

translationApiKey = "*******************"

aadServicePrinciple_clientId = "***-******************-*******-**-***"
aadServicePrinciple_key = "***************"
aadTenantId = "**-****-****-****-****"

adlsAccountName = "MYADLSNAME"
inputFilePath = "/cognitiveservicesdemo/translation_input.csv"
outputPath = "/cognitiveservicesdemo/translation_result"

In [3]:
# Set ADLS credentials
# See here for details on how to configure this: https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake.html#id3

spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", aadServicePrinciple_clientId)
spark.conf.set("dfs.adls.oauth2.credential", aadServicePrinciple_key)
spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/" + aadTenantId + "/oauth2/token")

In [4]:
# Example source: https://docs.microsoft.com/en-us/azure/cognitive-services/translator/quickstart-python-translate#translate-request

# The API key is read from translationApiKey, which is set in the variables cell above

import http.client, urllib.parse, uuid, json

subscriptionKey = translationApiKey

host = 'api.cognitive.microsofttranslator.com'
path = '/translate?api-version=3.0'

# Translate to English
params = "&to=en"

def translate(inputText):

    requestBody = [{
      'Text' : inputText,
    }]
    
    content = json.dumps(requestBody, ensure_ascii=False).encode('utf-8')
  
    headers = {
        'Ocp-Apim-Subscription-Key': subscriptionKey,
        'Content-type': 'application/json',
        'X-ClientTraceId': str(uuid.uuid4())
    }

    conn = http.client.HTTPSConnection(host)
    conn.request("POST", path + params, content, headers)
    response = conn.getresponse()
    result = response.read()
    # Close the connection once the body has been read
    conn.close()
    jsonOutput = json.loads(result.decode("utf-8"))
    return jsonOutput
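
Because the request carries no `from` parameter, the service detects the source language itself, and the version 3.0 response reports its guess in a `detectedLanguage` object next to the translations. A minimal sketch of reading that field follows. The `sampleResponse` literal is hand-written for illustration (not captured output), and `detectedLanguage` is a hypothetical helper name that is not part of the original notebook.

```python
# Hand-written sample in the shape of a v3.0 /translate response (illustrative only)
sampleResponse = [{
    "detectedLanguage": {"language": "de", "score": 1.0},
    "translations": [{"text": "Hello world", "to": "en"}]
}]

def detectedLanguage(translationResult):
    # Returns (languageCode, score), or (None, None) if the response
    # is not a non-empty list or carries no detectedLanguage field
    if not isinstance(translationResult, list) or not translationResult:
        return (None, None)
    detected = translationResult[0].get("detectedLanguage", {})
    return (detected.get("language"), detected.get("score"))

print(detectedLanguage(sampleResponse))
```

Keeping the detected language alongside the translation can help when spot-checking rows that translated poorly.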

In [5]:
def translateText(inputText):
    translationResult = translate(inputText)
    # Since we only translate to English, we always just get the first translation result. TODO: Add some error/null handling here
    translatedText = translationResult[0]['translations'][0]['text']
    return translatedText
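
The TODO above could be addressed along these lines. This is a sketch under assumptions, not the notebook's implementation: `extractTranslation` and `translateTextSafe` are hypothetical helpers, and the code assumes that a failed request comes back as a JSON object (for example with an `error` key) rather than as a list.

```python
def extractTranslation(translationResult):
    # Returns the first translated text, or None when the response
    # does not contain a usable translation (e.g. an error object)
    if not isinstance(translationResult, list) or not translationResult:
        return None
    first = translationResult[0]
    if not isinstance(first, dict):
        return None
    translations = first.get("translations") or []
    if not translations:
        return None
    return translations[0].get("text")

def translateTextSafe(inputText, translateFn):
    # Skip null or blank cells instead of sending them to the API
    if inputText is None or not str(inputText).strip():
        return None
    return extractTranslation(translateFn(inputText))
```

With the notebook's function this would be called as `translateTextSafe(text, translate)`. A `None` return value from a UDF shows up as a null in the resulting column.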

In [6]:
# Define translateText function as UDF so we can use it below directly on the DF
from pyspark.sql.functions import udf
translateTextUdf = udf(translateText)

In [7]:
# Read CSV from ADLS
inputDF = sqlContext \
            .read \
            .format("csv") \
            .load("adl://" + adlsAccountName +".azuredatalakestore.net" + inputFilePath, header='true', inferSchema='true')

In [8]:
# Translate the 'text' column in each row of the dataframe and write the result into a new column 'translatedEN'
translatedDF = inputDF.withColumn("translatedEN", translateTextUdf("text"))

In [9]:
display(translatedDF)

In [10]:
# Write translated dataframe back as CSV to the ADLS
# Note: this writes back a partitioned CSV file (or multiple files). If you want to merge them, you can use .coalesce(1), but beware of possible performance implications for big data sets
translatedDF \
  .write.option("header", "true") \
  .option("quoteAll", "true") \
  .option("encoding", "UTF-8") \
  .mode("Overwrite") \
  .csv("adl://" + adlsAccountName +".azuredatalakestore.net" + outputPath + ".tmp/")

In [11]:
## Optional ##
## If the write() above only produced one .csv file (only one partition, or coalesce(1) was used), you can use these to copy and rename the resulting csv file
csv = list(filter(lambda file: file.path.endswith(".csv"), dbutils.fs.ls("adl://" + adlsAccountName +".azuredatalakestore.net" + outputPath + ".tmp/")))
dbutils.fs.cp(csv[0].path, "adl://" + adlsAccountName +".azuredatalakestore.net" + outputPath + ".csv")
dbutils.fs.rm("adl://" + adlsAccountName + ".azuredatalakestore.net" + outputPath + ".tmp", recurse=True)
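
The `adl://` URI is assembled by string concatenation in several of the cells above. A small helper could keep that logic in one place. `adlsUri` is a hypothetical name and not part of the original notebook.

```python
def adlsUri(accountName, path):
    # Builds adl://<account>.azuredatalakestore.net/<path>,
    # tolerating a path given without its leading slash
    if not path.startswith("/"):
        path = "/" + path
    return "adl://" + accountName + ".azuredatalakestore.net" + path

print(adlsUri("MYADLSNAME", "/cognitiveservicesdemo/translation_result.csv"))
```

With this helper, the temporary output directory would be `adlsUri(adlsAccountName, outputPath + ".tmp/")` and the final file `adlsUri(adlsAccountName, outputPath + ".csv")`.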