In [1]:
import requests
import sqlalchemy

In [2]:
class ApiBasic:
    def __init__(self):
        self.host = 'http://ws.audioscrobbler.com/2.0'
        self.api_key = 'ba1dd338d5f9fe47d9e98967873806a8'
    
    def _send_request(self, method, params=None, headers=None, json=None, response_type=None):
        response = requests.request('GET', f'{self.host}/?method={method}&api_key={self.api_key}&format=json', 
                                params=params, headers=headers, json=json)
        return response.json()

In [3]:
class LastFm(ApiBasic):
    def _get_music_data(self):
        music_data = {}
        list_artists = self._get_top_artists()['artists']['artist']
        for artist in list_artists:
            music_data[artist['name']] = {}
            list_genres = []
            for tag in self._get_genres_from_artist(artist['name'])['toptags']['tag'][:2]:
                list_genres.append(tag['name'])
                music_data[artist['name']]['genre'] = list_genres
            list_albums = []
            for album in self._get_albums_from_artist(artist['name'])['topalbums']['album'][:3]:
                list_albums.append({album['name']: {'release_year': self._get_release_year(artist['name'], album['name'])['album']['wiki']['published'].split(', ')[0][-4:], 'tracks': []}})
            music_data[artist['name']]['albums'] = list_albums
            for idx in range(len(music_data[artist['name']]['albums'][:3])):
                list_tracks = []
                album = [x for x in music_data[artist['name']]['albums'][idx]][0]
                tracks_from_album = self._get_tracks_from_album(artist['name'], album)['album']
                if 'tracks' in tracks_from_album.keys():
                    for track in tracks_from_album['tracks']['track']:
                        music_data[artist['name']]['albums'][idx][[x for x in music_data[artist['name']]['albums'][idx]][0]]['tracks'].append([track['name'], track['duration']])
                else:
                    del music_data[artist['name']]['albums'][idx]
        return music_data
    
    def _get_top_artists(self):
        params={'limit': 5}
        return self._send_request('chart.getTopArtists', params=params)
    
    def _get_genres_from_artist(self, artist):
        params = {'artist': artist}
        return self._send_request('artist.getTopTags', params=params)
    
    def _get_albums_from_artist(self, artist):
        params = {'limit': 5, 'artist': artist}
        return self._send_request('artist.getTopAlbums', params=params)
    
    def _get_tracks_from_album(self, artist, album):
        params = {'album': album, 'artist': artist}
        return self._send_request('album.getInfo', params=params)
    
    def _get_release_year(self, artist, album):
        params = {'album': album, 'artist': artist}
        return self._send_request('album.getInfo', params=params)    

