In [1]:
import json
from datetime import datetime

import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    concat,
    count,
    datediff,
    floor,
    lit,
    months_between,
    when,
)
from pyspark.sql.types import IntegerType, StringType

#### Create Spark Session

In [2]:
# Get (or reuse) the Spark session for this notebook; the application is named 'uds'.
spark = (
    SparkSession.builder
    .appName('uds')
    .getOrCreate()
)

In [3]:
# Load the JSON files under fhir/Patient. The multiline option lets a single
# JSON record span several lines instead of requiring one record per line.
patients = (
    spark.read
    .option("multiline", "true")
    .json("fhir/Patient")
)

In [4]:
# Keep only the columns used below: birth date, gender and id.
df = patients.select(
    'birthDate',
    'gender',
    'id',
)

#### Age & gender

In [5]:
# Reference date used for every age calculation in this notebook.
# Per the UDS manual, page 27
# (https://bphc.hrsa.gov/sites/default/files/bphc/datareporting/pdf/2021-uds-manual.pdf):
# "Use the individual's age on June 30, 2021."
jun_30 = datetime.strptime(
    '2021-06-30',
    '%Y-%m-%d',
).date()

In [6]:
# Age in completed years as of the reference date `jun_30`.
#
# months_between returns the number of calendar months between the two dates
# (with a fractional part for leftover days), so dividing by 12 and flooring
# yields the number of birthdays already reached. The earlier
# floor(datediff(...) / 365) approximation ignored leap days: e.g. someone born
# 2000-07-01 is 7669 days old on 2021-06-30, and 7669 / 365 floors to 21 even
# though their 21st birthday is still a day away.
df_with_age = df.withColumn(
    'age',
    floor(months_between(lit(jun_30), df.birthDate) / 12),
)

#### Map age to ageGroup values for UDS Table 3A

In [7]:
# Derive the 'ageGroup' label that is later joined against the template's
# "Age Groups" column. The join is an exact string match, so each label here
# has to be spelled exactly like the corresponding template row.
#
# Buckets:
#   * age < 1       -> "Under Age 1"
#   * ages 1-24     -> "Age <n>" (one row per year, built in .otherwise)
#   * ages 25-84    -> five-year bands "Ages 25-29" ... "Ages 80-84"
#   * age >= 85     -> "Age 85 and over"
#
# Fixes relative to the earlier version:
#   * The first condition was `age <= 1`, which also captured one-year-olds, so
#     the "Age 1" row could never receive a count. It is now `age < 1`.
#   * The first label was "Under age 1", but the template row shown in the
#     rendered table is "Under Age 1"; the mismatch made the join drop infants.
# NOTE(review): the remaining labels are assumed to match the template's
# spelling and capitalisation -- verify against template_3a.csv.
df_mapped = df_with_age.withColumn(
    "ageGroup",
    when(df_with_age.age < 1, "Under Age 1")
    .when(df_with_age.age.between(25, 29), "Ages 25-29")
    .when(df_with_age.age.between(30, 34), "Ages 30-34")
    .when(df_with_age.age.between(35, 39), "Ages 35-39")
    .when(df_with_age.age.between(40, 44), "Ages 40-44")
    .when(df_with_age.age.between(45, 49), "Ages 45-49")
    .when(df_with_age.age.between(50, 54), "Ages 50-54")
    .when(df_with_age.age.between(55, 59), "Ages 55-59")
    .when(df_with_age.age.between(60, 64), "Ages 60-64")
    .when(df_with_age.age.between(65, 69), "Ages 65-69")
    .when(df_with_age.age.between(70, 74), "Ages 70-74")
    .when(df_with_age.age.between(75, 79), "Ages 75-79")
    .when(df_with_age.age.between(80, 84), "Ages 80-84")
    .when(df_with_age.age >= 85, "Age 85 and over")
    # Everything left is a single-year row; render the age as "Age <n>".
    .otherwise(
        concat(
            lit('Age '),
            df_with_age.age.cast(IntegerType()).cast(StringType()),
        )
    ),
)

#### Group by ageGroup

In [8]:
# One row per age group and one column per distinct gender value, each cell
# holding the count of non-null ids. Rows are ordered by the age-group label,
# missing counts become 0, and the key column is renamed to "Age Groups".
df_grouped = (
    df_mapped
    .groupBy("ageGroup")
    .pivot("gender")
    .agg(count("id"))
    .orderBy("ageGroup")
    .na.fill(0)
    .withColumnRenamed("ageGroup", "Age Groups")
)

#### Import and merge template

In [9]:
# Table template read from CSV: the first line is treated as the header and
# column types are inferred from the data.
# Source: https://bphc.hrsa.gov/sites/default/files/bphc/datareporting/reporting/2021-uds-tables.xlsx
template = (
    spark.read
    .options(header='True', inferSchema='True')
    .csv("uds/templates/template_3a.csv")
)

In [10]:
# Attach the per-gender counts to the template rows. The left join keeps every
# template line, including age groups for which no patient was counted.
tbl_3a_joined = template.join(df_grouped, on=['Age Groups'], how='left')

# The pivot in df_grouped only creates a column for gender values that occur in
# the data. If no patient is 'male' (or 'female'), the column is absent and the
# select below would raise, so add it as an all-zero count column instead.
for gender_col in ("male", "female"):
    if gender_col not in tbl_3a_joined.columns:
        tbl_3a_joined = tbl_3a_joined.withColumn(gender_col, lit(0).cast("long"))

# Order by the template's line number, turn unmatched (null) counts into 0 and
# keep only the columns that make up the final table.
tbl_3a = (
    tbl_3a_joined
    .sort('Line')
    .fillna(0)
    .select("Line", "Age Groups", "male", "female")
)

In [11]:
# Print up to 40 rows without truncating long cell values.
tbl_3a.show(n=40, truncate=False)

+----+----------------+----+------+
|Line|Age Groups      |male|female|
+----+----------------+----+------+
|1   |Under Age 1     |0   |0     |
|2   |Age 1           |0   |0     |
|3   |Age 2           |0   |1     |
|4   |Age 3           |0   |0     |
|5   |Age 4           |0   |1     |
|6   |Age 5           |1   |0     |
|7   |Age 6           |1   |0     |
|8   |Age 7           |0   |0     |
|9   |Age 8           |1   |0     |
|10  |Age 9           |0   |0     |
|11  |Age 10          |0   |0     |
|12  |Age 11          |0   |1     |
|13  |Age 12          |1   |0     |
|14  |Age 13          |0   |1     |
|15  |Age 14          |2   |0     |
|16  |Age 15          |0   |1     |
|17  |Age 16          |1   |0     |
|18  |Age 17          |1   |0     |
|19  |Age 18          |2   |0     |
|20  |Age 19          |4   |1     |
|21  |Age 20          |0   |1     |
|22  |Age 21          |0   |0     |
|23  |Age 22          |0   |0     |
|24  |Age 23          |0   |2     |
|25  |Age 24          |1   |