# Import Libraries

In [11]:
%load_ext autoreload
%autoreload 2


import os
import sys
from datetime import timedelta

import pandas as pd
import pendulum
from dask.distributed import Client
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from prefect.tasks.shell import ShellTask

# Make the project's `src` package importable and run the notebook from the
# project root, in a way that is safe to re-run in the same kernel.
_launch_dir = os.getcwd()
# basename() rather than split("/") so the check also works with Windows separators.
cwd = os.path.basename(_launch_dir)

# On a re-run the kernel is already sitting in the project root (see chdir below).
# Detect that via the `src` directory so the root's parent is not added to sys.path.
if os.path.isdir(os.path.join(_launch_dir, "src")):
    module_path = _launch_dir
else:
    module_path = os.path.abspath(os.path.join(_launch_dir, ".."))
if module_path not in sys.path:
    sys.path.append(module_path)
from src.data import io

# Relative paths used later (e.g. "data/events/...") are written from the project
# root, so step out of the notebooks folder when the kernel was started there.
if cwd == "jupyter_notebooks":
    os.chdir("../")

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [12]:
# Executor passed to every Flow defined in this notebook.
executor = LocalDaskExecutor()

# Scrape Events Flow

In [15]:
# Weekly clock: every 7 days starting 2020-12-13 06:00 America/Los_Angeles.
# The schedule is defined but NOT attached to the flow yet; pass
# `schedule=event_schedule` to Flow(...) below to enable it.
event_schedule = Schedule(
    [
        IntervalClock(
            start_date=pendulum.datetime(2020, 12, 13, 6, tz="America/Los_Angeles"),
            interval=timedelta(days=7),
        )
    ]
)

# The helper script runs before each command, so commands execute inside the
# `scrapy_ufcstats` directory (resolved relative to the current working directory).
shell_task = ShellTask(helper_script="cd scrapy_ufcstats")

with Flow("Scrape Events", executor=executor) as f:  # schedule=event_schedule
    # Crawl the `events` spider and write the URLs to a CSV (-O overwrites the file).
    # The output path is relative to scrapy_ufcstats/, i.e. <cwd>/data/events/.
    scrapy_crawl_events_cmd = "scrapy crawl events -O ../data/events/event_urls.csv"
    scrapy_shell_task = shell_task(command=scrapy_crawl_events_cmd)

    # Read the scraped CSV. No data flows from the shell task, so the ordering is
    # expressed explicitly with `upstream_tasks`.
    event_urls_df = io.read_csv("data/events/event_urls.csv", upstream_tasks=[scrapy_shell_task])

    # TODO: add the date each URL was scraped (step not implemented yet).

    # Open the Postgres connection (a SQLAlchemy engine per the original note;
    # see src.data.io.connect_postgres to confirm).
    engine = io.connect_postgres()

    # Copy the freshly scraped event URLs into the `event_urls` table.
    io.df_to_table(df=event_urls_df, engine=engine, table_name="event_urls")

# Run once locally. To register with the Prefect project instead, use:
#f.register("scrape-ufcstats")
f.run()


[2021-01-15 22:33:07-0800] INFO - prefect.FlowRunner | Beginning Flow run for 'Scrape Events'
[2021-01-15 22:33:07-0800] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2021-01-15 22:33:07-0800] INFO - prefect.TaskRunner | Task 'connect_postgres': Starting task run...
[2021-01-15 22:33:07-0800] INFO - prefect.connect_postgres | Connecting to the PostgreSQL database fightdata ...
[2021-01-15 22:33:07-0800] INFO - prefect.connect_postgres | Connection to db successful.
[2021-01-15 22:33:07-0800] INFO - prefect.TaskRunner | Task 'connect_postgres': Finished task run for task with final state: 'Success'
[2021-01-15 22:33:44-0800] INFO - prefect.TaskRunner | Task 'ShellTask': Finished task run for task with final state: 'Success'
[2021-01-15 22:33:44-0800] INFO - prefect.TaskRunner | Task 'read_csv': Starting task run...
[2021-01-15 22:33:44-0800] INFO - prefect.TaskRunner | Task 'read_csv': Finished task run for task with final state: 'Success'
[2021-01-15 22:33:44-0800

<Success: "All reference tasks succeeded.">

In [7]:
# Minimal flow that only runs the Postgres connection task.
# Note: this rebinds the notebook-level name `f`.
with Flow(name="Test", executor=executor) as f:
    conn = io.connect_postgres()

# Run locally. To register the flow to the Prefect project instead, use:
# f.register("scrape-ufcstats")
f.run()

[2021-01-15 22:05:16-0800] INFO - prefect.FlowRunner | Beginning Flow run for 'Test'
[2021-01-15 22:05:16-0800] INFO - prefect.TaskRunner | Task 'connect_postgres': Starting task run...
[2021-01-15 22:05:16-0800] INFO - prefect.connect_postgres | Connecting to the PostgreSQL database fightdata ...
[2021-01-15 22:05:16-0800] INFO - prefect.connect_postgres | Connection to db successful.
[2021-01-15 22:05:16-0800] INFO - prefect.TaskRunner | Task 'connect_postgres': Finished task run for task with final state: 'Success'
[2021-01-15 22:05:16-0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


<Success: "All reference tasks succeeded.">