In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import types 
from pyspark.sql.functions import *


# Build (or reuse) the Spark session. postgresql-42.2.18.jar is put on the driver
# classpath, and driver and executor memory are both set to 8g.
ss = (
    SparkSession.builder
    .config('spark.driver.extraClassPath', 'postgresql-42.2.18.jar')
    .config('spark.driver.memory', '8g')
    .config('spark.executor.memory', '8g')
    .getOrCreate()
)
# Handle to the SparkContext that backs the session.
sc = ss.sparkContext

## Data Preparation for Clustering

### Goals:

1. **Join mood, covid, and about on date and id** (Christabelle)
    - Rename duplicate columns for each table
    - Join mood and covid by id and date
    - Grab only the most recent entry for every id in the about df
    - Join mood/covid df with the unique about df by id
    - Drop renamed duplicate columns and rename to original columns
    
2. **Drop columns with null values** 

3. **Remove 'prefer not to answer'**: (Daniel)
    - Drop mood rows that contain 'prefer not to answer'
            - Mood PNA: 3
    - Drop about rows that contain 'prefer not to answer'
            - Education PNA: 9
            - Income PNA: 7
            - Employment PNA: 5
            - Veteran PNA: 3
            - Research PNA: 3
    - Drop columns that correspond to PNA: 'HeightPNA', 'WeightPNA', 'RacePNA','EthnPNA'
    
    
4. **Group mood columns together into separate categories** (Victor)
    - Return the sum of the positive mood columns into new column 'positive' and drop the old columns 
    - Return the sum of the negative mood columns into new column 'negative' and drop the old columns 
    - Return the sum of the lonely mood columns into new column 'lonely' and drop the old columns 
    - Return the sum of the energy mood columns into new column 'energy' and drop the old columns 
    
    
5. **Principal Component Analysis and K-means clustering** (Christabelle, Daniel, Victor)

In [None]:
# # load data locally
# NOTE(review): these reads are commented out, yet the cells below use df_covid,
# df_about and df_mood. On a fresh kernel they are undefined and the next cells
# raise NameError; uncomment (or load the frames another way) before running on.
# df_covid = ss.read.csv("FoxInsight/COVID_19_Experience.csv", header=True).cache()
# df_about = ss.read.csv("FoxInsight/About.csv", header=True).cache()
# df_mood = ss.read.csv("FoxInsight/Mood.csv", header=True).cache()

## Join mood, covid, and about on date and id (Christabelle)

- Rename duplicate columns for each table
- Join mood and covid by id and date
- Grab only the most recent entry for every id in the about df
- Join mood/covid df with the unique about df by id
- Drop renamed duplicate columns and rename to original columns

In [None]:
def rename_duplicate(df, name):
    """
    Append `_{name}` to the column names that the source tables have in common.

    The columns handled are fox_insight_id, days_elapsed, days_acquired,
    schedule_of_activities and age, renamed in that order.

    :param df: A PySpark DataFrame
    :param name: suffix that identifies the table (e.g. 'm', 'a', 'c')
    :return: the DataFrame produced by the chained withColumnRenamed calls
    """
    shared_columns = (
        'fox_insight_id',
        'days_elapsed',
        'days_acquired',
        'schedule_of_activities',
        'age',
    )
    renamed = df
    for shared in shared_columns:
        renamed = renamed.withColumnRenamed(shared, f'{shared}_{name}')
    return renamed

In [None]:
# renaming duplicate columns
# Each table gets its own suffix on the shared columns (m = mood, a = about,
# c = covid) so the copies can be told apart once the tables are joined.
df_mood = rename_duplicate(df_mood, 'm')
df_about = rename_duplicate(df_about, 'a')
df_covid = rename_duplicate(df_covid, 'c')

In [None]:
# merging mood and covid by id and date
# Inner join: only (participant, days_elapsed) pairs present in both tables survive.
same_participant_mc = df_covid.fox_insight_id_c == df_mood.fox_insight_id_m
same_day_mc = df_covid.days_elapsed_c == df_mood.days_elapsed_m
df_merge_mc = df_mood.join(df_covid, same_participant_mc & same_day_mc, how='inner')

In [None]:
df_about.select('fox_insight_id_a').show(15)

