<a href="https://colab.research.google.com/github/sntrenter/FA2020GroupProject/blob/analytics/Backend/analytics_notebooks/analytics_cleanup_raw.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Under-the-hood stuff

In [38]:
! pip install dnspython



In [39]:
# set to false if running in a standalone script
is_notebook = True

In [40]:
# set to edit DB
do_db_edit = True

# Import useful libraries

In [41]:
# General Imports
from datetime import datetime
from pprint import pprint

In [42]:
# Database imports
import pymongo
import json
import bson
from bson.objectid import ObjectId

In [43]:
# Analytics Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Connect to DB

In [105]:
pym_client = pymongo.MongoClient("mongodb+srv://FrontEnd:Pass-word1@cluster0.7fauj.mongodb.net/CS5500Healthcare?retryWrites=true&w=majority")
db_healthcare = pym_client["CS5500Healthcare"]

db_coll_rawactivity = db_healthcare["RawActivity"]
db_coll_cleandata = db_healthcare["CleanedData"]
db_coll_patient = db_healthcare["Patient_dev"]

# Convenience boilerplate

In [106]:
# A record version structure for internal use.  Doesn't neet to be in schema.
# The purpose of this is so we can select and destroy records from old versions to automatically cleanup the database as we develop.
analytics_format_version_dict = {
  "analytics_format_ver": {
        "format_type": "prototype",
        "version_major": 1,
        "version_minor": 1 # 1: fixing handling of None/nan/null -> null
    },
}

In [107]:
record_meta_data = ["patient_record_id", "device_id", "date_time"]

In [108]:
def set_relevant_keys(cur_d, update_d):
	_d = cur_d.copy()
	_d.update( (k, update_d[k]) for k in (_d.keys() & update_d.keys()) )
	return _d

In [109]:
def get_processed_interactions(db_handle, action="get", outgoing_list=None):
  _interactions_record = {
      "record_type": "meta",
      "data": {
          "data_type": "interactions_list",
          "quality": "processed",
          "list": []
      }
  }

  _interactions_record.update(analytics_format_version_dict)

  processed_lists_filter = {"record_type": "meta", "data.data_type": "interactions_list", "data.quality": "processed"}
  processed_lists_cursor = db_handle.find(processed_lists_filter)
  processed_interactions = []
  for entry in processed_lists_cursor:
    processed_interactions.extend(entry["data"]["list"])

  if action=="delete":
    db_handle.delete_many(processed_lists_filter)
    return processed_interactions

  if action == "get":
    return processed_interactions

  if action == "put":
    _interactions_record["data"]["list"] = outgoing_list
    db_handle.insert_one(_interactions_record)
    return processed_interactions.extend(outgoing_list)

  return processed_interactions

In [110]:
def get_empty_analytics_atom(rec_type):
  _simple_atom = {
      "patient_record_id": None,  # unique patient ID
      "device_id": None,   # device id raw data originated from
      "record_type": rec_type,  # data category: e.g. health or symptoms
	    "date_time": None,   
	    "data": {
          "data_type": None,   # the data sub category, e.g. sleep or cough
          "quantity": None,    # if the data is numeric, put value here
          "quality": None,     # if the data is categorical, put value here
          }
      }
  _simple_atom.update(analytics_format_version_dict)

  return _simple_atom
    

In [111]:
def pre_populate_atom(row, rec_type):
  # empty atom
  _atm = get_empty_analytics_atom(rec_type)
  _atm = _atm.copy()

  # set keys in a way that forces the dictionary structure doesnt change
  _meta_d = row[record_meta_data].to_dict()
  _atm = set_relevant_keys(_atm, _meta_d)

  # transform datetime to string
  _atm["date_time"] = row["date_time"].isoformat()

  # set the interaction id. this is the MongoDB internal _id of the rawactivity record
  _atm["interaction_id"] = row.name

  return _atm

In [112]:
convert_null = lambda x: None if pd.isnull(x) else x

In [113]:
def try_type_conversion(val, d_type):
  try:
    _v = d_type(val)
  except:
    _v = val

  return _v

In [114]:
def set_data_values(record, d_type, d_quality=None, d_quantity=None):
  _data_d = {
      "data": {
          "data_type": convert_null(d_type),
          "quality": convert_null(d_quality),
          "quantity": convert_null(d_quantity)
          }
        }

  _r  = record.copy()
  _r = set_relevant_keys(_r, _data_d)

  return _r

In [115]:
def flatten_nested_list(nl):
  return [_e for _l in nl for _e in _l]

