In [2]:
%%configure -f
{
    "name": "synapseml",
    "conf": {
        "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.4",
        "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12",
        "spark.yarn.user.classpath.first": "true"
    }
}

In [75]:
import os
# Service credentials come from the environment so real secrets never get saved in
# the notebook; the fallbacks are the original placeholder values.
key = os.environ.get("VISION_API_KEY", "FORM REC KEY")  # subscription key passed to AnalyzeInvoices
search_key = os.environ.get("AZURE_SEARCH_KEY", "")  # api-key for the Azure Search service below
translator_key = os.environ.get("TRANSLATOR_KEY", "")  # subscription key passed to Translate
# Azure Cognitive Search service and index that the enriched line items are uploaded to.
search_service = "vikurpad-search"
search_index = "form-demo-index2"

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def blob_to_url(blob):
    """Convert a ``wasb(s)://container@account/path`` blob path into an HTTPS URL.

    Example: ``wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/a.pdf``
    becomes ``https://mmlsparkdemo.blob.core.windows.net/ignite2021/form_subset/a.pdf``.

    Args:
        blob: Blob path, e.g. a value of the ``path`` column produced by Spark's
            ``binaryFile`` reader. ``None`` is passed through so the function is
            safe to use as a Spark UDF on nullable columns.

    Returns:
        The HTTPS URL string, or ``None`` when ``blob`` is ``None``.

    Raises:
        ValueError: if ``blob`` has no ``@`` separating container and account.
    """
    if blob is None:
        return None
    # Split only on the first "@": it separates the container from the account;
    # any later "@" is part of the file path itself.
    prefix, sep, postfix = blob.partition("@")
    if not sep:
        raise ValueError(
            "expected 'scheme://container@account/path', got {!r}".format(blob))
    # "wasbs://container" -> "container"
    container = prefix.split("/")[-1]
    # "account.blob.core.windows.net/dir/file" -> account, "dir/file"
    account, _, filepath = postfix.partition("/")
    return "https://{}/{}/{}".format(account, container, filepath)


# Wrap the plain-Python path converter as a Spark UDF that yields strings.
blob_to_url_udf = udf(blob_to_url, StringType())

# Take 10 invoice files from the demo container and keep only their HTTPS URLs.
# Only the "path" column of the binaryFile source is used; the result is cached
# because the AnalyzeInvoices step below reads it.
df2 = (
    spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(blob_to_url_udf("path").alias("url"))
    .cache()
)

In [5]:
# Preview the HTTPS URLs built from the sampled blob paths.
display(df2)

In [6]:
# Render an example invoice (an SVG stored under form_svgs/ in the same demo
# container) at 40% width so readers can see what the documents look like.
displayHTML("""
<embed src="https://mmlsparkdemo.blob.core.windows.net/ignite2021/form_svgs/Invoice11205.svg" width="40%"/>
""")

In [7]:
from synapse.ml.cognitive import AnalyzeInvoices

# Configure the AnalyzeInvoices transformer once: it reads each row's "url",
# writes its parsed result to "invoices" and any failure to "errors".
invoice_analyzer = (
    AnalyzeInvoices()
    .setSubscriptionKey(key)
    .setLocation("westus2")
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
)

# Apply it to the sampled URLs; cache so the service is not called again by
# every downstream action.
analyzed_df = invoice_analyzer.transform(df2).cache()

In [8]:
# Inspect the raw AnalyzeInvoices output ("invoices") and the per-row "errors" column.
display(analyzed_df)

In [9]:
from synapse.ml.cognitive import FormOntologyLearner

# Fit the ontology learner on the parsed invoices (presumably learning a common
# schema across all forms), then apply it to the same rows.
ontology_model = (
    FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
)

# Flatten the learned struct into top-level columns, keeping the source url.
organized_df = (
    ontology_model
    .transform(analyzed_df)
    .select("url", "extracted.*")
    .cache()
)

In [10]:
# One column per extracted field, plus the url of the source document.
display(organized_df)

In [11]:
from pyspark.sql.functions import explode, col
# Turn the per-invoice "Items" array into one row per line item. Note that
# explode emits no rows for invoices whose Items array is null or empty.
line_item = explode(col("Items")).alias("Item")

# Promote the item's fields to top-level columns ahead of the invoice-level
# columns, then discard the intermediate struct.
itemized_df = (
    organized_df
    .select("*", line_item)
    .drop("Items")
    .select("Item.*", "*")
    .drop("Item")
)

