In [4]:
import pymysql
from pymongo import MongoClient
from bson.objectid import ObjectId
from collections import defaultdict
from bson import ObjectId
import decimal
from datetime import datetime, date, time
from urllib.parse import quote_plus

In [6]:
### Setting up mysql connection
mysql_connection = pymysql.connect(
    host = "mdsmysql.sci.pitt.edu",
    user = "adl171",
    password = "Mds_4813819@",
    database='emr'
)

In [7]:
### Creating mysql cursor
mysql_cursor = mysql_connection.cursor(pymysql.cursors.DictCursor)

In [8]:
### Connecting to MongoDB
password = quote_plus("Mds_4813819@")
mongo_client = MongoClient(f"mongodb://adl171:{password}@mdsmongodb.sci.pitt.edu:27017/")
mongodb = mongo_client["adl171"]

In [5]:
### Sanitizing function
def sanitize_for_mongo(doc):
    if isinstance(doc, dict):
        return {k: sanitize_for_mongo(v) for k, v in doc.items()}
    elif isinstance(doc, list):
        return [sanitize_for_mongo(item) for item in doc]
    elif isinstance(doc, decimal.Decimal):
        return float(doc)
    elif isinstance(doc, date) and not isinstance(doc, datetime):
        return datetime.combine(doc, time())
    else:
        return doc

In [6]:
### Creating a function where we grab all the data from a table and clean it
### then we add it to mongodb and grab its id and mongodb id
### we then store the key value pairs in the id_map and return it at the end
def get_and_insert(table, key):
    mysql_cursor.execute(f'SELECT * FROM {table}')
    all_data = mysql_cursor.fetchall()
    id_map = {}
    for row in all_data:
        clean = sanitize_for_mongo(row)
        result = mongodb[table].insert_one(clean)
        id_map[row[key]] = result.inserted_id
    return id_map

patient_ids = get_and_insert("patient", "patient_id")
provider_ids = get_and_insert("provider", "provider_id")

In [7]:
### writing big query that grabs all the data from the visit, symptom, diagnosis, lab, and clinical procedure tables
giant_visit_query = """
SELECT 
	v.visit_id, v.patient_id, v.provider_id, v.visit_date,
    s.symptom_id, s.note AS symptom_note, 
    d.diagnosis_id, d.name AS diagnosis_name, d.icd10_code AS D_icd10_code,
    l.lab_id, l.cpt_code, l.lab_name,
    p.procedure_id, p.icd10_code AS P_icd10_code, p.proc_name, p.description
FROM visit v
LEFT JOIN visit_symptom vs on v.visit_id = vs.visit_id
LEFT JOIN symptom s on vs.symptom_id = s.symptom_id
LEFT JOIN visit_diagnosis vd on v.visit_id = vd.visit_id
LEFT JOIN diagnosis d on vd.diagnosis_id = d.diagnosis_id
LEFT JOIN visit_lab vl on v.visit_id = vl.visit_id
LEFT JOIN lab l on vl.lab_id = l.lab_id
LEFT JOIN visit_procedure vp on v.visit_id = vp.visit_id
LEFT JOIN clinical_procedures p on vp.procedure_id = p.procedure_id;
"""

In [8]:
### Running big query and grabbing all the results
mysql_cursor.execute(giant_visit_query)
big_visit_data = mysql_cursor.fetchall()

In [19]:
big_visit_data[:5]

