In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict

from sklearn import preprocessing

import json
import ijson
from pandas.io.json import json_normalize
from flatten_json import flatten as flatten_json

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

#Flatten array of structs and structs
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Useful Functions 

#Flatten function
def pyspark_flatten(df):
    """Flatten every nested column of a Spark DataFrame.

    Struct columns are replaced by one column per sub-field, named
    ``<parent>_<child>``.  Array columns are exploded with ``explode_outer``
    so that each element ends up on its own row.  The schema is re-inspected
    after every step and the loop stops once no struct or array column is left.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame whose schema may contain ``StructType`` / ``ArrayType`` columns.

    Returns
    -------
    pyspark.sql.DataFrame
        The flattened frame.
    """
    def nested_columns(frame):
        # Map column name -> data type for the columns that are still nested.
        return {
            field.name: field.dataType
            for field in frame.schema.fields
            if type(field.dataType) in (ArrayType, StructType)
        }

    pending = nested_columns(df)
    while pending:
        # Always work on the first nested column still present in the schema.
        col_name, data_type = next(iter(pending.items()))
        print ("Processing :"+col_name+" Type : "+str(type(data_type)))

        if type(data_type) == StructType:
            # Struct: promote each sub-field to a top-level column, then
            # remove the struct column itself.
            child_names = [child.name for child in data_type]
            expanded = [
                col(col_name+'.'+child).alias(col_name+'_'+child)
                for child in child_names
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif type(data_type) == ArrayType:
            # Array: one output row per array element.
            df = df.withColumn(col_name, explode_outer(col_name))

        # The schema changed, so look for remaining nested columns again.
        pending = nested_columns(df)
    return df


# Outputs the columns that need to be dropped
def to_drop(dataset, corr_threshold, useless_columns):
    """List the columns that should be removed from ``dataset``.

    A column is selected when its absolute correlation with any column that
    precedes it in ``dataset`` is greater than ``corr_threshold``; only the
    strict upper triangle of the correlation matrix is inspected, so of two
    highly correlated columns only the later one is reported.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Data whose pairwise column correlations are evaluated.
    corr_threshold : float
        Absolute-correlation limit above which a column is flagged.
    useless_columns : iterable of str
        Extra column names appended unconditionally to the result.

    Returns
    -------
    list
        Flagged column names followed by ``useless_columns``.
    """
    # Absolute pairwise correlations between columns.
    corr_matrix = dataset.corr().abs()
    # Keep only the strict upper triangle (k=1 excludes the diagonal).
    # The builtin ``bool`` is used because the ``np.bool`` alias was removed
    # from NumPy (deprecated in 1.20, gone in 1.24).
    upper_mask = np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    upper = corr_matrix.where(upper_mask)
    # Columns correlated above the threshold with at least one earlier column.
    drop = [column for column in upper.columns if (upper[column] > corr_threshold).any()]
    return drop + list(useless_columns)

# Drops the outliers from the dataset 
def drop_outliers(dataset, z_threshold=3, max_outliers=20):
    """Blank out isolated outliers in the numeric columns of ``dataset`` in place.

    For every numeric column the absolute z-score of each value is computed.
    When the column holds at least one but fewer than ``max_outliers`` values
    whose z-score is greater than ``z_threshold``, those values are replaced
    by NaN.  Columns with no outliers, or with ``max_outliers`` or more, are
    left untouched.

    The same mask is used both for counting and for blanking, so a value whose
    z-score equals ``z_threshold`` exactly is kept rather than being removed
    without being counted.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Frame to modify in place.
    z_threshold : float, default 3
        Absolute z-score above which a value is treated as an outlier.
    max_outliers : int, default 20
        Columns with this many outliers or more are not modified.

    Returns
    -------
    None
    """
    for column in dataset._get_numeric_data():
        data_column = dataset[column]
        # Absolute z-score, computed once per column.
        z_scores = ((data_column - data_column.mean()) / data_column.std()).abs()
        is_outlier = z_scores > z_threshold
        outlier_count = int(is_outlier.sum())
        if 0 < outlier_count < max_outliers:
            # Assigning the filtered Series realigns on the index, which
            # leaves NaN at the positions of the removed values.
            dataset[column] = data_column[~is_outlier]
            
# Outputs data ready for use
def clean_data(dataset, drop_columns):
    """Return a copy of ``dataset`` without ``drop_columns`` and with outliers blanked.

    The input frame is not modified: the columns are dropped into a new frame,
    and ``drop_outliers`` then edits that new frame in place.  Rows containing
    NaN are not removed here.
    """
    cleaned = dataset.drop(columns=drop_columns)
    # drop_outliers works in place and returns nothing.
    drop_outliers(cleaned)
    return cleaned

In [5]:
# appName = "HealthcareSQL"
# master = "local"

# sc = SparkContext()

# # Create Spark session
# spark = SparkSession.builder \
#     .appName(appName) \
#     .master(master) \
#     .getOrCreate()

In [6]:
# spark

In [33]:
filename = "drug-event-0039-of-0039.json"

# Option 1
# Stream the records under the top-level "results" array instead of loading
# the whole document with json.load.  The file is opened in binary mode
# because ijson parses bytes; handing it a text-mode file is deprecated.
with open(filename, 'rb') as json_file:
    data_json = ijson.items(json_file, 'results.item')
    # Materialise inside the `with` block: the generator reads lazily from the file.
    items = list(data_json)

# data_1 = json_normalize(data = items, record_path =[['patient', 'reaction'], 'drug'] , sep='_')
data_1 = json_normalize(items)

# Option 3

In [6]:
# Field names looked up in every report, at whatever nesting depth they occur.
# NOTE: this is a set, so names are unique but iteration order is not guaranteed.
columns_to_extract = {
    'companynumb',
    'safetyreportid',
    'safetyreportversion',
    'receiptdate',
    'patientagegroup',
    'patientdeathdate',
    'patientsex',
    'patientweight',
    'serious',
    'seriousnesscongenitalanomali',
    'seriousnessdeath',
    'seriousnessdisabling',
    'seriousnesshospitalization',
    'seriousnesslifethreatening',
    'seriousnessother',
    'actiondrug',
    'activesubstancename',
    'drugadditional',
    'drugadministrationroute',
    'drugcharacterization',
    'drugindication',
    'drugauthorizationnumb',
    'medicinalproduct',
    'drugdosageform',
    'drugdosagetext',
    'reactionoutcome',
    'reactionmeddrapt',
    'reactionmeddraversionpt'}


In [34]:
def filter_by_layer(dic, level):
    """Return the keys of ``dic`` that sit at ``level`` and are flagged.

    Every value of ``dic`` is indexed positionally: element 1 is compared
    with ``level`` and element 2 must compare equal to ``True``.  Keys are
    returned in the iteration order of ``dic``.
    """
    return [
        key
        for key, entry in dic.items()
        if entry[1] == level and entry[2] == True
    ]

def create_layers(dic, parent, level):
    """Describe each key of ``dic`` as a ``(parent, level, is_container)`` tuple.

    ``is_container`` is ``True`` only when the value's type is exactly
    ``dict`` or ``list`` (subclasses do not count); every other value,
    strings included, gets ``False``.

    Returns a new dict with the same keys as ``dic``.
    """
    container_types = (dict, list)
    return {
        key: (parent, level, type(dic[key]) in container_types)
        for key in dic.keys()
    }

def gen_dict_extract(key, var):
    """Yield every value stored under ``key`` anywhere inside ``var``.

    ``var`` is searched only if it exposes ``items``.  A matching value is
    yielded first; dict values are then searched recursively, and each
    element of a list value is searched as well.  A list nested directly
    inside another list is not descended into, because a list has no
    ``items`` attribute.
    """
    if not hasattr(var, 'items'):
        return
    for name, value in var.items():
        if name == key:
            yield value
        if isinstance(value, dict):
            yield from gen_dict_extract(key, value)
        elif isinstance(value, list):
            for element in value:
                yield from gen_dict_extract(key, element)

def findkeys(node, kv):
    """Yield every value stored under key ``kv`` anywhere inside ``node``.

    ``node`` may be an arbitrarily nested mix of dicts and lists.  For a dict
    its own ``kv`` entry (if present) is yielded before its values are
    searched; for a list every element is searched in order.  Anything that
    is neither a dict nor a list yields nothing.
    """
    if isinstance(node, dict):
        if kv in node:
            yield node[kv]
        children = node.values()
    elif isinstance(node, list):
        children = node
    else:
        return
    for child in children:
        yield from findkeys(child, kv)

In [54]:
# Build one flat record per report, keeping only the fields of interest.
formated_items = []
# columns_to_extract is a set, whose iteration order can change between
# interpreter runs; sorting makes the key order of every record (and so the
# column order of the resulting table) reproducible.
ordered_columns = sorted(columns_to_extract)
for item in items:
    new_dict = {}
    for column in ordered_columns:
        column_iter_list = tuple(findkeys(item, column))
        # Exactly one occurrence is stored as the bare value; zero or several
        # occurrences are stored as a (possibly empty) tuple.
        if len(column_iter_list) == 1:
            new_dict[column] = column_iter_list[0]
        else:
            new_dict[column] = column_iter_list
    formated_items.append(new_dict)

In [55]:
# One row per extracted record; columns are the keys of the record dicts.
final_data = pd.DataFrame(formated_items)

In [66]:
# Sanity checks: duplicated rows, missing values, and columns holding a single distinct value.
# NOTE: a field that is absent from a report is stored as an empty tuple by the
# extraction loop, so isna() does not count it as missing.
print("Duplicates:",final_data.duplicated().sum())
print("Missing values:",final_data.isna().sum().sum())
print("Single valued columns:", final_data.columns[final_data.nunique()==1])

Duplicates: 0
Missing values: 0
Single valued columns: Index(['patientdeathdate'], dtype='object')


In [79]:
# Remove the column reported as single-valued above.  errors='ignore' keeps
# the cell re-runnable: without it a second execution raises KeyError because
# the column has already been removed from final_data.
final_data = final_data.drop(columns='patientdeathdate', errors='ignore')

In [81]:
# Persist the table; with the default arguments the DataFrame index is written as the first CSV column.
final_data.to_csv('drug-event.csv')