### Difference Features

In [None]:
# !sudo apt update
# !sudo apt install openjdk-17-jre-headless -y
import pyspark
# from pyspark import pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.functions import col, substring
from pyspark.sql.types import StructType, StructField, \
StringType, IntegerType, TimestampType, DateType, FloatType

import os

In [None]:
# Spark settings for this notebook: the application name and the number of
# partitions used for shuffles.
spark_settings = [
    ('spark.app.name', 'Glucose_Analysis_Spark'),
    ('spark.sql.shuffle.partitions', '1500'),
]
conf = pyspark.SparkConf().setAll(spark_settings)

# Start a session with that configuration, or reuse one that already exists.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
# Load the cohort CSV (header row, inferred column types, comma-separated),
# then rename the '_c0' column to 'NumId'.
csv_options = {'header': 'True', 'inferSchema': 'True', 'delimiter': ','}
raw_cohort = spark.read.options(**csv_options).csv('/cephfs/data/cohort.csv')
df = raw_cohort.withColumnRenamed('_c0', 'NumId')

In [None]:
# Preview the first 2 rows of the loaded cohort table.
df.show(2)

In [None]:
# Bucket each age into its decade (e.g. 47 -> 40). Division by 10 followed by
# an integer cast (which truncates the fraction) is used instead of taking the
# first character of the age's string form, because that approach mis-buckets
# single-digit ages (7 -> 70) and ages of 100 or more (104 -> 10).
# Rows with a null Age keep a null AgeGroup.
age_decade = (df.Age / 10).cast(IntegerType())
df = df.withColumn('AgeGroup', (age_decade * 10).cast(IntegerType()))

In [None]:
# Rename the 'Gender' column to 'Sex'; the encoding cells refer to it by that name.
df = df.withColumnRenamed('Gender', 'Sex')

In [None]:
# Preview the first 3 rows of df.
df.show(3)

In [None]:
# Categorical columns to index. 'DiabetesType' is left out because all
# values are type-two.
encodedCols = ['Sex', 'Treatment', 'AgeGroup']
# Collects one [column name, fitted indexer labels] pair per column above.
encodedLabels = []

for column_name in encodedCols:
    fitted_indexer = StringIndexer(
        inputCol=column_name,
        outputCol=column_name + '_Num',
    ).fit(df)
    # Append the '<column>_Num' index column to df and remember the labels.
    df = fitted_indexer.transform(df)
    encodedLabels.append([column_name, fitted_indexer.labels])

In [None]:
# Display the [column name, labels] pairs collected from the fitted indexers.
encodedLabels

One Hot Encoding Way

In [None]:
single_col_ohe = OneHotEncoder(inputCol="Sex_Num", outputCol="Sex_Encoded", dropLast=True)
df = single_col_ohe.fit(df).transform(df)

single_col_ohe = OneHotEncoder(inputCol="Treatment_Num", outputCol="Treatment_Encoded", dropLast=True)
df = single_col_ohe.fit(df).transform(df)

single_col_ohe = OneHotEncoder(inputCol="AgeGroup_Num", outputCol="AgeGroup_Encoded", dropLast=True)
df = single_col_ohe.fit(df).transform(df)

In [None]:
df = df.drop('UserId', 'Sex', 'DOB', 'Age', 'DiabetesType', 'Treatment', 'AgeGroup', 'Treatment_Num', \
            'Sex_Num', 'AgeGroup_Num')

In [None]:
df.show(5)

In [None]:
df.write.mode('overwrite').parquet('/cephfs/data/cohort_encoded.parquet')

Bool Columns

In [None]:
# Add one integer indicator column per category of each indexed column,
# named '<column>_<category>'. The value is 1 where the row's value equals the
# category and 0 where it does not (null where the source value is null).
# This reads the raw columns named in encodedLabels, so they must still be
# present on df when this runs.
for main, categories in encodedLabels:
    for cat in categories:
        # '-' is replaced only in the new column's NAME. The comparison uses
        # the label exactly as the indexer recorded it; comparing against the
        # renamed text would never match a label that contains '-', leaving
        # that indicator column all zeros.
        indicator_name = main + '_' + cat.replace('-', '_')
        df = df.withColumn(indicator_name, (df[main] == cat).cast('integer'))


