# spotify_million_playlist_dataset in Datenbank schreiben

## init des skripts

imports und wichtige variablen deffinieren

 - unter dem Pfad PATH liegen die json Dateien des spotify_million_playlist_dataset

In [16]:
import os, sys
import numpy as np
import pandas as pd
import json
from tqdm import tqdm
from getpass import getpass
from io import StringIO
import psycopg2

PATH = "D:/SpotifyDataset/spotify_million_playlist_dataset/data"


## Funktion: readFile2dictLists(filename)

Iteration über playlists und tracks in einem mitgegebenen File

Speichen alle playlists, tracks, albums, interprets und playlist_enthaelt_song als Dictionaries in den 5 Listen nach ER-Modell

Die 5 *_dictList Listen sind Listen, die mit Dictionaries gefüllt werden, da so die Erstellung von DataSets effizienter läuft, als wenn wir direkt mit Dataframes von Pandas arbeiten und jedes Element nacheinander in ein Dataframe einfügen.



In [17]:
def readFile2dictLists(filename):
    with open(PATH + "/" + filename) as f:
        #Convert to string and replace
        obj_str = json.dumps(json.load(f)).replace(';', ',')

        #Get obj back with replacement
        data = json.loads(obj_str)

        playlists = data["playlists"]
        for playlist in playlists:
            
            dPlaylist = {"Playlist_ID": playlist["pid"], "name": playlist["name"].replace("\\", "\\\\"), "collaborative": playlist["collaborative"], "modified_at": playlist["modified_at"], "num_tracks": playlist["num_tracks"], "num_albums": playlist["num_albums"], "num_followers": playlist["num_followers"], "num_edits": playlist["num_edits"], "duration_ms": playlist["duration_ms"], "num_artists": playlist["num_artists"]}
            playlists_dictList.append(dPlaylist)

            for track in playlist["tracks"]:
                playlist_enthaelt_song.append({"enthaelt_ID": None, "Playlist": playlist["pid"], "Song": track["track_uri"], "pos": track["pos"]})
                
                dTrack = {"Song_ID": track["track_uri"], "Interpret": track["artist_uri"], "Album": track["album_uri"], "track_name": track["track_name"].replace("\\", "\\\\") , "duration_ms": track["duration_ms"]}
                songs_dictList.append(dTrack)

                dInterpret = {"Interpret_ID": track["artist_uri"], "artist_name": track["artist_name"].replace("\\", "\\\\") }
                artists_dictList.append(dInterpret)

                dAlbum = {"Album_ID": track["album_uri"], "album_name": track["album_name"].replace("\\", "\\\\") }
                albums_dictList.append(dAlbum)
        

            


# Funktion: listsToFrames()

## Dictionaries zu Dataframes umwandeln

Jetzt können wir schön schnell die Listen von Dicts in Dataframes umwandeln und dabei Duplicate entfernen.
dfPlaylist_enthaelt_song wird niemals Duplikate aufweisen.

In [18]:
def listsToFrames():
    dfAlbums =  pd.DataFrame(albums_dictList).drop_duplicates(ignore_index=True)
    dfArtists = pd.DataFrame(artists_dictList).drop_duplicates(ignore_index=True)
    dfSongs =   pd.DataFrame(songs_dictList).drop_duplicates(ignore_index=True)
    dfPlaylists =   pd.DataFrame(playlists_dictList).drop_duplicates(ignore_index=True)

    dfPlaylist_enthaelt_song = pd.DataFrame(playlist_enthaelt_song)
    dfPlaylist_enthaelt_song['enthaelt_ID'] = range(0, len(dfPlaylist_enthaelt_song))
    dfPlaylist_enthaelt_song["Playlist"] = dfPlaylist_enthaelt_song["Playlist"].astype(int)
    dfPlaylist_enthaelt_song["pos"] = dfPlaylist_enthaelt_song["pos"].astype(int)

    return [dfAlbums, dfArtists, dfSongs, dfPlaylists, dfPlaylist_enthaelt_song]

## Alle Dataframes nacheinander anzeigen

# Datenbank Kram
Jetzt haben wir die Dataframes fertig, jetzt müssen wir sie in die Datenbank bekommen, also kommt im Folgenden Zeug zum Thema Datenbank.

## Login abfrage für Datenbank

In [19]:
user = input("Bitte User für DB Eingeben:")
pswd = getpass("Bitte Password für DB Eingeben:")

