## Install .NET for Apache Spark & ML.NET NuGet packages

In [1]:
#r "nuget:Microsoft.Spark,0.12.1"
#r "nuget:Microsoft.ML,1.5.2"
#r "nuget:Microsoft.ML.AutoML,0.17.2"

Installed package Microsoft.ML version 1.5.2

Installed package Microsoft.ML.AutoML version 0.17.2

Installed package Microsoft.Spark version 0.12.1

## Import packages

In [18]:
open Microsoft.Spark
open Microsoft.Spark.Sql
open Microsoft.ML
open Microsoft.ML.Data
open Microsoft.ML.AutoML

## Define data directory

In [3]:
// Path of the arXiv metadata JSON snapshot that is handed to the Spark JSON reader.
let DATA_DIR = "/datadrive/Data/Kaggle/arxiv-metadata-oai-snapshot-2020-08-14.json"

## Initialize SparkSession

In [4]:
// Spark session for this notebook, registered under the app name "arxiv-analytics".
let sparkSession =
    let builder = SparkSession.Builder().AppName("arxiv-analytics")
    builder.GetOrCreate()

[2020-10-28T14:14:18.6588261Z] [fsharp-analytics-vm] [Info] [ConfigurationService] 'DOTNETBACKEND_PORT' environment variable is not set.
[2020-10-28T14:14:18.6635483Z] [fsharp-analytics-vm] [Info] [ConfigurationService] Using port 5567 for connection.
[2020-10-28T14:14:18.6642894Z] [fsharp-analytics-vm] [Info] [JvmBridge] JvMBridge port is 5567


## Load data into DataFrame

In [5]:
// Load the JSON snapshot at DATA_DIR into a DataFrame.
let arxivData =
    let reader = sparkSession.Read().Option("inferSchema", true)
    reader.Json([| DATA_DIR |])

## Inspect original DataFrame schema

In [6]:
// Print the column names and types of the loaded DataFrame.
arxivData.PrintSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



## Select a subset of columns

In [7]:
// Column expressions for the four fields kept for the rest of the pipeline.
let columnNames =
    [| "id"; "title"; "abstract"; "categories" |]
    |> Array.map (fun (name : string) -> Functions.Col(name))

In [8]:
// Project the full arXiv DataFrame down to the selected columns.
let dfSubset = arxivData.Select(columnNames)

In [9]:
// Preview the first 3 rows of the column subset.
dfSubset.Show(3)

+---------+--------------------+--------------------+--------------+
|       id|               title|            abstract|    categories|
+---------+--------------------+--------------------+--------------+
|0704.0001|Calculation of pr...|  A fully differe...|        hep-ph|
|0704.0002|Sparsity-certifyi...|  We describe a n...| math.CO cs.CG|
|0704.0003|The evolution of ...|  The evolution o...|physics.gen-ph|
+---------+--------------------+--------------------+--------------+
only showing top 3 rows



## Get only top level category

In [10]:
// First entry of the space-separated "categories" value
// (e.g. "math.CO cs.CG" -> "math.CO").
let multipleCategorySelection =
    let listedCategories = Functions.Split(Functions.Col("categories"), " ")
    listedCategories.GetItem(0)

// Part of the "categories" value before the first dot (e.g. "math.CO" -> "math").
// The dot is escaped because the split pattern is a regular expression.
let subcategorySelection =
    let categoryParts = Functions.Split(Functions.Col("categories"),"\.")
    categoryParts.GetItem(0)

In [11]:
// Rewrite "categories" in two passes: keep the first listed category,
// then keep only the part before the dot. Order matters, because the
// second expression reads the column produced by the first.
let topLevelCategoryDf =
    let firstCategoryOnly = dfSubset.WithColumn("categories", multipleCategorySelection)
    firstCategoryOnly.WithColumn("categories", subcategorySelection)

In [12]:
// Preview the first 3 rows after reducing "categories" to the top-level category.
topLevelCategoryDf.Show(3)

+---------+--------------------+--------------------+----------+
|       id|               title|            abstract|categories|
+---------+--------------------+--------------------+----------+
|0704.0001|Calculation of pr...|  A fully differe...|    hep-ph|
|0704.0002|Sparsity-certifyi...|  We describe a n...|      math|
|0704.0003|The evolution of ...|  The evolution o...|   physics|
+---------+--------------------+--------------------+----------+
only showing top 3 rows



## Drop duplicates

In [13]:
// Number of article ids before duplicates are removed.
topLevelCategoryDf.Agg(Functions.Count(Functions.Col("id"))).Show()

+---------+
|count(id)|
+---------+
|  1747307|
+---------+



In [14]:
// Keep one row per article id, then drop the id column itself,
// leaving title, abstract and categories.
let distinctTopLevelCategoryDf =
    let uniqueById = topLevelCategoryDf.DropDuplicates("id")
    uniqueById.Drop(Functions.Col("id"))

In [15]:
// Preview the first 3 rows of the de-duplicated data.
distinctTopLevelCategoryDf.Show(3)

+--------------------+--------------------+----------+
|               title|            abstract|categories|
+--------------------+--------------------+----------+
|Binary Systems as...|  We review the g...|     gr-qc|
|Cosmology from St...|  We explore the ...|    hep-ph|
|Domain Switching ...|  We investigated...|  cond-mat|
+--------------------+--------------------+----------+
only showing top 3 rows



## Output cleaned results to files

In [16]:
// Destination path passed to the Spark CSV writer for the cleaned data.
let outputDir = "/datadrive/Data/ArXivSparkOutput"

