In [None]:
import pyspark
import dxpy
import dxdata
import pandas as pd
import random
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import col, udf, to_date, mean, expr, concat_ws
from pyspark.sql.types import StringType, ArrayType, IntegerType, DoubleType
from pyspark.ml.feature import Word2Vec
from pyspark.sql.window import Window
import ast

In [None]:
# Reuse the running SparkContext when one exists so this cell can be executed
# more than once in the same kernel; a second bare SparkContext() call raises
# "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

In [None]:
# Find the Spark database (name matching "app*") and the Dataset record
# (name matching "app*.dataset") in the project's root folder.
_database_record = dxpy.find_one_data_object(
    classname="database",
    name="app*",
    folder="/",
    name_mode="glob",
    describe=True,
)
dispensed_database_name = _database_record["describe"]["name"]

_dataset_record = dxpy.find_one_data_object(
    typename="Dataset",
    name="app*.dataset",
    folder="/",
    name_mode="glob",
)
dispensed_dataset_id = _dataset_record["id"]

In [None]:
# Make the discovered database the default for the unqualified table names
# used by the queries below.
spark.sql(f"USE {dispensed_database_name}")

### Word2Vec Omics Cohort 

In [None]:

# Omics cohort, full record history (input for Word2Vec training).
#   EarliestCConds   - for each eid that also appears in olink_instance_0_0001,
#                      the earliest condition_start_date among conditions whose
#                      condition_source_value starts with 'C'.
#   FilteredPatients - keeps eids whose earliest such date is strictly before
#                      participant field p53_i0 and no more than 12 months
#                      before it.
#   Final SELECT     - every condition, procedure, drug, observation and
#                      measurement row for those eids (no date cut-off), with
#                      the record date also rendered as a 'yyyy-MM-dd' string.
# NOTE(review): p53_i0 is aliased proteomics_date; presumably the assessment /
# sample date - confirm against the data dictionary.
# NOTE(review): LIKE 'C%' matches any source value beginning with 'C' - confirm
# the source vocabulary makes that the intended code range.
combined_query = spark.sql("""
WITH EarliestCConds AS (
    SELECT 
        c.eid,
        MIN(TO_DATE(c.condition_start_date, 'dd/MM/yyyy')) as earliest_cond_date
    FROM 
        omop_condition_occurrence c
    INNER JOIN 
        olink_instance_0_0001 o ON c.eid = o.eid
    WHERE 
        c.condition_source_value LIKE 'C%'
    GROUP BY 
        c.eid
),
FilteredPatients AS (
    SELECT 
        ecc.eid,
        ecc.earliest_cond_date,
        TO_DATE(p.p53_i0, 'yyyy-MM-dd') AS proteomics_date 
    FROM 
        EarliestCConds ecc
    INNER JOIN 
        participant_0001 p ON ecc.eid = p.eid
    WHERE 
        ecc.earliest_cond_date < TO_DATE(p.p53_i0, 'yyyy-MM-dd')
        AND ecc.earliest_cond_date >= ADD_MONTHS(TO_DATE(p.p53_i0, 'yyyy-MM-dd'), -12)
)

SELECT 
    fp.eid, 
    c.concept_id, 
    c.record_date,
    DATE_FORMAT(c.record_date, 'yyyy-MM-dd') as formatted_date
FROM 
    FilteredPatients fp
JOIN (
    SELECT 
        o.eid, 
        o.condition_concept_id as concept_id, 
        TO_DATE(o.condition_start_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_condition_occurrence o
    UNION ALL
    SELECT 
        o.eid, 
        o.procedure_concept_id as concept_id, 
        TO_DATE(o.procedure_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_procedure_occurrence o
    UNION ALL
    SELECT 
        o.eid, 
        o.drug_concept_id as concept_id, 
        TO_DATE(o.drug_exposure_start_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_drug_exposure o
    UNION ALL
    SELECT 
        o.eid, 
        o.observation_concept_id as concept_id, 
        TO_DATE(o.observation_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_observation o
    UNION ALL
    SELECT 
        o.eid, 
        o.measurement_concept_id as concept_id, 
        TO_DATE(o.measurement_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_measurement o
) c ON fp.eid = c.eid


""")

In [None]:
%%time
distinct_eids = combined_query.select("eid").distinct()
num_distinct_eids = distinct_eids.count()