login_param = f"dbname='spotify_test' user='{user}' host='localhost' port='5432' password='{pswd}'"
#login_param = f"dbname='orent001_spotify_test' user='{user}' host='localhost' port='9001' password='{pswd}'"
# #login_param = f"dbname='orent001_spotify' user='{user}' host='localhost' port='9001' password='{pswd}'"

## Funktion zur Verbindungsherstellung mit der Datenbnak

In [20]:
def connect(params_dic):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(params_dic)
        cur = conn.cursor()
        cur.execute("SHOW client_encoding;")
        print(cur.fetchone())
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) 
    print("Connection successful")
    return conn

conn = connect(login_param)


Connecting to the PostgreSQL database...
('UTF8',)
Connection successful


## Dataframes in Datenbank einfügen

Statt hier viel auszuprobieren wie bisher, haben wir hierfür und direkt nach einem performance Benchmark umgesehen, wie wir am effizientesten mit psycog2 Daten in die postgresDB laden können.

### Quelle:
https://naysan.ca/2020/05/09/pandas-to-postgresql-using-psycopg2-bulk-insert-performance-benchmark/

Das beste Ergebnis ist, copy_from zu verwenden und aus einem gespeicherten File zu laden. Wir haben für das "speichern" oder besser gesagt, buffern StringIO verwendet.

Es folgt die Methode:

In [21]:
def copy_from_stringio(conn, df, table):
    """
    Here we are going save the dataframe in memory 
    and use copy_from() to copy it to the table
    """
    # save dataframe to an in memory buffer
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False, sep=";", encoding="utf-8")
    buffer.seek(0)
    
    cursor = conn.cursor()
    try:
        cursor.copy_from(buffer, table, sep=";")
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        sys.exit()
    cursor.close()

## Tabellen erstellen
Wir haben jetzt alles wichtige für das einfügen von DataFrames in eine Tabelle in der Datenbank. Was wir noch nicht haben, sind die Tabellen in der Datenbank. Die wollen wir auch Skriptbasiert anlegen. Aber falls es sie schon gibt, sollten die alten zu erst gelöscht werden.
Wir brauchen folgende Tabellen:
 - Album
 - Interpret
 - Song
 - Playlist
 - P_enthaelt_S

analog dazu stecken wir auch die Dataframes in der selben Reihenfolge in eine Liste, um in einer Iteration später copy_from_stringio() mit den richtigen Para,etern aufrufen zu können.

In [22]:
tsAlbum = ["Album_ID", "album_name"]
tsArtist = ["Interpret_ID", "artist_name"]
tsSong = ["Song_ID", "Interpret", "Album", "track_name", "duration_ms"]
tsPlaylist = ["Playlist_ID ", "name", "collaborative", "modified_at", "num_tracks", "num_albums", "num_followers", "num_edits", "duration_ms", "num_artists"]
tsPlaylist_enthaelt_song = ["enthaelt_ID", "Playlist", "Song", "pos"]

tablenames = ["album", "interpret", "song", "playlist", "p_enthaelt_s"]
temp_tablenames = ["temp_album", "temp_interpret", "temp_song", "temp_playlist", "temp_p_enthaelt_s"]
tableshemas = [tsAlbum, tsArtist, tsSong, tsPlaylist, tsPlaylist_enthaelt_song]

## Funktion zum Löschen der Tabellen

Überlegung: tut's nicht auch das selbe, wenn wir alles löschen?

In [23]:
def deleteTable(tablename):
    #Creating a cursor object using the cursor() method
    cursor = conn.cursor()

    #Doping table if already exists
    cursor.execute(f"DROP TABLE IF EXISTS {tablename}")

    #Commit your changes in the database
    conn.commit()
    cursor.close()

for tablename in temp_tablenames[::-1]:
    deleteTable(tablename)
for tablename in tablenames[::-1]:
    deleteTable(tablename)

## Querries zum Erstellen der Tabellen

