In [150]:
import os
import yaml
import pandas as pd
import numpy as np
import azureml.core
from azureml.core import Workspace, Datastore, ComputeTarget
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient
from azure.ai.ml import command
from azure.ai.ml import Input, Output
from azure.ai.ml import load_component
from azure.ai.ml.entities import Environment, Data, PipelineJob, Job, Schedule
from datetime import datetime, timedelta

In [151]:
# Load the service settings from the YAML file next to the notebook.
with open('./api.yaml', 'r') as yaml_file:
    data = yaml.safe_load(yaml_file)

# Fetch each section once, then read the individual values from it.
weaviate_cfg = data.get('weaviate')
weaviate_url = weaviate_cfg.get('url')
weaviate_api_key = weaviate_cfg.get('api_key')
cohere_api_key = data.get('cohere').get('api_key')
openai_api_key = data.get('openai').get('api_key')
serper_api_key = data.get('serper').get('api_key')

# Export the OpenAI and Serper keys as process environment variables.
for env_name, env_value in (
    ("OPENAI_API_KEY", openai_api_key),
    ("SERPER_API_KEY", serper_api_key),
):
    os.environ[env_name] = env_value

# Azure coordinates used to build the MLClient.
azure_cfg = data.get('azure')
SUBSCRIPTION = azure_cfg.get('subscription_id')
RESOURCE_GROUP = azure_cfg.get('resource_group_name')
WS_NAME = azure_cfg.get('workspace_name')

In [152]:
# SDK v1 handles: the workspace object and its "workspaceblobstore" datastore.
ws = Workspace.from_config()
datastore = Datastore.get(ws, "workspaceblobstore")

# Credential used by the SDK v2 client.
credential = DefaultAzureCredential()

# SDK v2 client, scoped to the subscription / resource group / workspace
# read from api.yaml.
workspace_scope = {
    "subscription_id": SUBSCRIPTION,
    "resource_group_name": RESOURCE_GROUP,
    "workspace_name": WS_NAME,
}
ml_client = MLClient(credential=credential, **workspace_scope)

In [153]:
# Environment definition for the RSS crawler component.
#
# The pip list names every third-party package rss_fetch_feed.py imports
# directly (requests, bs4, dateutil, pandas, swifter) instead of relying on
# transitive installs. lxml is listed because the script parses feeds with
# BeautifulSoup(..., 'xml'), and Beautiful Soup needs lxml for that parser.
# "pip" is declared as a conda dependency because the spec has a pip section.
#
# NOTE(review): this cell only builds the object; the component YAML pins
# azureml:rss-env:12, so the definition presumably has to be registered
# (and the pin updated) before these changes reach a job - confirm.
env = Environment(
    name="rss-env",
    image="mcr.microsoft.com/azureml/curated/sklearn-1.5:2",
    conda_file={
        "dependencies": [
            "python=3.8",
            "pip",
            {
                "pip": [
                    "feedparser",
                    "beautifulsoup4",
                    "lxml",
                    "requests",
                    "python-dateutil",
                    "pandas",
                    "swifter",
                    "article-parser",
                ]
            },
        ]
    },
)

In [154]:
# Create ./rss_pipeline if it is absent; exist_ok=True keeps re-running this
# cell harmless.
# NOTE(review): pipeline_dir is not referenced again in the visible cells -
# confirm it is still needed.
pipeline_dir = "./rss_pipeline"
os.makedirs(pipeline_dir, exist_ok=True)

In [155]:
# Folder that receives rss_fetch_feed.py and rss_fetch_feed.yml from the
# %%writefile cells; the component YAML declares `code: .`, i.e. this folder.
# exist_ok=True keeps re-running this cell harmless.
ml_models_src_dir = "./rss_components/ml_models"
os.makedirs(ml_models_src_dir, exist_ok=True)

In [156]:
%%writefile {ml_models_src_dir}/rss_fetch_feed.py

import requests
import os
import base64
import argparse
import pandas as pd
import re
import swifter

from bs4 import BeautifulSoup
from datetime import datetime
from dateutil.parser import parse
from urllib.parse import urlparse, urljoin
from requests.exceptions import RequestException, ConnectionError, Timeout