print(f"Number of distinct eids: {num_distinct_eids}")

In [None]:
%%time

combined_query.show()

In [None]:
%%time
# Count the number of rows in the result
row_count = combined_query.count()

# Print the row count
print(f"Number of rows in the query result: {row_count}")

In [None]:
%%time

# Initialize Spark Session
spark = SparkSession.builder.appName("Word2Vec Training").getOrCreate()

combined_query = combined_query.withColumn("concept_id", combined_query["concept_id"].cast(IntegerType()))

# Group by 'eid' (person_id) and 'month_year'
grouped_data = (combined_query.groupBy("eid", "formatted_date")
                .agg(F.collect_list("concept_id").alias("concept_ids")))

# Define a UDF to convert integers to strings
int_to_string_udf = udf(lambda x: [str(i) for i in x], ArrayType(StringType()))

# Apply the UDF to the 'concept_ids' column
word2Vec_data = grouped_data.withColumn("words", int_to_string_udf(col("concept_ids")))

# Define the Word2Vec model
print('started training')
word2vec = Word2Vec(vectorSize=400, windowSize=100, minCount=5, inputCol="words", outputCol="wordVectors").setMaxIter(3)

# Fit the model
model = word2vec.fit(word2Vec_data)
print('done training')


In [None]:
# getVectors() gives one row per vocabulary word with its embedding; bring it
# to the driver as a pandas frame and write it to CSV without the row index.
word_vectors = model.getVectors()
pandas_df = word_vectors.toPandas()
pandas_df.to_csv(path_or_buf="./omics_lc_word2vec.csv", index=False)


In [None]:
%%bash
# Upload the omics-cohort vector CSV written by the preceding Python cell to
# the project, using "/" as the destination path.
dx upload omics_lc_word2vec.csv --path /

### Word2Vec PT Cohort

In [None]:
# PT cohort, full record history (input for Word2Vec training).
# Same shape as the omics-cohort query, except EarliestCConds is NOT restricted
# to participants present in olink_instance_0_0001.
#   EarliestCConds   - per eid, the earliest condition_start_date among
#                      conditions whose condition_source_value starts with 'C'.
#   FilteredPatients - keeps eids whose earliest such date is strictly before
#                      participant field p53_i0 and no more than 12 months
#                      before it.
#   Final SELECT     - every condition, procedure, drug, observation and
#                      measurement row for those eids (no date cut-off), with
#                      the record date also rendered as a 'yyyy-MM-dd' string.
# NOTE(review): the p53_i0 column keeps the alias proteomics_date here even
# though no proteomics table is joined - confirm the intended meaning.
combined_query = spark.sql("""
WITH EarliestCConds AS (
    SELECT 
        c.eid,
        MIN(TO_DATE(c.condition_start_date, 'dd/MM/yyyy')) as earliest_cond_date
    FROM 
        omop_condition_occurrence c
    WHERE 
        c.condition_source_value LIKE 'C%'
    GROUP BY 
        c.eid
),
FilteredPatients AS (
    SELECT 
        ecc.eid,
        ecc.earliest_cond_date,
        TO_DATE(p.p53_i0, 'yyyy-MM-dd') AS proteomics_date 
    FROM 
        EarliestCConds ecc
    INNER JOIN 
        participant_0001 p ON ecc.eid = p.eid
    WHERE 
        ecc.earliest_cond_date < TO_DATE(p.p53_i0, 'yyyy-MM-dd')
        AND ecc.earliest_cond_date >= ADD_MONTHS(TO_DATE(p.p53_i0, 'yyyy-MM-dd'), -12)
)

SELECT 
    fp.eid, 
    c.concept_id, 
    c.record_date,
    DATE_FORMAT(c.record_date, 'yyyy-MM-dd') as formatted_date
FROM 
    FilteredPatients fp
JOIN (
    SELECT 
        o.eid, 
        o.condition_concept_id as concept_id, 
        TO_DATE(o.condition_start_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_condition_occurrence o
    UNION ALL
    SELECT 
        o.eid, 
        o.procedure_concept_id as concept_id, 
        TO_DATE(o.procedure_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_procedure_occurrence o
    UNION ALL
    SELECT 
        o.eid, 
        o.drug_concept_id as concept_id, 
        TO_DATE(o.drug_exposure_start_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_drug_exposure o
    UNION ALL
    SELECT 
        o.eid, 
        o.observation_concept_id as concept_id, 
        TO_DATE(o.observation_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_observation o
    UNION ALL
    SELECT 
        o.eid, 
        o.measurement_concept_id as concept_id, 
        TO_DATE(o.measurement_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_measurement o
) c ON fp.eid = c.eid

""")

