In [1]:
from getpass import getuser
import datetime
from datetime import datetime as dt
import pandas as pd
import numpy as np
from scipy.sparse import load_npz
import sys
import os

%matplotlib inline
pd.set_option('display.float_format', '{:.0f}'.format)
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_columns', 1000)

In [2]:
# Spark 2.2 (Cloudera parcel) driver setup for a YARN session in client deploy mode.
SparkAppName = 'Author-Project-Part-{0}'
spark_home = '/opt/spark22/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2'
os.environ['SPARK_HOME'] = spark_home
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/PYENV.ZNO52292115/bin/python'

# Make the bundled pyspark and py4j importable; must run before the pyspark import below.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python', 'lib', 'py4j-0.10.4-src.zip'))
from pyspark import SparkContext, SparkConf, HiveContext

conf = (
    SparkConf()
    # Suffix the app name with the launch time (HH:MM) to tell sessions apart in YARN.
    .setAppName(SparkAppName.format(dt.now().strftime('%H:%M')))
    # Master "yarn-client" is deprecated since Spark 2.0: use "yarn" + explicit deploy mode.
    .setMaster('yarn')
    .set('spark.submit.deployMode', 'client')
    # NOTE(review): personal queue is hard-coded; other users must change it.
    .set('spark.yarn.queue', 'root.g_dl_u_corp.korniltsev1-da_ca-sbrf-ru')
    .set('spark.dynamicAllocation.enabled', 'true')
    .set('spark.local.dir', 'sparktmp')
    .set('spark.executor.memory', '20g')
    .set('spark.driver.memory', '40g')
    .set('spark.executor.cores', '2')
    # With dynamic allocation on, Spark 2.x uses this as the initial executor count.
    .set('spark.executor.instances', '140')
    .set('spark.driver.maxResultSize', '10g')
    .set('spark.yarn.driver.memoryOverhead', '2g')
    .set('spark.port.maxRetries', '150')
    # `spark.kryoserializer.buffer.max.mb` is deprecated; the current key takes a size string.
    .set('spark.kryoserializer.buffer.max', '512m')
    # The key previously had a stray leading '"', so Spark silently ignored it.
    .set('spark.default.parallelism', '1000')
    # The correct key is `killEnabled`; the former `killEnable` was ignored.
    .set('spark.ui.killEnabled', 'true')
)

In [3]:
print('Start', dt.now())
# Start (or reuse) the YARN-backed context and wrap it for Hive table access.
# NOTE(review): HiveContext is deprecated since Spark 2.0 in favour of
# SparkSession.builder.enableHiveSupport(); kept for compatibility with `hive.table` below.
sc = SparkContext.getOrCreate(conf)
hive = HiveContext(sparkContext=sc)
print('Allocated', dt.now())

Start 2020-06-10 09:53:01.546302
Allocated 2020-06-10 09:53:30.446138


In [4]:
from pyspark.sql.types import TimestampType, DoubleType, IntegerType, StringType, DateType, ArrayType
import pyspark.sql.functions as F
from pyspark.sql.window import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from functools import reduce

# Текстовые эмбеддинги на основе TF-IDF (text embeddings via TF-IDF)

In [57]:
# Non-null words from the 2019 slice of the word table.
# NOTE(review): if `short_dt` is a timestamp rather than a date/string, the upper
# bound '2019-12-31' means midnight and drops the rest of Dec 31 -- confirm the type.
text_kt = (
    hive.table("t_fin_adviser.words_sums_ct_agg_18_19")
    .filter(F.col("word").isNotNull())
    .filter(F.col("short_dt").between("2019-01-01", "2019-12-31"))
)

In [59]:
# One row per `inn_kt`, renamed to `inn`, holding all of its words as a list (the "document").
# collect_list order is not guaranteed, which is irrelevant for bag-of-words hashing.
df1 = (
    text_kt
    .groupBy('inn_kt')
    .agg(F.collect_list('word').alias('words_flaten'))
    .withColumnRenamed('inn_kt', 'inn')
)

In [60]:
# Documents of the companies we search neighbours for.
# NOTE(review): `cases` (the collection of target INNs) is not defined anywhere in this
# notebook -- it leaked from a deleted cell. Define it above before a fresh Run All.
target = (
    df1
    .filter(F.col('inn').isin(cases))
    .select('inn', 'words_flaten')
)

In [61]:
# Term-frequency vectors via the hashing trick (default HashingTF feature dimension).
hashingTF = HashingTF(outputCol="rawFeatures", inputCol="words_flaten")
featurizedData = hashingTF.transform(dataset=df1)

In [62]:
%%time
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)

CPU times: user 3.07 ms, sys: 6.15 ms, total: 9.22 ms
Wall time: 40.7 s


In [63]:
%%time
rescaledData = idfModel.transform(featurizedData)

CPU times: user 55 µs, sys: 1.12 ms, total: 1.17 ms
Wall time: 7.78 ms


In [64]:
# TF-IDF vectors of the target companies, renamed so they can sit beside the
# candidate columns after the join.
target_features = (
    idfModel
    .transform(hashingTF.transform(target))
    .selectExpr('inn AS inn_target', 'features AS features_target')
)

In [66]:
# Pair every company with every target except itself. There is no equi-join key, so
# Spark plans a nested-loop / cartesian join of roughly |rescaledData| x |target| rows.
# NOTE(review): if `target` is small, wrapping it in F.broadcast(...) may speed this up.
joined = rescaledData.join(
    target_features,
    on=rescaledData['inn'] != target_features['inn_target'],
)

In [50]:
@F.udf(DoubleType())
def cosine_similarity(v, u):
    """Spark UDF: cosine similarity of two pyspark.ml vectors as a DoubleType.

    Returns None (SQL NULL) when either vector is missing or has a zero L2 norm,
    because the similarity is undefined there.
    """
    if v is None or u is None:
        return None
    v_len = v.norm(2)
    if v_len == 0:
        return None
    u_len = u.norm(2)
    if u_len == 0:
        return None
    # float(): Spark's DoubleType needs a Python float, not a numpy scalar.
    return float(v.dot(u) / (v_len * u_len))

In [67]:
# Despite the column name, 'dist' is a cosine *similarity*: higher means closer.
df_dist = joined.withColumn(
    'dist',
    cosine_similarity('features', 'features_target'),
)

In [97]:
# Keep the TOP_N most similar companies for every target INN.
TOP_N = 100000
# Order by similarity (desc puts NULL scores last); 'inn' breaks ties so that
# row_number is deterministic across reruns.
w = Window.partitionBy('inn_target').orderBy(F.desc('dist'), F.asc('inn'))
# row_number is 1-based, so `<= TOP_N` keeps exactly TOP_N rows
# (the former `< 100000` kept only 99,999).
dfTop = df_dist.withColumn('rn', F.row_number().over(w)).where(F.col('rn') <= TOP_N)

In [101]:
# Persist (target, neighbour, similarity) triples as a Parquet-backed Hive table,
# replacing any previous run.
(
    dfTop
    .select('inn_target', 'inn', 'dist')
    .write
    .mode('overwrite')
    .format('parquet')
    .saveAsTable("{}.wlt_similar_tf_idf_200k".format('t_team_ds_szb'))
)

In [103]:
# Read the freshly written neighbour table back for inspection.
df = hive.read.table("{}.wlt_similar_tf_idf_200k".format('t_team_ds_szb'))