# NSDUH Drug Sequence Analysis Part 4b:  Stability Analysis via Spark
## Matthew J. Beattie
## University of Oklahoma
__December 7, 2021__

### Stability index creation
This script takes the KMC clusterings (which were done on a desktop) and creates a list of tuples and their stability index.  A _tuple_ is a pair of respondents who are included in the same cluster.  We aggregate all the tuples from the multiple clusterings into one file and then count the number of times each distinct tuple occurs.  This count, divided by the total number of clusterings, generates a _stability index_ for the tuple.

### This method uses Spark JOIN instead of an iterative loop to create the tuple list

In [0]:
# Import the Abuse Sequence utilities functions
%run "/dbfs/FileStore/pythonfiles/pathutils.py"

In [0]:
# Import pyspark libraries
from pyspark.sql import functions as f
from pyspark.sql import SparkSession, DataFrameWriter as dfw
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType

# Import standard Python libraries
from os.path import abspath
import copy
import os
import sys
import pathlib, itertools
import time
import random
import pickle
import json
import mlflow
import mlflow.sklearn
from collections import Counter
import profile
import gc
import csv


# Initialize Spark session
spark = SparkSession\
    .builder\
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")\
    .enableHiveSupport()\
    .getOrCreate()

# Set Azure parameters
blob_account_name = "abuseseqstorage"
blob_container_name = "datafiles"
blob_sas_token = 'sv=2020-08-04&st=2022-02-05T21%3A37%3A40Z&se=2022-04-05T20%3A37%3A00Z&sr=c&sp=racwdl&sig=8bnXmCYRpvR93dN7eN1%2B8v%2F7cXD2dXH5z2Fus3vNSVc%3D'

# Set miscellaneous parameters
FIGW = 12
FIGH = 5
FONTSIZE = 8
FIGURESIZE = (FIGW,FIGH)

plt.rcParams['figure.figsize'] = (FIGW, FIGH)
plt.rcParams['font.size'] = FONTSIZE

plt.rcParams['xtick.labelsize'] = FONTSIZE
plt.rcParams['ytick.labelsize'] = FONTSIZE


In [0]:
%sql
/* Create Spark tuple table */

DROP TABLE IF EXISTS abuse_sequence.sparktuples;

CREATE TABLE abuse_sequence.sparktuples
(
  orignode STRING,
  termnode STRING,
  tuplecount INT
)
USING DELTA;


In [0]:
# Point to files in blob storage
clustercsv = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, 'dfclust.txt')
demogcsv = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, 'dfdemog.txt')
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + clustercsv)

clust_schema = StructType([
    StructField("RESPID", StringType(), False),
    StructField("AFUVECT", StringType(), False),
    StructField("YRWEIGHT", FloatType(), False),
    StructField("labels", IntegerType(), False),
    StructField("labels_0", IntegerType(), False),
    StructField("labels_1", IntegerType(), False),
    StructField("labels_2", IntegerType(), False),
    StructField("labels_3", IntegerType(), False),
    StructField("labels_4", IntegerType(), False),
    StructField("labels_5", IntegerType(), False),
    StructField("labels_6", IntegerType(), False),
    StructField("labels_7", IntegerType(), False),
    StructField("labels_8", IntegerType(), False),
    StructField("labels_9", IntegerType(), False),
    StructField("labels_10", IntegerType(), False),
    StructField("labels_11", IntegerType(), False),
    StructField("labels_12", IntegerType(), False),
    StructField("labels_13", IntegerType(), False),
    StructField("labels_14", IntegerType(), False),
    StructField("labels_15", IntegerType(), False),
    StructField("labels_16", IntegerType(), False),
    StructField("labels_17", IntegerType(), False),
    StructField("labels_18", IntegerType(), False),
    StructField("labels_19", IntegerType(), False)
])

dfclust = spark.read.load(clustercsv, format="csv", sep="\t", schema=clust_schema, header="true")
print('dfclust has', dfclust.count(), 'observations')
display(dfclust)

