
# **Apache Beam: Movie Data Processing Pipeline**

## **Overview**
This Colab notebook demonstrates how to process movie data from the Netflix Movies and TV Shows dataset on Kaggle with Apache Beam. The pipeline includes advanced features such as Composite Transforms, ParDo, Windowing, and Triggers. It also simulates streaming data and processes it in real time, with results saved directly to Google Drive.

---

## **Key Features**
- **Composite Transforms:** Encapsulate multiple processing steps into reusable components.
- **ParDo:** Apply custom logic to individual data elements.
- **Windowing and Triggers:** Process streaming data using fixed time windows and triggers.
- **Simulated Streaming:** Generate dynamic data streams to mimic real-time processing.
- **Output Management:** Save processed results directly to Google Drive.
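Simulated streaming and fixed windows can be illustrated in plain Python before bringing Beam in. The sketch below is an approximation and not the notebook's own generator. `simulate_events`, `bucket_by_window`, `TITLES`, and the `max_gap_s` parameter are hypothetical names. `fixed_window` mirrors the arithmetic Beam's `FixedWindows` uses to place a timestamp `t` in the window starting at `t - ((t - offset) mod size)`.

```python
import random
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple

# Hypothetical sample titles for the simulated stream.
TITLES = ["Miss India", "Prospect", "Piercing", "Snowden", "Mile 22"]


def simulate_events(n: int, seed: int = 0, max_gap_s: float = 20.0) -> Iterator[Tuple[float, str]]:
    """Yield n (event_time_seconds, title) pairs with random gaps between events."""
    rng = random.Random(seed)  # seeded so the simulation is reproducible
    t = 0.0
    for _ in range(n):
        t += rng.uniform(0.0, max_gap_s)  # event times never decrease
        yield t, rng.choice(TITLES)


def fixed_window(event_time_s: float, size_s: float = 60.0, offset_s: float = 0.0) -> Tuple[float, float]:
    """Return the [start, end) bounds of the fixed window containing event_time_s."""
    start = event_time_s - (event_time_s - offset_s) % size_s
    return start, start + size_s


def bucket_by_window(events) -> Dict[Tuple[float, float], List[str]]:
    """Group simulated events by the 60-second window they fall into."""
    windows = defaultdict(list)
    for event_time, title in events:
        windows[fixed_window(event_time)].append(title)
    return dict(windows)


if __name__ == "__main__":
    for bounds, titles in sorted(bucket_by_window(simulate_events(10)).items()):
        print(bounds, titles)
```

An event at second 61 lands in the window `[60, 120)`, and an event at exactly second 120 starts the next window. Window bounds are half-open intervals.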

---

## **Steps in the Pipeline**
1. **Simulated Data Generation:**
   - Generates movie-like streaming data dynamically.
2. **Data Parsing:**
   - Reads raw CSV lines or generated data and structures them into dictionaries.
3. **Windowing and Triggering:**
   - Applies fixed 60-second windows and triggers data processing after 30 seconds.
4. **Composite Transform:**
   - Filters movies and extracts key details (title, country).
5. **Formatting:**
   - Formats each record with a custom ParDo transform so the output is easy to read.
6. **Output Storage:**
   - Writes results to a text file saved in Google Drive.

---

## **How to Use**
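1. **Download the Dataset:**
   - Run the first cell and upload your `kaggle.json` API token when prompted. The cell downloads the Netflix Movies and TV Shows dataset and extracts `netflix_titles.csv` into `./netflix_data/`.
2. **Mount Google Drive:**
   - Mount Google Drive at `/content/drive` so that the output directory `/content/drive/MyDrive/apache_beam/output/` is available.
3. **Install Dependencies:**
   - Install Apache Beam with `!pip install apache-beam`. Use `apache-beam[interactive]` if you also want the Interactive Beam tools.
4. **Run the Pipeline:**
   - Execute all cells to run the pipeline and generate the output.
5. **View Results:**
   - `WriteToText` adds a shard suffix to the output name. In this run the result appears as `apache_beam/output/movies_output.txt-00000-of-00001` in your Google Drive.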

---

## **Example Output**
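The output file contains one line per movie, formatted as `Title (Country)`, for example `Miss India (India)` or `Prospect (Canada, United States)`. Movies with no country listed appear with empty parentheses, such as `The Beginning of Life ()`.
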
In [1]:
from google.colab import files

# Step 1: Upload Kaggle API token
print("Please upload your kaggle.json file (API token from Kaggle):")
files.upload()  # Prompt to upload kaggle.json

# Step 2: Configure Kaggle API
# Create Kaggle directory and set permissions
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Step 3: Download Netflix Movies and TV Shows dataset
!kaggle datasets download -d shivamb/netflix-shows

# Step 4: Extract the downloaded dataset
!unzip netflix-shows.zip -d ./netflix_data/

# Step 5: Confirmation
print("Netflix Movies and TV Shows dataset downloaded and extracted successfully!")


Please upload your kaggle.json file (API token from Kaggle):


Saving kaggle.json to kaggle.json
Dataset URL: https://www.kaggle.com/datasets/shivamb/netflix-shows
License(s): CC0-1.0
Downloading netflix-shows.zip to /content
  0% 0.00/1.34M [00:00<?, ?B/s]
100% 1.34M/1.34M [00:00<00:00, 71.7MB/s]
Archive:  netflix-shows.zip
  inflating: ./netflix_data/netflix_titles.csv  
Netflix Movies and TV Shows dataset downloaded and extracted successfully!


In [2]:
!pip install apache-beam



In [6]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import pandas as pd
import csv

In [4]:


# Load the dataset
df = pd.read_csv('./netflix_data/netflix_titles.csv')

# Display the first few rows
print(df.head())


  show_id     type                  title         director  \
0      s1    Movie   Dick Johnson Is Dead  Kirsten Johnson   
1      s2  TV Show          Blood & Water              NaN   
2      s3  TV Show              Ganglands  Julien Leclercq   
3      s4  TV Show  Jailbirds New Orleans              NaN   
4      s5  TV Show           Kota Factory              NaN   

                                                cast        country  \
0                                                NaN  United States   
1  Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...   South Africa   
2  Sami Bouajila, Tracy Gotoas, Samuel Jouy, Nabi...            NaN   
3                                                NaN            NaN   
4  Mayur More, Jitendra Kumar, Ranjan Raj, Alam K...          India   

           date_added  release_year rating   duration  \
0  September 25, 2021          2020  PG-13     90 min   
1  September 24, 2021          2021  TV-MA  2 Seasons   
2  September 24, 2021        
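Because the dataset is already loaded into `df`, pandas offers a quick way to cross-check the Beam output. The sketch below is an illustration and not part of the original notebook. It builds the same `Title (Country)` strings for movies only. The small DataFrame reuses the `type`, `title`, and `country` values from the `df.head()` output above. Missing countries become an empty string, matching how the Beam output prints them. `movie_lines` is a hypothetical helper.

```python
import pandas as pd


def movie_lines(frame: pd.DataFrame) -> list:
    """Return 'Title (Country)' strings for rows whose type is 'Movie'."""
    movies = frame[frame["type"] == "Movie"]
    countries = movies["country"].fillna("")  # NaN -> "" to mirror the Beam output's "()"
    return [f"{t} ({c})" for t, c in zip(movies["title"], countries)]


# Rows taken from the df.head() output (only the columns this check needs).
sample = pd.DataFrame({
    "type": ["Movie", "TV Show", "TV Show", "TV Show", "TV Show"],
    "title": ["Dick Johnson Is Dead", "Blood & Water", "Ganglands", "Jailbirds New Orleans", "Kota Factory"],
    "country": ["United States", "South Africa", None, None, "India"],
})
print(movie_lines(sample))  # ['Dick Johnson Is Dead (United States)']
```

On the full dataset, `len(movie_lines(df))` can be compared with the number of lines in the Beam output. `pd.read_csv` handles quoted fields that span several lines, while `ReadFromText` splits on newlines. A mismatch would therefore point at rows that `parse_csv` skipped.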

In [22]:
# Helper function to parse CSV safely
def parse_csv(line):
    try:
        # Use CSV reader to split the line
        row = next(csv.reader([line]))
        # Ensure the row has the expected number of columns
        if len(row) < 12:
            return None  # Skip rows with insufficient data
        return {
            'show_id': row[0],
            'type': row[1],
            'title': row[2],
            'director': row[3],
            'cast': row[4],
            'country': row[5],
            'date_added': row[6],
            'release_year': row[7],
            'rating': row[8],
            'duration': row[9],
            'listed_in': row[10],
            'description': row[11]
        }
    except csv.Error:
        # Skip lines the csv module cannot parse
        return None
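`parse_csv` maps columns by position. If the column order ever changed, the keys would silently get the wrong values. A header-driven alternative is sketched below under assumptions:

- `FIELDS` lists the 12 keys `parse_csv` builds, in the order its indices imply.
- `parse_csv_by_header` is a hypothetical name.
- The sample line uses values from the `df.head()` output, plus placeholders for the genre and description.

```python
import csv
from typing import Optional

# Column names in the order parse_csv assumes (show_id first, description last).
FIELDS = [
    "show_id", "type", "title", "director", "cast", "country",
    "date_added", "release_year", "rating", "duration", "listed_in", "description",
]


def parse_csv_by_header(line: str, fields=FIELDS) -> Optional[dict]:
    """Parse one CSV line into a dict keyed by field name, or return None if it is malformed."""
    try:
        row = next(csv.reader([line]))  # handles quoted commas such as "September 25, 2021"
    except csv.Error:
        return None
    if len(row) < len(fields):
        return None  # too few columns, e.g. a truncated record
    return dict(zip(fields, row))  # extra trailing columns are ignored, as in parse_csv


line = ('s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,'
        '"September 25, 2021",2020,PG-13,90 min,placeholder-genre,placeholder-description')
record = parse_csv_by_header(line)
print(record["title"], "|", record["date_added"])  # Dick Johnson Is Dead | September 25, 2021
```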

In [23]:
# Composite Transform to filter movies and map titles and countries
class FilterAndMapMovies(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'Filter Movies' >> beam.Filter(lambda x: x and x['type'] == 'Movie')
            | 'Map Title and Country' >> beam.Map(lambda x: f"{x['title']} ({x['country']})")
        )



In [25]:
# Step 5: Set up the pipeline
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

In [26]:
input_file = './netflix_data/netflix_titles.csv'
# Output file path in Google Drive
output_file = '/content/drive/MyDrive/apache_beam/output/movies_output.txt'


In [27]:
# Step 6: Define the pipeline flow
(
    p
    | 'Read CSV' >> beam.io.ReadFromText(input_file, skip_header_lines=1)
    | 'Parse CSV Safely' >> beam.Map(parse_csv)
    | 'Filter and Map Movies' >> FilterAndMapMovies()
    | 'Write to Output' >> beam.io.WriteToText(output_file)
)



<PCollection[[27]: Write to Output/Write/WriteImpl/FinalizeWrite.None] at 0x7accc0466620>

In [28]:
# Step 7: Run the pipeline
result = p.run()
result.wait_until_finish()




'DONE'

In [29]:
# Step 8: Confirm Output
print(f"Output saved to: {output_file}")


Output saved to: /content/drive/MyDrive/apache_beam/output/movies_output.txt


In [34]:
with open('/content/drive/MyDrive/apache_beam/output/movies_output.txt-00000-of-00001', 'r') as f:
    for line in f:
        print(line.strip())


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Operation Christmas Drop (United States)
A Christmas Catch (Canada)
Christmas with a Prince (Canada)
Miss India (India)
Felix Lobrecht: Hype (Germany)
MOTHER (Japan)
The Beginning of Life ()
Ya no estoy aquí: Una conversación entre Guillermo del Toro y Alfonso Cuarón ()
Prospect (Canada, United States)
Christmas Break-In (United States)
Elf Pets: A Fox Cub’s Christmas Tale (United States)
Elf Pets: Santa’s Reindeer Rescue (United States)
Elliot the Littlest Reindeer (Canada)
Little Monsters (United States)
Mile 22 (United States, China, Colombia)
Piercing (United States)
Snowden (United Kingdom, France, Germany, United States)
The Impossible (Spain, Thailand, United States)
The Next Karate Kid (United States)
Wheels of Fortune (United States)
The 12th Man (Norway)
Hidden in Plain Sight (United States)
His House (United Kingdom)
In Line (Nigeria)
Kaali Khuhi (India)
Rogue City (France)
The Day of the Lord (Mexico, Spain)
P
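The cell above hard-codes a single shard name, but the number of shards `WriteToText` produces is left to the runner by default. A more robust approach reads every file that matches the prefix. The sketch below does that with the standard library only. `read_shards` is a hypothetical helper, and the demo uses two fake shards in a temporary directory.

```python
import glob
import os
import tempfile
from typing import List


def read_shards(prefix: str) -> List[str]:
    """Return all lines from files named '<prefix>-SSSSS-of-NNNNN', in shard order."""
    lines = []
    for path in sorted(glob.glob(glob.escape(prefix) + "-*-of-*")):
        with open(path, encoding="utf-8") as f:
            lines.extend(line.rstrip("\n") for line in f)
    return lines


# Usage with two fake shards (in the notebook, the prefix would be output_file).
demo_dir = tempfile.mkdtemp()
prefix = os.path.join(demo_dir, "movies_output.txt")
for i, text in enumerate(["Miss India (India)\n", "MOTHER (Japan)\n"]):
    with open(f"{prefix}-{i:05d}-of-00002", "w", encoding="utf-8") as f:
        f.write(text)
print(read_shards(prefix))  # ['Miss India (India)', 'MOTHER (Japan)']
```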

In [35]:
from google.colab import files
files.download('/content/drive/MyDrive/apache_beam/output/movies_output.txt-00000-of-00001')