def parse_pubdate(pubdate_string):
    """Convert a feed's publication-date text into a ``datetime``.

    Args:
        pubdate_string: Raw date text taken from a feed item.

    Returns:
        The value produced by ``dateutil.parser.parse``, or ``''`` when the
        text cannot be interpreted. The empty string matches the default
        used by ``get_pubDate_from_item``.
    """
    try:
        return parse(pubdate_string)
    except (ValueError, OverflowError, TypeError):
        # dateutil raises ValueError (ParserError) for unparseable text,
        # OverflowError for dates beyond the platform's integer range and
        # TypeError for non-string input. One malformed date must not abort
        # the crawl of every other feed.
        print(f"Error parsing pubDate: {pubdate_string}")
        return ''

def get_summary_from_item(item, feed_url):
    """Return the plain-text summary of a feed item.

    Candidate elements are tried in order (``description``, ``content``,
    then Atom's ``summary``); the first one that yields non-empty text wins.

    Args:
        item: Parsed feed element (an ``<item>`` or ``<entry>``) to search.
        feed_url: URL of the feed the item came from. Not used by the
            lookup; kept so existing callers keep working.

    Returns:
        The candidate's text with any embedded markup removed, or ``''``
        when no candidate element carries text.
    """
    summary_tags = ['description', 'content', 'summary']
    for tag in summary_tags:
        summary = item.find(tag)
        # A found element is truthy even when it is empty, so test for
        # absence explicitly and judge emptiness on the extracted text.
        if summary is None:
            continue
        # The element's text may itself contain HTML; re-parse it so only
        # the visible text is returned.
        soup = BeautifulSoup(' '.join(summary.stripped_strings), 'html.parser')
        text = soup.get_text(strip=True)
        if text:
            return text
    return ''

def get_url_from_item(item):
    """Return the URL held by the item's first ``link`` element.

    A non-empty ``href`` attribute takes precedence; otherwise the element's
    own text is used, stripped of surrounding whitespace. ``None`` is
    returned when there is no ``link`` element or it carries neither.
    """
    link = item.find('link')
    if not link:
        return None

    href = link.get('href')
    if href:
        return href

    text = link.string
    if text:
        return text.strip()
    return None
    
def get_pubDate_from_item(item):
    """Read the item's ``pubDate`` element and return it raw and parsed.

    Returns:
        A dict with ``'pubDateRaw'`` (the element's text) and ``'pubDate'``
        (the result of ``parse_pubdate`` on that text). Both values are
        ``''`` when the item has no ``pubDate``.
    """
    pub_date_tag = item.pubDate
    if not pub_date_tag:
        return {'pubDateRaw': '', 'pubDate': ''}

    raw_text = pub_date_tag.text
    return {'pubDateRaw': raw_text, 'pubDate': parse_pubdate(raw_text)}

def _extract_images(item):
    """Build the image descriptors for one feed item.

    Looks at the item's ``enclosure``, ``media:thumbnail`` and
    ``media:content`` elements and keeps those that have a ``url`` and
    describe an image.
    """
    images = []
    for tag in ('enclosure', 'media:thumbnail', 'media:content'):
        for media in item.find_all(tag):
            image_url = media.get('url')
            if not image_url:
                continue

            mime_type = media.get('type', '')
            if tag == 'enclosure':
                is_image = mime_type.startswith('image/')
            elif tag == 'media:content':
                # media:content can also describe video or audio. Honour an
                # explicit medium or MIME type; elements that declare
                # neither are still treated as images.
                medium = media.get('medium', '')
                if medium:
                    is_image = medium == 'image'
                elif mime_type:
                    is_image = mime_type.startswith('image/')
                else:
                    is_image = True
            else:
                is_image = True
            if not is_image:
                continue

            # File name: URL-safe base64 of the URL without '=' padding,
            # capped at 250 characters before the extension is appended.
            encoded_url = base64.urlsafe_b64encode(image_url.encode()).decode().rstrip('=')
            images.append({
                'url': image_url,
                'img_name': encoded_url[:250] + ".jpg",
                'description': media.get('description', ''),
                'width': media.get('width', ''),
                'height': media.get('height', ''),
                'length': media.get('length', ''),
            })
    return images


