high-efficiency text & file scraper with smart tracking
~ client/server networking for building language model datasets fast ~
pip install llm-tractor-beam
or
python3 setup.py install
The Beam class is the core engine of a configurable, modular library for parallel processing and automation of tasks such as web scraping, data downloading, processing, and storage. It orchestrates these workflows by combining the components and lower-level functions described below.
Note
Upon initialization, the Beam class loads and verifies the configuration using the Config class. It checks that the configuration adheres to the expected structure and format, which indicates that the system is ready to execute the tasks defined by the user.
- Job Processing: The process_job and _runner methods are central to executing the tasks defined in the configuration. They handle the execution flow of each job: data downloading (Abduct class), data recording (Visit class), and data processing (Focus class). Each step is completed before the next one begins.
- Parallel and Delayed Execution: The go method orchestrates the execution of all jobs and runs them in parallel. It uses Python's multiprocessing module to distribute jobs across the available CPU cores, which helps most with CPU-bound work. It also supports delayed execution for specific jobs, enabling time-controlled or periodic runs.
- Resource Management: By using the Pool class from multiprocessing for parallel execution, the Beam class keeps resource usage in check. It calculates the number of processes from the number of available CPU cores and the number of jobs, balancing performance against resource usage.
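The process-count calculation and the parallel run described above can be sketched as follows. This is a minimal illustration, not the library's code: pool_size, the body of process_job, and the shape of the job dictionaries are assumptions made for the example, and the real go method may differ.

```python
import os
import time
from multiprocessing import Pool


def pool_size(n_jobs, cpu_count=None):
    """Pick a worker count: never more processes than jobs or CPU cores."""
    cores = cpu_count or os.cpu_count() or 1
    return max(1, min(cores, n_jobs))


def process_job(job):
    """Hypothetical stand-in for one job: honor its delay, then 'run' it."""
    time.sleep(job.get("delay", 0))
    return f"done: {job['url']}"


def go(jobs):
    """Run all jobs in a process pool and return results in job order."""
    with Pool(processes=pool_size(len(jobs))) as pool:
        return pool.map(process_job, jobs)
```

On platforms that start workers by spawning a fresh interpreter, go(jobs) should be called from under an `if __name__ == "__main__":` guard so that worker processes do not re-run the calling script.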
The Config class is responsible for loading, parsing, saving, and manipulating configuration data. It can load configuration from a file or a dictionary, parse the configuration data into a structured format, save the configuration to a file, unbox the configuration by creating a project directory, create a new project directory with a configuration file, and destroy a project directory.
# Load configuration from a file
config = Config('config.json')
config.load_conf('config.json')
# Load configuration from a dictionary
config_dict = {
    "role": "watcher",
    "settings": {
        "name": "my_project",
        "proj_dir": "/path/to/project",
        "jobs": [
            {
                "url": "https://example.com",
                "types": ["type1", "type2"],
                "beacon": "beacon1",
                "delay": 1.5,
                "custom": {
                    "func": "my_function",
                    "headers": {"header1": "value1"},
                    "types": ["type3", "type4"]
                }
            }
        ]
    }
}
config.load_conf(config_dict)
# Save the configuration to a file
config.save()
# Unbox the configuration by creating a project directory
config.unbox()
# Create a new project directory with a configuration file
config.create()
# Destroy a project directory
config.destroy(confirm="my_project")
- Load configuration from a file or a dictionary
- Parse the configuration data into a structured format
- Save the configuration to a file
- Unbox the configuration by creating a project directory
- Create a new project directory with a configuration file
- Destroy a project directory
- __init__(self, conf: Union[str, dict, None] = None): Initializes a new instance of the Config class and loads the configuration.
- load_conf(self, conf): Loads the configuration from a file or a dictionary.
- parse_conf(self, conf_dict: Dict[str, Any]) -> Schema: Parses the configuration data into a structured format.
- save(self): Saves the configuration to a file.
- unbox(self, overwrite: bool = False): Unboxes the configuration by creating a project directory.
- create(self, config: dict = None): Creates a new project directory with a configuration file.
- destroy(self, confirm: str = None): Destroys a project directory.
- conf: The parsed configuration data.
- conf.settings: The settings of the configuration.
- conf.settings.name: The name of the configuration.
- conf.settings.proj_dir: The project directory of the configuration.
- conf.settings.jobs: The list of jobs in the configuration.
- conf.settings.jobs.url: The URL of a job.
- conf.settings.jobs.types: The types of a job.
- conf.settings.jobs.beacon: The beacon of a job.
- conf.settings.jobs.delay: The delay of a job.
- conf.settings.jobs.custom: The custom job data of a job.
- conf.settings.jobs.custom.func: The function of a custom job.
- conf.settings.jobs.custom.headers: The headers of a custom job.
- conf.settings.jobs.custom.types: The types of a custom job.
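To make the structure check concrete, here is a minimal sketch of validating a configuration dictionary against the fields listed above. The function validate_conf is hypothetical and not part of the Config class, and which fields are required is an assumption made for the example.

```python
def validate_conf(conf):
    """Return a list of problems found in a config dict (empty if none)."""
    settings = conf.get("settings")
    if not isinstance(settings, dict):
        return ["missing 'settings' mapping"]

    problems = []
    # Assumption: name and proj_dir are required strings.
    for key in ("name", "proj_dir"):
        if not isinstance(settings.get(key), str):
            problems.append(f"settings.{key} must be a string")

    jobs = settings.get("jobs")
    if not isinstance(jobs, list):
        problems.append("settings.jobs must be a list")
        jobs = []

    for i, job in enumerate(jobs):
        if not isinstance(job, dict):
            problems.append(f"settings.jobs[{i}] must be a mapping")
            continue
        # Assumption: url is required, delay is optional but numeric.
        if not isinstance(job.get("url"), str):
            problems.append(f"settings.jobs[{i}].url must be a string")
        if not isinstance(job.get("delay", 0), (int, float)):
            problems.append(f"settings.jobs[{i}].delay must be a number")
    return problems
```

Returning a list of messages instead of raising on the first error lets a caller report every problem in a configuration file at once.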
The BeamState class is responsible for tracking the state of a Beam run. It holds information about the host system, as well as the states of the individual components: abduction, focus, and visit.
# Create an instance of BeamState
beam = BeamState()
# Update the abduction state
abduct_state = AbductState(conf={"param": "value"})
beam.abduct_state_update(abduct_state)
# Update the focus state
focus_state = FocusState(conf={"param": "value"})
beam.focus_state_update(focus_state)
# Update the visit state
record_state = RecordState(conf={"param": "value"})
beam.record_state_update(record_state)
# Update the host state
beam.host_state_update()
# Access the current state of the beam
current_state = beam.states
- Get information about the host system, including platform, CPU usage, memory usage, disk usage, network I/O, etc.
- Update and retrieve the states of different components such as abduction, focus, and visit.
- Keep track of the history of host states.
- __init__(): Initializes the BeamState class by setting the initial host info and states.
- get_host_info(): Retrieves the current host information and returns a HostInfo object.
- abduct_state_update(state): Updates the abduction state by appending a new AbductState object to the abduct list in states.
- focus_state_update(state): Updates the focus state by appending a new FocusState object to the focus list in states.
- record_state_update(state): Updates the visit state by appending a new RecordState object to the visit list in states.
- host_state_update(): Updates the host state by appending a new HostInfo object to the host_info list.
- host_info: A list of HostInfo objects that represent the history of host states.
- states: An instance of the States class that contains the states of different components such as abduction, focus, and visit.
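The append-only bookkeeping described above can be sketched with dataclasses. This is an illustration under assumptions: BeamStateSketch and the HostInfo fields shown here are invented for the example, component states are plain dictionaries, and only standard-library host metrics are collected. The richer metrics listed above (CPU usage, memory, network I/O) would need something beyond the standard library.

```python
import os
import platform
import shutil
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class HostInfo:
    """A small, stdlib-only snapshot of the host (hypothetical fields)."""
    system: str
    cpu_count: int
    disk_free_bytes: int


@dataclass
class States:
    """One history list per component."""
    abduct: List[Dict[str, Any]] = field(default_factory=list)
    focus: List[Dict[str, Any]] = field(default_factory=list)
    visit: List[Dict[str, Any]] = field(default_factory=list)


class BeamStateSketch:
    def __init__(self):
        self.states = States()
        # Start the history with one snapshot taken at construction time.
        self.host_info = [self.get_host_info()]

    def get_host_info(self):
        return HostInfo(
            system=platform.system(),
            cpu_count=os.cpu_count() or 1,
            disk_free_bytes=shutil.disk_usage(os.getcwd()).free,
        )

    def abduct_state_update(self, state):
        self.states.abduct.append(state)

    def host_state_update(self):
        self.host_info.append(self.get_host_info())
```

Because every update appends instead of overwriting, the lists double as a history that can be inspected after a run.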
The Abduct class is responsible for downloading files from a given URL or a list of URLs. It can handle both simple URLs and URLs with recursion. It also supports the option to overwrite existing files.
# Initialize the Abduct class
abduct = Abduct(conf=conf, job=job)
# Download files from a single URL
abduct.download()
# Download files from a single URL and overwrite existing files
abduct.download(o=True)
# Download files from a single URL and specify a custom file name
abduct.download(f="custom_file_name")
# Download files from a URL with recursion
abduct.download(types=["pdf", "docx"])
# Download files from a URL with recursion and overwrite existing files
abduct.download(types=["pdf", "docx"], o=True)
- Initialize the Abduct class with a configuration and a job object.
- Download files from a single URL or a list of URLs.
- Handle URLs with recursion and filter files by their types.
- Overwrite existing files if specified.
- __init__(self, conf: dict = None, job: Job = None): Initializes the Abduct class with a configuration and a job object. It prints an info message if the configuration is loaded successfully.
- _fetch_to_write(self, attachment, headers, attachment_path, file_name, block_size, o=False): Downloads a file from a given URL and writes it to the specified path. It appends the file information to the state.data list.
- download(self, o: bool=False, f: str=None): Downloads files from a URL or a list of URLs. It handles both simple URLs and URLs with recursion. It can overwrite existing files if specified. It returns the state object.
- state: An instance of the AbductState class that stores the current state of the Abduct class.
- state.conf: A dictionary that represents the configuration.
- state.job: An instance of the Job class that represents the current job.
- state.data: A list of dictionaries that stores the information of downloaded files. Each dictionary contains the file name and its path.
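Two ideas from this section, block-wise writing with an overwrite flag and filtering links by file type, can be sketched as below. The functions are simplified stand-ins rather than the Abduct implementation: fetch_to_write here takes any binary stream instead of a URL, and filter_by_types is a hypothetical helper.

```python
import io
import os
import tempfile
from urllib.parse import urlparse


def fetch_to_write(stream, path, block_size=8192, o=False):
    """Copy a binary stream to path in blocks; skip existing files unless o."""
    if os.path.exists(path) and not o:
        return False
    with open(path, "wb") as out:
        while True:
            block = stream.read(block_size)
            if not block:
                break
            out.write(block)
    return True


def filter_by_types(urls, types):
    """Keep only URLs whose path ends in one of the given extensions."""
    wanted = {"." + t.lower().lstrip(".") for t in types}
    return [
        u for u in urls
        if os.path.splitext(urlparse(u).path)[1].lower() in wanted
    ]


# Demo: an in-memory stream stands in for an HTTP response body.
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "report.pdf")
    first = fetch_to_write(io.BytesIO(b"%PDF-demo"), target)
    second = fetch_to_write(io.BytesIO(b"new"), target)  # skipped: file exists
```

Any object with a read(n) method works as the stream, so the same function could be fed the response object returned by urllib.request.urlopen.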
"beacons" play a crucial role in a highly customizable and modular system designed for web scraping, downloading, and processing data from various sources. These beacons, represented by modules like the Stream class, are key to achieving flexibility and modularity in the system. The structure and functionality of the "beacons" can be documented as follows:
Beacons act as interchangeable modules within the system. Each beacon corresponds to a specific source or type of data (e.g., financial filings, news articles) and encapsulates the logic necessary for fetching, parsing, and processing data from that source. This modularity allows users to easily extend the system's capabilities by adding new beacons for different sources without altering the core functionality.
Beacons are designed to be customizable, allowing users to specify parameters and behaviors specific to the data source they target. This is evident in the Stream class, where the fetch method can be tailored to parse and retrieve data according to the unique structure of each source.
Tip
The Helpers class within a beacon provides bespoke processing and manipulation of the fetched data.
Despite their differences in implementation, all beacons share a common interface, exemplified by the mandatory inclusion of a Stream class with consistent functions. This uniformity ensures that the main system can interact with any beacon in a predictable manner, facilitating ease of integration and use.
While the presence of a Stream class is mandatory for basic operations, the inclusion of a Helpers class within a beacon provides additional utility functions that are specific to the data or operations related to that beacon. This structure offers an extended layer of customization, enabling complex data manipulation and processing tasks that are tailored to the beacon's specific use case.
Beacons are seamlessly integrated into the main system, as demonstrated by the use of importlib for dynamic module loading and the structured approach to passing configurations and job details to beacons. This integration allows the system to leverage the unique capabilities of each beacon while maintaining a cohesive workflow.
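The dynamic loading described above can be sketched with importlib. Everything named here is an assumption for illustration: the demo_beacon module, the load_beacon helper, and the exact Stream constructor arguments are invented, and the module is registered in memory only so that the example runs without files on disk.

```python
import importlib
import sys
import types


class Stream:
    """Hypothetical beacon entry point; real beacons define their own."""

    def __init__(self, conf=None, job=None):
        self.conf = conf
        self.job = job

    def fetch(self):
        # A real beacon would parse its source here.
        return [f"{self.job['url']}/latest"]


# Register an in-memory module so import_module can find it by name.
demo = types.ModuleType("demo_beacon")
demo.Stream = Stream
sys.modules["demo_beacon"] = demo


def load_beacon(name, conf, job):
    """Import a beacon module by name and instantiate its Stream class."""
    module = importlib.import_module(name)
    stream_cls = getattr(module, "Stream", None)
    if stream_cls is None:
        raise AttributeError(f"beacon {name!r} has no Stream class")
    return stream_cls(conf=conf, job=job)


stream = load_beacon("demo_beacon", conf={}, job={"url": "https://example.com"})
links = stream.fetch()
```

Checking for Stream at load time turns a missing interface into an immediate, descriptive error instead of a failure deep inside a job.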
In short, beacons are specialized modules that can be loaded dynamically to add or change the system's data processing capabilities. A consistent interface combined with beacon-specific customization lets the system cover a wide range of data sources and processing requirements while remaining easy to maintain and extend.
The Focus class is responsible for processing files by reading their contents, detecting the encoding, and performing specific actions based on the file type. It uses the Strip class to sanitize and extract text content from XML or HTML documents. The processed data is then written to a file using the writeme function.
# Initialize a Focus object with a configuration and job
focus = Focus(conf=conf, job=job)
# Process a list of files
data = [{'path': 'file1.xml'}, {'path': 'file2.html'}]
result = focus.process(data)
# Destroy a file
focus.destroy(confirm='file1.xml')
- Initialize a Focus object with a configuration and job
- Process files by reading their contents, detecting the encoding, and extracting text content
- Write the processed data to a file
- Destroy a file if the confirmation matches the file name
- __init__(self, conf: dict = None, job: Job = None): Initializes a Focus object with a configuration and job. Prints an initialization message.
- process(self, data: dict = None): Processes a list of files by reading their contents, detecting the encoding, and extracting text content. Writes the processed data to a file. Returns the updated state of the Focus object.
- destroy(self, confirm: str = None): Removes a file if the confirmation matches the file name. Prints a message indicating whether the file was successfully destroyed or not.
- state: An instance of the FocusState class that stores the configuration and job information.
- state.conf: A dictionary representing the configuration.
- state.job: An instance of the Job class representing the job information.
- state.data: A list of dictionaries representing the processed data. Each dictionary contains the path of the file and the path of the cleaned file.
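The decode-then-strip step can be approximated with the standard library alone. This sketch does not reproduce the Strip class or the writeme function: decode_bytes, TextExtractor, and strip_markup are hypothetical names, and the UTF-8 with Latin-1 fallback is a simple substitute for real encoding detection.

```python
from html.parser import HTMLParser


def decode_bytes(raw):
    """Decode as UTF-8, falling back to Latin-1, which accepts any byte."""
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw.decode("latin-1")


class TextExtractor(HTMLParser):
    """Collect text nodes, ignoring the contents of script and style tags."""

    SKIP = {"script", "style"}

    def __init__(self):
        super().__init__()
        self.chunks = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth:
            self.chunks.append(data)


def strip_markup(markup):
    """Return the visible text of an HTML document with whitespace collapsed."""
    parser = TextExtractor()
    parser.feed(markup)
    parser.close()
    return " ".join(" ".join(parser.chunks).split())
```

Joining chunks with a space keeps text from adjacent block elements apart, at the cost of occasionally splitting a word that spans an inline tag.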
The Visit class is responsible for creating and managing records in a CSV file. It has methods for initializing the class, creating a new CSV file, seeking specific records, and writing records to the CSV file.
# Initialize the Visit class
visit = Visit(conf=conf, job=job)
# Create a new CSV file
visit.create(data=data)
# Seek specific records
visit.seek(line=2)
# Write records to the CSV file
visit.write()
The main functionalities of the Visit class are:
- Initializing the class with a configuration and job object
- Creating a new CSV file with headers and data
- Seeking specific records in the CSV file
- Writing records to the CSV file
The Visit class has the following methods:
- __init__(self, conf: dict = None, job: Job = None): Initializes the class with a configuration and job object.
- create(self, data: dict = None, o: bool = False): Creates a new CSV file with headers and data.
- seek(self, line: str | int = None, all: bool = False): Seeks specific records in the CSV file.
- write(self, o: bool = False, ts: bool = True, v: bool = False): Writes records to the CSV file.
The Visit class has the following fields:
- headers: A list to store the headers of the CSV file.
- state: An instance of the RecordState class that stores the configuration, job, and data of the visit.
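The create, seek, and write operations can be sketched with the csv module. These are free functions written for illustration, not the Visit methods: the column names in HEADERS, the 1-based meaning of line, and the UTC timestamp format are all assumptions.

```python
import csv
import os
import tempfile
from datetime import datetime, timezone

HEADERS = ["file", "path", "ts"]  # hypothetical columns


def create(csv_path, o=False):
    """Create the CSV with a header row; keep an existing file unless o."""
    if os.path.exists(csv_path) and not o:
        return False
    with open(csv_path, "w", newline="") as fh:
        csv.writer(fh).writerow(HEADERS)
    return True


def write(csv_path, records, ts=True):
    """Append one row per record, optionally stamped with the current UTC time."""
    stamp = datetime.now(timezone.utc).isoformat() if ts else ""
    with open(csv_path, "a", newline="") as fh:
        writer = csv.writer(fh)
        for rec in records:
            writer.writerow([rec["file"], rec["path"], stamp])


def seek(csv_path, line=None):
    """Return every data row, or only the 1-based data row number `line`."""
    with open(csv_path, newline="") as fh:
        rows = list(csv.DictReader(fh))
    return rows if line is None else rows[line - 1]


# Demo: log two downloads, then read them back.
with tempfile.TemporaryDirectory() as tmp:
    log = os.path.join(tmp, "visits.csv")
    create(log)
    write(log, [{"file": "a.pdf", "path": "/tmp/a.pdf"},
                {"file": "b.pdf", "path": "/tmp/b.pdf"}], ts=False)
    second = seek(log, line=2)
    total = len(seek(log))
```

Opening the file in append mode for write keeps earlier records intact, which suits a log of visited files that grows across runs.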