In [4]:
#r "tools/bin/Debug/netcoreapp3.1/tools.dll"

using System;
using System.IO;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;


# Configuration

In [5]:
// Wikidata property IDs: P31 ("instance of") and P279 ("subclass of").
const int p_InstanceOf = 31;
const int p_SubclassOf = 279;

// Property under study; it is interpolated into the input file paths.
int propertyOfInterest = p_InstanceOf;

In [28]:
// Input locations. The frequency matrix, the test-entity list and the aux summary
// are all specific to propertyOfInterest.
var epe_path    = "/scratch/trh6u/db_csv/WD_entity_prop_entity.csv";
var freq_path   = "/scratch/jag2j/final_data/" + propertyOfInterest + "-train_freq_matrix.csv";
var test_path   = "/scratch/jag2j/final_data/test_entities_P" + propertyOfInterest + "_1000.csv";
var se_sum_path = "/scratch/jag2j/final_data/aux/" + propertyOfInterest + "_se_sum.csv";

# Read in data

In [30]:
// Distinct-subject count per type, read from integer "<type>,<count>" rows that follow a header line.
var se_sum = new Dictionary<int, int>();
foreach (var row in File.ReadLines(se_sum_path).Skip(1))
{
    // Every column is parsed as an int before the first two are used.
    var fields = Array.ConvertAll(row.Split(","), int.Parse);
    se_sum.Add(fields[0], fields[1]);
}

In [32]:
se_sum.Take(count: 5) // first five (type, distinct-subject count) pairs

index,Key,Value
0,3624078,46
1,9415,58
2,5,8248222
3,15773317,8023
4,15633582,138


In [20]:
// IDs of the held-out test entities: one integer per line, no header.
var testIds = new HashSet<int>(File.ReadLines(test_path).Select(int.Parse));

In [9]:
// Per-type property-frequency rows, fully materialised before the reader is closed.
List<ClassFrequencies> freq;
using (var frequencyReader = Formats.GetReader(freq_path))
{
    freq = Formats.ReadFrequencyFile(frequencyReader).ToList();
}

## Calculate capacity

The arrays are sized based on the maximum property ID (which must fit into a 32-bit integer).

In [10]:
// Length of the longest per-type property vector, i.e. the number of property-ID slots.
int maxFreq = freq.Max(c => c.Properties.Length);
maxFreq

Only the following number of properties actually have a non-zero frequency, so we could save memory by compressing the IDs. We don't do that here because we want speed!

In [11]:
// Number of distinct property IDs with a non-zero frequency for at least one type.
// Index 0 is skipped, as before (presumably there is no property P0).
// Each row is scanned only up to its own length, so a row shorter than maxFreq
// cannot raise an IndexOutOfRangeException; when every row has length maxFreq
// the result is unchanged.
freq.SelectMany(f => Enumerable.Range(1, Math.Max(0, f.Properties.Length - 1))
                               .Where(i => f.Properties[i] != 0))
    .Distinct()
    .Count()

We show the number of distinct types in the frequency matrix. There are about 66,000 types (for P31).

In [12]:
// How many different type IDs appear in the frequency matrix.
var numDistinctTypes = freq.Select(f => f.Id).ToHashSet().Count;
numDistinctTypes

We'll need about 2 GB to store this in memory.

In [13]:
// Rough size in GiB of the dense frequency matrix held in `freq`: one row per distinct type,
// each row holding maxFreq 4-byte values plus a 4-byte type ID.
// NOTE(review): assumes Properties is a float[]; the printed values look like single-precision floats.
// Reuses numDistinctTypes and maxFreq rather than recomputing the distinct count and
// hard-coding the row width (previously 8670).
//  - we could save on memory with a sparser representation, but we don't
numDistinctTypes * (maxFreq * 4.0 + 4) / 1024 / 1024 / 1024

In [14]:
freq.Take(count: 5) // first five types with their property-frequency vectors

index,Id,Properties
0,3624078,"[ 0, 0, 0, 0, 0, 0, 0.84782606, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0 ... (8651 more) ]"
1,9415,"[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ... (8651 more) ]"
2,5,"[ 0, 0, 0, 0, 0, 0, 4.6070536E-06, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.0001325134, 0, 0.3162867 ... (8651 more) ]"
3,15773317,"[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.0034899663, 0, 0.036021437 ... (8651 more) ]"
4,15633582,"[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.11594203, 0, 0 ... (8651 more) ]"


## Parser for entity-property-entity matrix

We define some types that help with parsing, but we discard them once everything is loaded.

A lazily evaluated generator of the edge data, restricted to edges whose subject is one of the test entities.

In [21]:
// Lazily streams the entity-property-entity edges, keeping only edges whose subject (S)
// is a test entity. Being a property, it re-reads the file on every access.
// The reader is owned by the iterator and disposed when enumeration completes or the
// enumerator is disposed early (e.g. by Take). Previously a new reader leaked on every access.
IEnumerable<Edge> epe
{
    get
    {
        using (var reader = Formats.GetReader(epe_path))
        {
            foreach (var e in Formats.ReadEdgeFile(reader))
            {
                if (testIds.Contains(e.S))
                {
                    yield return e;
                }
            }
        }
    }
}

In [22]:
epe.Take(count: 3) // first three filtered edges

index,S,P,O
0,34786,31,34770
1,34786,31,1288568
2,34786,910,33071841


## Discuss size

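There are roughly 100 M distinct entities.

At roughly 8 KB per entity (about one byte for each of the ~8,670 property IDs), 100 M entities would take about 800 GB. So we can't fit the whole half-sparse matrix in memory, which is why `epe` keeps only the edges of the test entities.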

## Load entity property binary matrix

In [23]:
#!time
// Build the per-entity property sets (HashSet<int>) from the test-entity edges.
IEnumerable<Edge> testEdges = epe;
var entities = Formats.BuildEntityProperties<HashSet<int>>(testEdges);

Wall time: 169651.1797ms

In [25]:
entities.Take(count: 3) // first three entities with their property sets

index,Id,Properties
0,34786,"[ 31, 910, 279, 3823, 17, 2341 ]"
1,4348159,"[ 910, 991, 17, 31, 156, 155, 1001, 541 ]"
2,4690173,"[ 910, 31, 5008 ]"


# Scoring

## Demonstrate a few scores

Let's score the first entity against the first super-entity (type).

In [33]:
// Cosine scorer used for the spot checks below.
var cosineFunction = new CosineScoringFunction();
var scorer = cosineFunction.CreateScorer();

In [34]:
#!time
var firstEntity = entities.First(); // properties of the first entity
var firstType = freq.First();       // property frequencies of the first super-entity
scorer.Score(firstEntity, firstType)

Wall time: 58.3528ms

Let's try a few more.

In [36]:
#!time
// Lazily pair every entity with every type and keep the first five scores.
(from e in entities
 from f in freq
 select new { Entity = e.Id, Type = f.Id, score = scorer.Score(e, f) })
  .Take(5)

index,Entity,Type,score
0,34786,3624078,0.20200212
1,34786,9415,0.46221933
2,34786,5,0.2414603
3,34786,15773317,0.20761646
4,34786,15633582,0.34560657


Wall time: 74.0592ms

## Calculate timing

In [37]:
#!time
// Score the first five entities against every type, then group per entity with types ranked by score.
(from e in entities.Take(5)
 from f in freq
 select new { Entity = e.Id, Type = f.Id, Score = scorer.Score(e, f) } into res
 group res by res.Entity into g
 select new
 {
     Entity = g.Key,
     Types = from m in g
             orderby m.Score descending
             select new { m.Type, m.Score }
 }).ToArray();


Wall time: 22261.0099ms

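On Rivanna, we can extrapolate the total time (in hours) needed to score all entities. An earlier estimate put this at about 5 weeks on 40 cores; note that the calculation below assumes 20-way parallelism, matching the `dop: 20` used later.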

In [42]:
// Extrapolate the 5-entity timing (22,261 ms) to all entities.
// Converts ms -> s -> min -> h, then divides across 20 parallel workers.
var estimatedHours = entities.Count() / 5.0 * 22_261 / 1000 / 60 / 60 / 20;
estimatedHours

## Parallel pipeline

Let's set up a pipeline so we can use multiple cores.

In [48]:
// No-op result sink: lets GetScoresToFunction be timed without consuming its output.
Action<EntityMatches> DoNothing = delegate { };

In [46]:
// Scoring helper from the tools library.
ScoreUtils su = new ScoreUtils();

In [44]:
// Scoring functions evaluated for every (entity, type) pair.
ScoringFunction[] functions =
{
    new CosineScoringFunction(),
    new ManhattanScoringFunction(),
    new JaccardScoringFunction(),
    new InnerProductScoringFunction(),
    // NOTE(review): the meaning of the four weights lives in the tools library. The sample output
    // (WeightedInnerProduct = InnerProduct - Manhattan) is consistent with these weights.
    new WeightedInnerProductScoringFunction(1, -1, -1, 0),
};

In [49]:
#!time
// Time all scoring functions on the first five entities, discarding the results.
var sampleEntities = entities.Take(5);
su.GetScoresToFunction(sampleEntities, freq, functions, DoNothing);

Wall time: 21473.9969ms

In [54]:
// Score every loaded entity against every type with all functions, 20-way parallel.
// NOTE(review): if GetScores returns a lazy sequence, every later enumeration of
// scoreResults recomputes all scores — confirm against the tools library.
const int scoringParallelism = 20;
var scoreResults = su.GetScores(entities, freq, functions, dop: scoringParallelism);

In [57]:
// Dump every (entity, type, scorer, value) tuple in scoreResults to a long-format CSV.
using (var f = new FileStream("/scratch/jag2j/final_data/aux/raw_scores.csv", FileMode.Create))
using (var tw = new StreamWriter(f))
{
    // Kept from the original: flush after every line (presumably so progress is visible);
    // this costs throughput on large outputs.
    tw.AutoFlush = true;
    tw.WriteLine("entity,type,score_type,score_value");
    foreach (var res in scoreResults)
    {
        var entity = res.BaseEntity;
        foreach (var match in res.Matches)
        {
            var type = match.MatchedEntity;
            foreach (var score in match.Scores)
            {
                // Invariant culture keeps '.' as the decimal separator. A culture that uses ','
                // would otherwise split the float across two CSV columns.
                tw.WriteLine(FormattableString.Invariant($"{entity},{type},{score.Scorer},{score.Value}"));
            }
        }
    }
}

In [61]:
// Scores of the first match of the first scored entity.
// First() instead of FirstOrDefault(): the result is dereferenced immediately, so an empty
// sequence now fails with a clear InvalidOperationException rather than a NullReferenceException.
scoreResults.First().Matches.First().Scores

index,Scorer,Value
0,CosineScorer,0.12192716
1,ManhattanScorer,44.39133
2,JaccardScorer,0.010869565
3,InnerProductScorer,1.0
4,WeightedInnerProductScorer,-43.39133


## Create weights

In [66]:
using System;

In [74]:
// Per-type weight: natural log of the type's distinct-subject count.
var weights = new Dictionary<int, double>();
foreach (var entry in se_sum)
{
    weights.Add(entry.Key, Math.Log(entry.Value));
}

## Split

In [77]:
using System.Threading.Tasks;

In [82]:
// Caps each Parallel.ForEach that uses it at 20 concurrent workers.
var po = new ParallelOptions();
po.MaxDegreeOfParallelism = 20;

In [98]:
/// <summary>
/// A candidate type paired with its score. Not referenced elsewhere in the visible notebook.
/// </summary>
class SpecificMatch
{
    /// <summary>ID of the matched type.</summary>
    public int Type;

    /// <summary>Score assigned to the match.</summary>
    public float Score;
}






In [105]:
// For each (weighted?, scorer) combination, write the 10 best candidate types per test entity
// to /scratch/jag2j/final_data/result_<scorer>[_wgt].csv with header "entity,type,score".
// When `weighted` is true each raw score is multiplied by weights[type].
// NOTE(review): weights[...] throws KeyNotFoundException for a type missing from se_sum.
Parallel.ForEach(new[] { false, true }, po, weighted =>
{
    Parallel.ForEach(new[] { "Cosine", "Manhattan", "Jaccard", "InnerProduct", "WeightedInnerProduct" }, po, scorer =>
    {
        // ToLowerInvariant: culture-sensitive ToLower() maps the 'I' of "InnerProduct" to a
        // dotless 'ı' under a Turkish culture, which would produce a different file name.
        var filename =
            "result_" +
            scorer.Replace("WeightedInnerProduct", "pen_inner_prod").ToLowerInvariant() +
            (weighted ? "_wgt" : "") +
            ".csv";

        var path = $"/scratch/jag2j/final_data/{filename}";

        var scorerName = $"{scorer}Scorer";

        // Manhattan is a distance: the sample output shows WeightedInnerProductScorer equal to
        // InnerProduct - Manhattan, i.e. it is a penalty. For Manhattan a smaller value is a better
        // match; every other scorer here is higher-is-better. Sorting it descending kept the 10 worst types.
        // NOTE(review): in the weighted Manhattan variant, the log weight scales the distance up for common types.
        var lowerIsBetter = scorer == "Manhattan";

        Console.WriteLine($"{scorerName} -> {path}");
        using (var fs = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.ReadWrite))
        using (var sw = new StreamWriter(fs))
        {
            // Synchronized wrapper: rows are written concurrently from the inner Parallel.ForEach.
            var tw = TextWriter.Synchronized(sw);
            tw.WriteLine("entity,type,score");

            // each test item for which we find test results
            Parallel.ForEach(scoreResults, po, res =>
            {
                var entity = res.BaseEntity;

                var candidates = res.Matches
                                    .Select(m => new {
                                        Type = m.MatchedEntity,
                                        // 0 if this scorer is absent for the match (FirstOrDefault)
                                        Score = m.Scores.Where(s => s.Scorer == scorerName)
                                                        .Select(s => s.Value)
                                                        .FirstOrDefault()
                                                * (weighted ? weights[m.MatchedEntity] : 1)
                                    });

                var ranked = lowerIsBetter
                    ? candidates.OrderBy(m => m.Score)
                    : candidates.OrderByDescending(m => m.Score);

                foreach (var s in ranked.Take(10))
                {
                    // Invariant culture keeps '.' as the decimal separator in the CSV.
                    tw.WriteLine(FormattableString.Invariant($"{entity},{s.Type},{s.Score}"));
                }
            });
        }
    });
});

CosineScorer -> /scratch/jag2j/final_data/result_cosine.csv
CosineScorer -> /scratch/jag2j/final_data/result_cosine_wgt.csv
ManhattanScorer -> /scratch/jag2j/final_data/result_manhattan.csv
JaccardScorer -> /scratch/jag2j/final_data/result_jaccard.csv
ManhattanScorer -> /scratch/jag2j/final_data/result_manhattan_wgt.csv
JaccardScorer -> /scratch/jag2j/final_data/result_jaccard_wgt.csv
InnerProductScorer -> /scratch/jag2j/final_data/result_innerproduct.csv
InnerProductScorer -> /scratch/jag2j/final_data/result_innerproduct_wgt.csv
WeightedInnerProductScorer -> /scratch/jag2j/final_data/result_pen_inner_prod.csv
WeightedInnerProductScorer -> /scratch/jag2j/final_data/result_pen_inner_prod_wgt.csv
