Okay, let's clean up the data we got from the scrape job. There are a few tasks: we need to remove the archive.org prefixes from the URLs, and then filter the items by relevance to the task.

In [81]:
import os 
import shutil

# Raw scrape output (read-only input) and the working copy that the cleaning steps rewrite.
input_dir = "01-dirty-data"
output_dir = "02-cleaner-data"

# Start from a fresh copy on every run: remove any previous output directory, then
# duplicate the raw data so files can be cleaned in place without touching the originals.
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
shutil.copytree(input_dir, output_dir)


'02-cleaner-data'

In [82]:
import json
from urllib.parse import urlparse
import re

# Source sub-directories that the cleaning and plotting steps iterate over.
# The names also select the source-specific URL filter in clean_items_in_dir.
directories = [
  "ft.com",
  "drudgereport.com",
  "bloomberg.com"
]

def _has_usable_url_and_text(item):
    """Return True if *item* has a real link and a headline of at least four words."""
    url = item.get('url')
    text = item.get('text')
    # Drop missing/empty URLs and in-page anchor links.
    if not url or url.startswith('#'):
        return False
    # Drop missing/empty text and anything with three words or fewer.
    if text is None or len(text.split()) <= 3:
        return False
    # Drop URLs that have no path component (e.g. bare site roots without a slash).
    return urlparse(url).path != ''


def clean_items_in_dir(dir):
    """Clean every dated JSONL file of one source directory in place.

    For each ``YYYY-MM-DD.jsonl`` file under ``{output_dir}/{dir}``, every item
    has its ``url`` passed through ``clean_archive_org_link`` and its ``text``
    stripped of surrounding whitespace. The items are then filtered and the
    survivors are written back to the same file, one JSON object per line.

    An item is kept only when:
      * its ``url`` is non-empty, is not a ``#`` anchor, and has a path;
      * its ``text`` contains more than three whitespace-separated words;
      * it passes the source-specific rule selected by ``dir``:
        ``bloomberg`` keeps URLs containing ``/news/articles/``,
        ``drudgereport`` drops URLs containing ``drudge``,
        ``ft.com`` keeps URLs containing ``/content/``.

    Args:
        dir: Name of the source sub-directory inside ``output_dir``.
    """
    dir_path = os.path.join(output_dir, dir)

    # fullmatch + escaped dot: only exact "YYYY-MM-DD.jsonl" names qualify, so
    # look-alikes such as "2021-02-07.jsonl.bak" are left untouched.
    filelist = [f for f in os.listdir(dir_path) if re.fullmatch(r'\d{4}-\d{2}-\d{2}\.jsonl', f)]

    for filename in filelist:
        filepath = os.path.join(dir_path, filename)

        # Skip blank lines so a trailing newline or empty row cannot crash json.loads.
        with open(filepath, 'r', encoding='utf-8') as f:
            data = [json.loads(line) for line in f if line.strip()]

        # Normalise first; .get() treats a missing key like an explicit null.
        for item in data:
            if item.get('url') is not None:
                item['url'] = clean_archive_org_link(item['url'])
            if item.get('text') is not None:
                item['text'] = item['text'].strip()

        data = [item for item in data if _has_usable_url_and_text(item)]

        # special handling for bloomberg
        if 'bloomberg' in dir:
            # keep only article links
            data = [item for item in data if "/news/articles/" in item['url']]
        # special handling for drudge
        elif 'drudgereport' in dir:
            # remove links whose URL contains "drudge"
            data = [item for item in data if "drudge" not in item['url']]
        # special handling for ft
        elif 'ft.com' in dir:
            # keep only /content/ links
            data = [item for item in data if "/content/" in item['url']]

        with open(filepath, 'w', encoding='utf-8') as f:
            f.writelines(json.dumps(item) + "\n" for item in data)

def clean_archive_org_link(url):
    """Strip a Wayback Machine wrapper from *url* and return the wrapped URL.

    Two forms are tried in order: an absolute
    ``http(s)://web.archive.org/web/<digits>/`` prefix, then a bare
    ``/web/<digits>/`` prefix. In both cases the captured URL must have a
    ``/`` after its host, and the capture stops before any ``?`` or ``#``,
    so a query string or fragment is not part of the result.

    If neither form is found, *url* is returned unchanged.
    """
    wayback_patterns = (
        # absolute archive.org link
        r"https?:\/\/web\.archive\.org\/web\/\d+\/(https?:\/\/[^\/\s]+\/[^?#\s]*)",
        # site-relative '/web/<digits>/' link
        r"\/web\/\d+\/(https?:\/\/[^\/\s]+\/[^?#\s]*)",
    )
    for candidate in wayback_patterns:
        found = re.search(candidate, url)
        if found is not None:
            return found.group(1)
    return url

def clean():
    """Run clean_items_in_dir once for every entry in ``directories``."""
    for source_dir in directories:
        clean_items_in_dir(source_dir)

In [83]:
# Run the cleaning pass; it rewrites the matching .jsonl files under output_dir in place.
clean()

Let's keep track of how many items we're removing from each source visually. For each directory, we'll plot a series based on the number of items in the original and cleaned files.

In [84]:
# graph of dirty vs clean data using plotly, excluding missing values
import plotly.graph_objects as go
import os
import json
from datetime import datetime

