In [6]:
import json
import logging
import os
import pandas as pd
import requests
import sqlite3
import sys
import time

In [8]:
def main():
    # Scrapyd's API Manager
    # https://scrapyd.readthedocs.io/en/latest/api.html

    # logging: debug info warning error critical
    logging.basicConfig(level=logging.DEBUG)
    logging.info("Initialize Scrapyd's API Manager")

    SLEEP_TIME = 180
    logging.info(f"SLEEP_TIME_IN_SECONDS: {SLEEP_TIME}")

    LISTJOBS_PATH = "/home/monk3yd/GDrive/theLab/work/pjud_scraper/scrapyd_APIManager/services/listjobs.json"
    logging.info(f"LISTJOBS_FILE_PATH: {LISTJOBS_PATH}")

    DAEMONSTATUS_PATH = "/home/monk3yd/GDrive/theLab/work/pjud_scraper/scrapyd_APIManager/services/daemonstatus.json"
    logging.info(f"DAEMONSTATUS_FILE_PATH: {DAEMONSTATUS_PATH}")


    logging.info("Listening to scrapyd server...")
    while True:
        logging.info("Checking status...")

        # --- DaemonStatus ---
        logging.info("Save daemonstatus...")
        save_daemonstatus(DAEMONSTATUS_PATH)

        # --- ListJobs ---
        logging.info("Save listjobs...")
        save_listjobs(LISTJOBS_PATH)
       
        # W8 for next check
        logging.info(f"Time for next status report: {SLEEP_TIME} seconds...\n")
        time.sleep(SLEEP_TIME)

    # --- Schedule ---
    # spider_data = {}
    # add_to_schedule(spider_data)

In [9]:

def save_listjobs(file_path):
    '''
    Get the list of pending, running and finished jobs of some project.

    Supported Request Methods: GET
    Parameters:
        project (string, option) - restrict results to project name
    '''

    url = "http://127.0.0.1:6800/listjobs.json"
    response = requests.get(url=url)

    # Update listjobs JSON file
    logging.info("Saving listjobs in JSON file...")
    with open(file_path, "w") as file:
        file.write(json.dumps(response.json(), indent=4))

    # Read new datra in listjobs
    logging.info("Reading listjobs JSON file...")
    with open(file_path, "r") as file:
        json_data = json.loads(file.read())

    pending_jobs = json_data["pending"]
    running_jobs = json_data["running"]
    finished_jobs = json_data["finished"]  # List of dicts

    if not pending_jobs and not running_jobs and not finished_jobs:
        logger.warning("Couldn't find any jobs in listjobs...")  # listjobs is empty
        sys.exit()

    # Jobs finished
    if not pending_jobs and not running_jobs and finished_jobs:
        finished_df = pd.DataFrame(finished_jobs)
        
        # Save to db
        connect_to_sql(merged_df)
    
    # Jobs almost finished
    if not pending_jobs and running_jobs and finished_jobs:
        running_df = pd.DataFrame(running_jobs)
        finished_df = pd.DataFrame(finished_jobs)
        
        # Merge
        # merged_df = finished_df.merge(running_df, how="outer")

        # Save to db
        # connect_to_sql(merged_df)
    
    # Jobs are stale (not scraping)
    if pending_jobs and not running_jobs and finished_jobs:
        pending_df = pd.DataFrame(pending_jobs)
        finished_df = pd.DataFrame(finished_jobs)
        
        # Merge
        # merged_df = finished_df.merge(pending_df, how="outer")

        # Save to db
        # connect_to_sql(merged_df)
    
    # Fully scraping jobs
    if pending_jobs and running_jobs and finished_jobs:
        pending_df = pd.DataFrame(pending_jobs)
        running_df = pd.DataFrame(running_jobs)
        finished_df = pd.DataFrame(finished_jobs)
        
        # Merge
        merged_df = finished_df.merge(running_df, how="outer").merge(pending_df, how="outer")

        # Save to db
        connect_to_sql(merged_df)

