<a href="https://colab.research.google.com/github/jhajagos/PHR2OHDSI/blob/main/Working_out_DQ_with_Mapped_Data_OHDSI_Statistics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Pin PySpark to an exact release so a fresh runtime installs the same version.
%pip install pyspark==3.5.5

In [None]:
from google.colab import drive

# Mount Google Drive into the Colab runtime; the data paths configured later in
# this notebook are located under this mount point.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

In [None]:
import pyspark

# Start (or reuse) a Spark session whose driver is configured with 16g of memory.
spark = (
    pyspark.sql.SparkSession.builder
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

In [None]:
# Base directory (under the mounted Google Drive) that the paths below are built from.
CDA_FILE_PATH = "/content/drive/MyDrive/phr_ohdsi/source/jgh_documents/"

# Location of the JSON metadata file that load_tables() reads Parquet paths from.
# CDA_FILE_PATH already ends with "/", so drop it before appending the suffix;
# otherwise the result contains a doubled separator ("jgh_documents//output").
metadata_json = CDA_FILE_PATH.rstrip("/") + "/output/ohdsi/ps_configuration.json.generated.parquet.json"

In [None]:
import json

# Parse the metadata JSON file. The encoding is passed explicitly so the result
# does not depend on the platform's default text encoding.
with open(metadata_json, encoding="utf-8") as f:
  metadata = json.load(f)

In [None]:
def load_tables(sptr, metadata_dict: dict, domains_to_load=("ohdsi", "concept", "prepared_source")) -> dict:
  """Read the Parquet tables of the requested domains and register each as a temp view.

  Args:
    sptr: Spark session; must expose ``read.parquet(path)``.
    metadata_dict: Mapping of domain name -> {table name -> Parquet path}.
    domains_to_load: Iterable of domain names to load, in order. The default is
      an immutable tuple so it cannot be altered between calls.

  Returns:
    Dict mapping table name -> loaded DataFrame. If two domains contain a table
    with the same name, the one loaded last replaces the earlier entry and view.

  Raises:
    KeyError: If a requested domain is not a key of ``metadata_dict``.
  """
  sdf_dict = {}
  for domain in domains_to_load:
    print(f"Loading domain: {domain}")
    for table, parquet_path in metadata_dict[domain].items():
      print(f"Loading table: {table}")
      table_sdf = sptr.read.parquet(parquet_path)
      # Register the table under its own name so SQL queries can reference it.
      table_sdf.createOrReplaceTempView(table)
      sdf_dict[table] = table_sdf
  return sdf_dict

In [None]:
%%time
# NOTE(review): MemberDescriptorType is not referenced in any visible cell; it looks
# like an accidental editor auto-import. Confirm and remove.
from types import MemberDescriptorType
# Load the default domains and register their tables as Spark temp views.
sdf_dict = load_tables(spark, metadata)

## Measurement table

In [None]:
# Per measurement concept and unit code: distinct persons (n), row count (n_r),
# min/max and quartiles of value_as_number, and earliest/latest measurement_date,
# ordered by row count descending.
meas_counts_sql = """
select count(distinct person_id) as n, count(*) as n_r, measurement_concept_id, c.concept_name as measurement_concept_name,
  min(value_as_number) as min_value_as_number, max(value_as_number) as max_value_as_number, c2.concept_code as unit_concept_code,
  min(measurement_date) as min_measurement_date, max(measurement_date) as max_measurement_date, percentile(value_as_number, 0.25) as p25,
  percentile(value_as_number, 0.5) as p50, percentile(value_as_number, 0.75) as p75
  from measurement m join concept c on c.concept_id = m.measurement_concept_id
left outer join concept c2 on c2.concept_id = m.unit_concept_id group by measurement_concept_id, c.concept_name, c2.concept_code order by count(1) desc

"""
meas_counts_df = spark.sql(meas_counts_sql).toPandas()
meas_counts_df

In [None]:
# Keep only the concept/unit rows that are backed by at least 5 measurement records.
meas_counts_df.loc[meas_counts_df["n_r"] >= 5]

In [None]:
# Count measurement rows with unit_concept_id = 0 (presumably "no matching unit
# concept" -- confirm against the mapping rules) per measurement concept and
# source unit string, most frequent first.
# The count is aliased as n_r to match the other summary cells (it was an
# unnamed column before), and the redundant derived table is removed.
spark.sql("""
select count(*) as n_r, m.measurement_concept_id, c1.concept_name as measurement_concept_name, m.unit_source_value
from measurement m
join concept c1 on c1.concept_id = m.measurement_concept_id
where m.unit_concept_id = 0
group by m.measurement_concept_id, c1.concept_name, m.unit_source_value
order by count(*) desc
""").toPandas()

In [None]:
# Up to 1000 measurement rows that have unit_concept_id = 0 and no numeric value,
# shown with their source value and source unit strings.
unit_unmapped_null_value_sql = """
select m.measurement_concept_id, c1.concept_name as measurement_concept_name, value_source_value, unit_source_value from measurement m
join concept c1 on c1.concept_id = m.measurement_concept_id where unit_concept_id = 0 and value_as_number is null limit 1000
"""
spark.sql(unit_unmapped_null_value_sql).toPandas()

### Observation

In [None]:
# Per observation concept: distinct persons (n), row count (n_r) and the
# earliest/latest observation_date; ordered by row count, then earliest date,
# both descending.
observation_counts_sql = """
select count(distinct person_id) as n, count(*) as n_r, observation_concept_id, concept_name,
min(observation_date) as min_observation_date, max(observation_date) as max_observation_date
from observation o
join concept c on o.observation_concept_id = c.concept_id group by concept_name, observation_concept_id order by count(*) desc, min(observation_date) desc
"""
spark.sql(observation_counts_sql).toPandas()

### Condition Occurrence

In [None]:
# Per condition concept: distinct persons (n), row count (n_r), earliest start
# date and latest end date (falling back to the start date when the end date is
# null); ordered by row count, then earliest start date, both descending.
cond_counts_sql = """
select count(distinct person_id) as n, count(*) as n_r, condition_concept_id, c.concept_name as condition_concept_name,
min(condition_start_date) as min_condition_date, max(coalesce(condition_end_date, condition_start_date)) as max_condition_date
 from condition_occurrence co
  join concept c on c.concept_id = co.condition_concept_id
  group by condition_concept_id, c.concept_name order by count(*) desc, min(condition_start_date) desc"""
cond_df = spark.sql(cond_counts_sql).toPandas()
cond_df

### Drug Exposure

In [None]:
# Per drug concept: distinct persons (n), row count (n_r), earliest exposure
# start date and latest end date (falling back to the start date when the end
# date is null); ordered by row count, then earliest start date, both descending.
drug_counts_sql = """
select count(distinct person_id) as n, count(*) as n_r, drug_concept_id, c.concept_name  as drug_concept_name,
min(drug_exposure_start_date) as min_drug_date, max(coalesce(drug_exposure_end_date, drug_exposure_start_date)) as max_drug_date
from drug_exposure de join concept c on c.concept_id = de.drug_concept_id group by drug_concept_id, c.concept_name order by count(*) desc, min(drug_exposure_start_date) desc
"""
spark.sql(drug_counts_sql).toPandas()

### Procedure Occurrence

In [None]:
# Per procedure concept: distinct persons (n), row count (n_r) and the
# earliest/latest procedure_date; ordered by row count, then earliest date,
# both descending.
procedure_counts_sql = """
select count(distinct person_id) as n, count(*) as n_r, procedure_concept_id, c.concept_name as procedure_concept_name ,
min(procedure_date) as min_procedure_date, max(procedure_date) as max_procedure_date
from procedure_occurrence po
join concept c on c.concept_id = po.procedure_concept_id
group by procedure_concept_id, c.concept_name order by count(*) desc, min(procedure_date) desc
"""
spark.sql(procedure_counts_sql).toPandas()

## Source Note

In [None]:
# Row count per s_note_class in source_note, largest class first.
note_class_counts_sql = "select s_note_class, count(*) as n_r from source_note group by s_note_class order by count(*) desc"
spark.sql(note_class_counts_sql).toPandas()