def parse_rss_feed(feed_url, timeout=30):
    """Download one RSS/Atom feed and return its articles.

    Args:
        feed_url: URL of the feed. ``None`` yields an empty list.
        timeout: Seconds passed to ``requests.get`` so an unresponsive
            server cannot stall the crawl indefinitely.

    Returns:
        A list with one dict per ``<item>`` / ``<entry>`` element, holding
        the keys ``title``, ``link``, ``summary``, ``images``,
        ``pubDateRaw`` and ``pubDate``. The list is empty when the feed
        cannot be fetched or the server answers with an HTTP error status.
    """
    if feed_url is None:
        return []
    try:
        response = requests.get(feed_url, timeout=timeout)
        # Treat 4xx/5xx answers as a failed fetch instead of parsing an
        # error page as if it were a feed.
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'xml')
    except requests.RequestException as e:
        print(f"Error fetching RSS feed: {e}")
        return []

    articles = []
    for item_tag in ('item', 'entry'):
        for item in soup.find_all(item_tag):
            # Items without a title get an empty title instead of crashing.
            title_tag = item.title
            article = {
                'title': title_tag.text if title_tag is not None else '',
                'link': get_url_from_item(item),
                'summary': get_summary_from_item(item, feed_url),
                'images': _extract_images(item),
            }
            article.update(get_pubDate_from_item(item))
            articles.append(article)

    return articles

def save_images(articles, output_dir, output_suffix, timeout=30):
    """Download the images referenced by ``articles``.

    Files are written to ``<output_dir>/images/<output_suffix>/`` under the
    ``img_name`` recorded for each image. A failed download or write is
    reported and skipped so the remaining images are still processed.

    Args:
        articles: Iterable of article dicts, each with an ``'images'`` list
            whose entries carry ``'url'`` and ``'img_name'``.
        output_dir: Root folder of the crawl output.
        output_suffix: Name of the per-run sub-folder under ``images``.
        timeout: Seconds passed to ``requests.get`` for each download.
    """
    target_dir = os.path.join(output_dir, 'images', output_suffix)
    # exist_ok avoids the check-then-create race when several workers start
    # at the same time.
    os.makedirs(target_dir, exist_ok=True)

    for article in articles:
        for image in article['images']:
            image_url = image['url']
            image_filename = os.path.join(target_dir, image['img_name'])

            try:
                response = requests.get(image_url, timeout=timeout)
                # Do not store an HTML error page under an image file name.
                response.raise_for_status()
                with open(image_filename, 'wb') as file:
                    file.write(response.content)
                print(f"Saved image: {image_filename}")
            except requests.RequestException as e:
                print(f"Error downloading image {image_url}: {e}")
            except OSError as e:
                # RequestException is itself an OSError subclass, so this
                # clause must stay second; it covers file-system failures.
                print(f"Error writing image {image_filename}: {e}")

def main():
    """Crawl every feed listed in the input folder and store the results.

    Steps:
      1. Read ``topic_websites_feed_url.tsv`` from ``--feed_url`` and drop
         duplicate ``feed_url`` rows.
      2. Parse each feed and write all articles to
         ``rss_crawled_data_<suffix>.csv`` (tab-separated) in
         ``--crawled_path``.
      3. Download the images referenced by the articles.
      4. Append ``<suffix>`` to ``rss_crawled_data_index.tsv``.
    """
    parser = argparse.ArgumentParser()
    # All three arguments feed os.path.join / string concatenation below, so
    # a missing one is rejected by argparse rather than failing later with a
    # TypeError.
    parser.add_argument("--feed_url", type=str, required=True, help="path to feed url")
    parser.add_argument("--crawled_path", type=str, required=True, help="path to crawled data")
    parser.add_argument("--output_suffix", type=str, required=True, help="suffix of the output file and folder")
    args = parser.parse_args()

    input_path = os.path.join(args.feed_url, "topic_websites_feed_url.tsv")
    output_suffix = args.output_suffix
    output_path = os.path.join(args.crawled_path, "rss_crawled_data_" + output_suffix + ".csv")

    topic_websites = pd.read_csv(input_path, sep='\t', encoding="latin_1")
    topic_websites = topic_websites.drop_duplicates(subset='feed_url')

    # One list of article dicts per feed.
    articles_list = topic_websites['feed_url'].swifter.apply(parse_rss_feed)

    article_flattened = [item for sublist in articles_list for item in sublist]

    df = pd.DataFrame(article_flattened)
    df['dateTime'] = output_suffix
    df.to_csv(output_path, sep='\t', index=False)
    print('RSS data saved successfully.')

    articles_list.swifter.apply(save_images, output_dir=args.crawled_path, output_suffix=output_suffix)

    # Record this run in the index. The file is read tab-separated, so it is
    # written tab-separated too, and it is created on the first run when it
    # does not exist yet.
    index_path = os.path.join(args.crawled_path, "rss_crawled_data_index.tsv")
    new_entry = pd.DataFrame([{'suffix': output_suffix}])
    if os.path.exists(index_path):
        previous_index = pd.read_csv(index_path, sep='\t', encoding="latin_1")
        crawled_index = pd.concat([previous_index, new_entry], ignore_index=True)
    else:
        crawled_index = new_entry
    crawled_index.to_csv(index_path, sep='\t', index=False)

