### USDOT web scraping challenge



#### Key Links
https://ai.fmcsa.dot.gov/SMS/Tools/Downloads.aspx

https://ai.fmcsa.dot.gov/SMS/Carrier/21800/Overview.aspx?FirstView=True

https://ai.fmcsa.dot.gov/SMS/Carrier/21800/CarrierRegistration.aspx

In [1]:
import pandas as pd
from bs4 import BeautifulSoup

from pathlib import Path 
import shutil
import os
import urllib.request
import re

import sqlalchemy as sa
from sqlalchemy import text

from time import sleep

### Step 1 - Setup

In [22]:
ENGINE = sa.create_engine(f"postgresql://docker:docker@localhost:5432/fmcas")

#todo - come up with a better schema
query = """
CREATE TABLE IF NOT EXISTS cargo (
  carrier_id integer NOT NULL,
  date_pulled text,
  cargo_carried text ARRAY
);


CREATE TABLE IF NOT EXISTS vehicles (
  carrier_id integer NOT NULL,
  type text,
  date_pulled text,
  owned integer,
  term_leased integer,
  trip_leased integer
);
"""
#create table if details
with ENGINE.connect() as con:
    con.execute(text(query))

In [3]:
#get the latest download link for the zip
def getHTMLContent(url):
    res = urllib.request.urlopen(url)
    return res.read().decode("utf-8")
def getSoup(url):
    return BeautifulSoup(getHTMLContent(url), "html.parser")

soup = getSoup('https://ai.fmcsa.dot.gov/SMS/Tools/Downloads.aspx')
link = soup.find("ul", class_="downloadLinks").find("a")["href"]

DOWNLOAD_URL = 'https://ai.fmcsa.dot.gov' + link
ZIP_NAME = Path(link).name

(DOWNLOAD_URL, ZIP_NAME)

('https://ai.fmcsa.dot.gov/SMS/files/FMCSA_CENSUS1_2023May.zip',
 'FMCSA_CENSUS1_2023May.zip')

In [4]:
#setup raw folder and download the latest Motor Carrier Census Information

RAW_DATA_FOLDER = Path('./raw')
if not RAW_DATA_FOLDER.exists():
    os.mkdir(RAW_DATA_FOLDER)
    
ZIP_FILE = RAW_DATA_FOLDER / ZIP_NAME
if not ZIP_FILE.exists():
    print(f'DOWNLOADING {ZIP_NAME}')
    urllib.request.urlretrieve(DOWNLOAD_URL, ZIP_FILE)
    print(f'EXTRACTING')
    shutil.unpack_archive(ZIP_FILE, RAW_DATA_FOLDER)

### Step 2 - Read FMCSA

In [5]:
#read the latest spreadsheet to get carrier ids
FMCSA_DF = pd.read_csv(RAW_DATA_FOLDER / ZIP_NAME.replace('.zip','.txt'), encoding = 'latin-1')

In [6]:
FMCSA_DF.sample(3).transpose()

Unnamed: 0,854009,1309396,30181
DOT_NUMBER,2704744,3404155,107837
LEGAL_NAME,INSIGHT INVESTMENTS CORP,EDWARD DAVIS,MILLERBERND MANUFACTURING CO
DBA_NAME,,TRUCK TO GO,
CARRIER_OPERATION,C,C,A
HM_FLAG,N,N,N
PC_FLAG,N,N,N
PHY_STREET,600 CITY PARKWAY WEST #500,2250 PELICAN DR # 304,622 6TH ST S
PHY_CITY,ORANGE,NORCROSS,WINSTED
PHY_STATE,CA,GA,MN
PHY_ZIP,92868,30071-4617,55395


In [7]:
#check entires that already exist in the database
query = f"""
SELECT carrier_id, date_pulled from cargo 
"""
already_scraped = pd.read_sql(text(query), con = ENGINE)
already_scraped.tail(4)

Unnamed: 0,carrier_id,date_pulled
179,1000188,5/26/2023
180,1000189,5/26/2023
181,1000191,5/26/2023
182,1000194,5/26/2023


In [8]:
#create a subselection , not scraped already
FMCSA_SUB = FMCSA_DF[~FMCSA_DF['DOT_NUMBER'].isin(already_scraped['carrier_id'])]
print(len(FMCSA_DF) - len(FMCSA_SUB), 'already scraped')

120 already scraped


### Step 3 - Scrape

In [9]:
carrier_id = 1000196
url = f'https://ai.fmcsa.dot.gov/SMS/Carrier/{carrier_id}/CarrierRegistration.aspx'
html_content = getHTMLContent(url)
soup = getSoup(url)

date = re.search('(\d*\/\d*\/\d*)',soup.find("span", class_="asOf").text)
if date:
    date = date.group()
else:
    date = 'Unknown'

