# Répliquer le calcul du cosinus en Spark

# Objective(s)

Répliquer en Spark le code du calcul de la distance entre deux listes de mots. Plus précisément :
* créer 6 variables qui permettront la réalisation des tests test_similarite_exception_words et test_distance_levhenstein_exception_words. Les six variables sont les suivantes (seules cinq sont listées ci-dessous ; TODO : compléter la sixième) :
  * unzip_inpi: Mot comparé côté INPI
  * unzip_insee: Mot comparé côté INSEE
  * max_cosine_distance: Score de similarité (cosinus) entre le mot comparé côté INPI et côté INSEE
  * levenshtein_distance: Nombre d'éditions nécessaires pour transformer l'un des deux mots en l'autre
  * key_except_to_test: Champ clé-valeur couvrant toutes les combinaisons des mots qui ne sont pas en commun entre l'INSEE et l'INPI

Metadata
* Epic: Epic 1
* US: US 1
* Date Begin: 10/13/2020
* Duration Task: 4
* Description: Traduire les codes SQL du calcul du Cosine en Spark
* Step type:  
* Status: Active
* Source URL: US 01 Transfert Spark
* Task type: Jupyter Notebook
* Users: Thomas Pernet
* Watchers: Thomas Pernet
* User Account: https://937882855452.signin.aws.amazon.com/console
* Estimated Log points: 10
* Task tag: #computation,#spark
* Toggl Tag: #data-preparation

# Knowledge

## List of candidates
* Calcul from scratch de la distance cosinus entre deux listes

## Connexion serveur

In [1]:
from awsPy.aws_authorization import aws_connector
from awsPy.aws_s3 import service_s3
from pathlib import Path
import pandas as pd
import numpy as np
import seaborn as sns
import os, shutil, json

# AWS settings, defined first so every connection call below reuses them
# instead of repeating the region as a hard-coded literal.
region = 'eu-west-3'
bucket = 'calfdata'

path = os.getcwd()
parent_path = str(Path(path).parent)

# Credentials file is looked up one directory above the working directory.
path_cred = r"{}/credential_AWS.json".format(parent_path)
con = aws_connector.aws_instantiate(credential = path_cred,
                                       region = region)

In [2]:
# Re-create the AWS connector with the shared region, then derive the boto
# client and the S3 helper bound to the project bucket.
con = aws_connector.aws_instantiate(
    credential=path_cred,
    region=region,
)
client = con.client_boto()
s3 = service_s3.connect_S3(
    client=client,
    bucket=bucket,
    verbose=False,
)

In [3]:
# Toggle: when True, disable pandas column-count/width truncation so wide
# DataFrames print in full.
pandas_setting = True
if pandas_setting:
    pd.set_option('display.max_colwidth', None)
    pd.set_option('display.max_columns', None)
    # Green colormap; not used in the visible cells (presumably for Styler output).
    cm = sns.light_palette("green", as_cmap=True)

# Creation tables

## Steps

In [4]:
from pyspark.sql import SparkSession

# Reuse the running Spark session, or start one with 4G of executor memory.
spark = (
    SparkSession.builder.appName("Python Spark SQL basic example")
    .config('spark.executor.memory', '4G')
    .getOrCreate()
)

In [5]:
# Print long plans/schemas without truncating the field list.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)
# Let Spark use Apache Arrow for Spark <-> pandas conversions such as toPandas().
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [6]:
# Athena output location and database used to build the test fixture.
s3_output = 'SQL_OUTPUT_ATHENA'
database = 'ets_siretisation'

# ORDER BY makes the LIMIT deterministic: without it Athena may return any 10
# matching rows, so the JSON fixture and the test row id could change between
# runs.
query = """
SELECT row_id, inpi_except, insee_except
FROM "ets_siretisation"."ets_insee_inpi_statut_cas"
WHERE inpi_except IS NOT NULL AND insee_except IS NOT NULL
ORDER BY row_id
LIMIT 10

"""

output = s3.run_query(
    query=query,
    database=database,
    s3_output=s3_output,
    filename='test_list_spark',  # Add filename to print dataframe
    destination_key=None,  # Add destination key if need to copy output
)
output.head()

Unnamed: 0,row_id,inpi_except,insee_except
0,5,"[RUE, CHARLES, GILLE]","[BOULEVARD, PREUILLY]"
1,7,[JB],"[JEAN, BAPTISTE]"
2,8,[JB],"[JEAN, BAPTISTE]"
3,10,"[MARCELIN, BERTHELOT, CENTRE, D, ENTREPRISES]","[PROSPER, LEGOUTE]"
4,12,"[CHEMIN, BEL, AIR]","[RUE, VICTOR, HUGO]"