In [None]:
%%time
# Count the number of rows in the result
row_count = combined_query.count()

# Print the row count
print(f"Number of rows in the query result: {row_count}")

In [None]:
%%time
distinct_eids = combined_query.select("eid").distinct()
num_distinct_eids = distinct_eids.count()

print(f"Number of distinct eids: {num_distinct_eids}")

In [None]:
%%time

# Initialize Spark Session
spark = SparkSession.builder.appName("Word2Vec Training").getOrCreate()

combined_query = combined_query.withColumn("concept_id", combined_query["concept_id"].cast(IntegerType()))

# Group by 'eid' (person_id) and 'month_year'
grouped_data = (combined_query.groupBy("eid", "formatted_date")
                .agg(F.collect_list("concept_id").alias("concept_ids")))

# Define a UDF to convert integers to strings
int_to_string_udf = udf(lambda x: [str(i) for i in x], ArrayType(StringType()))

# Apply the UDF to the 'concept_ids' column
word2Vec_data = grouped_data.withColumn("words", int_to_string_udf(col("concept_ids")))

# Define the Word2Vec model
print('started training')
word2vec = Word2Vec(vectorSize=400, windowSize=100, minCount=5, inputCol="words", outputCol="wordVectors").setMaxIter(3)

# Fit the model
model = word2vec.fit(word2Vec_data)
print('done training')


In [None]:
# getVectors() gives one row per vocabulary word with its embedding; bring it
# to the driver as a pandas frame and write it to CSV without the row index.
word_vectors = model.getVectors()
pandas_df = word_vectors.toPandas()
pandas_df.to_csv(path_or_buf="./PT_lc_word2vec.csv", index=False)


In [None]:
%%bash
# Upload the PT-cohort vector CSV written by the preceding Python cell to the
# project, using "/" as the destination path.
dx upload PT_lc_word2vec.csv --path /

### Downstream processing: omics cohort embedded with the PT-cohort Word2Vec vectors

In [None]:
# Reload the PT-cohort vectors from CSV. The 'vector' column comes back as
# text, so each cell is parsed into a Python literal (a list of floats is
# expected).
word_vectors = pd.read_csv('./PT_lc_word2vec.csv')
word_vectors = word_vectors.assign(
    vector=word_vectors['vector'].map(ast.literal_eval)
)



In [None]:
# Omics cohort, records up to the proteomics date (input for patient-day
# embeddings).
#   EarliestCConds   - for each eid that also appears in olink_instance_0_0001,
#                      the earliest condition_start_date among conditions whose
#                      condition_source_value starts with 'C'.
#   FilteredPatients - keeps eids whose earliest such date is strictly before
#                      participant field p53_i0 and no more than 12 months
#                      before it.
#   Final SELECT     - condition, procedure, drug, observation and measurement
#                      rows for those eids, restricted by the join condition to
#                      record_date <= proteomics_date (p53_i0).
# Unlike the training queries, this one applies that date cut-off.
combined_query = spark.sql("""
WITH EarliestCConds AS (
    SELECT 
        c.eid,
        MIN(TO_DATE(c.condition_start_date, 'dd/MM/yyyy')) as earliest_cond_date
    FROM 
        omop_condition_occurrence c
    INNER JOIN 
        olink_instance_0_0001 o ON c.eid = o.eid
    WHERE 
        c.condition_source_value LIKE 'C%'
    GROUP BY 
        c.eid
),
FilteredPatients AS (
    SELECT 
        ecc.eid,
        ecc.earliest_cond_date,
        TO_DATE(p.p53_i0, 'yyyy-MM-dd') AS proteomics_date 
    FROM 
        EarliestCConds ecc
    INNER JOIN 
        participant_0001 p ON ecc.eid = p.eid
    WHERE 
        ecc.earliest_cond_date < TO_DATE(p.p53_i0, 'yyyy-MM-dd')
        AND ecc.earliest_cond_date >= ADD_MONTHS(TO_DATE(p.p53_i0, 'yyyy-MM-dd'), -12)
)

SELECT 
    fp.eid, 
    c.concept_id, 
    c.record_date,
    DATE_FORMAT(c.record_date, 'yyyy-MM-dd') as formatted_date
FROM 
    FilteredPatients fp
JOIN (
    SELECT 
        o.eid, 
        o.condition_concept_id as concept_id, 
        TO_DATE(o.condition_start_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_condition_occurrence o
    UNION ALL
    SELECT 
        o.eid, 
        o.procedure_concept_id as concept_id, 
        TO_DATE(o.procedure_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_procedure_occurrence o
    UNION ALL
    SELECT 
        o.eid, 
        o.drug_concept_id as concept_id, 
        TO_DATE(o.drug_exposure_start_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_drug_exposure o
    UNION ALL
    SELECT 
        o.eid, 
        o.observation_concept_id as concept_id, 
        TO_DATE(o.observation_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_observation o
    UNION ALL
    SELECT 
        o.eid, 
        o.measurement_concept_id as concept_id, 
        TO_DATE(o.measurement_date, 'dd/MM/yyyy') as record_date
    FROM 
        omop_measurement o
) c ON fp.eid = c.eid AND c.record_date <= fp.proteomics_date

""")