RESPID,AFUVECT,YRWEIGHT,labels,labels_0,labels_1,labels_2,labels_3,labels_4,labels_5,labels_6,labels_7,labels_8,labels_9,labels_10,labels_11,labels_12,labels_13,labels_14,labels_15,labels_16,labels_17,labels_18,labels_19
201611635143.0,"[0, 16, 15, 20, 991, 991, 991, 991, 991, 991]",204.85857,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0
201635755143.0,"[0, 26, 16, 991, 991, 991, 991, 991, 991, 991]",2533.4585,0,3,1,1,2,2,5,7,4,4,2,4,3,5,1,1,2,3,0,2,3
201692675143.0,"[0, 5, 18, 32, 34, 991, 991, 991, 991, 991]",6203.973,10,10,10,5,9,8,8,10,10,8,9,8,6,9,9,6,10,9,9,9,7
201659596143.0,"[0, 991, 14, 991, 991, 991, 991, 991, 991, 991]",1386.6727,2,4,4,2,0,4,0,0,3,1,1,3,5,0,4,5,0,0,6,5,1
201641106143.0,"[0, 991, 991, 991, 991, 991, 991, 991, 991, 991]",2384.8416,5,1,6,4,4,0,3,9,1,5,8,1,0,3,2,8,4,4,2,1,4
201696416143.0,"[0, 15, 14, 991, 991, 991, 991, 991, 991, 991]",1036.2367,0,3,1,1,2,2,5,7,4,4,2,4,3,5,1,1,2,3,0,2,3
201673716143.0,"[0, 15, 991, 15, 991, 991, 991, 991, 991, 991]",735.0233,9,0,3,0,3,3,1,5,8,2,10,0,8,1,10,0,5,1,1,4,10
201676226143.0,"[0, 16, 991, 991, 991, 991, 991, 991, 991, 991]",445.44296,9,3,1,4,4,0,5,5,8,4,10,1,8,5,10,0,4,4,0,2,10
201661056143.0,"[0, 21, 15, 15, 991, 991, 991, 991, 991, 991]",26.197422,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0
201683666143.0,"[0, 18, 18, 20, 991, 991, 991, 991, 991, 991]",198.3173,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0


In [0]:
# Convert AFUVECT from string to array of integers
df2 = dfclust.withColumn("AFUVECT",f.regexp_replace("AFUVECT", "\\[", ""))\
             .withColumn("AFUVECT",f.regexp_replace("AFUVECT", "\\]", ""))\
             .withColumn("AFUVECT",f.split(f.col("AFUVECT"),",").cast('array<int>'))
display(df2)


RESPID,AFUVECT,YRWEIGHT,labels,labels_0,labels_1,labels_2,labels_3,labels_4,labels_5,labels_6,labels_7,labels_8,labels_9,labels_10,labels_11,labels_12,labels_13,labels_14,labels_15,labels_16,labels_17,labels_18,labels_19
201611635143.0,"List(0, 16, 15, 20, 991, 991, 991, 991, 991, 991)",204.85857,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0
201635755143.0,"List(0, 26, 16, 991, 991, 991, 991, 991, 991, 991)",2533.4585,0,3,1,1,2,2,5,7,4,4,2,4,3,5,1,1,2,3,0,2,3
201692675143.0,"List(0, 5, 18, 32, 34, 991, 991, 991, 991, 991)",6203.973,10,10,10,5,9,8,8,10,10,8,9,8,6,9,9,6,10,9,9,9,7
201659596143.0,"List(0, 991, 14, 991, 991, 991, 991, 991, 991, 991)",1386.6727,2,4,4,2,0,4,0,0,3,1,1,3,5,0,4,5,0,0,6,5,1
201641106143.0,"List(0, 991, 991, 991, 991, 991, 991, 991, 991, 991)",2384.8416,5,1,6,4,4,0,3,9,1,5,8,1,0,3,2,8,4,4,2,1,4
201696416143.0,"List(0, 15, 14, 991, 991, 991, 991, 991, 991, 991)",1036.2367,0,3,1,1,2,2,5,7,4,4,2,4,3,5,1,1,2,3,0,2,3
201673716143.0,"List(0, 15, 991, 15, 991, 991, 991, 991, 991, 991)",735.0233,9,0,3,0,3,3,1,5,8,2,10,0,8,1,10,0,5,1,1,4,10
201676226143.0,"List(0, 16, 991, 991, 991, 991, 991, 991, 991, 991)",445.44296,9,3,1,4,4,0,5,5,8,4,10,1,8,5,10,0,4,4,0,2,10
201661056143.0,"List(0, 21, 15, 15, 991, 991, 991, 991, 991, 991)",26.197422,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0
201683666143.0,"List(0, 18, 18, 20, 991, 991, 991, 991, 991, 991)",198.3173,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0


In [0]:
# Parameters from clustering
# Set clustering process parameters
B = 20   # Number of models to generate
fracinput = 0.8  # Fraction of input dataset to use for model construction
frac = 0.25
n_init = 10
max_iter = 1000
tol = 0.0001
n_clusters = 11