if __name__ == '__main__':
    main()

Overwriting ./rss_components/ml_models/rss_fetch_feed.py


In [157]:
%%writefile {ml_models_src_dir}/rss_fetch_feed.yml
# <component>
# Command component that runs rss_fetch_feed.py with one folder input,
# one string input and one folder output.
name: fetch_feed_topic
display_name: fetch and save rss feed data
type: command
inputs:
  # Folder the script reads topic_websites_feed_url.tsv from.
  feed_url:
    type: uri_folder
  # Text used in the output CSV file name and as the images sub-folder name.
  output_suffix:
    type: string
outputs:
  # Folder the script writes the crawled CSV, the images and the crawl index to.
  crawled_path:
    type: uri_folder
# The folder containing this YAML file (which also holds rss_fetch_feed.py).
code: .
# NOTE(review): pinned to version 12 of rss-env; the notebook's Environment
# cell does not register a new version - confirm the pin is current.
environment:
  azureml:rss-env:12
# NOTE(review): the pipeline in this notebook is declared with
# compute="serverless" - confirm whether this component-level value is honoured.
compute:
  azureml:qqwjq99161
# Folded scalar: the lines below are joined into a single command line.
command: >-
  python rss_fetch_feed.py 
  --feed_url ${{inputs.feed_url}}
  --output_suffix ${{inputs.output_suffix}}
  --crawled_path ${{outputs.crawled_path}}
# </component>

Overwriting ./rss_components/ml_models/rss_fetch_feed.yml


In [158]:
# Start the compute instance named in the component YAML unless it already
# reports the 'Running' state.
compute_instance = ComputeTarget(workspace=ws, name="qqwjq99161")
instance_state = compute_instance.get_status().state
if instance_state != 'Running':
    compute_instance.start(wait_for_completion=True)

# Build the component object from the YAML file written above.
component_spec_path = os.path.join(ml_models_src_dir, "rss_fetch_feed.yml")
rss_fetch_feed_component = load_component(source=component_spec_path)

# Register it; the name is rebound to the object returned by the workspace.
rss_fetch_feed_component = ml_client.create_or_update(rss_fetch_feed_component)

# Report what was registered.
registered_name = rss_fetch_feed_component.name
registered_version = rss_fetch_feed_component.version
print(f"Component {registered_name} with Version {registered_version} is registered")

# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output

# Look up version 3 of the "rss_crawl" data asset and take its path.
rss_crawl_asset = ml_client.data.get("rss_crawl", version="3")
crawl_path = rss_crawl_asset.path

# The pipeline input and its output both point at that same folder; the
# output is mounted read-write.
input_data = Input(type="uri_folder", path=crawl_path)
output_data = Output(type="uri_folder", path=crawl_path, mode="rw_mount")

# Single-step pipeline that runs the registered fetch_feed_topic component.
# Explanations are kept as `#` comments rather than a docstring.
# NOTE(review): the DSL may derive step names from local variable names -
# confirm before renaming `rss_fetch_feed_job`.
@dsl.pipeline(
    compute="serverless",  # "serverless" value runs pipeline on serverless compute
    description="fetch rss feed",
)

def rss_fetch_feed_pipeline(pipeline_feed_url):
    # pipeline_feed_url: folder input forwarded to the component's `feed_url`.

    # Timestamp suffix formatted as YYYY-MM-DDTHH (e.g. 2024-09-02T09), taken
    # from the local clock at the moment this function is called.
    # NOTE(review): the value is fixed when the pipeline is built, so a
    # re-submitted or scheduled copy would presumably reuse it - confirm.
    output_suffix = datetime.now().strftime("%Y-%m-%dT%H")

    # Call the component like a Python function, passing its two inputs.
    rss_fetch_feed_job = rss_fetch_feed_component(feed_url = pipeline_feed_url, output_suffix = output_suffix)
    rss_fetch_feed_job.allow_reuse = False  # Disable caching for this step

    # Send the step's output to the module-level `output_data` location.
    rss_fetch_feed_job.outputs.crawled_path = output_data

    # A pipeline returns a dictionary of outputs;
    # each key is the identifier of one pipeline output.
    return {
        "JSON_FORMAT_curation": rss_fetch_feed_job.outputs.crawled_path,
    }

