In [1]:
!pip install pyspark



In [2]:
# Fetch the repository into ./teste_engenheiro; the JSON files read later
# in this notebook live in its data/ directory.
!git clone https://github.com/wandersondsm/teste_engenheiro

Cloning into 'teste_engenheiro'...
remote: Enumerating objects: 1189, done.[K
remote: Total 1189 (delta 0), reused 0 (delta 0), pack-reused 1189 (from 2)[K
Receiving objects: 100% (1189/1189), 49.85 MiB | 3.54 MiB/s, done.
Resolving deltas: 100% (1148/1148), done.
Updating files: 100% (1182/1182), done.


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [4]:
# Start the Spark session used by every cell below, or reuse one that is
# already active in this kernel.
spark = (
    SparkSession.builder
    .appName('Create Condition')
    .getOrCreate()
)

In [5]:
# Display the session object to confirm it was created.
spark

## Funções auxiliares

In [6]:
def remove_uuid(uuid):
    """Strip every ``urn:uuid:`` marker from a string column.

    Built from native Spark SQL expressions instead of a Python UDF, so the
    substitution runs inside Spark without shipping each row to a Python
    worker.

    Args:
        uuid: Column (or column name) holding the reference strings.

    Returns:
        A string Column with all ``urn:uuid:`` occurrences removed. The
        result is null when the input is null or an empty string, which is
        what the previous UDF implementation returned for those inputs.
    """
    reference = F.col(uuid) if isinstance(uuid, str) else uuid
    # `when` without `otherwise` evaluates to null whenever the condition is
    # false or null, covering both the empty-string and the null input.
    return F.when(
        F.length(reference) > 0,
        F.regexp_replace(reference, 'urn:uuid:', ''),
    )

## Leitura dos dados


In [7]:
# Multiline mode lets a single JSON record span several lines of a file.
bundle_reader = spark.read.option('multiline', 'true')
df = bundle_reader.json('/content/teste_engenheiro/data/*.json')

In [8]:
# Preview the first 5 rows of the raw data (long cell values are truncated).
df.show(5)

+--------------------+------------+-----------+
|               entry|resourceType|       type|
+--------------------+------------+-----------+
|[{urn:uuid:071812...|      Bundle|transaction|
|[{urn:uuid:df121e...|      Bundle|transaction|
|[{urn:uuid:f156b8...|      Bundle|transaction|
|[{urn:uuid:b0f49c...|      Bundle|transaction|
|[{urn:uuid:37ff59...|      Bundle|transaction|
+--------------------+------------+-----------+
only showing top 5 rows



In [9]:
# Number of rows loaded from the JSON files.
df.count()

1180

In [10]:
# One output row per element of the `entry` array. The exploded column is
# not aliased, so it carries the name `col` that later cells reference.
entries = F.explode(F.col('entry'))
df_condition = df.select(entries)

In [11]:
# Total number of exploded entries across all loaded rows.
df_condition.count()

527113

In [12]:
# Show 5 exploded entries; the second argument (False) disables truncation,
# so each nested entry is printed in full and the output is very wide.
df_condition.show(5,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
# Flatten the Condition resources into one tabular row per combination of
# code coding, clinical-status coding and verification-status coding.
#
# explode_outer is used instead of explode so that a Condition whose coding
# array is null or empty is kept (with nulls in the derived columns) rather
# than being silently dropped from the export.
df_save = (
    df_condition
    # Keep only Condition resources (plain equality; no wildcard is needed).
    .where(F.col('col.resource.resourceType') == 'Condition')
    .withColumn('CODING', F.explode_outer(F.col('col.resource.code.coding')))
    .withColumn('CODE', F.col('CODING.code'))
    .withColumn('DISPLAY', F.col('CODING.display'))
    .withColumn(
        'CLINICAL_STATUS_CODING',
        F.explode_outer(F.col('col.resource.clinicalStatus.coding')),
    )
    .withColumn('CLINICAL_STATUS_CODE', F.col('CLINICAL_STATUS_CODING.code'))
    .withColumn(
        'VERIFICATION_STATUS_CODING',
        F.explode_outer(F.col('col.resource.verificationStatus.coding')),
    )
    .withColumn('VERIFICATION_STATUS_CODE', F.col('VERIFICATION_STATUS_CODING.code'))
    # The subject reference carries a 'urn:uuid:' marker; keep only the id.
    .withColumn('PATIENT_ID', remove_uuid(F.col('col.resource.subject.reference')))
    .withColumn('ON_SET_DATETIME', F.col('col.resource.onsetDateTime').cast('timestamp'))
    .withColumn('ABATEMENT_DATETIME', F.col('col.resource.abatementDateTime').cast('timestamp'))
    .withColumn('RECORDED_DATE', F.col('col.resource.recordedDate').cast('timestamp'))
    # Remove the nested source column and the intermediate structs so that
    # only the flat output columns remain.
    .drop('col', 'CODING', 'CLINICAL_STATUS_CODING', 'VERIFICATION_STATUS_CODING')
)

In [14]:
# Check the column names and types of the flattened result.
df_save.printSchema()

root
 |-- CODE: string (nullable = true)
 |-- DISPLAY: string (nullable = true)
 |-- CLINICAL_STATUS_CODE: string (nullable = true)
 |-- VERIFICATION_STATUS_CODE: string (nullable = true)
 |-- PATIENT_ID: string (nullable = true)
 |-- ON_SET_DATETIME: timestamp (nullable = true)
 |-- ABATEMENT_DATETIME: timestamp (nullable = true)
 |-- RECORDED_DATE: timestamp (nullable = true)



In [16]:
# Preview 10 flattened rows without truncating the cell values.
df_save.show(10,False)

+---------+-----------------------------------------+--------------------+------------------------+------------------------------------+-------------------+------------------+-------------------+
|CODE     |DISPLAY                                  |CLINICAL_STATUS_CODE|VERIFICATION_STATUS_CODE|PATIENT_ID                          |ON_SET_DATETIME    |ABATEMENT_DATETIME|RECORDED_DATE      |
+---------+-----------------------------------------+--------------------+------------------------+------------------------------------+-------------------+------------------+-------------------+
|59621000 |Hypertension                             |active              |confirmed               |0718123b-5034-4965-a145-3d8d71b11389|1935-09-30 14:04:10|NULL              |1935-09-30 14:04:10|
|74400008 |Appendicitis                             |active              |confirmed               |0718123b-5034-4965-a145-3d8d71b11389|1945-07-31 14:04:10|NULL              |1945-07-31 14:04:10|
|428251008|History o

In [17]:
# Export the result to the 'condition' directory as a single part file:
# ';'-delimited CSV with a header row.
# mode('overwrite') makes the cell re-runnable; with the default save mode
# the write raises when the output directory already exists.
(
    df_save.coalesce(1)
    .write
    .mode('overwrite')
    .option('header', True)
    .option('delimiter', ';')
    .csv('condition')
)