In [None]:
# Standard library
import re
import string

# Third-party
import databricks.koalas as ks
import numpy as np
# `import pyspark.sql.functions as F` only binds `F`; the bare `pyspark` name is
# needed by the `pyspark.sql.DataFrame` annotations on match_join.
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# SparkSession must be imported before it is used. getOrCreate() returns the
# active session when one already exists instead of building a new one.
spark = SparkSession.builder.master("local[*]").appName("URBANICITY").getOrCreate()

In [None]:
# CMD (customer) source: parquet location, the CHANNEL_CUSTOM values to keep,
# and the columns selected for matching.
cmd_parquet_uri = "/mnt/AA/ba008/cmd/transformed/marrakech_city_200706_100000_cmd.parquet"
cmd_channels = [
    "HOTEL",
    "RECA",
    "SNACK",
]
cmd_columns = [
    "CUSTOMER_ID",
    "LONGITUDE",
    "LATITUDE",
    "CUSTOMER",
    "STREET",
]

In [None]:
# TripAdvisor source and the columns selected for matching.
# Earlier alternative input, kept for reference:
#   /mnt/AA/ba008/tripadvisor/transformed/marrakech_city_restaurants_and_hotels_to_match.parquet
ta_parquet_uri = "/mnt/AA/ba008/tripadvisor/outputs/ta_combined_l3_morocco_prepped_20201015.parquet"
ta_columns = [
    "id",
    "name",
    "address",
    "location_lat",
    "location_lon",
]

# Rows without geolocation are read from the same file as the geolocated ones.
# Earlier alternative input, kept for reference:
#   /mnt/AA/ba008/tripadvisor/transformed/marrakech_no_geolocation_restaurants_and_hotels_to_match.parquet
ta_no_geo_parquet_uri = ta_parquet_uri


In [None]:
# Parameters handed to make_text_prep_func (as word_blacklist / regex_replace).
# The regex dictionaries keep their insertion order.

# Name columns
name_column_blacklist = [
    "cafe", "cf", "restaurant", "estaurant", "rest",
    "ag", "ste", "café", "snack", "sn",
    "hotel", "sarl", "rotisserie", "marrakech", "management",
]
name_column_regex_replace = {
    r"\'": "",
    r"\d{5}": "",
    r"\s+": " ",
}

# Address columns
address_column_blacklist = []
address_column_regex_replace = {
    r"\'": "",
    r"\s+": " ",
    "avenu ": "av ",
    "boulevard ": "bd ",
}

In [1]:
# Execute the text-preparation notebook in this session. Presumably it defines
# the string helpers used further down (make_text_prep_func,
# ta_remove_address_tail, compound_similarity_sdf) — TODO confirm.
%run ./01_text_prep_functions.ipynb

[nltk_data] Downloading package punkt to C:\Users\Salif
[nltk_data]     SAWADOGO\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to C:\Users\Salif
[nltk_data]     SAWADOGO\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# Execute the geospatial helper notebook in this session. Presumably it defines
# haversine_distance_sdf, which match_join calls — TODO confirm.
# NOTE(review): unlike the text-prep %run, this path has no .ipynb extension;
# confirm it resolves in the target environment.
%run ./geospatial_functions