In [None]:
%%time
# Materialise the whole query result on the driver as a list of Row objects.
combined_query_results = combined_query.collect()

In [None]:
%%time
pdf = pd.DataFrame(combined_query_results, columns=[field.name for field in combined_query.schema.fields])

In [None]:
# The vocabulary words are used as a string join key in the merge that follows.
word_vectors = word_vectors.assign(word=word_vectors['word'].astype(str))

In [None]:
# Attach each record's embedding by joining concept ids to vocabulary words.
# Training built the words as str(int(concept_id)), so both join keys are put
# into that exact form first. Without this, an integer/float concept_id column
# fails to merge against string words, and a differently formatted string
# silently matches nothing. Ids that cannot be read as integers become '<NA>'
# and drop out of the inner join.
concept_key = (
    pd.to_numeric(pdf['concept_id'], errors='coerce')
    .astype('Int64')
    .astype(str)
)
vocabulary = word_vectors.assign(word=word_vectors['word'].astype(str))

pdf = (
    pdf.assign(concept_id=concept_key)
    .merge(vocabulary, how='inner', left_on='concept_id', right_on='word')
    .drop(columns=['word', 'concept_id'])
)

In [None]:
# Sanity check after the merge: (rows, columns) and number of distinct eids.
pdf.shape, pdf['eid'].nunique()

In [None]:
%%time
embeddings_df = pd.DataFrame(pdf['vector'].tolist(), index=pdf.index)
embeddings_df.columns = [f'embedding_{i}' for i in range(embeddings_df.shape[1])]

# Join the new DataFrame with the original DataFrame
embedded_codes = pdf.join(embeddings_df)


In [None]:
# (rows, columns) after adding the per-dimension embedding columns.
embedded_codes.shape

In [None]:
# Convert 'record_date' to a pandas datetime column (parsed with the
# '%Y-%m-%d' format) so it can be sorted and ranked in the next step.
embedded_codes['record_date'] = pd.to_datetime(embedded_codes['record_date'], format='%Y-%m-%d')

In [None]:
%%time
max_dates=32

# 1. Sort the DataFrame
embedded_codes = embedded_codes.sort_values(by=['eid', 'record_date'], ascending=[True, False])

# 2. Rank within each 'eid' group
embedded_codes['date_rank'] = embedded_codes.groupby('eid')['record_date'].rank(method='dense', ascending=False)

# 3. Filter based on rank
filtered_data_pd = embedded_codes[embedded_codes['date_rank'] <= max_dates]

# 4. Define your aggregation expressions
agg_funcs = {f'embedding_{i}': 'mean' for i in range(400)}

# Apply aggregation with the defined expressions
patient_day_embeddings_pd = filtered_data_pd.groupby(['eid', 'record_date']).agg(agg_funcs)


In [None]:
%%time
patient_day_embeddings_pd.reset_index().to_csv('./patient_day_embeddings_omics_PTword2vec_lc.csv', header=True)

In [None]:
%%bash
# Upload the patient-day embedding CSV written by the preceding Python cell to
# the project, using "/" as the destination path.
dx upload patient_day_embeddings_omics_PTword2vec_lc.csv --path /