In [14]:

def connect_to_sql(dataframe):
    database = "../dbs/listjobs.db"
    logging.info(f"Create listjobs database at {database}...")
    conn = sqlite3.connect(database)
    cur = conn.cursor()

    # If database doesn't exist
    if not os.path.exists(database):
        # Create db from dataframe
        dataframe.to_sql(name="listjobs", con=conn)
        
    else:
        # Read db into dataframe
        database_df = pd.read_sql("SELECT * FROM listjobs", conn)

        # Get delta of updated_df - database_df
        # delta_df = dataframe - database_df

        #
        # TODO: Update existing entries
        cur.execute("UPDATE listjobs ")
        # TODO: Add new entries
        ...

    conn.close()
    return 0

In [15]:

def save_daemonstatus(file_path):
    '''
    Load status of a service.

    Supported Request Methods: GET

    '''
    url = "http://localhost:6800/daemonstatus.json"
    response = requests.get(url=url)
    
    # Save into json
    with open(file_path, "w") as file:
        file.write(json.dumps(response.json(), indent=4))
    return 0

    # TODO: save into db

In [16]:

if __name__ == "__main__":
    main()

INFO:root:Initialize Scrapyd's API Manager
INFO:root:SLEEP_TIME_IN_SECONDS: 180
INFO:root:LISTJOBS_FILE_PATH: /home/monk3yd/GDrive/theLab/work/pjud_scraper/scrapyd_APIManager/services/listjobs.json
INFO:root:DAEMONSTATUS_FILE_PATH: /home/monk3yd/GDrive/theLab/work/pjud_scraper/scrapyd_APIManager/services/daemonstatus.json
INFO:root:Listening to scrapyd server...
INFO:root:Checking status...
INFO:root:Save daemonstatus...
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost:6800
DEBUG:urllib3.connectionpool:http://localhost:6800 "GET /daemonstatus.json HTTP/1.1" 200 89
INFO:root:Save listjobs...
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): 127.0.0.1:6800
DEBUG:urllib3.connectionpool:http://127.0.0.1:6800 "GET /listjobs.json HTTP/1.1" 200 2655725
INFO:root:Saving listjobs in JSON file...
INFO:root:Reading listjobs JSON file...
INFO:root:Create listjobs database at ../dbs/listjobs.db...


DatabaseError: Execution failed on sql 'SELECT * FROM listjobs': no such table: listjobs

### Daemonstatus
Load status of a service.
Supported Request Methods: GET

In [2]:
# Daemonstatus dataframe
daemonstatus_path = "/home/monk3yd/GDrive/theLab/work/pjud_scraper/scrapyd_APIManager/services/daemonstatus.json"
with open(daemonstatus_path, "r") as file:
    json_data = json.loads(file.read())

daemonstatus_df = pd.DataFrame.from_dict(json_data, orient="index")
daemonstatus_df


Unnamed: 0,0
node_name,RFTL
status,ok
pending,11013
running,32
finished,8660


### Listjobs
Get the list of pending, running and finished jobs of some project.

Supported Request Methods: GET
Parameters:
    project (string, option) - restrict results to project name

In [None]:
# Listjobs dataframe
listjobs_path = "/home/monk3yd/GDrive/theLab/work/pjud_scraper/scrapyd_APIManager/services/listjobs.json"
with open(listjobs_path, "r") as file:
    json_data = json.loads(file.read())

server_name = json_data["node_name"]  # str
server_status = json_data["status"]  # str

# Dataframes
pending_jobs = json_data["pending"]  # List of dicts
if pending_jobs:
    pending_df = pd.DataFrame(pending_jobs)

running_jobs = json_data["running"]  # List of dicts
if running_jobs:
    running_df = pd.DataFrame(running_jobs)