In [12]:
# One row per invoice line item: item fields first, then the invoice-level fields.
display(itemized_df)

In [23]:
# Flag line items whose Amount lies above an approximate high quantile of all amounts.
FRAUD_QUANTILE = 0.9  # items above this quantile of Amount get FraudWarning = true
QUANTILE_RELATIVE_ERROR = 0.01  # approxQuantile accuracy / cost trade-off

amount_quantiles = itemized_df.stat.approxQuantile(
    "Amount", [FRAUD_QUANTILE], QUANTILE_RELATIVE_ERROR)
# approxQuantile returns an empty list when Amount has no non-null values; in that
# case use +inf so nothing is flagged instead of failing with IndexError.
fraud_threshold = amount_quantiles[0] if amount_quantiles else float("inf")
fraud_df = itemized_df.withColumn("FraudWarning", col("Amount") > fraud_threshold)

In [68]:
# Show only the line items whose Amount exceeds the fraud threshold.
display(fraud_df.where(col("FraudWarning")))

In [67]:
# Register only the flagged rows as the session temp view "fraudtable" so the
# %%spark (Scala) cell below can read them by name.
flagged_items_df = fraud_df.where(col("FraudWarning"))
flagged_items_df.createOrReplaceTempView("fraudtable")

In [69]:
%%spark
// NOTE(review): `fraud_df` is a PySpark variable, but this %%spark cell runs Scala,
// where that name is presumably not defined — confirm. The next cell instead reads
// the flagged rows through the "fraudtable" temp view.
// NOTE(review): this writes all items (not only flagged ones) to the same table,
// output.dbo.PySparkTable, that the next cell also writes.
// Server/user/password below are placeholders; supply real credentials from a
// secret store rather than committing them.
fraud_df.write.
option(Constants.SERVER, "SQLSERVER.sql.azuresynapse.net").
option(Constants.USER, "sqladminuser").
option(Constants.PASSWORD, "PASSWORD").
synapsesql("output.dbo.PySparkTable", Constants.INTERNAL)

In [71]:
%%spark
// Copy the flagged line items (registered from PySpark as the "fraudtable" temp view)
// into an internal table of the dedicated SQL pool.
// Connection settings are read from the environment so real credentials are never
// saved in the notebook; the fallbacks are the original placeholders.
spark.table("fraudtable").write.
option(Constants.SERVER, sys.env.getOrElse("SYNAPSE_SQL_SERVER", "SQLSERVER.sql.azuresynapse.net")).
option(Constants.USER, sys.env.getOrElse("SYNAPSE_SQL_USER", "sqladminuser")).
option(Constants.PASSWORD, sys.env.getOrElse("SYNAPSE_SQL_PASSWORD", "PASSWORD")).
synapsesql("output.dbo.PySparkTable", Constants.INTERNAL)

In [72]:
from synapse.ml.cognitive import Translate

# Translator for each row's "Description" into Simplified Chinese, French,
# Russian and Welsh; per-row failures go to "TranslationError".
translator = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation("eastus")
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
)

# Keep the translations of the first response element as "Translations" and
# discard the raw response and error columns.
translated_df = (
    translator
    .transform(fraud_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache()
)

In [73]:
# Preview the line items together with their new "Translations" column.
display(translated_df)

In [76]:
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit

# Give every row a string document key and an "upload" action.
# NOTE(review): monotonically_increasing_id values depend on partitioning, so
# re-uploading after the data is repartitioned may create duplicate documents.
search_docs_df = (
    translated_df
    .withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
)

# Push the rows into the Azure Search index configured at the top of the notebook.
search_docs_df.writeToAzureSearch(
    subscriptionKey=search_key,
    actionCol="SearchAction",
    serviceName=search_service,
    indexName=search_index,
    keyCol="DocID",
)

In [78]:
import requests
# Query the search index for every document flagged as a potential fraud.
url = 'https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06'.format(search_service, search_index)
response = requests.post(
    url,
    json={"search": "*", "filter": "FraudWarning eq true"},
    headers={"api-key": search_key},
    timeout=30,  # requests has no default timeout; avoid hanging the cell forever
)
# Surface auth/index errors as an exception instead of displaying an error payload.
response.raise_for_status()
response.json()