df2 = df2.sample(withReplacement=False,fraction=frac,seed=19660806).cache()
observations = df2.count()
print('The number of observations in the sample is', observations)

df2.write.mode("overwrite").saveAsTable("abuse_sequence.clustsamp")

In [0]:
%sql
select count(*) from abuse_sequence.clustsamp

count(1)
42887


In [0]:
%sql
select * from abuse_sequence.clustsamp

RESPID,AFUVECT,YRWEIGHT,labels,labels_0,labels_1,labels_2,labels_3,labels_4,labels_5,labels_6,labels_7,labels_8,labels_9,labels_10,labels_11,labels_12,labels_13,labels_14,labels_15,labels_16,labels_17,labels_18,labels_19
201629587143.0,"List(0, 991, 14, 991, 991, 991, 991, 991, 991, 991)",1479.1442,2,4,4,2,0,4,0,0,3,1,1,3,5,0,4,5,0,0,6,5,1
201675987143.0,"List(0, 18, 14, 18, 23, 991, 23, 991, 991, 991)",613.19666,6,5,2,6,7,5,9,8,5,10,5,5,4,8,3,2,8,7,10,6,5
201643328143.0,"List(0, 18, 19, 991, 991, 991, 991, 991, 991, 991)",512.1833,0,3,1,1,2,2,5,7,4,4,2,4,3,5,1,1,2,3,0,2,3
201630438143.0,"List(0, 18, 991, 991, 991, 991, 991, 991, 991, 991)",740.19476,9,3,1,4,4,0,5,5,8,4,10,1,8,5,10,0,4,4,0,2,10
201668869143.0,"List(0, 18, 21, 991, 991, 991, 991, 991, 991, 991)",1397.463,0,3,1,1,2,2,5,7,4,4,2,4,3,5,1,1,2,3,0,2,3
201699371143.0,"List(0, 12, 13, 16, 991, 991, 991, 991, 991, 991)",790.9822,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0
201649893143.0,"List(0, 21, 17, 22, 991, 991, 991, 991, 991, 991)",135.97232,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0
201629223143.0,"List(0, 35, 18, 991, 991, 991, 991, 991, 991, 991)",3499.109,0,3,1,1,2,2,5,7,4,4,2,4,3,5,1,1,2,3,0,2,3
201679504143.0,"List(0, 16, 16, 18, 991, 991, 991, 991, 991, 991)",6434.9214,3,0,3,0,3,3,1,1,0,2,3,0,1,1,6,4,5,1,1,4,0
201625474143.0,"List(0, 18, 991, 21, 22, 991, 991, 991, 991, 991)",1722.7317,10,10,10,5,9,8,8,10,10,8,9,8,6,9,9,6,10,9,9,9,7


In [0]:
# Create a list of tuples from the cluster sets
starttime = time.time()
for b in range(0,B):
    # Initiate tuplelist
    print('Finding tuples for clustering', b)
    clustset = 'labels_' + str(b)

    # Generate tuples via a Spark join
    for c in range(0,n_clusters):
        dfslice = df2[df2[clustset]==c].select('RESPID')
        dfslice.createOrReplaceTempView('tblslice1')
        dfslice.createOrReplaceTempView('tblslice2')
        dftupleset = spark.sql("""
                        SELECT tblslice1.RESPID AS orignode, tblslice2.RESPID AS termnode
                        FROM tblslice1 JOIN tblslice2
                        WHERE tblslice1.RESPID < tblslice2.RESPID
                        """)

        # Insert set into persistent table
        dftupleset.createOrReplaceTempView('tblinsertslice')
        spark.sql("""
            INSERT INTO abuse_sequence.sparktuples
            SELECT DISTINCT orignode, termnode, count(*) as tuplecount
            FROM tblinsertslice
            GROUP BY orignode, termnode
        """)
    tuplecountcnt = spark.sql("""select count(*) from abuse_sequence.sparktuples""").collect()[0][0]
    print('New abuse_sequence.tuplecounts count is', tuplecountcnt)

sequencetime = time.time() - starttime
        
print('Unique tuple count is:', tuplecountcnt)
print('Elapsed time is:', sequencetime)

In [0]:
%sql
OPTIMIZE abuse_sequence.sparktuples;
VACUUM abuse_sequence.sparktuples;

path
dbfs:/user/hive/warehouse/abuse_sequence.db/sparktuples


In [0]:
%sql
/* Create tuple stability table */

DROP TABLE IF EXISTS abuse_sequence.sparktuplestability;