In [16]:
cargo_ul = soup.find("ul", class_="cargo")
cargo_li_checked = cargo_ul.find_all("li", class_="checked")

#convert to postgres array 
cargo_carried = '{'+ ','.join([li.text.lstrip('X').rstrip(',') for li in cargo_li_checked]) + '}'

cargo_df = pd.DataFrame([{'cargo_carried': cargo_carried, 'carrier_id': carrier_id, 'date_pulled': date}])
cargo_df

Unnamed: 0,cargo_carried,carrier_id,date_pulled
0,"{MACHINERY, LARGE OBJECTS,AGGREGATES}",1000196,5/26/2023


In [17]:
cargo_li_checked

[<li class="checked"><span class="chk">X</span>MACHINERY, LARGE OBJECTS</li>,
 <li class="checked"><span class="chk">X</span>AGGREGATES,</li>]

In [18]:
cargo_df['cargo_carried'].iloc[0]

'{MACHINERY, LARGE OBJECTS,AGGREGATES}'

In [19]:
cargo_df.to_sql('cargo', con=ENGINE, if_exists= 'append', index = False)

1

In [20]:
vehicle_type_df = pd.read_html(html_content)[0]
#filter out those without any value s in Owned, Term Leased, Trip Leased
filt_df = vehicle_type_df[(vehicle_type_df[['Owned', 'Term Leased', 'Trip Leased']] > 0).any(axis=1)].copy()


#rename columns to match sql
filt_df.columns = ['type','owned','term_leased','trip_leased']
#add identifiers
filt_df['date_pulled'] = date
filt_df['carrier_id'] = carrier_id
filt_df

Unnamed: 0,type,owned,term_leased,trip_leased,date_pulled,carrier_id
1,Truck Tractors,2,6,0,5/26/2023,1000196
2,Trailers*,6,7,0,5/26/2023,1000196


In [21]:
filt_df.to_sql('vehicles', con=ENGINE, if_exists= 'append', index = False)

2

In [16]:
def scrapeDataAndAppendToDB(carrier_id):
    url = f'https://ai.fmcsa.dot.gov/SMS/Carrier/{carrier_id}/CarrierRegistration.aspx'
    html_content = getHTMLContent(url)
    soup = getSoup(url)
    
    #get date
    date = re.search('(\d*\/\d*\/\d*)',soup.find("span", class_="asOf").text)
    if date:
        date = date.group()
    else:
        date = 'Unknown'
        
    
    # --- vehicle_type
    vehicle_type_df = pd.read_html(html_content)[0]
    #filter out those without any value s in Owned, Term Leased, Trip Leased
    filt_df = vehicle_type_df[(vehicle_type_df[['Owned', 'Term Leased', 'Trip Leased']] > 0).any(axis=1)].copy()
    #rename columns to match sql
    filt_df.columns = ['type','owned','term_leased','trip_leased']
    #add identifiers
    filt_df['date_pulled'] = date
    filt_df['carrier_id'] = carrier_id
    filt_df.to_sql('vehicles', con=ENGINE, if_exists= 'append', index = False)
    
    
    # --- cargo
    cargo_ul = soup.find("ul", class_="cargo")
    cargo_li_checked = cargo_ul.find_all("li", class_="checked")

    #convert to postgres array 
    cargo_carried = '{'+ ','.join([li.text.lstrip('X') for li in cargo_li_checked]) + '}'

    cargo_df = pd.DataFrame([{'cargo_carried': cargo_carried, 'carrier_id': carrier_id, 'date_pulled': date}])
    cargo_df.to_sql('cargo', con=ENGINE, if_exists= 'append', index = False)
    
    # avoid rate limits, maybe?
    sleep(0.005)
    
    # return basic stats for debuging
    return pd.Series([url, cargo_carried, 
                      filt_df[['type','owned','term_leased','trip_leased']].to_dict(orient = 'records')])

In [17]:
def run(row):
    #print once in a while
    if row.name % 100 == 0:
        print('on row', row.name, 'of', len(FMCSA_SUB))
    return scrapeDataAndAppendToDB(row['DOT_NUMBER'])

#test randomly and spot check
s = FMCSA_SUB[1000:1005].apply(run, axis = 1)

In [18]:
for i in s.iloc[4]:
    print(i)

https://ai.fmcsa.dot.gov/SMS/Carrier/1002481/CarrierRegistration.aspx
{GENERAL FREIGHT,HOUSEHOLD GOODS,BUILDING MATERIALS}
[{'type': 'Straight Trucks', 'owned': 3, 'term_leased': 0, 'trip_leased': 0}]


### Step 4 - multithreading and rate limits 

- note: only works on unix systems (docker), does NOT work on windows, use the commented script or implement subprocess
- might need [proxies](https://github.com/topics/proxy-scraper) to avoid rate limits