In [1]:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import requests
import json
from bs4 import BeautifulSoup
import time
import sys
import pandas as pd
import numpy as np
from datetime import datetime
import re

In [2]:
# SoundCloud site root.
BASE_URL = 'https://soundcloud.com'
# JSON file read by open_cache() and written by save_cache().
CACHE_FILENAME = 'cache.json'
# In-memory cache keyed by URL; starts out empty.
CACHE_DICT = {}
# Seconds to sleep after each scroll-to-bottom while a page is being loaded.
SCROLL_PAUSE_TIME = 1
# Path of the chromedriver binary handed to Selenium.
CHROMEDRIVER_PATH = './chromedriver'

In [3]:
def open_cache():
    '''
    Opens the cache file if it exists and loads the JSON into
    a dictionary.
    If the cache file doesn't exist, can't be read, or doesn't contain
    valid JSON, a new empty cache dictionary is returned instead.

    Parameters
    ----------
    None

    Returns
    -------
    cache_dict (dict): the cache variable
    '''
    try:
        # The context manager closes the handle even when parsing fails.
        with open(CACHE_FILENAME, 'r') as cache_file:
            cache_dict = json.load(cache_file)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON
        # (json.JSONDecodeError) or undecodable bytes. Anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate.
        cache_dict = {}

    return cache_dict

def save_cache(cache_dict):
    '''
    Saves the current state of the cache to disk

    Parameters
    ----------
    cache_dict (dict): dictionary to write to disk

    Returns
    -------
    None
    '''
    # Serialise first so a non-serialisable value raises before the
    # existing cache file is truncated.
    dumped_json_cache = json.dumps(cache_dict, indent=2)
    # The context manager guarantees the handle is closed even if the write fails.
    with open(CACHE_FILENAME, "w") as fw:
        fw.write(dumped_json_cache)

In [12]:
def get_source_scrollable(url):
    """
    Gets the page source for a site that requires multiple scrolls but has a finite end

    The on-disk cache is consulted first. On a miss a headless Chrome session
    loads the page and scrolls to the bottom repeatedly until the document
    height stops growing, then the resulting source is cached and returned.

    Parameters
    ----------
    url (str): address of the page to load

    Returns
    -------
    html (str): the page source, from cache or freshly rendered
    """
    cache = open_cache()
    if url in cache:
        print('Fetching from cache...')
        return cache[url]

    print('Making new headless browser')
    # Setup headless chromedriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    driver = webdriver.Chrome(CHROMEDRIVER_PATH, options=chrome_options)
    try:
        driver.get(url)
        # Get scroll height
        last_height = driver.execute_script("return document.body.scrollHeight")

        while True:
            # Scroll down to bottom
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

            # Wait to load page
            time.sleep(SCROLL_PAUSE_TIME)

            # An unchanged height after scrolling means nothing more was loaded
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height

        html = driver.page_source
    finally:
        # Always shut the browser down, even when a Selenium call raised,
        # so failed runs do not leave orphaned Chrome processes behind.
        driver.quit()

    cache[url] = html
    save_cache(cache)
    return html

In [13]:
def request_with_cache(url):
    """
    Fetches a page with requests, going through the cache first.

    The lookup merges the on-disk cache with the in-memory CACHE_DICT
    (in-memory entries win). Saving that merged view keeps entries that other
    helpers wrote straight to the cache file instead of overwriting the file
    with only what CACHE_DICT happened to hold.

    Parameters
    ----------
    url (str): address of the page to fetch

    Returns
    -------
    html (str): the response body, from cache or freshly downloaded
    """
    cache = {**open_cache(), **CACHE_DICT}
    if url in cache:
        print('Fetching from cache...')
        return cache[url]

    print('Fetching new data...')
    html = requests.get(url).text
    cache[url] = html
    # Keep the module-level dictionary in step for code that reads it directly.
    CACHE_DICT[url] = html
    save_cache(cache)
    return html

In [40]:
def get_top_track_links(html):
    """
    Extracts the distinct single-segment links (e.g. /someartist) found
    inside the 'chartTracks' container of a chart page.

    Parameters
    ----------
    html (str): page source of a chart page

    Returns
    -------
    list: matching <a> tags with duplicates removed, in document order;
    empty when the page has no 'chartTracks' container
    """
    soup = BeautifulSoup(html, "html.parser")
    chart = soup.find('div', {'class': 'chartTracks'})
    if chart is None:
        # Layout changed or the page failed to render: nothing to scrape.
        return []
    # Links to top artists in genres
    links = chart.find_all('a', href=re.compile('^/[^/]*$'))
    # dict.fromkeys de-duplicates like set() but keeps a deterministic order.
    return list(dict.fromkeys(links))