In [116]:
def seed_db_from_entry_list(entry_list, db_handle):
    entry_list = [pymongo.InsertOne(item) for item in entry_list]
    write_ret = db_handle.bulk_write(entry_list)
    return write_ret

# Get patient records so we can lookup raw data and map to patient ids

In [117]:
def get_patient_table(patient_db_collection):
  _all_patients = patient_db_collection.find()
  _all_patients = list(_all_patients) # this exhausts the dbr_all_patients cursor
  _df_pats = pd.DataFrame(_all_patients)
  _df_pats["_id"] = _df_pats["_id"].astype(str)
  _df_pats = _df_pats.rename(columns={"_id": "patient_record_id"})
  _df_pats = _df_pats.set_index("patient_record_id")
  return _df_pats


In [118]:
df_patients = get_patient_table(db_coll_patient)

In [119]:
if is_notebook:
  display(df_patients)

Unnamed: 0_level_0,name,device_id,dob,gender,patient_id
patient_record_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
5fb3173a25090c2c10c1242b,John Doe,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,1950:01:01T12:00:00+00:00,cis_male,ad7a441e-0157-4ac2-a667-55956cab03a2
5fb319c825090c2c10c1242c,Jane Doe,amzn1.ask.device.AGUILHDF6CC3FFH2FWMRKYK533EZM...,1960:01:01T12:00:00+00:00,cis_female,2b7d567f-0dfe-4baa-9b0f-7bafaa0780da
5fb319d725090c2c10c1242d,Siri,amzn1.ask.device.FAKE_ENTRY_1,2000:01:01T12:00:00+00:00,non_binary,4de70769-2149-4fd3-8d7d-b5cd6c5ca71b
5fb31a5925090c2c10c1242e,Jack Bauer,amzn1.ask.device.FAKE_ENTRY_2,2000:01:01T12:00:00+00:00,trans_male,acf1b4b9-0d06-4062-a98a-7ce187296866
5fb31ab225090c2c10c1242f,Ford Prefect,amzn1.ask.device.FAKE_ENTRY_3,1984:01:01T12:00:00+00:00,alien,c1ef34f1-e1f0-4be3-beb5-22a4557f98c6


In [120]:
device_to_patient_map = {v: k for k, v in df_patients["device_id"].to_dict().items()}

# Get Already processed interactions


In [121]:
processed_interactions = get_processed_interactions(db_coll_cleandata)

# Get raw data

In [122]:
def map_nones(elem):
  _is_null = False
  if pd.isna(elem): return np.nan
  none_str_set = {
      "none",
      "no_symp",
      "nothing",
      "nada",
      "nope",
  }
  if isinstance(elem, str):
    _e = _e = elem.strip()
    _e = _e.replace(' ', '_')
    _e = _e.lower()
    for n_s in none_str_set:
      if n_s in _e: return np.nan
  
  # didn't match null types

  return elem

In [123]:
def get_records_table_from_record_list(rec_list):
  df_raw_cleanup = pd.DataFrame(rec_list)
  df_raw_cleanup["patient_record_id"] = df_raw_cleanup["device"].map(device_to_patient_map)
  df_raw_cleanup["time"] = pd.to_datetime(df_raw_cleanup["time"])
  df_raw_cleanup = df_raw_cleanup.applymap(map_nones)
  df_raw_cleanup["_id"] = df_raw_cleanup["_id"].astype(str)
  df_raw_cleanup["symptoms"] = df_raw_cleanup["symptoms"].fillna("no_symptoms")
  df_raw_cleanup = df_raw_cleanup.rename(columns={"_id": "interaction_id", "time": "date_time", "device": "device_id"})
  df_raw_cleanup = df_raw_cleanup.set_index("interaction_id")

  return df_raw_cleanup

In [124]:
_all_raw = db_coll_rawactivity.find()
record_list = [_record for _record in _all_raw if "device" in _record.keys()]
valid_device_records = [_record for _record in record_list if _record["device"] in df_patients["device_id"].unique()]
df_records = get_records_table_from_record_list(valid_device_records)

In [125]:
if is_notebook:
  display(df_records)

