In [10]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Count, Top
from apache_beam import DoFn
import json
from datetime import datetime

In [11]:
# --- Sample Data ---
employee_data= [
    {"id": 1, "name": "Alice", "salary": 70000, "department": "HR"},
    {"id": 2, "name": "Bob", "salary": 60000, "department": "IT"},
    {"id": 3, "name": "Charlie", "salary": 70000, "department": "HR"},
    {"id": 4, "name": "David", "salary": 55000, "department": "IT"},
    {"id": 5, "name": "Eve", "salary": 80000, "department": "Finance"},
    {"id": 6, "name": "Frank", "salary": 50000, "department": "Finance"},
    {"id": 7, "name": "Grace", "salary": 65000, "department": "HR"},
    {"id": 8, "name": "Heidi", "salary": 55000, "department": "IT"},
    {"id": 9, "name": "Ivan", "salary": None, "department": "Sales"},
    {"id": 10, "name": "Judy", "salary": 80000, "department": "Finance"}
]

orders_data = [
    {"order_id": 1, "product_name": "Laptop", "price": 1200},
    {"order_id": 2, "product_name": "Mouse", "price": 25},
    {"order_id": 3, "product_name": "Keyboard", "price": 75},
    {"order_id": 4, "product_name": "Monitor", "price": 300},
    {"order_id": 5, "product_name": "Laptop", "price": 1200}
]

In [13]:
# 1. Write an Apache Beam program to find the top 3 most occurring words in a given dataset.
with beam.Pipeline() as pipeline:
    output = (
        pipeline
             | 'CreateEmployees' >> beam.Create(employee_data)
             | 'ExtractDepartments' >> beam.Map(lambda record: record.get("department"))
             | 'FilterNoneDepartments' >> beam.Filter(lambda dept: dept is not None)
             | 'PairWithOne' >> beam.Map(lambda dept: (dept, 1))
             | 'CountDepartments' >> beam.CombinePerKey(sum)
             | 'FindTop3' >> beam.combiners.Top.Of(3, key=lambda kv: kv[1]) # Top 3 by count
             | 'PrintResult' >> beam.Map(print)
    )




[('Finance', 3), ('IT', 3), ('HR', 3)]


In [16]:
# 2. Given a PCollection, write an Apache Beam code to remove duplicate records.
with beam.Pipeline() as pipeline:
    output = (
        pipeline
             | 'CreateOrders' >> beam.Create(employee_data)
             | 'KeyRecords' >> beam.Map(lambda record: (tuple(sorted(record.items())), record))
             | 'GroupById' >> beam.GroupByKey()
             | 'SelectFirstElement' >> beam.Map(lambda kv: kv[1][0]) # Take the first instance of each unique record
             | 'PrintResult' >> beam.Map(print)
            )


{'id': 1, 'name': 'Alice', 'salary': 70000, 'department': 'HR'}
{'id': 2, 'name': 'Bob', 'salary': 60000, 'department': 'IT'}
{'id': 3, 'name': 'Charlie', 'salary': 70000, 'department': 'HR'}
{'id': 4, 'name': 'David', 'salary': 55000, 'department': 'IT'}
{'id': 5, 'name': 'Eve', 'salary': 80000, 'department': 'Finance'}
{'id': 6, 'name': 'Frank', 'salary': 50000, 'department': 'Finance'}
{'id': 7, 'name': 'Grace', 'salary': 65000, 'department': 'HR'}
{'id': 8, 'name': 'Heidi', 'salary': 55000, 'department': 'IT'}
{'id': 9, 'name': 'Ivan', 'salary': None, 'department': 'Sales'}
{'id': 10, 'name': 'Judy', 'salary': 80000, 'department': 'Finance'}


In [20]:
# 2. Given a PCollection, write an Apache Beam code to remove duplicate records.
with beam.Pipeline() as pipeline:
    output = (
        pipeline
             | 'CreateOrders' >> beam.Create(orders_data)
             # Key each record by the value of the specified column
             | 'KeyBySpecifiedColumn' >> beam.Map(lambda record: (record['product_name'], record))
             | 'GroupByKey' >> beam.GroupByKey()
             # For each group (i.e., for each unique product_name), take only the first record
             | 'SelectFirstElementForColumn' >> beam.Map(lambda kv: kv[1][0])
             | 'PrintResult' >> beam.Map(lambda x: print(f"De-duplicated by '{'product_name'}': {x}"))
    )

