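# Encoder Trainings

Inspect the encoder training log and try the most recently trained word2vec encoder on a few sample issues.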

In [1]:
import yaml
import pyspark.sql.functions as F
from pyspark.ml import PipelineModel
from pyspark.ml.feature import Normalizer
from pyspark.sql import types

import dextra.dna.core as C
import dextra.dna.commons as P

Using local environment.


In [2]:
# Handle to the Spark entry point exposed by the project configuration
# (presumably a SparkSession — confirm against dextra.dna.commons.config).
spark = P.config.spark

In [3]:
# Location of the encoder-training log inside the models lake.
PQT = P.config.lakes.models + '/logs/encoder_trainings.parquet'

# Load the training log through the project's stream reader and preview
# two rows as a pandas frame.
t = C.io.stream.read(PQT)
t.limit(2).toPandas()

Unnamed: 0,model_name,model_weights_path,training_proc,training,trained_at,date_received_stats,committed_at_stats,records
0,word2vec,file:///datalake/models/products/word2vec,dextra.dna.commons.processors.products.Learnin...,"{'model': {'input_col': 'text_cleaned', 'stop_...",2020-12-08 23:03:18.089839,"(2015-03-19 00:00:00, 2017-11-02 00:00:00, 146...","(2020-12-08 21:24:36.162162, 2020-12-08 21:24:...","[(861cbf751715, 2020-12-08 21:24:36.162162), (..."
1,word2vec,file:///datalake/models/products/word2vec,dextra.dna.commons.processors.products.Learnin...,"{'model': {'input_col': 'text_cleaned', 'stop_...",2020-12-08 23:36:43.753759,"(2015-03-19 00:00:00, 2017-11-02 00:00:00, 146...","(2020-12-08 21:24:36.162162, 2020-12-08 21:24:...","[(861cbf751715, 2020-12-08 21:24:36.162162), (..."


### Stats from Last Training

In [4]:
# Newest training record, without the bulky per-record `records` column.
trainings_newest_first = t.drop('records').orderBy(F.desc('trained_at'))
last_training = trainings_newest_first.limit(1).collect()[0]

In [5]:
# Convert the Row (including nested Rows) to plain dicts so YAML can render it.
last_training_dict = last_training.asDict(recursive=True)
print(yaml.dump(last_training_dict))

committed_at_stats:
  avg: 1607462676.1621423
  count: 10003
  countDistinct: 1
  max: 2020-12-08 21:24:36.162162
  min: 2020-12-08 21:24:36.162162
date_received_stats:
  avg: 1468604431.790463
  count: 10003
  countDistinct: 912
  max: 2017-11-02 00:00:00
  min: 2015-03-19 00:00:00
model_name: word2vec
model_weights_path: file:///datalake/models/products/word2vec
trained_at: 2020-12-08 23:36:43.753759
training: '{''model'': {''input_col'': ''text_cleaned'', ''stop_words'': ''english'',
  ''features'': 128}}'
training_proc: dextra.dna.commons.processors.products.LearningEncoder



### Encoding Example

In [6]:
# Issues dataset from the refined lake; used below as input to the encoder.
x = C.io.stream.read(P.config.lakes.refined + '/issues.parquet')

In [7]:
# Most recent training record whose model is word2vec.
word2vec_runs = t.where(t['model_name'] == 'word2vec')
word2vec_training = (
    word2vec_runs
    .orderBy(F.desc('trained_at'))
    .limit(1)
    .collect()[0]
)

In [8]:
# Load the fitted pipeline stored at the weights path recorded for the latest
# word2vec training. `PipelineModel` is imported in the notebook's top import
# cell so that all dependencies are declared in one place.
encoder = PipelineModel.load(word2vec_training.model_weights_path)

In [12]:
# Run the encoder over a five-row sample of the issues and show the result,
# including the columns the pipeline appends.
s = encoder.transform(x.limit(5))

s.toPandas()

Unnamed: 0,complaint_id,consumer_message,customer_name,date_received,disputed,issue,product,resolution,state,sub_issue,...,via,zip_code,ingested_at,tags_trusted_labels,tags_split,text_cleaned,committed_at,text_cleaned_words,text_cleaned_filtered,features
0,0052ac552c28,Hello My name is {hash} and I have a {hash} th...,26d89fd7bd44,2017-01-30,False,Problems when you are unable to pay,Consumer Loan,Closed with explanation,GA,,...,Web,300XX,2020-12-08 21:22:48.137401,False,train,hello my name is {hash} and i have a {hash} th...,2020-12-08 21:24:36.162162,"[hello, my, name, is, {hash}, and, i, have, a,...","[hello, name, {hash}, {hash}, santander, consu...","[0.042588631013551585, 0.040190241351061, 0.04..."
1,00a844d52aec,I have written several letters to the creditor...,89dbbe6ad0b5,2016-11-11,True,Credit reporting company's investigation,Credit reporting,Closed with explanation,OH,Investigation took too long,...,Web,440XX,2020-12-08 21:22:48.137401,False,train,i have written several letters to the creditor...,2020-12-08 21:24:36.162162,"[i, have, written, several, letters, to, the, ...","[written, several, letters, creditor, {hash}, ...","[0.05852093231527412, -0.08493632040521566, -0..."
2,00ae385f4af9,This agency is reporting an account on my redi...,321a61586171,2016-10-27,False,Other,Credit card,Closed with explanation,IL,,...,Web,604XX,2020-12-08 21:22:48.137401,False,train,this agency is reporting an account on my redi...,2020-12-08 21:24:36.162162,"[this, agency, is, reporting, an, account, on,...","[agency, reporting, account, redit, profile, b...","[0.12804793007671833, -0.03408676184092958, -0..."
3,00da7e528dac,In {hash}/{hash}/{hash} or {hash}/{hash}/{hash...,f11c4528f211,2017-08-30,,Fraud or scam,"Money transfer, virtual currency, or money ser...",Untimely response,PA,,...,Web,160XX,2020-12-08 21:22:48.137401,False,test,in {hash} {hash} {hash} or {hash} {hash} {hash...,2020-12-08 21:24:36.162162,"[in, {hash}, {hash}, {hash}, or, {hash}, {hash...","[{hash}, {hash}, {hash}, {hash}, {hash}, {hash...","[0.05250623404648841, 0.027649868460445613, 0...."
4,010554da4e3a,"In my previous complaint Equifax states "" Equi...",3fb5aa9ff928,2016-01-06,False,Incorrect information on credit report,Credit reporting,Closed with explanation,CA,Reinserted previously deleted info,...,Web,913XX,2020-12-08 21:22:48.137401,False,test,in my previous complaint equifax states equifa...,2020-12-08 21:24:36.162162,"[in, my, previous, complaint, equifax, states,...","[previous, complaint, equifax, states, equifax...","[0.16846882887241407, -0.08434288474026977, -0..."


In [13]:
def normalize(x, input_col='features', output_col='features_norm', p=2.0):
    """Append a p-norm-scaled copy of a vector column to a DataFrame.

    Args:
        x: Spark DataFrame containing the vector column ``input_col``.
        input_col: Name of the vector column to scale. Defaults to
            ``'features'``.
        output_col: Name of the column that receives the scaled vectors.
            Defaults to ``'features_norm'``.
        p: Order of the norm used for scaling. The default of 2.0 matches
            ``Normalizer``'s own default (unit Euclidean length).

    Returns:
        The DataFrame returned by ``Normalizer.transform``: ``x`` with
        ``output_col`` added.
    """
    normalizer = Normalizer(p=p, inputCol=input_col, outputCol=output_col)
    return normalizer.transform(x)

In [14]:
# Drop any `features_norm` left over from a previous run of this cell before
# normalising: Normalizer refuses to write to an output column that already
# exists, so without the drop this cell fails when executed twice.
# `drop` is a no-op when the column is absent.
s = normalize(s.drop('features_norm'))

In [18]:
# Spark UDF returning the dot product of two vector columns as a double.
dot_udf = F.udf(
    lambda u, v: float(u.dot(v)),
    returnType=types.DoubleType(),
)

In [22]:
# Every unordered pair (i < j) of sampled complaints with the dot product of
# their normalised encodings; values are rounded for display only.
left = s.alias('i')
right = s.alias("j")
pairs = (
    left.join(right, F.col("i.complaint_id") < F.col("j.complaint_id"))
        .select(
            F.col("i.complaint_id").alias("i"),
            F.col("j.complaint_id").alias("j"),
            dot_udf("i.features_norm", "j.features_norm").alias("dot"),
        )
        .sort("i", "j")
)
pairs.toPandas().round(1)

Unnamed: 0,i,j,dot
0,0052ac552c28,00a844d52aec,-0.1
1,0052ac552c28,00ae385f4af9,0.0
2,0052ac552c28,00da7e528dac,0.7
3,0052ac552c28,010554da4e3a,-0.1
4,00a844d52aec,00ae385f4af9,0.5
5,00a844d52aec,00da7e528dac,-0.1
6,00a844d52aec,010554da4e3a,0.7
7,00ae385f4af9,00da7e528dac,-0.1
8,00ae385f4af9,010554da4e3a,0.6
9,00da7e528dac,010554da4e3a,-0.1
