## Przetwarzanie danych w PySpark

In [1]:
# potrzebne importy
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, explode_outer, countDistinct, size, \
input_file_name, when, substring, length, expr

In [2]:
# Start the Spark session used by every cell below (an already running one is reused).
spark = SparkSession.builder.appName("HDFS").getOrCreate()

22/01/12 22:26:13 WARN util.Utils: Your hostname, node1 resolves to a loopback address: 127.0.0.1; using 192.168.137.184 instead (on interface ens3)
22/01/12 22:26:13 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/01/12 22:26:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Artyści

In [3]:
# Load the raw artist JSON files from HDFS.
# multiLine=True allows a single JSON record to span several lines.
artists = spark.read.json(
    'hdfs://localhost:8020/user/wisniewskij/spotify/artists/*',
    multiLine=True,
)

                                                                                

In [4]:
# Print the schema Spark inferred for the raw artists frame.
artists.printSchema()

root
 |-- artists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- external_urls: struct (nullable = true)
 |    |    |    |-- spotify: string (nullable = true)
 |    |    |-- followers: struct (nullable = true)
 |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- total: long (nullable = true)
 |    |    |-- genres: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- href: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- images: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- popularity: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- uri: string (nullable = t

In [5]:
# Select the needed attributes: one row per artist struct, flattened to top-level columns.
# explode_outer keeps a (null) row for inputs whose `artists` array is null or empty.
artists_df = (
    artists
    .select(explode_outer('artists').alias('artists'))
    .select(
        'artists.id',
        'artists.name',
        col('artists.followers.total').alias('followers'),  # nested follower count
        'artists.genres',
        'artists.popularity',
    )
)

In [6]:
# Print the schema of the flattened artists frame.
artists_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- followers: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- popularity: long (nullable = true)



In [7]:
# Preview the flattened artists frame (show() prints up to 20 rows with truncated values).
artists_df.show()

+--------------------+--------------+---------+--------------------+----------+
|                  id|          name|followers|              genres|popularity|
+--------------------+--------------+---------+--------------------+----------+
|6DIS6PRrLS3wbnZsf...| WALK THE MOON|  1855947|[dance pop, dance...|        72|
|3Gm5F95VdRxW3mqCn...|         Aminé|  1704990|[hip hop, pop, po...|        77|
|6Ha4aES39QiVjR0L2...|      Yo Gotti|  3552613|[dirty south rap,...|        72|
|7jZM5w05mGhw6wTB1...|           ATB|   315432|[german dance, ge...|        71|
|4mAsWDGLUIEdo6imU...|    Boomdabash|   575501|[italian adult po...|        63|
|1fxbULcd6ryMNc1us...|   Quebonafide|  1136285|    [polish hip hop]|        69|
|1XLWox9w1Yvbodui0...|StaySolidRocky|   155335|       [melodic rap]|        65|
|1cZQSpDsxgKIX2yW5...| Lennon Stella|   313373|[alt z, canadian ...|        71|
|21E3waRsmPlU7jZsS...|         Ne-Yo|  6676677|[dance pop, pop, ...|        81|
|5NGO30tJxFlKixkPS...|    The Police|  4

In [8]:
# Duplicate check: (id, name) pairs that occur more than once.
artists_df_duplicated = (
    artists_df
    .groupBy("id", "name")
    .count()
    .where(col("count") > 1)
)
artists_df_duplicated.show(truncate=False)
# Summing the per-artist counts gives the total number of rows that belong to duplicated artists.
print('Liczba rekordów dla zduplikowanych artystów: ',
      artists_df_duplicated.groupBy().sum('count').first()[0])

                                                                                

+----------------------+-------------+-----+
|id                    |name         |count|
+----------------------+-------------+-----+
|5xOvrnVpLjzfGi69GDlzQY|Yuridope     |2    |
|3IW7ScrzXmPvZhB27hmfgy|Jul          |2    |
|49CE2ffZ6Z3zeYSDauSKck|Tungevaag    |2    |
|44mEtidu0VdRkIqO4IbkNa|Marca MP     |2    |
|7HqEmV7FeCi16bQyHMpIrF|VASSY        |3    |
|3dN9MQpjIyNxyeRfz4EDZe|Rochak Kohli |2    |
|7hJpyuLhmpawafcRfxUnlT|Bibič        |2    |
|0FUsrstJwmg4WVHQMTYuUA|Şehinşah     |2    |
|4sbXXFzEWJY2zsZjelerjX|Dadju        |2    |
|3FoFW2AoUGRHBacC6i4x4p|Russ Millions|2    |
+----------------------+-------------+-----+





Liczba rekordów dla zduplikowanych artystów:  21




In [9]:
# Pull the ids of the duplicated artists to the driver and list all of their rows.
id_duplicated = [
    row['id']
    for row in artists_df_duplicated.select("id").collect()
]
(
    artists_df
    .where(col('id').isin(id_duplicated))
    .sort('name')
    .show(21)
)



+--------------------+-------------+---------+--------------------+----------+
|                  id|         name|followers|              genres|popularity|
+--------------------+-------------+---------+--------------------+----------+
|7hJpyuLhmpawafcRf...|        Bibič|        1|    [polish hip hop]|        61|
|7hJpyuLhmpawafcRf...|        Bibič|     9759|    [polish hip hop]|        61|
|4sbXXFzEWJY2zsZje...|        Dadju|  3786642|[francoton, frenc...|        74|
|4sbXXFzEWJY2zsZje...|        Dadju|       10|[francoton, frenc...|        74|
|3IW7ScrzXmPvZhB27...|          Jul|        3|[francoton, rap m...|        84|
|3IW7ScrzXmPvZhB27...|          Jul|  4678085|[francoton, rap m...|        84|
|44mEtidu0VdRkIqO4...|     Marca MP|   777285|[corridos tumbado...|        78|
|44mEtidu0VdRkIqO4...|     Marca MP|      387|[corridos tumbado...|        78|
|3dN9MQpjIyNxyeRfz...| Rochak Kohli|    91925|[desi pop, modern...|        71|
|3dN9MQpjIyNxyeRfz...| Rochak Kohli|        1|[desi 

In [10]:
# Global temporary view used by the SQL deduplication step below.
# createOrReplace* is used so that re-running this cell does not fail with
# "view already exists" (createGlobalTempView raises in that case).
artists_df.createOrReplaceGlobalTempView("artists")

In [11]:
# Deduplicated frame: one row per (id, name, popularity, genres), keeping the highest
# follower count observed for that group.
# NOTE(review): duplicated rows collapse only when popularity and genres are identical
# across them, because both columns are part of the GROUP BY.
artists_df_deduplicated = (
    spark.sql("SELECT id, name, max(followers) as followers, popularity, genres FROM global_temp.artists GROUP BY id, name, popularity, genres")
)
# Duplicate check. The filter column is resolved by name on this frame (col('id'))
# instead of borrowing a Column object bound to the unrelated `artists_df` frame.
artists_df_deduplicated.filter(col('id').isin(id_duplicated)).orderBy('name').show()
print('Liczba unikalnych artystów: ', artists_df_deduplicated.count())



+--------------------+-------------+---------+----------+--------------------+
|                  id|         name|followers|popularity|              genres|
+--------------------+-------------+---------+----------+--------------------+
|7hJpyuLhmpawafcRf...|        Bibič|     9759|        61|    [polish hip hop]|
|4sbXXFzEWJY2zsZje...|        Dadju|  3786642|        74|[francoton, frenc...|
|3IW7ScrzXmPvZhB27...|          Jul|  4678085|        84|[francoton, rap m...|
|44mEtidu0VdRkIqO4...|     Marca MP|   777285|        78|[corridos tumbado...|
|3dN9MQpjIyNxyeRfz...| Rochak Kohli|    91925|        71|[desi pop, modern...|
|3FoFW2AoUGRHBacC6...|Russ Millions|   271288|        74|          [uk drill]|
|49CE2ffZ6Z3zeYSDa...|    Tungevaag|   123455|        71|[big room, dance ...|
|7HqEmV7FeCi16bQyH...|        VASSY|    31549|        62|  [australian dance]|
|5xOvrnVpLjzfGi69G...|     Yuridope|     8333|        62|[pinoy hip hop, t...|
|0FUsrstJwmg4WVHQM...|     Şehinşah|  1171527|      



Liczba unikalnych artystów:  4989




In [12]:
# Flatten the genres array: one row per (artist, genre).
# explode_outer keeps artists with a null/empty genres array as a single row with a null genre.
artists_df_deduplicated_genres = (
    artists_df_deduplicated
    .withColumn('genres', explode_outer(col('genres')))
)

In [13]:
# Number of (artist, genre) rows after flattening.
artists_df_deduplicated_genres.count()

                                                                                

13344

In [14]:
# Number of artists without any genre (their flattened genre value is null).
(
    artists_df_deduplicated_genres
    .where(col('genres').isNull())
    .select('id')
    .distinct()
    .count()
)

                                                                                

549

### Albumy

In [15]:
# Load the raw album JSON files from HDFS.
# multiLine=True allows a single JSON record to span several lines.
albums = spark.read.json(
    'hdfs://localhost:8020/user/wisniewskij/spotify/albums/*',
    multiLine=True,
)

                                                                                

In [16]:
# Print the schema Spark inferred for the raw albums frame.
albums.printSchema()

root
 |-- href: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- album_group: string (nullable = true)
 |    |    |-- album_type: string (nullable = true)
 |    |    |-- artists: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- external_urls: struct (nullable = true)
 |    |    |    |    |    |-- spotify: string (nullable = true)
 |    |    |    |    |-- href: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- uri: string (nullable = true)
 |    |    |-- available_markets: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- external_urls: struct (nullable = true)
 |    |    |    |-- spotify: string (nullable = true)
 |    |    |-- href: string (null

In [17]:
# Number of files with an empty album list (artists without albums).
(
    albums
    .select(size('items').alias('size'))
    .groupBy('size')
    .count()
    .where(col('size') == 0)
    .show()
)

                                                                                

+----+-----+
|size|count|
+----+-----+
|   0|   20|
+----+-----+



In [18]:
# Ids of artists without albums; the id is taken from element 5 of href split on '/'.
(
    albums
    .where(size('items') == 0)
    .select(split('href', '/').getItem(5).alias('artist_id'), 'items')
    .show(truncate=False)
)



+----------------------+-----+
|artist_id             |items|
+----------------------+-----+
|0IaGKdJMLQv0vLHc1Bzz6u|[]   |
|0J7ZveNAIWHvq9Z9ZDsB5L|[]   |
|0LyfQWJT6nXafLPZqxe9Of|[]   |
|1bfl0AU8SqmLkElptOprhC|[]   |
|28OHQv3VZ2gifjWzrqBv1v|[]   |
|2OJ9q6gAsEHOdqOTIzqnXx|[]   |
|30d0RIHoQMNIzQpj7vTrXE|[]   |
|3CYrY6YYaOcL3dzuRj4Mah|[]   |
|3ITtDLfVjkXKg2QOsP40G7|[]   |
|3JITwr6Xwkz8p6cm9leNUh|[]   |
|3e2jFFSyQ3RQDSi1YI543i|[]   |
|3rCNOOzBSFj5bBiPHQ6zDI|[]   |
|3uBw3E4TPcqllLfrw0OzMV|[]   |
|4JBWzz3QHMkQksbbFpHOD6|[]   |
|4frjN28tf7pio1ht5Qtiu6|[]   |
|5KOmu3ailnFoXhGPiwIld7|[]   |
|6C9uTGq6VGug6mVIS1fC1e|[]   |
|6ZqX1hMl57m51iWusfz1CU|[]   |
|6ieK3wazeURTP9wJhhjlEx|[]   |
|6zXTZTTYTFywWVmlgRV0yZ|[]   |
+----------------------+-----+





In [19]:
# Select the needed attributes: one row per album, tagged with the artist id parsed from href.
albums_df = (
    albums
    .select(split('href', '/').getItem(5).alias('artist_id'), 'items')
    .withColumn('albums', explode_outer('items'))
    .select(
        'artist_id',
        'albums.album_type',
        'albums.id',
        'albums.name',
        'albums.release_date',
        'albums.total_tracks',
    )
    # Keep only albums and singles. Rows that explode_outer produced for empty lists
    # have a null album_type and are dropped by this filter as well.
    .where(col('album_type').isin('album', 'single'))
)

In [20]:
# Print the schema of the flattened albums frame.
albums_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- album_type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- total_tracks: long (nullable = true)



In [21]:
# Preview the flattened albums frame (show() prints up to 20 rows with truncated values).
albums_df.show()

+--------------------+----------+--------------------+--------------------+------------+------------+
|           artist_id|album_type|                  id|                name|release_date|total_tracks|
+--------------------+----------+--------------------+--------------------+------------+------------+
|17ggmeNujCFMkr3zg...|     album|1qYgVhLQzmkpDvGuV...|         A New World|  2021-05-14|           2|
|17ggmeNujCFMkr3zg...|     album|1s4Q1KgUS0CmAx5NC...|Inca Trail Connec...|  2021-04-23|          12|
|17ggmeNujCFMkr3zg...|     album|5AhfRz0R24vwYKeaC...|Journeys: Orchest...|  2021-02-26|           9|
|17ggmeNujCFMkr3zg...|     album|4JOWxk9HERWqCWEmC...|Otto Winter-Hjelm...|  2020-09-11|           8|
|17ggmeNujCFMkr3zg...|     album|5CYHG4Iq2VvjfoyTZ...|             Unleash|  2020-04-10|           4|
|17ggmeNujCFMkr3zg...|     album|4AByWTHCLK61mVk0M...|Berio: Coro & Cri...|  2020-03-06|          38|
|17ggmeNujCFMkr3zg...|     album|7dfhXqmjA7I6bJhgr...|Ole Bull - Stages...|  2020-

In [22]:
# Number of (artist, album) rows.
albums_df.count()

                                                                                

182303

In [23]:
# Number of unique albums (distinct album ids).
(
    albums_df
    .select('id')
    .dropDuplicates()
    .count()
)

                                                                                

151360

In [24]:
# Gap check: a full outer join of artists and albums, reduced to the rows where one side
# is missing (artists without matched albums / albums that match no artist).
# The join condition resolves `id` and `artist_id` by name on the frames actually being
# joined; both names are unique across the two sides because the album `id`/`name`
# columns are renamed first.
artists_albums_outer_join_null = (
    artists_df_deduplicated_genres
    .join(
        albums_df.withColumnRenamed('id', 'album_id').withColumnRenamed('name', 'album_name'),
        col('id') == col('artist_id'),
        'outer',
    )
    .filter(col('id').isNull() | col('artist_id').isNull())
)

In [25]:
# Artists without any matched album: outer-join rows whose album side is empty.
# Filtering on artist_id excludes the album-only rows, which would otherwise appear
# in this listing as a (null, null) entry.
(
    artists_albums_outer_join_null
    .filter(col('artist_id').isNull())
    .select('id', 'name')
    .distinct()
    .show(30, truncate=False)
)

                                                                                

+----------------------+---------------------------------+
|id                    |name                             |
+----------------------+---------------------------------+
|0IaGKdJMLQv0vLHc1Bzz6u|Bob Van Ratingen                 |
|4frjN28tf7pio1ht5Qtiu6|DAOKO×米津玄師                   |
|6H93wOohK6r1MwGh41Z4Nb|Ashin Chen                       |
|4BbsSamQy6XSByO4O4Nymt|MC 3L                            |
|2OJ9q6gAsEHOdqOTIzqnXx|DJ Stijco                        |
|0J7ZveNAIWHvq9Z9ZDsB5L|Aurelie Moeremans                |
|45v3Gcc9Je56Lx9YMx4bl1|The High                         |
|28OHQv3VZ2gifjWzrqBv1v|Bene                             |
|null                  |null                             |
|30d0RIHoQMNIzQpj7vTrXE|Borucci                          |
|03IJEZ6IynYczgge2uC4YD|Mark Linett                      |
|1bfl0AU8SqmLkElptOprhC|Julión Álvarez y su Norteño Banda|
|4JBWzz3QHMkQksbbFpHOD6|Maninder Buttar                  |
|5KOmu3ailnFoXhGPiwIld7|chiello_fsk                      |
|

In [26]:
# Albums that match no artist: outer-join rows whose artist side is empty.
# Filtering on id excludes the artist-only rows, which would otherwise appear
# in this listing as a null artist_id.
(
    artists_albums_outer_join_null
    .filter(col('id').isNull())
    .select('artist_id')
    .distinct()
    .show(truncate=False)
)

                                                                                

+----------------------+
|artist_id             |
+----------------------+
|null                  |
|2zHzn2oA1QJOB8SPIpoiYD|
|3FgenUy0FsiWi5tatx73Ha|
|2hz61ryzrN6bUBZjOQnKbd|
|56r1N9JpztKsmNNk2FtpDU|
|6ZjisQlDoiiHbr8yN9J1Sc|
|1XsVSBeFDiptyuBlX2FgFs|
|5AVUF0rTgAIMhdP5mtnbN7|
|2iAtipcqILmxM2vctkuJCd|
|13KOtjLXdUPwIDuGsORa5i|
|1LT6utaOOPuP58rAQTCrWl|
|6QqEc36uUltQc78hnqXOgx|
|46mGxneffUCmDhMU1m6zYu|
|7CP9fSApQUk9nKQx0rPSda|
|4CwqmHBrsCzKuBqlQKfYxZ|
+----------------------+



In [27]:
# Morissette - 2 ids - 62WbvkXqQGvXQvw74GU3kQ / 2hz61ryzrN6bUBZjOQnKbd
# MC 3L - 2 ids - 4BbsSamQy6XSByO4O4Nymt / 6ZjisQlDoiiHbr8yN9J1Sc
# Ashin Chen - 2 ids - 6H93wOohK6r1MwGh41Z4Nb / 7CP9fSApQUk9nKQx0rPSda
# remaining artist_id values (11) - the second, working id of the duplicated artists
# 2zHzn2oA1QJOB8SPIpoiYD - Dadju, 3FgenUy0FsiWi5tatx73Ha - Sehinsah, 56r1N9JpztKsmNNk2FtpDU / 1LT6utaOOPuP58rAQTCrWl - Vassy, 
# 6ZjisQlDoiiHbr8yN9J1Sc - MC 3L, 1XsVSBeFDiptyuBlX2FgFs - Rochak Kohli, 5AVUF0rTgAIMhdP5mtnbN7 - Marca MP,
# 2iAtipcqILmxM2vctkuJCd - Bibic, 13KOtjLXdUPwIDuGsORa5i - Yuridope, 6QqEc36uUltQc78hnqXOgx - Jul, 
# 46mGxneffUCmDhMU1m6zYu - Tungevaag, 4CwqmHBrsCzKuBqlQKfYxZ - Russ Millions
# ==> artists + albums are joined with an inner/left join (duplicated artists are skipped; after changing the ids of the 3 artists without albums)

In [28]:
# Remap the album-side ids of the 3 artists described above to the ids used in the artists data.
albums_df = albums_df.withColumn(
    "artist_id",
    when(col("artist_id") == "2hz61ryzrN6bUBZjOQnKbd", "62WbvkXqQGvXQvw74GU3kQ")
    .when(col("artist_id") == "6ZjisQlDoiiHbr8yN9J1Sc", "4BbsSamQy6XSByO4O4Nymt")
    .when(col("artist_id") == "7CP9fSApQUk9nKQx0rPSda", "6H93wOohK6r1MwGh41Z4Nb")
    .otherwise(col("artist_id")),  # every other id is left untouched
)

### Utwory

In [29]:
# Load the raw track JSON files from HDFS.
# multiLine=True allows a single JSON record to span several lines.
tracks = spark.read.json(
    'hdfs://localhost:8020/user/wisniewskij/spotify/tracks/*',
    multiLine=True,
)

                                                                                

In [30]:
# Add a column with the source file name of each record (used to obtain the artist id).
tracks = tracks.withColumn("filename", input_file_name())

In [31]:
# Print the schema of the raw tracks frame.
tracks.printSchema()

root
 |-- tracks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- album: struct (nullable = true)
 |    |    |    |-- album_type: string (nullable = true)
 |    |    |    |-- artists: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- external_urls: struct (nullable = true)
 |    |    |    |    |    |    |-- spotify: string (nullable = true)
 |    |    |    |    |    |-- href: string (nullable = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |    |-- uri: string (nullable = true)
 |    |    |    |-- external_urls: struct (nullable = true)
 |    |    |    |    |-- spotify: string (nullable = true)
 |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- images: ar

In [32]:
# Number of files with an empty track list.
(
    tracks
    .select(size('tracks').alias('size'))
    .groupBy('size')
    .count()
    .where(col('size') == 0)
    .show()
)

                                                                                

+----+-----+
|size|count|
+----+-----+
|   0|   52|
+----+-----+



In [33]:
# Select the needed attributes: one row per track, tagged with the artist id taken from
# the name of the file the track was read from.
tracks_df = (
    tracks
    # Take the last '/'-separated segment of the path (the file name). Unlike a fixed
    # split index, this does not depend on how deep the HDFS directory is.
    .select(expr("substring_index(filename, '/', -1)").alias('filename'), 'tracks')
    # explode (not explode_outer): files with an empty track list yield no rows.
    .withColumn('tracks', explode('tracks'))
    .select(col('tracks.album.id').alias('album_id'),
            'tracks.duration_ms',
            'tracks.explicit',
            'tracks.id',
            'tracks.name',
            'tracks.popularity',
            'filename'
           )
    # Strip the trailing 5 characters (the ".json" extension) from the file name.
    .withColumn("filename",expr("substring(filename, 1, length(filename)-5)"))
    .withColumnRenamed("filename", 'artist_id')
)

In [34]:
# Preview the flattened tracks frame (show() prints up to 20 rows with truncated values).
tracks_df.show()

+--------------------+-----------+--------+--------------------+--------------------+----------+--------------------+
|            album_id|duration_ms|explicit|                  id|                name|popularity|           artist_id|
+--------------------+-----------+--------+--------------------+--------------------+----------+--------------------+
|3GkXRRRkV3rfgwG1w...|     176232|   false|29m79w9xPMH4YCD6r...|             Excuses|        75|0OS0NZnK7TGIAWx8M...|
|12UWGJjni8Mz24bGQ...|     184102|   false|4faDlXyZMSxEuxBdd...|               Toxic|        72|0OS0NZnK7TGIAWx8M...|
|7LwW8qe6sND4ySnKm...|     188606|   false|5fXslGZPI5Cco6PKH...|Illegal Weapon 2....|        69|0OS0NZnK7TGIAWx8M...|
|2UEWjApikRpHOdFlZ...|     306572|   false|79blZUG11a6vtTL3N...|      Punjabi Mashup|        67|0OS0NZnK7TGIAWx8M...|
|3yd4VQeZysHsK5TBx...|    4676057|   false|2HyMuSCZJIxgnZBku...|High Rated Gabru ...|        66|0OS0NZnK7TGIAWx8M...|
|3S3ekcemSjVZ6Pi3o...|     193398|   false|7fBeejW1BoZFT

In [35]:
# Number of (artist, track) rows.
tracks_df.count()

                                                                                

47882

In [36]:
# Number of unique tracks (distinct track ids).
(
    tracks_df
    .select('id')
    .dropDuplicates()
    .count()
)

                                                                                

42087

In [37]:
# Remap the ids of the same 3 artists that were remapped on the album side.
tracks_df = tracks_df.withColumn(
    "artist_id",
    when(col("artist_id") == "2hz61ryzrN6bUBZjOQnKbd", "62WbvkXqQGvXQvw74GU3kQ")
    .when(col("artist_id") == "6ZjisQlDoiiHbr8yN9J1Sc", "4BbsSamQy6XSByO4O4Nymt")
    .when(col("artist_id") == "7CP9fSApQUk9nKQx0rPSda", "6H93wOohK6r1MwGh41Z4Nb")
    .otherwise(col("artist_id")),  # every other id is left untouched
)

### Finalne dane

In [38]:
# Final dataset: artists (one row per genre) left-joined with their albums, then with the
# tracks of each (artist, album) pair. The generic id/name/popularity columns are
# renamed per entity first so that they do not collide after the joins.
data = (
    artists_df_deduplicated_genres
    .withColumnRenamed('id', 'artist_id')
    .withColumnRenamed('name', 'artist_name')
    .withColumnRenamed('popularity', 'artist_popularity')
    .join(
        albums_df
        .withColumnRenamed('id', 'album_id')
        .withColumnRenamed('name', 'album_name'),
        on='artist_id',
        how='left',
    )
    .join(
        tracks_df
        .withColumnRenamed('id', 'track_id')
        .withColumnRenamed('name', 'track_name')
        .withColumnRenamed('popularity', 'track_popularity'),
        on=['artist_id', 'album_id'],
        how='left',
    )
)

In [39]:
# Row count of the joined artist/album/track dataset.
data.count()

                                                                                

566972

In [40]:
# Preview the joined dataset (show() prints up to 20 rows with truncated values).
data.show()



+--------------------+--------------------+--------------------+---------+-----------------+-------------------+----------+--------------------+------------+------------+-----------+--------+--------+----------+----------------+
|           artist_id|            album_id|         artist_name|followers|artist_popularity|             genres|album_type|          album_name|release_date|total_tracks|duration_ms|explicit|track_id|track_name|track_popularity|
+--------------------+--------------------+--------------------+---------+-----------------+-------------------+----------+--------------------+------------+------------+-----------+--------+--------+----------+----------------+
|00Z3UDoAQwzvGu13H...|6E8WvkEF5UzumaYEr...|         Skizzy Mars|   291381|               59|      indie pop rap|     album|I Can't Take Me A...|  2018-11-29|           9|       null|    null|    null|      null|            null|
|00Z3UDoAQwzvGu13H...|6E8WvkEF5UzumaYEr...|         Skizzy Mars|   291381|          

                                                                                

In [41]:
# Put the columns in artist -> album -> track order.
data = data.select([
    'artist_id', 'artist_name', 'followers', 'artist_popularity', 'genres',
    'album_id', 'album_type', 'album_name', 'release_date', 'total_tracks',
    'track_id', 'track_name', 'explicit', 'track_popularity', 'duration_ms',
])

In [42]:
# Persist the final dataset to HDFS as CSV with a header row.
# mode('overwrite') replaces the output of a previous run; with the default save mode
# the write raises when the target directory already exists, so the notebook could not
# be re-run.
data.write.mode('overwrite').option('header', 'true').csv('hdfs://localhost:8020/user/wisniewskij/spotify/data')