<a href="https://colab.research.google.com/github/teticio/Deej-A.I./blob/master/notebooks/MP3ToSpotify.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Get Spotify IDs of MP3s using ACRCloud

### Import stuff

In [0]:
import os
import sys
import hmac
import time
import json
import tqdm
import base64
import hashlib
import urllib.request
import urllib.parse
import datetime
from pydub import AudioSegment

### Functions to perform multipart POST

In [0]:
def post_multipart(url, fields, files):
    """POST `fields` and `files` to `url` as a multipart/form-data request.

    Args:
        url: Target URL; it is also sent as the Referer header.
        fields: Mapping of form field name to string value.
        files: Mapping of form field name to the raw bytes to upload.

    Returns:
        The response body decoded as UTF-8 text.

    Raises:
        urllib.error.URLError: if the request cannot be completed
            (propagated from urllib.request.urlopen).
    """
    content_type, body = encode_multipart_formdata(fields, files)

    req = urllib.request.Request(url, data=body)
    req.add_header('Content-Type', content_type)
    req.add_header('Referer', url)
    # The context manager closes the connection even if reading or decoding
    # the response fails.
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode('utf8')


def encode_multipart_formdata(fields, files):
    """Serialise form fields and file payloads as multipart/form-data.

    Args:
        fields: Mapping of field name to string value (ASCII only).
        files: Mapping of field name to bytes; the field name is also used
            as the uploaded file name.

    Returns:
        A `(content_type, body)` tuple: the Content-Type header value
        (including the generated boundary) and the encoded body as bytes.
    """
    CRLF = '\r\n'
    # The current time makes the boundary different on every call.
    boundary = "*****2016.05.27.acrcloud.rec.copyright.%s*****" % str(
        time.time())
    delimiter = '--' + boundary

    # Plain text fields: delimiter, disposition header, blank line, value.
    field_lines = []
    for name, text in fields.items():
        field_lines.extend([
            delimiter,
            'Content-Disposition: form-data; name="%s"' % name,
            '',
            text,
        ])
    chunks = [CRLF.join(field_lines).encode('ascii')]

    # Binary parts: ASCII headers followed by the raw payload bytes.
    for name, payload in files.items():
        part_header = CRLF.join([
            CRLF + delimiter,
            'Content-Disposition: form-data; name="%s"; filename="%s"' %
            (name, name),
            'Content-Type: application/octet-stream',
            CRLF,
        ])
        chunks.append(part_header.encode('ascii'))
        chunks.append(payload)

    # Closing delimiter.
    chunks.append((CRLF + delimiter + '--' + CRLF + CRLF).encode('ascii'))

    return 'multipart/form-data; boundary=%s' % boundary, b''.join(chunks)

### Function to identify track and retrieve information

In [0]:
def get_track_info(sample):
    """Identify an audio sample through the ACRCloud identify endpoint.

    Signs the request with HMAC-SHA1 using the module-level `access_key`
    and `access_secret`, then uploads at most the first 5,000,000 bytes of
    `sample` to `http://<host>/v1/identify`.

    Args:
        sample: Raw bytes of the audio file to identify.

    Returns:
        The JSON response body parsed into Python objects.
    """
    http_method = "POST"
    http_url_file = "/v1/identify"
    data_type = "audio"
    signature_version = "1"
    # Seconds since the epoch. time.time() is already timezone-independent;
    # converting it through utcfromtimestamp() and mktime() would treat the
    # UTC value as local time and skew it by the machine's UTC offset.
    timestamp = str(int(time.time()))
    query_data = sample[:5000000]  # make sure sample is not too big
    sample_bytes = str(len(query_data))

    # The fields are signed in this exact order, separated by newlines.
    string_to_sign = "\n".join([
        http_method, http_url_file, access_key, data_type, signature_version,
        timestamp
    ])
    hmac_res = hmac.new(access_secret.encode('ascii'),
                        string_to_sign.encode('ascii'),
                        digestmod=hashlib.sha1).digest()
    sign = base64.b64encode(hmac_res).decode('ascii')

    fields = {
        'access_key': access_key,
        'sample_bytes': sample_bytes,
        'timestamp': timestamp,
        'signature': sign,
        'data_type': data_type,
        "signature_version": signature_version
    }

    res = post_multipart('http://' + host + http_url_file, fields,
                         {"sample": query_data})
    return json.loads(res)

### ACRCloud API credentials

In [0]:
#@markdown Get API credentials from https://www.acrcloud.com/
# These module-level names are read by get_track_info(): `host` is appended
# to 'http://' to build the request URL (so it should not include a scheme),
# `access_key` is sent with the request and `access_secret` is the HMAC key
# used to sign it.
host = 'fill this in with your details' #@param {type: 'string'}
access_key = 'fill this in with your details' #@param {type: 'string'}
access_secret = 'fill this in with your details' #@param {type: 'string'}

### Mount Google Drive if we are running on Google Colab

In [0]:
try:  # are we in Google Colab?
    from google.colab import drive
except ImportError:
    pass  # not running on Colab, nothing to mount
else:
    try:
        drive.mount('/content/drive')
    except Exception as mount_error:
        # Mounting stays best-effort, but report the failure instead of
        # hiding it, otherwise the directory scan silently finds no files.
        print(f'Could not mount Google Drive: {mount_error}')

### Scan directory for MP3s and M4As

In [0]:
directory = '/path/to/your/mp3s/and/m4as' #@param {type: 'string'}
# File extensions (lower case, including the dot) that are collected.
AUDIO_EXTENSIONS = ('.mp3', '.m4a')
ids = {}  # sound file path -> Spotify track ID
mp3s = []  # paths of every matching audio file found under `directory`
for root, dirs, files in os.walk(directory):
    for file in files:
        # Compare case-insensitively and require the dot, so 'SONG.MP3' is
        # picked up while a name that merely ends in the letters 'mp3' is not.
        if file.lower().endswith(AUDIO_EXTENSIONS):
            mp3s.append(os.path.join(root, file))

