<a href="https://colab.research.google.com/github/itsmpython/python_for_data_analysis/blob/main/apache_beam_basics_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache_beam

Collecting apache-beam
  Downloading apache_beam-2.59.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
  Preparing metadata (setup.py) ... done
Collecting orjson<4,>=3.9.7 (from apache-beam)
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
  Preparing metadata (setup.py) ... done
Collecting fastavro<2,>=0.23.6 (from apache-bea

In [2]:
import apache_beam as beam

In [22]:
pipe = beam.Pipeline() # Define the pipeline object

In [24]:
# mtcars.csv was uploaded to Google Colab beforehand; read it here
ip = (
    pipe
    | beam.io.ReadFromText('/content/mtcars.csv', skip_header_lines=1) # The | operator applies a transform to the PCollection
    | beam.Map(lambda record: record.split(',')) # The CSV is read as plain text, so split each line on ','; each row becomes a list of strings
    | beam.Filter(lambda record: record[10] == '4') # Keep only rows whose column at index 10 equals '4'
    #| beam.io.WriteToText('output.txt') # write to a text file
    | beam.combiners.Count.Globally()
    | beam.Map(print) # Print the output
)

In [25]:
pipe.run()

12
12


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7d35f3c3fcd0>
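
Splitting on `','` works for this file, but it breaks as soon as a field contains a quoted comma. The following is a minimal sketch of a more defensive parser built on the standard-library `csv` module. The name `parse_csv_line` and the sample line are assumptions introduced for illustration; they are not part of the original notebook.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line into a list of fields, honoring quoted commas."""
    return next(csv.reader([line]))

# Hypothetical sample line: the quoted first field contains a comma
sample = '"Mazda RX4, Wag",21,6,160,110,3.9,2.875,17.02,0,1,4,4'

fields = parse_csv_line(sample)
print(len(fields))             # 12 fields, as expected
print(fields[0])               # Mazda RX4, Wag
print(len(sample.split(',')))  # 13, because the naive split cuts the quoted field in two
```

Because `beam.Map` accepts any callable, the function could replace the lambda in the pipeline above, for example `| beam.Map(parse_csv_line)`.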

### Instead of creating the pipeline and then calling `run()` separately, we can use a `with` block
![green-divider](https://user-images.githubusercontent.com/7065401/52071924-c003ad80-2562-11e9-8297-1c6595f8a7ff.png)


In [26]:
with beam.Pipeline() as pipe1:
  ip1 = (
    pipe1
    | beam.io.ReadFromText('/content/mtcars.csv', skip_header_lines=1) # The | operator applies a transform to the PCollection
    | beam.Map(lambda record: record.split(',')) # The CSV is read as plain text, so split each line on ','; each row becomes a list of strings
    | beam.Filter(lambda record: record[10] == '4') # Keep only rows whose column at index 10 equals '4'
    #| beam.io.WriteToText('output.txt') # Alternatively, write to a text file
    | beam.combiners.Count.Globally() # Globally: count across the whole PCollection, because the data may be processed on distributed workers
    | beam.Map(print) # Print the output
  )

12
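
In the Beam Python SDK, leaving the `with` block runs the pipeline and waits for it to finish, which is why no explicit `pipe.run()` call appears above. The toy class below is only a sketch of that context-manager pattern. `ToyPipeline` and its methods are hypothetical and are not part of Apache Beam.

```python
class ToyPipeline:
    """Hypothetical stand-in that mimics run-on-exit behavior; not a Beam class."""

    def __init__(self):
        self.steps = []
        self.results = []
        self.ran = False

    def add(self, step):
        # Register a zero-argument callable to execute when the pipeline runs
        self.steps.append(step)
        return self

    def run(self):
        self.results = [step() for step in self.steps]
        self.ran = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Run only if the block finished without raising
        if exc_type is None:
            self.run()
        return False

with ToyPipeline() as toy:
    toy.add(lambda: "step ran")
    ran_inside = toy.ran  # still False: nothing executes while the block is being built

ran_after = toy.ran       # True: __exit__ triggered run()
print(ran_inside, ran_after, toy.results)
```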


## Using custom functions
![green-divider](https://user-images.githubusercontent.com/7065401/52071924-c003ad80-2562-11e9-8297-1c6595f8a7ff.png)

Lambda functions work well for simple processing. What if the logic is more complex?
- We can pass a named custom function to the transform instead. Let us see how.

In [55]:
def filter_data(element):
  # Keep rows whose column at index 10 equals '4'
  return element[10] == '4'

In [56]:
with beam.Pipeline() as pipe2:
  ip2 = (
    pipe2
    | beam.io.ReadFromText('/content/mtcars.csv', skip_header_lines=1) # The | operator applies a transform to the PCollection
    | beam.Map(lambda record: record.split(',')) # The CSV is read as plain text, so split each line on ','; each row becomes a list of strings
    | beam.Filter(filter_data) # Apply the custom filter function
    | beam.combiners.Count.Globally() # Globally: count across the whole PCollection, because the data may be processed on distributed workers
    #| beam.Map(print) # Print the output
    | beam.io.WriteToText('output.txt') # Write to text file(s); names get a shard suffix such as output.txt-00000-of-00001
)
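
When several filters differ only in the column and value they test, a small predicate factory avoids writing one function per case. This is a minimal sketch. The names `column_equals`, `FILTER_COLUMN`, and `has_value_4`, as well as the sample rows, are assumptions made for illustration.

```python
FILTER_COLUMN = 10  # Assumption: same column index the filter above tests

def column_equals(index, expected):
    """Return a predicate that is True when record[index] == expected."""
    def predicate(record):
        # Guard against short or malformed rows instead of raising IndexError
        return len(record) > index and record[index] == expected
    return predicate

has_value_4 = column_equals(FILTER_COLUMN, '4')

# Hypothetical rows, already split into string fields
good = ['Mazda RX4', '21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']
other = ['Hornet 4 Drive', '21.4', '6', '258', '110', '3.08', '3.215', '19.44', '1', '0', '3', '1']
short = ['broken', 'row']

print([has_value_4(row) for row in (good, other, short)])  # [True, False, False]
```

The resulting predicate can be passed directly to the transform, for example `| beam.Filter(has_value_4)`.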

## Aggregations in Apache Beam using *PerKey()*
Map each row to a (key, value) pair, then apply a per-key combiner. This acts like a group-by followed by an aggregate.

![green-divider](https://user-images.githubusercontent.com/7065401/52071924-c003ad80-2562-11e9-8297-1c6595f8a7ff.png)



In [57]:
with beam.Pipeline() as pipe3:
  ip3 = (
    pipe3
    | beam.io.ReadFromText('/content/mtcars.csv', skip_header_lines=1) # The | operator applies a transform to the PCollection
    | beam.Map(lambda record: record.split(',')) # The CSV is read as plain text, so split each line on ','; each row becomes a list of strings
    | beam.Map(lambda record: (record[2], record)) # Build (key, value) pairs keyed on the column at index 2, like choosing a group-by column
    | beam.combiners.Count.PerKey() # Count the elements for each key
    | beam.Map(print) # Print the output
    #| beam.io.WriteToText('output.txt') # Alternatively, write to a text file
  )

('6', 7)
('4', 11)
('8', 14)
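
To sanity-check what `Count.PerKey()` computes, the same aggregation can be reproduced locally on a handful of pairs. The sketch below uses `collections.Counter`. The helper `count_per_key` and the sample pairs are hypothetical, and a local run says nothing about how Beam distributes the work.

```python
from collections import Counter

def count_per_key(pairs):
    """Count how many (key, value) pairs share each key; the values are ignored."""
    return Counter(key for key, _ in pairs)

# Hypothetical (key, value) pairs, shaped like the output of the keying Map step
pairs = [('6', 'row a'), ('4', 'row b'), ('6', 'row c'), ('8', 'row d')]

print(sorted(count_per_key(pairs).items()))  # [('4', 1), ('6', 2), ('8', 1)]
```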


### Creating and processing a *local/in-memory PCollection*
![green-divider](https://user-images.githubusercontent.com/7065401/52071924-c003ad80-2562-11e9-8297-1c6595f8a7ff.png)



In [50]:
a = [12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
with beam.Pipeline() as pipe1:
  ip1 = (pipe1
        | beam.Create(a) # Build a PCollection from the in-memory list
        | beam.Filter(lambda x: x % 2 != 0) # Keep only the odd numbers
        | beam.Map(print)
  )

11
9
7
5
3
1


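## Aggregation using multiple keys using *PerKey()*
Apply an aggregator function to (key, value) pairs. Note that the pipeline below keys each row on a single column (index 2), so its counts match the previous aggregation; to group on several columns at once, emit a tuple of those columns as the key.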

![green-divider](https://user-images.githubusercontent.com/7065401/52071924-c003ad80-2562-11e9-8297-1c6595f8a7ff.png)



In [77]:
with beam.Pipeline() as pipe4:
  # Read data from CSV, split each line by comma
  data = (
      pipe4
      | 'ReadFromText' >> beam.io.ReadFromText('/content/mtcars.csv', skip_header_lines=1)
      | 'SplitLines' >> beam.Map(lambda record: record.split(','))
  )

  # Build (key, value) pairs: the column at index 2 is the key, mpg (index 1) is the value
  # You can modify the indices based on your actual column positions
  cyl_mpg = (
      data
      | 'ExtractKeyMPG' >> beam.Map(lambda record: (record[2], float(record[1])))  # Assuming mpg is numeric
  )

  # Count the elements for each key; Count.PerKey ignores the values
  # This will output (key, count)
  key_counts = cyl_mpg | 'CountPerKey' >> beam.combiners.Count.PerKey()
  # Print the results
  key_counts | 'PrintResults' >> beam.Map(print)

  # Optionally write results to a text file
  #key_counts | 'WriteToText' >> beam.io.WriteToText('output.txt')

  # You can further process key_counts here (e.g., filter, sort)

('6', 7)
('4', 11)
('8', 14)
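
For a true multi-column aggregation, the key itself can be a tuple. The following sketch shows one way to build such a composite key and checks the resulting counts locally with `Counter`. The names `KEY_COLUMNS`, `composite_key`, and `make_row`, the choice of columns 2 and 10, and the sample rows are all assumptions made for illustration.

```python
from collections import Counter

KEY_COLUMNS = (2, 10)  # Assumption: group on the columns at index 2 and index 10

def composite_key(record):
    """Return ((col_2, col_10), record) so a per-key combiner groups on both columns."""
    return (tuple(record[i] for i in KEY_COLUMNS), record)

def make_row(col_2, col_10):
    """Hypothetical helper: build a 12-field row with only the key columns filled in."""
    row = [''] * 12
    row[2], row[10] = col_2, col_10
    return row

rows = [make_row('6', '4'), make_row('6', '4'), make_row('4', '4'), make_row('8', '3')]

# Local stand-in for Count.PerKey: count how many rows share each composite key
counts = Counter(key for key, _ in map(composite_key, rows))
print(sorted(counts.items()))
# [(('4', '4'), 1), (('6', '4'), 2), (('8', '3'), 1)]
```

In a pipeline, the same function would slot in as `| beam.Map(composite_key)` followed by `| beam.combiners.Count.PerKey()`.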


## An *independent function* that can be used in a pipeline
  This splits a package name into its cumulative dotted prefixes and returns them as a list
  ![green-divider](https://user-images.githubusercontent.com/7065401/52071924-c003ad80-2562-11e9-8297-1c6595f8a7ff.png)


In [1]:
def splitPackageName(packageName):
   """e.g. given com.example.appname.library.widgetname
           returns com
                   com.example
                   com.example.appname
      etc.
   """
   result = []
   end = packageName.find('.')
   while end > 0:
      result.append(packageName[:end])  # Prefix up to, but not including, this dot
      end = packageName.find('.', end + 1)
   result.append(packageName)  # Capture the full package name as the final entry
   return result

In [7]:
prefixes = splitPackageName('com.example.appname.library.widgetname')
prefixes

['com',
 'com.example',
 'com.example.appname',
 'com.example.appname.library',
 'com.example.appname.library.widgetname']
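
Inside a pipeline, a function that returns several items per input is usually paired with `beam.FlatMap`, which emits each item as its own element, whereas `beam.Map` would emit one list per input. Below is a generator-based variant that suits that use. `package_prefixes` is a hypothetical name, and the variant is built on `split('.')` rather than repeated `find` calls, so it may behave differently from the function above on unusual inputs such as names that start with a dot.

```python
def package_prefixes(package_name):
    """Yield each cumulative dotted prefix of package_name, ending with the full name."""
    parts = package_name.split('.')
    for end in range(1, len(parts) + 1):
        yield '.'.join(parts[:end])

print(list(package_prefixes('com.example.appname')))
# ['com', 'com.example', 'com.example.appname']
```

It could be applied as `| beam.FlatMap(package_prefixes)` to turn a PCollection of package names into a PCollection of prefixes.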