In [None]:
# List the column names currently on df.
df.columns

In [None]:
# Remove the source columns and the intermediate '*_Num' index columns from df.
columns_to_remove = [
    'UserId', 'Sex', 'DOB', 'Age', 'DiabetesType', 'Treatment', 'AgeGroup',
    'Treatment_Num', 'Sex_Num', 'AgeGroup_Num',
]
df = df.drop(*columns_to_remove)

In [None]:
# Display the (column name, type) pairs of df.
df.dtypes

In [None]:
# Save df as parquet at the bool-encoded cohort path, replacing any existing output.
df.write.mode('overwrite').parquet('/cephfs/data/cohort_bool_encoded.parquet')

---------------------------

In [None]:
# For every data split, left-join the split's summary statistics with each
# encoded cohort table on 'NumId' and write the result as parquet.
types = ['train', 'test', 'val']

# The encoded cohort tables do not depend on the split, so read each one once
# instead of once per split.
encoded_cohorts = {
    encodeType: spark.read.format('parquet').load('/cephfs/data/' + encodeType + '.parquet')
    for encodeType in ['cohort_encoded', 'cohort_bool_encoded']
}

for dataType in types:
    split_dir = '/cephfs/summary_stats/' + dataType + '_cat'
    files_directory = os.listdir(split_dir)
    # Skip entries whose names contain '.crc' or 'SUCCESS'.
    files = [split_dir + '/' + i for i in files_directory
             if not ('.crc' in i or 'SUCCESS' in i)]

    # Read in Summary Statistics
    summary_stats = spark.read.format('parquet').load(files)

    for encodeType, one_hot_encoding in encoded_cohorts.items():
        merged = summary_stats.join(one_hot_encoding, on='NumId', how='left')

        # Overwrite like the other writes in this notebook; without an
        # explicit mode the write fails when the output path already exists,
        # so the cell could not be re-run.
        merged.write.mode('overwrite').parquet(
            '/cephfs/summary_stats/encoded/one_hot_' + dataType
            + '/summary_stats_' + encodeType + '.parquet')


In [None]:
# one_hot_encoding = spark.read.format('parquet').load('/cephfs/data/cohort_encoded.parquet')


In [None]:
import time

# Take a start timestamp and immediately print the seconds elapsed since it.
# No work runs between the two clock reads in this cell.
start = time.time()
elapsed_seconds = time.time() - start
print(elapsed_seconds)

In [None]:
# Show (up to 5 rows of) the indicator columns for rows where NumId is 19.
indicator_columns = [
    'Sex_Female', 'Sex_Male',
    'Treatment_yes_both', 'AgeGroup_50',
    'AgeGroup_60', 'AgeGroup_70',
    'AgeGroup_40', 'AgeGroup_30', 'AgeGroup_80',
    'AgeGroup_90', 'AgeGroup_10',
]
rows_for_id = merged.filter(col('NumId') == 19)
rows_for_id.select(*[col(column_name) for column_name in indicator_columns]).show(5)

In [None]:
import pathlib

# Collect, as strings, the '*.parquet' entries directly under the train_set
# directory whose full path contains 'part-00'.
train_set_dir = pathlib.Path("/cephfs/train_test_val/train_set")
allPaths = []
for parquet_path in train_set_dir.glob('*.parquet'):
    path_text = str(parquet_path)
    if 'part-00' in path_text:
        allPaths.append(path_text)


In [None]:
# Display the path at index 42 of allPaths (IndexError if there are fewer than 43 paths).
allPaths[42]

In [None]:
# Read the single parquet file at allPaths[42] into a DataFrame.
test = spark.read.format('parquet').load(allPaths[42])

In [None]:
# Count the distinct NumId values in the `test` DataFrame.
test.select(col('NumId')).distinct().count()