In [17]:
// Write the cleaned rows to outputDir as CSV with a header row,
// using the "overwrite" save mode.
distinctTopLevelCategoryDf.Write().Mode("overwrite").Option("header", true).Csv(outputDir)

## Define paths and variables

In [25]:
// Glob over the CSV files that ML.NET loads as training data.
let trainDataPath = "/datadrive/Data/ArXivTrainData/*"

// Name of the label column in the ML.NET data view. It has to match the
// ArxivData.Categories property exactly (column names are case-sensitive),
// not the lower-case Spark/CSV header name "categories".
let labelColumnName = "Categories"

## Initialize MLContext

In [20]:
// ML.NET context used below to load the training data and to create the AutoML experiment.
let mlContext = MLContext()

## Define data schema

In [22]:
// Row shape used when loading the training CSV into ML.NET.
// Each LoadColumn index is the zero-based position of the field in the file.
[<CLIMutable>]
type ArxivData =
    {
        [<LoadColumn(0)>]
        Title : string

        [<LoadColumn(1)>]
        Abstract : string

        [<LoadColumn(2)>]
        Categories : string
    }

## Load data into IDataView

In [26]:
// Load the comma-separated training files into an IDataView shaped like ArxivData.
// allowQuoting is enabled because titles and abstracts are free text that can
// contain commas; without it a quoted field would be split at every comma and
// the Categories column would be read from the wrong position.
// NOTE(review): fields that contain embedded newlines may still need extra
// loader options - confirm against the actual files.
let arxivDataView =
    mlContext.Data.LoadFromTextFile<ArxivData>(
        trainDataPath,
        separatorChar = ',',
        hasHeader = true,
        allowQuoting = true)

## Define experiment settings

In [30]:
// AutoML settings: cap the experiment at 300 seconds and optimize for log-loss.
let arxivExperimentSettings =
    MulticlassExperimentSettings(
        MaxExperimentTimeInSeconds = 300u,
        OptimizingMetric = MulticlassClassificationMetric.LogLoss)

## Define progress handler

In [36]:
// Total width, in characters, of every printed table row (including both '|' borders).
let width = 114

// Wraps a message in '|' borders, right-padding it so the whole row is `width` characters.
let createRow width (message : string) =
    "|" + message.PadRight(width - 2) + "|"

// Prints one table row from (padding, text) pairs.
// A positive padding right-aligns the text in that many characters;
// zero or negative left-aligns it in the absolute value.
let printRow l =
    let padCell (p, msg : string) =
        if p > 0 then msg.PadLeft(p, ' ') else msg.PadRight(-p, ' ')
    let line = l |> List.map padCell |> String.concat " "
    printfn "%s" (createRow width line)

// Column layout of the metrics table as (padding, title) pairs; see printRow
// for how the sign of the padding selects left or right alignment.
let multiclassMetricsHeader =
    [ (-4, ""); (-35, "Trainer"); (14, "MicroAccuracy"); (14, "MacroAccuracy"); (9, "Duration") ]

// Prints the header row of the metrics table.
let printMulticlassMetricsHeader () =
    multiclassMetricsHeader |> printRow

// Prints one table row for a finished AutoML iteration: iteration number,
// trainer name, micro/macro accuracy and runtime in seconds.
// Both accuracies use four decimals so the two equally wide columns read
// consistently. Cell paddings are taken from multiclassMetricsHeader, so the
// list below must keep the same number and order of entries as the header.
let printMulticlassIterationMetrics iteration trainerName (metrics : MulticlassClassificationMetrics) runtimeInSeconds = 
    [
        string iteration
        trainerName
        sprintf "%0.4f" metrics.MicroAccuracy
        sprintf "%0.4f" metrics.MacroAccuracy
        sprintf "%0.1f" runtimeInSeconds
    ]
    |> List.zip (multiclassMetricsHeader |> List.map fst)
    |> printRow

// Reports an exception raised by an AutoML iteration.
// Uses printfn so the message ends with a newline; otherwise the next table
// row would be appended to the end of the exception text.
let printIterationException (ex : exn) = 
    printfn "Exception during AutoML iteration: %O" ex

// Builds an IProgress handler for AutoML runs.
//   printHeader      - called once, just before the first report is handled.
//   printIterMetrics - called with (1-based report number, trainer name,
//                      validation metrics, runtime in seconds) for runs that
//                      carry no exception.
// Runs that carry an exception are passed to printIterationException instead;
// they still advance the report counter.
let progressHandler printHeader printIterMetrics =
    let mutable reportCount = 0
    { new IProgress<RunDetail<'a>> with
        member this.Report(run : RunDetail<'a>) : unit =
            if reportCount = 0 then printHeader()
            reportCount <- reportCount + 1
            if isNull run.Exception then
                printIterMetrics reportCount run.TrainerName run.ValidationMetrics run.RuntimeInSeconds
            else
                printIterationException run.Exception
    }


// Creates a fresh progress handler (with its own report counter) that prints
// the multiclass metrics table.
let multiclassExperimentProgressHandler () : IProgress<RunDetail<MulticlassClassificationMetrics>> =
    progressHandler printMulticlassMetricsHeader printMulticlassIterationMetrics

In [38]:
// AutoML multiclass classification experiment configured by arxivExperimentSettings.
let arxivExperiment =
    mlContext
        .Auto()
        .CreateMulticlassClassificationExperiment(arxivExperimentSettings)

In [None]:
// Run the experiment on the loaded data with "Categories" as the label column,
// printing progress through the multiclass progress handler.
let experimentResults =
    arxivExperiment.Execute(
        arxivDataView,
        labelColumnName = "Categories",
        progressHandler = multiclassExperimentProgressHandler())

Application is shutting down...