In [4]:
class Table():
    def _fill_data(self):
        for table_name, query_data in data_for_tables.items():
            self._create_table(table_name, query_data)
        self._insert_genre_data()
        self._insert_artist_data()
        self._insert_album_data()
        self._insert_track_data()
        self._insert_genreartist_data()
        self._insert_artistalbum_data()
        self._insert_playlist_data()
        self._insert_playlisttrack_data()
    
    def _create_table(self, table_name, query_data):
        connection.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id serial PRIMARY KEY,
            {query_data});
        """)
        
    def _fill_table(self, table_name, column, values_data):
        connection.execute(
        f"""INSERT INTO {table_name}({column})
        VALUES ('{values_data}');
        """)
        
    def _update_table(self, table_name, updating_column, values_data, filtering_column, filtering_data):
        connection.execute(
        f"""UPDATE {table_name}
            SET {updating_column} = {values_data}
            WHERE {filtering_column} = '{filtering_data}';
        """)
        
    def _insert_genre_data(self):
        for genre in get_list_genres():
            self._fill_table('genres', 'name', f'{genre}')
            
    def _insert_artist_data(self):
        for artist_name in get_list_artists():
            self._fill_table('artists', 'name', f'{artist_name}')
            
    def _insert_album_data(self):
        for album in get_list_albums():
            self._fill_table('albums', 'name', f'{album[0]}')
            self._update_table('albums', 'release_year', f'{album[1]}', 'name', f'{album[0]}')
            
    def _insert_track_data(self):
        for val in music_data.values():
            for album_data in val['albums']:
                for name, data in album_data.items():
                    album_id = get_id_album(name)
                    for al in data['tracks']:
                        self._fill_table('tracks', 'name', f"""{al[0].replace("'","")}""")
                        if al[1] is not None:
                            self._update_table('tracks', 'length', f'{al[1]}', 'name', f"""{al[0].replace("'","")}""")
                        else:
                            self._update_table('tracks', 'length', 0, 'name', f"""{al[0].replace("'","")}""")
                        self._update_table('tracks', 'album_id', f'{album_id}', 'name', f"""{al[0].replace("'","")}""")
    
    def _insert_genreartist_data(self):
        for artist_name, artist_data in music_data.items():
            for data, _ in artist_data.items():
                for genre in artist_data[data]:
                    self._fill_table('genreartist', 'artist_id', f'{get_artist_id_by_name(artist_name)}')
                    self._update_table('genreartist', 'genre_id', f'{get_genre_id_by_name(genre)}', 'artist_id', f'{get_artist_id_by_name(artist_name)}')
                break
                
    def _insert_artistalbum_data(self):
        for artist_name, artist_data in music_data.items():
            for album in artist_data['albums']:
                self._fill_table('artistalbum', 'artist_id', f'{get_artist_id_by_name(artist_name)}')
                self._update_table('artistalbum', 'album_id', f'{[get_id_album(album_name) for album_name in album.keys()][0]}', 'artist_id', f'{get_artist_id_by_name(artist_name)}')

    def _insert_playlist_data(self):
        names = ['Playlist1', 'Playlist2', 'Playlist3', 'Playlist4', 'Playlist5', 'Playlist6', 'Playlist7', 'Playlist8']
        years = [2022, 2021, 2020, 2019, 2018, 2017, 2016, 2015]
        for name, year in zip(names, years):
            self._fill_table('playlists', 'name', f'{name}')
            self._update_table('playlists', 'release_year', f'{year}', 'name', f'{name}')
            
    def _insert_playlisttrack_data(self):
        list_track_ids = []
        i = 1
        while i < 80:
            j = i+10
            con1 = connection.execute(
            f"""SELECT * FROM tracks WHERE id BETWEEN {i} AND {j};
            """).fetchall()
            list_track_ids.append(con1)
            i += 10
        con2 = connection.execute(
        f"""SELECT * FROM playlists;
        """).fetchall()
        for m, j in zip(list_track_ids, con2):
            for n in m:
                self._fill_table('playlisttrack', 'playlist_id', f'{j[0]}')
                self._update_table('playlisttrack', 'track_id', f'{n[0]}', 'playlist_id', f'{j[0]}')

In [5]:
def get_list_genres():
    list_ = []
    for i in music_data.values():
        [list_.append(genre) for genre in i['genre']]
    return list(set(list_))

In [6]:
def get_list_artists():
    return list(set([artist for artist in music_data.keys()]))

In [7]:
def get_list_albums():
    list_ = []
    for val in music_data.values():
        for album in val['albums']:
            for name in album:
                list_.append([name, album[name]['release_year']])
    return list_

In [8]:
def get_id_album(album_name):
    album_data = connection.execute(
    f"""SELECT * FROM albums;
    """).fetchall()
    for album in album_data:
        if album_name == album[1]:
            return album[0]

In [9]:
def get_artist_id_by_name(artist_name):
    con = connection.execute(
    f"""SELECT * FROM artists;
    """).fetchall()
    
    if artist_name in [el[1] for el in con]:
        return [el[0] for el in con][[el[1] for el in con].index(artist_name)]
    else:
        return 0

In [10]:
def get_genre_id_by_name(genre):
    con = connection.execute(
    f"""SELECT * FROM genres;
    """).fetchall()
    
    if genre in [el[1] for el in con]:
        return [el[0] for el in con][[el[1] for el in con].index(genre)]
    else:
        return 0

In [11]:
if __name__ == '__main__':
    
    data_for_tables = {
        'genres': 'name varchar(100)',
        'artists': 'name varchar(100)',
        'albums': 'name varchar(100), release_year integer',
        'tracks': 'name varchar(100), length integer, album_id integer REFERENCES albums(id)',
        'playlists': 'name varchar(100), release_year integer',
        'genreartist': 'genre_id integer REFERENCES genres(id), artist_id integer REFERENCES artists(id)',
        'artistalbum': 'album_id integer REFERENCES albums(id), artist_id integer REFERENCES artists(id)',
        'playlisttrack': 'track_id integer REFERENCES tracks(id), playlist_id integer REFERENCES playlists(id)'
    }
    
    last_fm = LastFm()
    music_data = last_fm._get_music_data()
    
    engine = sqlalchemy.create_engine('postgresql://postgres:admin@localhost:5432/music_db')
    connection = engine.connect()

    tables = Table()
    fill_with_data = tables._fill_data()

---

In [2]:
# engine = sqlalchemy.create_engine('postgresql://postgres:admin@localhost:5432/music_db')
# connection = engine.connect()

Количество исполнителей в каждом жанре:

In [3]:
connection.execute("""
SELECT g.name, COUNT(ga.artist_id)
FROM genreartist ga
JOIN genres g ON g.id = ga.genre_id
GROUP BY g.name;""").fetchall()

[('rap', 8), ('rnb', 4), ('electronic', 5), ('pop', 4)]

Количество треков, вошедших в альбомы 2019-2020 годов:

In [27]:
connection.execute("""
SELECT a.release_year, COUNT(*)
FROM tracks t
JOIN albums a ON a.id = t.album_id
WHERE a.release_year IN (2019, 2020)
GROUP BY a.release_year;""").fetchall()

[(2019, 12), (2020, 26)]

Средняя продолжительность треков по каждому альбому:

In [29]:
connection.execute("""
SELECT a.name, ROUND(AVG(length), 2)
FROM tracks t
JOIN albums a ON a.id = t.album_id
GROUP BY a.name;""").fetchall()

[('My Beautiful Dark Twisted Fantasy', Decimal('316.62')),
 ('Graduation', Decimal('0.00')),
 ('Hot Pink', Decimal('180.00')),
 ('Starboy', Decimal('155.06')),
 ('Flower Boy', Decimal('199.50')),
 ('Speak Now', Decimal('292.36')),
 ('Igor', Decimal('198.75')),
 ('1989', Decimal('144.92')),
 ('Planet Her', Decimal('188.57')),
 ('Beauty Behind the Madness', Decimal('278.07')),
 ('Fearless', Decimal('188.77')),
 ('After Hours', Decimal('32.08')),
 ('808s & Heartbreak', Decimal('256.46')),
 ('CALL ME IF YOU GET LOST', Decimal('197.63'))]

<span style='background:yellow'>Все исполнители, которые не выпустили альбомы в 2020 году:</span>

In [19]:
connection.execute("""
SELECT DISTINCT name
FROM artists
WHERE name NOT IN (
    SELECT DISTINCT ar.name
    FROM artists ar
    JOIN artistalbum aa ON ar.id = aa.artist_id
    JOIN albums al ON al.id = aa.album_id
    WHERE al.release_year = 2020);""").fetchall()

[('Tyler, the Creator',),
 ('Taylor Swift',),
 ('Drake',),
 ('Doja Cat',),
 ('ABBA',),
 ('Kanye West',)]

<span style='background:yellow'>Названия сборников, в которых присутствует конкретный исполнитель (выберите сами):</span>

In [38]:
connection.execute("""
SELECT DISTINCT p.name
FROM playlisttrack pt
LEFT JOIN tracks t ON t.id = pt.track_id
LEFT JOIN playlists p ON p.id = pt.playlist_id
LEFT JOIN albums al ON al.id = t.album_id
LEFT JOIN artistalbum aa ON al.id = aa.album_id
LEFT JOIN artists ar ON ar.id = aa.artist_id
WHERE ar.name = 'Kanye West';""").fetchall()

[('Playlist2',), ('Playlist4',), ('Playlist8',)]

<span style='background:yellow'>Название альбомов, в которых присутствуют исполнители более 1 жанра:</span>

In [70]:
connection.execute("""
SELECT al.name, ar.name, COUNT(DISTINCT g.name)
FROM artists ar
LEFT JOIN genreartist ga ON ga.artist_id = ar.id
LEFT JOIN genres g ON g.id = ga.genre_id
LEFT JOIN artistalbum aa ON ar.id = aa.artist_id
LEFT JOIN albums al ON aa.album_id = al.id
GROUP BY ar.name, al.name
HAVING COUNT(DISTINCT g.name) > 1;""").fetchall()

[('After Hours', 'The Weeknd', 2)]

Наименование треков, которые не входят в сборники:

In [41]:
connection.execute("""
SELECT t.name
FROM tracks t
LEFT JOIN playlisttrack pt ON pt.track_id = t.id
WHERE pt.playlist_id IS NULL;""").fetchall()

[('Options',),
 ('The Way I Loved You',),
 ('IGORS THEME',),
 ('Youre Not Sorry',),
 ('Need to Know',),
 ('Losers',),
 ('See You In My Nightmares',),
 ('Cant Feel My Face',),
 ('Shameless',),
 ('Long Live',),
 ('Pothole',),
 ('Last Kiss',),
 ('Earned It (Fifty Shades of Grey)',),
 ('I Know Places',),
 ('Streets',),
 ('WILSHIRE',),
 ('Haunted',),
 ('Heartless',),
 ('Never Grow Up',),
 ('CORSO',),
 ('Die for You',),
 ('All You Had to Do Was Stay',),
 ('So Appalled',),
 ('GONE, GONE / THANK YOU',),
 ('Say So',),
 ('Alone Again',),
 ('Alone',),
 ('The Glory',),
 ('Too Late',),
 ('A Lonely Night',),
 ('Six Feet Under',),
 ('I Feel It Coming',),
 ('Wildest Dreams',),
 ('Blank Space',),
 ('Shake It Off',),
 ('Naked',),
 ('Who Dat Boy',),
 ('All of the Lights',),
 ('Bad Blood',),
 ('You Right',),
 ('Foreword',),
 ('I Dont Do Drugs',),
 ('Party Monster',),
 ('Who Will Survive in America',),
 ('SWEET / I THOUGHT YOU WANTED TO DANCE',),
 ('SIR BAUDELAIRE',),
 ('Good Morning',),
 ('Until I Bleed O

Исполнителя(-ей), написавшего самый короткий по продолжительности трек (теоретически таких треков может быть несколько):

In [45]:
connection.execute("""
SELECT DISTINCT ar.name
FROM tracks t
JOIN albums al ON al.id = t.album_id
JOIN artistalbum aa ON aa.album_id = al.id
JOIN artists ar ON ar.id = aa.artist_id
WHERE t.length = (
    SELECT MIN(length)
    FROM tracks t);""").fetchall()

[('Taylor Swift',), ('The Weeknd',)]

Название альбомов, содержащих наименьшее количество треков:

In [46]:
connection.execute("""
SELECT album_name
FROM (
    SELECT al.name AS album_name, COUNT(t.id) AS count_tracks
    FROM albums al
    JOIN tracks t ON t.album_id = al.id
    GROUP BY al.name) t1
ORDER BY count_tracks
LIMIT 1;""").fetchall()

[('Hot Pink',)]