In [24]:
def createTables(prod_tablenames, ref_tablenames):
    albumQuerry = f"""
    CREATE TABLE {prod_tablenames[0]} (
    {tableshemas[0][0]} varchar(255) PRIMARY KEY,
    {tableshemas[0][1]} varchar(510)
    )"""

    interpretQuerry = f"""
    CREATE TABLE {prod_tablenames[1]} (
    {tableshemas[1][0]} varchar(255) PRIMARY KEY,
    {tableshemas[1][1]} varchar(510)
    )"""

    songQuerry = f"""
    CREATE TABLE {prod_tablenames[2]} (
    {tableshemas[2][0]} varchar(255) PRIMARY KEY,
    {tableshemas[2][1]} varchar(255) REFERENCES {ref_tablenames[1]} ({tableshemas[1][0]}),
    {tableshemas[2][2]} varchar(255) REFERENCES {ref_tablenames[0]} ({tableshemas[0][0]}),
    {tableshemas[2][3]} varchar(510),
    {tableshemas[2][4]} int
    )"""

    playlistQuerry = f"""
    CREATE TABLE {prod_tablenames[3]} (
    {tableshemas[3][0]} varchar(255) PRIMARY KEY,
    {tableshemas[3][1]} varchar(510),
    {tableshemas[3][2]} boolean,
    {tableshemas[3][3]} int,
    {tableshemas[3][4]} int,
    {tableshemas[3][5]} int,
    {tableshemas[3][6]} int,
    {tableshemas[3][7]} int,
    {tableshemas[3][8]} int,
    {tableshemas[3][9]} int
    )"""

    p_enthaelt_sQuerry = f"""
    CREATE TABLE {prod_tablenames[4]} (
    {tableshemas[4][0]} int PRIMARY KEY,
    {tableshemas[4][1]} varchar(255) REFERENCES {ref_tablenames[3]} ({tableshemas[3][0]}),
    {tableshemas[4][2]} varchar(255) REFERENCES {ref_tablenames[2]} ({tableshemas[2][0]}),
    {tableshemas[4][3]} int

    )"""

    commands = (albumQuerry, interpretQuerry, songQuerry, playlistQuerry, p_enthaelt_sQuerry)

    cur = conn.cursor()
    # create table one by one
    for command in commands:
        cur.execute(command)
    # close communication with the PostgreSQL database server
    cur.close()
    # commit the changes
    conn.commit()

createTables(tablenames,tablenames)

In [25]:
def dataframesInDatenbankSchreiben():
    # temporäre Tabellen erstellen:
    createTables(temp_tablenames, tablenames)

    for i in range(0,5):
        # dataframe in temp tabelle kopieren:
        copy_from_stringio(conn, dataframes[i], temp_tablenames[i])

        # von temp in prod tabelle kopieren:
        verschiebQuerry = f"INSERT INTO {tablenames[i]} SELECT * FROM {temp_tablenames[i]} ON CONFLICT DO NOTHING"
        if (i == 4):
            cur = conn.cursor()
            cur.execute("SELECT COUNT(enthaelt_id) FROM p_enthaelt_s;")
            countRows = cur.fetchone()[0]
            cur.close()
            verschiebQuerry = f"INSERT INTO {tablenames[i]} (enthaelt_ID, Playlist, Song, pos) SELECT enthaelt_ID + {countRows}, Playlist, Song, pos FROM {temp_tablenames[i]} ON CONFLICT DO NOTHING"
        cur = conn.cursor()
        try:
            cur.execute(verschiebQuerry)
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            print(verschiebQuerry)
            conn.rollback()
            cur.close()
            sys.exit()
        else:
            conn.commit()
            cur.close()

    # temporäre Tabellen löschen:
    for ele in temp_tablenames[::-1]:
        deleteTable(ele)


# HIER PASSIERT DAS WICHTIGE ZEUG!



In [26]:
allFilenames = os.listdir(PATH)[:5]
for dieNaechsten10filenamen in tqdm(np.array_split(allFilenames, 5)):

    # Listen leeren:
    artists_dictList = []
    albums_dictList = []
    songs_dictList = []
    playlists_dictList = []
    playlist_enthaelt_song = []
    
    for filename in dieNaechsten10filenamen:

        # aktuelles File in Liste laden
        readFile2dictLists(filename)
    
    # die Listen aus den 10 Files zu Dataframes machen 
    dataframes = listsToFrames()

    dataframesInDatenbankSchreiben()


100%|██████████| 5/5 [01:23<00:00, 16.69s/it]


## Aufrufen der Befüllmethode für die Datenbank
Weil wir die Dataframes in einer Liste in der selben Reihenfolge wie die Tabellennamen haben, können wir jetzt 5 mal iterieren und copy_from_stringio() mit passenden Parametern aufrufen.

In [27]:
# for i in range(0,5):
#     copy_from_stringio(conn, dataframes[i], tablenames[i])