De-duplicated by 'product_name': {'order_id': 1, 'product_name': 'Laptop', 'price': 1200}
De-duplicated by 'product_name': {'order_id': 2, 'product_name': 'Mouse', 'price': 25}
De-duplicated by 'product_name': {'order_id': 3, 'product_name': 'Keyboard', 'price': 75}
De-duplicated by 'product_name': {'order_id': 4, 'product_name': 'Monitor', 'price': 300}


In [21]:
# 3. How do you perform word count using Apache Beam?

with beam.Pipeline() as pipeline:
    output = (
        pipeline
             | 'CreateEmployees' >> beam.Create(employee_data)
             | 'ExtractDepartments' >> beam.Map(lambda record: record.get("department"))
             | 'FilterNoneDepartments' >> beam.Filter(lambda dept: dept is not None)
             | 'PairWithOne' >> beam.Map(lambda dept: (dept, 1))
             | 'CountDepartments' >> beam.CombinePerKey(sum)
             | 'PrintResult' >> beam.Map(print)
    )

('HR', 3)
('IT', 3)
('Finance', 3)
('Sales', 1)


In [22]:
# 4. Write an Apache Beam job to group by a column and calculate the average value.

with beam.Pipeline() as pipeline:
    output = (
        pipeline
             | 'CreateEmployees' >> beam.Create(employee_data)
             | 'KeyByDepartmentAndSalary' >> beam.Map(lambda record: (record["department"], record.get("salary")))
             | 'FilterNoneSalaries' >> beam.Filter(lambda kv: kv[1] is not None)
             | 'CalculateAverage' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
             | 'PrintResult' >> beam.Map(print)
    )

('HR', 68333.33333333333)
('IT', 56666.666666666664)
('Finance', 70000.0)


In [30]:
# 4. Write an Apache Beam job to group by a column and calculate the average value.

with beam.Pipeline() as pipeline:
    output = (
        pipeline
             | 'CreateEmployees' >> beam.Create(employee_data)
             | 'KeyByDepartmentAndSalary' >> beam.Map(lambda record: (record["department"], record.get("salary")))
             | 'FilterNoneSalaries' >> beam.Filter(lambda kv: kv[1] is not None)
             | 'CalculateAverage' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
             | 'RoundAverage' >> beam.Map(lambda kv: (kv[0], round(kv[1], 2))) # Round the average to 2 decimal places
             | 'PrintResult' >> beam.Map(print)
    )


('HR', 68333.33)
('IT', 56666.67)
('Finance', 70000.0)


In [60]:
# 5. How do you handle missing/null values in an Apache Beam PCollection?
options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'OriginalData' >> beam.Create(employee_data)
         | 'PrintOriginal' >> beam.Map(lambda x: print(f"Original: {x}"))
         | 'PrintOriginalFooter' >> beam.Map(lambda x: print("---------------------")) # Footer for spacing
        )
        # Filter out records where salary is None - Correctly starts a new branch from 'p'
        (pipeline
         | 'CreateEmployeesFilter' >> beam.Create(employee_data)
         | 'FilterOutNullSalary' >> beam.Filter(lambda record: record.get("salary") is not None)
         | 'PrintFiltered' >> beam.Map(lambda x: print(f"Filtered (No Null Salary): {x}"))
         | 'PrintFilteredFooter' >> beam.Map(lambda x: print("---------------------")) # Footer for spacing
        )
        print() # This prints after the Filtered PCollection is processed
        # Fill None salary with 0 - Correctly starts a new branch from 'p'
        (pipeline
         | 'CreateEmployeesFill' >> beam.Create(employee_data)
         | 'FillNullSalary' >> beam.Map(
             lambda record: {**record, 'salary': record['salary'] if record.get('salary') is not None else 0}
           )
         | 'PrintFilled' >> beam.Map(lambda x: print(f"Filled (Null Salary=0): {x}"))
         | 'PrintFilledFooter' >> beam.Map(lambda x: print("---------------------\n")) # Footer for spacing
        )