def get_artist_stats(link):
    """
    Downloads an artist page and scrapes key/value pairs out of the data
    embedded in one of its <script> tags.

    Parameters
    ----------
    link: mapping-like object whose 'href' entry is the artist's
        site-relative path (e.g. '/someartist')

    Returns
    -------
    dict or None: the scraped fields as strings, or None when the page does
    not have the expected structure
    """
    full_path = f"https://soundcloud.com{link['href']}"
    headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'}
    r = requests.get(full_path, headers=headers)
    soup = BeautifulSoup(r.text, "html.parser")
    try:
        # Position-based scrape: tenth <script>, text after '= ', fourth-from-last
        # '{' segment, split into comma-separated "key":"value" fragments.
        fragments = soup.find_all('script')[9].string.split('= ')[1].strip('][').split('{')[-4].split(',')
        stat_dict = {}
        for fragment in fragments[6:30]:
            # Split on the first colon only so values that themselves contain
            # ':' (URLs, timestamps) are kept whole instead of truncated.
            kps = fragment.split(':', 1)
            stat_dict[kps[0].strip('"')] = kps[1].strip('"')
    except Exception:
        # Any structural mismatch (missing script, unexpected layout) is
        # reported and signalled with None; KeyboardInterrupt still propagates.
        print(f"Issue with {link['href']}")
        return None
    print(f"Success for {link['href']}")
    return stat_dict

In [41]:
# Start from the charts landing page and build every URL from BASE_URL
# rather than repeating the site root as a literal.
url = f"{BASE_URL}/charts"
html = request_with_cache(url)
soup = BeautifulSoup(html, "html.parser")
final_list = []
# Links to all genres (e.g. alternativerock, dancehall)
category_links = soup.find_all('a', href=re.compile("^(/charts/top)((?!all-).)*$"))

for category_link in category_links:
    full_path = f"{BASE_URL}{category_link['href']}"
    print(full_path)
    html = get_source_scrollable(full_path)
    links = get_top_track_links(html)
    for link in links:
        stat_dict = get_artist_stats(link)
        # get_artist_stats yields nothing usable for pages it could not parse.
        if stat_dict:
            final_list.append(stat_dict)

Fetching from cache...
https://soundcloud.com/charts/top?genre=alternativerock
Fetching from cache...
Issue with /smithsmithmusic
Success for /currentjoys
Success for /sufjan-stevens
Success for /beartrapsound
Success for /jorgehl-1
Success for /teen-suicide
Success for /panicatthedisco
Success for /dandelionhands
Issue with /fearlessrecords
Success for /vancejoy
Success for /greenday
Success for /plwushii
Issue with /adam-woodard
Success for /title-fight
Success for /fayewebster
Success for /beachbunnymusic
Success for /ajrbrothers
Success for /fueled_by_ramen
Success for /phoebebridgers
Success for /lil_peep
Success for /kaiyo-986878357
Issue with /mitskiofficial
Success for /user-258835843
Success for /manchesterorchestra
Success for /thisisduster
Success for /tealoversunite
Success for /invoguerecords
Success for /mac-demarco-official
Issue with /hopelessrecords
Success for /tobiasdray
Success for /machinegunkelly
https://soundcloud.com/charts/top?genre=ambient
Fetching from cache.

In [45]:
# Display the collected stats, leaving out any None placeholders.
list(filter(lambda stats: stats is not None, final_list))

[]

# Scraping Data for Each Artist

In [None]:
# Plain list of the values in the 'artist_url' column.
# NOTE(review): final_df is not created in any cell visible above — confirm it exists on a fresh kernel.
all_artist_urls = [artist_url for artist_url in final_df['artist_url']]

In [None]:
def cache_artist_page(artist_url):
    """
    Returns the page source of an artist's '/popular-tracks' page, using the
    on-disk cache when possible and a Chrome session otherwise.

    Parameters
    ----------
    artist_url (str): artist profile URL without a trailing slash

    Returns
    -------
    page_source (str): the page source, from cache or freshly rendered
    """
    url = artist_url + '/popular-tracks'
    cache = open_cache()
    if url in cache:
        print("Using Cache")
        return cache[url]

    print("Fetching")
    # Use the shared CHROMEDRIVER_PATH setting instead of a user-specific
    # absolute path so the notebook runs on other machines.
    browser = webdriver.Chrome(CHROMEDRIVER_PATH)
    try:
        browser.get(url)
        # Give the page a few seconds to render before reading the source.
        time.sleep(3)
        page_source = browser.page_source
    finally:
        # Shut the browser down whether or not the fetch succeeded; without
        # this every call leaves a Chrome process running.
        browser.quit()

    cache[url] = page_source
    save_cache(cache)
    return page_source
    

In [None]:
# Gather per-artist information for every URL collected above.
# NOTE(review): get_artist_info is not defined in any cell visible here — confirm it exists on a fresh kernel.
all_artist_info = get_artist_info(all_artist_urls)