Unnamed: 0_level_0,feelings,sleep,symptoms,date_time,device_id,hours,patient_record_id
interaction_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
5fa242b92ed4e2b3875a541e,good,well,no_symptoms,2020-11-04 05:56:00,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,,5fb3173a25090c2c10c1242b
5fa3797eba3c071b7fcd26fb,good,well,no_symptoms,2020-11-05 04:01:00,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,,5fb3173a25090c2c10c1242b
5fa37e06ba3c071b7fcd26fc,good,,no_symptoms,2020-11-05 04:22:00,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,12.0,5fb3173a25090c2c10c1242b
5fa37f904dfc6cffff5156d2,good,,no_symptoms,2020-11-05 04:28:00,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,8.0,5fb3173a25090c2c10c1242b
5fa37ffc4dfc6cffff5156d3,good,,no_symptoms,2020-11-05 04:28:00,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,10.0,5fb3173a25090c2c10c1242b
5fa380d2ba3c071b7fcd26fd,good,,no_symptoms,2020-11-05 04:34:00,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,9.0,5fb3173a25090c2c10c1242b
5fa3822e4dfc6cffff5156d4,good,,no_symptoms,2020-11-05 04:34:00,amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H...,11.0,5fb3173a25090c2c10c1242b
5fa38e444dfc6cffff5156d5,good,,no_symptoms,2020-11-05 05:31:00,amzn1.ask.device.AGUILHDF6CC3FFH2FWMRKYK533EZM...,8.0,5fb319c825090c2c10c1242c


In [126]:
skip_existing = True
if skip_existing:
  df_records_to_process = df_records[~df_records.index.isin(processed_interactions)]
else: df_records_to_process = df_records

In [127]:
if df_records_to_process.empty:
  if is_notebook:
    assert False, "Nothing new to process!"
  else:
    print("Nothing new to process!")
    exit()

# Translate raw records table to list of data atoms

In [128]:
def get_health_atoms(row):
  # feelings
  _feel_atm = pre_populate_atom(row, "health")
  _feel_atm = set_data_values(_feel_atm, "feelings", row["feelings"], None)

  # sleep
  _sleep_atm = pre_populate_atom(row, "health")
  _sleep_hrs = try_type_conversion(row["hours"], int)
  _sleep_atm = set_data_values(_sleep_atm, "sleep", row["sleep"], _sleep_hrs)

  # symptoms
  _symp_atm = pre_populate_atom(row, "symptoms")
  _symp_atm = set_data_values(_symp_atm, row["symptoms"], row["symptoms"])

  atoms_lst = [_feel_atm, _sleep_atm, _symp_atm]

  return atoms_lst

# Put the newly processsed atoms in the database and add a new processed list metadata entry

In [129]:
new_atoms = df_records_to_process.apply(get_health_atoms, axis=1).to_list()
new_atoms = flatten_nested_list(new_atoms)

In [130]:
new_atoms

[{'analytics_format_ver': {'format_type': 'prototype',
   'version_major': 1,
   'version_minor': 1},
  'data': {'data_type': 'feelings', 'quality': 'good', 'quantity': None},
  'date_time': '2020-11-04T05:56:00',
  'device_id': 'amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H7H2DNPIUTAKUST5MCEV42SD5VLYSHELCLJZB4B77GOXOG26LYLLNAQOQWS5JZMPB7U25YL3EVRXAMWDQU7K3H3ONNBTCE7HFQ5MTSTSDW3BEGOQ5Z6T3BI7CBCH2X2TYX5TMAP5BA56PUAYPZGWJ5YBI',
  'interaction_id': '5fa242b92ed4e2b3875a541e',
  'patient_record_id': '5fb3173a25090c2c10c1242b',
  'record_type': 'health'},
 {'analytics_format_ver': {'format_type': 'prototype',
   'version_major': 1,
   'version_minor': 1},
  'data': {'data_type': 'sleep', 'quality': 'well', 'quantity': None},
  'date_time': '2020-11-04T05:56:00',
  'device_id': 'amzn1.ask.device.AFGWKVT5646AZQBMZ3SPE5WGVO25H7H2DNPIUTAKUST5MCEV42SD5VLYSHELCLJZB4B77GOXOG26LYLLNAQOQWS5JZMPB7U25YL3EVRXAMWDQU7K3H3ONNBTCE7HFQ5MTSTSDW3BEGOQ5Z6T3BI7CBCH2X2TYX5TMAP5BA56PUAYPZGWJ5YBI',
  'interaction

In [131]:
if is_notebook:
  assert do_db_edit, "Not editing DB!"
else:
  if not do_db_edit:
    print("Not editing DB!")
    exit()

In [132]:
new_interactions_processed = list({atom["interaction_id"] for atom in new_atoms})

In [133]:
_ = seed_db_from_entry_list(new_atoms, db_coll_cleandata)

In [134]:
get_processed_interactions(db_coll_cleandata, action="put", outgoing_list=new_interactions_processed)