In [None]:
# return unique rows in about df
# For every participant keep only the About row(s) with the largest days_elapsed.
# days_elapsed_a is cast to int first (presumably it is read from CSV as a string,
# in which case max would compare text rather than numbers; confirm).
# NOTE: `max` here is pyspark.sql.functions.max from the wildcard import, not the builtin.
# NOTE(review): rows that tie on the maximum are all kept.
df_about = df_about.withColumn("days_elapsed", df_about["days_elapsed_a"].cast("int"))
df_about_max = df_about.select('*',max('days_elapsed').over(Window.partitionBy('fox_insight_id_a')).alias('max_days'))
# Drop the helper column once the filter is applied. Otherwise it travels through
# the joins and enters the feature vector as a second copy of days_elapsed.
df_about_unique = df_about_max.filter(df_about_max.days_elapsed == df_about_max.max_days).drop('max_days')

In [None]:
# count should be 50565
df_about_unique.count()

In [None]:
df_about_unique

In [None]:
df_about_unique.select('fox_insight_id_a').show(15)

In [None]:
df_about_unique.show(2)

In [None]:
# merging mood/covid and about by id
# Inner join on the participant id only; About carries one "latest" record per id.
same_participant_mca = df_merge_mc.fox_insight_id_c == df_about_unique.fox_insight_id_a
df_merge_mca = df_merge_mc.join(df_about_unique, same_participant_mca, how='inner')

In [None]:
# count should be 245
df_merge_mca.count()

In [None]:
# drop duplicate columns and rename
# The mood/covid copies of the shared columns are redundant after the joins, so
# only the About copies are kept and given back their original names.
# days_elapsed_a is dropped as well: the int-typed `days_elapsed` column added to
# the About frame already carries that value, so the former
# rename of days_elapsed_a -> days_elapsed was a no-op and has been removed.
# NOTE(review): the schedule_of_activities_{m,c,a} columns are not touched here.
df_merge_mca = df_merge_mca.drop(
    'fox_insight_id_c', 'fox_insight_id_m', 'days_elapsed_m', 'days_elapsed_c',
    'age_c', 'age_m', 'days_acquired_c', 'days_acquired_m', 'days_elapsed_a',
)
df_merge_mca = (
    df_merge_mca
    .withColumnRenamed('fox_insight_id_a', 'fox_insight_id')
    .withColumnRenamed('days_acquired_a', 'days_acquired')
    .withColumnRenamed('age_a', 'age')
)

In [None]:
df_merge_mca.select('fox_insight_id').show(15)

## Drop columns with null values

In [None]:
def drop_null_columns(df):
    """
    Remove every column of `df` in which at least one null is found.

    :param df: A PySpark DataFrame
    :return: `df` without the columns that contain a null
    """
    # One aggregation over the frame: for each column, count the rows where it is null.
    null_tally_exprs = [
        count(when(col(name).isNull(), name)).alias(name)
        for name in df.columns
    ]
    tallies = df.select(null_tally_exprs).collect()[0].asDict()
    columns_with_nulls = [name for name, n_null in tallies.items() if n_null > 0]
    return df.drop(*columns_with_nulls)

In [None]:
df = drop_null_columns(df_merge_mca)

In [None]:
len(df.columns)

In [None]:
# remove: COVSrcInfo
df = df.drop('COVSrcInfo').cache()

## Remove 'Prefer Not to Answer': Daniel Carrera
1. Drop rows that contain 'prefer not to answer' in the mood columns
            - Mood PNA: 3
2. Drop about rows that contain 'prefer not to answer' in the following columns:
            - Education PNA: 9
            - Income PNA: 7
            - Employment PNA: 5
            - Veteran PNA: 3
            - Research PNA: 3
3. Drop columns that correspond to PNA: 'HeightPNA', 'WeightPNA', 'RacePNA','EthnPNA'

In [None]:
# Drop rows that contain 'prefer not to answer' on the mood columns 
def drop_mood_rows_w3(df, pna_code=3):
    """
    Drop rows in which any mood column holds the 'prefer not to answer' code.

    Mood columns are the columns of `df` whose name starts with 'Mood'. They are
    read from `df` itself rather than from the notebook-level `df_mood`, so the
    function does not depend on global state and never filters on a column that
    `df` no longer has (e.g. one removed by an earlier cleaning step).

    :param df: A PySpark DataFrame
    :param pna_code: value that encodes 'prefer not to answer' (default 3)
    :return: `df` restricted to rows where no mood column equals `pna_code`;
             `df` itself when it has no mood columns
    """
    mood_cols = [name for name in df.columns if name.startswith('Mood')]

    filtered = df
    for name in mood_cols:
        filtered = filtered.where(f'{name} != {pna_code}')

    return filtered