def plot_data_counts(directories):
    """Plot per-day item counts before and after cleaning for each source.

    For every entry in *directories*, the non-blank lines of each
    ``YYYY-MM-DD.jsonl`` file are counted in ``01-dirty-data/<dir>`` (original)
    and ``02-cleaner-data/<dir>`` (cleaned). Two line+marker traces per source
    are then added to a single Plotly figure, which is displayed with
    ``fig.show()``.

    Dates whose count is zero in a series are left out of that series.

    Args:
        directories: Iterable of source sub-directory names.
    """
    source_date_counts = {}

    def process_file(source, file_path, is_cleaned):
        """Add one file's non-blank line count to the tallies for *source*."""
        filename = os.path.basename(file_path)

        # Ignore non-JSONL files up front so they do not trigger date-parse errors.
        if not filename.endswith('.jsonl'):
            return

        # The date is the filename up to the first dot.
        date_str = filename.split('.')[0]
        try:
            date_obj = datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError as e:
            print(f"Error parsing date from filename {filename}: {e}")
            return
        date_key = date_obj.strftime('%Y-%m-%d')

        counts = source_date_counts[source].setdefault(date_key, {'original': 0, 'cleaned': 0})
        with open(file_path, 'r', encoding='utf-8') as f:
            counts['cleaned' if is_cleaned else 'original'] += sum(1 for line in f if line.strip())

    # Count original and cleaned items for each source directory.
    for source in directories:
        source_date_counts[source] = {}
        dirty_dir = os.path.join('01-dirty-data', source)
        clean_dir = os.path.join('02-cleaner-data', source)
        for filename in os.listdir(dirty_dir):
            process_file(source, os.path.join(dirty_dir, filename), is_cleaned=False)
        for filename in os.listdir(clean_dir):
            process_file(source, os.path.join(clean_dir, filename), is_cleaned=True)

    fig = go.Figure()

    for source, date_counts in source_date_counts.items():
        for series_key, label in (('original', 'Original'), ('cleaned', 'Cleaned')):
            # Build x and y from the same filtered pairs so that dropping a
            # zero-count date can never shift the remaining points onto the
            # wrong dates.
            points = [
                (date, date_counts[date][series_key])
                for date in sorted(date_counts)
                if date_counts[date][series_key] > 0
            ]
            fig.add_trace(go.Scatter(
                x=[date for date, _ in points],
                y=[count for _, count in points],
                mode='lines+markers',
                name=f'{source} {label}'
            ))

    # Layout for the time series plot
    fig.update_layout(
        title='Original vs Cleaned Data Counts Over Time by Source',
        xaxis=dict(title='Date'),
        yaxis=dict(title='Count')
    )

    fig.show()

# `directories` is the list of source folders defined earlier in this notebook;
# this call renders the original-vs-cleaned count chart for each of them.
plot_data_counts(directories)



Okay, we've cleaned the data. Let's get it loaded up into a DataFrame and start doing some analysis.

In [85]:
import pandas as pd
import glob
import os

def cleaned_df():
    """Load every cleaned JSONL file into one date-indexed DataFrame.

    Reads all ``*.jsonl`` files found recursively under ``02-cleaner-data``.
    Each row is tagged with ``source`` (the name of the file's parent
    directory) and ``rank`` (the 1-based row position within its file), and
    the per-file frames are concatenated. Files that pandas cannot parse are
    reported with ``print`` and skipped; files with no rows are skipped.

    Returns:
        A DataFrame indexed by ``date`` (converted with ``pd.to_datetime``)
        with a categorical ``source`` column. If no file yields any rows, an
        empty DataFrame with a ``date`` index and a ``source`` column.
    """
    # glob makes no ordering guarantee; sort so the row order is reproducible.
    files = sorted(glob.glob('02-cleaner-data/**/*.jsonl', recursive=True))

    dfs = []
    for file in files:
        try:
            df = pd.read_json(file, lines=True)
        except ValueError as e:
            print(f"Skipping empty or invalid file: {file}. Error: {e}")
            continue
        if df.empty:
            continue

        df['source'] = os.path.basename(os.path.dirname(file))
        # read_json yields a fresh 0-based index per file, so index + 1 is the in-file rank.
        df['rank'] = df.index + 1
        dfs.append(df)

    if not dfs:
        # The frame must contain a 'date' column before it can become the index.
        return pd.DataFrame(columns=['date', 'source']).set_index('date')

    combined_df = pd.concat(dfs, ignore_index=True)
    combined_df['date'] = pd.to_datetime(combined_df['date'])
    combined_df['source'] = combined_df['source'].astype('category')
    combined_df.set_index('date', inplace=True)
    return combined_df



In [86]:
# %pip install pyarrow

In [87]:
def save_as_parquet():
    """Build the combined cleaned DataFrame and write it to ``cleaned_data.parquet`` in ``output_dir``."""
    frame = cleaned_df()
    parquet_path = f'{output_dir}/cleaned_data.parquet'
    frame.to_parquet(parquet_path)

In [88]:
# Write the combined frame to Parquet, then rebuild it to preview the first rows.
# Note: the JSONL files are loaded twice here, since save_as_parquet() calls cleaned_df() itself.
save_as_parquet()
cleaned_df().head()



Unnamed: 0_level_0,url,text,isMorning,source,rank
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2021-02-07,https://www.bloomberg.com/news/articles/2021-0...,Renaissance Hit With $5 Billion in Redemptions...,False,bloomberg.com,1
2021-02-07,https://www.bloomberg.com/news/articles/2021-0...,Reddit’s Populist Stock Movement Was 15 Years ...,False,bloomberg.com,2
2021-02-07,https://www.bloomberg.com/news/articles/2021-0...,Flash Loans Are Providing Instant Cash to Cryp...,False,bloomberg.com,3
2021-02-07,https://www.bloomberg.com/news/articles/2021-0...,BofA Divided as Bankers Cry Foul Over Special ...,False,bloomberg.com,4
2021-02-07,https://www.bloomberg.com/news/articles/2021-0...,Bain Capital Boosts Japan Headcount by 25% as ...,False,bloomberg.com,5