In [7]:
# Turn the query result (presumably a pandas DataFrame) into a list of plain
# Python dicts; the JSON round-trip yields native Python values. The array
# columns still arrive as text such as '[RUE, CHARLES, GILLE]'.
result = output.to_json(orient="records")
parsed = json.loads(result)
parsed

[{'row_id': 5,
  'inpi_except': '[RUE, CHARLES, GILLE]',
  'insee_except': '[BOULEVARD, PREUILLY]'},
 {'row_id': 7, 'inpi_except': '[JB]', 'insee_except': '[JEAN, BAPTISTE]'},
 {'row_id': 8, 'inpi_except': '[JB]', 'insee_except': '[JEAN, BAPTISTE]'},
 {'row_id': 10,
  'inpi_except': '[MARCELIN, BERTHELOT, CENTRE, D, ENTREPRISES]',
  'insee_except': '[PROSPER, LEGOUTE]'},
 {'row_id': 12,
  'inpi_except': '[CHEMIN, BEL, AIR]',
  'insee_except': '[RUE, VICTOR, HUGO]'},
 {'row_id': 19,
  'inpi_except': '[A, E]',
  'insee_except': '[AIME, EUGENIE, ZI, NORD]'},
 {'row_id': 21, 'inpi_except': '[ST]', 'insee_except': '[SAINT]'},
 {'row_id': 23,
  'inpi_except': '[LOTISSEMENT, N]',
  'insee_except': '[BOULEVARD, RAYMOND, POINCARE, PALAIS, ORIENTAL]'},
 {'row_id': 24,
  'inpi_except': '[LOTISSEMENT, N]',
  'insee_except': '[PLACE, AMIRAL, ORTOLI]'},
 {'row_id': 25,
  'inpi_except': '[RUE, FURSANNES]',
  'insee_except': '[COLLINE]'}]

In [8]:
def _parse_bracket_list(raw):
    """Split an array rendered as text, e.g. '[RUE, CHARLES]', into its items.

    An empty rendering '[]' yields [] rather than [''], which is what a bare
    ``strip('][').split(', ')`` would return.
    """
    inner = raw.strip('][')
    return inner.split(', ') if inner else []


# One record per row, with the exception-word columns turned into real lists.
list_ = [
    {
        'row_id': record['row_id'],
        'inpi_except': _parse_bracket_list(record['inpi_except']),
        'insee_except': _parse_bracket_list(record['insee_except']),
    }
    for record in parsed
]
# Persist the records as the JSON fixture for the unit tests.
with open('test_list.json', 'w', encoding='utf-8') as outfile:
    json.dump(list_, outfile)

Récupération du premier ID

In [9]:
# row_id of the first queried record; used as the single test row by the
# Spark pipelines below.
test_id = parsed[0]['row_id']

In [10]:
# Spark DataFrame built from the parsed records (schema inferred from the
# dicts); print the inferred schema as a check.
df = spark.createDataFrame(data=list_)
df.printSchema()



root
 |-- inpi_except: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- insee_except: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- row_id: long (nullable = true)



In [11]:
# Peek at the first row to check that the array columns were inferred.
df.first()

Row(inpi_except=['RUE', 'CHARLES', 'GILLE'], insee_except=['BOULEVARD', 'PREUILLY'], row_id=5)

Load weights

In [98]:
from pyspark.sql.types import StructType, ArrayType, StringType, FloatType, MapType
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.mllib.linalg import DenseVector, Vectors, VectorUDT

In [13]:
# UDF converting an array column into an MLlib DenseVector; it has no active
# call in the visible notebook.
list_to_vector_udf = F.udf(
    f=lambda values: Vectors.dense(values),
    returnType=VectorUDT(),
)

In [14]:
path_list = 'word2vec_weights_100_v2.csv'
# Describes the target layout; the field name now matches the 'list_weights'
# column built below (it was misspelled 'list_weigths').
# NOTE(review): this schema is not passed to the reader; the CSV is read with
# its own header columns '0'..'100'.
schema = (
    StructType()
    .add('words', StringType(), True)
    .add('list_weights', ArrayType(FloatType(), True))
)

# Column '0' holds the word and columns '1'..'100' its weight components;
# gather the latter into a single array<float> column.
cols = [str(i) for i in range(1, 101)]
weights = (
    spark.read.csv(path_list, header=True)
    .select(
        F.col('0').alias('words'),
        F.array(cols).cast(ArrayType(FloatType(), True)).alias('list_weights'),
    )
)
weights.dtypes