finished_jobs = json_data["finished"]  # List of dicts
if finished_jobs:
    finished_df = pd.DataFrame(finished_jobs)

# Number of rows (finished causas)
len(finished_df.index)

finished_df

In [None]:
# Merge dataframes
super_df = finished_df.merge(running_df, how="outer").merge(pending_df, how="outer")

In [None]:
# SQLite connection
database = "dbs/listjobs.db"
conn = sqlite3.connect(database)
super_df.to_sql(name="listjobs", con=conn)
conn.close()

# Functions

In [None]:
def save_daemonstatus(file_path):
    '''
    Load status of a service.

    Supported Request Methods: GET

    '''
    url = "http://localhost:6800/daemonstatus.json"
    response = requests.get(url=url)
    
    # Save into json
    with open(file_path, "w") as file:
        file.write(json.dumps(response.json(), indent=4))
    return 0

    # TODO: save into db

In [None]:
def save_listjobs(file_path):
    '''
    Get the list of pending, running and finished jobs of some project.

    Supported Request Methods: GET
    Parameters:
        project (string, option) - restrict results to project name
    '''

    url = "http://127.0.0.1:6800/listjobs.json"
    response = requests.get(url=url)

    # Save into json
    with open(file_path, "w") as file:
        file.write(json.dumps(response.json(), indent=4))

    # TODO: Save into db

    # Listjobs dataframe
    logging.info("Save JSON...")
    with open(file_path, "r") as file:
        json_data = json.loads(file.read())

    # SQLite connection
    # connect_to_sql()

In [None]:
def connect_to_sql():
    # List of dicts
    pending_listjobs = json_data["pending"]
    if pending_listjobs:
        pending_df = pd.DataFrame(pending_listjobs)
        pending_database = "dbs/pending.db"
        conn = sqlite3.connect(pending_database)
        pending_df.to_sql(name="pending_listjobs", con=conn)

    running_listjobs = json_data["running"]
    if running_listjobs:
        running_df = pd.DataFrame(running_listjobs)
        running_database = "dbs/running.db"
        conn = sqlite3.connect(running_database)
        running_df.to_sql(name="running_listjobs", con=conn)

    finished_listjobs = json_data["finished"]
    if finished_listjobs:
        finished_df = pd.DataFrame(finished_listjobs)
        finished_database = "dbs/finished.db"
        conn = sqlite3.connect(finished_database)
        finished_df.to_sql(name="finished_listjobs", con=conn)

    # TODO: Merge dataframes
    try:
        super_df = finished_df.merge(running_df, how="outer").merge(pending_df, how="outer")
        if not super_df.empty:
            logging.info("Create listjobs database...")
            database = "dbs/listjobs.db"
            conn = sqlite3.connect(database)
            # If database doesn't exist
            if not os.path.exists(database):
                # Create db from dataframe
                super_df.to_sql(name="listjobs", con=conn)
            else:
                # TODO: Read db into dataframe
                database_df = pd.read_sql("SELECT * FROM listjobs", conn)

                # TODO: Get delta of db dataframe with super/updated dataframe
                delta_df = super_df - database_df

                # TODO: Update existing entries
                # TODO: Add new entries
                ...

            conn.close()
            return 0
    except:
        return 1

In [None]:
def add_to_schedule(spider_data):
    '''
    # Schedule:
    Schedule a spider run (also known as a job), returning the job id.
    
    Supported Request Methods: POST
    Parameters:
        project (string, required) - the project name
        spider (string, required) - the spider name
        setting (string, optional) - a Scrapy setting to use when running the spider
        jobid (string, optional) - a job id used to identify the job, overrides the default generated UUID
        priority (float, optional) - priority for this project’s spider queue — 0 by default
        _version (string, optional) - the version of the project to use
        any other parameter is passed as spider argument
    '''

    url = "http://127.0.0.1:6800/schedule.json"
    response = requests.post(url=url, params=spider_data)
    return response.text