In [None]:
# show values before removing rows with 3
df.select('MoodInterest').distinct().show()

In [None]:
df = drop_mood_rows_w3(df)

In [None]:
# show values after removing rows with 3
df.select('MoodInterest').distinct().show()

In [None]:
# Drop about rows that contain 'prefer not to answer' 

# About column -> code that stands for 'prefer not to answer' in that column.
columns = {
    'Education': 9,
    'Income': 7,
    'Employment': 5,
    'Veteran': 3,
    'Research': 3,
}


def drop_pna_rows(df, columns):
    """
    Filter out rows that hold a 'prefer not to answer' code.

    :param df: A PySpark DataFrame
    :param columns: dict mapping a column name to the code to exclude in it
    :return: `df` after one `column != code` filter per dict entry
    """
    filtered = df
    for column_name in columns:
        pna_code = columns[column_name]
        filtered = filtered.where(f'{column_name} != {pna_code}')
    return filtered

In [None]:
for i in columns.keys():
    print(i)

In [None]:
df = drop_pna_rows(df, columns)

In [None]:
# Show the codes that remain in each filtered column.
# The loop variable is deliberately not called `col`: at notebook scope that name
# would overwrite pyspark.sql.functions.col (wildcard import), and
# drop_null_columns would then fail if its cell were run again.
for pna_column in columns.keys():
    df.select(pna_column).distinct().orderBy(pna_column).show()

In [None]:
# Drop columns that correspond to PNA: 'HeightPNA', 'WeightPNA', 'RacePNA','EthnPNA'
df = df.drop('HeightPNA', 'WeightPNA', 'RacePNA', 'EthnPNA') 

## Group mood columns together into separate categories (Victor)
1. Return the sum of the positive mood columns into new column 'positive' and drop the old columns 
2. Return the sum of the negative mood columns into new column 'negative' and drop the old columns 
3. Return the sum of the lonely mood columns into new column 'lonely' and drop the old columns 
4. Return the sum of the energy mood columns into new column 'energy' and drop the old columns 

In [None]:
# Mood columns that are summed into the 'positive' score.
positive= ['MoodSatis',  'MoodSpirits','MoodHappy','MoodAlive'] 

# Mood columns summed into the 'lonely' score.
# NOTE(review): every column listed here also appears in `negative`, so the
# 'lonely' and 'negative' sums share these five inputs.
lonely = ['MoodHome', 'MoodInterest', 'MoodEmpty', 'MoodBored', 'MoodBetter']

# Mood columns summed into the 'negative' score.
negative = ['MoodHome', 'MoodInterest', 'MoodBored', 'MoodAfraid', 'MoodHelp',
            'MoodMemory',  'MoodBetter', 'MoodWorth','MoodHopeless', 'MoodEmpty']

# The 'energy' score is a single column.
energy = ['MoodEnergy']

In [None]:
df.select('MoodHappy').printSchema()

In [None]:
# cast each column from string to int
# fox_insight_id is left as it is; every other column is cast in place, keeping
# the original column order.
# A single select replaces the per-column withColumn loop, and the comprehension
# leaves no notebook-level variable named `col` behind. Such a variable would
# overwrite pyspark.sql.functions.col, which drop_null_columns relies on.
df = df.select([
    df[name] if name == 'fox_insight_id' else df[name].cast("int").alias(name)
    for name in df.columns
])

In [None]:
#df.printSchema()

In [None]:
df[positive].show(5)

In [None]:
from operator import add
from functools import reduce

In [None]:
# sum across rows for moods cast to new column
# Score columns are appended in this order: positive, negative, energy, lonely.
mood_groups = {
    'positive': positive,
    'negative': negative,
    'energy': energy,
    'lonely': lonely,
}
for score_name, members in mood_groups.items():
    df = df.withColumn(score_name, reduce(add, [df[member] for member in members]))

In [None]:
# Preview the first rows of each new mood score.
# The loop variable is not called `col`, so pyspark.sql.functions.col (wildcard
# import) is not overwritten at notebook scope.
for score_column in ['positive', 'negative', 'energy', 'lonely']:
    df.select(score_column).show(10)

In [None]:
# Drop the raw mood columns now that they are summarised by the score columns.
# `lonely` needs no entry of its own: all of its columns are also in `negative`.
# One drop call replaces the nested loop, whose variable `col` overwrote
# pyspark.sql.functions.col at notebook scope.
df = df.drop(*positive, *negative, *energy)