[('words', 'string'), ('list_weights', 'array<float>')]

In [15]:
# Preview the first 20 words with their (truncated) weight arrays.
weights.show(n=20, truncate=True)

+---------+--------------------+
|    words|        list_weights|
+---------+--------------------+
|      RUE|[-0.86694837, -0....|
|   AVENUE|[-2.206969, -0.84...|
|    ROUTE|[0.75751305, -0.2...|
|   CHEMIN|[-0.33908606, 0.7...|
|        D|[1.1837989, -1.35...|
|        L|[-1.9471866, -0.5...|
|BOULEVARD|[-1.4547851, 0.14...|
|    PLACE|[-0.21757227, 0.1...|
|      BIS|[0.76434, -0.1137...|
|    SAINT|[0.7002688, -2.22...|
|     LIEU|[-0.5478822, 1.27...|
|      DIT|[-1.040085, -0.21...|
|        A|[-1.2238628, -0.0...|
|     ZONE|[-1.985939, -1.25...|
|        B|[-0.30842575, 0.0...|
|    ALLEE|[-0.8832805, -2.1...|
|     JEAN|[-2.4126835, 1.26...|
|   CENTRE|[-3.6606867, -4.1...|
|RESIDENCE|[-0.3581616, -0.9...|
|      BAT|[-1.2430978, 0.25...|
+---------+--------------------+
only showing top 20 rows



Function UDF

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=udf#pyspark.sql.functions.udf

In [16]:
# Python UDF returning a string's length; a null word yields null instead of
# raising TypeError (len(None)) inside the executor.
slen = F.udf(lambda s: None if s is None else len(s), IntegerType())

In [17]:
# Demo: show each word next to the result of the Python UDF applied to it.
weights.select('words', slen('words')).show()

+---------+---------------+
|    words|<lambda>(words)|
+---------+---------------+
|      RUE|              3|
|   AVENUE|              6|
|    ROUTE|              5|
|   CHEMIN|              6|
|        D|              1|
|        L|              1|
|BOULEVARD|              9|
|    PLACE|              5|
|      BIS|              3|
|    SAINT|              5|
|     LIEU|              4|
|      DIT|              3|
|        A|              1|
|     ZONE|              4|
|        B|              1|
|    ALLEE|              5|
|     JEAN|              4|
|   CENTRE|              6|
|RESIDENCE|              9|
|      BAT|              3|
+---------+---------------+
only showing top 20 rows



In [None]:
#@F.udf(returnType=FloatType())
#def dot(x, y):
#    return Vectors.dense(x).dot(Vectors.dense(y))

# Calcul Cosine depuis deux listes en Spark 3.0

Comme la fonction cosinus est assez simple, il n'est pas nécessaire de définir une fonction nommée (avec son décorateur) : une fonction lambda suffit amplement.

In [None]:
# Cosine similarity between two weight arrays. The left joins with the
# weights table leave list_weights_* null for words missing from the
# vocabulary; return null then instead of failing on np.dot(None, None).
# (A zero-norm vector still yields NaN, as before.)
cosine = F.udf(lambda x, y:
               None if x is None or y is None
               else float(np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))),
               FloatType())

In [None]:
# For the test row, pair every INPI exception word with every INSEE exception
# word, attach both word2vec weight arrays and score each pair.
# Two successive explodes build the cartesian product directly. A map keyed
# by INPI word (map_from_entries over arrays_zip) is avoided: Spark 3 rejects
# duplicated map keys by default, and an empty inpi_except makes
# sequence(1, 0) return [1, 0], which produces null map keys.
test = (
    df
    .filter("row_id = {}".format(test_id))
    .select('row_id', F.explode('inpi_except').alias('inpi'), 'insee_except')
    # explode_outer keeps an INPI word even when insee_except is empty.
    .select('row_id', 'inpi', F.explode_outer('insee_except').alias('insee'))
    .join(weights.withColumnRenamed("words", "inpi"),
          on=['inpi'], how='left')
    .withColumnRenamed("list_weights", "list_weights_inpi")
    .join(weights.withColumnRenamed("words", "insee"),
          on=['insee'], how='left')
    .withColumnRenamed("list_weights", "list_weights_insee")
    .select('row_id',
            'inpi',
            'insee',
            "list_weights_inpi",
            "list_weights_insee",
            cosine("list_weights_inpi", "list_weights_insee").alias("cosine"),
            )
)

In [None]:
# Column names and types of the scored (inpi, insee) pairs.
test.dtypes

In [None]:
# Display up to 20 scored pairs, truncating the long weight arrays.
test.show(n=20, truncate=True)

Vérification

In [None]:
# Collect the scored pairs to the driver as a pandas DataFrame for a visual
# check (Arrow is used for the transfer when enabled in the session config).
df_pandas = test.toPandas()

In [None]:
# First five collected pairs.
df_pandas.head(n=5)

# Calcul Cosine depuis deux listes en Spark, version < 2.2

In [123]:
list_a = ["RUE", "CHARLES", "GILLE"]
list_b = ["BOULEVARD", "PREUILLY"]

# Plain-Python preview of the zip_except UDF: one single-entry dict per word
# of list_a, each mapping that word to the whole list_b.
test_list = [{word: list_b} for word in list_a]
test_list

[{'RUE': ['BOULEVARD', 'PREUILLY']},
 {'CHARLES': ['BOULEVARD', 'PREUILLY']},
 {'GILLE': ['BOULEVARD', 'PREUILLY']}]

In [158]:
# UDF equivalent of the SQL map construction: for each INPI word, a
# single-entry map {inpi_word: insee_except}. A null inpi_except returns an
# empty array (explode then drops the row) instead of raising TypeError.
zip_except = F.udf(
    lambda inpi_words, insee_words:
        [] if inpi_words is None
        else [{word: insee_words} for word in inpi_words],
    ArrayType(MapType(StringType(), ArrayType(StringType()))),
)

In [175]:
# Cosine similarity between two weight arrays. The left joins with the
# weights table leave list_weights_* null for words missing from the
# vocabulary; return null then instead of failing on np.dot(None, None).
# (A zero-norm vector still yields NaN, as before.)
cosine = F.udf(lambda x, y:
               None if x is None or y is None
               else float(np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))),
               FloatType())

In [182]:
# Same scoring for the test row, without SQL higher-order functions: the
# zip_except UDF builds one {inpi_word: insee_words} map per INPI word, the
# explodes flatten it into (inpi, insee) pairs, then both words get their
# weights (renamed on the joined side) and the pair gets a cosine score.
test = (
    df
    .filter(f"row_id = {test_id}")
    # array<map> -> one single-entry map per row
    .select('row_id',
            F.explode(zip_except('inpi_except', 'insee_except')).alias('zip_except'))
    # map -> (inpi, value) where value is the list of INSEE words
    .select('row_id', F.explode('zip_except').alias('inpi', 'value'))
    # INSEE word list -> one row per (inpi, insee) pair
    .select('row_id', 'inpi', F.explode('value').alias('insee'))
    .join(weights
          .withColumnRenamed('words', 'inpi')
          .withColumnRenamed('list_weights', 'list_weights_inpi'),
          on=['inpi'], how='left')
    .join(weights
          .withColumnRenamed('words', 'insee')
          .withColumnRenamed('list_weights', 'list_weights_insee'),
          on=['insee'], how='left')
    .select('row_id', 'inpi', 'insee',
            'list_weights_inpi', 'list_weights_insee',
            cosine('list_weights_inpi', 'list_weights_insee').alias('cosine'))
)


In [183]:
# Show the DataFrame's repr (column names and types) as the cell output.
test

DataFrame[row_id: bigint, inpi: string, insee: string, list_weights_inpi: array<float>, list_weights_insee: array<float>, cosine: float]

In [184]:
# Display up to 20 scored pairs, truncating the long weight arrays.
test.show(n=20, truncate=True)

+------+-------+---------+--------------------+--------------------+-----------+
|row_id|   inpi|    insee|   list_weights_inpi|  list_weights_insee|     cosine|
+------+-------+---------+--------------------+--------------------+-----------+
|     5|    RUE|BOULEVARD|[-0.86694837, -0....|[-1.4547851, 0.14...| 0.40306154|
|     5|    RUE| PREUILLY|[-0.86694837, -0....|[0.026656773, -0....|0.096528575|
|     5|CHARLES|BOULEVARD|[-1.1762805, -0.5...|[-1.4547851, 0.14...| 0.09133629|
|     5|CHARLES| PREUILLY|[-1.1762805, -0.5...|[0.026656773, -0....| 0.10189664|
|     5|  GILLE|BOULEVARD|[0.34494784, -0.2...|[-1.4547851, 0.14...| 0.03590281|
|     5|  GILLE| PREUILLY|[0.34494784, -0.2...|[0.026656773, -0....| 0.22824264|
+------+-------+---------+--------------------+--------------------+-----------+

