## Apache Beam (Additional Output) Data Processing Notebook

This notebook demonstrates a basic Apache Beam pipeline with multiple tagged outputs. It reads employee department data, extracts each employee's name, and routes the names to separate outputs based on name length and a specific starting character.

### Input Format:

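The notebook expects a text file named `dept_data.txt` to be uploaded to the `/content/` directory. Each line is one record with comma-separated fields, and the second field (index 1) is treated as the employee's `name`. The pipeline does not skip header rows: if the file starts with a header line like the one in the example below, its `Name` field is processed as if it were an employee name unless `skip_header_lines=1` is passed to `ReadFromText`.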

**Example `dept_data.txt` content:**
```
ID,Name,Age,Department,HireDate
1,Marco,30,Sales,01-01-2020
2,Rebekah,25,HR,15-03-2021
3,Kyle,35,Engineering,01-07-2019
```
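
The field extraction described above can be sketched in plain Python, without Beam. This is a minimal illustration only: `extract_name` is a hypothetical helper, not part of the notebook, and its length check is an added safeguard (the notebook's `DoFn` indexes the split result directly).

```python
def extract_name(line: str) -> str:
    """Return the second comma-separated field (index 1) of a record.

    Hypothetical helper for illustration; the guard against short
    lines is an assumption, not something the notebook does.
    """
    fields = line.split(",")
    if len(fields) < 2:
        raise ValueError(f"expected at least two fields, got: {line!r}")
    return fields[1]


# Data rows taken from the example above (header row left out).
sample_lines = [
    "1,Marco,30,Sales,01-01-2020",
    "2,Rebekah,25,HR,15-03-2021",
    "3,Kyle,35,Engineering,01-07-2019",
]

names = [extract_name(line) for line in sample_lines]
print(names)  # ['Marco', 'Rebekah', 'Kyle']
```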

### Output Format:

The pipeline produces three separate text files in the `/content/` directory, each containing a list of names based on specific criteria. Apache Beam's `WriteToText` creates sharded output files, so the actual filenames will include a shard identifier (e.g., `-00000-of-00001`).

1.  **`/content/short_name*-of-*`**: Contains names where the length is less than or equal to a defined `cut_off_len` (currently 4 characters).
2.  **`/content/long_name*-of-*`**: Contains names where the length is greater than the defined `cut_off_len` (currently 4 characters).
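3.  **`/content/marker_b*-of-*`**: Contains names that start with a specific `marker` character (currently 'K'). These names also appear in one of the two length-based files, because the marker check is independent of the length check.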
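
The routing rules in the list above can be sketched as a small Beam-free function. `categorize` is a hypothetical name used only for illustration; the tag strings and default values mirror the ones used later in the notebook.

```python
def categorize(name, cut_off_len=4, marker="K"):
    """Return the list of output tags a name would be routed to.

    Hypothetical helper: it mirrors the length and marker rules,
    but returns tag names instead of yielding Beam tagged outputs.
    """
    # Every name goes to exactly one of the two length-based outputs.
    tags = ["Short_Name" if len(name) <= cut_off_len else "Long_Name"]
    # The marker check is independent, so a name can carry a second tag.
    if name.startswith(marker):
        tags.append("marker_b")
    return tags


for name in ["Marco", "Rebekah", "Kyle"]:
    print(name, categorize(name))
# Marco ['Long_Name']
# Rebekah ['Long_Name']
# Kyle ['Short_Name', 'marker_b']
```

A name such as `Kyle` therefore ends up in two files: the short-name output and the marker output.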

In [29]:
# Install the Apache Beam SDK with GCP support
# (quoted so the shell cannot treat the square brackets as a glob pattern)
!pip install --quiet 'apache-beam[gcp]'

In [30]:
import logging

# Configure logging to display INFO level messages with a specific format
logging.basicConfig(
    level=logging.INFO,   # Set logging level to INFO
    format="%(asctime)s - %(levelname)s - %(message)s"
)

logging.info("Logging is enabled!")

In [None]:
from google.colab import files
# Prompt the user to upload files (e.g., dept_data.txt)
files.upload()

In [32]:
import apache_beam as beam
import logging

# Create a Beam Pipeline object
p1 = beam.Pipeline()

# Define a DoFn (Do Function) to process elements and produce multiple outputs
class ProcessOutputs(beam.DoFn):
  def process(self, element, cut_off_len, marker):
    # Extract the name from the input line (assuming name is the second field)
    name = element.split(',')[1]

    # Categorize names based on length and yield to appropriate tagged outputs
    if len(name) <= cut_off_len:
      yield beam.pvalue.TaggedOutput('Short_Name', name)
    else:
      yield beam.pvalue.TaggedOutput('Long_Name', name)

    # Categorize names that start with a specific marker character
    if name.startswith(marker):
      # Log the processing of names starting with the marker (for debugging/monitoring)
      logging.info("Processing name starting with %s: %s", marker, name)
      yield beam.pvalue.TaggedOutput('marker_b', name)  # Explicitly tag for marker_b output

# Construct the Apache Beam pipeline
result_pcoll = (
    p1
    | beam.io.ReadFromText('/content/dept_data.txt')  # Read input data from text file
    | beam.ParDo(ProcessOutputs(), cut_off_len=4, marker='K')  # Apply custom DoFn with parameters
      .with_outputs('Long_Name', 'Short_Name', 'marker_b')  # Specify all tagged outputs
)

# Get the individual PCollections for each tagged output
short_name = result_pcoll.Short_Name
long_name = result_pcoll.Long_Name
marker_b = result_pcoll.marker_b

# Write each PCollection to a separate text file
short_name | 'Write short names' >> beam.io.WriteToText('/content/short_name')
long_name | 'Write long names' >> beam.io.WriteToText('/content/long_name')
marker_b | 'Write marker names' >> beam.io.WriteToText('/content/marker_b')

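# Run the Beam pipeline and block until it finishes,
# so the output files exist before they are read below
p1.run().wait_until_finish()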

# Display the first 5 lines of each output file to verify results
print("\n--- Short Names ---")
!head -n 5 /content/short_name*
print("\n--- Long Names ---")
!head -n 5 /content/long_name*
print("\n--- Names starting with 'K' ---")
!head -n 5 /content/marker_b*




--- Short Names ---
Itoe
Kyle
Kyle
Olga
Kirk

--- Long Names ---
Marco
Rebekah
Edouard
Kumiko
Gaston

--- Names starting with 'K' ---
Kyle
Kyle
Kumiko
Kirk
Kaori
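
The `head` calls above rely on shell globbing to match the sharded filenames. A rough Python equivalent is sketched below, assuming the shard suffix follows the `-00000-of-00001` form mentioned in the Output Format section. `read_sharded_output` is a hypothetical helper, and the demo writes a stand-in shard to a temporary directory rather than reading real pipeline output.

```python
import glob
import os
import tempfile


def read_sharded_output(prefix):
    """Return all lines from files matching '<prefix>-*-of-*', in filename order.

    Hypothetical helper; assumes shard names look like
    '<prefix>-00000-of-00001'.
    """
    lines = []
    for path in sorted(glob.glob(f"{prefix}-*-of-*")):
        with open(path, encoding="utf-8") as shard:
            lines.extend(line.rstrip("\n") for line in shard)
    return lines


# Demo with a stand-in shard file instead of real pipeline output.
with tempfile.TemporaryDirectory() as tmp_dir:
    prefix = os.path.join(tmp_dir, "short_name")
    with open(f"{prefix}-00000-of-00001", "w", encoding="utf-8") as shard:
        shard.write("Kyle\nOlga\n")
    demo_lines = read_sharded_output(prefix)

print(demo_lines)  # ['Kyle', 'Olga']
```

In the notebook itself, the call would presumably be `read_sharded_output('/content/short_name')`, and likewise for the other two prefixes.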


In [None]:
# Plain-Python check: list names starting with 'K' straight from the input file
with open('/content/dept_data.txt') as file:
  # Extract the name (second column) from every line
  name_list = [line.split(',')[1] for line in file]

for name in name_list:
  if name.startswith('K'):
    print(name)  # Print names that start with 'K'
# print(name_list)  # Uncomment to print the full list of names