In [11]:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery

In [12]:
# Read every row of the BigQuery `customers` table and print it.
with beam.Pipeline() as pipeline:
    # The source step takes the fully qualified table name and the Cloud
    # Storage location handed to ReadFromBigQuery as `gcs_location`.
    customer_rows = pipeline | 'Read BQ Table' >> ReadFromBigQuery(
        table='my-gcp-proj1-473011.my_dataset.customers',
        gcs_location='gs://my-gcp-proj1-bucket',
    )

    # Each element arrives as a dict keyed by column name; print them as-is.
    customer_rows | 'Print results' >> beam.Map(print)



{'customer_id': 1, 'customer_name': 'Alice', 'region': 'North'}
{'customer_id': 2, 'customer_name': 'Bob', 'region': 'South'}
{'customer_id': 3, 'customer_name': 'Charlie', 'region': 'East'}
{'customer_id': 4, 'customer_name': 'David', 'region': 'West'}


In [8]:
#write to bigquery
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery

# Sample computer rows to load; each row has a `comp_id` and a `model`.
data = [
    {'comp_id': comp_id, 'model': model}
    for comp_id, model in [(101, 'Dell'), (102, 'HP'), (103, 'MacBook')]
]

# Schema describing the two columns of the rows above:
# `comp_id` is a required INTEGER, `model` is a nullable STRING.
table_schema = {
    'fields': [
        dict(name='comp_id', type='INTEGER', mode='REQUIRED'),
        dict(name='model', type='STRING', mode='NULLABLE'),
    ]
}

# Turn the in-memory rows into a PCollection and write them to the
# BigQuery `computers` table using the schema defined above.
# NOTE(review): no create/write disposition is passed, so the connector's
# defaults apply -- confirm that re-running this cell does not duplicate rows.
with beam.Pipeline() as pipeline:
    computer_rows = pipeline | 'Create BigQuery Data' >> beam.Create(data)

    computer_rows | 'Write to BigQuery' >> WriteToBigQuery(
        table='my-gcp-proj1-473011.my_dataset.computers',
        schema=table_schema,
        custom_gcs_temp_location='gs://my-gcp-proj1-bucket/temp',
    )





In [24]:
import apache_beam as beam
from apache_beam.io import ReadFromText
import json
from datetime import datetime

# Sample employee records used by the cells below. Each record is a dict
# with the keys id, name, salary, address, department and joined_date
# (joined_date is a 'YYYY-MM-DD' string).
data = [
    dict(zip(('id', 'name', 'salary', 'address', 'department', 'joined_date'), row))
    for row in [
        (1, 'Ram', 60000, 'Delhi', 'Engineering', '2022-01-15'),
        (2, 'Raj', 75000, 'Mumbai', 'Marketing', '2021-05-20'),
        (3, 'Sara', 85000, 'Bangalore', 'Finance', '2023-03-10'),
        (4, 'Mark', 55000, 'Chennai', 'IT', '2020-08-01'),
        (5, 'Anna', 65000, 'Delhi', 'Sales', '2022-11-25'),
        (6, 'John', 72000, 'Mumbai', 'IT', '2021-02-18'),
        (7, 'Lisa', 68000, 'Bangalore', 'Sales', '2023-01-05'),
        (8, 'Peter', 58000, 'Chennai', 'IT', '2020-12-12'),
    ]
]

# Create the employee PCollection from `data`, then print every row.
with beam.Pipeline() as pipeline:
    # Bind employee_pc to the employee rows themselves. Previously the whole
    # chain was assigned to it, so the name referred to the output of the
    # print step instead of the employees.
    employee_pc = pipeline | 'Create Employees' >> beam.Create(data)

    # Printing is a terminal step; its result is intentionally not kept.
    employee_pc | 'Print to Console' >> beam.Map(print)


{'id': 1, 'name': 'Ram', 'salary': 60000, 'address': 'Delhi', 'department': 'Engineering', 'joined_date': '2022-01-15'}
{'id': 2, 'name': 'Raj', 'salary': 75000, 'address': 'Mumbai', 'department': 'Marketing', 'joined_date': '2021-05-20'}
{'id': 3, 'name': 'Sara', 'salary': 85000, 'address': 'Bangalore', 'department': 'Finance', 'joined_date': '2023-03-10'}
{'id': 4, 'name': 'Mark', 'salary': 55000, 'address': 'Chennai', 'department': 'IT', 'joined_date': '2020-08-01'}
{'id': 5, 'name': 'Anna', 'salary': 65000, 'address': 'Delhi', 'department': 'Sales', 'joined_date': '2022-11-25'}
{'id': 6, 'name': 'John', 'salary': 72000, 'address': 'Mumbai', 'department': 'IT', 'joined_date': '2021-02-18'}
{'id': 7, 'name': 'Lisa', 'salary': 68000, 'address': 'Bangalore', 'department': 'Sales', 'joined_date': '2023-01-05'}
{'id': 8, 'name': 'Peter', 'salary': 58000, 'address': 'Chennai', 'department': 'IT', 'joined_date': '2020-12-12'}