In [None]:
def match_join(l_sdf: pyspark.sql.DataFrame,
               l_id: str,
               l_lon: str,
               l_lat: str,
               l_name: str,
               l_addr: str,
               r_sdf: pyspark.sql.DataFrame,
               r_id: str,
               r_lon: str,
               r_lat: str,
               r_name: str,
               r_addr: str,
               distance_threshold_m: float = 0,
               similarity_threshold: float = 0.59,
               name_similarity_weight: float = 0.75,
               address_similarity_weight: float = 0.15,
               dist_similarity_weight: float = 0.1,
               no_geolocation: bool = False
              ) -> pyspark.sql.DataFrame:
  """Match two sources 1-to-1 on name/address similarity, optionally restricted by distance.

  Every candidate pair gets a weighted similarity score. The matching then proceeds
  in rounds: each left item proposes its best remaining right item, and when several
  left items propose the same right item the one with the highest similarity wins.
  Matched items are removed from both sides and the rounds repeat until a round
  yields no new match. Only pairs whose similarity is at least `similarity_threshold`
  can be matched.

  Ties on similarity are broken deterministically (by ascending right id when a left
  item picks its best candidate, by ascending left id when a right item picks its
  winner), so each left id and each right id appears at most once in the result.

  Args:
      l_sdf (pyspark.sql.DataFrame): The first (primary) source to be matched.
      l_id (str): The name of the primary key column of the first source.
      l_lon (str): The name of the longitude column of the first source.
      l_lat (str): The name of the latitude column of the first source.
      l_name (str): The name of the item name column of the first source.
      l_addr (str): The name of the item address column of the first source.
      r_sdf (pyspark.sql.DataFrame): The second source to be matched.
      r_id (str): The name of the primary key column of the second source.
      r_lon (str): The name of the longitude column of the second source.
      r_lat (str): The name of the latitude column of the second source.
      r_name (str): The name of the item name column of the second source.
      r_addr (str): The name of the item address column of the second source.
      distance_threshold_m (float, optional): When geolocation is used, only pairs whose
        haversine distance is at most this value are candidates. Defaults to 0. With a
        value of 0 the distance similarity is undefined (null); if geolocation is enabled
        the overall similarity is then null as well and nothing is matched.
      similarity_threshold (float, optional): Minimum similarity for a pair to be accepted.
        Defaults to 0.59.
      name_similarity_weight (float, optional): The weight given to the name similarity. Defaults to 0.75.
      address_similarity_weight (float, optional): The weight given to the address similarity. Defaults to 0.15.
      dist_similarity_weight (float, optional): The weight given to the distance similarity
        (only used when geolocation is enabled). Defaults to 0.1.
      no_geolocation (bool, optional): If True, skip the distance filter (all pairs are
        candidates) and leave the distance out of the similarity. Defaults to False.

  Returns:
      pyspark.sql.DataFrame: One row per matched pair. Columns are the input columns prefixed
        with "L_" / "R_", plus dist_m, name_similarity, address_similarity, dist_similarity,
        similarity and rank (always 1). The frame is empty when nothing matches.
  """
  l_key = "L_" + l_id
  r_key = "R_" + r_id

  # Keep only the columns needed and prefix them so both sides can live in one frame.
  l_slice = l_sdf[[l_id, l_lon, l_lat, l_name, l_addr]]
  r_slice = r_sdf[[r_id, r_lon, r_lat, r_name, r_addr]]
  for c in l_slice.columns:
    l_slice = l_slice.withColumnRenamed(c, "L_"+c)
  for c in r_slice.columns:
    r_slice = r_slice.withColumnRenamed(c, "R_"+c)

  # One distance expression, shared by the join condition and the dist_m column.
  dist_m_expr = haversine_distance_sdf(F.col("L_"+l_lon), F.col("L_"+l_lat), F.col("R_"+r_lon), F.col("R_"+r_lat))

  similarity_expr = F.col("name_similarity")*name_similarity_weight + F.col("address_similarity")*address_similarity_weight
  if no_geolocation:
    fully_joined = l_slice.crossJoin(r_slice)
  else:
    fully_joined = l_slice.join(r_slice, dist_m_expr <= distance_threshold_m, how="inner")
    similarity_expr = similarity_expr + F.col("dist_similarity")*dist_similarity_weight

  # Avoid dividing by a zero threshold (the default): the column stays null, which is
  # what the division produced with ANSI mode off, without erroring when ANSI mode is on.
  if distance_threshold_m:
    dist_similarity_expr = (distance_threshold_m - F.col("dist_m"))/distance_threshold_m
  else:
    dist_similarity_expr = F.lit(None).cast("double")

  scored = fully_joined \
                    .withColumn("dist_m", dist_m_expr) \
                    .withColumn("name_similarity", compound_similarity_sdf(F.col("L_"+l_name),F.col("R_"+r_name))) \
                    .withColumn("address_similarity", compound_similarity_sdf(F.col("L_"+l_addr),F.col("R_"+r_addr))) \
                    .withColumn("dist_similarity", dist_similarity_expr) \
                    .withColumn("similarity", similarity_expr)

  # Pairs below the threshold (or with a null similarity) can never be accepted,
  # so drop them once instead of re-filtering on every round.
  # NOTE(review): nothing is cached, so the scores are recomputed on every round;
  # consider persisting `remaining` if the UDFs are expensive.
  remaining = scored.filter(F.col("similarity") >= similarity_threshold)

  # row_number with an id tie-breaker yields exactly one winner per partition,
  # which is what makes the result 1-to-1 even when similarities tie.
  best_for_left = Window.partitionBy(l_key).orderBy(F.col("similarity").desc(), F.col(r_key))
  best_for_right = Window.partitionBy(r_key).orderBy(F.col("similarity").desc(), F.col(l_key))

  # Output column order: right key and similarity first, then everything else, then rank.
  out_cols = [r_key, "similarity"] + [c for c in scored.columns if c not in (r_key, "similarity")] + ["rank"]

  result = None
  while True:
    winners = remaining \
                    .withColumn("rank", F.row_number().over(best_for_left)) \
                    .filter(F.col("rank") == 1) \
                    .withColumn("_r_rank", F.row_number().over(best_for_right)) \
                    .filter(F.col("_r_rank") == 1) \
                    .select(out_cols)

    # Collect only this round's ids: earlier rounds are already filtered out of `remaining`.
    new_pairs = winners.select(l_key, r_key).collect()
    if not new_pairs:
      # No further match possible; on the very first round return the (empty) frame
      # so the caller still gets the expected schema.
      return winners if result is None else result

    result = winners if result is None else result.union(winners)

    l_exclude = [pair[0] for pair in new_pairs]
    r_exclude = [pair[1] for pair in new_pairs]
    remaining = remaining.filter(~F.col(l_key).isin(l_exclude)) \
                         .filter(~F.col(r_key).isin(r_exclude))


