From 3b17dae047e134bed9d22fe6ef10d187f21b3fa1 Mon Sep 17 00:00:00 2001 From: Sudhindra Kovalam Date: Thu, 9 Dec 2021 20:54:57 -0800 Subject: [PATCH] feat: Add support for Azure Maps Search APIs (#1294) * feat) Add support for Azure Maps Geocode and Batch Geocode - Created a geospatial package to support all geospatial functions - Added support for GetSearchAddress - TODO: Fix the flattening of batch results * fix) fixup the basic use testcase for batch geocoding - fixed how the columns are fetched and flattened - added asserts to ensure the results `df` contains all rows * fix) fix style errors reported in pipeline * fix) fix pending style errors * chore: Implement Review Comments - [Review Comment] Removed `AzureMapsBase` class to simplify dependency - [Review Comment] Renamed `BatchAsyncReply` to `MapsAsyncReply` - [Review Comment] Removed `=None` assignments in schema classes - [Enhancements] Update Output Column to only return batch results making it easier to flatten responses. * chore: cleanup - Updated test case to include realistic address data - fixed a potential issue with `MapsAsyncReply` appending subscription key twice * chore) Explicitly Set `User-Agent` header to `synapseml ...` - Explicitly set the user agent header to track SynapseML usage * chore: cleanup unused code - cleaned up `HasAddressInput` Co-authored-by: Mark Hamilton --- .../ml/geospatial/AzMapsSearchSchemas.scala | 298 ++++++++++++++++++ .../ml/geospatial/AzureMapsHelpers.scala | 113 +++++++ .../ml/geospatial/AzureMapsSearch.scala | 44 +++ .../ml/geospatial/AzureMapsSearch.scala | 64 ++++ .../microsoft/azure/synapse/ml/Secrets.scala | 1 + 5 files changed, 520 insertions(+) create mode 100644 cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzMapsSearchSchemas.scala create mode 100644 cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsHelpers.scala create mode 100644 
cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala
 create mode 100644 cognitive/src/test/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala
diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzMapsSearchSchemas.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzMapsSearchSchemas.scala
new file mode 100644
index 0000000000..9795f9a7da
--- /dev/null
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzMapsSearchSchemas.scala
@@ -0,0 +1,302 @@
+// Copyright (C) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in project root for information.
+
+package com.microsoft.azure.synapse.ml.geospatial
+
+import com.microsoft.azure.synapse.ml.core.schema.SparkBindings
+import java.util.Date
+
+object Address extends SparkBindings[Address]
+
+case class Address (
+  // The building number on the street. DEPRECATED, use streetNumber instead.
+  buildingNumber: Option[String],
+  // The street name. DEPRECATED, use streetName instead.
+  street: Option[String],
+  // The name of the street being crossed.
+  crossStreet: Option[String],
+  // The building number on the street.
+  streetNumber: Option[String],
+  // The codes used to unambiguously identify the street
+  routeNumbers: Option[Seq[Integer]],
+  // The street name.
+  streetName: Option[String],
+  // The street name and number.
+  streetNameAndNumber: Option[String],
+  // City / Town
+  municipality: Option[String],
+  // Sub / Super City
+  municipalitySubdivision: Option[String],
+  // Named Area
+  countryTertiarySubdivision: Option[String],
+  // County
+  countrySecondarySubdivision: Option[String],
+  // State or Province
+  countrySubdivision: Option[String],
+  // Postal Code / Zip Code
+  postalCode: Option[String],
+  // Extended postal code (availability is dependent on the region).
+  extendedPostalCode: Option[String],
+  // Country (Note: This is a two-letter code, not a country name.)
+  countryCode: Option[String],
+  // Country name
+  country: Option[String],
+  // ISO alpha-3 country code
+  countryCodeISO3: Option[String],
+  // An address line formatted according to the formatting rules of a Result's country of origin,
+  // or in the case of a country, its full country name.
+  freeformAddress: Option[String],
+  // The full name of a first level of country administrative hierarchy.
+  // This field appears only in case countrySubdivision is presented in an abbreviated form.
+  // Only supported for USA, Canada, and Great Britain.
+  countrySubdivisionName: Option[String],
+  // An address component which represents the name of a geographic area or locality that groups a number of
+  // addressable objects for addressing purposes, without being an administrative unit.
+  // This field is used to build the `freeformAddress` property.
+  localName: Option[String],
+  // Bounding box coordinates.
+  boundingBox: Option[BoundingBox])
+
+case class AddressRanges (
+  // Address range on the left side of the street.
+  rangeLeft: Option[String],
+  // Address range on the right side of the street.
+  rangeRight: Option[String],
+  from: Option[LatLongPairAbbreviated],
+  to: Option[LatLongPairAbbreviated])
+
+case class BrandName (
+  // Name of the brand
+  name: Option[String])
+
+case class BoundingBox (
+  // top-left
+  topLeftPoint: Option[LatLongPairAbbreviated],
+  // bottom-right
+  btmRightPoint: Option[LatLongPairAbbreviated])
+
+case class Classification (
+  // Code property
+  code: Option[String],
+  // Names array
+  names: Option[Seq[ClassificationName]])
+
+case class ClassificationName (
+  // Name Locale property
+  nameLocale: Option[String],
+  // Name property
+  name: Option[String])
+
+
+case class DataSources (
+  geometry: Option[Geometry])
+
+case class EntryPoint (
+  // The type of entry point. Value can be either _main_ or _minor_.
+  `type`: Option[String],
+  position: Option[LatLongPairAbbreviated])
+
+case class ErrorDetail (
+  // The error code.
+  code: Option[String],
+  // The error message.
+  message: Option[String],
+  // The error target.
+  target: Option[String],
+  // The error details.
+  details: Option[Seq[String]],
+  // The error additional info.
+  additionalInfo: Option[Seq[ErrorAdditionalInfo]])
+
+case class ErrorAdditionalInfo (
+  // The additional info type.
+  `type`: Option[String],
+  // The additional info.
+  info: Option[String])
+
+case class Geometry (
+  // Pass this as geometryId to the
+  // [Get Search Polygon](https://docs.microsoft.com/rest/api/maps/search/getsearchpolygon) API to fetch
+  // geometry information for this result.
+  id: Option[String])
+
+case class LatLongPair (
+  // Latitude property
+  latitude: Option[Double],
+  // Longitude property
+  longitude: Option[Double])
+
+case class LatLongPairAbbreviated (
+  // Latitude property
+  lat: Option[Double],
+  // Longitude property
+  lon: Option[Double])
+
+object LongRunningOperationResult extends SparkBindings[LongRunningOperationResult]
+case class LongRunningOperationResult (
+  // The Id for this long-running operation.
+  operationId: Option[String],
+  // The status state of the request.
+  status: Option[String],
+  // The created timestamp.
+  // NOTE(review): java.util.Date is not among the date types Spark's built-in encoders derive schemas for
+  // (java.sql.Date / java.sql.Timestamp are) - confirm LongRunningOperationResult.schema can be resolved.
+  created: Option[Date],
+  error: Option[ErrorDetail],
+  warning: Option[ErrorDetail])
+
+
+case class OperatingHours (
+  // Value used in the request: none or \"nextSevenDays\"
+  mode: Option[String],
+  // List of time ranges for the next 7 days
+  timeRanges: Option[Seq[OperatingHoursTimeRange]])
+
+case class OperatingHoursTime (
+  // Represents current calendar date in POI time zone, e.g. \"2019-02-07\".
+  date: Option[String],
+  // Hours are in the 24 hour format in the local time of a POI; possible values are 0 - 23.
+  hour: Option[Integer],
+  // Minutes are in the local time of a POI; possible values are 0 - 59.
+  minute: Option[Integer])
+
+case class OperatingHoursTimeRange (
+  // The point in the next 7 days range when a given POI is being opened, or the beginning of the range
+  // if it was opened before the range.
+  startTime: Option[OperatingHoursTime],
+  // The point in the next 7 days range when a given POI is being closed, or the beginning of the range
+  // if it was closed before the range.
+  endTime: Option[OperatingHoursTime])
+
+
+case class PointOfInterest (
+  // Name of the POI property
+  name: Option[String],
+  // Telephone number property
+  phone: Option[String],
+  // Website URL property
+  url: Option[String],
+  // The list of the most specific POI categories
+  categorySet: Option[Seq[PointOfInterestCategorySet]],
+  // Classification array
+  classifications: Option[Seq[Classification]],
+  // Brands array. The name of the brand for the POI being returned.
+  brands: Option[Seq[BrandName]],
+  openingHours: Option[OperatingHours])
+
+case class PointOfInterestCategorySet (
+  // Category ID
+  id: Option[Integer])
+
+
+object ReverseSearchAddressResult extends SparkBindings[ReverseSearchAddressResult]
+case class ReverseSearchAddressResult (
+  // The error object.
+  error: Option[ErrorDetail],
+  // Summary object for a Search Address Reverse response
+  summary: Option[SearchSummary],
+  // Addresses array
+  addresses: Option[Seq[ReverseSearchAddressResultItem]])
+
+case class ReverseSearchAddressResultItem (
+  address: Option[Address],
+  // Position property in the form of \"{latitude},{longitude}\"
+  position: Option[String],
+  roadUse: Option[Seq[String]],
+  // Information on the type of match. One of: * AddressPoint * HouseNumberRange * Street
+  matchType: Option[String])
+
+object ReverseSearchAddressBatchResult extends SparkBindings[ReverseSearchAddressBatchResult]
+case class ReverseSearchAddressBatchResult (
+  summary: Option[SearchBatchSummary],
+  batchItems: Option[Seq[ReverseSearchAddressBatchItem]])
+
+object ReverseSearchAddressBatchItem extends SparkBindings[ReverseSearchAddressBatchItem]
+case class ReverseSearchAddressBatchItem (
+  // HTTP request status code.
+  statusCode: Option[Integer],
+  // The result of the query.
+  response: Option[ReverseSearchAddressResult])
+
+case class SearchBatchSummary(
+  successfulRequests: Option[Integer],
+  totalRequests: Option[Integer]
+)
+
+object SearchAddressBatchProcessResult extends SparkBindings[SearchAddressBatchProcessResult]
+case class SearchAddressBatchProcessResult (
+  // Summary of the results for the batch request
+  summary: Option[SearchBatchSummary],
+  // Array containing the batch results.
+  batchItems: Option[Seq[SearchAddressBatchItem]])
+
+object SearchAddressBatchItem extends SparkBindings[SearchAddressBatchItem]
+case class SearchAddressBatchItem (
+  // HTTP request status code.
+  statusCode: Option[Integer],
+  // The result of the query.
+  response: Option[SearchAddressResult])
+
+object SearchAddressResult extends SparkBindings[SearchAddressResult]
+case class SearchAddressResult (
+  // The error object.
+  error: Option[ErrorDetail],
+  // Summary object for a Search API response
+  // NOTE(review): unlike ReverseSearchAddressResult.summary this field is not wrapped in Option - confirm
+  // that is intended, e.g. for batch items that only carry an error.
+  summary: SearchSummary,
+  // A list of Search API results.
+  results: Option[Seq[SearchAddressResultItem]])
+
+case class SearchSummary (
+  // The query parameter that was used to produce these search results.
+  query: Option[String],
+  // The type of query being returned: NEARBY or NON_NEAR.
+  queryType: Option[String],
+  // Time spent resolving the query, in milliseconds.
+  queryTime: Option[Integer],
+  // Number of results in the response.
+  numResults: Option[Integer],
+  // Maximum number of responses that will be returned
+  limit: Option[Integer],
+  // The starting offset of the returned Results within the full Result set.
+  offset: Option[Integer],
+  // The total number of Results found.
+  totalResults: Option[Integer],
+  // The maximum fuzzy level required to provide Results.
+  fuzzyLevel: Option[Integer],
+  // Indication when the internal search engine has applied a geospatial bias to improve the ranking of results.
+  // In some methods, this can be affected by setting the lat and lon parameters where available.
+  // In other cases it is purely internal.
+  geoBias: Option[LatLongPairAbbreviated])
+
+case class SearchAddressResultItem (
+  // One of: * POI * Street * Geography * Point Address * Address Range * Cross Street
+  `type`: Option[String],
+  // Id property
+  id: Option[String],
+  score: Option[Double],
+  dist: Option[Double],
+  // Information about the original data source of the Result. Used for support requests.
+  info: Option[String],
+  entityType: Option[String],
+  poi: Option[PointOfInterest],
+  address: Option[Address],
+  position: Option[LatLongPairAbbreviated],
+  viewport: Option[Viewport],
+  // Array of EntryPoints. Those describe the types of entrances available at the location.
+  // The type can be \"main\" for main entrances such as a front door, or a lobby, and \"minor\",
+  // for side and back doors.
+  entryPoints: Option[Seq[EntryPoint]],
+  addressRanges: Option[AddressRanges],
+  // Optional section. Reference geometry id for use with the
+  // [Get Search Polygon](https://docs.microsoft.com/rest/api/maps/search/getsearchpolygon) API.
+  dataSources: Option[DataSources],
+  // Information on the type of match. One of: * AddressPoint * HouseNumberRange * Street
+  matchType: Option[String],
+  // Detour time in seconds. Only returned for calls to the Search Along Route API.
+  detourTime: Option[Integer])
+
+case class Viewport (
+  topLeftPoint: Option[LatLongPairAbbreviated],
+  btmRightPoint: Option[LatLongPairAbbreviated])
diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsHelpers.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsHelpers.scala
new file mode 100644
index 0000000000..ddcd4b4f4c
--- /dev/null
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsHelpers.scala
@@ -0,0 +1,128 @@
+// Copyright (C) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in project root for information.
+
+package com.microsoft.azure.synapse.ml.geospatial
+
+import com.microsoft.azure.synapse.ml.build.BuildInfo
+import com.microsoft.azure.synapse.ml.cognitive.{HasAsyncReply, HasServiceParams, HasSubscriptionKey, URLEncodingUtils}
+import com.microsoft.azure.synapse.ml.io.http.{CustomInputParser, HTTPInputParser, HasURL}
+import com.microsoft.azure.synapse.ml.io.http._
+import com.microsoft.azure.synapse.ml.io.http.HandlingUtils._
+import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase}
+import org.apache.http.entity.{AbstractHttpEntity, StringEntity}
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.spark.ml.param._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+import java.net.{URI, URLEncoder}
+import java.util.concurrent.TimeoutException
+import scala.concurrent.blocking
+import scala.language.{existentials, postfixOps}
+import spray.json.DefaultJsonProtocol.{StringJsonFormat, seqFormat}
+
+// Service parameter for the addresses to geocode, plus the request builder for the batch search endpoint.
+trait HasAddressInput extends HasServiceParams with HasSubscriptionKey with HasURL {
+  val address = new ServiceParam[Seq[String]](
+    this, "address", "the address to geocode")
+
+  def getAddress: Seq[String] = getScalarParam(address)
+
+  def setAddress(v: Seq[String]): this.type = setScalarParam(address, v)
+
+  def setAddress(v: String): this.type = setScalarParam(address, Seq(v))
+
+  def getAddressCol: String = getVectorParam(address)
+
+  def setAddressCol(v: String): this.type = setVectorParam(address, v)
+
+  protected def prepareEntity: Row => Option[AbstractHttpEntity]
+
+  protected def contentType: Row => String = { _ => "application/json" }
+
+  // Builds the batch POST for one row. Yields None when the row is skipped or carries no address value;
+  // mapping over the looked-up Option (instead of calling `.get` on it) keeps a missing value from throwing.
+  protected def inputFunc(schema: StructType): Row => Option[HttpRequestBase] = { row: Row =>
+    if (shouldSkip(row)) {
+      None
+    } else {
+      getValueOpt(row, address).map { addresses =>
+        val queryParams = "?" + URLEncodingUtils.format(Map(
+          "api-version" -> AzureMapsAPIConstants.DefaultAPIVersion,
+          "subscription-key" -> getSubscriptionKey))
+        val post = new HttpPost(new URI(getUrl + queryParams))
+        post.setHeader("Content-Type", "application/json")
+        post.setHeader("User-Agent", s"synapseml/${BuildInfo.version}${HeaderValues.PlatformInfo}")
+        // Each address is URL-encoded before being spliced into the query string of its batch item.
+        val payloadItems = addresses
+          .map(a => URLEncoder.encode(a, "UTF-8"))
+          .map(q => s"""{ "query": "?query=$q&limit=1" }""")
+          .mkString(",")
+        post.setEntity(new StringEntity(s"""{ "batchItems": [ $payloadItems ] }"""))
+        post
+      }
+    }
+  }
+
+  protected def getInternalInputParser(schema: StructType): HTTPInputParser = {
+    new CustomInputParser().setNullableUDF(inputFunc(schema))
+  }
+}
+
+// Polling support for batch requests that are accepted with a 202 and completed at a Location URL.
+trait MapsAsyncReply extends HasAsyncReply {
+
+  // Polls `location` once: None while the service still answers 202, the response on 200, and a
+  // RuntimeException for any other status. `key` is not used; the GET is sent to `location` as-is.
+  protected def queryForResult(key: Option[String], client: CloseableHttpClient,
+                               location: URI): Option[HTTPResponseData] = {
+    val statusRequest = new HttpGet()
+    statusRequest.setURI(location)
+    statusRequest.setHeader("User-Agent", s"synapseml/${BuildInfo.version}${HeaderValues.PlatformInfo}")
+    // Release the connection in a finally block so a failed send cannot leave it checked out.
+    val resp = try {
+      convertAndClose(sendWithRetries(client, statusRequest, getBackoffs))
+    } finally {
+      statusRequest.releaseConnection()
+    }
+    resp.statusLine.statusCode match {
+      case 202 => None
+      case 200 => Some(resp)
+      case status => throw new RuntimeException(s"Received unknown status code: $status")
+    }
+  }
+
+  // Sends the request; on a 202 keeps polling the Location header until a result arrives or the
+  // polling budget runs out (TimeoutException). Any other first response is returned unchanged.
+  protected def handlingFunc(client: CloseableHttpClient,
+                             request: HTTPRequestData): HTTPResponseData = {
+    val response = HandlingUtils.advanced(getBackoffs: _*)(client, request)
+    if (response.statusLine.statusCode == 202) {
+      val maxTries = getMaxPollingRetries
+      // equalsIgnoreCase does not depend on the default locale the way toLowerCase() does, and a
+      // missing header now fails with a message instead of the bare error `.head` raised.
+      val location = response.headers
+        .find(_.name.equalsIgnoreCase("location"))
+        .map(header => new URI(header.value))
+        .getOrElse(throw new NoSuchElementException(
+          "Received a 202 response without a Location header to poll"))
+      // `0 to maxTries` is inclusive, so up to maxTries + 1 polls are made, sleeping after each miss.
+      val it = (0 to maxTries).toIterator.flatMap { _ =>
+        queryForResult(None, client, location).orElse({
+          blocking {
+            Thread.sleep(getPollingDelay.toLong)
+          }
+          None
+        })
+      }
+      if (it.hasNext) {
+        it.next()
+      } else {
+        throw new TimeoutException(
+          s"Querying for results did not complete within $maxTries tries")
+      }
+    } else {
+      response
+    }
+  }
+}
diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala
new file mode 100644
index 0000000000..74043870e9
--- /dev/null
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala
@@ -0,0 +1,44 @@
+// Copyright (C) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in project root for information.
+
+package com.microsoft.azure.synapse.ml.geospatial
+
+import com.microsoft.azure.synapse.ml.cognitive.{CognitiveServicesBaseNoHandler, HasInternalJsonOutputParser}
+import com.microsoft.azure.synapse.ml.logging.BasicLogging
+import com.microsoft.azure.synapse.ml.stages.Lambda
+import org.apache.http.entity.AbstractHttpEntity
+import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel}
+import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{DataType, StructType}
+
+// Constants shared by the Azure Maps transformers.
+object AzureMapsAPIConstants {
+  val DefaultAPIVersion = "1.0"
+}
+
+object AddressGeocoder extends ComplexParamsReadable[AddressGeocoder]
+
+// Transformer that posts each row's addresses to the Azure Maps batch search endpoint and returns
+// the parsed batch items in the output column.
+class AddressGeocoder(override val uid: String)
+  extends CognitiveServicesBaseNoHandler(uid) with HasAddressInput
+  with HasInternalJsonOutputParser with MapsAsyncReply with BasicLogging {
+
+  def this() = this(Identifiable.randomUID("AddressGeocoder"))
+
+  setDefault(url -> "https://atlas.microsoft.com/search/address/batch/json")
+
+  override protected def prepareEntity: Row => Option[AbstractHttpEntity] = { _ => None }
+  override protected def responseDataType: DataType = SearchAddressBatchProcessResult.schema
+
+  // Narrows the output column to the `batchItems` of the parsed response. The field is read from
+  // getOutputCol itself: a literal "output" only resolved while the output column had that name.
+  override protected def getInternalTransformer(schema: StructType): PipelineModel = {
+    val basePipeline = super.getInternalTransformer(schema)
+    val unwrapBatchItems = Lambda(_.withColumn(getOutputCol, col(getOutputCol).getField("batchItems")))
+    val stages = Array(basePipeline, unwrapBatchItems)
+    NamespaceInjections.pipelineModel(stages)
+  }
+}
diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala
new file mode 100644
index 0000000000..abb2ee46d0
--- /dev/null
+++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/geospatial/AzureMapsSearch.scala
@@ -0,0 +1,64 @@
+// Copyright (C) Microsoft Corporation.
All rights reserved.
+// Licensed under the MIT License. See LICENSE in project root for information.
+
+package com.microsoft.azure.synapse.ml.geospatial
+
+import com.microsoft.azure.synapse.ml.Secrets
+import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, TransformerFuzzing}
+import com.microsoft.azure.synapse.ml.stages.{FixedMiniBatchTransformer, FlattenBatch}
+import org.apache.spark.ml.util.MLReadable
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.col
+
+// Supplies the Azure Maps key: the AZURE_MAPS_KEY environment variable, else Secrets.AzureMapsKey.
+trait AzureMapsKey {
+  lazy val azureMapsKey: String = sys.env.getOrElse("AZURE_MAPS_KEY", Secrets.AzureMapsKey)
+}
+
+// Tests for AddressGeocoder.
+class AzureMapSearchSuite extends TransformerFuzzing[AddressGeocoder] with AzureMapsKey {
+
+  import spark.implicits._
+
+  lazy val df: DataFrame = Seq(
+    "One, Microsoft Way, Redmond",
+    "Katajatie 7, 04260 Kerava, Suomi",
+    "Via Roma 261, Via Roma, Palermo, PA",
+    "Артемовская улица, 17, Самара",
+    "Alconbury Hill, Huntingdon PE28 4HY",
+    "玉里鎮城東六街26號",
+    "jalan haji abah no 1 kelurahan pinang kecamatan pinang tangerang",
+    "Ulica Aleksinackih rudara 10a, 11070 Beograd, Srbija",
+    "Domingo Campos Lagos 1887",
+    "Schelmenwasenstraße Stuttgart 70567 Baden-Württemberg DE",
+    "1014 Indian Pass Rd, Port St Joe, FL 32456",
+    "400 Broad St, Seattle",
+    "350 5th Ave, New York",
+    "Pike Pl, Seattle",
+    "Champ de Mars, 5 Avenue Anatole France, 75007 Paris"
+  ).toDF("address")
+
+  lazy val batchGeocodeAddresses: AddressGeocoder = new AddressGeocoder()
+    .setSubscriptionKey(azureMapsKey)
+    .setAddressCol("address")
+    .setOutputCol("output")
+
+  test("Basic Batch Geocode Usage") {
+    val batchedDF = batchGeocodeAddresses.transform(new FixedMiniBatchTransformer().setBatchSize(5).transform(df))
+    val position = col("output.response.results").getItem(0).getField("position")
+    val flattenedResults = new FlattenBatch().transform(batchedDF)
+      .select(col("address"), position.getField("lat").as("latitude"), position.getField("lon").as("longitude"))
+      .collect()
+
+    assert(flattenedResults != null)
+    assert(flattenedResults.length == 15)
+    // Compare with a tolerance: exact equality on a geocoded Double is brittle, and `get(1)` yields an
+    // Any, so the old `== 47.64016` only held while the service returned that exact value.
+    assert(math.abs(flattenedResults.head.getDouble(1) - 47.64016) < 1e-3)
+  }
+
+  override def testObjects(): Seq[TestObject[AddressGeocoder]] =
+    Seq(new TestObject[AddressGeocoder](batchGeocodeAddresses, df))
+
+  override def reader: MLReadable[_] = AddressGeocoder
+}
diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/Secrets.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/Secrets.scala
index 52be84202f..40d8c97052 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/Secrets.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/Secrets.scala
@@ -54,4 +54,5 @@ object Secrets {
 
   lazy val SynapseStorageKey: String = getSecret("mmlsparkeuap-key")
   lazy val SynapseSpnKey: String = getSecret("synapse-spn-key")
+  lazy val AzureMapsKey: String = getSecret("azuremaps-api-key")
 }