### Compile a list of all the available Spotify IDs

In [0]:
# Index into `mp3s` at which to start. Leave at 0 for a fresh run; after a
# "limit exceeded" stop, set it to the index that was printed to resume.
start_index = 0 #@param {type: 'integer'}


def _read_bytes(path):
    """Return the full contents of the file at `path` as bytes."""
    with open(path, "rb") as f:
        return f.read()


def _spotify_id(parsed_resp):
    """Return the Spotify track ID of the first match in a response.

    Raises KeyError, IndexError or TypeError when the response does not
    contain such an entry.
    """
    return parsed_resp['metadata']['music'][0]['external_metadata'][
        'spotify']['track']['id']


def _status(parsed_resp):
    """Return the 'status' dict of a response, or {} when there is none."""
    if isinstance(parsed_resp, dict):
        status = parsed_resp.get('status')
        if isinstance(status, dict):
            return status
    return {}


for index, sound_file in enumerate(tqdm.tqdm_notebook(mp3s[start_index:]),
                                   start=start_index):
    if sound_file in ids:
        continue

    # Reset for every file so that a failure before a response is received
    # is never reported using the previous file's response.
    parsed_resp = None
    try:
        parsed_resp = get_track_info(_read_bytes(sound_file))
        ids[sound_file] = _spotify_id(parsed_resp)

    except Exception as e:
        error = e
        # NOTE(review): status code 2004 presumably means the service could
        # not process the uploaded audio - confirm against the ACRCloud docs.
        if _status(parsed_resp).get('code') == 2004:
            try:
                # re-encode sample as mp3
                audio = AudioSegment.from_file(sound_file,
                                               format=sound_file[-3:].lower())
                audio.export("audio.mp3", format="mp3")
                parsed_resp = get_track_info(_read_bytes("audio.mp3"))
                ids[sound_file] = _spotify_id(parsed_resp)
                continue
            except Exception as retry_error:
                error = retry_error

        msg = str(_status(parsed_resp).get('msg', ''))
        if 'limit exceeded' in msg:
            print(f"{msg}: Got to {index}")
            break
        if msg and msg != 'Success':
            print(f"{msg}: Skipping {sound_file}...")
        else:
            # Either no response was received, or the lookup succeeded but
            # carried no Spotify ID: the exception is the useful message.
            print(f'{error}: Skipping {sound_file}...')
        continue

## Add all the Spotify tracks to a playlist

### Import more stuff

In [0]:
import spotipy
import spotipy.util as util

### Spotify API credentials

In [0]:
# `scope`, `client_id`, `client_secret`, `redirect_uri` and `username` are
# passed to util.prompt_for_user_token() further down to obtain the token.
# `playlist_name` is the playlist that is looked up among the user's
# playlists and created if no playlist with that name is found.
scope = 'playlist-modify-public'
#@markdown Get API credentials from https://developer.spotify.com/dashboard/login
client_id = 'fill this in with your details' #@param {type: 'string'}
client_secret = 'fill this in with your details' #@param {type: 'string'}
#@markdown The callback URI has to allow you to get hold of the token (e.g. https://github.com/)
redirect_uri = 'fill this in with your details' #@param {type: 'string'}
username = 'fill this in with your details' #@param {type: 'string'}
playlist_name = 'fill this in with your details' #@param {type: 'string'}

In [0]:
# Workaround: Spotipy's own playlist-creation function is currently broken, so post the request directly

def user_playlist_create(sp,
                         username,
                         playlist_name,
                         description='',
                         public=True):
    """Create a playlist for `username` and return the new playlist's ID.

    Args:
        sp: Client object exposing a `_post(url, payload=...)` method.
        username: User the playlist is created for.
        playlist_name: Name of the new playlist.
        description: Optional playlist description.
        public: Whether the playlist is public.

    Returns:
        The value of the 'id' key in the response to the POST request.
    """
    endpoint = "users/%s/playlists" % (username, )
    payload = dict(name=playlist_name, public=public, description=description)
    response = sp._post(endpoint, payload=payload)
    return response['id']

In [0]:
token = util.prompt_for_user_token(username, scope, client_id, client_secret, redirect_uri)
sp = spotipy.Spotify(token)
playlists = sp.user_playlists(username)

# Collect the IDs of every playlist called `playlist_name`, following the
# paged results so that a match beyond the first page is not missed.
playlist_ids = []
page = playlists
while page:
    playlist_ids.extend(playlist['id'] for playlist in page['items']
                        if playlist['name'] == playlist_name)
    page = sp.next(page)  # None once there are no further pages

# `playlist_id` is what the upload cell uses: reuse the first match, or
# create the playlist and keep the ID that user_playlist_create() returns.
if playlist_ids:
    playlist_id = playlist_ids[0]
else:
    playlist_id = user_playlist_create(sp, username, playlist_name)

In [0]:
# Tracks are sent in batches of at most 100 per request.
BATCH_SIZE = 100
tracks = list(ids.values())
for batch_start in range(0, len(tracks), BATCH_SIZE):
    batch = tracks[batch_start:batch_start + BATCH_SIZE]
    if batch_start == 0:
        # The first batch replaces whatever the playlist held before.
        sp.user_playlist_replace_tracks(username, playlist_id, batch)
    else:
        # Later batches, including a final partial one, are appended.
        sp.user_playlist_add_tracks(username, playlist_id, batch)