[{'visit_id': 0,
  'patient_id': 1,
  'provider_id': 21,
  'visit_date': datetime.date(2024, 3, 26),
  'symptom_id': None,
  'symptom_note': None,
  'diagnosis_id': None,
  'diagnosis_name': None,
  'D_icd10_code': None,
  'lab_id': None,
  'cpt_code': None,
  'lab_name': None,
  'procedure_id': None,
  'P_icd10_code': None,
  'proc_name': None,
  'description': None},
 {'visit_id': 1,
  'patient_id': 1,
  'provider_id': 6,
  'visit_date': datetime.date(2024, 3, 27),
  'symptom_id': 462,
  'symptom_note': 'Sudden weakness on one side of the body',
  'diagnosis_id': 91,
  'diagnosis_name': 'Psoriasis vulgaris',
  'D_icd10_code': 'L40.0',
  'lab_id': 292,
  'cpt_code': '84550',
  'lab_name': 'Urea nitrogen; quantitative',
  'procedure_id': 49,
  'P_icd10_code': '0JPT0PZ',
  'proc_name': 'Removal of Drainage Device from Trunk Subcutaneous Tissue',
  'description': 'Surgical removal of previously inserted drainage device.'},
 {'visit_id': 1,
  'patient_id': 1,
  'provider_id': 6,
  'visit_

In [21]:
### Creating new dictionary that groups all visit_ids into a list of dictionaries
visit_id_groups = defaultdict(list)

for row in big_visit_data:
    visit_id_groups[row['visit_id']].append(row)

In [23]:
visit_id_groups[1]

[{'visit_id': 1,
  'patient_id': 1,
  'provider_id': 6,
  'visit_date': datetime.date(2024, 3, 27),
  'symptom_id': 462,
  'symptom_note': 'Sudden weakness on one side of the body',
  'diagnosis_id': 91,
  'diagnosis_name': 'Psoriasis vulgaris',
  'D_icd10_code': 'L40.0',
  'lab_id': 292,
  'cpt_code': '84550',
  'lab_name': 'Urea nitrogen; quantitative',
  'procedure_id': 49,
  'P_icd10_code': '0JPT0PZ',
  'proc_name': 'Removal of Drainage Device from Trunk Subcutaneous Tissue',
  'description': 'Surgical removal of previously inserted drainage device.'},
 {'visit_id': 1,
  'patient_id': 1,
  'provider_id': 6,
  'visit_date': datetime.date(2024, 3, 27),
  'symptom_id': 462,
  'symptom_note': 'Sudden weakness on one side of the body',
  'diagnosis_id': 91,
  'diagnosis_name': 'Psoriasis vulgaris',
  'D_icd10_code': 'L40.0',
  'lab_id': 292,
  'cpt_code': '84550',
  'lab_name': 'Urea nitrogen; quantitative',
  'procedure_id': 59,
  'P_icd10_code': '0WQF0ZZ',
  'proc_name': 'Repair Right

In [25]:
### Remove duplicate rows
def dedup_dicts(items):
    seen = set()
    result = []
    for item in items:
        key = tuple(sorted(item.items()))
        if key not in seen:
            seen.add(key)
            result.append(item)
    return result

In [33]:
### Creating final list of dictionaries
### each dictionary will be one visit document
visit_final = [] # this will hold the final list of visit documents
for rows in visit_id_groups.values():
    if not rows:
        continue
    # each visit document holds: visit_id, patient_id, provider_id, visit_date, symptoms, diagnosis, labs, and clinical_procedures
    visit_doc = {
        
        # grabs first 4 values from the first dictionary in the list
        'visit_id': rows[0]['visit_id'],
        'patient_id': ObjectId(patient_ids[rows[0]['patient_id']]),  # Reference patient collection
        'provider_id': ObjectId(provider_ids[rows[0]['provider_id']]),  # Reference provider collection
        'visit_date': rows[0]['visit_date'],

        ### for each visit listed, were building a list of subdocument for symptoms, diagnosis, lab, clinical_procedures
        ### list comprehensions collect the data and duplicate data is removed 
        'symptom': dedup_dicts([
            {'symptom': row['symptom_id'], 
             'note': row['symptom_note']} for row in rows if row['symptom_id'] is not None
        ]),

        'diagnosis': dedup_dicts([
            {'diagnosis_id': row['diagnosis_id'], 
             'name': row['diagnosis_name'], 
             'icd10_code': row['D_icd10_code']} for row in rows if row['diagnosis_id'] is not None
        ]),

        'lab': dedup_dicts([
            {'lab_id': row['lab_id'], 
             'cpt_code': row['cpt_code'], 
             'lab_name': row['lab_name']} for row in rows if row['lab_id'] is not None
        ]),

        'clinical_procedures': dedup_dicts([
            {'procedure_id': row['procedure_id'], 
             'idc10_code': row['P_icd10_code'], 
             'proc_name': row['proc_name'], 
             'description': row['description']} for row in rows if row['procedure_id'] is not None
        ])
    }

    visit_final.append(visit_doc)

In [35]:
visit_final[1]

{'visit_id': 1,
 'patient_id': ObjectId('686076410859b69cf68645f0'),
 'provider_id': ObjectId('686076460859b69cf6864627'),
 'visit_date': datetime.date(2024, 3, 27),
 'symptom': [{'symptom': 462,
   'note': 'Sudden weakness on one side of the body'}],
 'diagnosis': [{'diagnosis_id': 91,
   'name': 'Psoriasis vulgaris',
   'icd10_code': 'L40.0'},
  {'diagnosis_id': 104, 'name': 'Psoriasis vulgaris', 'icd10_code': 'L40.0'},
  {'diagnosis_id': 142,
   'name': 'Migraine, unspecified, not intractable, without status migrainosus',
   'icd10_code': 'G43.909'},
  {'diagnosis_id': 223,
   'name': 'Acute upper respiratory infection, unspecified',
   'icd10_code': 'J06.9'},
  {'diagnosis_id': 339, 'name': 'Pain in right knee', 'icd10_code': 'M25.561'},
  {'diagnosis_id': 386,
   'name': 'Acute upper respiratory infection, unspecified',
   'icd10_code': 'J06.9'},
  {'diagnosis_id': 399, 'name': 'Presbyopia', 'icd10_code': 'H52.4'},
  {'diagnosis_id': 418,
   'name': 'Cellulitis of left lower limb'

In [37]:
### sanitize visit_final and insert visit data into mongodb
cleaned_visits = [sanitize_for_mongo(visit) for visit in visit_final]
mongodb['visit'].insert_many(cleaned_visits)

InsertManyResult([ObjectId('686078200859b69cf6864654'), ObjectId('686078200859b69cf6864655'), ObjectId('686078200859b69cf6864656'), ObjectId('686078200859b69cf6864657'), ObjectId('686078200859b69cf6864658'), ObjectId('686078200859b69cf6864659'), ObjectId('686078200859b69cf686465a'), ObjectId('686078200859b69cf686465b'), ObjectId('686078200859b69cf686465c'), ObjectId('686078200859b69cf686465d'), ObjectId('686078200859b69cf686465e'), ObjectId('686078200859b69cf686465f'), ObjectId('686078200859b69cf6864660'), ObjectId('686078200859b69cf6864661'), ObjectId('686078200859b69cf6864662'), ObjectId('686078200859b69cf6864663'), ObjectId('686078200859b69cf6864664'), ObjectId('686078200859b69cf6864665'), ObjectId('686078200859b69cf6864666'), ObjectId('686078200859b69cf6864667'), ObjectId('686078200859b69cf6864668'), ObjectId('686078200859b69cf6864669'), ObjectId('686078200859b69cf686466a'), ObjectId('686078200859b69cf686466b'), ObjectId('686078200859b69cf686466c'), ObjectId('686078200859b69cf68646

In [41]:
### create json file to output first three patients and their medical history
import json
with open("sample_output.json", "w") as f:
    json.dump(visit_final[:3], f, default=str, indent=4)

In [12]:
### closing connections:
mysql_connection.close()
mysql_cursor.close()
mongo_client.close()