# Demo on Apache Beam for data processing

### Dataset used explores the reasons for diabetes in patients

#### Setup apache-beam

In [None]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
run('pip install --quiet apache-beam')

>> pip install --quiet apache-beam
[K     |████████████████████████████████| 10.9 MB 7.7 MB/s 
[K     |████████████████████████████████| 151 kB 59.1 MB/s 
[K     |████████████████████████████████| 270 kB 37.6 MB/s 
[K     |████████████████████████████████| 508 kB 60.5 MB/s 
[K     |████████████████████████████████| 2.4 MB 65.7 MB/s 
[K     |████████████████████████████████| 62 kB 1.4 MB/s 
[K     |████████████████████████████████| 47 kB 5.8 MB/s 
[K     |████████████████████████████████| 1.0 MB 57.8 MB/s 
[?25h  Building wheel for dill (setup.py) ... [?25l[?25hdone
  Building wheel for docopt (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow 2.8.2+zzzcolab20220719082949 requires protobuf<3.20,>=3.9.2, but you have protobuf 3.20.2 which is incompatible.[0m



#### Imports

In [None]:
import pandas as pd
import apache_beam as beam
from collections.abc import Collection
from statistics import mode

#### Mount drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
filename = '/content/drive/MyDrive/Colab Notebooks/diabetes.txt'


### Element-wise Operations

**Segregate diabetic, prediabetic and not diabetic patients and write to seperate files**

In [None]:
# Patients with diabetes

p1 = beam.Pipeline()

diabetic = (
    p1
    |"Read patients data" >> beam.io.ReadFromText(filename)
    |"Parse file" >> beam.Map(lambda x : x.split(','))
    |"Filter for diabetes" >> beam.Filter(lambda x : x[0] == '2.0')
    |"Write to file" >> beam.io.WriteToText('diabetic', file_name_suffix = '.txt')
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb305314b10>

In [None]:
# Prediabetic Patients 

p1 = beam.Pipeline()

pre_diabetic = (
    p1
    |"Read patients data" >> beam.io.ReadFromText(filename)
    |"Parse file" >> beam.Map(lambda x : x.split(','))
    |"Filter for prediabetes" >> beam.Filter(lambda x : x[0] == '1.0')
    |"Write to file" >> beam.io.WriteToText('prediabetic', file_name_suffix = '.txt')
)

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb3052b0c50>

In [None]:
# Patients without diabetes

p1 = beam.Pipeline()

not_diabetic = (
    p1
    |"Read patients data" >> beam.io.ReadFromText(filename)
    |"Parse file" >> beam.Map(lambda x : x.split(','))
    |"Filter for not diabetes" >> beam.Filter(lambda x : x[0] == '0.0')
    |"Write to file" >> beam.io.WriteToText('notdiabetic', file_name_suffix = '.txt')
)

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb305314250>

### ParDo

**Gender distribution in those with diabetes**

In [None]:
class SplitRow(beam.DoFn):
 
  def process(self, element):
    return  [element.split(',')]
 

class FilterDiabeticPatients(beam.DoFn):
 
  def process(self, element):
    if element[0] == '2.0':
      return [element]  
    
class GetPatientGender(beam.DoFn):
 
  def process(self, element):
    return [(element[18], 1)]
 
class Counting(beam.DoFn):
 
  def process(self, element):
    (key, values) = element         
    return [(key, sum(values))]
     

p1 = beam.Pipeline()

gender_count_w_diabetes = (
    
   p1
    |beam.io.ReadFromText(filename)
    |beam.ParDo(SplitRow())
    |beam.ParDo(FilterDiabeticPatients())
    |beam.ParDo(GetPatientGender())
    |"Group" >> beam.GroupByKey()
    |"Sum using ParDo" >> beam.ParDo(Counting()) 
    |beam.Map(print) 
 
)

p1.run()

('0.0', 18411)
('1.0', 16935)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb30532b590>

18411 females and 16935 males have diabetes

**Count of pre-diabetic who have had a stroke or have a heart disease, categorized by physical_activity**

In [None]:
class SplitRow(beam.DoFn):
 
  def process(self, element):
    return  [element.split(',')]
 

class FilterPreDiabeticPatients(beam.DoFn):
 
  def process(self, element):
    if element[0] == '1.0':
      return [element]

class FilterHeartCondition(beam.DoFn):
 
  def process(self, element):
    if element[7] == '1.0' or element[6] == '1.0':
      return [element]

class GetPatientPhyActivity(beam.DoFn):
 
  def process(self, element):
    return [(element[7], 1)]     

class Frequency(beam.DoFn):
 
  def process(self, element):
    (key, values) = element         
    return [(key, sum(values))]
     
p = beam.Pipeline()

input = (
      p
      |beam.io.ReadFromText(filename)
      |beam.ParDo(SplitRow())
      |beam.ParDo(FilterPreDiabeticPatients())
      |beam.ParDo(FilterHeartCondition())
      |beam.ParDo(GetPatientPhyActivity())
      |beam.GroupByKey()
      |beam.ParDo(Frequency())
      |beam.Map(print)
      )

p.run()

('0.0', 164)
('1.0', 664)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb3065fda90>

From among those who are pre-diabetic and have had a heart condition/stroke, 1929 of them did not have any physical activity in the last 30 days, whereas 447 of them did.

### Composite Transform

**Number of people who have high BP and high cholestrol and could not visit a doctor in the last 12 months due to cost, grouped by age and type of diabetes**

In [None]:
class CustomTransformations(beam.PTransform):
  
  def __init__(self, condition):
        self.condition = condition
  
  def expand(self, input_data):
    
    a = ( 
        input_data
          |beam.Filter(filterHealthParams)
          |beam.Map(lambda x : (self.condition + " " + x[19] , 1))
          |beam.CombinePerKey(sum)
          |beam.Map(printOutput)
      )
    return a  

def printOutput(element):
  print(str(element[1]) + " people aged " + element[0].split(" ")[1] + " with " + element[0].split(" ")[0])
  
def filterHealthParams(element):
  if element[1] == '1.0' and element[2] == '1.0' and element[12] == '1.0':
    return element

p = beam.Pipeline()

collection = ( 
              p 
              |beam.io.ReadFromText(filename)
              |beam.Map(lambda x : x.split(','))
            )

diabetes = (
            collection
            |beam.Filter(lambda x: x[0] == '2.0')
            |"Composite diabetes patients" >> CustomTransformations('diabetes')
        )

pre_diabetes = (
              collection
              |beam.Filter(lambda x: x[0] == '1.0')
              |"Composite pre-diabetes patients" >> CustomTransformations('pre-diabetes')
           ) 
p.run()

3094 people aged 9.0 with diabetes
2996 people aged 11.0 with diabetes
3826 people aged 10.0 with diabetes
1912 people aged 12.0 with diabetes
2165 people aged 8.0 with diabetes
1398 people aged 7.0 with diabetes
1720 people aged 13.0 with diabetes
710 people aged 6.0 with diabetes
198 people aged 4.0 with diabetes
368 people aged 5.0 with diabetes
67 people aged 3.0 with diabetes
21 people aged 2.0 with diabetes
9 people aged 1.0 with diabetes
340 people aged 10.0 with pre-diabetes
140 people aged 7.0 with pre-diabetes
224 people aged 12.0 with pre-diabetes
222 people aged 8.0 with pre-diabetes
303 people aged 9.0 with pre-diabetes
231 people aged 13.0 with pre-diabetes
89 people aged 6.0 with pre-diabetes
41 people aged 5.0 with pre-diabetes
300 people aged 11.0 with pre-diabetes
20 people aged 4.0 with pre-diabetes
4 people aged 1.0 with pre-diabetes
11 people aged 3.0 with pre-diabetes
3 people aged 2.0 with pre-diabetes


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb30517a4d0>

**Number of people of each gender who are smokers and heavy drinkers and have diabetes**

In [None]:
class CustomClass(beam.PTransform):
    
  def expand(self, input_data):
    
    a = ( 
        input_data
          |beam.Filter(lambda x: x[0] == '2.0')
          |beam.Filter(filterLifestlyeParameters)
          |beam.Map(lambda x : (x[18] , 1))
          |"Smokers and drinkers by gender" >> beam.CombinePerKey(sum)
          |beam.Map(print)
      )
    return a  
  
def filterLifestlyeParameters(element):
  if element[12] == '1.0' and element[7] == '1.0':
    return element

p = beam.Pipeline()

collection = ( 
              p 
              |beam.io.ReadFromText(filename)
              |beam.Map(lambda x : x.split(','))
              |CustomClass()
            )

p.run()

('0.0', 3232)
('1.0', 4377)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb30513ec10>

**Average BMI per category - diabetes, pre-diabetes and no diabetes**

In [None]:
class GroupwiseMean(beam.PTransform):
    
  def expand(self, input_data):
    
    a = ( 
        input_data
          |beam.Map(lambda x : (x[0] , float(x[4])))
          |beam.combiners.Mean.PerKey()
          |beam.ParDo(MapClass())
          |beam.Map(printMeanBMI)
      )
    return a  

    
class MapClass(beam.DoFn):
 
  def process(self, element):
    el_list = list(element)
    if el_list[0] == '0.0':
      el_list[0] = 'No diabetes'
    elif el_list[0] == '1.0':
      el_list[0] = 'Pre diabetic'
    else:
      el_list[0] = 'Diabetes'

    return tuple(el_list)


def printMeanBMI(element):
  print(element)

p = beam.Pipeline()

input = (
      p
      |beam.io.ReadFromText(filename, skip_header_lines = True)
      |beam.Map(lambda x : x.split(','))
      |GroupwiseMean()
      )

p.run()

No diabetes
27.742521162548023
Diabetes
31.94401063769592
Pre diabetic
30.724465558194776


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb30522a490>

**Check on mental and physical health parameters for all each category of patients**

In [None]:
class GroupWiseHealthCheck(beam.PTransform):
    
  def expand(self, input_data):
    
    a = ( 
        input_data
          |beam.Map(lambda x : (x[0] , (float(x[15]) +float(x[16]))/2))
          |beam.combiners.Mean.PerKey()
          |beam.Map(print)
      )
    return a  

p = beam.Pipeline()

input = (
      p
      |beam.io.ReadFromText(filename, skip_header_lines = True)
      |beam.Map(lambda x : x.split(','))
      |GroupWiseHealthCheck()
      )

p.run()

('0.0', 3.2634099661679996)
('2.0', 6.208142364058168)
('1.0', 5.439106024616714)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb3050e57d0>

On an average, those with diabetes did not have good mental and physical health for an average of 6 days. Similarly, pre-diabetic for 5 days and non-diabetic for 3 days.

**Most common income range for those who have health coverage and those who don't**

In [None]:
class IncomeHealthcareRelation(beam.PTransform):
    
  def expand(self, input_data):
    
    a = ( 
        input_data
          |beam.Map(lambda x : (x[12], x[21]))
          |beam.GroupByKey()
          |beam.Map(printIncomeValues)
          |beam.Map(lambda x : mode(x[1]))
          |beam.Map(print)
      )
    return a 

def printIncomeValues(element):
  print(element)
  return element


p1 = beam.Pipeline()

collection = (
      p1
      |beam.io.ReadFromText(filename, skip_header_lines = True)
      |beam.Map(lambda x : x.split(','))
      |IncomeHealthcareRelation()
      )

p1.run()

('1.0', ['3.0', '8.0', '6.0', '4.0', '8.0', '7.0', '4.0', '1.0', '3.0', '8.0', '1.0', '7.0', '6.0', '2.0', '8.0', '3.0', '6.0', '7.0', '8.0', '4.0', '3.0', '5.0', '4.0', '6.0', '8.0', '5.0', '4.0', '7.0', '8.0', '4.0', '8.0', '2.0', '3.0', '4.0', '3.0', '7.0', '1.0', '7.0', '4.0', '3.0', '8.0', '8.0', '7.0', '6.0', '2.0', '1.0', '3.0', '5.0', '6.0', '8.0', '8.0', '8.0', '8.0', '6.0', '5.0', '4.0', '2.0', '5.0', '8.0', '3.0', '6.0', '7.0', '4.0', '2.0', '3.0', '5.0', '8.0', '3.0', '7.0', '4.0', '6.0', '8.0', '2.0', '3.0', '8.0', '8.0', '7.0', '4.0', '2.0', '7.0', '8.0', '8.0', '8.0', '8.0', '6.0', '8.0', '8.0', '7.0', '2.0', '7.0', '3.0', '6.0', '3.0', '8.0', '8.0', '5.0', '6.0', '7.0', '2.0', '6.0', '2.0', '5.0', '7.0', '3.0', '6.0', '4.0', '6.0', '8.0', '6.0', '6.0', '7.0', '7.0', '7.0', '6.0', '8.0', '7.0', '4.0', '8.0', '7.0', '7.0', '5.0', '4.0', '8.0', '3.0', '8.0', '6.0', '2.0', '4.0', '8.0', '5.0', '8.0', '6.0', '8.0', '7.0', '8.0', '5.0', '8.0', '1.0', '6.0', '4.0', '6.0', '6.0

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fb3052b0110>

Those with healthcare coverage/insurance, the most common income range iswhich is income more than 75000 USD whereas those without sucha a facility lie in the income range of 4 which is income between 25000 and 30000 USD.