[32mUploading ml_models (0.01 MBs):   0%|          | 0/7524 [00:00<?, ?it/s][32mUploading ml_models (0.01 MBs): 100%|██████████| 7524/7524 [00:00<00:00, 140725.36it/s]
[39m



Component fetch_feed_topic with Version 2024-09-02-09-57-46-8606825 is registered


In [159]:
# Build the pipeline job, using the data-asset folder as the feed input.
pipeline = rss_fetch_feed_pipeline(input_data)

# NOTE(review): confirm that `allow_reuse` is honoured on the pipeline job
# object; the SDK also documents `pipeline.settings.force_rerun` for this.
pipeline.allow_reuse = False

# Submit the pipeline job to the workspace.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Experiment under which the run is grouped.
    experiment_name="rss-crawl-exp-1"
)

# Stream the submitted run's logs into the cell output.
ml_client.jobs.stream(pipeline_job.name)

# # Define the schedule parameters
# schedule = Schedule(
#     name="rss-pipeline-schedule",
#     description="Runs the pipeline every day at 1:00 AM",
#     pipeline_job=pipeline_job,
#     recurrence={
#         "frequency": "Day",
#         "interval": 1,
#         "start_time": datetime.utcnow() + timedelta(days=1),  # Start tomorrow
#         "hours": [1]  # Run at 1:00 AM UTC
#     },
#     experiment_name="rss-crawl-exp-1",  # Replace with your experiment name
#     wait_for_provisioning=True,
#     wait_timeout=300
# )

# # Create the schedule job
# try:
#     submitted_schedule_job = ml_client.jobs.create_or_update(
#         schedule
#     )
#     print(f"Schedule submitted successfully with name: {submitted_schedule_job.name}")

# except Exception as e:
#     print(f"Error creating schedule job: {e}")

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


RunId: sad_jicama_xvp85h3td3
Web View: https://ml.azure.com/runs/sad_jicama_xvp85h3td3?wsid=/subscriptions/541beb67-718e-41c5-958e-8cc0ba95b210/resourcegroups/awesome_rag_dev/workspaces/rag_book_demo

Streaming logs/azureml/executionlogs.txt

[2024-09-02 09:57:52Z] Submitting 1 runs, first five are: be9a0e54:64249c73-e7ab-4389-ac02-c45cf0ab0a2d


In [None]:
# # Define the schedule
# schedule = Schedule(
#     name="rss_crawl_schedule",
#     description="Runs the pipeline every day at 2AM",
#     recurrence={"frequency": "day", "interval": 1},
#     pipeline_job=pipeline_job,
#     plan={"plan_type": "fixed_schedule", "plan_details": {"time_of_day": "02:00"}}
# )

# # Create the schedule
# ml_client.schedules.create_or_update(schedule)

In [None]:
# # Define the schedule parameters
# schedule_params = {
#     "name": "rss-pipeline-schedule",
#     "description": "Runs the pipeline every day at 1:00 AM",
#     "recurrence": {
#         "frequency": "Day",
#         "interval": 1,
#         "start_time": datetime.utcnow() + timedelta(days=1),  # Start tomorrow
#         "hours": [1]  # Run at 1:00 AM UTC
#     },
#     "experiment_name": "rss-crawl-exp-1",  # Replace with your experiment name
#     "wait_for_provisioning": True,
#     "wait_timeout": 300
# }


# # Create the scheduled job
# try:
#     submitted_job = ml_client.jobs.create_or_update(
#         pipeline_job,
#         experiment_name=schedule_params["experiment_name"],
#         schedule=schedule_params
#     )
#     print(f"Scheduled job submitted successfully with name: {submitted_job.name}")

# except Exception as e:
#     print(f"Error creating scheduled job: {e}")

In [None]:
# from azure.ai.ml.entities import Schedule, RecurrenceTrigger, RecurrencePattern

# # Define the recurrence trigger
# recurrence_trigger = RecurrenceTrigger(
#     frequency="Day",
#     interval=1,
#     start_time=(datetime.utcnow() + timedelta(days=1)).isoformat(),
#     time_zone="UTC",
#     schedule=RecurrencePattern(hours=[1])
# )

# schedule = Schedule(
#     name=schedule_params["name"],
#     description=schedule_params["description"],
#     trigger=recurrence_trigger,
#     create_job=pipeline_job
# )

# submitted_schedule = ml_client.schedules.create_or_update(schedule)
# print(f"Scheduled job submitted successfully with name: {submitted_schedule.name}")