In [26]:
def add_bonus(employee, bonus=1000):
    """Return a copy of ``employee`` with a ``'bonus'`` entry added.

    Args:
        employee: Mapping describing one employee. It is not modified;
            a shallow copy is made via ``employee.copy()``.
        bonus: Value stored under the ``'bonus'`` key. Defaults to 1000,
            so existing single-argument calls behave as before.

    Returns:
        The copied record with ``'bonus'`` set to ``bonus``.
    """
    new_employee = employee.copy()
    new_employee['bonus'] = bonus
    return new_employee

# Pipeline: create the employee rows, add the bonus column, print each row.
with beam.Pipeline() as pipeline:
    employee_pc = (
        pipeline
        | 'Create Employees' >> beam.Create(data)
    )
    employee_with_bonus = (
        employee_pc
        | 'Add Bonus Column' >> beam.Map(add_bonus)
    )
    (
        employee_with_bonus
        | 'Print to Console' >> beam.Map(print)
    )


{'id': 1, 'name': 'Ram', 'salary': 60000, 'address': 'Delhi', 'department': 'Engineering', 'joined_date': '2022-01-15', 'bonus': 1000}
{'id': 2, 'name': 'Raj', 'salary': 75000, 'address': 'Mumbai', 'department': 'Marketing', 'joined_date': '2021-05-20', 'bonus': 1000}
{'id': 3, 'name': 'Sara', 'salary': 85000, 'address': 'Bangalore', 'department': 'Finance', 'joined_date': '2023-03-10', 'bonus': 1000}
{'id': 4, 'name': 'Mark', 'salary': 55000, 'address': 'Chennai', 'department': 'IT', 'joined_date': '2020-08-01', 'bonus': 1000}
{'id': 5, 'name': 'Anna', 'salary': 65000, 'address': 'Delhi', 'department': 'Sales', 'joined_date': '2022-11-25', 'bonus': 1000}
{'id': 6, 'name': 'John', 'salary': 72000, 'address': 'Mumbai', 'department': 'IT', 'joined_date': '2021-02-18', 'bonus': 1000}
{'id': 7, 'name': 'Lisa', 'salary': 68000, 'address': 'Bangalore', 'department': 'Sales', 'joined_date': '2023-01-05', 'bonus': 1000}
{'id': 8, 'name': 'Peter', 'salary': 58000, 'address': 'Chennai', 'departm

In [27]:
import apache_beam as beam
from apache_beam.io import ReadFromText
import json
from datetime import datetime

def add_tax(employee, rate=0.10):
    """Return a copy of ``employee`` with a ``'tax'`` entry added.

    Args:
        employee: Mapping describing one employee; must contain a
            ``'salary'`` entry. It is not modified; a shallow copy is
            made via ``employee.copy()``.
        rate: Multiplier applied to the salary to obtain the tax.
            Defaults to 0.10, so existing single-argument calls behave
            as before.

    Returns:
        The copied record with ``'tax'`` set to ``salary * rate``.

    Raises:
        KeyError: If ``employee`` has no ``'salary'`` key.
    """
    new_employee = employee.copy()
    new_employee['tax'] = new_employee['salary'] * rate
    return new_employee

# Pipeline: create the employee rows, add the tax column, print each row.
with beam.Pipeline() as pipeline:
    # Source step: the in-memory employee records become a PCollection.
    employee_pc = (
        pipeline
        | 'Create Employees' >> beam.Create(data)
    )

    # Transform step: add_tax is applied to every record.
    employee_with_tax = (
        employee_pc
        | 'Add Tax Column' >> beam.Map(add_tax)
    )

    # Sink step: print the records that now carry the tax column.
    (
        employee_with_tax
        | 'Print to Console' >> beam.Map(print)
    )


{'id': 1, 'name': 'Ram', 'salary': 60000, 'address': 'Delhi', 'department': 'Engineering', 'joined_date': '2022-01-15', 'tax': 6000.0}
{'id': 2, 'name': 'Raj', 'salary': 75000, 'address': 'Mumbai', 'department': 'Marketing', 'joined_date': '2021-05-20', 'tax': 7500.0}
{'id': 3, 'name': 'Sara', 'salary': 85000, 'address': 'Bangalore', 'department': 'Finance', 'joined_date': '2023-03-10', 'tax': 8500.0}
{'id': 4, 'name': 'Mark', 'salary': 55000, 'address': 'Chennai', 'department': 'IT', 'joined_date': '2020-08-01', 'tax': 5500.0}
{'id': 5, 'name': 'Anna', 'salary': 65000, 'address': 'Delhi', 'department': 'Sales', 'joined_date': '2022-11-25', 'tax': 6500.0}
{'id': 6, 'name': 'John', 'salary': 72000, 'address': 'Mumbai', 'department': 'IT', 'joined_date': '2021-02-18', 'tax': 7200.0}
{'id': 7, 'name': 'Lisa', 'salary': 68000, 'address': 'Bangalore', 'department': 'Sales', 'joined_date': '2023-01-05', 'tax': 6800.0}
{'id': 8, 'name': 'Peter', 'salary': 58000, 'address': 'Chennai', 'departm

In [28]:
def add_joined_year(employee):
    """Return a copy of ``employee`` with a ``'year'`` entry added.

    The ``'joined_date'`` value is parsed with the ``'%Y-%m-%d'`` format
    and its year is stored under ``'year'``. The input record is not
    modified.
    """
    with_year = employee.copy()
    with_year['year'] = datetime.strptime(with_year['joined_date'], '%Y-%m-%d').year
    return with_year

# Pipeline: create the employee rows, add the year column, print each row.
with beam.Pipeline() as pipeline:
    employee_pc = (
        pipeline
        | 'Create Employees' >> beam.Create(data)
    )
    employee_with_year = (
        employee_pc
        | 'Add Year Column' >> beam.Map(add_joined_year)
    )
    (
        employee_with_year
        | 'Print to Console' >> beam.Map(print)
    )


{'id': 1, 'name': 'Ram', 'salary': 60000, 'address': 'Delhi', 'department': 'Engineering', 'joined_date': '2022-01-15', 'year': 2022}
{'id': 2, 'name': 'Raj', 'salary': 75000, 'address': 'Mumbai', 'department': 'Marketing', 'joined_date': '2021-05-20', 'year': 2021}
{'id': 3, 'name': 'Sara', 'salary': 85000, 'address': 'Bangalore', 'department': 'Finance', 'joined_date': '2023-03-10', 'year': 2023}
{'id': 4, 'name': 'Mark', 'salary': 55000, 'address': 'Chennai', 'department': 'IT', 'joined_date': '2020-08-01', 'year': 2020}
{'id': 5, 'name': 'Anna', 'salary': 65000, 'address': 'Delhi', 'department': 'Sales', 'joined_date': '2022-11-25', 'year': 2022}
{'id': 6, 'name': 'John', 'salary': 72000, 'address': 'Mumbai', 'department': 'IT', 'joined_date': '2021-02-18', 'year': 2021}
{'id': 7, 'name': 'Lisa', 'salary': 68000, 'address': 'Bangalore', 'department': 'Sales', 'joined_date': '2023-01-05', 'year': 2023}
{'id': 8, 'name': 'Peter', 'salary': 58000, 'address': 'Chennai', 'department': '

In [33]:
#Rename the joined_date column to joining_date
def rename_column(employee):
    """Return a copy of ``employee`` with ``'joined_date'`` renamed.

    The value stored under ``'joined_date'`` is moved to the key
    ``'joining_date'``, which ends up as the last key of the result.

    Args:
        employee: Mapping describing one employee; must contain a
            ``'joined_date'`` entry. It is not modified.

    Returns:
        A shallow copy of the record with the key renamed.

    Raises:
        KeyError: If ``employee`` has no ``'joined_date'`` key.
    """
    # Work on a copy: the function is used inside beam.Map, and mutating the
    # incoming element would also alter the caller's record.
    new_employee = employee.copy()
    new_employee['joining_date'] = new_employee.pop('joined_date')
    return new_employee

# Pipeline: create the employee rows, rename the date column, print each row.
with beam.Pipeline() as pipeline:
    employee_pc = (
        pipeline
        | 'Create Employees' >> beam.Create(data)
    )
    employee_renamed = (
        employee_pc
        | 'Rename Column' >> beam.Map(rename_column)
    )
    (
        employee_renamed
        | 'Print to Console' >> beam.Map(print)
    )


{'id': 1, 'name': 'Ram', 'salary': 60000, 'address': 'Delhi', 'department': 'Engineering', 'joining_date': '2022-01-15'}
{'id': 2, 'name': 'Raj', 'salary': 75000, 'address': 'Mumbai', 'department': 'Marketing', 'joining_date': '2021-05-20'}
{'id': 3, 'name': 'Sara', 'salary': 85000, 'address': 'Bangalore', 'department': 'Finance', 'joining_date': '2023-03-10'}
{'id': 4, 'name': 'Mark', 'salary': 55000, 'address': 'Chennai', 'department': 'IT', 'joining_date': '2020-08-01'}
{'id': 5, 'name': 'Anna', 'salary': 65000, 'address': 'Delhi', 'department': 'Sales', 'joining_date': '2022-11-25'}
{'id': 6, 'name': 'John', 'salary': 72000, 'address': 'Mumbai', 'department': 'IT', 'joining_date': '2021-02-18'}
{'id': 7, 'name': 'Lisa', 'salary': 68000, 'address': 'Bangalore', 'department': 'Sales', 'joining_date': '2023-01-05'}
{'id': 8, 'name': 'Peter', 'salary': 58000, 'address': 'Chennai', 'department': 'IT', 'joining_date': '2020-12-12'}


In [35]:
#Convert the address column to uppercase
def upper_address(employee):
    """Return a copy of ``employee`` whose ``'address'`` is upper-cased.

    The input record is left untouched; only the copy's ``'address'``
    value is replaced by the result of calling ``.upper()`` on it.
    """
    updated = employee.copy()
    updated.update(address=updated['address'].upper())
    return updated

# Pipeline: create the employee rows, upper-case the address, print each row.
with beam.Pipeline() as pipeline:
    employee_pc = (
        pipeline
        | 'Create Employees' >> beam.Create(data)
    )
    employee_upper_address = (
        employee_pc
        | 'Uppercase Address' >> beam.Map(upper_address)
    )
    (
        employee_upper_address
        | 'Print to Console' >> beam.Map(print)
    )


    

{'id': 1, 'name': 'Ram', 'salary': 60000, 'address': 'DELHI', 'department': 'Engineering', 'joined_date': '2022-01-15'}
{'id': 2, 'name': 'Raj', 'salary': 75000, 'address': 'MUMBAI', 'department': 'Marketing', 'joined_date': '2021-05-20'}
{'id': 3, 'name': 'Sara', 'salary': 85000, 'address': 'BANGALORE', 'department': 'Finance', 'joined_date': '2023-03-10'}
{'id': 4, 'name': 'Mark', 'salary': 55000, 'address': 'CHENNAI', 'department': 'IT', 'joined_date': '2020-08-01'}
{'id': 5, 'name': 'Anna', 'salary': 65000, 'address': 'DELHI', 'department': 'Sales', 'joined_date': '2022-11-25'}
{'id': 6, 'name': 'John', 'salary': 72000, 'address': 'MUMBAI', 'department': 'IT', 'joined_date': '2021-02-18'}
{'id': 7, 'name': 'Lisa', 'salary': 68000, 'address': 'BANGALORE', 'department': 'Sales', 'joined_date': '2023-01-05'}
{'id': 8, 'name': 'Peter', 'salary': 58000, 'address': 'CHENNAI', 'department': 'IT', 'joined_date': '2020-12-12'}


In [36]:
#Drop column
def drop_column(employee):
    """Return a copy of ``employee`` without its ``'address'`` entry.

    The input record is left untouched. A ``KeyError`` is raised when
    the record has no ``'address'`` key.
    """
    trimmed = employee.copy()
    trimmed.pop('address')
    return trimmed

# Pipeline: create the employee rows, drop the address column, print each row.
with beam.Pipeline() as pipeline:
    employee_pc = (
        pipeline
        | 'Create Employees' >> beam.Create(data)
    )
    employee_dropped = (
        employee_pc
        | 'Drop Address Column' >> beam.Map(drop_column)
    )
    (
        employee_dropped
        | 'Print to Console' >> beam.Map(print)
    )


{'id': 1, 'name': 'Ram', 'salary': 60000, 'department': 'Engineering', 'joined_date': '2022-01-15'}
{'id': 2, 'name': 'Raj', 'salary': 75000, 'department': 'Marketing', 'joined_date': '2021-05-20'}
{'id': 3, 'name': 'Sara', 'salary': 85000, 'department': 'Finance', 'joined_date': '2023-03-10'}
{'id': 4, 'name': 'Mark', 'salary': 55000, 'department': 'IT', 'joined_date': '2020-08-01'}
{'id': 5, 'name': 'Anna', 'salary': 65000, 'department': 'Sales', 'joined_date': '2022-11-25'}
{'id': 6, 'name': 'John', 'salary': 72000, 'department': 'IT', 'joined_date': '2021-02-18'}
{'id': 7, 'name': 'Lisa', 'salary': 68000, 'department': 'Sales', 'joined_date': '2023-01-05'}
{'id': 8, 'name': 'Peter', 'salary': 58000, 'department': 'IT', 'joined_date': '2020-12-12'}
