In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .appName('DexcomCleaning') \
  .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/06 01:15:27 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/09/06 01:15:27 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/09/06 01:15:28 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/09/06 01:15:28 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [2]:
#Set output configuration
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [3]:
BUCKET_NAME = "maude-device-reports"

In [None]:
#View datasets in blob storage
from google.cloud import storage

gcs_client = storage.Client()
bucket = gcs_client.bucket(BUCKET_NAME)

list(bucket.list_blobs())

In [32]:
from pyspark.sql.types import *

def load_text_data(year):
    
    text_schema = StructType([
        StructField("MDR_REPORT_KEY", IntegerType()),
        StructField("MDR_TEXT_KEY", IntegerType()),
        StructField("TEXT_TYPE_CODE", StringType()),
        StructField("PATIENT_SEQUENCE_NUMBER", IntegerType()),
        StructField("DATE_REPORT", DateType()),
        StructField("FOI_TEXT", StringType())
    ])
    
    
    text_df = spark \
        .read \
        .schema(text_schema) \
        .option("header" , "true") \
        .option("delimiter", "|") \
        .csv(f"gs://{BUCKET_NAME}/foitext{year}.txt")
    
    return text_df

def load_device_data(year):

    device_schema = StructType([
        StructField("MDR_REPORT_KEY", IntegerType()),
        StructField("DEVICE_EVENT_KEY", IntegerType()),
        StructField("IMPLANT_FLAG", StringType()),
        StructField("DATE_REMOVED_FLAG", DateType()),
        StructField("DEVICE_SEQUENCE_NO", IntegerType()),
        StructField("DATE_RECEIVED", DateType()),
        StructField("BRAND_NAME", StringType()),
        StructField("GENERIC_NAME", StringType()),
        StructField("MANUFACTURER_D_NAME", StringType()),
        StructField("MANUFACTURER_D_ADDRESS_1", StringType()),
        StructField("MANUFACTURER_D_ADDRESS_2", StringType()),
        StructField("MANUFACTURER_D_CITY", StringType()),
        StructField("MANUFACTURER_D_STATE_CODE", StringType()),
        StructField("MANUFACTURER_D_ZIP_CODE", StringType()),
        StructField("MANUFACTURER_D_ZIP_CODE_EXT", StringType()),
        StructField("MANUFACTURER_D_COUNTRY_CODE", StringType()),
        StructField("MANUFACTURER_D_POSTAL_CODE", StringType()),
        StructField("DEVICE_OPERATOR", StringType()),
        StructField("EXPIRATION_DATE_OF_DEVICE", DateType()),
        StructField("MODEL_NUMBER", IntegerType()),
        StructField("CATALOG_NUMBER", IntegerType()),
        StructField("LOT_NUMBER", IntegerType()),
        StructField("OTHER_ID_NUMBER", StringType()),
        StructField("DEVICE_AVAILABILITY", StringType()),
        StructField("DATE_RETURNED_TO_MANUFACTURER", DateType()),
        StructField("DEVICE_REPORT_PRODUCT_CODE", StringType()),
        StructField("DEVICE_AGE_TEXT", StringType()),
        StructField("DEVICE_EVALUATED_BY_MANUFACTUR", StringType()),
        StructField("COMBINATION_PRODUCT_FLAG", StringType())
    ])
    
    
    device_df = spark \
        .read \
        .schema(device_schema) \
        .option("header" , "true") \
        .option("delimiter", "|") \
        .csv(f"gs://{BUCKET_NAME}/device{year}.txt")
    
    return device_df

In [36]:
text16 = load_text_data(2016)
text17 = load_text_data(2017)
text18 = load_text_data(2018)
text19 = load_text_data(2019)
text20 = load_text_data(2020)
text21 = load_text_data(2021)

In [37]:
device16 = load_device_data(2016)
device17 = load_device_data(2017)
device18 = load_device_data(2018)
device19 = load_device_data(2019)
device20 = load_device_data(2020)
device21 = load_device_data(2021)

In [41]:
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

In [42]:
text_all = unionAll(text16, text17, text18, text19, text20, text21) #16802162 records
device_all = unionAll(device16, device17, device18, device19, device20, device21) #7790750 records

In [49]:
#initial reports have text_type_code = "D"
text_init = text_all.filter(text_all.TEXT_TYPE_CODE == "D")

In [64]:
#Inner Join text On MDR_REPORT_KEY
reports_all = device_all.join(text_init, device_all.MDR_REPORT_KEY ==  text_init.MDR_REPORT_KEY, "left")

In [54]:
reports_all.head()

                                                                                

Row(MDR_REPORT_KEY=9796745, DEVICE_EVENT_KEY=None, IMPLANT_FLAG=None, DATE_REMOVED_FLAG=None, DEVICE_SEQUENCE_NO=1, DATE_RECEIVED=None, BRAND_NAME='PUMP MMT-1714K 630G BLACK MMOL CANADA', GENERIC_NAME='ARTIFICIAL PANCREAS DEVICE SYSTEM, THRESHOLD SUSPEND', MANUFACTURER_D_NAME='MEDTRONIC PUERTO RICO OPERATIONS CO.', MANUFACTURER_D_ADDRESS_1='CEIBA NORTE IND. PARK #50 ROAD', MANUFACTURER_D_ADDRESS_2=None, MANUFACTURER_D_CITY='JUNCOS', MANUFACTURER_D_STATE_CODE=None, MANUFACTURER_D_ZIP_CODE='00777', MANUFACTURER_D_ZIP_CODE_EXT='-386', MANUFACTURER_D_COUNTRY_CODE=None, MANUFACTURER_D_POSTAL_CODE='00777-3869', DEVICE_OPERATOR='0LP', EXPIRATION_DATE_OF_DEVICE=None, MODEL_NUMBER=None, CATALOG_NUMBER=None, LOT_NUMBER=None, OTHER_ID_NUMBER=None, DEVICE_AVAILABILITY='R', DATE_RETURNED_TO_MANUFACTURER=None, DEVICE_REPORT_PRODUCT_CODE='OZO', DEVICE_AGE_TEXT='DA', DEVICE_EVALUATED_BY_MANUFACTUR='R', COMBINATION_PRODUCT_FLAG='N', MDR_REPORT_KEY=9796745, MDR_TEXT_KEY=182417249, TEXT_TYPE_CODE='D', PA

In [65]:
dexcom_reports = reports_all.where(reports_all.BRAND_NAME.contains("DEXCOM"))
dexcom_reports = dexcom_reports.dropDuplicates(["MDR_REPORT_KEY"])

In [66]:
#Export file to bucket
#all_filepath = f"gs://{BUCKET_NAME}/reports_all.csv"
dexcom_filepath = f"gs://{BUCKET_NAME}/dexcom_reports_all.csv"

#reports_all.write.mode('overwrite').csv(all_filepath)
dexcom_reports.coalesce(1).write.mode('overwrite').csv(dexcom_filepath)

AnalysisException: Found duplicate column(s) when inserting into gs://maude-device-reports/dexcom_reports_all.csv: `mdr_report_key`