CREATE TABLE abuse_sequence.sparktuplestability
(
  orignode STRING,
  termnode STRING,
  tottuples INT,
  stability FLOAT
)
USING DELTA;

INSERT INTO abuse_sequence.sparktuplestability
SELECT orignode, termnode, sum(tuplecount) AS tottuples, sum(tuplecount)/20 AS stability
FROM abuse_sequence.sparktuples
GROUP BY orignode, termnode;

OPTIMIZE abuse_sequence.sparktuplestability;
VACUUM abuse_sequence.sparktuplestability;


path
dbfs:/user/hive/warehouse/abuse_sequence.db/sparktuplestability


In [0]:
# Save tuple stability data to csv on Azure blob
dftuplestability = spark.sql("""
    select tottuples, count(*) as tottuplescnt, tottuples/20 as stability
    from abuse_sequence.sparktuplestability
    group by tottuples
    order by tottuplescnt desc
""")


In [0]:
display(dftuplestability)

tottuples,tottuplescnt,stability
20,105576279,1.0
1,13094492,0.05
19,8490053,0.95
7,7420108,0.35
6,6101530,0.3
13,2412893,0.65
2,2277757,0.1
18,2035977,0.9
9,1850530,0.45
4,1336043,0.2


In [0]:
# Save tuple stability count file to Azure blob storage
output_container_path = "wasbs://%s@%s.blob.core.windows.net" % (blob_container_name, blob_account_name)
output_blob_folder = "%s/" % output_container_path
output_file_name = 'sparktuplestability.txt'
final_file_name = 'sparktuplestabilitycounts.txt'
output_filename = output_blob_folder + output_file_name
final_filename = output_blob_folder + final_file_name

dftuplestability \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .options(delimiter='\t') \
    .format("csv") \
    .save(output_filename)

# Get the name of the CSV file that was just saved to Azure blob storage (it starts with 'part-')
files = dbutils.fs.ls(output_filename)
output_file = [x for x in files if x.name.startswith("part-")]

# Move the wrangled-data CSV file from a sub-folder (wrangled_data_folder) to the root of the blob container
# While simultaneously changing the file name
dbutils.fs.mv(output_file[0].path, final_filename)

# Remove the parquet blob
dbutils.fs.rm(output_filename, recurse=True)


In [0]:
# Log parameters and results into MLflow
totaltuples = spark.sql("""select sum(tottuples) from abuse_sequence.sparktuplestability""").collect()[0][0]
uniquetuples = spark.sql("""select count(*) from abuse_sequence.sparktuplestability""").collect()[0][0]
maxcount = spark.sql("""select distinct max(tottuples) from abuse_sequence.sparktuplestability""").collect()[0][0]
maxcountcnt = spark.sql("""select count(*) from abuse_sequence.sparktuplestability where tottuples={}""".format(maxcount)).collect()[0][0]

with mlflow.start_run():
        mlflow.log_metric("Observations", observations)
        mlflow.log_metric("Total tuples with duplicates", totaltuples)
        mlflow.log_metric("Total unique tuples", uniquetuples)
        mlflow.log_param("Clusterings", B)
        mlflow.log_param("Fraction of total dataset", frac)
        mlflow.log_param("Fold fraction size", fracinput)
        mlflow.log_metric("Most common stability", maxcount)
        mlflow.log_metric("Fraction of tuples with commonest stability", 
                          maxcountcnt/uniquetuples)
        mlflow.log_metric("Sequencing and RDD create runtime", sequencetime)
        
mlflow.end_run()

In [0]:
# Save tuple stability database to CSV file for use with NetworkX or other things
# Save tuple stability count file to Azure blob storage
output_file_name = 'sparktuplebigblob.csv'
final_file_name = 'sparktuplebigfile.csv'
output_filename = output_blob_folder + output_file_name
final_filename = output_blob_folder + final_file_name

dftuplebig = spark.sql("""select * from abuse_sequence.sparktuplestability""")

dftuplebig \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .format("csv") \
    .save(output_filename)

# Get the name of the CSV file that was just saved to Azure blob storage (it starts with 'part-')
files = dbutils.fs.ls(output_filename)
output_file = [x for x in files if x.name.startswith("part-")]

# Move the wrangled-data CSV file from a sub-folder (wrangled_data_folder) to the root of the blob container
# While simultaneously changing the file name
dbutils.fs.mv(output_file[0].path, final_filename)

# Remove the parquet blob
dbutils.fs.rm(output_filename, recurse=True)