Filtered (No Null Salary): {'id': 1, 'name': 'Alice', 'salary': 70000, 'department': 'HR'}
---------------------
Filtered (No Null Salary): {'id': 2, 'name': 'Bob', 'salary': 60000, 'department': 'IT'}
---------------------
Filtered (No Null Salary): {'id': 3, 'name': 'Charlie', 'salary': 70000, 'department': 'HR'}
---------------------
Filtered (No Null Salary): {'id': 4, 'name': 'David', 'salary': 55000, 'department': 'IT'}
---------------------
Filtered (No Null Salary): {'id': 5, 'name': 'Eve', 'salary': 80000, 'department': 'Finance'}
---------------------
Filtered (No Null Salary): {'id': 6, 'name': 'Frank', 'salary': 50000, 'department': 'Finance'}
---------------------
Filtered (No Null Salary): {'id': 7, 'name': 'Grace', 'salary': 65000, 'department': 'HR'}
---------------------
Filtered (No Null Salary): {'id': 8, 'name': 'Heidi', 'salary': 55000, 'department': 'IT'}
---------------------
Filtered (No Null Salary): {'id': 10, 'name': 'Judy', 'salary': 80000, 'department': 'F

In [67]:
# 6. Write an Apache Beam program to count distinct values in a column.

options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
# Count distinct departments
            (pipeline
             | 'CreateEmployees_Dept' >> beam.Create(employee_data)
             | 'ExtractDepartment' >> beam.Map(lambda record: record.get("department"))
             | 'FilterNoneDepartments' >> beam.Filter(lambda x: x is not None)
             | 'DistinctDepartments' >> beam.Distinct()
             | 'CountDistinctDepartments' >> beam.combiners.Count.Globally()
             | 'PrintDistinctDepartments' >> beam.Map(lambda count: print(f"Distinct Departments: {count}"))
            )

            # Count distinct salaries
            (pipeline
             | 'CreateEmployees_Salary' >> beam.Create(employee_data)
             | 'ExtractSalary' >> beam.Map(lambda record: record.get("salary"))
             | 'FilterNoneSalaries' >> beam.Filter(lambda x: x is not None)
             | 'DistinctSalaries' >> beam.Distinct()
             | 'CountDistinctSalaries' >> beam.combiners.Count.Globally()
             | 'PrintDistinctSalaries' >> beam.Map(lambda count: print(f"Distinct Salaries: {count}"))
            )




Distinct Departments: 4
Distinct Salaries: 6


In [83]:
# 7. Given a PCollection, write an Apache Beam code to filter records where salary > 50,000.

options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
           # --- ORIGINAL DATA ---
            print("\n--- ORIGINAL DATA ---")
            (pipeline
                 | 'CreateEmployeesOriginal' >> beam.Create(employee_data)
                 | 'PrintOriginal' >> beam.Map(lambda x: print(f"Original: {x}"))
                 | 'PrintOriginalFooter' >> beam.Map(lambda x: print("---------------------")) # Footer for spacing
            )
            # --- FILTERED DATA (Salary > 50000) ---
            print("--- FILTERED DATA (Salary > 50000) ---\n")
            (pipeline
                 | 'CreateEmployeesFilter' >> beam.Create(employee_data) # Start a new PCollection from raw data
                 | 'FilterBySalary' >> beam.Filter(lambda record: record.get("salary") is not None and record["salary"] > 50000)
                 | 'PrintFiltered' >> beam.Map(lambda x: print(f"Filtered: {x}"))
                 | 'PrintFilteredFooter' >> beam.Map(lambda x: print("---------------------")) # Footer for spacing
            )
             




--- ORIGINAL DATA ---
--- FILTERED DATA (Salary > 50000) ---

Filtered: {'id': 1, 'name': 'Alice', 'salary': 70000, 'department': 'HR'}
---------------------
Filtered: {'id': 2, 'name': 'Bob', 'salary': 60000, 'department': 'IT'}
---------------------
Filtered: {'id': 3, 'name': 'Charlie', 'salary': 70000, 'department': 'HR'}
---------------------
Filtered: {'id': 4, 'name': 'David', 'salary': 55000, 'department': 'IT'}
---------------------
Filtered: {'id': 5, 'name': 'Eve', 'salary': 80000, 'department': 'Finance'}
---------------------
Filtered: {'id': 7, 'name': 'Grace', 'salary': 65000, 'department': 'HR'}
---------------------
Filtered: {'id': 8, 'name': 'Heidi', 'salary': 55000, 'department': 'IT'}
---------------------
Filtered: {'id': 10, 'name': 'Judy', 'salary': 80000, 'department': 'Finance'}
---------------------
Original: {'id': 1, 'name': 'Alice', 'salary': 70000, 'department': 'HR'}
---------------------
Original: {'id': 2, 'name': 'Bob', 'salary': 60000, 'department':

In [87]:
# 8. Write an Apache Beam job to read a JSON file and convert it into a PCollection.
import json
options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
 # If you were reading from a file, it would look like this:
            (pipeline
                | 'ReadJsonLines' >> beam.io.ReadFromText('gs://scoups/raw/sample_file01.json')
                | 'PrintResult' >> beam.Map(print)
            )



[
  {
    "transaction_id": "T2",
    "store_id": "2",
    "category_id": "4",
    "quantity": 1,
    "price": 66.14,
    "timestamp": "2025-06-10T10:00:00",
    "customer_id": "01"
  },
  {
    "transaction_id": "T3",
    "store_id": "4",
    "category_id": "2",
    "quantity": 2,
    "price": 223.95,
    "timestamp": "2025-06-09T10:00:00",
    "customer_id": "02"
  },
  {
    "transaction_id": "T4",
    "store_id": "3",
    "category_id": "2",
    "quantity": 3,
    "price": 110.7,
    "timestamp": "2025-06-08T10:00:00",
    "customer_id": "03"
  },
  {
    "transaction_id": "T5",
    "store_id": "1",
    "category_id": "3",
    "quantity": 4,
    "price": 121.24,
    "timestamp": "2025-06-07T10:00:00",
    "customer_id": "04"
  },
  {
    "transaction_id": "T6",
    "store_id": "2",
    "category_id": "2",
    "quantity": 5,
    "price": 25.14,
    "timestamp": "2025-06-06T10:00:00",
    "customer_id": "05"
  },
  {
    "transaction_id": "T7",
    "store_id": "4",
    "category_id":

In [88]:
# 9. Write an Apache Beam query to find the second highest salary from an employee PCollection.

options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
            (pipeline
                 | 'CreateEmployees' >> beam.Create(employee_data)
                 | 'FilterNotNullSalary' >> beam.Filter(lambda record: record.get("salary") is not None)
                 | 'ExtractSalaries' >> beam.Map(lambda record: record["salary"])
                 | 'DistinctSalaries' >> beam.Distinct()
                 | 'FindTop2Salaries' >> beam.combiners.Top.Of(2, key=lambda x: x) # Gets the 2 highest distinct salaries
                 | 'ExtractSecondHighest' >> beam.Map(lambda top_salaries_list: top_salaries_list[1] if len(top_salaries_list) > 1 else None)
                 | 'PrintResult' >> beam.Map(print)
            )




70000


In [90]:
# 10. Write an Apache Beam job to join two PCollections and select specific columns.
options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
    department_info_data = [
            {"department": "HR", "location": "North Building", "headcount": 100},
            {"department": "IT", "location": "South Building", "headcount": 150},
            {"department": "Finance", "location": "East Building", "headcount": 80},
            {"department": "Sales", "location": "West Building", "headcount": 120} # No matching employees in this example
    ]
    
    # Prepare PCollections for join: (key, value) pairs
    employees_keyed = (pipeline
                           | 'CreateEmployees' >> beam.Create(employee_data)
                           | 'KeyEmployeesByDept' >> beam.Map(lambda emp: (emp["department"], emp)))
        
    departments_keyed = (pipeline
                             | 'CreateDeptInfo' >> beam.Create(department_info_data)
                             | 'KeyDepartmentsByDept' >> beam.Map(lambda dept: (dept["department"], dept)))

        # CoGroupByKey to perform the join
    joined_pcoll = ({'employees': employees_keyed, 'departments': departments_keyed}
                        | 'CoGroup' >> beam.CoGroupByKey())
        
        # Process the joined result to flatten and select specific columns
    select_cols = ["name", "department", "location", "headcount"]
    (joined_pcoll
         | 'ProcessJoinedData' >> beam.FlatMap(lambda element: [
             # This inner loop ensures we only combine when both employee and department info exist
             {col: ({**emp, **dept_info}).get(col) for col in select_cols}
             for emp in element[1]['employees']
             for dept_info in element[1]['departments']
         ])
         | 'PrintResult' >> beam.Map(print)
        )



{'name': 'Alice', 'department': 'HR', 'location': 'North Building', 'headcount': 100}
{'name': 'Charlie', 'department': 'HR', 'location': 'North Building', 'headcount': 100}
{'name': 'Grace', 'department': 'HR', 'location': 'North Building', 'headcount': 100}
{'name': 'Bob', 'department': 'IT', 'location': 'South Building', 'headcount': 150}
{'name': 'David', 'department': 'IT', 'location': 'South Building', 'headcount': 150}
{'name': 'Heidi', 'department': 'IT', 'location': 'South Building', 'headcount': 150}
{'name': 'Eve', 'department': 'Finance', 'location': 'East Building', 'headcount': 80}
{'name': 'Frank', 'department': 'Finance', 'location': 'East Building', 'headcount': 80}
{'name': 'Judy', 'department': 'Finance', 'location': 'East Building', 'headcount': 80}
{'name': 'Ivan', 'department': 'Sales', 'location': 'West Building', 'headcount': 120}