In [None]:
len(df.columns)

## Getting Rid of Columns with 1 Distinct Value

In [None]:
# printing distinct count of each column
# The loop variable is not called `col`, so pyspark.sql.functions.col (wildcard
# import) is not overwritten at notebook scope.
for column_name in df.columns:
    print(f'{column_name} Count: {df.select(column_name).distinct().count()}')

In [None]:
# Drop all columns with 1 distinct value
# NOTE(review): the list is hard-coded, presumably read off the counts printed by
# the previous cell; re-check it whenever the input data or the filters change.
df = df.drop('COVClinicalTrial', 'days_acquired', 'RaceNH', 'EthnCuban')

In [None]:
len(df.columns)

## Principal Component Analysis and K-Means Clustering (Christabelle, Daniel, Victor)

In [None]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors  # Pre 2.0 pyspark.mllib.linalg
from pyspark.ml.feature import VectorAssembler
#Kmeans 
from pyspark.ml.clustering import KMeans
# Clustering Evaluator
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Merging the data with Vector Assembler.
# Every column except fox_insight_id becomes one entry of the "features" vector.
feature_columns = [name for name in df.columns if name != 'fox_insight_id']
va = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_va = va.transform(df)

In [None]:
# Project the assembled feature vector onto 15 principal components ("pca" column).
# The fitted model gets its own name so that the later `model = kmeans.fit(...)`
# does not discard it.
pca = PCA(k=15, inputCol="features", outputCol="pca")
pca_model = pca.fit(df_va.select('features'))
df_pca = pca_model.transform(df_va)

In [None]:
# Cluster the PCA projection into 6 groups; adds a 'prediction' column.
# A fixed seed makes the cluster assignment reproducible from run to run.
kmeans = KMeans(k=6, maxIter=200, tol=0.1, featuresCol='pca', seed=42)
model = kmeans.fit(df_pca)
df_kmeans = model.transform(df_pca)

In [None]:
df_kmeans.select('prediction').distinct().show()

In [None]:
# Center of each cluster
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
    print(center.shape)

In [None]:
# Silhouette of the k=6 clustering, measured on the PCA projection.
evaluator = ClusteringEvaluator(featuresCol='pca')
silhouette = evaluator.evaluate(df_kmeans)
print(f"Silhouette with squared euclidean distance = {silhouette}")

The silhouette value is a measure of how similar an object is to its own cluster (cohesion) compared to other clusters (separation). The silhouette ranges from −1 to +1, where a high value indicates that the object is well matched to its own cluster and poorly matched to neighboring clusters. If most objects have a high value, then the clustering configuration is appropriate. If many points have a low or negative value, then the clustering configuration may have too many or too few clusters.

In [None]:
# Silhouette score for k = 2..14, clustering on the PCA projection.
# Both KMeans and the evaluator are pointed at the 'pca' column; with the default
# featuresCol they would use the raw 'features' vector instead, which does not
# match the k=6 run or the "K-means with PCA" plot.
# Loop-local names are used so the k=6 `model`, `kmeans` and `evaluator` are not
# overwritten, and a fixed seed keeps the curve reproducible.
silhouette = []
pca_evaluator = ClusteringEvaluator(featuresCol='pca')
for i in range(2,15):
    kmeans_i = KMeans(k=i, maxIter=200, tol=0.1, featuresCol='pca', seed=42)
    predictions = kmeans_i.fit(df_pca).transform(df_pca)
    silhouette.append(pca_evaluator.evaluate(predictions))

In [None]:
# Silhouette score against the number of clusters (k = 2..14).
fig, ax = plt.subplots(figsize=(10, 8))
ax.plot(range(2, 15), silhouette, marker='o', linestyle='--')
ax.set_xlabel('Number of Clusters')
ax.set_ylabel('Silhouette')
ax.set_title('K-means with PCA Clustering')
plt.show()

In [None]:
# Persist the clustered frame as parquet.
# NOTE(review): the target is the bucket root and no save mode is given;
# presumably a re-run fails once data exists there. Confirm, and consider a
# dedicated prefix.
df_kmeans.write.parquet("s3://msds694-parkinsons")

In [3]:
# load data locally
# NOTE(review): this reads the local path "df_pca", not the S3 location the
# previous cell writes to, and it rebinds df_kmeans. Confirm which copy is intended.
df_kmeans = ss.read.parquet("df_pca")

In [None]:
# Shut down the Spark context and session at the end of the notebook.
sc.stop()
ss.stop()