In [None]:
# Read the CMD extract, keep the configured channels, and project the matching columns.
cmd = (
    spark.read.parquet(cmd_parquet_uri)
    .filter(F.col("CHANNEL_CUSTOM").isin(cmd_channels))
    .select(*cmd_columns)
)
cmd.count()

In [None]:
# Add cleaned name / street columns to the CMD data.
# NOTE(review): the original repeated the identical statement once on any error,
# presumably to work around a transient first-call failure in the text-prep
# helpers — confirm whether the retry is still needed. The single retry is kept,
# but only for ordinary exceptions (not KeyboardInterrupt / SystemExit), and a
# second failure is re-raised.
for attempt in (1, 2):
  try:
    cmd_clean = cmd.withColumn("CUSTOMER_CLEAN", make_text_prep_func(word_blacklist=name_column_blacklist, regex_replace=name_column_regex_replace)(F.col("CUSTOMER"))) \
                   .withColumn("STREET_CLEAN", make_text_prep_func(word_blacklist=address_column_blacklist, regex_replace=address_column_regex_replace)(F.col("STREET")))
    break
  except Exception:
    if attempt == 2:
      raise
cmd_clean.show()

In [None]:
# Read the TripAdvisor rows that have a latitude and project the matching columns.
ta = (
    spark.read.parquet(ta_parquet_uri)
    .where(F.col("location_lat").isNotNull())
    .select(*ta_columns)
)
ta.show()

In [None]:
# Add cleaned name / address columns to the geolocated TripAdvisor data.
# The address goes through ta_remove_address_tail before the text prep.
ta_clean = (
    ta
    .withColumn(
        "name_CLEAN",
        make_text_prep_func(
            word_blacklist=name_column_blacklist,
            regex_replace=name_column_regex_replace,
        )(F.col("name")),
    )
    .withColumn(
        "address_CLEAN",
        make_text_prep_func(
            word_blacklist=address_column_blacklist,
            regex_replace=address_column_regex_replace,
        )(ta_remove_address_tail(F.col("address"))),
    )
)
ta_clean.show()