In [None]:
# One row per key of all_artist_info; reset_index() then moves those keys
# out of the index into an ordinary column.
artist_df = pd.DataFrame.from_dict(all_artist_info, orient='index').reset_index()

In [None]:
# Show both frames' column labels, separated by a blank line.
print(final_df.columns, "", artist_df.columns, sep="\n")

In [None]:
# The column produced by reset_index() becomes the artist 'id'.
artist_df = artist_df.rename({'index': 'id'}, axis='columns')

In [None]:
# Discard the existing 'index' column, then emit a fresh one from the current row index.
final_df = final_df.drop(columns='index').reset_index()

In [None]:
# The regenerated 'index' column becomes the track 'id'.
final_df = final_df.rename({'index': 'id'}, axis='columns')

# Creating SQLite Database with Python

## Create/Connect to SQLite DB and Establish Connection

In [None]:
import sqlite3

In [None]:
# Connect to the SQLite database file 'soundcloud_data.db'.
conn = sqlite3.connect('soundcloud_data.db')

In [None]:
# Cursor used below to run the CREATE TABLE statements.
c = conn.cursor()

## Create Tables

In [None]:
# DDL for the artists table; artist_url is the primary key that
# soundcloud_tracks.track_artist_url refers to.
query_artists = '''
CREATE TABLE IF NOT EXISTS soundcloud_artists(
    id integer,
    artist_name text,
    artist_url text PRIMARY KEY,
    artist_toptrack text,
    artist_toptrack_views REAL,
    artist_followers REAL,
    artist_numtracks REAL)
'''

In [None]:
# Create the artists table if it is not already present.
c.execute(query_artists)

In [None]:
# DDL for the tracks table; each row points at its artist through
# track_artist_url, declared as a foreign key into soundcloud_artists.
query_tracks = '''
CREATE TABLE IF NOT EXISTS soundcloud_tracks (
    id integer PRIMARY KEY,
    track_genre text,
    track_title text,
    track_url text,
    track_artist text,
    track_views_week REAL,
    track_views_all REAL,
    track_artist_url text, 
    FOREIGN KEY (track_artist_url) REFERENCES soundcloud_artists (artist_url)
);
'''

In [None]:
# Create the tracks table if it is not already present.
c.execute(query_tracks)

## Put Dataframes with Scraped Data in SQL DB

In [None]:
# if_exists='replace' makes pandas drop the table and recreate it from the
# frame, which throws away the PRIMARY KEY declared in the CREATE TABLE cell.
# Empty the existing table and append instead so the declared schema survives
# and re-running the cell still leaves exactly one copy of the data.
with conn:
    conn.execute('DELETE FROM soundcloud_artists')
artist_df.to_sql('soundcloud_artists', conn, if_exists='append', index=False)

In [None]:
# if_exists='replace' makes pandas drop the table and recreate it from the
# frame, which throws away the PRIMARY KEY and FOREIGN KEY declared in the
# CREATE TABLE cell. Empty the existing table and append instead so the
# declared schema survives and re-running the cell does not duplicate rows.
with conn:
    conn.execute('DELETE FROM soundcloud_tracks')
final_df.to_sql('soundcloud_tracks', conn, if_exists='append', index=False)

In [None]:
# Release the database connection now that both tables are written.
conn.close()

In [None]:
# (rows, columns) of the artists frame.
artist_df.shape

In [None]:
# (rows, columns) of the tracks frame.
final_df.shape

# Split the Cache into Two Files to Write to GitHub

In [None]:
# Load the whole cache from CACHE_FILENAME.
cache_full = open_cache()

In [None]:
# Cut the cache at its midpoint. Mind the naming: cache_half1 receives the
# entries from the midpoint onwards, cache_half2 the entries before it.
cache_items = list(cache_full.items())
midpoint = len(cache_items) // 2
cache_half1 = dict(cache_items[midpoint:])
cache_half2 = dict(cache_items[:midpoint])

In [None]:
# Write each half to its own file.
# NOTE(review): save_cache_with_name is not defined in any cell visible here — confirm it exists on a fresh kernel.
save_cache_with_name(cache_half1, 'cache_half1.json')
save_cache_with_name(cache_half2, 'cache_half2.json')

# Combine 2 Cache Files into 1

In [None]:
# Read the two half files back in.
# NOTE(review): open_cache_with_name is not defined in any cell visible here — confirm it exists on a fresh kernel.
cache_half1 = open_cache_with_name('cache_half1.json')
cache_half2 = open_cache_with_name('cache_half2.json')

In [None]:
# Merge in place: cache_half1 now holds the entries of both halves
# (entries from cache_half2 win on any duplicate key).
cache_half1.update(cache_half2)

In [None]:
# Persist the merged cache under a single file name.
# NOTE(review): save_cache_with_name is not defined in any cell visible here — confirm it exists on a fresh kernel.
save_cache_with_name(cache_half1, 'sc_cache.json')