In [None]:
# First pass: match CMD customers to geolocated TripAdvisor rows within 2000 m.
matched = match_join(
    l_sdf=cmd_clean,
    l_id="CUSTOMER_ID",
    l_lon="LONGITUDE",
    l_lat="LATITUDE",
    l_name="CUSTOMER_CLEAN",
    l_addr="STREET_CLEAN",
    r_sdf=ta_clean,
    r_id="id",
    r_lon="location_lon",
    r_lat="location_lat",
    r_name="name_CLEAN",
    r_addr="address_CLEAN",
    distance_threshold_m=2000,
    similarity_threshold=0.59,
).persist()
matched.show()

In [None]:
# Persist the geolocated matches, replacing any previous output at this path.
(
    matched.write
    .mode("overwrite")
    .parquet("/mnt/AA/ba008/data_samples/matching/marrakech_city_matching_with_geoloc_20201015.parquet")
)

In [None]:
# Read the TripAdvisor rows that have no latitude and project the matching columns.
ta_no_geo = (
    spark.read.parquet(ta_no_geo_parquet_uri)
    .where(F.col("location_lat").isNull())
    .select(*ta_columns)
)

In [None]:
# Add cleaned name / address columns to the TripAdvisor rows without geolocation.
# NOTE(review): the original repeated the identical statement once on any error,
# presumably to work around a transient first-call failure in the text-prep
# helpers — confirm whether the retry is still needed. The single retry is kept,
# but only for ordinary exceptions (not KeyboardInterrupt / SystemExit), and a
# second failure is re-raised.
for attempt in (1, 2):
  try:
    ta_no_geo_clean = ta_no_geo.withColumn("name_CLEAN", make_text_prep_func(word_blacklist=name_column_blacklist, regex_replace=name_column_regex_replace)(F.col("name"))) \
                               .withColumn("address_CLEAN", make_text_prep_func(word_blacklist=address_column_blacklist, regex_replace=address_column_regex_replace)(ta_remove_address_tail(F.col("address"))))
    break
  except Exception:
    if attempt == 2:
      raise
ta_no_geo_clean.show()

In [None]:
# Drop TripAdvisor rows whose id was already matched in the geolocated pass.
# The previous filter used list(<pandas DataFrame>), which yields the column
# labels (["R_id"]) rather than the id values, so it excluded nothing. An
# anti-join removes the matched ids without collecting them to the driver.
ta_no_geo_not_matched = ta_no_geo_clean.join(
    matched.select(F.col("R_id").alias("id")).distinct(),
    on="id",
    how="left_anti",
)
ta_no_geo_not_matched.count()

In [None]:
# Drop CMD customers already matched in the geolocated pass so they are not
# matched a second time. The previous filter used list(<pandas DataFrame>),
# which yields the column labels (["L_CUSTOMER_ID"]) rather than the id values,
# so it excluded nothing. An anti-join removes the matched ids without
# collecting them to the driver.
cmd_not_matched = cmd_clean.join(
    matched.select(F.col("L_CUSTOMER_ID").alias("CUSTOMER_ID")).distinct(),
    on="CUSTOMER_ID",
    how="left_anti",
)
cmd_not_matched.show()

In [None]:
# Second pass: match the remaining customers to TripAdvisor rows without
# geolocation, using name and address similarity only.
matched_no_geo = match_join(
    l_sdf=cmd_not_matched,
    l_id="CUSTOMER_ID",
    l_lon="LONGITUDE",
    l_lat="LATITUDE",
    l_name="CUSTOMER_CLEAN",
    l_addr="STREET_CLEAN",
    r_sdf=ta_no_geo_not_matched,
    r_id="id",
    r_lon="location_lon",
    r_lat="location_lat",
    r_name="name_CLEAN",
    r_addr="address_CLEAN",
    distance_threshold_m=0,
    similarity_threshold=0.65,
    name_similarity_weight=0.8,
    address_similarity_weight=0.2,
    dist_similarity_weight=0,
    no_geolocation=True,
).persist()
matched_no_geo.show()

In [None]:
# Persist the no-geolocation matches, replacing any previous output at this path.
(
    matched_no_geo.write
    .mode("overwrite")
    .parquet("/mnt/AA/ba008/data_samples/matching/marrakech_city_matching_no_geoloc_20201015.parquet")
)